Coverage for src / anpr2mqtt / app.py: 78%
138 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-30 16:07 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-30 16:07 +0000
1import logging
2import sys
3from typing import Any, cast
5import paho.mqtt.client as mqtt
6import structlog
7from paho.mqtt.enums import CallbackAPIVersion, MQTTErrorCode, MQTTProtocolVersion
8from paho.mqtt.properties import Properties
9from paho.mqtt.reasoncodes import ReasonCode
10from pydantic import ValidationError
11from pydantic_settings import CliApp
12from watchdog.observers import Observer
14import anpr2mqtt
15from anpr2mqtt.event_handler import EventHandler
16from anpr2mqtt.frigate_handler import CameraConfig, FrigateHandler
17from anpr2mqtt.hass import HomeAssistantPublisher
18from anpr2mqtt.settings import CameraSettings, EventSettings, Settings
19from anpr2mqtt.tracker import Tracker, compute_time_analysis
21log = structlog.get_logger()
22# run like docker run --restart always -d -v /ftp:/ftp d4d8dea7d1e3
def on_connect(
    _client: mqtt.Client,
    _userdata: Any,
    _flags: mqtt.ConnectFlags,
    rc: ReasonCode,
    _props: Properties | None = None,
) -> None:
    """Log the outcome of an MQTT connection attempt.

    Callback with the paho-mqtt callback API v2 signature. Only ``rc`` is
    used; the remaining arguments are accepted and ignored.

    Args:
        _client: Client that triggered the callback (unused).
        _userdata: User data registered on the client (unused).
        _flags: Connect flags sent by the broker (unused).
        rc: Reason code for the connection attempt.
        _props: MQTT v5 properties, if any (unused).
    """
    # Reason names paho reports when the broker rejects the supplied credentials.
    # "Bad user name or password" is what a v3.x CONNACK rc=4 / v5 0x86 maps to,
    # "Not authorized" is rc=5 / 0x87; both mean the login details are wrong.
    credential_failures = ("Not authorized", "Bad user name or password")

    log.debug("on_connect, MQTT result code " + str(rc))
    if cast("str", rc.getName()) in credential_failures:
        log.error("Invalid MQTT credentials", result_code=rc)
        return
    if rc != 0:
        log.warning("Connection failed to broker", result_code=rc)
    else:
        log.debug("Connected to broker", result_code=rc)
def on_disconnect(
    _client: mqtt.Client,
    _userdata: Any,
    _disconnect_flags: mqtt.DisconnectFlags,
    rc: ReasonCode,
    _props: Properties | None,
) -> None:
    """Log an MQTT disconnect: debug when ``rc`` equals 0, warning otherwise.

    Only ``rc`` is inspected; every other argument is accepted and ignored.
    """
    if rc == 0:
        log.debug("Disconnected from broker", result_code=rc)
        return
    # Any non-zero reason code is reported as a failed disconnect.
    log.warning("Disconnect failure from broker", result_code=rc)
