Coverage for src / anpr2mqtt / event_handler.py: 94%

224 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-30 16:07 +0000

1import datetime as dt 

2import re 

3from io import BytesIO 

4from pathlib import Path 

5from typing import TYPE_CHECKING, Any 

6 

7import PIL.ImageOps 

8import pytesseract 

9import structlog 

10import tzlocal 

11from PIL import Image 

12from watchdog.events import DirCreatedEvent, FileClosedEvent, FileCreatedEvent, RegexMatchingEventHandler 

13 

14from anpr2mqtt.const import ImageInfo 

15from anpr2mqtt.handler_common import AutoclearTimer, CameraGatekeeper, build_dvla_client, correct_against_good_read 

16from anpr2mqtt.hass import HomeAssistantPublisher 

17from anpr2mqtt.settings import ( 

18 TARGET_TYPE_PLATE, 

19 CameraSettings, 

20 DVLASettings, 

21 EventSettings, 

22 ImageSettings, 

23 OCRFieldSettings, 

24 OCRSettings, 

25) 

26from anpr2mqtt.tracker import Sighting, Tracker 

27 

28if TYPE_CHECKING: 

29 from anpr2mqtt.api_client import APIClient 

30 

31log = structlog.get_logger() 

32 

33 

class EventHandler(RegexMatchingEventHandler):
    """Watchdog handler that turns newly written camera images into MQTT messages.

    Files whose full path matches ``event_config.image_name_re`` under
    ``event_config.watch_path`` are parsed, optionally OCR-scanned and looked up
    via the configured API client, recorded with the tracker, and then published
    through the ``HomeAssistantPublisher``.
    """

    def __init__(
        self,
        publisher: HomeAssistantPublisher,
        state_topic: str,
        image_topic: str,
        event_config: EventSettings,
        camera: CameraSettings,
        ocr_config: OCRSettings,
        image_config: ImageSettings,
        dvla_config: DVLASettings,
        tracker: Tracker,
        mqtt_topic_root: str = "anpr2mqtt",
    ) -> None:
        """Store configuration and register the fully qualified file-name regex.

        Args:
            publisher: Publisher used for state and image messages.
            state_topic: Topic passed to ``post_state_message``.
            image_topic: Topic passed to ``post_image_message``.
            event_config: Watch path, file-name regex and per-event options.
            camera: Camera settings forwarded with each state message.
            ocr_config: OCR field definitions used by ``scan_ocr_fields``.
            image_config: JPEG/PNG re-encode options used by ``process_image``.
            dvla_config: Settings handed to ``build_dvla_client``.
            tracker: Tracker used to resolve and record sightings.
            mqtt_topic_root: Prefix for per-target state topics.
        """
        # The regex is matched against the absolute path, so prefix it with the watch dir.
        fqre = f"{event_config.watch_path.resolve() / event_config.image_name_re.pattern}"
        super().__init__(regexes=[fqre], ignore_directories=True, case_sensitive=True)
        log.debug("Listening for images matching %s", fqre)
        self.publisher = publisher
        self.state_topic: str = state_topic
        self.event_config: EventSettings = event_config
        self.camera: CameraSettings = camera
        self.tracker: Tracker = tracker
        self.ocr_config: OCRSettings = ocr_config
        self.image_config: ImageSettings = image_config
        self.dvla_config: DVLASettings = dvla_config
        if event_config.image_url_base:
            log.info("Images available from web server with prefix %s", event_config.image_url_base)
        self.image_topic: str = image_topic
        self.mqtt_topic_root: str = mqtt_topic_root
        self.api_client: APIClient | None = build_dvla_client(dvla_config, event_config.target_type)
        if self.api_client:
            log.info("Configured gov API lookup, cache type %s, ttl %s", dvla_config.cache_type, dvla_config.cache_ttl)
        else:
            log.info("No gov API lookup configured")

        self._autoclear_timer = AutoclearTimer()
        self._camera_gate = CameraGatekeeper()
        # (target id, UTC time) of the most recent successful API lookup, if any.
        self._last_good_plate: tuple[str, dt.datetime] | None = None

    @property
    def ignore_directories(self) -> bool:
        """Always ignore directory events."""
        return True

    def on_created(self, event: DirCreatedEvent | FileCreatedEvent) -> None:
        """Log file creation; processing is deferred until the file is closed."""
        if event.event_type != "created" or event.is_directory:
            log.debug("on_created: skipping irrelevant event: %s", event)
            return
        log.info("New file detected: %s", event.src_path)

    def on_closed(self, event: FileClosedEvent) -> None:
        """Process a completely written image file and publish the result.

        Empty or unreadable files are ignored with a warning. Any exception
        raised while processing is logged and reported as an error state message.
        """
        if event.event_type != "closed" or event.is_directory:
            log.debug("on_closed: skipping irrelevant event: %s", event)
            return
        log.info("New complete file detected: %s", event.src_path)

        file_path = Path(str(event.src_path))
        try:
            # Single stat call; the file may have vanished since the close event fired.
            file_size = file_path.stat().st_size
        except OSError as e:
            log.warning("Unable to stat image file %s: %s", file_path, e)
            return
        if file_size == 0:
            log.warning("Empty image file, ignoring, at %s", file_path)
            return

        url: str | None = (
            f"{self.event_config.image_url_base}/{file_path.name!s}" if self.event_config.image_url_base else None
        )
        if not url:
            log.warning(
                "No URL available for image, URL base %s, file path name %s",
                self.event_config.image_url_base,
                file_path.name,
            )

        try:
            image_info: ImageInfo | None = examine_file(file_path, self.event_config.image_name_re)
            if image_info is not None and image_info.target is not None:
                target_id: str = image_info.target
                log.info("Examining image for %s at %s", target_id, file_path.absolute())

                # presumably snaps a near-miss read onto the last good plate — verify in handler_common
                target_id = correct_against_good_read(
                    target_id,
                    self._last_good_plate,
                    self.event_config.good_read_ttl,
                    self.event_config.good_read_tolerance,
                    self.tracker.normalizer,
                )

                image: Image.Image | None = process_image(
                    file_path.absolute(), image_info, jpeg_opts=self.image_config.jpeg_opts, png_opts=self.image_config.png_opts
                )
                ocr_fields: dict[str, str | None] = scan_ocr_fields(image, self.event_config, self.ocr_config)

                sighting: Sighting = self.tracker.find(target_id)

                reg_info: dict[str, Any] | None = None
                if (
                    sighting.target.lookup
                    and self.api_client
                    and image_info.target
                    and self.event_config.target_type == TARGET_TYPE_PLATE
                ):
                    api_info: dict[str, Any] = self.api_client.lookup(sighting.target.id)
                    if api_info.get("success"):
                        reg_info = api_info.get("plate")
                        if sighting.target.description is None and api_info.get("description"):
                            sighting.target.description = api_info["description"]
                        # Remember the confirmed plate for correcting later reads.
                        self._last_good_plate = (sighting.target.id, dt.datetime.now(dt.UTC))

                time_analysis: dict[str, Any] = self.tracker.record(
                    sighting.target.id, self.event_config.target_type, image_info.timestamp
                )

                if not time_analysis.get("is_new_visit", True):
                    log.info("Skipping duplicate filesystem visit for %s (within gap window)", sighting.target.id)
                    return

                if not self._camera_gate.allow(
                    image_info.timestamp, reg_info is not None, self.tracker.tracker_config.min_visit_gap_seconds
                ):
                    log.info("Skipping cross-plate duplicate for %s (plate=%s)", self.event_config.camera, target_id)
                    return

                entity_id: str | None = sighting.target.entity_id
                if entity_id:
                    target_state_topic = f"{self.mqtt_topic_root}/targets/{self.event_config.target_type}/{entity_id}"
                    self.publisher.publish_target_state(
                        state_topic=target_state_topic,
                        description=sighting.target.description,
                        time_analysis=time_analysis,
                    )

                if sighting.ignore:
                    log.info("Skipping MQTT publication for ignored %s", sighting.target.id)
                    return

                self.publisher.post_state_message(
                    self.state_topic,
                    sighting=sighting,
                    event_config=self.event_config,
                    camera=self.camera,
                    image_info=image_info,
                    extra_info=ocr_fields,
                    time_analysis=time_analysis,
                    url=url,
                    reg_info=reg_info,
                    file_path=file_path,
                    source="filesystem",
                )
                if image:
                    img_format = image_info.ext.upper() if image_info.ext else None
                    img_format = "JPEG" if img_format == "JPG" else img_format
                    if img_format:
                        self.publisher.post_image_message(self.image_topic, image, img_format)
                    else:
                        log.warning("Unknown image format for %s", file_path)
                # NOTE(review): scheduled whenever a state message was posted, mirroring the
                # no-target branch below — confirm this nesting against the original source.
                self._schedule_autoclear()
            else:
                log.warning("No image found for %s", file_path)
                ocr_fields = scan_ocr_fields(None, self.event_config, self.ocr_config)

                self.publisher.post_state_message(
                    self.state_topic,
                    event_config=self.event_config,
                    camera=self.camera,
                    extra_info=ocr_fields,
                    sighting=None,
                    url=url,
                    file_path=file_path,
                    source="filesystem",
                )
                self._schedule_autoclear()

        except Exception as e:
            # Top-level boundary for the watchdog callback: log and report, never raise.
            log.error("Failed to parse file event %s: %s", event, e, exc_info=True)
            self.publisher.post_state_message(
                self.state_topic,
                event_config=self.event_config,
                camera=self.camera,
                sighting=None,
                error=str(e),
                file_path=file_path,
            )

    def _schedule_autoclear(self) -> None:
        """Ask the autoclear timer to run ``_do_autoclear`` for this event/camera."""
        self._autoclear_timer.schedule(
            self.event_config,
            self._do_autoclear,
            f"{self.event_config.event}/{self.event_config.camera}",
        )

    def _do_autoclear(self) -> None:
        """Publish empty state and/or image messages as enabled in the autoclear settings."""
        autoclear = self.event_config.autoclear
        log.info("Autoclear firing for %s/%s", self.event_config.event, self.event_config.camera)
        if autoclear.state:
            self.publisher.post_state_message(
                self.state_topic, sighting=None, event_config=self.event_config, camera=self.camera
            )
        if autoclear.image:
            self.publisher.post_image_message(self.image_topic, image=None)

