Coverage for src / anpr2mqtt / app.py: 78%

138 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-30 16:07 +0000

1import logging 

2import sys 

3from typing import Any, cast 

4 

5import paho.mqtt.client as mqtt 

6import structlog 

7from paho.mqtt.enums import CallbackAPIVersion, MQTTErrorCode, MQTTProtocolVersion 

8from paho.mqtt.properties import Properties 

9from paho.mqtt.reasoncodes import ReasonCode 

10from pydantic import ValidationError 

11from pydantic_settings import CliApp 

12from watchdog.observers import Observer 

13 

14import anpr2mqtt 

15from anpr2mqtt.event_handler import EventHandler 

16from anpr2mqtt.frigate_handler import CameraConfig, FrigateHandler 

17from anpr2mqtt.hass import HomeAssistantPublisher 

18from anpr2mqtt.settings import CameraSettings, EventSettings, Settings 

19from anpr2mqtt.tracker import Tracker, compute_time_analysis 

20 

21log = structlog.get_logger() 

22# run like docker run --restart always -d -v /ftp:/ftp d4d8dea7d1e3 

23 

24 

def on_connect(
    _client: mqtt.Client,
    _userdata: Any,
    _flags: mqtt.ConnectFlags,
    rc: ReasonCode,
    _props: Properties | None = None,
) -> None:
    """Log the outcome of an MQTT connection attempt.

    Only ``rc`` is inspected; the remaining arguments are accepted to satisfy
    the callback signature and are ignored.

    Args:
        rc: Reason code reported for the connection attempt.
    """
    rc_text = str(rc)
    log.debug("on_connect, MQTT result code " + rc_text)

    # Bad credentials get their own error-level message, ahead of the generic check.
    reason_name = cast("str", rc.getName())
    if reason_name == "Not authorized":
        log.error("Invalid MQTT credentials", result_code=rc)
        return

    if rc != 0:
        log.warning("Connection failed to broker", result_code=rc)
        return

    log.debug("Connected to broker", result_code=rc)

40 

41 

def on_disconnect(
    _client: mqtt.Client,
    _userdata: Any,
    _disconnect_flags: mqtt.DisconnectFlags,
    rc: ReasonCode,
    _props: Properties | None,
) -> None:
    """Log an MQTT disconnection, at debug level when ``rc`` is 0 and as a warning otherwise.

    Only ``rc`` is inspected; the remaining arguments are accepted to satisfy
    the callback signature and are ignored.

    Args:
        rc: Reason code reported for the disconnection.
    """
    if rc == 0:
        log.debug("Disconnected from broker", result_code=rc)
        return

    log.warning("Disconnect failure from broker", result_code=rc)

53 

54 

def _resolve_protocol(configured: Any) -> MQTTProtocolVersion:
    """Map the configured MQTT protocol string onto a paho protocol version, defaulting to v3.11."""
    protocol: MQTTProtocolVersion
    if configured in ("3", "3.11"):
        protocol = MQTTProtocolVersion.MQTTv311
    elif configured == "3.1":
        protocol = MQTTProtocolVersion.MQTTv31
    elif configured in ("5", "5.0"):
        protocol = MQTTProtocolVersion.MQTTv5
    else:
        log.info("No valid MQTT protocol version found (%s), setting to default v3.11", configured)
        protocol = MQTTProtocolVersion.MQTTv311
    log.debug("MQTT protocol set to %r", protocol)
    return protocol


def _publish_known_targets(
    publisher: HomeAssistantPublisher,
    tracker: Tracker,
    event_config: EventSettings,
    settings: Settings,
) -> None:
    """Publish discovery and an initial state for every entity the tracker already knows about."""
    for entity_id, targets in tracker.entities.items():
        target_topic: str = f"{settings.mqtt.topic_root}/{event_config.event}/targets/{entity_id}/state"
        log.info("Publishing sensor.%s for %s targets", entity_id, len(targets))
        # NOTE(review): `> 1` means an entity with exactly one target never uses that
        # target's own icon - confirm whether `>= 1` was intended.
        icon: str | None = targets[0].icon if len(targets) > 1 and targets[0].icon else event_config.icon
        publisher.publish_target_sensor_discovery(
            entity_id=entity_id,
            target_type=event_config.target_type,
            icon=icon,
            targets=targets,
            state_topic=target_topic,
        )

        # Pool the history of every target behind this entity before analysing it.
        previous_sightings: list[str] = []
        for target in targets:
            previous_sightings.extend(tracker.history(target.id, target.target_type))
        if previous_sightings:
            time_analysis: dict[str, Any] = compute_time_analysis(sorted(previous_sightings))
        else:
            time_analysis = {"last_seen": None}
        publisher.publish_target_state(
            state_topic=target_topic,
            time_analysis=time_analysis,
        )


