Coverage for src / anpr2mqtt / frigate_handler.py: 90%

209 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-30 16:07 +0000

1import datetime as dt 

2import json 

3import threading 

4from io import BytesIO 

5from typing import TYPE_CHECKING, Any, cast 

6 

7import niquests 

8import paho.mqtt.client as mqtt 

9import structlog 

10from PIL import Image 

11 

12from anpr2mqtt.const import ImageInfo 

13from anpr2mqtt.handler_common import AutoclearTimer, CameraGatekeeper, build_dvla_client, correct_against_good_read 

14from anpr2mqtt.hass import HomeAssistantPublisher 

15from anpr2mqtt.settings import ( 

16 TARGET_TYPE_PLATE, 

17 CameraSettings, 

18 DVLASettings, 

19 EventSettings, 

20 FrigateSettings, 

21 ImageSettings, 

22) 

23from anpr2mqtt.tracker import Sighting, Tracker 

24 

25if TYPE_CHECKING: 

26 from anpr2mqtt.api_client import APIClient 

27 

# Module-level structlog logger shared by everything in this file.
log = structlog.get_logger()

# Per-camera configuration bundle, stored as the value of the ``camera_configs`` dict.
# Tuple layout: (event_config, camera_settings, tracker, state_topic, image_topic)
CameraConfig = tuple[EventSettings, CameraSettings, Tracker, str, str]

32 

33 

