Coverage for src / anpr2mqtt / event_handler.py: 84%
242 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-08 17:29 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-08 17:29 +0000
1import datetime as dt
2import json
3import re
4from io import BytesIO
5from pathlib import Path
6from typing import Any
8import PIL.ImageOps
9import pytesseract
10import structlog
11import tzlocal
12from PIL import Image
13from watchdog.events import DirCreatedEvent, FileClosedEvent, FileCreatedEvent, RegexMatchingEventHandler
15from anpr2mqtt.api_client import DVLA, APIClient
16from anpr2mqtt.const import ImageInfo
17from anpr2mqtt.hass import HomeAssistantPublisher
18from anpr2mqtt.settings import (
19 TARGET_TYPE_PLATE,
20 CameraSettings,
21 DVLASettings,
22 EventSettings,
23 ImageSettings,
24 OCRFieldSettings,
25 OCRSettings,
26 TargetSettings,
27 TrackerSettings,
28)
30log = structlog.get_logger()
class EventHandler(RegexMatchingEventHandler):
    """Watchdog handler that publishes a message for each completed image file.

    Files are matched against a regex built from the resolved watch path and the
    configured image-name pattern. When a matching file is closed it is parsed,
    optionally re-encoded and OCR-scanned, classified against the target
    configuration, recorded in the sightings tracker and published through the
    supplied ``HomeAssistantPublisher``.
    """

    def __init__(
        self,
        publisher: HomeAssistantPublisher,
        state_topic: str,
        image_topic: str,
        event_config: EventSettings,
        camera: CameraSettings,
        target_config: TargetSettings | None,
        ocr_config: OCRSettings,
        image_config: ImageSettings,
        dvla_config: DVLASettings,
        tracker_config: TrackerSettings,
    ) -> None:
        """Store the configuration and, when configured, create the DVLA API client.

        An API client is only created when ``dvla_config.api_key`` is set and the
        event's target type is ``TARGET_TYPE_PLATE``; otherwise ``api_client`` is
        ``None`` and no lookups are made.
        """
        fqre = f"{event_config.watch_path.resolve() / event_config.image_name_re.pattern}"
        super().__init__(regexes=[fqre], ignore_directories=True, case_sensitive=True)
        log.debug("Listening for images matching %s", fqre)
        self.publisher = publisher
        self.state_topic: str = state_topic
        self.image_topic: str = image_topic
        self.event_config: EventSettings = event_config
        self.camera: CameraSettings = camera
        self.tracker_config: TrackerSettings = tracker_config
        self.target_config: TargetSettings | None = target_config
        self.ocr_config: OCRSettings = ocr_config
        self.image_config: ImageSettings = image_config
        self.dvla_config: DVLASettings = dvla_config
        if event_config.image_url_base:
            log.info("Images available from web server with prefix %s", event_config.image_url_base)

        self.api_client: APIClient | None
        if dvla_config.api_key and event_config.target_type == TARGET_TYPE_PLATE:
            log.info("Configured gov API lookup")
            self.api_client = DVLA(dvla_config.api_key, cache_ttl=dvla_config.cache_ttl)
        else:
            log.info("No gov API lookup configured")
            self.api_client = None

    @property
    def ignore_directories(self) -> bool:
        """Always ``True``: this handler only processes files."""
        return True

    def on_created(self, event: DirCreatedEvent | FileCreatedEvent) -> None:
        """Log the appearance of a new file; processing is deferred to ``on_closed``."""
        if event.event_type != "created" or event.is_directory:
            log.debug("on_created: skipping irrelevant event: %s", event)
            return
        log.info("New file detected: %s", event.src_path)

    def on_closed(self, event: FileClosedEvent) -> None:
        """Process a fully written image file and publish the results.

        Missing, unreadable or empty files are logged and skipped. Any exception
        raised while examining or publishing is logged and reported in a state
        message carrying ``error``. Ignored targets are still recorded by the
        tracker but nothing is published for them.
        """
        if event.event_type != "closed" or event.is_directory:
            log.debug("on_closed: skipping irrelevant event: %s", event)
            return
        log.info("New complete file detected: %s", event.src_path)

        file_path = Path(str(event.src_path))
        try:
            file_size = file_path.stat().st_size
        except OSError as e:
            # The file can vanish (or be unreadable) between the event and now;
            # letting this escape would propagate out of the handler.
            log.warning("Unable to stat image file, ignoring, at %s: %s", file_path, e)
            return
        if file_size == 0:
            log.warning("Empty image file, ignoring, at %s", file_path)
            return

        url: str | None = None
        if self.event_config.image_url_base:
            url = f"{self.event_config.image_url_base}/{file_path.name!s}"

        try:
            image_info: ImageInfo | None = examine_file(file_path, self.event_config.image_name_re)
            if image_info is None:
                # File name could not be parsed: publish a target-less state.
                ocr_fields: dict[str, str | None] = scan_ocr_fields(None, self.event_config, self.ocr_config)
                self.publisher.post_state_message(
                    self.state_topic,
                    event_config=self.event_config,
                    camera=self.camera,
                    ocr_fields=ocr_fields,
                    target=None,
                    url=url,
                    file_path=file_path,
                )
                return

            target: str = image_info.target
            log.info("Examining image for %s at %s", target, file_path.absolute())

            image: Image.Image | None = process_image(
                file_path.absolute(), image_info, jpeg_opts=self.image_config.jpeg_opts, png_opts=self.image_config.png_opts
            )
            ocr_fields = scan_ocr_fields(image, self.event_config, self.ocr_config)

            classification: dict[str, Any] = self.classify_target(target)
            if classification["target"] != target:
                # apply corrected target name if changed
                target = classification["target"]
            ignored: bool = bool(classification.get("ignore"))

            reg_info: list[Any] | dict[str, Any] | None = None
            if (
                not ignored  # the lookup result would be discarded for ignored targets
                and not classification.get("known")
                and self.api_client
                and image_info.target
                and self.event_config.target_type == TARGET_TYPE_PLATE
            ):
                reg_info = self.api_client.lookup(target)

            visit_count: int
            last_seen: dt.datetime | None
            visit_count, last_seen = self.track_target(target, self.event_config.target_type, image_info.timestamp)

            if ignored:
                log.info("Skipping MQTT publication for ignored %s", target)
                return

            self.publisher.post_state_message(
                self.state_topic,
                target=target,
                event_config=self.event_config,
                camera=self.camera,
                image_info=image_info,
                ocr_fields=ocr_fields,
                classification=classification,
                previous_sightings=visit_count,
                last_sighting=last_seen,
                url=url,
                reg_info=reg_info,
                file_path=file_path,
            )
            if image is not None:
                img_format = image_info.ext.upper() if image_info.ext else None
                img_format = "JPEG" if img_format == "JPG" else img_format
                if img_format:
                    self.publisher.post_image_message(self.image_topic, image, img_format)
                else:
                    log.warning("Unknown image format for %s", file_path)

        except Exception as e:
            log.error("Failed to parse file event %s: %s", event, e, exc_info=1)
            self.publisher.post_state_message(
                self.state_topic,
                event_config=self.event_config,
                camera=self.camera,
                target=None,
                error=str(e),
                file_path=file_path,
            )

    def track_target(self, target: str, target_type: str, event_dt: dt.datetime | None) -> tuple[int, dt.datetime | None]:
        """Record a sighting of ``target`` and return its history before this sighting.

        Sightings are kept as a JSON list of ISO-8601 timestamps in
        ``<data_dir>/<target_type>/<target>.json``. Tracking is best effort:
        any failure is logged and whatever was read so far is returned.

        Returns:
            ``(previous_visits, last_visit)`` - the number of sightings already
            on file and the timestamp of the most recent one, if any.
        """
        target = target or "UNKNOWN"
        target_file = self.tracker_config.data_dir / target_type / f"{target}.json"
        last_visit: dt.datetime | None = None
        previous_visits: int = 0
        try:
            # Inside the try, and with parents=True, so a missing data directory
            # degrades to "no history" instead of failing the whole event.
            target_file.parent.mkdir(parents=True, exist_ok=True)
            sightings: list[str] = []
            if target_file.exists():
                with target_file.open("r") as f:
                    sightings = json.load(f)
                previous_visits = len(sightings)
                if previous_visits > 0:
                    last_visit = dt.datetime.fromisoformat(sightings[-1])

            sighting_time = event_dt if event_dt else dt.datetime.now(tz=tzlocal.get_localzone())
            sightings.append(sighting_time.isoformat())
            with target_file.open("w") as f:
                json.dump(sightings, f)
        except Exception as e:
            log.exception("Failed to track sightings for %s:%s", target, e)
        return previous_visits, last_visit

    def classify_target(self, target: str | None) -> dict[str, Any]:
        """Classify ``target`` against the configured correction/ignore/dangerous/known lists.

        The returned dict always has the keys ``orig_target``, ``target``,
        ``ignore``, ``known``, ``dangerous``, ``priority`` and ``description``.
        Checks run in the order correction, ignore, dangerous, known; a later
        match overwrites ``priority`` and ``description`` set by an earlier one.
        """
        results: dict[str, Any] = {
            "orig_target": target,
            "target": target,
            "ignore": False,
            "known": False,
            "dangerous": False,
            "priority": "high",
            "description": "Unknown vehicle",
        }
        if not target or self.target_config is None:
            # return the fully populated defaults to make home assistant template logic easier
            return results

        # First matching correction wins.
        for corrected_target, patterns in self.target_config.correction.items():
            if any(re.match(pat, target) for pat in patterns):
                results["target"] = corrected_target
                target = corrected_target
                log.info("Corrected target %s -> %s", results["orig_target"], target)
                break

        for pat in self.target_config.ignore:
            if re.match(pat, target):
                log.info("Ignoring %s matching ignore pattern %s", target, pat)
                results["ignore"] = True
                results["priority"] = "low"
                results["description"] = "Ignored"
                break

        if target in self.target_config.dangerous:
            log.warning("%s known as potential danger", target)
            results["dangerous"] = True
            results["priority"] = "critical"
            results["description"] = self.target_config.dangerous[target] or "Potential threat"
        if target in self.target_config.known:
            log.warning("%s known to household", target)
            results["known"] = True
            results["priority"] = "medium"
            results["description"] = self.target_config.known[target] or "Known"

        return results
