Coverage for src / anpr2mqtt / event_handler.py: 67%

242 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 15:35 +0000

1import datetime as dt 

2import json 

3import re 

4from io import BytesIO 

5from pathlib import Path 

6from typing import Any 

7 

8import paho.mqtt.client as mqtt 

9import PIL.ImageOps 

10import pytesseract 

11import structlog 

12import tzlocal 

13from PIL import Image 

14from watchdog.events import DirCreatedEvent, FileClosedEvent, FileCreatedEvent, RegexMatchingEventHandler 

15 

16from anpr2mqtt.api_client import DVLA, APIClient 

17from anpr2mqtt.const import ImageInfo 

18from anpr2mqtt.hass import post_image_message, post_state_message 

19from anpr2mqtt.settings import ( 

20 TARGET_TYPE_PLATE, 

21 DVLASettings, 

22 EventSettings, 

23 ImageSettings, 

24 OCRFieldSettings, 

25 OCRSettings, 

26 TargetSettings, 

27 TrackerSettings, 

28) 

29 

# Module-level structlog logger used by EventHandler and the helper functions in this file.
log = structlog.get_logger()

31 

32 

class EventHandler(RegexMatchingEventHandler):
    """Watchdog handler that turns newly written image files into MQTT messages.

    Files whose full path matches the configured image-name regex are examined
    when they are closed: the file name is parsed, the image is optionally
    re-encoded, OCR fields are scanned, the target is classified and tracked,
    and the outcome is published through ``post_state_message`` and
    ``post_image_message``.
    """

    def __init__(
        self,
        client: mqtt.Client,
        state_topic: str,
        image_topic: str,
        event_config: EventSettings,
        target_config: TargetSettings | None,
        ocr_config: OCRSettings,
        image_config: ImageSettings,
        dvla_config: DVLASettings,
        tracker_config: TrackerSettings,
    ) -> None:
        """Store the configuration and register the file-name regex with watchdog.

        Args:
            client: MQTT client used for all publications.
            state_topic: Topic passed to ``post_state_message``.
            image_topic: Topic passed to ``post_image_message``.
            event_config: Watch path, image-name regex, target type and URL base.
            target_config: Correction / ignore / known / dangerous rules, or ``None``.
            ocr_config: OCR field definitions.
            image_config: Re-encoding options for JPEG and PNG images.
            dvla_config: API key and cache TTL for the DVLA lookup.
            tracker_config: Location of the sightings data directory.
        """
        # The regex handed to watchdog is the resolved watch directory joined
        # with the file-name pattern, so it is matched against full paths.
        fqre = f"{event_config.watch_path.resolve() / event_config.image_name_re.pattern}"
        super().__init__(regexes=[fqre], ignore_directories=True, case_sensitive=True)
        log.debug("Listening for images matching %s", fqre)
        self.client: mqtt.Client = client
        self.state_topic: str = state_topic
        self.event_config: EventSettings = event_config
        self.tracker_config: TrackerSettings = tracker_config
        self.target_config: TargetSettings | None = target_config
        self.ocr_config: OCRSettings = ocr_config
        self.image_config: ImageSettings = image_config
        self.dvla_config: DVLASettings = dvla_config
        if event_config.image_url_base:
            log.info("Images available from web server with prefix %s", event_config.image_url_base)
        self.image_topic: str | None = image_topic

        # The external lookup is only enabled for plate targets with an API key.
        if dvla_config.api_key and event_config.target_type == TARGET_TYPE_PLATE:
            log.info("Configured gov API lookup")
            self.api_client: APIClient | None = DVLA(dvla_config.api_key, cache_ttl=dvla_config.cache_ttl)
        else:
            log.info("No gov API lookup configured")
            self.api_client = None

    @property
    def ignore_directories(self) -> bool:
        """Always ``True``: directory events are never of interest."""
        return True

    def on_created(self, event: DirCreatedEvent | FileCreatedEvent) -> None:
        """Log file creation; processing is deferred until the file is closed."""
        if event.event_type != "created" or event.is_directory:
            log.debug("on_created: skipping irrelevant event: %s", event)
            return
        log.info("New file detected: %s", event.src_path)

    def on_closed(self, event: FileClosedEvent) -> None:
        """Process a completely written image file and publish the result.

        Any exception raised while examining the file is logged and reported
        through ``post_state_message`` with an ``error`` value. A file that is
        empty or can no longer be stat'ed is logged and ignored.
        """
        if event.event_type != "closed" or event.is_directory:
            log.debug("on_closed: skipping irrelevant event: %s", event)
            return
        log.info("New complete file detected: %s", event.src_path)

        file_path = Path(str(event.src_path))
        # The file may have been removed or become unreadable since the close
        # event was raised; do not let that escape into the dispatching thread.
        try:
            file_size = file_path.stat().st_size
        except OSError as e:
            log.warning("Unable to stat image file %s, ignoring: %s", file_path, e)
            return
        if file_size == 0:
            log.warning("Empty image file, ignoring, at %s", file_path)
            return

        url: str | None = None
        if self.event_config.image_url_base:
            url = f"{self.event_config.image_url_base}/{file_path.name!s}"

        try:
            image_info: ImageInfo | None = examine_file(file_path, self.event_config.image_name_re)
            if image_info is None:
                # File name could not be parsed: publish a target-less state
                # carrying only the default OCR field values.
                ocr_fields: dict[str, str | None] = scan_ocr_fields(None, self.event_config, self.ocr_config)
                post_state_message(
                    self.client,
                    self.state_topic,
                    event_config=self.event_config,
                    ocr_fields=ocr_fields,
                    target=None,
                    url=url,
                    file_path=file_path,
                )
                return

            target: str = image_info.target
            log.info("Examining image for %s at %s", target, file_path.absolute())

            image: Image.Image | None = process_image(
                file_path.absolute(), image_info, jpeg_opts=self.image_config.jpeg_opts, png_opts=self.image_config.png_opts
            )
            ocr_fields = scan_ocr_fields(image, self.event_config, self.ocr_config)

            classification: dict[str, Any] = self.classify_target(target)
            if classification["target"] != target:
                # apply corrected target name if changed
                target = classification["target"]

            reg_info: list[Any] | dict[str, Any] | None = None
            if (
                not classification.get("known")
                and self.api_client
                and image_info.target
                and self.event_config.target_type == TARGET_TYPE_PLATE
            ):
                reg_info = self.api_client.lookup(target)

            # Sightings are recorded before the ignore check, so ignored
            # targets are still tracked.
            visit_count: int
            last_seen: dt.datetime | None
            visit_count, last_seen = self.track_target(target, self.event_config.target_type, image_info.timestamp)

            if classification.get("ignore"):
                log.info("Skipping MQTT publication for ignored %s", target)
                return

            post_state_message(
                self.client,
                self.state_topic,
                target=target,
                event_config=self.event_config,
                image_info=image_info,
                ocr_fields=ocr_fields,
                classification=classification,
                previous_sightings=visit_count,
                last_sighting=last_seen,
                url=url,
                reg_info=reg_info,
                file_path=file_path,
            )
            if self.image_topic and image:
                img_format = image_info.ext.upper() if image_info.ext else None
                img_format = "JPEG" if img_format == "JPG" else img_format
                if img_format:
                    post_image_message(self.client, self.image_topic, image, img_format)
                else:
                    log.warning("Unknown image format for %s", file_path)

        except Exception as e:
            log.error("Failed to parse file event %s: %s", event, e, exc_info=True)
            post_state_message(
                self.client,
                self.state_topic,
                event_config=self.event_config,
                target=None,
                ocr_fields={},
                error=str(e),
                file_path=file_path,
            )

    def track_target(self, target: str, target_type: str, event_dt: dt.datetime | None) -> tuple[int, dt.datetime | None]:
        """Record a sighting and report the history that preceded it.

        Sightings are kept as a JSON list of ISO-8601 strings in
        ``<data_dir>/<target_type>/<target>.json``. Tracking is best-effort:
        any failure is logged and the values gathered so far are returned.

        Args:
            target: Target identifier; a falsy value is stored as ``"UNKNOWN"``.
            target_type: Name of the sub-directory under the tracker data dir.
            event_dt: Time of the sighting; the current local time when ``None``.

        Returns:
            ``(previous_visits, last_visit)`` - the number of sightings already
            on file and the timestamp of the most recent one, or ``None``.
        """
        target = target or "UNKNOWN"
        last_visit: dt.datetime | None = None
        previous_visits: int = 0
        try:
            target_type_path = self.tracker_config.data_dir / target_type
            # Create intermediate directories too, so a fresh data dir works.
            target_type_path.mkdir(parents=True, exist_ok=True)
            target_file = target_type_path / f"{target}.json"

            sightings: list[str] = []
            if target_file.exists():
                with target_file.open("r") as f:
                    sightings = json.load(f)
                previous_visits = len(sightings)
                if previous_visits > 0:
                    last_visit = dt.datetime.fromisoformat(sightings[-1])

            sightings.append(event_dt.isoformat() if event_dt else dt.datetime.now(tz=tzlocal.get_localzone()).isoformat())
            with target_file.open("w") as f:
                json.dump(sightings, f)
        except Exception as e:
            log.exception("Failed to track sightings for %s:%s", target, e)
        return previous_visits, last_visit

    def classify_target(self, target: str | None) -> dict[str, Any]:
        """Classify a target against the configured correction and watch lists.

        Returns:
            A dict with the keys ``orig_target``, ``target``, ``ignore``,
            ``known``, ``dangerous``, ``priority`` and ``description``. The
            ``known`` rules are applied after the ``dangerous`` rules, so a
            target present in both ends with the ``known`` priority and
            description while keeping ``dangerous`` set.
        """
        results: dict[str, Any] = {
            "orig_target": target,
            "target": target,
            "ignore": False,
            "known": False,
            "dangerous": False,
            "priority": "high",
            "description": "Unknown vehicle",
        }
        if not target or self.target_config is None:
            # Always return the fully populated defaults, which keeps
            # Home Assistant template logic simple.
            return results

        # First matching correction wins.
        for corrected_target, patterns in self.target_config.correction.items():
            if any(re.match(pat, target) for pat in patterns):
                results["target"] = corrected_target
                target = corrected_target
                log.info("Corrected target %s -> %s", results["orig_target"], target)
                break

        for pat in self.target_config.ignore:
            if re.match(pat, target):
                log.info("Ignoring %s matching ignore pattern %s", target, pat)
                results["ignore"] = True
                results["priority"] = "low"
                results["description"] = "Ignored"
                break

        if target in self.target_config.dangerous:
            log.warning("%s known as potential danger", target)
            results["dangerous"] = True
            results["priority"] = "critical"
            results["description"] = self.target_config.dangerous[target] or "Potential threat"
        if target in self.target_config.known:
            log.warning("%s known to household", target)
            results["known"] = True
            results["priority"] = "medium"
            results["description"] = self.target_config.known[target] or "Known"

        return results

