Coverage for src / anpr2mqtt / tracker.py: 93%
155 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-30 16:07 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-30 16:07 +0000
1import datetime as dt
2import json
3import re
4from dataclasses import dataclass
5from typing import Any, cast
7import structlog
8import tzlocal
10from anpr2mqtt.normalizers import Normalizer, fuzzy_match
11from anpr2mqtt.settings import (
12 Target,
13 TargetSettings,
14 TrackerSettings,
15)
17log = structlog.get_logger()
@dataclass
class Sighting:
    """Result of resolving a detected target id.

    Produced by ``Tracker.find``; ``as_dict`` flattens it into a plain
    dictionary for consumers.
    """

    # The target this sighting resolved to.
    target: Target
    # The id as originally supplied, before any normalisation or correction.
    uncorrected: str | None = None
    # True when the id matched one of the configured ignore patterns.
    ignore: bool = False
    # Optional list of earlier sightings; nothing in this class populates it.
    previous_sightings: list[str] | None = None

    def as_dict(self) -> dict[str, Any]:
        """Return the target's dictionary form extended with sighting fields.

        The dictionary returned by ``target.as_dict()`` is updated in place
        with ``orig_target`` (the uncorrected id) and ``ignore``, then returned.
        ``previous_sightings`` is not included.
        """
        payload = self.target.as_dict()
        payload["orig_target"] = self.uncorrected
        payload["ignore"] = self.ignore
        return payload
