Coverage for src / anpr2mqtt / event_handler.py: 94%
224 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-30 16:07 +0000
1import datetime as dt
2import re
3from io import BytesIO
4from pathlib import Path
5from typing import TYPE_CHECKING, Any
7import PIL.ImageOps
8import pytesseract
9import structlog
10import tzlocal
11from PIL import Image
12from watchdog.events import DirCreatedEvent, FileClosedEvent, FileCreatedEvent, RegexMatchingEventHandler
14from anpr2mqtt.const import ImageInfo
15from anpr2mqtt.handler_common import AutoclearTimer, CameraGatekeeper, build_dvla_client, correct_against_good_read
16from anpr2mqtt.hass import HomeAssistantPublisher
17from anpr2mqtt.settings import (
18 TARGET_TYPE_PLATE,
19 CameraSettings,
20 DVLASettings,
21 EventSettings,
22 ImageSettings,
23 OCRFieldSettings,
24 OCRSettings,
25)
26from anpr2mqtt.tracker import Sighting, Tracker
28if TYPE_CHECKING:
29 from anpr2mqtt.api_client import APIClient
31log = structlog.get_logger()
class EventHandler(RegexMatchingEventHandler):
    """Watchdog handler that publishes a state/image message for each completed image file.

    Files whose full path matches ``event_config.watch_path / event_config.image_name_re``
    are examined when they are closed: the target id is parsed from the file name,
    optionally corrected against the last good read, looked up through the optional
    API client, recorded with the tracker and finally published through the
    ``HomeAssistantPublisher``.
    """

    def __init__(
        self,
        publisher: HomeAssistantPublisher,
        state_topic: str,
        image_topic: str,
        event_config: EventSettings,
        camera: CameraSettings,
        ocr_config: OCRSettings,
        image_config: ImageSettings,
        dvla_config: DVLASettings,
        tracker: Tracker,
        mqtt_topic_root: str = "anpr2mqtt",
    ) -> None:
        """Store collaborators and register the fully qualified file-name regex.

        Args:
            publisher: Used to post state, image and per-target messages.
            state_topic: Topic passed to ``publisher.post_state_message``.
            image_topic: Topic passed to ``publisher.post_image_message``.
            event_config: Watch path, file-name regex, target type and autoclear settings.
            camera: Camera settings forwarded with every state message.
            ocr_config: OCR field definitions used by ``scan_ocr_fields``.
            image_config: JPEG/PNG re-encoding options used by ``process_image``.
            dvla_config: Settings handed to ``build_dvla_client``.
            tracker: Resolves target ids to sightings and records visits.
            mqtt_topic_root: Prefix for per-target state topics.
        """
        # The watch path is prepended so the regex matches the absolute path watchdog reports.
        fqre = f"{event_config.watch_path.resolve() / event_config.image_name_re.pattern}"
        super().__init__(regexes=[fqre], ignore_directories=True, case_sensitive=True)
        log.debug("Listening for images matching %s", fqre)
        self.publisher = publisher
        self.state_topic: str = state_topic
        self.event_config: EventSettings = event_config
        self.camera: CameraSettings = camera
        self.tracker: Tracker = tracker
        self.ocr_config: OCRSettings = ocr_config
        self.image_config: ImageSettings = image_config
        self.dvla_config: DVLASettings = dvla_config
        if event_config.image_url_base:
            log.info("Images available from web server with prefix %s", event_config.image_url_base)
        self.image_topic: str = image_topic
        self.mqtt_topic_root: str = mqtt_topic_root
        self.api_client: APIClient | None = build_dvla_client(dvla_config, event_config.target_type)
        if self.api_client:
            log.info("Configured gov API lookup, cache type %s, ttl %s", dvla_config.cache_type, dvla_config.cache_ttl)
        else:
            log.info("No gov API lookup configured")

        self._autoclear_timer = AutoclearTimer()
        self._camera_gate = CameraGatekeeper()
        # (target id, time of read) of the most recent API-confirmed target, or None.
        self._last_good_plate: tuple[str, dt.datetime] | None = None

    @property
    def ignore_directories(self) -> bool:
        """Always ignore directory events."""
        return True

    def on_created(self, event: DirCreatedEvent | FileCreatedEvent) -> None:
        """Log newly created files; processing is deferred until the file is closed."""
        if event.event_type != "created" or event.is_directory:
            log.debug("on_created: skipping irrelevant event: %s", event)
            return
        log.info("New file detected: %s", event.src_path)

    def on_closed(self, event: FileClosedEvent) -> None:
        """Process a fully written image file and publish the resulting messages.

        Any exception raised while examining or publishing is logged and reported
        through a state message carrying ``error``; it is not re-raised.
        """
        if event.event_type != "closed" or event.is_directory:
            log.debug("on_closed: skipping irrelevant event: %s", event)
            return
        log.info("New complete file detected: %s", event.src_path)

        file_path = Path(str(event.src_path))
        # Stat once, and guard it: the file may already have been removed or be
        # unreadable, and an unhandled error here would escape into the watchdog thread.
        try:
            file_size = file_path.stat().st_size
        except OSError as e:
            log.warning("Unable to stat image file %s, ignoring: %s", file_path, e)
            return
        if file_size == 0:
            log.warning("Empty image file, ignoring, at %s", file_path)
            return

        url: str | None = (
            f"{self.event_config.image_url_base}/{file_path.name!s}" if self.event_config.image_url_base else None
        )
        if not url:
            log.warning(
                "No URL available for image, URL base %s, file path name %s",
                self.event_config.image_url_base,
                file_path.name,
            )

        try:
            image_info: ImageInfo | None = examine_file(file_path, self.event_config.image_name_re)
            if image_info is not None and image_info.target is not None:
                target_id: str = image_info.target
                log.info("Examining image for %s at %s", target_id, file_path.absolute())

                target_id = correct_against_good_read(
                    target_id,
                    self._last_good_plate,
                    self.event_config.good_read_ttl,
                    self.event_config.good_read_tolerance,
                    self.tracker.normalizer,
                )

                image: Image.Image | None = process_image(
                    file_path.absolute(), image_info, jpeg_opts=self.image_config.jpeg_opts, png_opts=self.image_config.png_opts
                )
                ocr_fields: dict[str, str | None] = scan_ocr_fields(image, self.event_config, self.ocr_config)

                sighting: Sighting = self.tracker.find(target_id)

                reg_info: dict[str, Any] | None = None
                if (
                    sighting.target.lookup
                    and self.api_client
                    and image_info.target
                    and self.event_config.target_type == TARGET_TYPE_PLATE
                ):
                    api_info: dict[str, Any] = self.api_client.lookup(sighting.target.id)
                    if api_info.get("success"):
                        reg_info = api_info.get("plate")
                        if sighting.target.description is None and api_info.get("description"):
                            sighting.target.description = api_info["description"]
                        # Remember a successfully looked-up id for correcting later reads.
                        self._last_good_plate = (sighting.target.id, dt.datetime.now(dt.UTC))

                time_analysis: dict[str, Any] = self.tracker.record(
                    sighting.target.id, self.event_config.target_type, image_info.timestamp
                )

                if not time_analysis.get("is_new_visit", True):
                    log.info("Skipping duplicate filesystem visit for %s (within gap window)", sighting.target.id)
                    return

                if not self._camera_gate.allow(
                    image_info.timestamp, reg_info is not None, self.tracker.tracker_config.min_visit_gap_seconds
                ):
                    log.info("Skipping cross-plate duplicate for %s (plate=%s)", self.event_config.camera, target_id)
                    return

                entity_id: str | None = sighting.target.entity_id
                if entity_id:
                    target_state_topic = f"{self.mqtt_topic_root}/targets/{self.event_config.target_type}/{entity_id}"
                    self.publisher.publish_target_state(
                        state_topic=target_state_topic,
                        description=sighting.target.description,
                        time_analysis=time_analysis,
                    )

                if sighting.ignore:
                    log.info("Skipping MQTT publication for ignored %s", sighting.target.id)
                    return

                self.publisher.post_state_message(
                    self.state_topic,
                    sighting=sighting,
                    event_config=self.event_config,
                    camera=self.camera,
                    image_info=image_info,
                    extra_info=ocr_fields,
                    time_analysis=time_analysis,
                    url=url,
                    reg_info=reg_info,
                    file_path=file_path,
                    source="filesystem",
                )
                if image:
                    img_format = image_info.ext.upper() if image_info.ext else None
                    img_format = "JPEG" if img_format == "JPG" else img_format
                    if img_format:
                        self.publisher.post_image_message(self.image_topic, image, img_format)
                    else:
                        log.warning("Unknown image format for %s", file_path)
                # NOTE(review): assumed to run after every published state, not only
                # when an image was published - confirm against the original layout.
                self._schedule_autoclear()
            else:
                log.warning("No image found for %s", file_path)
                ocr_fields = scan_ocr_fields(None, self.event_config, self.ocr_config)

                self.publisher.post_state_message(
                    self.state_topic,
                    event_config=self.event_config,
                    camera=self.camera,
                    extra_info=ocr_fields,
                    sighting=None,
                    url=url,
                    file_path=file_path,
                    source="filesystem",
                )
                self._schedule_autoclear()

        except Exception as e:
            log.error("Failed to parse file event %s: %s", event, e, exc_info=True)
            self.publisher.post_state_message(
                self.state_topic,
                event_config=self.event_config,
                camera=self.camera,
                sighting=None,
                error=str(e),
                file_path=file_path,
            )

    def _schedule_autoclear(self) -> None:
        """Ask the autoclear timer to call ``_do_autoclear`` for this event/camera."""
        self._autoclear_timer.schedule(
            self.event_config,
            self._do_autoclear,
            f"{self.event_config.event}/{self.event_config.camera}",
        )

    def _do_autoclear(self) -> None:
        """Publish empty state and/or image messages as enabled in ``event_config.autoclear``."""
        autoclear = self.event_config.autoclear
        log.info("Autoclear firing for %s/%s", self.event_config.event, self.event_config.camera)
        if autoclear.state:
            self.publisher.post_state_message(
                self.state_topic, sighting=None, event_config=self.event_config, camera=self.camera
            )
        if autoclear.image:
            self.publisher.post_image_message(self.image_topic, image=None)