def process_image(
    file_path: Path, image_info: ImageInfo, jpeg_opts: dict[str, Any], png_opts: dict[str, Any]
) -> Image.Image | None:
    """Open the image at ``file_path``, re-encoding it when save options are configured.

    The format is taken from the decoded image, falling back to
    ``image_info.ext``. If matching ``jpeg_opts`` / ``png_opts`` are non-empty
    the image is re-saved into memory with those options, ``image_info.size``
    is updated to the new byte size, and the in-memory image is returned.

    Returns:
        The (possibly re-encoded) image, or ``None`` if it could not be loaded
        or re-saved.
    """
    try:
        image: Image.Image = Image.open(file_path.absolute())

        image_format: str | None = image.format.lower() if image.format else None
        if image_format is None and image_info.ext:
            image_format = image_info.ext.lower()

        img_args: dict[str, Any] | None = None
        if image_format in ("jpg", "jpeg") and jpeg_opts:
            img_args = jpeg_opts
        elif image_format == "png" and png_opts:
            img_args = png_opts

        if img_args:
            log.debug("Rewriting image to process %s", img_args)
            # Pillow registers the encoder as "JPEG"; "jpg" is only a file extension.
            save_format = "jpeg" if image_format == "jpg" else image_format
            buffer = BytesIO()
            # Close the file-backed original once its pixels have been re-encoded.
            with image:
                image.save(buffer, save_format, **img_args)
            size = buffer.getbuffer().nbytes
            if size != image_info.size:
                log.info("Image size %s -> %s", image_info.size, size)
                image_info.size = size
            image = Image.open(buffer)
            log.info("Resaved image with %s", img_args)
        return image
    except Exception as e:
        log.warning("Unable to load image at %s: %s", file_path, e)
    return None
