Coverage for src / anpr2mqtt / event_handler.py: 67%
242 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 15:35 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 15:35 +0000
1import datetime as dt
2import json
3import re
4from io import BytesIO
5from pathlib import Path
6from typing import Any
8import paho.mqtt.client as mqtt
9import PIL.ImageOps
10import pytesseract
11import structlog
12import tzlocal
13from PIL import Image
14from watchdog.events import DirCreatedEvent, FileClosedEvent, FileCreatedEvent, RegexMatchingEventHandler
16from anpr2mqtt.api_client import DVLA, APIClient
17from anpr2mqtt.const import ImageInfo
18from anpr2mqtt.hass import post_image_message, post_state_message
19from anpr2mqtt.settings import (
20 TARGET_TYPE_PLATE,
21 DVLASettings,
22 EventSettings,
23 ImageSettings,
24 OCRFieldSettings,
25 OCRSettings,
26 TargetSettings,
27 TrackerSettings,
28)
30log = structlog.get_logger()
class EventHandler(RegexMatchingEventHandler):
    """Watchdog handler that reacts to image files appearing in the watched directory.

    When a matching file is closed, the handler parses its name, optionally
    re-encodes the image, runs OCR over configured regions, classifies and
    tracks the detected target, and hands the results to
    ``post_state_message`` / ``post_image_message`` together with the MQTT client.
    """

    def __init__(
        self,
        client: mqtt.Client,
        state_topic: str,
        image_topic: str,
        event_config: EventSettings,
        target_config: TargetSettings | None,
        ocr_config: OCRSettings,
        image_config: ImageSettings,
        dvla_config: DVLASettings,
        tracker_config: TrackerSettings,
    ) -> None:
        """Store configuration and register the file-name pattern to watch.

        Args:
            client: MQTT client passed on to the message-posting helpers.
            state_topic: Topic handed to ``post_state_message``.
            image_topic: Topic handed to ``post_image_message``.
            event_config: Watch path, file-name regex, target type and URL base.
            target_config: Correction / ignore / known / dangerous rules, or None.
            ocr_config: Definitions of the OCR fields that may be scanned.
            image_config: Re-encoding options for JPEG and PNG images.
            dvla_config: API key and cache TTL for the registration lookup.
            tracker_config: Location of the sightings data directory.
        """
        # The handler matches on full paths, so anchor the file-name pattern to the resolved watch directory.
        fqre = f"{event_config.watch_path.resolve() / event_config.image_name_re.pattern}"
        super().__init__(regexes=[fqre], ignore_directories=True, case_sensitive=True)
        log.debug("Listening for images matching %s", fqre)
        self.client: mqtt.Client = client
        self.state_topic: str = state_topic
        self.event_config: EventSettings = event_config
        self.tracker_config: TrackerSettings = tracker_config
        self.target_config: TargetSettings | None = target_config
        self.ocr_config: OCRSettings = ocr_config
        self.image_config: ImageSettings = image_config
        self.dvla_config: DVLASettings = dvla_config
        if event_config.image_url_base:
            log.info("Images available from web server with prefix %s", event_config.image_url_base)
        self.image_topic: str | None = image_topic

        # The external registration lookup only applies to number plates and needs an API key.
        if dvla_config.api_key and event_config.target_type == TARGET_TYPE_PLATE:
            log.info("Configured gov API lookup")
            self.api_client: APIClient | None = DVLA(dvla_config.api_key, cache_ttl=dvla_config.cache_ttl)
        else:
            log.info("No gov API lookup configured")
            self.api_client = None

    @property
    def ignore_directories(self) -> bool:
        """Always ignore directory events."""
        return True

    def on_created(self, event: DirCreatedEvent | FileCreatedEvent) -> None:
        """Log the creation of a new file; processing is deferred to ``on_closed``."""
        if event.event_type != "created" or event.is_directory:
            log.debug("on_created: skipping irrelevant event: %s", event)
            return
        log.info("New file detected: %s", event.src_path)

    def on_closed(self, event: FileClosedEvent) -> None:
        """Process a fully written image file and post the resulting messages.

        Files that cannot be stat'ed or are empty are skipped with a warning.
        If the file name cannot be parsed, a state message with no target is
        posted. Any exception during processing is logged and reported through
        a state message carrying the error text.
        """
        if event.event_type != "closed" or event.is_directory:
            log.debug("on_closed: skipping irrelevant event: %s", event)
            return
        log.info("New complete file detected: %s", event.src_path)

        file_path = Path(str(event.src_path))
        # The file may already have been moved or deleted; an unhandled error here
        # would propagate out of the handler, so treat it like an empty file.
        try:
            file_size = file_path.stat().st_size
        except OSError as e:
            log.warning("Unable to stat image file %s, ignoring: %s", file_path, e)
            return
        if file_size == 0:
            log.warning("Empty image file, ignoring, at %s", file_path)
            return

        url: str | None = None
        if self.event_config.image_url_base:
            url = f"{self.event_config.image_url_base}/{file_path.name}"

        try:
            image_info: ImageInfo | None = examine_file(file_path, self.event_config.image_name_re)
            if image_info is not None:
                target: str = image_info.target
                log.info("Examining image for %s at %s", target, file_path.absolute())

                image: Image.Image | None = process_image(
                    file_path.absolute(), image_info, jpeg_opts=self.image_config.jpeg_opts, png_opts=self.image_config.png_opts
                )
                ocr_fields: dict[str, str | None] = scan_ocr_fields(image, self.event_config, self.ocr_config)

                classification: dict[str, Any] = self.classify_target(target)
                if classification["target"] != target:
                    # apply corrected target name if changed
                    target = classification["target"]

                # Only unknown plates are looked up with the external API.
                reg_info: list[Any] | dict[str, Any] | None = None
                if (
                    not classification.get("known")
                    and self.api_client
                    and image_info.target
                    and self.event_config.target_type == TARGET_TYPE_PLATE
                ):
                    reg_info = self.api_client.lookup(target)

                # Sightings are recorded before the ignore check, so ignored targets are still tracked.
                visit_count: int
                last_seen: dt.datetime | None
                visit_count, last_seen = self.track_target(target, self.event_config.target_type, image_info.timestamp)

                if classification.get("ignore"):
                    log.info("Skipping MQTT publication for ignored %s", target)
                    return

                post_state_message(
                    self.client,
                    self.state_topic,
                    target=target,
                    event_config=self.event_config,
                    image_info=image_info,
                    ocr_fields=ocr_fields,
                    classification=classification,
                    previous_sightings=visit_count,
                    last_sighting=last_seen,
                    url=url,
                    reg_info=reg_info,
                    file_path=file_path,
                )
                if self.image_topic and image:
                    img_format = image_info.ext.upper() if image_info.ext else None
                    img_format = "JPEG" if img_format == "JPG" else img_format
                    if img_format:
                        post_image_message(self.client, self.image_topic, image, img_format)
                    else:
                        log.warning("Unknown image format for %s", file_path)
            else:
                # File name could not be parsed: still report the event, with default OCR values.
                ocr_fields = scan_ocr_fields(None, self.event_config, self.ocr_config)

                post_state_message(
                    self.client,
                    self.state_topic,
                    event_config=self.event_config,
                    ocr_fields=ocr_fields,
                    target=None,
                    url=url,
                    file_path=file_path,
                )

        except Exception as e:
            log.error("Failed to parse file event %s: %s", event, e, exc_info=True)
            post_state_message(
                self.client,
                self.state_topic,
                event_config=self.event_config,
                target=None,
                ocr_fields={},
                error=str(e),
                file_path=file_path,
            )

    def track_target(self, target: str, target_type: str, event_dt: dt.datetime | None) -> tuple[int, dt.datetime | None]:
        """Record a sighting of ``target`` and report its history.

        Sightings are kept as a JSON list of ISO-8601 timestamps in
        ``<data_dir>/<target_type>/<target>.json``. Tracking is best effort:
        any failure is logged and whatever history was read so far is returned.

        Args:
            target: Target identifier; an empty value is stored as ``UNKNOWN``.
            target_type: Name of the sub-directory under the tracker data directory.
            event_dt: Time of the sighting; the current local time is used when None.

        Returns:
            The number of sightings before this one and the timestamp of the
            most recent previous sighting (None if there was none).
        """
        target = target or "UNKNOWN"
        target_file = self.tracker_config.data_dir / target_type / f"{target}.json"
        last_visit: dt.datetime | None = None
        previous_visits: int = 0
        try:
            # Created inside the try, with parents, so that a missing data directory
            # degrades tracking instead of aborting the whole event.
            target_file.parent.mkdir(parents=True, exist_ok=True)
            sightings: list[str] = []
            if target_file.exists():
                with target_file.open("r") as f:
                    sightings = json.load(f)
                previous_visits = len(sightings)
                if previous_visits > 0:
                    last_visit = dt.datetime.fromisoformat(sightings[-1])

            sightings.append(event_dt.isoformat() if event_dt else dt.datetime.now(tz=tzlocal.get_localzone()).isoformat())
            with target_file.open("w") as f:
                json.dump(sightings, f)
        except Exception as e:
            log.exception("Failed to track sightings for %s:%s", target, e)
        return previous_visits, last_visit

    def classify_target(self, target: str | None) -> dict[str, Any]:
        """Classify ``target`` against the configured correction, ignore, dangerous and known rules.

        Returns:
            A dict with the keys ``orig_target``, ``target`` (possibly
            corrected), ``ignore``, ``known``, ``dangerous``, ``priority`` and
            ``description``. Every key is always present.
        """
        results: dict[str, Any] = {
            "orig_target": target,
            "target": target,
            "ignore": False,
            "known": False,
            "dangerous": False,
            "priority": "high",
            "description": "Unknown vehicle",
        }
        if not target or self.target_config is None:
            # return the defaults rather than an empty dict to make home assistant template logic easier
            return results
        # First matching correction wins; later rules then see the corrected name.
        for corrected_target, patterns in self.target_config.correction.items():
            if any(re.match(pat, target) for pat in patterns):
                results["target"] = corrected_target
                target = corrected_target
                log.info("Corrected target %s -> %s", results["orig_target"], target)
                break
        for pat in self.target_config.ignore:
            if re.match(pat, target):
                log.info("Ignoring %s matching ignore pattern %s", target, pat)
                results["ignore"] = True
                results["priority"] = "low"
                results["description"] = "Ignored"
                break
        if target in self.target_config.dangerous:
            log.warning("%s known as potential danger", target)
            results["dangerous"] = True
            results["priority"] = "critical"
            results["description"] = self.target_config.dangerous[target] or "Potential threat"
        # Checked last, so a target listed as both dangerous and known ends up with the "known" priority.
        if target in self.target_config.known:
            log.warning("%s known to household", target)
            results["known"] = True
            results["priority"] = "medium"
            results["description"] = self.target_config.known[target] or "Known"

        return results
