Coverage for src / anpr2mqtt / frigate_handler.py: 90%
209 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-30 16:07 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-30 16:07 +0000
1import datetime as dt
2import json
3import threading
4from io import BytesIO
5from typing import TYPE_CHECKING, Any, cast
7import niquests
8import paho.mqtt.client as mqtt
9import structlog
10from PIL import Image
12from anpr2mqtt.const import ImageInfo
13from anpr2mqtt.handler_common import AutoclearTimer, CameraGatekeeper, build_dvla_client, correct_against_good_read
14from anpr2mqtt.hass import HomeAssistantPublisher
15from anpr2mqtt.settings import (
16 TARGET_TYPE_PLATE,
17 CameraSettings,
18 DVLASettings,
19 EventSettings,
20 FrigateSettings,
21 ImageSettings,
22)
23from anpr2mqtt.tracker import Sighting, Tracker
25if TYPE_CHECKING:
26 from anpr2mqtt.api_client import APIClient
log = structlog.get_logger()

# Per-camera wiring stored as the values of ``camera_configs`` (keyed by camera name).
# Tuple layout: (event_config, camera_settings, tracker, state_topic, image_topic)
CameraConfig = tuple[
    EventSettings,
    CameraSettings,
    Tracker,
    str,
    str,
]
class FrigateHandler:
    """Bridge Frigate MQTT licence-plate events to anpr2mqtt state and image topics.

    The handler registers callbacks on the supplied MQTT client, caches the latest
    snapshot payload per camera, and for each qualifying plate read resolves the
    target through the camera's tracker, optionally looks the plate up through the
    API client built from the DVLA settings, and publishes the outcome through the
    Home Assistant publisher.
    """

    # Keys copied from a frigate/events "after" dict into extra_info["frigate"]
    # whenever the payload carries a truthy value for them.
    _FRIGATE_EXTRA_ATTRS: tuple[str, ...] = (
        "recognized_license_plate_score",
        "velocity_angle",
        "current_estimated_speed",
        "average_estimated_speed",
        "label",
        "sub_label",
    )

    def __init__(
        self,
        mqtt_client: mqtt.Client,
        frigate_settings: FrigateSettings,
        publisher: HomeAssistantPublisher,
        image_settings: ImageSettings,
        dvla_settings: DVLASettings,
        camera_configs: dict[str, CameraConfig],
        mqtt_topic_root: str = "anpr2mqtt",
        default_tracker: Tracker | None = None,
    ) -> None:
        """Store collaborators and initialise per-camera caches.

        Args:
            mqtt_client: Client used to subscribe to the Frigate topics.
            frigate_settings: Topics, camera filter, minimum score and optional Frigate URL.
            publisher: Sink for state, target-state and image messages.
            image_settings: Stored on the instance; not read by this class itself.
            dvla_settings: Passed to ``build_dvla_client`` to create ``api_client``.
            camera_configs: Known per-camera configuration, keyed by camera name.
            mqtt_topic_root: Prefix for the topics this handler synthesises.
            default_tracker: Tracker used for cameras missing from ``camera_configs``.
        """
        self.mqtt_client = mqtt_client
        self.frigate_settings = frigate_settings
        self.publisher = publisher
        self.image_settings = image_settings
        self.camera_configs = camera_configs
        self.mqtt_topic_root = mqtt_topic_root
        self.default_tracker = default_tracker

        # Latest JPEG snapshot bytes per camera, from MQTT retained messages
        self._snapshot_cache: dict[str, bytes] = {}
        self._snapshot_lock = threading.Lock()

        # Track processed event IDs to avoid duplicate publications
        self._processed_events: set[str] = set()
        self._processed_lock = threading.Lock()

        # Per-camera last DVLA-confirmed plate for mis-read correction
        self._last_good_plate: dict[str, tuple[str, dt.datetime]] = {}
        self._good_plate_lock = threading.Lock()

        # Per-camera autoclear timers
        self._autoclear_timers: dict[str, AutoclearTimer] = {}

        # Per-camera cross-plate duplicate gate
        self._camera_gates: dict[str, CameraGatekeeper] = {}

        self.api_client: APIClient | None = build_dvla_client(dvla_settings)

    def start(self) -> None:
        """Register MQTT callbacks and subscribe to the event and snapshot topics.

        When no cameras are configured (``None`` or an empty collection) the wildcard
        snapshot topic is used. This mirrors ``_process_event``, which treats a falsy
        camera filter as "accept every camera"; previously an empty list subscribed to
        no snapshot topic at all.
        """
        for topic in self.frigate_settings.topic:
            self.mqtt_client.message_callback_add(topic, self._on_event_message)
            self.mqtt_client.subscribe(topic)

        cameras = self.frigate_settings.cameras
        if cameras:
            snapshot_topics = [f"frigate/{camera}/snapshot" for camera in cameras]
        else:
            snapshot_topics = ["frigate/+/snapshot"]
        for snapshot_topic in snapshot_topics:
            self.mqtt_client.message_callback_add(snapshot_topic, self._on_snapshot_message)
            self.mqtt_client.subscribe(snapshot_topic)

        log.info(
            "Frigate listener started, min_score=%.2f topic=%s",
            self.frigate_settings.min_score,
            self.frigate_settings.topic,
        )

    def _on_snapshot_message(self, _client: mqtt.Client, _userdata: Any, msg: mqtt.MQTTMessage) -> None:
        """Cache the snapshot payload under the camera name taken from the topic.

        The camera is the second ``/``-separated segment of the topic. Messages with
        fewer than three segments or an empty payload are ignored.
        """
        parts = msg.topic.split("/")
        if len(parts) < 3 or not msg.payload:
            return
        camera = parts[1]
        with self._snapshot_lock:
            self._snapshot_cache[camera] = bytes(msg.payload)
        log.debug("Cached Frigate snapshot for camera %s (%d bytes)", camera, len(msg.payload))

    def _on_event_message(self, _client: mqtt.Client, _userdata: Any, msg: mqtt.MQTTMessage) -> None:
        """MQTT callback for event topics; logs any failure instead of raising."""
        try:
            self._process_event(msg.topic, msg.payload)
        except Exception as e:
            # Top-level boundary: keep the exception inside the callback and log it.
            log.error("Frigate event processing error: %s", e, exc_info=True)

    def _claim_event(self, event_id: str) -> bool:
        """Mark ``event_id`` as processed; return ``False`` if it was already seen."""
        with self._processed_lock:
            if event_id in self._processed_events:
                return False
            self._processed_events.add(event_id)
            # Bound memory usage. A set has no order, so which ids survive this
            # trim is arbitrary rather than "the most recent ones".
            if len(self._processed_events) > 5000:
                self._processed_events = set(list(self._processed_events)[4000:])
        return True

    def _extract_reading(
        self,
        is_event_topic: bool,
        payload: dict[str, Any],
        event_data: dict[str, Any],
    ) -> tuple[str | None, float | None, str | None, dict[str, Any]]:
        """Pull ``(plate, score, description, frigate_attrs)`` out of a message.

        For ``frigate/events`` the values come from the ``after`` dict (``event_data``);
        otherwise they come from the top-level ``plate`` / ``score`` / ``name`` keys.
        """
        if not is_event_topic:
            return payload.get("plate"), payload.get("score"), payload.get("name") or "Unknown", {}

        plate = cast("str|None", event_data.get("recognized_license_plate"))
        # ``or 0.0`` covers both a missing key and an explicit JSON null; a bare
        # float(None) would raise before the "no plate" check below can run.
        score = float(event_data.get("recognized_license_plate_score") or 0.0)
        description = cast("str|None", event_data.get("label"))
        frigate_attrs = {attr: event_data[attr] for attr in self._FRIGATE_EXTRA_ATTRS if event_data.get(attr)}
        return plate, score, description, frigate_attrs

    def _process_event(self, topic: str, raw: bytes) -> None:
        """Decode one Frigate message and, if it carries a usable plate, publish it.

        Only ``end`` messages on ``frigate/events`` and ``lpr`` messages on
        ``frigate/tracked_object_update`` are processed. Messages are dropped when the
        camera is filtered out, the event id was already handled, no plate is present,
        the score is missing or below ``min_score``, the tracker reports a repeat
        visit, or the per-camera gate rejects the read.

        Args:
            topic: MQTT topic the message arrived on.
            raw: Raw JSON payload.
        """
        try:
            payload = json.loads(raw)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            log.error("Frigate event JSON parse error: %s", e)
            return
        if not isinstance(payload, dict):
            # Valid JSON that is not an object (null, list, number...) has no fields to read.
            log.warning("Frigate message on %s is not a JSON object, ignoring", topic)
            return

        is_event_topic = topic == "frigate/events"
        if is_event_topic:
            event_data: dict[str, Any] = payload.get("after") or {}
        elif topic == "frigate/tracked_object_update":
            event_data = payload
        else:
            log.debug("Skipping Frigate message for %s", topic)
            return

        # For frigate/events the event type ("new"/"update"/"end") is at the top level of the
        # payload; the after dict holds the tracked-object type (e.g. "car"). For
        # frigate/tracked_object_update the type field is directly in the payload.
        event_type: str = cast("str", payload.get("type") or "")
        event_id: str = cast("str", event_data.get("id") or "")
        camera: str = cast("str", event_data.get("camera") or "")

        # Only process 'end' for frigate/events — they carry the most complete plate data
        if is_event_topic and event_type == "end":
            log.debug("frigate/events received: %s %s %s", event_type, event_id, camera)
        elif not is_event_topic and event_type == "lpr":
            log.debug("frigate/tracked_object_update: %s %s %s", event_type, event_id, camera)
        else:
            log.debug("Skipping %s: %s %s %s", topic, event_type, event_id, camera)
            return

        if self.frigate_settings.cameras and camera not in self.frigate_settings.cameras:
            return

        if not self._claim_event(event_id):
            return

        plate, score, description, frigate_attrs = self._extract_reading(is_event_topic, payload, event_data)
        extra_info: dict[str, dict[str, Any]] = {"frigate": frigate_attrs}

        if not plate:
            log.debug("Frigate event %s has no recognized plate, %s", event_id, description)
            return

        if score is None:
            log.info("Frigate event %s has plate %s with no score, skipping", event_id, plate)
            return  # the message says "skipping"; without this the %.3f log below received None
        if score < self.frigate_settings.min_score:
            log.info(
                "Frigate event %s has plate %s score %.3f below threshold %.3f, skipping",
                event_id,
                plate,
                score,
                self.frigate_settings.min_score,
            )
            return

        log.info("Frigate event %s: plate=%s score=%.3f camera=%s", event_id, plate, score, camera)

        image: Image.Image | None = self._get_event_image(event_id, camera)
        event_config, camera_settings, tracker, state_topic, image_topic = self._resolve_camera_config(camera)

        with self._good_plate_lock:
            cached = self._last_good_plate.get(camera)
        plate = correct_against_good_read(
            plate, cached, event_config.good_read_ttl, event_config.good_read_tolerance, tracker.normalizer
        )

        # For frigate/events the tracked-object fields live under "after", so look there
        # first; the top-level lookup is kept as a fallback for payloads that carry it.
        start_time: float = float(event_data.get("start_time") or payload.get("start_time") or 0)
        timestamp = dt.datetime.fromtimestamp(start_time, tz=dt.UTC) if start_time else dt.datetime.now(dt.UTC)

        image_size = 0
        if image is not None:
            # Encode once purely to learn the JPEG size reported in ImageInfo.
            buf = BytesIO()
            image.save(buf, "JPEG")
            image_size = buf.tell()

        image_info = ImageInfo(target=plate, event="frigate_event", timestamp=timestamp, ext="jpg", size=image_size)

        sighting: Sighting = tracker.find(plate)

        reg_info: dict[str, Any] | None = None
        if sighting.target.lookup and self.api_client and event_config.target_type == TARGET_TYPE_PLATE:
            api_info: dict[str, Any] = self.api_client.lookup(sighting.target.id)
            if api_info.get("success"):
                reg_info = api_info.get("plate")
                if sighting.target.description is None and api_info.get("description"):
                    sighting.target.description = api_info["description"]
                # A successful lookup becomes the camera's reference for later mis-read correction.
                with self._good_plate_lock:
                    self._last_good_plate[camera] = (sighting.target.id, dt.datetime.now(dt.UTC))

        time_analysis: dict[str, Any] = tracker.record(sighting.target.id, event_config.target_type, timestamp)

        if not time_analysis.get("is_new_visit", True):
            log.info("Skipping duplicate Frigate visit for %s (within gap window)", sighting.target.id)
            return

        gate = self._camera_gates.setdefault(camera, CameraGatekeeper())
        if not gate.allow(timestamp, reg_info is not None, tracker.tracker_config.min_visit_gap_seconds):
            log.info("Skipping cross-plate duplicate for camera %s (plate=%s)", camera, plate)
            return

        if sighting.target.entity_id:
            target_state_topic = f"{self.mqtt_topic_root}/{event_config.event}/targets/{sighting.target.entity_id}/state"
            self.publisher.publish_target_state(
                state_topic=target_state_topic,
                description=sighting.target.description,
                time_analysis=time_analysis,
            )

        if sighting.ignore:
            log.info("Skipping MQTT for ignored plate %s", sighting.target.id)
            return

        frigate_ui_url: str | None = None
        if self.frigate_settings.url:
            frigate_ui_url = f"{self.frigate_settings.url}/events?id={event_id}"

        self.publisher.post_state_message(
            state_topic,
            sighting=sighting,
            event_config=event_config,
            camera=camera_settings,
            image_info=image_info,
            time_analysis=time_analysis,
            reg_info=reg_info,
            extra_info=extra_info,
            source="frigate",
            frigate_event_id=event_id,
            frigate_ui_url=frigate_ui_url,
        )

        if image is not None:
            self.publisher.post_image_message(image_topic, image, "JPEG")

        self._schedule_autoclear(camera, event_config, state_topic, image_topic)

    def _get_event_image(self, event_id: str, camera: str) -> Image.Image | None:
        """Return an image for the event: API snapshot first, then the cached MQTT snapshot.

        Returns ``None`` when neither source yields a decodable image.
        """
        # Prefer the event-specific Frigate API snapshot — avoids stale MQTT cache from a subsequent vehicle
        if self.frigate_settings.url:
            img = self._fetch_api_snapshot(event_id)
            if img is not None:
                return img
            log.debug("API snapshot unavailable for %s, falling back to MQTT cache", event_id)

        with self._snapshot_lock:
            snapshot_bytes = self._snapshot_cache.get(camera)

        if snapshot_bytes:
            try:
                img = Image.open(BytesIO(snapshot_bytes))
                img.load()  # force decoding now so a corrupt payload fails here
                log.debug("Using MQTT snapshot for event %s camera %s", event_id, camera)
                return img
            except Exception as e:
                log.warning("Failed to decode MQTT snapshot for camera %s: %s", camera, e)

        log.warning("No image for Frigate event %s (no MQTT snapshot cached, no url configured)", event_id)
        return None

    def _fetch_api_snapshot(self, event_id: str) -> Image.Image | None:
        """Fetch ``/api/events/<event_id>/snapshot.jpg`` from the configured Frigate URL.

        Returns the decoded image, or ``None`` on a non-200 response, an empty body,
        or any exception raised while fetching or decoding (which is logged).
        """
        url = f"{self.frigate_settings.url}/api/events/{event_id}/snapshot.jpg"
        try:
            resp = niquests.get(url, timeout=10)
            if resp.status_code == 200 and resp.content:
                img = Image.open(BytesIO(resp.content))
                img.load()
                log.info("Fetched API snapshot for event %s (%d bytes)", event_id, len(resp.content))
                return img
            log.warning("API snapshot for event %s returned HTTP %s", event_id, resp.status_code)
        except Exception as e:
            log.warning("Failed to fetch API snapshot for event %s: %s", event_id, e)
        return None

    def _resolve_camera_config(self, camera: str) -> CameraConfig:
        """Return the configured tuple for ``camera``, or synthesise defaults.

        The synthesised tuple is not stored, so every call for an unknown camera logs
        a warning and, when no ``default_tracker`` was supplied, builds a new ``Tracker``.
        """
        if camera in self.camera_configs:
            return self.camera_configs[camera]

        # No matching event config — synthesise defaults using shared tracker if available
        log.warning("No event config found for Frigate camera %s, using defaults", camera)
        event_config = EventSettings(camera=camera, event="anpr", target_type=TARGET_TYPE_PLATE)
        camera_settings = CameraSettings(name=camera)
        from anpr2mqtt.settings import TrackerSettings

        tracker = self.default_tracker or Tracker(
            target_type=TARGET_TYPE_PLATE,
            tracker_config=TrackerSettings(),
            target_config=None,
            auto_match_tolerance=1,
        )
        state_topic = f"{self.mqtt_topic_root}/{event_config.event}/cameras/{camera}/state"
        image_topic = f"{self.mqtt_topic_root}/{event_config.event}/cameras/{camera}/image"
        return event_config, camera_settings, tracker, state_topic, image_topic

    def _schedule_autoclear(self, camera: str, event_config: EventSettings, state_topic: str, image_topic: str) -> None:
        """Schedule the camera's autoclear callback if autoclear is enabled for the event."""
        if not event_config.autoclear.enabled:
            return
        if camera not in self._autoclear_timers:
            self._autoclear_timers[camera] = AutoclearTimer()
        self._autoclear_timers[camera].schedule(
            event_config,
            lambda: self._do_autoclear(camera, event_config, state_topic, image_topic),
            f"camera {camera}",
        )

    def _do_autoclear(self, camera: str, event_config: EventSettings, state_topic: str, image_topic: str) -> None:
        """Publish empty state and/or image messages, as selected by the autoclear settings."""
        log.info("Frigate autoclear firing for camera %s", camera)
        autoclear = event_config.autoclear
        _, camera_settings, _, _, _ = self._resolve_camera_config(camera)
        if autoclear.state:
            self.publisher.post_state_message(state_topic, sighting=None, event_config=event_config, camera=camera_settings)
        if autoclear.image:
            self.publisher.post_image_message(image_topic, image=None)