Coverage for src / anpr2mqtt / event_handler.py: 84%

242 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-08 17:29 +0000

1import datetime as dt 

2import json 

3import re 

4from io import BytesIO 

5from pathlib import Path 

6from typing import Any 

7 

8import PIL.ImageOps 

9import pytesseract 

10import structlog 

11import tzlocal 

12from PIL import Image 

13from watchdog.events import DirCreatedEvent, FileClosedEvent, FileCreatedEvent, RegexMatchingEventHandler 

14 

15from anpr2mqtt.api_client import DVLA, APIClient 

16from anpr2mqtt.const import ImageInfo 

17from anpr2mqtt.hass import HomeAssistantPublisher 

18from anpr2mqtt.settings import ( 

19 TARGET_TYPE_PLATE, 

20 CameraSettings, 

21 DVLASettings, 

22 EventSettings, 

23 ImageSettings, 

24 OCRFieldSettings, 

25 OCRSettings, 

26 TargetSettings, 

27 TrackerSettings, 

28) 

29 

# Module-level structlog logger shared by EventHandler and the helper functions in this file.
30log = structlog.get_logger() 

31 

32 

class EventHandler(RegexMatchingEventHandler):
    """Watchdog handler that publishes MQTT messages for camera image files.

    Files whose full path matches the configured image-name regex are parsed
    for a target, OCR-scanned, classified against the target configuration,
    recorded in the on-disk sightings tracker and published through the
    Home Assistant publisher.
    """

    def __init__(
        self,
        publisher: HomeAssistantPublisher,
        state_topic: str,
        image_topic: str,
        event_config: EventSettings,
        camera: CameraSettings,
        target_config: TargetSettings | None,
        ocr_config: OCRSettings,
        image_config: ImageSettings,
        dvla_config: DVLASettings,
        tracker_config: TrackerSettings,
    ) -> None:
        """Register the file-name regex and store collaborators and settings.

        Args:
            publisher: Receives the state and image messages.
            state_topic: Topic passed to ``post_state_message``.
            image_topic: Topic passed to ``post_image_message``.
            event_config: Watch path, file-name regex, target type and URL base.
            camera: Camera settings forwarded with each state message.
            target_config: Correction / ignore / known / dangerous rules, or None.
            ocr_config: OCR field definitions.
            image_config: JPEG / PNG re-encoding options.
            dvla_config: API key and cache TTL for the optional DVLA lookup.
            tracker_config: Location of the sightings data directory.
        """
        # The regex handed to watchdog is the resolved watch directory joined
        # with the file-name pattern, so it is matched against full paths.
        fqre = f"{event_config.watch_path.resolve() / event_config.image_name_re.pattern}"
        super().__init__(regexes=[fqre], ignore_directories=True, case_sensitive=True)
        log.debug("Listening for images matching %s", fqre)
        self.publisher = publisher
        self.state_topic: str = state_topic
        self.event_config: EventSettings = event_config
        self.camera: CameraSettings = camera
        self.tracker_config: TrackerSettings = tracker_config
        self.target_config: TargetSettings | None = target_config
        self.ocr_config: OCRSettings = ocr_config
        self.image_config: ImageSettings = image_config
        self.dvla_config: DVLASettings = dvla_config
        if event_config.image_url_base:
            log.info("Images available from web server with prefix %s", event_config.image_url_base)
        self.image_topic: str = image_topic

        # The external registration lookup only applies to plates and needs an API key.
        if dvla_config.api_key and event_config.target_type == TARGET_TYPE_PLATE:
            log.info("Configured gov API lookup")
            self.api_client: APIClient | None = DVLA(dvla_config.api_key, cache_ttl=dvla_config.cache_ttl)
        else:
            log.info("No gov API lookup configured")
            self.api_client = None

    @property
    def ignore_directories(self) -> bool:
        """Always True: this handler never processes directory events."""
        return True

    def on_created(self, event: DirCreatedEvent | FileCreatedEvent) -> None:
        """Log the appearance of a new file; processing waits for ``on_closed``."""
        if event.event_type != "created" or event.is_directory:
            log.debug("on_created: skipping irrelevant event: %s", event)
            return
        log.info("New file detected: %s", event.src_path)

    def on_closed(self, event: FileClosedEvent) -> None:
        """Process a completely written image file and publish the outcome.

        Missing, unreadable or empty files are logged and skipped. Any failure
        while examining, classifying or publishing is logged and reported as a
        state message carrying the error text.
        """
        if event.event_type != "closed" or event.is_directory:
            log.debug("on_closed: skipping irrelevant event: %s", event)
            return
        log.info("New complete file detected: %s", event.src_path)

        file_path = Path(str(event.src_path))
        try:
            # Single stat call; the file may already have been removed or rotated
            # by the time the close event is dispatched.
            file_size: int = file_path.stat().st_size
        except OSError as e:
            log.warning("Unable to stat image file %s, ignoring: %s", file_path, e)
            return
        if file_size == 0:
            log.warning("Empty image file, ignoring, at %s", file_path)
            return

        url_base = self.event_config.image_url_base
        url: str | None = f"{url_base}/{file_path.name}" if url_base else None

        try:
            ocr_fields: dict[str, str | None]
            image_info: ImageInfo | None = examine_file(file_path, self.event_config.image_name_re)
            if image_info is None:
                # File name could not be parsed: publish a target-less state
                # with the OCR fields at their default values.
                ocr_fields = scan_ocr_fields(None, self.event_config, self.ocr_config)
                self.publisher.post_state_message(
                    self.state_topic,
                    event_config=self.event_config,
                    camera=self.camera,
                    ocr_fields=ocr_fields,
                    target=None,
                    url=url,
                    file_path=file_path,
                )
                return

            target: str = image_info.target
            log.info("Examining image for %s at %s", target, file_path.absolute())

            image: Image.Image | None = process_image(
                file_path.absolute(),
                image_info,
                jpeg_opts=self.image_config.jpeg_opts,
                png_opts=self.image_config.png_opts,
            )
            ocr_fields = scan_ocr_fields(image, self.event_config, self.ocr_config)

            classification: dict[str, Any] = self.classify_target(target)
            if classification["target"] != target:
                # apply corrected target name if changed
                target = classification["target"]

            reg_info: list[Any] | dict[str, Any] | None = None
            if (
                not classification.get("known")
                and self.api_client
                and image_info.target
                and self.event_config.target_type == TARGET_TYPE_PLATE
            ):
                reg_info = self.api_client.lookup(target)

            visit_count: int
            last_seen: dt.datetime | None
            visit_count, last_seen = self.track_target(target, self.event_config.target_type, image_info.timestamp)

            # Ignored targets are still tracked above, but never published.
            if classification.get("ignore"):
                log.info("Skipping MQTT publication for ignored %s", target)
                return

            self.publisher.post_state_message(
                self.state_topic,
                target=target,
                event_config=self.event_config,
                camera=self.camera,
                image_info=image_info,
                ocr_fields=ocr_fields,
                classification=classification,
                previous_sightings=visit_count,
                last_sighting=last_seen,
                url=url,
                reg_info=reg_info,
                file_path=file_path,
            )
            if image is not None:
                img_format = image_info.ext.upper() if image_info.ext else None
                img_format = "JPEG" if img_format == "JPG" else img_format
                if img_format:
                    self.publisher.post_image_message(self.image_topic, image, img_format)
                else:
                    log.warning("Unknown image format for %s", file_path)

        except Exception as e:
            log.error("Failed to parse file event %s: %s", event, e, exc_info=1)
            self.publisher.post_state_message(
                self.state_topic,
                event_config=self.event_config,
                camera=self.camera,
                target=None,
                error=str(e),
                file_path=file_path,
            )

    def track_target(self, target: str, target_type: str, event_dt: dt.datetime | None) -> tuple[int, dt.datetime | None]:
        """Record a sighting and return what was known before it.

        Sightings are kept as a JSON list of ISO-8601 timestamps in
        ``<data_dir>/<target_type>/<target>.json``. Tracking is best effort:
        any failure is logged and the values gathered so far are returned.

        Args:
            target: Target identifier; an empty value is stored as "UNKNOWN".
            target_type: Sub-directory name under the tracker data directory.
            event_dt: Time of the sighting; the current local time if None.

        Returns:
            Tuple of (number of previous sightings, timestamp of the most
            recent previous sighting or None).
        """
        target = target or "UNKNOWN"
        target_file = self.tracker_config.data_dir / target_type / f"{target}.json"
        last_visit: dt.datetime | None = None
        previous_visits: int = 0
        try:
            # Created inside the try so that a missing or unwritable data
            # directory degrades tracking rather than aborting publication.
            target_file.parent.mkdir(parents=True, exist_ok=True)
            sightings: list[str] = []
            if target_file.exists():
                with target_file.open("r") as f:
                    sightings = json.load(f)
                previous_visits = len(sightings)
                if previous_visits > 0:
                    last_visit = dt.datetime.fromisoformat(sightings[-1])

            sightings.append(event_dt.isoformat() if event_dt else dt.datetime.now(tz=tzlocal.get_localzone()).isoformat())
            with target_file.open("w") as f:
                json.dump(sightings, f)
        except Exception as e:
            log.exception("Failed to track sightings for %s:%s", target, e)
        return previous_visits, last_visit

    def classify_target(self, target: str | None) -> dict[str, Any]:
        """Classify a target against the configured correction and match rules.

        Returns:
            Dict with keys ``orig_target``, ``target`` (after any correction),
            ``ignore``, ``known``, ``dangerous``, ``priority`` and
            ``description``. Every key is always present.
        """
        results: dict[str, Any] = {
            "orig_target": target,
            "target": target,
            "ignore": False,
            "known": False,
            "dangerous": False,
            "priority": "high",
            "description": "Unknown vehicle",
        }
        if not target or self.target_config is None:
            # return the full set of default values to make home assistant template logic easier
            return results
        # First matching correction wins.
        for corrected_target, patterns in self.target_config.correction.items():
            if any(re.match(pat, target) for pat in patterns):
                results["target"] = corrected_target
                target = corrected_target
                log.info("Corrected target %s -> %s", results["orig_target"], target)
                break
        for pat in self.target_config.ignore:
            if re.match(pat, target):
                log.info("Ignoring %s matching ignore pattern %s", target, pat)
                results["ignore"] = True
                results["priority"] = "low"
                results["description"] = "Ignored"
                break
        if target in self.target_config.dangerous:
            log.warning("%s known as potential danger", target)
            results["dangerous"] = True
            results["priority"] = "critical"
            results["description"] = self.target_config.dangerous[target] or "Potential threat"
        # Checked last, so a target listed as both dangerous and known ends up
        # with the "known" priority and description.
        if target in self.target_config.known:
            log.warning("%s known to household", target)
            results["known"] = True
            results["priority"] = "medium"
            results["description"] = self.target_config.known[target] or "Known"

        return results