229 

230 

def process_image(
    file_path: Path, image_info: ImageInfo, jpeg_opts: dict[str, Any], png_opts: dict[str, Any]
) -> Image.Image | None:
    """Open the image at *file_path*, optionally re-encoding it with tuning options.

    When the image is JPEG or PNG and matching options are supplied, the image
    is re-saved into memory with those options, ``image_info.size`` is updated
    to the re-encoded byte count, and the re-encoded image is returned.
    Otherwise the image is returned as opened.

    Args:
        file_path: Location of the image file.
        image_info: Parsed file details; ``ext`` is the fallback format and
            ``size`` is updated in place after a re-encode.
        jpeg_opts: Keyword arguments for ``Image.save`` when the format is JPEG.
        png_opts: Keyword arguments for ``Image.save`` when the format is PNG.

    Returns:
        The opened (possibly re-encoded) image, or ``None`` if it could not be loaded.
    """
    try:
        # Image.open raises on failure rather than returning None.
        image: Image.Image = Image.open(file_path.absolute())
        if image.format:
            image_format: str | None = image.format.lower()
        else:
            image_format = image_info.ext.lower() if image_info.ext else None
        img_args: dict[str, Any] | None = None

        if image_format in ("jpg", "jpeg") and jpeg_opts:
            img_args = jpeg_opts
        elif image_format == "png" and png_opts:
            img_args = png_opts
        else:
            log.info("Unknown image format %s, no tuning available", image_format)
        if img_args:
            log.debug("Rewriting image to process %s", img_args)
            buffer = BytesIO()
            # Pillow registers the encoder as "JPEG"; "JPG" is not a valid format name.
            save_format = "jpeg" if image_format == "jpg" else image_format
            image.save(buffer, save_format, **img_args)
            size = buffer.getbuffer().nbytes
            if image_info.size and size != image_info.size:
                log.info(
                    "Image size %s -> %s, %0.2f%% saving",
                    image_info.size,
                    size,
                    # Fraction of the original size removed (negative if it grew).
                    ((image_info.size - size) / image_info.size) * 100,
                )
            image_info.size = size
            original = image
            image = Image.open(buffer)
            # Release the file handle held by the image we are no longer returning.
            original.close()
            log.debug("Resaved image with %s", img_args)
        return image
    except Exception as e:
        log.warning("Unable to load image at %s: %s", file_path, e)
    return None