def main_loop() -> None:
    """Run the ANPR2MQTT service until the filesystem observer stops.

    In order, this function:

    1. Configures structlog filtering at INFO, loads ``Settings`` and, if the
       configured level is not ``"INFO"``, reconfigures filtering to that level.
    2. Creates and connects the MQTT client and wraps it in a
       ``HomeAssistantPublisher``. Any exception is logged and the process
       ends through ``sys.exit(-500)``.
    3. Creates the watchdog ``Observer`` (``sys.exit(-400)`` on failure).
    4. For each configured event: resolves its camera, builds a ``Tracker``
       and an ``EventHandler``, schedules the watch path, publishes the
       discovery messages and an initial empty state. A failure for one event
       is logged and the remaining events are still processed.
    5. Starts the publisher and the observer, plus a ``FrigateHandler`` when
       Frigate is enabled in the settings.
    6. Joins the observer in one-second slices until it is no longer alive,
       then stops and joins it.
    """
    # --- logging and settings -------------------------------------------------
    structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(logging.INFO))
    settings: Settings = Settings()  # type: ignore[call-arg]
    if settings.log_level != "INFO":
        configured_level = getattr(logging, str(settings.log_level))
        structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(configured_level))
    log.info("ANPR2MQTT %s starting up", anpr2mqtt.version)  # pyright: ignore[reportAttributeAccessIssue]
    for target_type in settings.targets:
        target_group = settings.targets[target_type]
        log.info(
            "ANPR2MQTT %s target groups, %s corrections, %s ignore patterns",
            len(target_group.groups),
            len(target_group.correction),
            len(target_group.ignore),
        )

    # --- MQTT protocol selection ----------------------------------------------
    # Accepted spellings of the protocol setting; anything else gets v3.11.
    protocol_by_name: dict[str, MQTTProtocolVersion] = {
        "3": MQTTProtocolVersion.MQTTv311,
        "3.11": MQTTProtocolVersion.MQTTv311,
        "3.1": MQTTProtocolVersion.MQTTv31,
        "5": MQTTProtocolVersion.MQTTv5,
        "5.0": MQTTProtocolVersion.MQTTv5,
    }
    requested_protocol = protocol_by_name.get(settings.mqtt.protocol)
    if requested_protocol is None:
        log.info("No valid MQTT protocol version found (%s), setting to default v3.11", settings.mqtt.protocol)
        requested_protocol = MQTTProtocolVersion.MQTTv311
    protocol: MQTTProtocolVersion = requested_protocol
    log.debug("MQTT protocol set to %r", protocol)

    # --- MQTT connection --------------------------------------------------------
    try:
        client: mqtt.Client = mqtt.Client(
            callback_api_version=CallbackAPIVersion.VERSION2,
            # clean_session is passed as None for v5 and True for the v3.x protocols
            clean_session=None if protocol == MQTTProtocolVersion.MQTTv5 else True,
            client_id="anpr2mqtt",
            protocol=protocol,
        )
        client.on_connect = on_connect
        client.on_disconnect = on_disconnect
        client.username_pw_set(username=settings.mqtt.user, password=settings.mqtt.password)
        connect_rc: MQTTErrorCode = client.connect(
            host=settings.mqtt.host,
            port=int(settings.mqtt.port),
            keepalive=60,
        )
        log.info("Client connection requested", result_code=connect_rc)
        client.loop_start()
        log.info(f"Connected to MQTT at {settings.mqtt.host}:{settings.mqtt.port} as {settings.mqtt.user}")
        log.info(f"Publishing at {settings.mqtt.topic_root}")
        publisher: HomeAssistantPublisher = HomeAssistantPublisher(client, settings.homeassistant)
    except Exception as mqtt_error:
        log.error("Failed to connect to MQTT: %s", mqtt_error, exc_info=1)
        sys.exit(-500)

    # --- filesystem watchdog ----------------------------------------------------
    try:
        observer = Observer()
    except Exception as observer_error:
        log.error("Failed to setup file system watchdog: %s", observer_error)
        sys.exit(-400)

    # Maps camera name → (event_config, camera_settings, tracker, state_topic, image_topic)
    # Handed to FrigateHandler so it works from the same per-camera objects as filesystem events.
    frigate_camera_configs: dict[str, CameraConfig] = {}

    for event_config in settings.events:
        camera: CameraSettings | None = None
        try:
            # No early exit: when several cameras share the name, the last one is used.
            for candidate in settings.cameras:
                if candidate.name == event_config.camera:
                    camera = candidate
            if camera is None:
                # No explicit camera settings: fall back to one built from the name alone.
                camera = CameraSettings(name=event_config.camera)

            state_topic = f"{settings.mqtt.topic_root}/{event_config.event}/cameras/{camera.name}/state"
            image_topic = f"{settings.mqtt.topic_root}/{event_config.event}/cameras/{camera.name}/image"
            tracker = Tracker(
                event_config.target_type,
                tracker_config=settings.tracker,
                target_config=settings.targets.get(event_config.target_type),
                region=event_config.region,
                auto_match_tolerance=event_config.auto_match_tolerance,
            )
            # Only the first event seen for a camera is registered for Frigate.
            if camera.name not in frigate_camera_configs:
                frigate_camera_configs[camera.name] = (event_config, camera, tracker, state_topic, image_topic)

            event_handler = EventHandler(
                publisher=publisher,
                event_config=event_config,
                state_topic=state_topic,
                camera=camera,
                image_topic=image_topic,
                ocr_config=settings.ocr,
                image_config=settings.image,
                dvla_config=settings.dvla,
                tracker=tracker,
                mqtt_topic_root=settings.mqtt.topic_root,
            )  # ty:ignore[invalid-argument-type]
            log.debug("Scheduling watchdog for %s", event_config.watch_path)
            observer.schedule(event_handler, str(event_config.watch_path), recursive=event_config.watch_tree)  # ty:ignore[invalid-argument-type]

            # Discovery: the sensor always, image and camera entities only when enabled.
            publisher.publish_sensor_discovery(state_topic=state_topic, event_config=event_config, camera=camera)
            if settings.homeassistant.image_entity:
                publisher.publish_image_discovery(
                    state_topic=state_topic,
                    image_topic=image_topic,
                    event_config=event_config,
                    camera=camera,
                )
            if settings.homeassistant.camera_entity:
                publisher.publish_camera_discovery(
                    state_topic=state_topic,
                    image_topic=image_topic,
                    event_config=event_config,
                    camera=camera,
                )

            # Publish each tracker entity as its own sensor, with state derived from past sightings.
            for entity_id, targets in tracker.entities.items():
                target_topic: str = f"{settings.mqtt.topic_root}/{event_config.event}/targets/{entity_id}/state"
                log.info("Publishing sensor.%s for %s targets", entity_id, len(targets))
                # NOTE(review): `len(targets) > 1` means an entity with exactly one target
                # always falls back to the event icon - confirm this is intended (vs `> 0`).
                icon: str | None = targets[0].icon if len(targets) > 1 and targets[0].icon else event_config.icon
                publisher.publish_target_sensor_discovery(
                    entity_id=entity_id,
                    target_type=event_config.target_type,
                    icon=icon,
                    targets=targets,
                    state_topic=target_topic,
                )

                previous_sightings: list[str] = [
                    sighting for target in targets for sighting in tracker.history(target.id, target.target_type)
                ]
                time_analysis: dict[str, Any] = (
                    compute_time_analysis(sorted(previous_sightings)) if previous_sightings else {"last_seen": None}
                )
                publisher.publish_target_state(
                    state_topic=target_topic,
                    time_analysis=time_analysis,
                )

            # post initial empty state message
            publisher.post_state_message(state_topic, sighting=None, event_config=event_config, camera=camera)
            log.info("Publishing %s %s state to %s", event_config.event, camera.name, state_topic)
        except Exception as event_error:
            # One broken event definition must not stop the others from being scheduled.
            log.error(
                "Failed to schedule event %s %s watchdog: %s",
                event_config.event,
                camera.name if camera else "unknown camera",
                event_error,
            )

    publisher.start()
    observer.start()

    # --- optional Frigate listener ----------------------------------------------
    if not settings.frigate.enabled:
        log.info("Frigate listener disabled")
    else:
        # Use the first "plate" event as the template for the default tracker,
        # or a default EventSettings when no such event is configured.
        plate_event: EventSettings | None = next(
            (configured for configured in settings.events if configured.target_type == "plate"),
            None,
        )
        plate_event = plate_event or EventSettings(target_type="plate")

        default_tracker = Tracker(
            target_type="plate",
            tracker_config=settings.tracker,
            target_config=settings.targets.get("plate"),
            region=plate_event.region,
            auto_match_tolerance=plate_event.auto_match_tolerance,
        )
        frigate_handler = FrigateHandler(
            mqtt_client=client,
            frigate_settings=settings.frigate,
            publisher=publisher,
            image_settings=settings.image,
            dvla_settings=settings.dvla,
            camera_configs=frigate_camera_configs,
            mqtt_topic_root=settings.mqtt.topic_root,
            default_tracker=default_tracker,
        )
        frigate_handler.start()

    # --- block until the observer stops -----------------------------------------
    try:
        log.info("Starting observer loop")
        while observer.is_alive():
            observer.join(1)
    except Exception as loop_error:
        log.error("Failed in run observer loop: %s", loop_error, exc_info=1)
    finally:
        observer.stop()
        observer.join()
        log.info("loop observer ended")
class Anpr2MQTT(Settings):
    """``Settings`` subclass that ``run()`` hands to ``CliApp.run`` as the CLI model."""

    def cli_cmd(self) -> None:
        """Start the service by calling ``main_loop()``.

        Nothing from ``self`` is passed on; ``main_loop`` is called without arguments.
        """
        main_loop()
def run() -> None:
    """Launch the CLI application, logging guidance if settings validation fails.

    A ``ValidationError`` raised by ``CliApp.run`` is logged together with a
    fixed set of troubleshooting hints; the function then returns normally.
    Other exceptions are not caught here.
    """
    # Hints logged, in this order, after the validation error itself.
    startup_hints = (
        "Unable to startup, validation error on settings, check configuration file, arguments or env vars",
        "MQTT host, account and password are mandatory (if using env vars, be sure to use MQTT__HOST not MQTT_HOST)",
        "Use --help for a full set of config",
        "Use the tools CLI for testing out directory watch, image analysis and APIs",
    )
    try:
        CliApp.run(model_cls=Anpr2MQTT)
    except ValidationError as validation_error:
        log.error(validation_error)
        for hint in startup_hints:
            log.error(hint)