Coverage for src / anpr2mqtt / app.py: 89%

101 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-08 17:29 +0000

1import logging 

2import sys 

3from typing import Any, cast 

4 

5import paho.mqtt.client as mqtt 

6import structlog 

7from paho.mqtt.enums import CallbackAPIVersion, MQTTErrorCode, MQTTProtocolVersion 

8from paho.mqtt.properties import Properties 

9from paho.mqtt.reasoncodes import ReasonCode 

10from pydantic_settings import CliApp 

11from watchdog.observers import Observer 

12 

13import anpr2mqtt 

14from anpr2mqtt.event_handler import EventHandler 

15from anpr2mqtt.hass import HomeAssistantPublisher 

16from anpr2mqtt.settings import CameraSettings, Settings 

17 

18log = structlog.get_logger() 

19# run like docker run --restart always -d -v /ftp:/ftp d4d8dea7d1e3 

20 

21 

def on_connect(
    _client: mqtt.Client,
    _userdata: Any,
    _flags: mqtt.ConnectFlags,
    rc: ReasonCode,
    _props: Properties | None = None,
) -> None:
    """Log the outcome of an MQTT connection attempt.

    Only ``rc`` is used; the remaining parameters are accepted and ignored.

    * reason name ``"Not authorized"`` -> error log, nothing else is logged
    * any other non-zero reason code   -> warning log
    * zero                             -> debug log
    """
    log.debug(f"on_connect, MQTT result code {rc!s}")

    reason_name = cast("str", rc.getName())
    if reason_name == "Not authorized":
        log.error("Invalid MQTT credentials", result_code=rc)
        return

    if rc != 0:
        log.warning("Connection failed to broker", result_code=rc)
        return

    log.debug("Connected to broker", result_code=rc)

37 

38 

def on_disconnect(
    _client: mqtt.Client,
    _userdata: Any,
    _disconnect_flags: mqtt.DisconnectFlags,
    rc: ReasonCode,
    _props: Properties | None,
) -> None:
    """Log an MQTT disconnect.

    Only ``rc`` is used; the remaining parameters are accepted and ignored.
    A reason code equal to zero is logged at debug level, anything else is
    logged as a warning.
    """
    if rc == 0:
        log.debug("Disconnected from broker", result_code=rc)
        return

    log.warning("Disconnect failure from broker", result_code=rc)

50 

51 

52def main_loop() -> None: 

53 """Watch a directory, post any matching files to MQTT after optionally analyzing the image""" 

54 structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(logging.INFO)) 

55 settings: Settings = Settings() # type: ignore[call-arg] 

56 if settings.log_level != "INFO": 56 ↛ 57line 56 didn't jump to line 57 because the condition on line 56 was never true

57 structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, str(settings.log_level)))) 

58 log.info("ANPR2MQTT %s starting up", anpr2mqtt.version) # pyright: ignore[reportAttributeAccessIssue] 

59 for target_type in settings.targets: 59 ↛ 60line 59 didn't jump to line 60 because the loop on line 59 never started

60 log.info( 

61 "ANPR2MQTT %s known vehicles, %s alert vehicles, %s corrections, %s ignore patterns", 

62 len(settings.targets[target_type].known), 

63 len(settings.targets[target_type].dangerous), 

64 len(settings.targets[target_type].correction), 

65 len(settings.targets[target_type].ignore), 

66 ) 

67 

68 client: mqtt.Client 

69 publisher: HomeAssistantPublisher 

70 protocol: MQTTProtocolVersion 

71 if settings.mqtt.protocol in ("3", "3.11"): 

72 protocol = MQTTProtocolVersion.MQTTv311 

73 elif settings.mqtt.protocol == "3.1": 

74 protocol = MQTTProtocolVersion.MQTTv31 

75 elif settings.mqtt.protocol in ("5", "5.0"): 

76 protocol = MQTTProtocolVersion.MQTTv5 

77 else: 

78 log.info("No valid MQTT protocol version found (%s), setting to default v3.11", settings.mqtt.protocol) 

79 protocol = MQTTProtocolVersion.MQTTv311 

80 log.debug("MQTT protocol set to %r", protocol) 

81 

82 try: 

83 client = mqtt.Client( 

84 callback_api_version=CallbackAPIVersion.VERSION2, 

85 clean_session=True if protocol != MQTTProtocolVersion.MQTTv5 else None, 

86 client_id="anpr2mqtt", 

87 protocol=protocol, 

88 ) 

