Coverage for src / anpr2mqtt / app.py: 23%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 15:35 +0000

1import logging 

2import sys 

3from typing import Any 

4 

5import paho.mqtt.client as mqtt 

6import structlog 

7from paho.mqtt.enums import CallbackAPIVersion, MQTTErrorCode 

8from paho.mqtt.properties import Properties 

9from paho.mqtt.reasoncodes import ReasonCode 

10from pydantic_settings import CliApp 

11from watchdog.observers import Observer 

12 

13import anpr2mqtt 

14from anpr2mqtt.event_handler import EventHandler 

15from anpr2mqtt.hass import post_discovery_message 

16from anpr2mqtt.settings import Settings 

17 

# Module-level structlog logger used by the MQTT callbacks and main_loop below.
log = structlog.get_logger()
# Example invocation: docker run --restart always -d -v /ftp:/ftp d4d8dea7d1e3
# NOTE(review): the trailing hex string is presumably a local image ID - confirm before copying.

20 

21 

def on_connect(
    _client: mqtt.Client,
    _userdata: Any,
    _flags: mqtt.ConnectFlags,
    rc: ReasonCode,
    _props: Properties | None = None,
) -> None:
    """Log the outcome of an MQTT connection attempt.

    Installed as the client's ``on_connect`` callback. Only ``rc`` is used;
    the remaining parameters exist to satisfy the callback signature.

    Args:
        _client: The MQTT client that fired the callback (unused).
        _userdata: User data attached to the client (unused).
        _flags: Connect flags reported with the result (unused).
        rc: Reason code for the connection attempt.
        _props: Optional MQTT properties (unused).
    """
    log.debug(f"on_connect, MQTT result code {rc!s}")
    if rc.getName() == "Not authorized":
        # Bad credentials get their own error-level message.
        log.error("Invalid MQTT credentials", result_code=rc)
    elif rc != 0:
        log.warning("Connection failed to broker", result_code=rc)
    else:
        log.debug("Connected to broker", result_code=rc)

37 

38 

def on_disconnect(
    _client: mqtt.Client,
    _userdata: Any,
    _disconnect_flags: mqtt.DisconnectFlags,
    rc: ReasonCode,
    _props: Properties | None,
) -> None:
    """Log an MQTT disconnection, at warning level unless the reason code is 0.

    Installed as the client's ``on_disconnect`` callback. Only ``rc`` is used;
    the remaining parameters exist to satisfy the callback signature.

    Args:
        _client: The MQTT client that fired the callback (unused).
        _userdata: User data attached to the client (unused).
        _disconnect_flags: Disconnect flags reported with the result (unused).
        rc: Reason code for the disconnection.
        _props: Optional MQTT properties (unused).
    """
    clean_disconnect = rc == 0
    if not clean_disconnect:
        log.warning("Disconnect failure from broker", result_code=rc)
        return
    log.debug("Disconnected from broker", result_code=rc)

50 

51 

def main_loop() -> None:
    """Watch a directory, post any matching files to MQTT after optionally analyzing the image.

    Loads ``Settings``, configures structlog filtering from ``settings.log_level``,
    connects the MQTT client and starts its network loop, then runs the file
    system watcher until its observer thread stops. The MQTT client is always
    disconnected and its network loop stopped once the watcher phase ends,
    whether it ended normally, by exception or by ``sys.exit``.

    Exits the process with ``sys.exit(-500)`` if the MQTT client cannot be set
    up or connected, and with ``sys.exit(-400)`` if the observer cannot be created.
    """
    # Filter at INFO first so logging is configured before the settings are loaded.
    structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(logging.INFO))
    settings: Settings = Settings()  # type: ignore[call-arg]
    if settings.log_level != "INFO":
        structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, str(settings.log_level))))
    log.info("ANPR2MQTT %s starting up", anpr2mqtt.version)  # pyright: ignore[reportAttributeAccessIssue]
    for target in settings.targets.values():
        log.info(
            "ANPR2MQTT %s known vehicles, %s alert vehicles, %s corrections, %s ignore patterns",
            len(target.known),
            len(target.dangerous),
            len(target.correction),
            len(target.ignore),
        )

    client: mqtt.Client = _connect_mqtt(settings)
    try:
        _watch_events(settings, client)
    finally:
        # loop_start() spawned a network thread; send DISCONNECT and stop that
        # thread so the broker connection is closed in an orderly way.
        client.disconnect()
        client.loop_stop()