def process_image(
    file_path: Path, image_info: ImageInfo, jpeg_opts: dict[str, Any], png_opts: dict[str, Any]
) -> Image.Image | None:
    """Open the image at ``file_path`` and optionally re-encode it in memory.

    When save options are configured for the image's format, the image is saved
    into an in-memory buffer with those options and re-opened from it, and
    ``image_info.size`` is updated to the re-encoded byte count.

    Args:
        file_path: Location of the image file.
        image_info: Parsed file details; ``ext`` is the format fallback and
            ``size`` is updated in place after re-encoding.
        jpeg_opts: Keyword arguments for ``Image.save`` on JPEG images; empty disables re-encoding.
        png_opts: Keyword arguments for ``Image.save`` on PNG images; empty disables re-encoding.

    Returns:
        The (possibly re-encoded) image, or None if it could not be loaded.
    """
    try:
        image: Image.Image | None = Image.open(file_path.absolute())
        if image is None:
            log.error("Unable to open image at %s", file_path.absolute())
            return None
        image_format: str | None = image.format.lower() if image.format else image_info.ext
        if image_format:
            image_format = image_format.lower()
            # Pillow registers the format as "JPEG"; "jpg" is only a file extension.
            if image_format == "jpg":
                image_format = "jpeg"
        img_args: dict[str, Any] | None = None

        if image_format == "jpeg":
            img_args = jpeg_opts or None
        elif image_format == "png":
            img_args = png_opts or None
        else:
            log.info("Unknown image format %s, no tuning available", image_format)
        if img_args:
            log.debug("Rewriting image to process %s", img_args)
            buffer = BytesIO()
            image.save(buffer, image_format, **img_args)
            size = buffer.getbuffer().nbytes
            if image_info.size and size != image_info.size:
                log.info(
                    "Image size %s -> %s, %0.2f%% saving",
                    image_info.size,
                    size,
                    # Saving is the reduction relative to the original size.
                    ((image_info.size - size) / image_info.size) * 100,
                )
                image_info.size = size
            image = Image.open(buffer)
            log.debug("Resaved image with %s", img_args)
        elif image_format in ("jpeg", "png"):
            log.debug("No tuning options configured for %s image", image_format)
        return image
    except Exception as e:
        log.warning("Unable to load image at %s: %s", file_path, e)
    return None