232 

233 

def process_image(
    file_path: Path, image_info: ImageInfo, jpeg_opts: dict[str, Any], png_opts: dict[str, Any]
) -> Image.Image | None:
    """Open an image and, when options are configured, re-encode it in memory.

    Args:
        file_path: Location of the image file.
        image_info: Parsed file details; ``size`` is updated in place when the
            re-encoded image has a different byte size.
        jpeg_opts: Keyword arguments for ``Image.save`` on JPEG images.
        png_opts: Keyword arguments for ``Image.save`` on PNG images.

    Returns:
        The opened (and possibly re-encoded) image, or ``None`` when the image
        cannot be loaded or re-saved; the failure is logged.
    """
    try:
        image: Image.Image | None = Image.open(file_path.absolute())
        if image is None:
            log.error("Unable to open image at %s", file_path.absolute())
            return None

        # Prefer the format PIL detected; fall back to the file extension.
        image_format: str | None = image.format or image_info.ext
        if image_format:
            image_format = image_format.lower()
            if image_format == "jpg":
                # PIL registers the codec as "JPEG"; "jpg" is only a file
                # extension and is rejected as a save format.
                image_format = "jpeg"

        img_args: dict[str, Any] | None = None
        if image_format == "jpeg" and jpeg_opts:
            img_args = jpeg_opts
        elif image_format == "png" and png_opts:
            img_args = png_opts

        if img_args:
            log.debug("Rewriting image to process %s", img_args)
            buffer = BytesIO()
            image.save(buffer, image_format, **img_args)
            size = buffer.getbuffer().nbytes
            if size != image_info.size:
                log.info("Image size %s -> %s", image_info.size, size)
                image_info.size = size
            # Hand back the re-encoded bytes rather than the original file.
            image = Image.open(buffer)
            log.info("Resaved image with %s", img_args)
        return image
    except Exception as e:
        log.warning("Unable to load image at %s: %s", file_path, e)
        return None

