Coverage for src / anpr2mqtt / app.py: 89%
101 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-08 17:29 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-08 17:29 +0000
1import logging
2import sys
3from typing import Any, cast
5import paho.mqtt.client as mqtt
6import structlog
7from paho.mqtt.enums import CallbackAPIVersion, MQTTErrorCode, MQTTProtocolVersion
8from paho.mqtt.properties import Properties
9from paho.mqtt.reasoncodes import ReasonCode
10from pydantic_settings import CliApp
11from watchdog.observers import Observer
13import anpr2mqtt
14from anpr2mqtt.event_handler import EventHandler
15from anpr2mqtt.hass import HomeAssistantPublisher
16from anpr2mqtt.settings import CameraSettings, Settings
18log = structlog.get_logger()
19# run like docker run --restart always -d -v /ftp:/ftp d4d8dea7d1e3
22def on_connect(
23 _client: mqtt.Client,
24 _userdata: Any,
25 _flags: mqtt.ConnectFlags,
26 rc: ReasonCode,
27 _props: Properties | None = None,
28) -> None:
29 log.debug("on_connect, MQTT result code " + str(rc))
30 if cast("str", rc.getName()) == "Not authorized":
31 log.error("Invalid MQTT credentials", result_code=rc)
32 return
33 if rc != 0:
34 log.warning("Connection failed to broker", result_code=rc)
35 else:
36 log.debug("Connected to broker", result_code=rc)
39def on_disconnect(
40 _client: mqtt.Client,
41 _userdata: Any,
42 _disconnect_flags: mqtt.DisconnectFlags,
43 rc: ReasonCode,
44 _props: Properties | None,
45) -> None:
46 if rc == 0:
47 log.debug("Disconnected from broker", result_code=rc)
48 else:
49 log.warning("Disconnect failure from broker", result_code=rc)
52def main_loop() -> None:
53 """Watch a directory, post any matching files to MQTT after optionally analyzing the image"""
54 structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(logging.INFO))
55 settings: Settings = Settings() # type: ignore[call-arg]
56 if settings.log_level != "INFO": 56 ↛ 57line 56 didn't jump to line 57 because the condition on line 56 was never true
57 structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, str(settings.log_level))))
58 log.info("ANPR2MQTT %s starting up", anpr2mqtt.version) # pyright: ignore[reportAttributeAccessIssue]
59 for target_type in settings.targets: 59 ↛ 60line 59 didn't jump to line 60 because the loop on line 59 never started
60 log.info(
61 "ANPR2MQTT %s known vehicles, %s alert vehicles, %s corrections, %s ignore patterns",
62 len(settings.targets[target_type].known),
63 len(settings.targets[target_type].dangerous),
64 len(settings.targets[target_type].correction),
65 len(settings.targets[target_type].ignore),
66 )
68 client: mqtt.Client
69 publisher: HomeAssistantPublisher
70 protocol: MQTTProtocolVersion
71 if settings.mqtt.protocol in ("3", "3.11"):
72 protocol = MQTTProtocolVersion.MQTTv311
73 elif settings.mqtt.protocol == "3.1":
74 protocol = MQTTProtocolVersion.MQTTv31
75 elif settings.mqtt.protocol in ("5", "5.0"):
76 protocol = MQTTProtocolVersion.MQTTv5
77 else:
78 log.info("No valid MQTT protocol version found (%s), setting to default v3.11", settings.mqtt.protocol)
79 protocol = MQTTProtocolVersion.MQTTv311
80 log.debug("MQTT protocol set to %r", protocol)
82 try:
83 client = mqtt.Client(
84 callback_api_version=CallbackAPIVersion.VERSION2,
85 clean_session=True if protocol != MQTTProtocolVersion.MQTTv5 else None,
86 client_id="anpr2mqtt",
87 protocol=protocol,
88 )
89 client.on_connect = on_connect
90 client.on_disconnect = on_disconnect
91 client.username_pw_set(username=settings.mqtt.user, password=settings.mqtt.password)
92 rc: MQTTErrorCode = client.connect(host=settings.mqtt.host, port=int(settings.mqtt.port), keepalive=60)
93 log.info("Client connection requested", result_code=rc)
94 client.loop_start()
95 log.info(f"Connected to MQTT at {settings.mqtt.host}:{settings.mqtt.port} as {settings.mqtt.user}")
96 log.info(f"Publishing at {settings.mqtt.topic_root}")
97 publisher = HomeAssistantPublisher(client, settings.homeassistant)
99 except Exception as e:
100 log.error("Failed to connect to MQTT: %s", e, exc_info=1)
101 sys.exit(-500)
103 try:
104 observer = Observer()
105 except Exception as e:
106 log.error("Failed to setup file system watchdog: %s", e)
107 sys.exit(-400)
109 for event_config in settings.events:
110 camera: CameraSettings | None = None
111 try:
112 for camera_config in settings.cameras:
113 if camera_config.name == event_config.camera: 113 ↛ 112line 113 didn't jump to line 112 because the condition on line 113 was always true
114 camera = camera_config
115 if camera is None:
116 camera = CameraSettings(name=event_config.camera)
117 state_topic = f"{settings.mqtt.topic_root}/{event_config.event}/{camera.name}/state"
118 image_topic = f"{settings.mqtt.topic_root}/{event_config.event}/{camera.name}/image"
119 event_handler = EventHandler(
120 publisher=publisher,
121 event_config=event_config,
122 state_topic=state_topic,
123 camera=camera,
124 image_topic=image_topic,
125 target_config=settings.targets.get(event_config.target_type),
126 ocr_config=settings.ocr,
127 image_config=settings.image,
128 dvla_config=settings.dvla,
129 tracker_config=settings.tracker,
130 ) # ty:ignore[invalid-argument-type]
131 log.debug("Scheduling watchdog for %s", event_config.watch_path)
132 observer.schedule(event_handler, str(event_config.watch_path), recursive=event_config.watch_tree) # ty:ignore[invalid-argument-type]
133 publisher.publish_sensor_discovery(state_topic=state_topic, event_config=event_config, camera=camera)
134 if settings.homeassistant.image_entity:
135 publisher.publish_image_discovery(
136 state_topic=state_topic, image_topic=image_topic, event_config=event_config, camera=camera
137 )
138 if settings.homeassistant.camera_entity:
139 publisher.publish_camera_discovery(
140 state_topic=state_topic, image_topic=image_topic, event_config=event_config, camera=camera
141 )
142 # post initial empty state message
143 publisher.post_state_message(state_topic, target=None, event_config=event_config, camera=camera)
144 log.info("Publishing %s %s state to %s", event_config.event, camera.name, state_topic)
145 except Exception as e:
146 log.error(
147 "Failed to schedule event %s %s watchdog: %s",
148 event_config.event,
149 camera.name if camera else "unknown camera",
150 e,
151 )
153 publisher.start()
154 observer.start()
156 try:
157 log.info("Starting observer loop")
158 while observer.is_alive(): 158 ↛ 159line 158 didn't jump to line 159 because the condition on line 158 was never true
159 observer.join(1)
160 except Exception as e:
161 log.error("Failed in run observer loop: %s", e, exc_info=1)
162 finally:
163 observer.stop()
164 observer.join()
165 log.info("loop observer ended")
168class Anpr2MQTT(Settings):
169 def cli_cmd(self) -> None:
170 main_loop()
173def run() -> None:
174 CliApp.run(model_cls=Anpr2MQTT)