267 

268 

def examine_file(file_path: Path, image_name_re: re.Pattern[str]) -> ImageInfo | None:
    """Parse an image file name into an ``ImageInfo``.

    The pattern must define a ``dt`` group holding a 17-digit timestamp
    (``YYYYMMDDHHMMSS`` followed by three millisecond digits) and may define
    ``target``, ``event`` and ``ext`` groups.

    Args:
        file_path: File to examine; it is stat'ed for its size.
        image_name_re: Pattern matched against the file name only.

    Returns:
        The parsed details, or ``None`` when the name does not match, no target
        group was captured, or anything fails while parsing.
    """
    try:
        match = re.match(image_name_re, file_path.name)
        if match:
            groups = match.groupdict()
            size: int = file_path.stat().st_size
            raw_date = match.group("dt")
            year, month, day = map(int, (raw_date[:4], raw_date[4:6], raw_date[6:8]))
            hours, minutes, seconds, millis = map(
                int, (raw_date[8:10], raw_date[10:12], raw_date[12:14], raw_date[14:17])
            )
            # The three trailing digits are milliseconds; datetime expects microseconds.
            timestamp = dt.datetime(
                year, month, day, hours, minutes, seconds, millis * 1000, tzinfo=tzlocal.get_localzone()
            )
            file_ext: str | None = groups.get("ext")
            event: str | None = groups.get("event")
            target: str | None = groups.get("target")
            if target is None:
                log.warning("No target found for match: %s", groups)
                return None
            if file_ext is None:
                # Fall back to whatever follows the last dot in the name, if anything.
                _stem, dot, trailing = file_path.name.rpartition(".")
                if dot and trailing:
                    file_ext = trailing
            return ImageInfo(target=target, event=event, timestamp=timestamp, ext=file_ext, size=size)
    except Exception as e:
        log.warning("Unable to parse %s: %s", file_path, e)
    return None