263 

264 

def examine_file(file_path: Path, image_name_re: re.Pattern) -> ImageInfo | None:
    """Parse an image file name into an ``ImageInfo`` record.

    The bare file name is matched against ``image_name_re``. The pattern must
    define a ``dt`` named group whose text starts with ``YYYYMMDDhhmmss``
    followed by up to three further digits; ``target``, ``event`` and ``ext``
    named groups are read when present.

    Args:
        file_path: Path of the image file; it must exist, as its size is read.
        image_name_re: Compiled pattern with the named groups described above.

    Returns:
        The parsed ``ImageInfo``, or ``None`` when the name does not match, no
        target was captured, or parsing fails (the failure is logged).
    """
    try:
        match = re.match(image_name_re, file_path.name)
        if not match:
            return None

        groups = match.groupdict()
        size: int = file_path.stat().st_size
        raw_date = match.group("dt")
        year, month, day = map(int, (raw_date[:4], raw_date[4:6], raw_date[6:8]))
        # NOTE(review): the trailing three digits look like milliseconds but are
        # passed through as microseconds - confirm against the camera's format.
        hours, minutes, seconds, microseconds = map(
            int, (raw_date[8:10], raw_date[10:12], raw_date[12:14], raw_date[14:17])
        )
        timestamp = dt.datetime(year, month, day, hours, minutes, seconds, microseconds, tzinfo=tzlocal.get_localzone())

        file_ext: str | None = groups.get("ext")
        event: str | None = groups.get("event")
        target: str | None = groups.get("target")
        if target is None:
            log.warning("No target found for match: %s", groups)
            return None
        if file_ext is None:
            # The pattern captured no extension: take the text after the last
            # dot of the file name (not the stem), or leave it unset.
            file_ext = file_path.suffix.removeprefix(".") or None
        return ImageInfo(target=target, event=event, timestamp=timestamp, ext=file_ext, size=size)
    except Exception as e:
        log.warning("Unable to parse %s: %s", file_path, e)
    return None

