Coverage for src / anpr2mqtt / handler_common.py: 79%
67 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-30 16:07 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-30 16:07 +0000
1import datetime as dt
2import threading
3from typing import Any
5import structlog
7from anpr2mqtt.api_client import APIClient, DVLAClient
8from anpr2mqtt.normalizers import Normalizer, fuzzy_match
9from anpr2mqtt.settings import TARGET_TYPE_PLATE, DVLASettings, EventSettings
11log = structlog.get_logger()
14def build_dvla_client(dvla_settings: DVLASettings, target_type: str | None = None) -> APIClient | None:
15 if not dvla_settings.api_key:
16 return None
17 if target_type is not None and target_type != TARGET_TYPE_PLATE: 17 ↛ 18line 17 didn't jump to line 18 because the condition on line 17 was never true
18 return None
19 return DVLAClient(
20 dvla_settings.api_key,
21 cache_type=dvla_settings.cache_type,
22 cache_ttl=dvla_settings.cache_ttl,
23 cache_dir=dvla_settings.cache_dir,
24 verify_plate=dvla_settings.verify_plate,
25 )
28def correct_against_good_read(
29 plate: str,
30 cached: tuple[str, dt.datetime] | None,
31 ttl: int,
32 tolerance: int,
33 normalizer: Normalizer | None = None,
34) -> str:
35 if ttl <= 0 or tolerance <= 0 or cached is None:
36 return plate
37 good_plate, good_ts = cached
38 age = (dt.datetime.now(dt.UTC) - good_ts).total_seconds()
39 if age > ttl:
40 return plate
41 candidates: list[str] = [good_plate]
42 alt_candidate: str | None = normalizer.normalize(plate) if normalizer else None
43 if alt_candidate:
44 candidates.append(alt_candidate)
46 if fuzzy_match(plate, tolerance, candidates) is not None and plate != good_plate:
47 log.info("Correcting %s -> %s via last known good (age=%.1fs)", plate, good_plate, age)
48 return good_plate
49 return plate
52class CameraGatekeeper:
53 """Camera-level duplicate gate.
55 Suppresses events within the visit gap window even when the plate string differs
56 (e.g. two bad reads of the same vehicle). Exception: if the last published event
57 had no DVLA enrichment, one replacement is allowed through so a subsequently-enriched
58 read can supersede the raw notification.
59 """
61 def __init__(self) -> None:
62 self._last_time: dt.datetime | None = None
63 self._last_had_dvla: bool = False
64 self._lock = threading.Lock()
66 def allow(self, event_time: dt.datetime, has_dvla: bool, gap_seconds: int) -> bool:
67 """Return True (and update state) if this event should be published."""
68 with self._lock:
69 if self._last_time is None or gap_seconds <= 0: 69 ↛ 74line 69 didn't jump to line 74 because the condition on line 69 was always true
70 self._last_time = event_time
71 self._last_had_dvla = has_dvla
72 return True
74 elapsed = (event_time - self._last_time).total_seconds()
76 if elapsed < 0 or elapsed >= gap_seconds:
77 self._last_time = event_time
78 self._last_had_dvla = has_dvla
79 return True
81 # Within gap — prior had DVLA enrichment: suppress
82 if self._last_had_dvla:
83 log.info(
84 "Camera gate: suppressing within-gap event (prior enriched, elapsed=%.1fs gap=%ds)",
85 elapsed,
86 gap_seconds,
87 )
88 return False
90 # Prior lacked enrichment: allow one replacement
91 log.info(
92 "Camera gate: allowing replacement (prior unenriched, elapsed=%.1fs gap=%ds)",
93 elapsed,
94 gap_seconds,
95 )
96 self._last_time = event_time
97 self._last_had_dvla = has_dvla
98 return True
101class AutoclearTimer:
102 """Manages a cancel-and-restart debounce timer for a single autoclear slot."""
104 def __init__(self) -> None:
105 self._timer: threading.Timer | None = None
106 self._lock = threading.Lock()
108 def schedule(self, event_config: EventSettings, callback: Any, label: str) -> None:
109 autoclear = event_config.autoclear
110 if not autoclear.enabled:
111 return
112 with self._lock:
113 if self._timer is not None:
114 self._timer.cancel()
115 self._timer = threading.Timer(autoclear.post_event, callback)
116 self._timer.daemon = True
117 self._timer.start()
118 log.debug("Autoclear scheduled in %ss for %s", autoclear.post_event, label)