def process_image(
    file_path: Path, image_info: ImageInfo, jpeg_opts: dict[str, Any], png_opts: dict[str, Any]
) -> Image.Image | None:
    """Open the image at ``file_path`` and optionally re-encode it in memory.

    When save options are configured for the image's format, the image is
    re-saved into a buffer with those options and reopened from it;
    ``image_info.size`` is updated to the size of the re-encoded data.

    Args:
        file_path: Location of the image file.
        image_info: Parsed file details; its ``ext`` is the fallback format
            and its ``size`` may be updated in place.
        jpeg_opts: Keyword arguments for saving JPEG images; empty disables re-encoding.
        png_opts: Keyword arguments for saving PNG images; empty disables re-encoding.

    Returns:
        The (possibly re-encoded) image, or None if it could not be loaded or saved.
    """
    try:
        # Image.open raises on failure rather than returning None.
        image: Image.Image = Image.open(file_path.absolute())
        image_format: str | None = image.format or image_info.ext
        if image_format:
            image_format = image_format.lower()
            if image_format == "jpg":
                # "jpg" is only a file extension; the format name used for saving is "jpeg".
                image_format = "jpeg"
        img_args: dict[str, Any] | None = None

        if image_format == "jpeg" and jpeg_opts:
            img_args = jpeg_opts
        elif image_format == "png" and png_opts:
            img_args = png_opts
        if img_args:
            log.debug("Rewriting image to process %s", img_args)
            buffer = BytesIO()
            image.save(buffer, image_format, **img_args)
            size = buffer.getbuffer().nbytes
            if size != image_info.size:
                log.info("Image size %s -> %s", image_info.size, size)
                image_info.size = size
            image = Image.open(buffer)
            log.info("Resaved image with %s", img_args)
        return image
    except Exception as e:
        log.warning("Unable to load image at %s: %s", file_path, e)
        return None