291 

292 

def scan_ocr_fields(image: Image.Image | None, event_config: EventSettings, ocr_config: OCRSettings) -> dict[str, str | None]:
    """Read the configured OCR fields from an image.

    Each field listed in ``event_config.ocr_field_ids`` that is defined in
    ``ocr_config.fields`` is optionally cropped and inverted, passed to
    Tesseract, and the text after the first ``:`` is taken as its value. The
    value is then auto-corrected and checked against the field's allowed
    values.

    Args:
        image: Image to scan, or ``None`` to obtain only the default values.
        event_config: Supplies the ids of the fields to scan.
        ocr_config: Supplies the definition of each field.

    Returns:
        Mapping of field label to value. Every field starts as ``"Unknown"``.
        ``IMAGE_ERROR`` is added when the image size cannot be read, and
        ``OCR_ERROR`` when scanning a field raises.
    """
    ocr_field_defs: list[OCRFieldSettings] = [
        ocr_config.fields[k] for k in event_config.ocr_field_ids if k in ocr_config.fields
    ]
    results: dict[str, str | None] = {f.label: "Unknown" for f in ocr_field_defs}
    log.debug("OCR default values: %s", results)

    if image is None:
        log.debug("OCR Empty image")
        return results
    if not ocr_field_defs:
        log.debug("OCR No fields to scan")
        return results

    try:
        width, height = image.size
    except Exception as e:
        log.error("OCR fail loading image:%s", e)
        results["IMAGE_ERROR"] = str(e)
        return results

    # The Python Imaging Library uses a Cartesian pixel coordinate system, with
    # (0,0) in the upper left corner. Coordinates refer to the implied pixel
    # corners; the centre of a pixel addressed as (0, 0) lies at (0.5, 0.5).
    #
    # Coordinates are usually passed to the library as 2-tuples (x, y).
    # Rectangles are 4-tuples (x1, y1, x2, y2), upper left corner first.
    for field_settings in ocr_field_defs:
        try:
            crop = field_settings.crop
            if crop:
                # crop.y is measured up from the bottom edge of the image, so
                # it is flipped into PIL's top-down coordinates here.
                x1: int = crop.x  # top-left x
                y1: int = height - (crop.y + crop.h)  # top-left y [ 0 == top of image]
                x2: int = x1 + crop.w  # bottom-right x
                y2: int = height - crop.y  # bottom-right y [ 0 == top of image]
                log.debug("Cropping %s by %s image using PIL to %s", height, width, (x1, y1, x2, y2))
                region: Image.Image = image.crop((x1, y1, x2, y2))
            else:
                log.debug("No image crop")
                region = image
            if field_settings.invert:
                region = PIL.ImageOps.invert(region)

            txt = pytesseract.image_to_string(region, config=r"")
            if txt:
                log.debug("Tesseract found text %s", txt)
                parsed: list[str] = txt.split(":", 1)
            else:
                log.debug("Tesseract found nothing")
                parsed = []

            if len(parsed) <= 1:
                # No "label: value" shape in the OCR text; keep the default.
                log.warning("Unparsable field %s: %s", field_settings.label, txt)
                continue

            candidate: str = parsed[1].strip()
            if field_settings.correction and candidate not in field_settings.correction:
                for correct_to, correct_patterns in field_settings.correction.items():
                    if any(re.match(pat, candidate) for pat in correct_patterns):
                        log.debug("Auto-correcting %s from %s to %s", field_settings.label, candidate, correct_to)
                        candidate = correct_to
                        # First matching correction wins; the corrected value
                        # must not be re-matched against later patterns.
                        break
            if candidate and field_settings.values:
                for v in field_settings.values:
                    if candidate.upper() == v.upper() and candidate != v:
                        log.debug("OCR case correcting field %s from %s to %s", field_settings.label, candidate, v)
                        candidate = v
            if field_settings.values is None or candidate in field_settings.values:
                results[field_settings.label] = candidate
            else:
                log.warning("Unknown value %s for OCR field %s", candidate, field_settings.label)
                results[field_settings.label] = "Unknown"

        except Exception as e:
            log.error("OCR fail on image:%s", e, exc_info=True)
            results["OCR_ERROR"] = f"field:{field_settings.label}, error:{e}"

    return results