Coverage for src / anpr2mqtt / hass.py: 87%

150 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-30 16:07 +0000

import hashlib
import json
import random
import time
from io import BytesIO
from pathlib import Path
from typing import Any

import paho.mqtt.client as mqtt
import structlog
from paho.mqtt.properties import Properties
from paho.mqtt.reasoncodes import ReasonCode
from PIL import Image

import anpr2mqtt
from anpr2mqtt.settings import CameraSettings, EventSettings, HomeAssistantSettings, Target
from anpr2mqtt.tracker import Sighting

from .const import ImageInfo

19 

20log = structlog.get_logger() 

21 

22 

class HomeAssistantPublisher:
    """Publish ANPR discovery, state and image messages to Home Assistant over MQTT.

    Every discovery payload that is published is also kept in
    ``self.republish`` (keyed by topic) so that it can be sent again when
    Home Assistant reports ``online`` on its status topic after having
    reported ``offline``.
    """

    def __init__(self, client: mqtt.Client, cfg: HomeAssistantSettings) -> None:
        """Store the MQTT client and copy the Home Assistant settings in use.

        Args:
            client: MQTT client used for every subscribe and publish call.
            cfg: Settings supplying the status topic, the discovery topic
                root and the device-creation flag.
        """
        self.client: mqtt.Client = client
        self.hass_status_topic: str = cfg.status_topic
        self.discovery_topic_prefix: str = cfg.discovery_topic_root
        self.device_creation: bool = cfg.device_creation
        # Discovery payloads by topic, replayed by republish_discovery().
        self.republish: dict[str, dict[str, Any]] = {}
        # Stays None until the first "online"/"offline" status message is seen.
        self.hass_online: bool | None = None

    def start(self) -> None:
        """Install this object's callbacks on the client and subscribe to the status topic.

        Note that this replaces whatever ``on_message``, ``on_subscribe`` and
        ``on_unsubscribe`` callbacks were previously set on the client.
        """
        log.info("Subscribing to Home Assistant birth and last will at %s", self.hass_status_topic)
        self.client.on_message = self.on_message
        self.client.on_subscribe = self.on_subscribe
        self.client.on_unsubscribe = self.on_unsubscribe
        self.client.subscribe(self.hass_status_topic)

    def on_subscribe(
        self,
        _client: mqtt.Client,
        userdata: Any,
        mid: int,
        reason_code_list: list[ReasonCode],
        properties: Properties | None = None,
    ) -> None:
        """Log the details of a subscribe acknowledgement at debug level."""
        log.debug("on_subscribe, userdata=%s, mid=%s, reasons=%s, properties=%s", userdata, mid, reason_code_list, properties)

    def on_unsubscribe(
        self,
        _client: mqtt.Client,
        userdata: Any,
        mid: int,
        reason_code_list: list[ReasonCode],
        properties: Properties | None = None,
    ) -> None:
        """Log the details of an unsubscribe acknowledgement at debug level."""
        log.debug("on_unsubscribe, userdata=%s, mid=%s, reasons=%s, properties=%s", userdata, mid, reason_code_list, properties)

    def on_message(self, _client: mqtt.Client, _userdata: Any, msg: mqtt.MQTTMessage) -> None:
        """Track Home Assistant availability from messages on its status topic.

        ``offline`` marks Home Assistant as down. ``online`` marks it as up
        and, only when it was previously known to be offline, replays every
        stored discovery payload. Any other payload, and any message on a
        different topic, is just logged.

        Args:
            _client: Client that received the message (unused).
            _userdata: User data attached to the client (unused).
            msg: The received message; its topic and payload are inspected.
        """
        if msg.topic != self.hass_status_topic:
            log.debug("Unknown message on %s", msg.topic)
            return

        # errors="replace" means a payload that is not valid UTF-8 ends up in
        # the "unknown payload" branch below instead of raising
        # UnicodeDecodeError out of this callback.
        decoded: str | None = msg.payload.decode("utf-8", errors="replace") if msg.payload else None
        if decoded == "offline":
            log.warn("Home Assistant gone offline")
            self.hass_online = False
        elif decoded == "online":
            if self.hass_online is False:
                log.info("Home Assistant back online, republishing discoveries")
                self.hass_online = True
                self.republish_discovery(f"{msg.topic}_{decoded}")
            else:
                # First status seen, or a repeated "online": nothing to replay.
                log.info("Home Assistant online")
                self.hass_online = True
        else:
            log.warn("Unknown Home Assistant status payload: %s", msg.payload)

    def republish_discovery(self, source_event: str) -> None:
        """Publish every stored discovery payload again, tagged with its trigger.

        Each stored payload gets a ``trigger`` key set to ``source_event``
        (the stored dict itself is updated) and is then published to its
        original topic. This call blocks for a random 1-10 seconds before
        each publish.

        Args:
            source_event: Label describing what caused the republish.
        """
        # Iterate over a snapshot: the publish_*_discovery methods add entries
        # to self.republish, and this loop sleeps for seconds per entry, so
        # iterating the live dict could fail with "dictionary changed size
        # during iteration".
        for topic, payload in list(self.republish.items()):
            log.debug("Republishing to %s for %s", topic, source_event)
            # add jitter to republish to reduce herd load on HA after restart
            time.sleep(random.randint(1, 10))  # noqa: S311
            payload["trigger"] = source_event
            self.client.publish(topic, json.dumps(payload))

    @staticmethod
    def _origin_info() -> dict[str, Any]:
        """Return the ``o`` (origin) section shared by every discovery payload."""
        return {
            "name": "anpr2mqtt",
            "sw": anpr2mqtt.version,  # pyright: ignore[reportAttributeAccessIssue]
            "url": "https://anpr2mqtt.rhizomatics.org.uk",
        }

    def _publish_discovery(self, topic: str, payload: dict[str, Any]) -> None:
        """Publish a retained discovery payload and remember it for republishing."""
        self.client.publish(topic, payload=json.dumps(payload), qos=0, retain=True)
        self.republish[topic] = payload

    def publish_sensor_discovery(self, state_topic: str, event_config: EventSettings, camera: CameraSettings) -> None:
        """Publish the discovery message for the sensor entity of an event.

        Args:
            state_topic: Topic used for both the sensor state and its JSON attributes.
            event_config: Event settings supplying event, camera, description and icon.
            camera: Camera settings used for the default name and the optional device block.
        """
        name: str = event_config.description or f"{event_config.event} {camera.name}"
        payload: dict[str, Any] = {
            "o": self._origin_info(),
            "device_class": None,
            "value_template": "{{ value_json.target }}",
            "default_entity_id": f"sensor.{event_config.event}_{event_config.camera}",
            "unique_id": f"{event_config.event}_{event_config.camera}",
            "state_topic": state_topic,
            "json_attributes_topic": state_topic,
            "name": name,
        }
        if event_config.icon:
            payload["icon"] = event_config.icon
        if self.device_creation:
            self.add_device_info(payload, camera)
        topic: str = f"{self.discovery_topic_prefix}/sensor/{event_config.camera}/{event_config.event}/config"
        self._publish_discovery(topic, payload)
        log.info("Published HA MQTT sensor Discovery message to %s", topic)

    def publish_image_discovery(
        self, state_topic: str, image_topic: str, event_config: EventSettings, camera: CameraSettings
    ) -> None:
        """Publish the discovery message for the image entity of an event.

        Args:
            state_topic: Topic carrying the JSON attributes of the entity.
            image_topic: Topic on which the image bytes are published.
            event_config: Event settings supplying event, camera, description and icon.
            camera: Camera settings used for the optional device block.
        """
        name: str = event_config.description or f"{event_config.event} {event_config.camera}"
        payload: dict[str, Any] = {
            "o": self._origin_info(),
            "device_class": None,
            "unique_id": f"{event_config.event}_{event_config.camera}",
            "default_entity_id": f"image.{event_config.event}_{event_config.camera}",
            "image_topic": image_topic,
            "json_attributes_topic": state_topic,
            "icon": event_config.icon,
            "name": name,
        }
        if not event_config.icon:
            # Same rule as the sensor discovery: leave the key out rather than
            # sending "icon": null. Removing it afterwards keeps the key order
            # unchanged when an icon is configured.
            del payload["icon"]
        if self.device_creation:
            self.add_device_info(payload, camera)
        topic = f"{self.discovery_topic_prefix}/image/{event_config.camera}/{event_config.event}/config"
        self._publish_discovery(topic, payload)
        log.info("Published HA MQTT Discovery message to %s", topic)

    def publish_camera_discovery(
        self, state_topic: str, image_topic: str, event_config: EventSettings, camera: CameraSettings
    ) -> None:
        """Publish the discovery message for the camera entity of an event.

        Unlike the sensor and image discoveries, the ids and the discovery
        topic here are built from ``camera.name``.

        Args:
            state_topic: Topic carrying the JSON attributes of the entity.
            image_topic: Topic on which the image bytes are published.
            event_config: Event settings supplying event, camera, description and icon.
            camera: Camera settings supplying the name and the optional device block.
        """
        name: str = event_config.description or f"{event_config.event} {event_config.camera}"
        payload: dict[str, Any] = {
            "o": self._origin_info(),
            "unique_id": f"{event_config.event}_{camera.name}",
            "default_entity_id": f"camera.{event_config.event}_{camera.name}_anpr",
            "topic": image_topic,
            "json_attributes_topic": state_topic,
            "icon": event_config.icon,
            "name": name,
        }
        if not event_config.icon:
            # Same rule as the sensor discovery: leave the key out rather than
            # sending "icon": null.
            del payload["icon"]
        if self.device_creation:
            self.add_device_info(payload, camera)
        topic = f"{self.discovery_topic_prefix}/camera/{camera.name}/{event_config.event}/config"
        self._publish_discovery(topic, payload)
        log.info("Published HA MQTT Discovery message to %s", topic)

    def publish_target_sensor_discovery(
        self, entity_id: str, target_type: str, targets: list[Target], state_topic: str, icon: str | None
    ) -> None:
        """Publish the discovery message for a timestamp sensor tracking one or more targets.

        Args:
            entity_id: Entity id suffix; also the entity name when there is
                not exactly one target.
            target_type: Prefix of the generated unique id.
            targets: Targets covered by the sensor; their ids, in the given
                order, determine the unique id.
            state_topic: Topic used for both the state and the JSON attributes.
            icon: Optional icon; omitted from the payload when not set.
        """
        # The built-in hash() of a str is salted per interpreter process
        # (unless PYTHONHASHSEED is pinned), so it would give the entity a new
        # unique_id after every restart. A truncated SHA-256 digest of the
        # same joined ids is stable across runs.
        joined_ids = ":".join(t.id for t in targets)
        digest = hashlib.sha256(joined_ids.encode("utf-8")).hexdigest()[:16]
        payload: dict[str, Any] = {
            "o": self._origin_info(),
            "unique_id": f"{target_type}_{digest}",
            "default_entity_id": f"sensor.{entity_id}",
            "name": targets[0].description if len(targets) == 1 else entity_id,
            "state_topic": state_topic,
            "json_attributes_topic": state_topic,
            "value_template": "{{ value_json.last_seen }}",
            "device_class": "timestamp",
        }
        if icon:
            payload["icon"] = icon
        topic = f"{self.discovery_topic_prefix}/sensor/{entity_id}/config"
        self._publish_discovery(topic, payload)
        log.info("Published HA MQTT target sensor Discovery message to %s", topic)

    def publish_target_state(self, state_topic: str, time_analysis: dict[str, Any], description: str | None = None) -> None:
        """Publish a retained state message for a target sensor.

        Failures while serialising or publishing are logged, not raised.

        Args:
            state_topic: Topic to publish to.
            time_analysis: Fields copied into the payload as-is.
            description: Optional description added to the payload when non-empty.
        """
        payload: dict[str, Any] = dict(time_analysis)
        if description:
            payload["description"] = description
        try:
            msg = json.dumps(payload)
            self.client.publish(state_topic, payload=msg, qos=0, retain=True)
            log.debug("Published target state to %s: %s", state_topic, payload)
        except Exception as e:
            log.error("Failed to publish target state to %s: %s", state_topic, e, exc_info=1)

    def add_device_info(self, payload: dict[str, Any], camera: CameraSettings) -> None:
        """Add a ``dev`` (device) section for the camera to a discovery payload, in place.

        Args:
            payload: Discovery payload to modify.
            camera: Camera whose name (and area, when set) describe the device.
        """
        device: dict[str, Any] = {
            "name": f"anpr2mqtt on {camera.name}",
            "sw_version": anpr2mqtt.version,  # pyright: ignore[reportAttributeAccessIssue]
            "manufacturer": "rhizomatics",
            "identifiers": [f"{camera.name}.anpr2mqtt"],
        }
        if camera.area:
            device["suggested_area"] = camera.area
        payload["dev"] = device

    def post_state_message(
        self,
        topic: str,
        sighting: Sighting | None,
        event_config: EventSettings,
        camera: CameraSettings,
        extra_info: dict[str, Any] | None = None,
        image_info: ImageInfo | None = None,
        time_analysis: dict[str, Any] | None = None,
        url: str | None = None,
        error: str | None = None,
        file_path: Path | None = None,
        reg_info: Any = None,
        source: str | None = None,
        frigate_event_id: str | None = None,
        frigate_ui_url: str | None = None,
    ) -> None:
        """Build and publish the retained JSON state message for an event.

        The payload starts from ``sighting.as_dict()`` (or a stub with a null
        target when there is no sighting), is extended with event and camera
        details, then with ``extra_info`` and each optional argument that was
        supplied. Later additions overwrite earlier keys of the same name.
        Failures while adding image details, serialising or publishing are
        logged, not raised.

        Args:
            topic: Topic to publish to.
            sighting: The sighting to report, or None when nothing was identified.
            event_config: Event settings (event name, target type, default description).
            camera: Camera settings (name, area, live URL).
            extra_info: Extra key/values merged over the payload.
            image_info: Image details added as event_time, image_event, ext and image_size.
            time_analysis: Published under the ``history`` key.
            url: Published as ``event_image_url`` when not None.
            error: Published as ``error`` when non-empty.
            file_path: Published (as a string) as ``file_path`` when not None.
            reg_info: Published under the ``reg_info`` key.
            source: Published as ``source`` when not None.
            frigate_event_id: Published as ``frigate_event_id`` when not None.
            frigate_ui_url: Published as ``frigate_ui_url`` when not None.
        """
        payload: dict[str, Any]
        if sighting:
            payload = sighting.as_dict()
        else:
            payload = {"target": None, "target_type": event_config.target_type}
        if payload.get("description") is None:
            payload["description"] = event_config.default_description

        payload.update(
            {
                # Also expose the target id under a key named after its type.
                event_config.target_type: sighting.target.id if sighting else None,
                "event": event_config.event,
                "camera": camera.name or "UNKNOWN",
                "area": camera.area,
                "live_url": camera.live_url,
                "reg_info": reg_info,
                "history": time_analysis,
            }
        )
        if extra_info:
            payload.update(extra_info)
        if error:
            payload["error"] = error
        if url is not None:
            payload["event_image_url"] = url
        if file_path is not None:
            payload["file_path"] = str(file_path)
        if source is not None:
            payload["source"] = source
        if frigate_event_id is not None:
            payload["frigate_event_id"] = frigate_event_id
        if frigate_ui_url is not None:
            payload["frigate_ui_url"] = frigate_ui_url

        try:
            if image_info:
                payload.update(
                    {
                        "event_time": image_info.timestamp.isoformat(),
                        "image_event": image_info.event,
                        "ext": image_info.ext,
                        "image_size": image_info.size,
                    }
                )
            msg: str = json.dumps(payload)
            self.client.publish(topic, payload=msg, qos=0, retain=True)
            log.debug("Published HA MQTT State message to %s: %s", topic, payload)
        except Exception as e:
            log.error("Failed to publish event %s: %s", payload, e, exc_info=1)

    def post_image_message(self, topic: str, image: Image.Image | None, img_format: str = "JPEG") -> None:
        """Publish an image as a retained message, or clear the topic when there is none.

        Failures while encoding or publishing are logged, not raised.

        Args:
            topic: Topic to publish the image bytes to.
            image: Image to encode and publish; None publishes an empty
                retained payload instead.
            img_format: Format name passed to ``Image.save``.
        """
        try:
            if image is None:
                self.client.publish(topic, payload=None, qos=0, retain=True)
                log.debug("Cleared HA MQTT Image message at %s", topic)
                return

            # Encode in memory; only the resulting bytes are published.
            buffer = BytesIO()
            image.save(buffer, format=img_format)
            img_bytes = buffer.getvalue()

            self.client.publish(topic, payload=img_bytes, qos=0, retain=True)
            log.debug("Published HA MQTT Image message to %s: %s bytes", topic, len(img_bytes))
        except Exception as e:
            log.error("Failed to publish image entity: %s", e, exc_info=1)