def examine_file(file_path: Path, image_name_re: re.Pattern[str]) -> ImageInfo | None:
    """Parse target, event, timestamp and extension from an image file name.

    The regex must provide a ``dt`` group holding ``YYYYMMDDhhmmss`` followed by
    three sub-second digits; ``ext``, ``event`` and ``target`` groups are optional.

    Args:
        file_path: Image file; only its name is matched, and it is stat'ed for size.
        image_name_re: Pattern matched against the start of ``file_path.name``.

    Returns:
        An ``ImageInfo``, or None when the name does not match, carries no
        target, or cannot be parsed.
    """
    try:
        match = re.match(image_name_re, file_path.name)
        if not match:
            return None
        groups = match.groupdict()
        size: int = file_path.stat().st_size
        raw_date = match.group("dt")
        year, month, day = map(int, (raw_date[:4], raw_date[4:6], raw_date[6:8]))
        hours, minutes, seconds, millis = map(
            int, (raw_date[8:10], raw_date[10:12], raw_date[12:14], raw_date[14:17])
        )
        # Three sub-second digits are milliseconds; datetime expects microseconds.
        timestamp = dt.datetime(
            year, month, day, hours, minutes, seconds, millis * 1000, tzinfo=tzlocal.get_localzone()
        )
        file_ext: str | None = groups.get("ext")
        event: str | None = groups.get("event")
        target: str | None = groups.get("target")
        if target is None:
            log.warning("No target found for match: %s", groups)
            return None
        if file_ext is None:
            # Fall back to the text after the last dot of the name, if any.
            file_ext = file_path.suffix.lstrip(".") or None
        return ImageInfo(target=target, event=event, timestamp=timestamp, ext=file_ext, size=size)
    except Exception as e:
        log.warning("Unable to parse %s: %s", file_path, e)
    return None
