Coverage for src / anpr2mqtt / handler_common.py: 79%

67 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-30 16:07 +0000

1import datetime as dt 

2import threading 

3from typing import Any 

4 

5import structlog 

6 

7from anpr2mqtt.api_client import APIClient, DVLAClient 

8from anpr2mqtt.normalizers import Normalizer, fuzzy_match 

9from anpr2mqtt.settings import TARGET_TYPE_PLATE, DVLASettings, EventSettings 

10 

11log = structlog.get_logger() 

12 

13 

14def build_dvla_client(dvla_settings: DVLASettings, target_type: str | None = None) -> APIClient | None: 

15 if not dvla_settings.api_key: 

16 return None 

17 if target_type is not None and target_type != TARGET_TYPE_PLATE: 17 ↛ 18line 17 didn't jump to line 18 because the condition on line 17 was never true

18 return None 

19 return DVLAClient( 

20 dvla_settings.api_key, 

21 cache_type=dvla_settings.cache_type, 

22 cache_ttl=dvla_settings.cache_ttl, 

23 cache_dir=dvla_settings.cache_dir, 

24 verify_plate=dvla_settings.verify_plate, 

25 ) 

26 

27 

28def correct_against_good_read( 

29 plate: str, 

30 cached: tuple[str, dt.datetime] | None, 

31 ttl: int, 

32 tolerance: int, 

33 normalizer: Normalizer | None = None, 

34) -> str: 

35 if ttl <= 0 or tolerance <= 0 or cached is None: 

36 return plate 

37 good_plate, good_ts = cached 

38 age = (dt.datetime.now(dt.UTC) - good_ts).total_seconds() 

39 if age > ttl: 

40 return plate 

41 candidates: list[str] = [good_plate] 

42 alt_candidate: str | None = normalizer.normalize(plate) if normalizer else None 

43 if alt_candidate: 

44 candidates.append(alt_candidate) 

45 

46 if fuzzy_match(plate, tolerance, candidates) is not None and plate != good_plate: 

47 log.info("Correcting %s -> %s via last known good (age=%.1fs)", plate, good_plate, age) 

48 return good_plate 

49 return plate 

50 

51 

52class CameraGatekeeper: 

53 """Camera-level duplicate gate. 

54 

55 Suppresses events within the visit gap window even when the plate string differs 

56 (e.g. two bad reads of the same vehicle). Exception: if the last published event 

57 had no DVLA enrichment, one replacement is allowed through so a subsequently-enriched 

58 read can supersede the raw notification. 

59 """ 

60 

61 def __init__(self) -> None: 

62 self._last_time: dt.datetime | None = None 

63 self._last_had_dvla: bool = False 

64 self._lock = threading.Lock() 

65 

66 def allow(self, event_time: dt.datetime, has_dvla: bool, gap_seconds: int) -> bool: 

67 """Return True (and update state) if this event should be published.""" 

68 with self._lock: 

69 if self._last_time is None or gap_seconds <= 0: 69 ↛ 74line 69 didn't jump to line 74 because the condition on line 69 was always true

70 self._last_time = event_time 

71 self._last_had_dvla = has_dvla 

72 return True 

73 

74 elapsed = (event_time - self._last_time).total_seconds() 

75 

76 if elapsed < 0 or elapsed >= gap_seconds: 

77 self._last_time = event_time 

78 self._last_had_dvla = has_dvla 

79 return True 

80 

81 # Within gap — prior had DVLA enrichment: suppress 

82 if self._last_had_dvla: 

83 log.info( 

84 "Camera gate: suppressing within-gap event (prior enriched, elapsed=%.1fs gap=%ds)", 

85 elapsed, 

86 gap_seconds, 

87 ) 

88 return False 

89 

90 # Prior lacked enrichment: allow one replacement 

91 log.info( 

92 "Camera gate: allowing replacement (prior unenriched, elapsed=%.1fs gap=%ds)", 

93 elapsed, 

94 gap_seconds, 

95 ) 

96 self._last_time = event_time 

97 self._last_had_dvla = has_dvla 

98 return True 

99 

100 

101class AutoclearTimer: 

102 """Manages a cancel-and-restart debounce timer for a single autoclear slot.""" 

103 

104 def __init__(self) -> None: 

105 self._timer: threading.Timer | None = None 

106 self._lock = threading.Lock() 

107 

108 def schedule(self, event_config: EventSettings, callback: Any, label: str) -> None: 

109 autoclear = event_config.autoclear 

110 if not autoclear.enabled: 

111 return 

112 with self._lock: 

113 if self._timer is not None: 

114 self._timer.cancel() 

115 self._timer = threading.Timer(autoclear.post_event, callback) 

116 self._timer.daemon = True 

117 self._timer.start() 

118 log.debug("Autoclear scheduled in %ss for %s", autoclear.post_event, label)