def examine_file(file_path: Path, image_name_re: re.Pattern) -> ImageInfo | None:
    """Parse an image file name into an ``ImageInfo``.

    ``image_name_re`` is matched against the file name and must provide a
    ``dt`` group laid out as ``YYYYMMDDHHMMSS`` followed by up to three digits
    of milliseconds, plus a ``target`` group. ``ext`` and ``event`` groups are
    optional; a missing ``ext`` falls back to the file's own suffix.

    Returns:
        The parsed ``ImageInfo``, or ``None`` when the name does not match, no
        target was captured, or parsing fails (failures are logged).
    """
    try:
        match = re.match(image_name_re, file_path.name)
        if match:
            groups = match.groupdict()
            size: int = file_path.stat().st_size
            raw_date = match.group("dt")
            year, month, day = map(int, (raw_date[:4], raw_date[4:6], raw_date[6:8]))
            hours, minutes, seconds = map(int, (raw_date[8:10], raw_date[10:12], raw_date[12:14]))
            # The trailing digits are milliseconds; datetime wants microseconds.
            fraction = raw_date[14:17]
            microseconds = int(fraction.ljust(3, "0")) * 1000 if fraction else 0
            timestamp = dt.datetime(year, month, day, hours, minutes, seconds, microseconds, tzinfo=tzlocal.get_localzone())
            file_ext: str | None = groups.get("ext")
            event: str | None = groups.get("event")
            target: str | None = groups.get("target")
            if target is None:
                log.warning("No target found for match: %s", groups)
                return None
            if file_ext is None:
                # Use the real extension (text after the last dot), not the stem.
                file_ext = file_path.suffix[1:] or None
            return ImageInfo(target=target, event=event, timestamp=timestamp, ext=file_ext, size=size)
    except Exception as e:
        log.warning("Unable to parse %s: %s", file_path, e)
    return None