def examine_file(file_path: Path, image_name_re: re.Pattern) -> ImageInfo | None:
    """Extract target, event, timestamp, extension and size from an image file name.

    The pattern must define a ``dt`` group holding ``YYYYMMDDhhmmss`` followed
    by up to three fractional-second digits (milliseconds), and should define
    ``target``; ``event`` and ``ext`` are optional groups. When ``ext`` is not
    captured, the extension is taken from the file name's suffix.

    Args:
        file_path: Path of the image file; it must exist, as its size is read.
        image_name_re: Pattern matched against the file name only.

    Returns:
        The parsed ``ImageInfo``, or None if the name does not match, no
        target was captured, or parsing failed (failures are logged).
    """
    try:
        match = re.match(image_name_re, file_path.name)
        if match:
            groups = match.groupdict()
            size: int = file_path.stat().st_size
            raw_date = match.group("dt")
            year, month, day = map(int, (raw_date[:4], raw_date[4:6], raw_date[6:8]))
            hours, minutes, seconds = map(int, (raw_date[8:10], raw_date[10:12], raw_date[12:14]))
            # The trailing digits are a decimal fraction of a second (milliseconds at full width),
            # whereas datetime expects microseconds; a missing fraction counts as zero.
            fraction = raw_date[14:17]
            microseconds = int(fraction.ljust(3, "0")) * 1000 if fraction else 0
            timestamp = dt.datetime(year, month, day, hours, minutes, seconds, microseconds, tzinfo=tzlocal.get_localzone())
            file_ext: str | None = groups.get("ext")
            event: str | None = groups.get("event")
            target: str | None = groups.get("target")
            if target is None:
                log.warning("No target found for match: %s", groups)
                return None
            if file_ext is None:
                # Fall back to the real extension (without the dot), not the file stem.
                file_ext = file_path.suffix.lstrip(".") or None
            return ImageInfo(target=target, event=event, timestamp=timestamp, ext=file_ext, size=size)
    except Exception as e:
        log.warning("Unable to parse %s: %s", file_path, e)
    return None