def _connect_mqtt(settings: Settings) -> mqtt.Client:
    """Create the MQTT client, request a broker connection and start the network loop.

    Exits the process with ``sys.exit(-500)`` if any step raises.
    """
    try:
        client = mqtt.Client(
            callback_api_version=CallbackAPIVersion.VERSION2,
            clean_session=True,
            client_id="anpr2mqtt",
        )
        client.on_connect = on_connect
        client.on_disconnect = on_disconnect
        client.username_pw_set(username=settings.mqtt.user, password=settings.mqtt.password)
        rc: MQTTErrorCode = client.connect(host=settings.mqtt.host, port=int(settings.mqtt.port), keepalive=60)
        log.info("Client connection requested", result_code=rc)
        client.loop_start()
        log.info(f"Connected to MQTT at {settings.mqtt.host}:{settings.mqtt.port} as {settings.mqtt.user}")
        log.info(f"Publishing at {settings.mqtt.topic_root}")

    except Exception as e:
        log.error("Failed to connect to MQTT: %s", e, exc_info=1)
        sys.exit(-500)
    return client


def _watch_events(settings: Settings, client: mqtt.Client) -> None:
    """Schedule a handler per configured event and block until the observer stops.

    Exits the process with ``sys.exit(-400)`` if the observer cannot be created.
    A failure while setting up one event is logged and that event is skipped.
    """
    try:
        observer = Observer()
    except Exception as e:
        log.error("Failed to setup file system watchdog: %s", e)
        sys.exit(-400)

    for event_config in settings.events:
        try:
            state_topic = f"{settings.mqtt.topic_root}/{event_config.event}/{event_config.camera}/state"
            image_topic = f"{settings.mqtt.topic_root}/{event_config.event}/{event_config.camera}/image"
            event_handler = EventHandler(
                client=client,
                event_config=event_config,
                state_topic=state_topic,
                image_topic=image_topic,
                target_config=settings.targets.get(event_config.target_type),
                ocr_config=settings.ocr,
                image_config=settings.image,
                dvla_config=settings.dvla,
                tracker_config=settings.tracker,
            )  # ty:ignore[invalid-argument-type]
            log.debug("Scheduling watchdog for %s", event_config.watch_path)
            observer.schedule(event_handler, str(event_config.watch_path), recursive=False)  # ty:ignore[invalid-argument-type]
            post_discovery_message(
                client,
                settings.homeassistant.discovery_topic_root,
                state_topic=state_topic,
                image_topic=image_topic,
                event_config=event_config,
                device_creation=settings.homeassistant.device_creation,
            )
            log.info("Publishing %s %s state to %s", event_config.event, event_config.camera, state_topic)
        except Exception as e:
            # One misconfigured event must not prevent the others from being watched.
            log.error("Failed to schedule event %s %s watchdog: %s", event_config.event, event_config.camera, e)

    observer.start()
    try:
        log.info("Starting observer loop")
        # join() with a timeout keeps the main thread waking up once a second
        # instead of blocking indefinitely on the observer thread.
        while observer.is_alive():
            observer.join(1)
    except Exception as e:
        log.error("Failed in run observer loop: %s", e, exc_info=1)
    finally:
        observer.stop()
        observer.join()
        log.info("loop observer ended")

134 

135 

class Anpr2MQTT(Settings):
    """Settings subclass that serves as the command-line entry model."""

    def cli_cmd(self) -> None:
        """Run the main watch loop.

        The instance itself is not passed on; ``main_loop`` takes no arguments.
        """
        main_loop()

139 

140 

def run() -> None:
    """Program entry point: hand the ``Anpr2MQTT`` model to ``CliApp.run``."""
    CliApp.run(Anpr2MQTT)