def scan_ocr_fields(image: Image.Image | None, event_config: EventSettings, ocr_config: OCRSettings) -> dict[str, str | None]:
    """Run OCR over the configured regions of ``image`` and return a value per field label.

    Only fields listed in ``event_config.ocr_field_ids`` that are defined in
    ``ocr_config.fields`` are scanned. Every such field starts as ``"Unknown"``.
    The recognised text is split on the first ``:`` and the right-hand part is
    taken as the value, then auto-corrected, case-normalised and validated
    against the field's allowed values.

    Failures are reported in the result under ``IMAGE_ERROR`` (image size
    unreadable) or ``OCR_ERROR`` (a field failed) rather than raised.
    """
    ocr_field_defs: list[OCRFieldSettings] = [
        ocr_config.fields[k] for k in event_config.ocr_field_ids if k in ocr_config.fields
    ]
    results: dict[str, str | None] = {f.label: "Unknown" for f in ocr_field_defs}
    log.debug("OCR default values: %s", results)

    if image is None:
        log.debug("OCR Empty image")
        return results
    if not ocr_field_defs:
        log.debug("OCR No fields to scan")
        return results

    try:
        width, height = image.size
    except Exception as e:
        log.error("OCR fail loading image:%s", e)
        results["IMAGE_ERROR"] = str(e)
        return results

    # The Python Imaging Library uses a Cartesian pixel coordinate system, with (0,0) in the upper left corner.
    # Note that the coordinates refer to the implied pixel corners; the centre of a pixel addressed as (0, 0)
    # actually lies at (0.5, 0.5).
    #
    # Coordinates are usually passed to the library as 2-tuples (x, y).
    # Rectangles are represented as 4-tuples, (x1, y1, x2, y2), with the upper left corner given first.
    #
    # The configured crop's y is measured up from the bottom edge, so it is flipped below.
    for field_settings in ocr_field_defs:
        try:
            if field_settings.crop:
                x1: int = field_settings.crop.x  # top-left x
                y1: int = height - (field_settings.crop.y + field_settings.crop.h)  # top-left y [ 0 == top of image]
                x2: int = x1 + field_settings.crop.w  # bottom-right x
                y2: int = height - field_settings.crop.y  # bottom-right y [ 0 == top of image]
                log.debug("Cropping %s by %s image using PIL to %s", height, width, (x1, y1, x2, y2))
                region: Image.Image = image.crop((x1, y1, x2, y2))
            else:
                log.debug("No image crop")
                region = image
            if field_settings.invert:
                region = PIL.ImageOps.invert(region)

            txt = pytesseract.image_to_string(region, config=r"")
            if txt:
                log.debug("Tesseract found text %s", txt)
                parsed: list[str] = txt.split(":", 1)
            else:
                log.debug("Tesseract found nothing")
                parsed = []

            if len(parsed) <= 1:
                # No "label: value" shape - the field keeps its "Unknown" default.
                log.warning("Unparsable field %s: %s", field_settings.label, txt)
                continue

            candidate: str = parsed[1].strip()
            if field_settings.correction and candidate not in field_settings.correction:
                for correct_to, correct_patterns in field_settings.correction.items():
                    if any(re.match(pat, candidate) for pat in correct_patterns):
                        log.debug("Auto-correcting %s from %s to %s", field_settings.label, candidate, correct_to)
                        candidate = correct_to
                        # First match wins; do not re-match the corrected value against later entries.
                        break
            if candidate and field_settings.values:
                for v in field_settings.values:
                    if candidate.upper() == v.upper() and candidate != v:
                        log.debug("OCR case correcting field %s from %s to %s", field_settings.label, candidate, v)
                        candidate = v
                        break
            if field_settings.values is None or candidate in field_settings.values:
                results[field_settings.label] = candidate
            else:
                log.warning("Unknown value %s for OCR field %s", candidate, field_settings.label)
                results[field_settings.label] = "Unknown"

        except Exception as e:
            log.error("OCR fail on image:%s", e, exc_info=1)
            results["OCR_ERROR"] = f"field:{field_settings.label}, error:{e}"

    return results