233 

234 

def process_image(
    file_path: Path, image_info: ImageInfo, jpeg_opts: dict[str, Any], png_opts: dict[str, Any]
) -> Image.Image | None:
    """Open an image and optionally re-encode it in memory.

    When save options are configured for the image's format, the image is
    re-saved into a memory buffer with those options, ``image_info.size`` is
    updated to the new byte count, and the re-encoded image is returned.
    Otherwise the image is returned as opened.

    Args:
        file_path: Location of the image file.
        image_info: Parsed file details; ``ext`` is the format fallback and
            ``size`` is updated in place after re-encoding.
        jpeg_opts: Keyword arguments for saving JPEGs; empty disables re-encoding.
        png_opts: Keyword arguments for saving PNGs; empty disables re-encoding.

    Returns:
        The (possibly re-encoded) image, or None if it could not be loaded.
    """
    try:
        image: Image.Image | None = Image.open(file_path.absolute())
        if image is None:
            log.error("Unable to open image at %s", file_path.absolute())
            return None

        # Prefer the format PIL detected; fall back to the file extension.
        # Normalise case and the "jpg" alias, which PIL does not accept as a save format.
        raw_format: str | None = image.format or image_info.ext
        image_format: str | None = raw_format.lower() if raw_format else None
        if image_format == "jpg":
            image_format = "jpeg"

        img_args: dict[str, Any] | None = None
        if image_format == "jpeg" and jpeg_opts:
            img_args = jpeg_opts
        elif image_format == "png" and png_opts:
            img_args = png_opts
        if not img_args:
            return image

        log.debug("Rewriting image to process %s", img_args)
        buffer = BytesIO()
        try:
            image.save(buffer, image_format, **img_args)
        finally:
            # The file-backed original is replaced by the in-memory copy below,
            # so release its file handle now instead of waiting for GC.
            image.close()
        size = buffer.getbuffer().nbytes
        if size != image_info.size:
            log.info("Image size %s -> %s", image_info.size, size)
            image_info.size = size
        image = Image.open(buffer)
        log.info("Resaved image with %s", img_args)
        return image
    except Exception as e:
        log.warning("Unable to load image at %s: %s", file_path, e)
    return None