def _start_frigate_listener(
    settings: Settings,
    client: mqtt.Client,
    publisher: HomeAssistantPublisher,
    camera_configs: dict[str, CameraConfig],
) -> FrigateHandler:
    """Build and start the Frigate handler, returning it so the caller can keep a reference."""
    # Use the first configured "plate" event as the template for the default tracker,
    # or a default EventSettings when none is configured.
    event_settings: EventSettings | None = None
    for cfg in settings.events:
        if cfg.target_type == "plate":
            event_settings = cfg
            break
    event_settings = event_settings or EventSettings(target_type="plate")

    default_tracker = Tracker(
        target_type="plate",
        tracker_config=settings.tracker,
        target_config=settings.targets.get("plate"),
        region=event_settings.region,
        auto_match_tolerance=event_settings.auto_match_tolerance,
    )
    frigate_handler = FrigateHandler(
        mqtt_client=client,
        frigate_settings=settings.frigate,
        publisher=publisher,
        image_settings=settings.image,
        dvla_settings=settings.dvla,
        camera_configs=camera_configs,
        mqtt_topic_root=settings.mqtt.topic_root,
        default_tracker=default_tracker,
    )
    frigate_handler.start()
    return frigate_handler


def main_loop() -> None:
    """Watch a directory, post any matching files to MQTT after optionally analyzing the image

    Steps, in order: configure logging, load settings, connect the MQTT client,
    create the file system observer, wire one event handler per configured
    event (publishing Home Assistant discovery and initial state as it goes),
    start the publisher and observer, optionally start the Frigate listener,
    then block until the observer stops.

    Raises:
        SystemExit: with code -500 if the MQTT client or publisher cannot be set
            up, or -400 if the file system observer cannot be created.
    """
    # Start at INFO so anything logged while settings load is visible, then
    # re-configure once the requested level is known.
    structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(logging.INFO))
    settings: Settings = Settings()  # type: ignore[call-arg]
    if settings.log_level != "INFO":
        structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, str(settings.log_level))))
    log.info("ANPR2MQTT %s starting up", anpr2mqtt.version)  # pyright: ignore[reportAttributeAccessIssue]
    for target_type in settings.targets:
        log.info(
            "ANPR2MQTT %s target groups, %s corrections, %s ignore patterns",
            len(settings.targets[target_type].groups),
            len(settings.targets[target_type].correction),
            len(settings.targets[target_type].ignore),
        )

    client: mqtt.Client
    publisher: HomeAssistantPublisher
    protocol: MQTTProtocolVersion = _resolve_protocol(settings.mqtt.protocol)

    try:
        client = mqtt.Client(
            callback_api_version=CallbackAPIVersion.VERSION2,
            clean_session=True if protocol != MQTTProtocolVersion.MQTTv5 else None,
            client_id="anpr2mqtt",
            protocol=protocol,
        )
        client.on_connect = on_connect
        client.on_disconnect = on_disconnect
        client.username_pw_set(username=settings.mqtt.user, password=settings.mqtt.password)
        rc: MQTTErrorCode = client.connect(host=settings.mqtt.host, port=int(settings.mqtt.port), keepalive=60)
        log.info("Client connection requested", result_code=rc)
        client.loop_start()
        log.info(f"Connected to MQTT at {settings.mqtt.host}:{settings.mqtt.port} as {settings.mqtt.user}")
        log.info(f"Publishing at {settings.mqtt.topic_root}")
        publisher = HomeAssistantPublisher(client, settings.homeassistant)

    except Exception as e:
        log.error("Failed to connect to MQTT: %s", e, exc_info=1)
        sys.exit(-500)

    try:
        observer = Observer()
    except Exception as e:
        log.error("Failed to setup file system watchdog: %s", e)
        sys.exit(-400)

    # Maps camera name → (event_config, camera_settings, tracker, state_topic, image_topic)
    # Used by FrigateHandler to share the same pipeline as filesystem events.
    frigate_camera_configs: dict[str, CameraConfig] = {}

    for event_config in settings.events:
        camera: CameraSettings | None = None
        try:
            # No break: if several cameras share the name, the last one listed wins.
            for camera_config in settings.cameras:
                if camera_config.name == event_config.camera:
                    camera = camera_config
            if camera is None:
                camera = CameraSettings(name=event_config.camera)
            state_topic = f"{settings.mqtt.topic_root}/{event_config.event}/cameras/{camera.name}/state"
            image_topic = f"{settings.mqtt.topic_root}/{event_config.event}/cameras/{camera.name}/image"
            tracker = Tracker(
                event_config.target_type,
                tracker_config=settings.tracker,
                target_config=settings.targets.get(event_config.target_type),
                region=event_config.region,
                auto_match_tolerance=event_config.auto_match_tolerance,
            )
            # The first event configured for a camera supplies its Frigate pipeline.
            if camera.name not in frigate_camera_configs:
                frigate_camera_configs[camera.name] = (event_config, camera, tracker, state_topic, image_topic)
            event_handler = EventHandler(
                publisher=publisher,
                event_config=event_config,
                state_topic=state_topic,
                camera=camera,
                image_topic=image_topic,
                ocr_config=settings.ocr,
                image_config=settings.image,
                dvla_config=settings.dvla,
                tracker=tracker,
                mqtt_topic_root=settings.mqtt.topic_root,
            )  # ty:ignore[invalid-argument-type]
            log.debug("Scheduling watchdog for %s", event_config.watch_path)
            observer.schedule(event_handler, str(event_config.watch_path), recursive=event_config.watch_tree)  # ty:ignore[invalid-argument-type]
            publisher.publish_sensor_discovery(state_topic=state_topic, event_config=event_config, camera=camera)
            if settings.homeassistant.image_entity:
                publisher.publish_image_discovery(
                    state_topic=state_topic, image_topic=image_topic, event_config=event_config, camera=camera
                )
            if settings.homeassistant.camera_entity:
                publisher.publish_camera_discovery(
                    state_topic=state_topic, image_topic=image_topic, event_config=event_config, camera=camera
                )
            # selectively publish known targets as HA sensors, using last seen timestamp as state value
            _publish_known_targets(publisher, tracker, event_config, settings)

            # post initial empty state message
            publisher.post_state_message(state_topic, sighting=None, event_config=event_config, camera=camera)
            log.info("Publishing %s %s state to %s", event_config.event, camera.name, state_topic)
        except Exception as e:
            # One broken event must not stop the others from being scheduled. The
            # traceback is logged because the exception is swallowed here.
            log.error(
                "Failed to schedule event %s %s watchdog: %s",
                event_config.event,
                camera.name if camera else "unknown camera",
                e,
                exc_info=1,
            )

    publisher.start()
    observer.start()

    if settings.frigate.enabled:
        # Bound to a local so the handler stays referenced while the loop below runs.
        frigate_handler = _start_frigate_listener(settings, client, publisher, frigate_camera_configs)  # noqa: F841
    else:
        log.info("Frigate listener disabled")

    try:
        log.info("Starting observer loop")
        # Join with a timeout rather than blocking indefinitely.
        while observer.is_alive():
            observer.join(1)
    except Exception as e:
        log.error("Failed in run observer loop: %s", e, exc_info=1)
    finally:
        observer.stop()
        observer.join()
        log.info("loop observer ended")

235 

236 

class Anpr2MQTT(Settings):
    """Settings subclass handed to ``CliApp.run`` as the command line model."""

    def cli_cmd(self) -> None:
        """Run the service main loop.

        ``main_loop`` takes no arguments and builds its own ``Settings``
        instance; the values parsed onto ``self`` are not passed to it.
        """
        main_loop()

240 

241 

def run() -> None:
    """Entry point: run the ``Anpr2MQTT`` CLI app and report settings validation failures.

    Raises:
        SystemExit: with status 1 when settings validation fails, so the failed
            start-up is visible to whatever launched the process.
    """
    try:
        CliApp.run(model_cls=Anpr2MQTT)
    except ValidationError as e:
        log.error(e)
        log.error("Unable to startup, validation error on settings, check configuration file, arguments or env vars")
        log.error("MQTT host, account and password are mandatory (if using env vars, be sure to use MQTT__HOST not MQTT_HOST)")
        log.error("Use --help for a full set of config")
        log.error("Use the tools CLI for testing out directory watch, image analysis and APIs")
        # Returning normally here would end the process with status 0 even though
        # nothing started; main_loop likewise exits non-zero on start-up failures.
        sys.exit(1)