class FrigateHandler:
    """Turn Frigate MQTT plate events into Home Assistant state and image messages.

    The handler subscribes to the configured Frigate event topics and to the
    per-camera snapshot topics, filters and de-duplicates incoming events,
    optionally looks the plate up through ``self.api_client``, and publishes
    the outcome through the supplied ``HomeAssistantPublisher``.
    """

    def __init__(
        self,
        mqtt_client: mqtt.Client,
        frigate_settings: FrigateSettings,
        publisher: HomeAssistantPublisher,
        image_settings: ImageSettings,
        dvla_settings: DVLASettings,
        camera_configs: dict[str, CameraConfig],
        mqtt_topic_root: str = "anpr2mqtt",
        default_tracker: Tracker | None = None,
    ) -> None:
        """Store collaborators and initialise the per-camera caches.

        Args:
            mqtt_client: MQTT client used for subscriptions and callbacks.
            frigate_settings: Frigate options (topics, cameras, min_score, url).
            publisher: Publisher used for state, target-state and image messages.
            image_settings: Image options; stored on the instance.
            dvla_settings: Passed to ``build_dvla_client`` to create ``api_client``.
            camera_configs: Known camera name -> ``CameraConfig`` tuple.
            mqtt_topic_root: Prefix for the topics this handler builds itself.
            default_tracker: Tracker used for cameras missing from ``camera_configs``.
        """
        self.mqtt_client = mqtt_client
        self.frigate_settings = frigate_settings
        self.publisher = publisher
        self.image_settings = image_settings
        self.camera_configs = camera_configs
        self.mqtt_topic_root = mqtt_topic_root
        self.default_tracker = default_tracker

        # Latest JPEG snapshot bytes per camera, from MQTT retained messages
        self._snapshot_cache: dict[str, bytes] = {}
        self._snapshot_lock = threading.Lock()

        # Track processed event IDs to avoid duplicate publications
        self._processed_events: set[str] = set()
        self._processed_lock = threading.Lock()

        # Per-camera last DVLA-confirmed plate for mis-read correction
        self._last_good_plate: dict[str, tuple[str, dt.datetime]] = {}
        self._good_plate_lock = threading.Lock()

        # Per-camera autoclear timers
        self._autoclear_timers: dict[str, AutoclearTimer] = {}

        # Per-camera cross-plate duplicate gate
        self._camera_gates: dict[str, CameraGatekeeper] = {}

        self.api_client: APIClient | None = build_dvla_client(dvla_settings)

    def start(self) -> None:
        """Register MQTT callbacks and subscribe to event and snapshot topics.

        An unset *or empty* camera list subscribes to the wildcard snapshot
        topic, matching ``_process_event`` where an empty list means
        "no camera filter".
        """
        for topic in self.frigate_settings.topic:
            self.mqtt_client.message_callback_add(topic, self._on_event_message)
            self.mqtt_client.subscribe(topic)

        cameras = self.frigate_settings.cameras
        if not cameras:
            snapshot_topics = ["frigate/+/snapshot"]
        else:
            snapshot_topics = [f"frigate/{camera}/snapshot" for camera in cameras]
        for snapshot_topic in snapshot_topics:
            self.mqtt_client.message_callback_add(snapshot_topic, self._on_snapshot_message)
            self.mqtt_client.subscribe(snapshot_topic)

        log.info(
            "Frigate listener started, min_score=%.2f topic=%s",
            self.frigate_settings.min_score,
            self.frigate_settings.topic,
        )

    def _on_snapshot_message(self, _client: mqtt.Client, _userdata: Any, msg: mqtt.MQTTMessage) -> None:
        """Cache the newest snapshot payload, keyed by the camera segment of the topic."""
        parts = msg.topic.split("/")
        if len(parts) >= 3 and msg.payload:
            # Topic shape is frigate/<camera>/snapshot, so the camera is segment 1.
            camera = parts[1]
            with self._snapshot_lock:
                self._snapshot_cache[camera] = bytes(msg.payload)
            log.debug("Cached Frigate snapshot for camera %s (%d bytes)", camera, len(msg.payload))

    def _on_event_message(self, _client: mqtt.Client, _userdata: Any, msg: mqtt.MQTTMessage) -> None:
        """MQTT callback for event topics; logs any processing failure instead of raising."""
        try:
            self._process_event(msg.topic, msg.payload)
        except Exception as e:
            # Callback boundary: one bad message must not propagate into the MQTT client.
            log.error("Frigate event processing error: %s", e, exc_info=True)

    def _process_event(self, topic: str, raw: bytes) -> None:
        """Parse one Frigate message and publish a sighting if it qualifies.

        Messages are dropped (with a log line) when they are not valid JSON
        objects, come from an unhandled topic or event type, belong to a
        filtered-out camera, repeat an already processed event ID, carry no
        plate, or have a missing or below-threshold score. Remaining events are
        corrected against the last good read, recorded in the tracker, gated
        for duplicates and finally published.

        Args:
            topic: MQTT topic the message arrived on.
            raw: Raw message payload, expected to be a JSON object.
        """
        try:
            payload: Any = json.loads(raw)
        except json.JSONDecodeError as e:
            log.error("Frigate event JSON parse error: %s", e)
            return

        is_event_topic = topic == "frigate/events"
        if not is_event_topic and topic != "frigate/tracked_object_update":
            log.debug("Skipping Frigate message for %s", topic)
            return

        # Valid JSON that is not an object (list, null, number...) has no fields to read.
        if not isinstance(payload, dict):
            log.warning("Frigate message on %s is not a JSON object, ignoring", topic)
            return

        # For frigate/events the event type ("new"/"update"/"end") is at the top level of the
        # payload; the after dict holds the tracked-object type (e.g. "car"). For
        # frigate/tracked_object_update the type field is directly in the payload.
        event_data: dict[str, Any]
        if is_event_topic:
            event_data = payload.get("after") or {}
            event_type: str = cast("str", payload.get("type", "") or "")
        else:
            event_data = payload
            event_type = cast("str", event_data.get("type", "") or "")
        event_id: str = cast("str", event_data.get("id", "") or "")
        camera: str = cast("str", event_data.get("camera", "") or "")

        # Only process 'end' for frigate/events — they carry the most complete plate data
        if is_event_topic and event_type == "end":
            log.debug("frigate/events received: %s %s %s", event_type, event_id, camera)
        elif not is_event_topic and event_type == "lpr":
            log.debug("frigate/tracked_object_update: %s %s %s", event_type, event_id, camera)
        else:
            log.debug("Skipping %s: %s %s %s", topic, event_type, event_id, camera)
            return

        if self.frigate_settings.cameras and camera not in self.frigate_settings.cameras:
            return

        with self._processed_lock:
            if event_id in self._processed_events:
                return
            self._processed_events.add(event_id)
            # Bound memory usage
            # NOTE(review): a set has no defined order, so this keeps an arbitrary
            # ~1000 IDs rather than the newest ones — consider an ordered container.
            if len(self._processed_events) > 5000:
                self._processed_events = set(list(self._processed_events)[4000:])

        extra_info: dict[str, dict[str, Any]] = {"frigate": {}}
        plate: str | None
        score: float | None
        description: str | None
        if is_event_topic:
            plate = cast("str|None", event_data.get("recognized_license_plate"))
            # A null or absent score is treated as 0.0 instead of raising in float().
            score = float(event_data.get("recognized_license_plate_score") or 0.0)
            description = cast("str|None", event_data.get("label"))
            for attr in (
                "recognized_license_plate_score",
                "velocity_angle",
                "current_estimated_speed",
                "average_estimated_speed",
                "label",
                "sub_label",
            ):
                value = event_data.get(attr)
                if value:
                    extra_info["frigate"][attr] = value
        else:
            plate = payload.get("plate")
            score = payload.get("score")
            description = payload.get("name") or "Unknown"

        if not plate:
            log.debug("Frigate event %s has no recognized plate, %s", event_id, description)
            return

        if score is None:
            log.info("Frigate event %s has plate %s with no score, skipping", event_id, plate)
            # Actually skip: continuing would format None with %.3f further down.
            return
        if score < self.frigate_settings.min_score:
            log.info(
                "Frigate event %s has plate %s score %.3f below threshold %.3f, skipping",
                event_id,
                plate,
                score,
                self.frigate_settings.min_score,
            )
            return

        log.info("Frigate event %s: plate=%s score=%.3f camera=%s", event_id, plate, score, camera)

        image: Image.Image | None = self._get_event_image(event_id, camera)
        event_config, camera_settings, tracker, state_topic, image_topic = self._resolve_camera_config(camera)

        with self._good_plate_lock:
            cached = self._last_good_plate.get(camera)
        plate = correct_against_good_read(
            plate, cached, event_config.good_read_ttl, event_config.good_read_tolerance, tracker.normalizer
        )

        # Read start_time from the event data: per Frigate's MQTT docs, frigate/events
        # nests it inside "after", so the top-level payload never has it.
        start_time: float = float(event_data.get("start_time") or 0)
        timestamp = dt.datetime.fromtimestamp(start_time, tz=dt.UTC) if start_time else dt.datetime.now(dt.UTC)

        image_size = 0
        if image:
            # Encode once into memory purely to learn the JPEG size for ImageInfo.
            buf = BytesIO()
            image.save(buf, "JPEG")
            image_size = buf.tell()

        image_info = ImageInfo(target=plate, event="frigate_event", timestamp=timestamp, ext="jpg", size=image_size)

        sighting: Sighting = tracker.find(plate)

        reg_info: dict[str, Any] | None = None
        if sighting.target.lookup and self.api_client and event_config.target_type == TARGET_TYPE_PLATE:
            api_info: dict[str, Any] = self.api_client.lookup(sighting.target.id)
            if api_info.get("success"):
                reg_info = api_info.get("plate")
                if sighting.target.description is None and api_info.get("description"):
                    sighting.target.description = api_info["description"]
                # Remember the confirmed plate so later mis-reads on this camera can be corrected.
                with self._good_plate_lock:
                    self._last_good_plate[camera] = (sighting.target.id, dt.datetime.now(dt.UTC))

        time_analysis: dict[str, Any] = tracker.record(sighting.target.id, event_config.target_type, timestamp)

        if not time_analysis.get("is_new_visit", True):
            log.info("Skipping duplicate Frigate visit for %s (within gap window)", sighting.target.id)
            return

        gate = self._camera_gates.setdefault(camera, CameraGatekeeper())
        if not gate.allow(timestamp, reg_info is not None, tracker.tracker_config.min_visit_gap_seconds):
            log.info("Skipping cross-plate duplicate for camera %s (plate=%s)", camera, plate)
            return

        if sighting.target.entity_id:
            target_state_topic = f"{self.mqtt_topic_root}/{event_config.event}/targets/{sighting.target.entity_id}/state"
            self.publisher.publish_target_state(
                state_topic=target_state_topic,
                description=sighting.target.description,
                time_analysis=time_analysis,
            )

        if sighting.ignore:
            log.info("Skipping MQTT for ignored plate %s", sighting.target.id)
            return

        frigate_ui_url: str | None = None
        if self.frigate_settings.url:
            frigate_ui_url = f"{self.frigate_settings.url}/events?id={event_id}"

        self.publisher.post_state_message(
            state_topic,
            sighting=sighting,
            event_config=event_config,
            camera=camera_settings,
            image_info=image_info,
            time_analysis=time_analysis,
            reg_info=reg_info,
            extra_info=extra_info,
            source="frigate",
            frigate_event_id=event_id,
            frigate_ui_url=frigate_ui_url,
        )

        if image:
            self.publisher.post_image_message(image_topic, image, "JPEG")

        self._schedule_autoclear(camera, event_config, state_topic, image_topic)

    def _get_event_image(self, event_id: str, camera: str) -> Image.Image | None:
        """Return an image for the event: the API snapshot if possible, else the cached MQTT one.

        Returns:
            A loaded PIL image, or ``None`` when neither source yields one.
        """
        # Prefer the event-specific Frigate API snapshot — avoids stale MQTT cache from a subsequent vehicle
        if self.frigate_settings.url:
            img = self._fetch_api_snapshot(event_id)
            if img is not None:
                return img
            log.debug("API snapshot unavailable for %s, falling back to MQTT cache", event_id)

        with self._snapshot_lock:
            snapshot_bytes = self._snapshot_cache.get(camera)

        if snapshot_bytes:
            try:
                img = Image.open(BytesIO(snapshot_bytes))
                # load() forces the decode now, so corrupt data fails here rather than later.
                img.load()
                log.debug("Using MQTT snapshot for event %s camera %s", event_id, camera)
                return img
            except Exception as e:
                log.warning("Failed to decode MQTT snapshot for camera %s: %s", camera, e)

        log.warning("No image for Frigate event %s (no MQTT snapshot cached, no url configured)", event_id)
        return None

    def _fetch_api_snapshot(self, event_id: str) -> Image.Image | None:
        """Download ``snapshot.jpg`` for the event from the Frigate HTTP API.

        Returns:
            The decoded image, or ``None`` on a non-200 response, an empty body
            or any error (which is logged as a warning, not raised).
        """
        url = f"{self.frigate_settings.url}/api/events/{event_id}/snapshot.jpg"
        try:
            resp = niquests.get(url, timeout=10)
            if resp.status_code == 200 and resp.content:
                img = Image.open(BytesIO(resp.content))
                img.load()
                log.info("Fetched API snapshot for event %s (%d bytes)", event_id, len(resp.content))
                return img
            log.warning("API snapshot for event %s returned HTTP %s", event_id, resp.status_code)
        except Exception as e:
            log.warning("Failed to fetch API snapshot for event %s: %s", event_id, e)
        return None

    def _resolve_camera_config(self, camera: str) -> CameraConfig:
        """Return the configured ``CameraConfig`` for a camera, or synthesise a default one."""
        if camera in self.camera_configs:
            return self.camera_configs[camera]

        # No matching event config — synthesise defaults using shared tracker if available
        log.warning("No event config found for Frigate camera %s, using defaults", camera)
        event_config = EventSettings(camera=camera, event="anpr", target_type=TARGET_TYPE_PLATE)
        camera_settings = CameraSettings(name=camera)
        from anpr2mqtt.settings import TrackerSettings

        # NOTE(review): without a default_tracker a new Tracker is built on every call,
        # so history may not carry over between events — confirm where Tracker keeps state.
        tracker = self.default_tracker or Tracker(
            target_type=TARGET_TYPE_PLATE,
            tracker_config=TrackerSettings(),
            target_config=None,
            auto_match_tolerance=1,
        )
        state_topic = f"{self.mqtt_topic_root}/{event_config.event}/cameras/{camera}/state"
        image_topic = f"{self.mqtt_topic_root}/{event_config.event}/cameras/{camera}/image"
        return event_config, camera_settings, tracker, state_topic, image_topic

    def _schedule_autoclear(self, camera: str, event_config: EventSettings, state_topic: str, image_topic: str) -> None:
        """Schedule the camera's autoclear callback if autoclear is enabled for the event."""
        if not event_config.autoclear.enabled:
            return
        # One timer object per camera, created on first use.
        if camera not in self._autoclear_timers:
            self._autoclear_timers[camera] = AutoclearTimer()
        self._autoclear_timers[camera].schedule(
            event_config,
            lambda: self._do_autoclear(camera, event_config, state_topic, image_topic),
            f"camera {camera}",
        )

    def _do_autoclear(self, camera: str, event_config: EventSettings, state_topic: str, image_topic: str) -> None:
        """Publish empty state and/or image messages, as selected by the autoclear settings."""
        log.info("Frigate autoclear firing for camera %s", camera)
        autoclear = event_config.autoclear
        _, camera_settings, _, _, _ = self._resolve_camera_config(camera)
        if autoclear.state:
            self.publisher.post_state_message(state_topic, sighting=None, event_config=event_config, camera=camera_settings)
        if autoclear.image:
            self.publisher.post_image_message(image_topic, image=None)