264 

265 

def examine_file(file_path: Path, image_name_re: re.Pattern) -> ImageInfo | None:
    """Extract target, event, timestamp, extension and size from an image file name.

    Args:
        file_path: Image file; only its name is matched, and it is stat'ed for size.
        image_name_re: Pattern with a ``dt`` named group and optional ``ext``,
            ``event`` and ``target`` named groups.

    Returns:
        The parsed ImageInfo, or None when the name does not match, has no
        target, or any part of it cannot be parsed.
    """
    try:
        match = re.match(image_name_re, file_path.name)
        if match:
            groups = match.groupdict()
            size: int = file_path.stat().st_size
            raw_date = match.group("dt")
            year, month, day = map(int, (raw_date[:4], raw_date[4:6], raw_date[6:8]))
            hours, minutes, seconds, milliseconds = map(
                int, (raw_date[8:10], raw_date[10:12], raw_date[12:14], raw_date[14:17])
            )
            # NOTE(review): the three digits after the seconds are treated as
            # milliseconds (assumes a yyyymmddHHMMSSfff stamp - confirm against
            # image_name_re); datetime expects microseconds, hence the scaling.
            timestamp = dt.datetime(
                year, month, day, hours, minutes, seconds, milliseconds * 1000, tzinfo=tzlocal.get_localzone()
            )
            file_ext: str | None = groups.get("ext")
            event: str | None = groups.get("event")
            target: str | None = groups.get("target")
            if target is None:
                log.warning("No target found for match: %s", groups)
                return None
            if file_ext is None:
                # No extension captured by the regex: take it from the file
                # name itself (empty string when the name has no suffix).
                file_ext = file_path.suffix.lstrip(".")
            return ImageInfo(target=target, event=event, timestamp=timestamp, ext=file_ext, size=size)
    except Exception as e:
        log.warning("Unable to parse %s: %s", file_path, e)
    return None