def scan_ocr_fields(image: Image.Image | None, event_config: EventSettings, ocr_config: OCRSettings) -> dict[str, str | None]:
    """Run OCR over the configured regions of ``image`` and return one value per field label.

    Every field listed in ``event_config.ocr_field_ids`` that exists in
    ``ocr_config.fields`` starts as ``"Unknown"``. For each field the optional
    crop and inversion are applied, the OCR text is split at the first ``:``
    and the right-hand side is taken as the value, then passed through the
    field's correction patterns and allowed-values list.

    Args:
        image: Image to scan, or None to get only the default values.
        event_config: Supplies the ids of the OCR fields to scan.
        ocr_config: Supplies the field definitions keyed by id.

    Returns:
        Mapping of field label to value. ``IMAGE_ERROR`` or ``OCR_ERROR`` keys
        are added when the image size or a field could not be processed.
    """
    ocr_field_defs: list[OCRFieldSettings] = [
        ocr_config.fields[k] for k in event_config.ocr_field_ids if k in ocr_config.fields
    ]
    results: dict[str, str | None] = {f.label: "Unknown" for f in ocr_field_defs}
    log.debug("OCR default values: %s", results)

    if image is None:
        log.debug("OCR Empty image")
        return results
    if not ocr_field_defs:
        log.debug("OCR No fields to scan")
        return results

    try:
        width, height = image.size
    except Exception as e:
        log.error("OCR fail loading image:%s", e)
        results["IMAGE_ERROR"] = str(e)
        return results

    # The Python Imaging Library uses a Cartesian pixel coordinate system, with (0,0) in the
    # upper left corner. Coordinates refer to the implied pixel corners; the centre of a pixel
    # addressed as (0, 0) actually lies at (0.5, 0.5).
    #
    # Coordinates are usually passed to the library as 2-tuples (x, y).
    # Rectangles are represented as 4-tuples, (x1, y1, x2, y2), with the upper left corner first.
    for field_settings in ocr_field_defs:
        try:
            if field_settings.crop:
                # crop.y is measured up from the bottom edge, so flip it into PIL's top-down axis.
                x1: int = field_settings.crop.x  # top-left x
                y1: int = height - (field_settings.crop.y + field_settings.crop.h)  # top-left y [ 0 == top of image]
                x2: int = x1 + field_settings.crop.w  # bottom-right x
                y2: int = height - field_settings.crop.y  # bottom-right y [ 0 == top of image]
                log.debug("Cropping %s by %s image using PIL to %s", height, width, (x1, y1, x2, y2))
                region: Image.Image = image.crop((x1, y1, x2, y2))
            else:
                log.debug("No image crop")
                region = image
            if field_settings.invert:
                region = PIL.ImageOps.invert(region)
            txt = pytesseract.image_to_string(region, config=r"")
            if txt:
                log.debug("Tesseract found text %s", txt)
                parsed: list[str] = txt.split(":", 1)
            else:
                log.debug("Tesseract found nothing")
                parsed = []

            if len(parsed) > 1:
                candidate: str = parsed[1].strip()
                if field_settings.correction and candidate not in field_settings.correction:
                    for correct_to, correct_patterns in field_settings.correction.items():
                        if any(re.match(pat, candidate) for pat in correct_patterns):
                            log.debug("Auto-correcting %s from %s to %s", field_settings.label, candidate, correct_to)
                            candidate = correct_to
                            # Stop at the first rule so a corrected value is not re-matched by later rules.
                            break
                if candidate and field_settings.values:
                    for v in field_settings.values:
                        if candidate.upper() == v.upper() and candidate != v:
                            log.debug("OCR case correcting field %s from %s to %s", field_settings.label, candidate, v)
                            candidate = v
                if field_settings.values is None or candidate in field_settings.values:
                    results[field_settings.label] = candidate
                else:
                    log.warning("Unknown value %s for OCR field %s", candidate, field_settings.label)
                    results[field_settings.label] = "Unknown"
            else:
                log.warning("Unparsable field %s: %s", field_settings.label, txt)

        except Exception as e:
            log.error("OCR fail on image:%s", e, exc_info=True)
            results["OCR_ERROR"] = f"field:{field_settings.label}, error:{e}"

    return results