Coverage for src / anpr2mqtt / hass.py: 100%

121 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-08 17:29 +0000

1import datetime as dt 

2import json 

3import random 

4import time 

5from io import BytesIO 

6from pathlib import Path 

7from typing import Any 

8 

9import paho.mqtt.client as mqtt 

10import structlog 

11from paho.mqtt.properties import Properties 

12from paho.mqtt.reasoncodes import ReasonCode 

13from PIL import Image 

14 

15import anpr2mqtt 

16from anpr2mqtt.settings import CameraSettings, EventSettings, HomeAssistantSettings 

17 

18from .const import ImageInfo 

19 

# Module-level structlog logger shared by everything in this module.
log = structlog.get_logger()

21 

22 

class HomeAssistantPublisher:
    """Publish Home Assistant MQTT discovery, state and image messages.

    The publisher also subscribes to the Home Assistant status topic and, when
    Home Assistant is seen to come back online after having been offline,
    replays every discovery message it has previously published.
    """

    def __init__(self, client: mqtt.Client, cfg: HomeAssistantSettings) -> None:
        """Keep the MQTT client and copy the settings needed from *cfg*.

        Args:
            client: MQTT client used for all subscriptions and publications.
            cfg: Home Assistant settings providing the status topic, the
                discovery topic root and the device-creation flag.
        """
        self.client: mqtt.Client = client
        self.hass_status_topic: str = cfg.status_topic
        self.discovery_topic_prefix: str = cfg.discovery_topic_root
        self.device_creation: bool = cfg.device_creation
        # Discovery topic -> last JSON payload published there, kept for replay.
        self.republish: dict[str, Any] = {}
        # None until a status message has been seen; afterwards True or False.
        self.hass_online: bool | None = None

    def start(self) -> None:
        """Install this object's callbacks on the client and subscribe to the status topic."""
        log.info("Subscribing to Home Assistant birth and last will at %s", self.hass_status_topic)
        self.client.on_message = self.on_message
        self.client.on_subscribe = self.on_subscribe
        self.client.on_unsubscribe = self.on_unsubscribe
        self.client.subscribe(self.hass_status_topic)

    def on_subscribe(
        self,
        _client: mqtt.Client,
        userdata: Any,
        mid: int,
        reason_code_list: list[ReasonCode],
        properties: Properties | None = None,
    ) -> None:
        """Log the details of a subscribe acknowledgement at debug level."""
        log.debug("on_subscribe, userdata=%s, mid=%s, reasons=%s, properties=%s", userdata, mid, reason_code_list, properties)

    def on_unsubscribe(
        self,
        _client: mqtt.Client,
        userdata: Any,
        mid: int,
        reason_code_list: list[ReasonCode],
        properties: Properties | None = None,
    ) -> None:
        """Log the details of an unsubscribe acknowledgement at debug level."""
        log.debug("on_unsubscribe, userdata=%s, mid=%s, reasons=%s, properties=%s", userdata, mid, reason_code_list, properties)

    def on_message(self, _client: mqtt.Client, _userdata: Any, msg: mqtt.MQTTMessage) -> None:
        """Handle an incoming MQTT message.

        Only messages on the Home Assistant status topic are acted upon:
        ``offline`` marks Home Assistant as down, and ``online`` marks it as up
        and, if it was previously known to be offline, replays the recorded
        discovery messages. Any other payload is logged as unknown.
        """
        if msg.topic != self.hass_status_topic:
            log.debug("Unknown message on %s", msg.topic)
            return

        # Decode leniently so that a payload that is not valid UTF-8 is reported
        # as an unknown status instead of raising from inside the callback.
        decoded: str | None = msg.payload.decode("utf-8", errors="replace") if msg.payload else None
        if decoded == "offline":
            log.warning("Home Assistant gone offline")
            self.hass_online = False
        elif decoded == "online":
            # Replay only on an offline -> online transition, not on the first
            # status message seen (when hass_online is still None).
            was_offline: bool = self.hass_online is False
            self.hass_online = True
            if was_offline:
                log.info("Home Assistant back online")
                self.republish_discovery()
            else:
                log.info("Home Assistant online")
        else:
            log.warning("Unknown Home Assistant status payload: %s", msg.payload)

    def republish_discovery(self) -> None:
        """Publish again every discovery message recorded in ``self.republish``.

        Each message is preceded by a random 1-10 second sleep and is sent
        retained with QoS 0, the same way it was first published.
        """
        # Iterate over a snapshot: the sleeps below make this loop long-running,
        # and a discovery publication that adds a topic meanwhile would otherwise
        # raise "dictionary changed size during iteration".
        # NOTE(review): this sleeps on whichever thread delivered the status
        # message - confirm that blocking it for this long is acceptable.
        for topic, payload in list(self.republish.items()):
            log.debug("Republishing to %s", topic)
            # add jitter to republish to reduce herd load on HA after restart
            time.sleep(random.randint(1, 10))  # noqa: S311
            self.client.publish(topic, payload=payload, qos=0, retain=True)

    @staticmethod
    def _origin_info() -> dict[str, Any]:
        """Return the ``o`` (origin) section shared by all discovery payloads."""
        return {
            "name": "anpr2mqtt",
            "sw": anpr2mqtt.version,  # pyright: ignore[reportAttributeAccessIssue]
            "url": "https://anpr2mqtt.rhizomatics.org.uk",
        }

    def _publish_discovery(self, topic: str, payload: dict[str, Any]) -> None:
        """Serialise *payload*, publish it retained to *topic* and record it for replay."""
        msg: str = json.dumps(payload)
        self.client.publish(topic, payload=msg, qos=0, retain=True)
        self.republish[topic] = msg

    def publish_sensor_discovery(self, state_topic: str, event_config: EventSettings, camera: CameraSettings) -> None:
        """Publish the discovery message for the sensor entity of an event.

        Args:
            state_topic: Topic used for both the state and the JSON attributes.
            event_config: Event settings supplying the event name, camera and
                optional description.
            camera: Camera settings, used for the default name and device info.
        """
        name: str = event_config.description or f"{event_config.event} {camera.name}"
        payload: dict[str, Any] = {
            "o": self._origin_info(),
            "device_class": None,
            "value_template": "{{ value_json.target }}",
            "default_entity_id": f"sensor.{event_config.event}_{event_config.camera}",
            "unique_id": f"{event_config.event}_{event_config.camera}",
            "state_topic": state_topic,
            "json_attributes_topic": state_topic,
            "icon": "mdi:car-back",
            "name": name,
        }
        if self.device_creation:
            self.add_device_info(payload, camera)
        topic: str = f"{self.discovery_topic_prefix}/sensor/{event_config.camera}/{event_config.event}/config"
        self._publish_discovery(topic, payload)
        log.info("Published HA MQTT sensor Discovery message to %s", topic)

    def publish_image_discovery(
        self, state_topic: str, image_topic: str, event_config: EventSettings, camera: CameraSettings
    ) -> None:
        """Publish the discovery message for the image entity of an event.

        Args:
            state_topic: Topic used for the JSON attributes.
            image_topic: Topic on which the image bytes are published.
            event_config: Event settings supplying the event name, camera and
                optional description.
            camera: Camera settings, used for the device info.
        """
        name: str = event_config.description or f"{event_config.event} {event_config.camera}"
        payload: dict[str, Any] = {
            "o": self._origin_info(),
            "device_class": None,
            "unique_id": f"{event_config.event}_{event_config.camera}",
            "default_entity_id": f"image.{event_config.event}_{event_config.camera}",
            "image_topic": image_topic,
            "json_attributes_topic": state_topic,
            "icon": "mdi:car-back",
            "name": name,
        }
        if self.device_creation:
            self.add_device_info(payload, camera)
        topic: str = f"{self.discovery_topic_prefix}/image/{event_config.camera}/{event_config.event}/config"
        self._publish_discovery(topic, payload)
        log.info("Published HA MQTT Discovery message to %s", topic)

    def publish_camera_discovery(
        self, state_topic: str, image_topic: str, event_config: EventSettings, camera: CameraSettings
    ) -> None:
        """Publish the discovery message for the camera entity of an event.

        Args:
            state_topic: Topic used for the JSON attributes.
            image_topic: Topic on which the image bytes are published.
            event_config: Event settings supplying the event name and optional
                description.
            camera: Camera settings, used for the identifiers, the discovery
                topic and the device info.
        """
        name: str = event_config.description or f"{event_config.event} {event_config.camera}"
        payload: dict[str, Any] = {
            "o": self._origin_info(),
            "unique_id": f"{event_config.event}_{camera.name}",
            "default_entity_id": f"camera.{event_config.event}_{camera.name}_anpr",
            "topic": image_topic,
            "json_attributes_topic": state_topic,
            "icon": "mdi:car-back",
            "name": name,
        }
        if self.device_creation:
            self.add_device_info(payload, camera)
        topic: str = f"{self.discovery_topic_prefix}/camera/{camera.name}/{event_config.event}/config"
        self._publish_discovery(topic, payload)
        log.info("Published HA MQTT Discovery message to %s", topic)

    def add_device_info(self, payload: dict[str, Any], camera: CameraSettings) -> None:
        """Add a ``dev`` (device) section describing *camera* to *payload* in place.

        A ``suggested_area`` entry is included only when the camera has an area.
        """
        payload["dev"] = {
            "name": f"anpr2mqtt on {camera.name}",
            "sw_version": anpr2mqtt.version,  # pyright: ignore[reportAttributeAccessIssue]
            "manufacturer": "rhizomatics",
            "identifiers": [f"{camera.name}.anpr2mqtt"],
        }
        if camera.area:
            payload["dev"]["suggested_area"] = camera.area

    def post_state_message(
        self,
        topic: str,
        target: str | None,
        event_config: EventSettings,
        camera: CameraSettings,
        ocr_fields: dict[str, str | None] | None = None,
        image_info: ImageInfo | None = None,
        classification: dict[str, Any] | None = None,
        previous_sightings: int | None = None,
        last_sighting: dt.datetime | None = None,
        url: str | None = None,
        error: str | None = None,
        file_path: Path | None = None,
        reg_info: Any = None,
    ) -> None:
        """Build the JSON state payload for an event and publish it retained to *topic*.

        The payload always carries the target, event and camera fields; the
        optional arguments add further keys only when they are supplied. Any
        failure while adding the image details, serialising or publishing is
        logged with its traceback and not re-raised.
        """
        payload: dict[str, Any] = {
            "target": target,
            "target_type": event_config.target_type,
            # The target is also exposed under a key named after its type.
            event_config.target_type: target,
            "event": event_config.event,
            "camera": camera.name or "UNKNOWN",
            "area": camera.area,
            "live_url": camera.live_url,
            "reg_info": reg_info,
        }
        # ocr_fields and classification are merged at the top level, so their
        # keys override any same-named key already set above.
        if ocr_fields:
            payload.update(ocr_fields)
        if error:
            payload["error"] = error
        if url is not None:
            payload["event_image_url"] = url
        if file_path is not None:
            payload["file_path"] = str(file_path)
        if classification is not None:
            payload.update(classification)
        if previous_sightings is not None:
            payload["previous_sightings"] = previous_sightings
        if last_sighting is not None:
            payload["last_sighting"] = last_sighting.isoformat()

        try:
            if image_info:
                payload.update(
                    {
                        "event_time": image_info.timestamp.isoformat(),
                        "image_event": image_info.event,
                        "ext": image_info.ext,
                        "image_size": image_info.size,
                    }
                )
            msg: str = json.dumps(payload)
            self.client.publish(topic, payload=msg, qos=0, retain=True)
            log.debug("Published HA MQTT State message to %s: %s", topic, payload)
        except Exception as e:
            # Best effort: a failed state publication is logged, never raised.
            log.error("Failed to publish event %s: %s", payload, e, exc_info=True)

    def post_image_message(self, topic: str, image: Image.Image, img_format: str = "JPEG") -> None:
        """Encode *image* as *img_format* and publish the bytes retained to *topic*.

        Any failure while encoding or publishing is logged with its traceback
        and not re-raised.
        """
        try:
            img_byte_arr = BytesIO()
            image.save(img_byte_arr, format=img_format)
            img_bytes: bytes = img_byte_arr.getvalue()

            self.client.publish(topic, payload=img_bytes, qos=0, retain=True)
            log.debug("Published HA MQTT Image message to %s: %s bytes", topic, len(img_bytes))
        except Exception as e:
            # Best effort: a failed image publication is logged, never raised.
            log.error("Failed to publish image entity: %s", e, exc_info=True)