292 

293 

def scan_ocr_fields(image: Image.Image | None, event_config: EventSettings, ocr_config: OCRSettings) -> dict[str, str | None]:
    """Read the configured OCR fields from an image.

    Each field listed in ``event_config.ocr_field_ids`` that is also defined in
    ``ocr_config.fields`` is optionally cropped and inverted, passed to
    Tesseract, and the text after the first ":" is taken as its value, subject
    to the field's correction patterns and allowed values.

    Returns:
        Mapping of field label to value. Fields that could not be read keep
        the value "Unknown"; failures are reported under the extra keys
        "IMAGE_ERROR" or "OCR_ERROR".
    """
    field_defs: list[OCRFieldSettings] = []
    for field_id in event_config.ocr_field_ids:
        if field_id in ocr_config.fields:
            field_defs.append(ocr_config.fields[field_id])
    results: dict[str, str | None] = dict.fromkeys((fd.label for fd in field_defs), "Unknown")
    log.debug("OCR default values: %s", results)

    if image is None:
        log.debug("OCR Empty image")
        return results
    if not field_defs:
        log.debug("OCR No fields to scan")
        return results

    try:
        width, height = image.size
    except Exception as e:
        log.error("OCR fail loading image:%s", e)
        results["IMAGE_ERROR"] = str(e)
        return results

    # The Python Imaging Library uses a Cartesian pixel coordinate system, with
    # (0,0) in the upper left corner. Coordinates refer to the implied pixel
    # corners; the centre of a pixel addressed as (0, 0) lies at (0.5, 0.5).
    # Rectangles are 4-tuples (x1, y1, x2, y2), upper left corner first.
    for fd in field_defs:
        try:
            crop = fd.crop
            if crop:
                # The configured y is subtracted from the image height to get
                # PIL's top-based y [ 0 == top of image ].
                left: int = crop.x
                upper: int = height - (crop.y + crop.h)
                right: int = left + crop.w
                lower: int = height - crop.y
                box = (left, upper, right, lower)
                log.debug("Cropping %s by %s image using PIL to %s", height, width, box)
                region: Image.Image = image.crop(box)
            else:
                log.debug("No image crop")
                region = image
            if fd.invert:
                region = PIL.ImageOps.invert(region)

            txt = pytesseract.image_to_string(region, config="")
            if txt:
                log.debug("Tesseract found text %s", txt)
            else:
                log.debug("Tesseract found nothing")

            # Only text of the form "<anything>:<value>" is usable.
            _, colon, remainder = txt.partition(":") if txt else ("", "", "")
            if not colon:
                log.warning("Unparsable field %s: %s", fd.label, txt)
                continue

            value: str = remainder.strip()
            if fd.correction and value not in fd.correction:
                for replacement, patterns in fd.correction.items():
                    if any(re.match(pattern, value) for pattern in patterns):
                        log.debug("Auto-correcting %s from %s to %s", fd.label, value, replacement)
                        value = replacement
            if value and fd.values:
                # Adopt the configured spelling when only letter case differs.
                for allowed in fd.values:
                    if value != allowed and value.upper() == allowed.upper():
                        log.debug("OCR case correcting field %s from %s to %s", fd.label, value, allowed)
                        value = allowed

            if fd.values is not None and value not in fd.values:
                log.warning("Unknown value %s for OCR field %s", value, fd.label)
                results[fd.label] = "Unknown"
            else:
                results[fd.label] = value

        except Exception as e:
            log.error("OCR fail on image:%s", e, exc_info=1)
            results["OCR_ERROR"] = f"field:{fd.label}, error:{e}"

    return results