class Tracker:
    """Resolve detected target ids against configured targets and keep a sighting history.

    Sightings are stored as one JSON list of ISO timestamps per target, in
    ``<tracker_config.data_dir>/<target_type>/<target>.json``.
    """

    def __init__(
        self,
        target_type: str,
        tracker_config: TrackerSettings,
        region: str | None = None,
        target_config: TargetSettings | None = None,
        auto_match_tolerance: int = 0,
    ) -> None:
        """Set up the tracker.

        Args:
            target_type: Kind of target handled; used for new ``Target`` objects and the normalizer.
            tracker_config: Settings providing ``data_dir`` and ``min_visit_gap_seconds``.
            region: Optional region; a ``Normalizer`` is created only when both
                ``region`` and ``target_type`` are truthy.
            target_config: Optional registered targets, corrections and ignore patterns.
            auto_match_tolerance: Maximum distance passed to ``fuzzy_match``; ``0`` disables fuzzy matching.
        """
        self.target_type: str = target_type
        self.tracker_config: TrackerSettings = tracker_config
        # entity_id -> all registered targets sharing that entity_id
        self.entities: dict[str, list[Target]] = {}
        # target id -> registered target
        self.ids: dict[str, Target] = {}
        self._target_config: TargetSettings | None = None
        # Goes through the property setter, which (re)builds ``ids`` and ``entities``.
        self.target_config = target_config
        self.auto_match_tolerance = auto_match_tolerance
        self.region: str | None = region
        self.normalizer: Normalizer | None = None
        if region and target_type:
            self.normalizer = Normalizer(target_type=target_type, region=region)

    @property
    def target_config(self) -> TargetSettings | None:
        """The current target configuration, or ``None`` if none is set."""
        return self._target_config

    @target_config.setter
    def target_config(self, value: TargetSettings | None) -> None:
        """Replace the target configuration and rebuild the ``ids`` / ``entities`` indexes."""
        self._target_config = value
        self.ids = {}
        self.entities = {}
        if not value:
            return
        for target_group in value.groups:
            for target in target_group.members:
                self.ids[target.id] = target
                entity_id: str | None = target.entity_id
                if entity_id is not None:
                    self.entities.setdefault(entity_id, []).append(target)

    def history(self, target_id: str, target_type: str) -> list[str]:
        """Return the stored sightings for a target.

        Args:
            target_id: Target identifier; a falsy value is stored under ``"UNKNOWN"``.
            target_type: Sub-directory of ``data_dir`` holding this kind of target.

        Returns:
            The JSON content of the target's history file, or ``[]`` when the
            file does not exist or cannot be read (the failure is logged).
        """
        target_id = target_id or "UNKNOWN"
        target_type_path = self.tracker_config.data_dir / target_type
        # parents=True: a data_dir that has not been created yet must not crash the lookup.
        target_type_path.mkdir(parents=True, exist_ok=True)
        target_file = target_type_path / f"{target_id}.json"

        try:
            if target_file.exists():
                # Explicit encoding so the files do not depend on the platform locale.
                with target_file.open("r", encoding="utf-8") as f:
                    return cast("list[str]", json.load(f))
        except Exception as e:
            log.exception("Failed to find sightings for %s:%s", target_id, e)
        return []

    @staticmethod
    def _seconds_since(last_seen: str, current_ts: dt.datetime) -> float:
        """Return the seconds elapsed from ISO timestamp ``last_seen`` to ``current_ts``.

        Naive datetimes on either side are given the local timezone so the two
        values can be subtracted.
        """
        last_ts = dt.datetime.fromisoformat(last_seen)
        if last_ts.tzinfo is None:
            last_ts = last_ts.replace(tzinfo=tzlocal.get_localzone())
        if current_ts.tzinfo is None:
            current_ts = current_ts.replace(tzinfo=tzlocal.get_localzone())
        return (current_ts - last_ts).total_seconds()

    def record(self, target: str, target_type: str, event_dt: dt.datetime | None) -> dict[str, Any]:
        """Append a sighting to the target's history and return its time analysis.

        Args:
            target: Target identifier; a falsy value is recorded as ``"UNKNOWN"``.
            target_type: Sub-directory of ``data_dir`` holding this kind of target.
            event_dt: Time of the sighting; the current local time is used when ``None``.

        Returns:
            The dict from ``compute_time_analysis`` for the prior history, plus
            ``is_new_visit``: ``False`` when the previous sighting is less than
            ``min_visit_gap_seconds`` old (nothing is written), ``True`` once the
            sighting has been stored. If analysis or writing fails the error is
            logged and whatever was computed so far is returned.
        """
        target = target or "UNKNOWN"
        target_type_path = self.tracker_config.data_dir / target_type
        # parents=True: a data_dir that has not been created yet must not crash recording.
        target_type_path.mkdir(parents=True, exist_ok=True)
        target_file = target_type_path / f"{target}.json"
        sightings = self.history(target, target_type)
        time_analysis: dict[str, Any] = {}
        try:
            time_analysis = compute_time_analysis(sightings, event_dt)
            # Read the clock once so the gap check and the stored timestamp agree.
            current_ts = event_dt if event_dt is not None else dt.datetime.now(tz=tzlocal.get_localzone())
            min_gap = self.tracker_config.min_visit_gap_seconds
            if min_gap > 0 and sightings:
                try:
                    elapsed = self._seconds_since(sightings[-1], current_ts)
                    # A negative elapsed time (event older than the last sighting) is still recorded.
                    if 0 <= elapsed < min_gap:
                        log.info("Visit gap: %s seen %.1fs ago (gap=%ds), not recording", target, elapsed, min_gap)
                        time_analysis["is_new_visit"] = False
                        return time_analysis
                except Exception as gap_err:
                    # Best effort: a broken gap check must not prevent recording.
                    log.warning("Visit gap check failed for %s: %s", target, gap_err)
            sightings.append(current_ts.isoformat())
            with target_file.open("w", encoding="utf-8") as f:
                json.dump(sightings, f)
            time_analysis["is_new_visit"] = True
        except Exception as e:
            log.exception("Failed to record sightings for %s:%s", target, e)
        return time_analysis

    def find(self, target_id: str) -> Sighting:
        """Resolve a detected id to a ``Sighting``.

        Steps, in order: normalise the id (when a normalizer exists), apply the
        global correction patterns, fall back to per-target correction patterns,
        check the ignore patterns, then look the id up among registered targets
        (exactly, or via ``fuzzy_match`` when ``auto_match_tolerance`` > 0).

        Args:
            target_id: The id as detected.

        Returns:
            A ``Sighting`` whose ``uncorrected`` is the id as passed in. Its
            ``target`` is the registered target when one matched, otherwise a
            new high-priority ``Target`` carrying the normalised/corrected id.
        """
        result: Sighting = Sighting(
            target=Target(id=target_id, target_type=self.target_type, priority="high", lookup=True), uncorrected=target_id
        )
        if not target_id or self.target_config is None:
            # Nothing to resolve against: return the default, unregistered sighting.
            return result

        if self.normalizer:
            normalised = self.normalizer.normalize(target_id)
            if normalised:
                log.info("%s %s normalised %s -> %s", self.region, self.target_type, target_id, normalised)
                target_id = normalised
                result.target.id = normalised

        lookup_id = target_id
        # Global corrections: first corrected id whose patterns match wins.
        for corrected_target, patterns in self.target_config.correction.items():
            if any(re.match(pat, target_id) for pat in patterns) and corrected_target != target_id:
                result.target.id = corrected_target
                lookup_id = corrected_target
                log.info("Corrected target %s -> %s", target_id, lookup_id)
                break
        if lookup_id == target_id:
            # No global correction applied; try the patterns attached to each registered target.
            for registered_target in self.ids.values():
                if any(re.match(pat, target_id) for pat in registered_target.correction):
                    lookup_id = registered_target.id
                    result.target.id = registered_target.id
                    log.info("Corrected target %s -> %s (per-target)", target_id, lookup_id)
                    break
        # Ignore patterns are matched against the normalised id, not the corrected one.
        for pat in self.target_config.ignore:
            if re.match(pat, target_id):
                log.info("Ignoring %s matching ignore pattern %s", target_id, pat)
                result.ignore = True
                result.target.priority = "low"
                if result.target.group is None:  # not yet found in registered lists
                    result.target.description = "Ignored"
                break

        max_dist = self.auto_match_tolerance
        registered_match: str | None
        if lookup_id in self.ids:
            registered_match = lookup_id
        elif max_dist > 0:
            registered_match = fuzzy_match(lookup_id, max_dist, list(self.ids.keys()))
        else:
            registered_match = None

        if registered_match:
            target: Target | None = self.ids[registered_match]
            if target:
                if registered_match != lookup_id:
                    log.info("Fuzzy-matched %s to registered plate %s", lookup_id, registered_match)
                # The registered target replaces the provisional one; ``ignore`` on the sighting is kept.
                result.target = target
        return result