295 

296 

def scan_ocr_fields(image: Image.Image | None, event_config: EventSettings, ocr_config: OCRSettings) -> dict[str, str | None]:
    """Run OCR over the configured regions of *image* and return the values found.

    Every field listed in ``event_config.ocr_field_ids`` that is defined in
    ``ocr_config.fields`` starts as ``"Unknown"``. For each field the optional
    crop and inversion are applied, the text is read with Tesseract, and the
    part after the first ``:`` is taken as the value. The value is then
    auto-corrected, case-normalised and validated against the field's settings.

    Args:
        image: Image to scan, or ``None`` to obtain only the default values.
        event_config: Supplies the ids of the OCR fields to scan.
        ocr_config: Supplies the field definitions keyed by id.

    Returns:
        Mapping of field label to value. ``IMAGE_ERROR`` or ``OCR_ERROR`` keys
        are added when the image size cannot be read or a field fails.
    """
    ocr_field_defs: list[OCRFieldSettings] = [
        ocr_config.fields[k] for k in event_config.ocr_field_ids if k in ocr_config.fields
    ]
    results: dict[str, str | None] = {f.label: "Unknown" for f in ocr_field_defs}
    log.debug("OCR default values: %s", results)

    if image is None:
        log.debug("OCR Empty image")
        return results
    if not ocr_field_defs:
        log.debug("OCR No fields to scan")
        return results

    try:
        width, height = image.size
    except Exception as e:
        log.error("OCR fail loading image:%s", e)
        results["IMAGE_ERROR"] = str(e)
        return results

    # The Python Imaging Library uses a Cartesian pixel coordinate system, with (0,0) in the
    # upper left corner. Coordinates refer to the implied pixel corners; the centre of a pixel
    # addressed as (0, 0) actually lies at (0.5, 0.5).
    #
    # Coordinates are usually passed to the library as 2-tuples (x, y). Rectangles are
    # represented as 4-tuples, (x1, y1, x2, y2), with the upper left corner given first.
    for field_settings in ocr_field_defs:
        try:
            if field_settings.crop:
                # crop.y is measured up from the bottom edge, so flip it into PIL's top-down axis.
                x1: int = field_settings.crop.x  # top-left x
                y1: int = height - (field_settings.crop.y + field_settings.crop.h)  # top-left y [ 0 == top of image]
                x2: int = x1 + field_settings.crop.w  # bottom-right x
                y2: int = height - field_settings.crop.y  # bottom-right y [ 0 == top of image]
                log.debug("Cropping %s by %s image using PIL to %s", height, width, (x1, y1, x2, y2))
                region: Image.Image = image.crop((x1, y1, x2, y2))
            else:
                log.debug("No image crop")
                region = image
            if field_settings.invert:
                region = PIL.ImageOps.invert(region)
            txt = pytesseract.image_to_string(region, config=r"")
            if txt:
                log.debug("Tesseract found text %s", txt)
                parsed: list[str] = txt.split(":", 1)
            else:
                log.debug("Tesseract found nothing")
                parsed = []

            if len(parsed) > 1:
                candidate: str = parsed[1].strip()
                if field_settings.correction and candidate not in field_settings.correction:
                    for correct_to, correct_patterns in field_settings.correction.items():
                        if any(re.match(pat, candidate) for pat in correct_patterns):
                            log.debug("Auto-correcting %s from %s to %s", field_settings.label, candidate, correct_to)
                            candidate = correct_to
                            # Stop here so the corrected value is not re-matched by later patterns.
                            break
                if candidate and field_settings.values:
                    for v in field_settings.values:
                        if candidate.upper() == v.upper() and candidate != v:
                            log.debug("OCR case correcting field %s from %s to %s", field_settings.label, candidate, v)
                            candidate = v
                if field_settings.values is None or candidate in field_settings.values:
                    results[field_settings.label] = candidate
                else:
                    log.warning("Unknown value %s for OCR field %s", candidate, field_settings.label)
                    results[field_settings.label] = "Unknown"
            else:
                log.warning("Unparsable field %s: %s", field_settings.label, txt)

        except Exception as e:
            # One bad field must not prevent the remaining fields from being scanned.
            log.error("OCR fail on image:%s", e, exc_info=True)
            results["OCR_ERROR"] = f"field:{field_settings.label}, error:{e}"

    return results