89 client.on_connect = on_connect 

90 client.on_disconnect = on_disconnect 

91 client.username_pw_set(username=settings.mqtt.user, password=settings.mqtt.password) 

92 rc: MQTTErrorCode = client.connect(host=settings.mqtt.host, port=int(settings.mqtt.port), keepalive=60) 

93 log.info("Client connection requested", result_code=rc) 

94 client.loop_start() 

95 log.info(f"Connected to MQTT at {settings.mqtt.host}:{settings.mqtt.port} as {settings.mqtt.user}") 

96 log.info(f"Publishing at {settings.mqtt.topic_root}") 

97 publisher = HomeAssistantPublisher(client, settings.homeassistant) 

98 

99 except Exception as e: 

100 log.error("Failed to connect to MQTT: %s", e, exc_info=1) 

101 sys.exit(-500) 

102 

103 try: 

104 observer = Observer() 

105 except Exception as e: 

106 log.error("Failed to setup file system watchdog: %s", e) 

107 sys.exit(-400) 

108 

109 for event_config in settings.events: 

110 camera: CameraSettings | None = None 

111 try: 

112 for camera_config in settings.cameras: 

113 if camera_config.name == event_config.camera: 113 ↛ 112line 113 didn't jump to line 112 because the condition on line 113 was always true

114 camera = camera_config 

115 if camera is None: 

116 camera = CameraSettings(name=event_config.camera) 

117 state_topic = f"{settings.mqtt.topic_root}/{event_config.event}/{camera.name}/state" 

118 image_topic = f"{settings.mqtt.topic_root}/{event_config.event}/{camera.name}/image" 

119 event_handler = EventHandler( 

120 publisher=publisher, 

121 event_config=event_config, 

122 state_topic=state_topic, 

123 camera=camera, 

124 image_topic=image_topic, 

125 target_config=settings.targets.get(event_config.target_type), 

126 ocr_config=settings.ocr, 

127 image_config=settings.image, 

128 dvla_config=settings.dvla, 

129 tracker_config=settings.tracker, 

130 ) # ty:ignore[invalid-argument-type] 

131 log.debug("Scheduling watchdog for %s", event_config.watch_path) 

132 observer.schedule(event_handler, str(event_config.watch_path), recursive=event_config.watch_tree) # ty:ignore[invalid-argument-type] 

133 publisher.publish_sensor_discovery(state_topic=state_topic, event_config=event_config, camera=camera) 

134 if settings.homeassistant.image_entity: 

135 publisher.publish_image_discovery( 

136 state_topic=state_topic, image_topic=image_topic, event_config=event_config, camera=camera 

137 ) 

138 if settings.homeassistant.camera_entity: 

139 publisher.publish_camera_discovery( 

140 state_topic=state_topic, image_topic=image_topic, event_config=event_config, camera=camera 

141 ) 

142 # post initial empty state message 

143 publisher.post_state_message(state_topic, target=None, event_config=event_config, camera=camera) 

144 log.info("Publishing %s %s state to %s", event_config.event, camera.name, state_topic) 

145 except Exception as e: 

146 log.error( 

147 "Failed to schedule event %s %s watchdog: %s", 

148 event_config.event, 

149 camera.name if camera else "unknown camera", 

150 e, 

151 ) 

152 

153 publisher.start() 

154 observer.start() 

155 

156 try: 

157 log.info("Starting observer loop") 

158 while observer.is_alive(): 158 ↛ 159line 158 didn't jump to line 159 because the condition on line 158 was never true

159 observer.join(1) 

160 except Exception as e: 

161 log.error("Failed in run observer loop: %s", e, exc_info=1) 

162 finally: 

163 observer.stop() 

164 observer.join() 

165 log.info("loop observer ended") 

166 

167 

# Settings subclass passed to ``CliApp.run`` as the CLI model (see ``run``).
# NOTE(review): documented with comments rather than a class docstring so the
# model's ``__doc__`` stays unchanged — CLI tooling may surface it as help text.
class Anpr2MQTT(Settings):
    def cli_cmd(self) -> None:
        """Start the service by delegating to ``main_loop``.

        The instance itself is not passed on; ``main_loop`` builds its own
        ``Settings`` object.
        """
        main_loop()

171 

172 

def run() -> None:
    """Entry point: hand the ``Anpr2MQTT`` model to pydantic-settings' ``CliApp.run``."""
    CliApp.run(model_cls=Anpr2MQTT)