174def compute_time_analysis(sightings: list[str], current_dt: dt.datetime | None = None) -> dict[str, Any]:
175 """Derive visit history and time-of-day statistics from previous sightings.
177 Called before the current visit is appended, so all counts/times reflect prior history only.
179 Returns a dict with:
180 - previous_sightings: int — number of times seen before the current visit
181 - last_seen: ISO datetime string of the most recent prior sighting, or None
182 - hourly_counts: dict[int,int] of 24 ints, index = hour (0-23)
183 - earliest_time: "HH:MM:SS" of the earliest time-of-day previously seen, or None
184 - latest_time: "HH:MM:SS" of the latest time-of-day previously seen, or None
185 - within_time_range: bool — current time falls within [earliest, latest], or None if no history
186 """
187 hourly_counts: dict[int, int] = {}
188 times: list[dt.time] = []
189 last_seen: str | None = sightings[-1] if sightings else None
190 for s in sightings:
191 try:
192 ts: dt.datetime = dt.datetime.fromisoformat(s)
193 hourly_counts.setdefault(ts.hour, 0)
194 hourly_counts[ts.hour] += 1
195 times.append(ts.replace(tzinfo=None).time())
196 except Exception as e:
197 log.warning("Skipping unparsable sighting timestamp %r: %s", s, e)
199 earliest = min(times) if times else None
200 latest = max(times) if times else None
202 result = {
203 "previous_sightings": len(sightings),
204 "last_seen": last_seen,
205 "hourly_counts": hourly_counts,
206 "earliest_time": earliest.isoformat() if earliest else None,
207 "latest_time": latest.isoformat() if latest else None,
208 }
209 if current_dt is not None:
210 if earliest is not None and latest is not None:
211 current_t = current_dt.replace(tzinfo=None).time()
212 result["within_time_range"] = earliest <= current_t <= latest
213 else:
214 result["within_time_range"] = None
216 return result