Coverage for src / anpr2mqtt / app.py: 23%
79 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 15:35 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 15:35 +0000
1import logging
2import sys
3from typing import Any
5import paho.mqtt.client as mqtt
6import structlog
7from paho.mqtt.enums import CallbackAPIVersion, MQTTErrorCode
8from paho.mqtt.properties import Properties
9from paho.mqtt.reasoncodes import ReasonCode
10from pydantic_settings import CliApp
11from watchdog.observers import Observer
13import anpr2mqtt
14from anpr2mqtt.event_handler import EventHandler
15from anpr2mqtt.hass import post_discovery_message
16from anpr2mqtt.settings import Settings
18log = structlog.get_logger()
# Run like: docker run --restart always -d -v /ftp:/ftp d4d8dea7d1e3
def on_connect(
    _client: mqtt.Client,
    _userdata: Any,
    _flags: mqtt.ConnectFlags,
    rc: ReasonCode,
    _props: Properties | None = None,
) -> None:
    """Log the result of an MQTT connection attempt.

    Only ``rc`` is inspected; the remaining callback arguments are ignored.
    A reason code named "Not authorized" is logged as an error, any other
    non-zero code as a warning, and zero as a successful connection.
    """
    log.debug(f"on_connect, MQTT result code {rc!s}")
    if rc.getName() == "Not authorized":
        # Bad credentials get their own, louder, message
        log.error("Invalid MQTT credentials", result_code=rc)
    elif rc == 0:
        log.debug("Connected to broker", result_code=rc)
    else:
        log.warning("Connection failed to broker", result_code=rc)
def on_disconnect(
    _client: mqtt.Client,
    _userdata: Any,
    _disconnect_flags: mqtt.DisconnectFlags,
    rc: ReasonCode,
    _props: Properties | None,
) -> None:
    """Log an MQTT disconnect.

    Only ``rc`` is inspected: zero is logged at debug level as a normal
    disconnect, anything else is logged as a warning.
    """
    if rc != 0:
        log.warning("Disconnect failure from broker", result_code=rc)
        return
    log.debug("Disconnected from broker", result_code=rc)
def _stop_mqtt_client(client: mqtt.Client) -> None:
    """Disconnect from the broker and stop the thread started by ``loop_start()``.

    Best effort: any failure is logged and swallowed so that it cannot mask
    an exception that is already propagating through the caller's ``finally``.
    """
    try:
        client.disconnect()
        client.loop_stop()
    except Exception as e:
        log.warning("Failed to shut down MQTT client cleanly: %s", e)


def main_loop() -> None:
    """Watch a directory, post any matching files to MQTT after optionally analyzing the image.

    Loads ``Settings``, configures structlog level filtering from
    ``settings.log_level``, connects an MQTT client and starts its network
    loop, schedules one watchdog ``EventHandler`` per entry in
    ``settings.events`` (posting a discovery message for each), then blocks
    until the observer thread is no longer alive.

    Exits the process via ``sys.exit(-500)`` if the MQTT client cannot be set
    up, or ``sys.exit(-400)`` if the watchdog observer cannot be created. A
    failure to schedule an individual event is logged and the remaining events
    are still processed. Once the MQTT loop has been started, the client is
    always disconnected and its loop stopped on the way out.
    """
    # NOTE(review): exit statuses are normally truncated by the OS, so -500/-400
    # are unlikely to be seen as written by a supervisor - confirm before relying on them.

    # Start with INFO filtering, then re-configure once the configured level is known
    structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(logging.INFO))
    settings: Settings = Settings()  # type: ignore[call-arg]
    if settings.log_level != "INFO":
        structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, str(settings.log_level))))
    log.info("ANPR2MQTT %s starting up", anpr2mqtt.version)  # pyright: ignore[reportAttributeAccessIssue]
    for target in settings.targets.values():
        log.info(
            "ANPR2MQTT %s known vehicles, %s alert vehicles, %s corrections, %s ignore patterns",
            len(target.known),
            len(target.dangerous),
            len(target.correction),
            len(target.ignore),
        )

    client: mqtt.Client
    try:
        client = mqtt.Client(
            callback_api_version=CallbackAPIVersion.VERSION2,
            clean_session=True,
            client_id="anpr2mqtt",
        )
        client.on_connect = on_connect
        client.on_disconnect = on_disconnect
        client.username_pw_set(username=settings.mqtt.user, password=settings.mqtt.password)
        rc: MQTTErrorCode = client.connect(host=settings.mqtt.host, port=int(settings.mqtt.port), keepalive=60)
        log.info("Client connection requested", result_code=rc)
        client.loop_start()
        # NOTE(review): the broker's accept/reject result is logged separately by on_connect
        log.info(f"Connected to MQTT at {settings.mqtt.host}:{settings.mqtt.port} as {settings.mqtt.user}")
        log.info(f"Publishing at {settings.mqtt.topic_root}")

    except Exception as e:
        log.error("Failed to connect to MQTT: %s", e, exc_info=1)
        sys.exit(-500)

    # From here on the MQTT network loop is running, so every exit path
    # (including sys.exit and KeyboardInterrupt) must shut the client down.
    try:
        try:
            observer = Observer()
        except Exception as e:
            log.error("Failed to setup file system watchdog: %s", e)
            sys.exit(-400)

        for event_config in settings.events:
            try:
                state_topic = f"{settings.mqtt.topic_root}/{event_config.event}/{event_config.camera}/state"
                image_topic = f"{settings.mqtt.topic_root}/{event_config.event}/{event_config.camera}/image"
                event_handler = EventHandler(
                    client=client,
                    event_config=event_config,
                    state_topic=state_topic,
                    image_topic=image_topic,
                    target_config=settings.targets.get(event_config.target_type),
                    ocr_config=settings.ocr,
                    image_config=settings.image,
                    dvla_config=settings.dvla,
                    tracker_config=settings.tracker,
                )  # ty:ignore[invalid-argument-type]
                log.debug("Scheduling watchdog for %s", event_config.watch_path)
                observer.schedule(event_handler, str(event_config.watch_path), recursive=False)  # ty:ignore[invalid-argument-type]
                post_discovery_message(
                    client,
                    settings.homeassistant.discovery_topic_root,
                    state_topic=state_topic,
                    image_topic=image_topic,
                    event_config=event_config,
                    device_creation=settings.homeassistant.device_creation,
                )
                log.info("Publishing %s %s state to %s", event_config.event, event_config.camera, state_topic)
            except Exception as e:
                # One misconfigured event must not prevent the others from being watched
                log.error("Failed to schedule event %s %s watchdog: %s", event_config.event, event_config.camera, e)

        observer.start()
        try:
            log.info("Starting observer loop")
            # Join with a timeout so the main thread wakes up regularly
            while observer.is_alive():
                observer.join(1)
        except Exception as e:
            log.error("Failed in run observer loop: %s", e, exc_info=1)
        finally:
            observer.stop()
            observer.join()
            log.info("loop observer ended")
    finally:
        _stop_mqtt_client(client)
class Anpr2MQTT(Settings):
    """Settings model used as the command line entry point via ``CliApp``."""

    def cli_cmd(self) -> None:
        """Run the watch-and-publish loop.

        ``main_loop`` builds its own ``Settings`` instance rather than using ``self``.
        """
        main_loop()
def run() -> None:
    """Hand the ``Anpr2MQTT`` model to ``CliApp.run``."""
    CliApp.run(model_cls=Anpr2MQTT)