def scan_ocr_fields(image: Image.Image | None, event_config: EventSettings, ocr_config: OCRSettings) -> dict[str, str | None]:
    """Read the configured OCR fields from ``image``.

    Each field listed in ``event_config.ocr_field_ids`` that is defined in
    ``ocr_config.fields`` is cropped (and optionally inverted), passed to
    Tesseract, and the text after the first ``:`` is taken as its value.
    Values are auto-corrected by the field's correction patterns, matched
    case-insensitively against its allowed values, and rejected as
    ``"Unknown"`` if not allowed.

    Args:
        image: Image to scan, or None to obtain only the default values.
        event_config: Supplies the ids of the fields to scan.
        ocr_config: Supplies the field definitions.

    Returns:
        Mapping of field label to value, defaulting to ``"Unknown"``. The
        extra keys ``IMAGE_ERROR`` / ``OCR_ERROR`` are added on failure.
    """
    ocr_field_defs: list[OCRFieldSettings] = [
        ocr_config.fields[k] for k in event_config.ocr_field_ids if k in ocr_config.fields
    ]
    results: dict[str, str | None] = {f.label: "Unknown" for f in ocr_field_defs}
    log.debug("OCR default values: %s", results)

    if image is None:
        log.debug("OCR Empty image")
        return results
    if not ocr_field_defs:
        log.debug("OCR No fields to scan")
        return results

    try:
        width, height = image.size
    except Exception as e:
        log.error("OCR fail loading image:%s", e)
        results["IMAGE_ERROR"] = str(e)
        return results

    # The Python Imaging Library uses a Cartesian pixel coordinate system, with (0,0) in the upper left corner.
    # Note that the coordinates refer to the implied pixel corners; the centre of a pixel addressed as (0, 0)
    # actually lies at (0.5, 0.5).
    #
    # Coordinates are usually passed to the library as 2-tuples (x, y).
    # Rectangles are represented as 4-tuples, (x1, y1, x2, y2), with the upper left corner given first.
    for field_settings in ocr_field_defs:
        try:
            if field_settings.crop:
                # The crop's y is measured up from the bottom edge, so flip it into PIL's top-down coordinates.
                x1: int = field_settings.crop.x  # top-left x
                y1: int = height - (field_settings.crop.y + field_settings.crop.h)  # top-left y [ 0 == top of image]
                x2: int = x1 + field_settings.crop.w  # bottom-right x
                y2: int = height - field_settings.crop.y  # bottom-right y [ 0 == top of image]
                log.debug("Cropping %s by %s image using PIL to %s", height, width, (x1, y1, x2, y2))
                region: Image.Image = image.crop((x1, y1, x2, y2))
            else:
                log.debug("No image crop")
                region = image
            if field_settings.invert:
                region = PIL.ImageOps.invert(region)
            txt = pytesseract.image_to_string(region, config=r"")
            if txt:
                log.debug("Tesseract found text %s", txt)
                parsed: list[str] = txt.split(":", 1)
            else:
                log.debug("Tesseract found nothing")
                parsed = []

            if len(parsed) > 1:
                candidate: str = parsed[1].strip()
                if field_settings.correction and candidate not in field_settings.correction:
                    for correct_to, correct_patterns in field_settings.correction.items():
                        if any(re.match(pat, candidate) for pat in correct_patterns):
                            log.debug("Auto-correcting %s from %s to %s", field_settings.label, candidate, correct_to)
                            candidate = correct_to
                            # First match wins, so the corrected value is not re-matched by later patterns.
                            break
                if candidate and field_settings.values:
                    for v in field_settings.values:
                        if candidate.upper() == v.upper() and candidate != v:
                            log.debug("OCR case correcting field %s from %s to %s", field_settings.label, candidate, v)
                            candidate = v
                if field_settings.values is None or candidate in field_settings.values:
                    results[field_settings.label] = candidate
                else:
                    log.warning("Unknown value %s for OCR field %s", candidate, field_settings.label)
                    results[field_settings.label] = "Unknown"
            else:
                log.warning("Unparsable field %s: %s", field_settings.label, txt)

        except Exception as e:
            log.error("OCR fail on image:%s", e, exc_info=True)
            results["OCR_ERROR"] = f"field:{field_settings.label}, error:{e}"

    return results