Coverage for src / anpr2mqtt / hass.py: 87%
150 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-30 16:07 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-30 16:07 +0000
1import json
2import random
3import time
4from io import BytesIO
5from pathlib import Path
6from typing import Any
8import paho.mqtt.client as mqtt
9import structlog
10from paho.mqtt.properties import Properties
11from paho.mqtt.reasoncodes import ReasonCode
12from PIL import Image
14import anpr2mqtt
15from anpr2mqtt.settings import CameraSettings, EventSettings, HomeAssistantSettings, Target
16from anpr2mqtt.tracker import Sighting
18from .const import ImageInfo
20log = structlog.get_logger()
23class HomeAssistantPublisher:
def __init__(self, client: mqtt.Client, cfg: HomeAssistantSettings) -> None:
    """Bind the publisher to an MQTT client and copy the settings it needs.

    Args:
        client: MQTT client used for every subscribe and publish call made by this object.
        cfg: Supplies the status topic, the discovery topic root and the device-creation flag.
    """
    # None until a status message has been seen; afterwards True or False.
    self.hass_online: bool | None = None
    # Discovery payloads already published, keyed by topic, replayed by republish_discovery().
    self.republish: dict[str, dict[str, Any]] = {}
    self.device_creation: bool = cfg.device_creation
    self.discovery_topic_prefix: str = cfg.discovery_topic_root
    self.hass_status_topic: str = cfg.status_topic
    self.client: mqtt.Client = client
def start(self) -> None:
    """Attach this object's MQTT callbacks to the client and subscribe to the status topic."""
    status_topic = self.hass_status_topic
    log.info("Subscribing to Home Assistant birth and last will at %s", status_topic)

    mqtt_client = self.client
    # Callbacks are attached before the subscribe call is issued.
    mqtt_client.on_message = self.on_message
    mqtt_client.on_subscribe = self.on_subscribe
    mqtt_client.on_unsubscribe = self.on_unsubscribe
    mqtt_client.subscribe(status_topic)
def on_subscribe(
    self,
    _client: mqtt.Client,
    userdata: Any,
    mid: int,
    reason_code_list: list[ReasonCode],
    properties: Properties | None = None,
) -> None:
    """Log the arguments of a subscribe acknowledgement at debug level.

    No state is changed; the callback exists purely for diagnostics.
    """
    details = (userdata, mid, reason_code_list, properties)
    log.debug("on_subscribe, userdata=%s, mid=%s, reasons=%s, properties=%s", *details)
def on_unsubscribe(
    self,
    _client: mqtt.Client,
    userdata: Any,
    mid: int,
    reason_code_list: list[ReasonCode],
    properties: Properties | None = None,
) -> None:
    """Log the arguments of an unsubscribe acknowledgement at debug level.

    No state is changed; the callback exists purely for diagnostics.
    """
    details = (userdata, mid, reason_code_list, properties)
    log.debug("on_unsubscribe, userdata=%s, mid=%s, reasons=%s, properties=%s", *details)
def on_message(self, _client: mqtt.Client, _userdata: Any, msg: mqtt.MQTTMessage) -> None:
    """Track Home Assistant availability from messages on its status topic.

    Messages on any other topic are only logged at debug level. On the status topic:

    * ``offline`` marks Home Assistant as offline.
    * ``online`` marks it as online; if it was previously known to be offline, every
      remembered discovery payload is republished.
    * anything else (including an empty or undecodable payload) is logged as a warning.

    Args:
        _client: Unused; present to satisfy the callback signature.
        _userdata: Unused; present to satisfy the callback signature.
        msg: The received message; only ``topic`` and ``payload`` are read.
    """
    if msg.topic != self.hass_status_topic:
        log.debug("Unknown message on %s", msg.topic)
        return

    # errors="replace" keeps a malformed (non UTF-8) payload from raising out of the
    # callback; it simply fails both comparisons below and is reported as unknown.
    decoded: str | None = msg.payload.decode("utf-8", errors="replace") if msg.payload else None

    if decoded == "offline":
        log.warn("Home Assistant gone offline")
        self.hass_online = False
    elif decoded == "online":
        # Only an observed offline -> online transition triggers a republish; the first
        # "online" seen while the state is still None does not.
        was_offline = self.hass_online is False
        if was_offline:
            log.info("Home Assistant back online, republishing discoveries")
        else:
            log.info("Home Assistant online")
        self.hass_online = True
        if was_offline:
            self.republish_discovery(f"{msg.topic}_{decoded}")
    else:
        log.warn("Unknown Home Assistant status payload: %s", msg.payload)
def republish_discovery(self, source_event: str) -> None:
    """Publish every remembered discovery payload again, one topic at a time.

    Each publish is preceded by a random pause of 1 to 10 seconds, so the call blocks
    for the sum of those pauses.

    Args:
        source_event: Stored in each remembered payload under the ``"trigger"`` key
            before that payload is sent.
    """
    for discovery_topic, discovery_payload in self.republish.items():
        log.debug("Republishing to %s for %s", discovery_topic, source_event)
        # add jitter to republish to reduce herd load on HA after restart
        jitter_seconds = random.randint(1, 10)  # noqa: S311
        time.sleep(jitter_seconds)
        discovery_payload["trigger"] = source_event
        encoded = json.dumps(discovery_payload)
        self.client.publish(discovery_topic, encoded)
def publish_sensor_discovery(self, state_topic: str, event_config: EventSettings, camera: CameraSettings) -> None:
    """Publish a retained Home Assistant MQTT discovery config for an event's sensor.

    The payload is also remembered under its topic so republish_discovery() can send it again.

    Args:
        state_topic: Topic used for both the sensor state and its JSON attributes.
        event_config: Supplies the event, camera id, optional description and optional icon.
        camera: Supplies the camera name for the default entity name and, when device
            creation is enabled, the device details.
    """
    object_id = f"{event_config.event}_{event_config.camera}"
    origin: dict[str, Any] = {
        "name": "anpr2mqtt",
        "sw": anpr2mqtt.version,  # pyright: ignore[reportAttributeAccessIssue]
        "url": "https://anpr2mqtt.rhizomatics.org.uk",
    }
    payload: dict[str, Any] = {
        "o": origin,
        "device_class": None,
        "value_template": "{{ value_json.target }}",
        "default_entity_id": f"sensor.{object_id}",
        "unique_id": object_id,
        "state_topic": state_topic,
        "json_attributes_topic": state_topic,
        # The configured description wins; otherwise fall back to "<event> <camera name>".
        "name": event_config.description or f"{event_config.event} {camera.name}",
    }
    if event_config.icon:
        payload["icon"] = event_config.icon
    if self.device_creation:
        self.add_device_info(payload, camera)

    topic: str = f"{self.discovery_topic_prefix}/sensor/{event_config.camera}/{event_config.event}/config"
    self.client.publish(topic, payload=json.dumps(payload), qos=0, retain=True)
    self.republish[topic] = payload
    log.info("Published HA MQTT sensor Discovery message to %s", topic)
def publish_image_discovery(
    self, state_topic: str, image_topic: str, event_config: EventSettings, camera: CameraSettings
) -> None:
    """Publish a retained Home Assistant MQTT discovery config for an event's image entity.

    The payload is also remembered under its topic so republish_discovery() can send it again.

    Args:
        state_topic: Topic used for the entity's JSON attributes.
        image_topic: Topic on which the image bytes are published.
        event_config: Supplies the event, camera id, optional description and optional icon.
        camera: Passed to add_device_info() when device creation is enabled.
    """
    name: str = event_config.description or f"{event_config.event} {event_config.camera}"
    payload: dict[str, Any] = {
        "o": {
            "name": "anpr2mqtt",
            "sw": anpr2mqtt.version,  # pyright: ignore[reportAttributeAccessIssue]
            "url": "https://anpr2mqtt.rhizomatics.org.uk",
        },
        "device_class": None,
        "unique_id": f"{event_config.event}_{event_config.camera}",
        "default_entity_id": f"image.{event_config.event}_{event_config.camera}",
        "image_topic": image_topic,
        "json_attributes_topic": state_topic,
        "name": name,
    }
    # Only send an icon when one is configured, as publish_sensor_discovery() does,
    # rather than emitting "icon": null in the discovery payload.
    if event_config.icon:
        payload["icon"] = event_config.icon
    if self.device_creation:
        self.add_device_info(payload, camera)
    topic = f"{self.discovery_topic_prefix}/image/{event_config.camera}/{event_config.event}/config"
    msg = json.dumps(payload)
    self.client.publish(topic, payload=msg, qos=0, retain=True)
    self.republish[topic] = payload
    log.info("Published HA MQTT Discovery message to %s", topic)
def publish_camera_discovery(
    self, state_topic: str, image_topic: str, event_config: EventSettings, camera: CameraSettings
) -> None:
    """Publish a retained Home Assistant MQTT discovery config for an event's camera entity.

    The payload is also remembered under its topic so republish_discovery() can send it again.

    Args:
        state_topic: Topic used for the entity's JSON attributes.
        image_topic: Topic on which the image bytes are published.
        event_config: Supplies the event, optional description and optional icon.
        camera: Supplies the camera name used in the ids and topic, and the device
            details when device creation is enabled.
    """
    name: str = event_config.description or f"{event_config.event} {event_config.camera}"
    payload: dict[str, Any] = {
        "o": {
            "name": "anpr2mqtt",
            "sw": anpr2mqtt.version,  # pyright: ignore[reportAttributeAccessIssue]
            "url": "https://anpr2mqtt.rhizomatics.org.uk",
        },
        "unique_id": f"{event_config.event}_{camera.name}",
        "default_entity_id": f"camera.{event_config.event}_{camera.name}_anpr",
        "topic": image_topic,
        "json_attributes_topic": state_topic,
        "name": name,
    }
    # Only send an icon when one is configured, as publish_sensor_discovery() does,
    # rather than emitting "icon": null in the discovery payload.
    if event_config.icon:
        payload["icon"] = event_config.icon
    if self.device_creation:
        self.add_device_info(payload, camera)
    topic = f"{self.discovery_topic_prefix}/camera/{camera.name}/{event_config.event}/config"
    msg = json.dumps(payload)
    self.client.publish(topic, payload=msg, qos=0, retain=True)
    self.republish[topic] = payload
    log.info("Published HA MQTT Discovery message to %s", topic)
def publish_target_sensor_discovery(
    self, entity_id: str, target_type: str, targets: list[Target], state_topic: str, icon: str | None
) -> None:
    """Publish a retained Home Assistant MQTT discovery config for a per-target timestamp sensor.

    The payload is also remembered under its topic so republish_discovery() can send it again.

    Args:
        entity_id: Used for the default entity id, the discovery topic and, unless
            exactly one target is given, the entity name.
        target_type: Prefix of the generated unique id.
        targets: Targets covered by this sensor; their ids are combined into the unique id.
            With exactly one target its description becomes the entity name.
        state_topic: Topic used for both the sensor state and its JSON attributes.
        icon: Optional icon; omitted from the payload when falsy.
    """
    import hashlib  # local import keeps this method self-contained

    # The built-in hash() of a str is salted per interpreter process (unless
    # PYTHONHASHSEED is pinned), which made the unique id change on every restart.
    # A content digest is stable across runs for the same set of target ids.
    joined_ids = ":".join(t.id for t in targets)
    stable_digest = hashlib.sha256(joined_ids.encode("utf-8")).hexdigest()[:16]

    payload: dict[str, Any] = {
        "o": {
            "name": "anpr2mqtt",
            "sw": anpr2mqtt.version,  # pyright: ignore[reportAttributeAccessIssue]
            "url": "https://anpr2mqtt.rhizomatics.org.uk",
        },
        "unique_id": f"{target_type}_{stable_digest}",
        "default_entity_id": f"sensor.{entity_id}",
        "name": targets[0].description if len(targets) == 1 else entity_id,
        "state_topic": state_topic,
        "json_attributes_topic": state_topic,
        "value_template": "{{ value_json.last_seen }}",
        "device_class": "timestamp",
    }
    if icon:
        payload["icon"] = icon
    topic = f"{self.discovery_topic_prefix}/sensor/{entity_id}/config"
    msg = json.dumps(payload)
    self.client.publish(topic, payload=msg, qos=0, retain=True)
    self.republish[topic] = payload
    log.info("Published HA MQTT target sensor Discovery message to %s", topic)
def publish_target_state(self, state_topic: str, time_analysis: dict[str, Any], description: str | None = None) -> None:
    """Publish a retained JSON state message for a target.

    Failures while encoding or publishing are logged and not re-raised.

    Args:
        state_topic: Topic to publish to.
        time_analysis: Copied to form the message body; the caller's dict is not modified.
        description: Added under ``"description"`` when truthy.
    """
    payload: dict[str, Any] = dict(time_analysis)
    if description:
        payload["description"] = description
    try:
        encoded = json.dumps(payload)
        self.client.publish(state_topic, payload=encoded, qos=0, retain=True)
        log.debug("Published target state to %s: %s", state_topic, payload)
    except Exception as e:
        log.error("Failed to publish target state to %s: %s", state_topic, e, exc_info=1)
def add_device_info(self, payload: dict[str, Any], camera: CameraSettings) -> None:
    """Add a ``"dev"`` device section, derived from the camera, to a discovery payload.

    Args:
        payload: Discovery payload, modified in place.
        camera: Supplies the name used in the device name and identifier, and the
            optional area used as ``suggested_area``.
    """
    device: dict[str, Any] = {
        "name": f"anpr2mqtt on {camera.name}",
        "sw_version": anpr2mqtt.version,  # pyright: ignore[reportAttributeAccessIssue]
        "manufacturer": "rhizomatics",
        "identifiers": [f"{camera.name}.anpr2mqtt"],
    }
    if camera.area:
        device["suggested_area"] = camera.area
    payload["dev"] = device
def post_state_message(
    self,
    topic: str,
    sighting: Sighting | None,
    event_config: EventSettings,
    camera: CameraSettings,
    extra_info: dict[str, Any] | None = None,
    image_info: ImageInfo | None = None,
    time_analysis: dict[str, Any] | None = None,
    url: str | None = None,
    error: str | None = None,
    file_path: Path | None = None,
    reg_info: Any = None,
    source: str | None = None,
    frigate_event_id: str | None = None,
    frigate_ui_url: str | None = None,
) -> None:
    """Assemble the JSON state for an event and publish it as a retained message.

    The body starts from ``sighting.as_dict()`` (or a placeholder with a null target when
    there is no sighting), is topped up with event and camera details, then with whichever
    optional values were supplied. Failures while adding image details, encoding or
    publishing are logged and not re-raised.

    Args:
        topic: Topic to publish to.
        sighting: Source of the base payload and of the target id, when truthy.
        event_config: Supplies the event, target type and default description.
        camera: Supplies name, area and live URL.
        extra_info: Merged into the payload when truthy; may overwrite earlier keys.
        image_info: When truthy, adds event time, image event, extension and size.
        time_analysis: Stored under ``"history"``.
        url: Stored under ``"event_image_url"`` when not None.
        error: Stored under ``"error"`` when truthy.
        file_path: Stored as a string under ``"file_path"`` when not None.
        reg_info: Stored under ``"reg_info"``.
        source: Stored under ``"source"`` when not None.
        frigate_event_id: Stored under ``"frigate_event_id"`` when not None.
        frigate_ui_url: Stored under ``"frigate_ui_url"`` when not None.
    """
    payload: dict[str, Any]
    if sighting:
        payload = sighting.as_dict()
        target_id = sighting.target.id
    else:
        payload = {"target": None, "target_type": event_config.target_type}
        target_id = None

    # A missing description and an explicit None are treated the same way.
    if payload.get("description") is None:
        payload["description"] = event_config.default_description

    payload[event_config.target_type] = target_id
    payload["event"] = event_config.event
    payload["camera"] = camera.name or "UNKNOWN"
    payload["area"] = camera.area
    payload["live_url"] = camera.live_url
    payload["reg_info"] = reg_info
    payload["history"] = time_analysis

    if extra_info:
        payload.update(extra_info)
    if error:
        payload["error"] = error

    # These are included only when a value was actually passed (not None).
    optional_fields: dict[str, Any] = {
        "event_image_url": url,
        "file_path": None if file_path is None else str(file_path),
        "source": source,
        "frigate_event_id": frigate_event_id,
        "frigate_ui_url": frigate_ui_url,
    }
    payload.update({key: value for key, value in optional_fields.items() if value is not None})

    try:
        if image_info:
            image_fields: dict[str, Any] = {
                "event_time": image_info.timestamp.isoformat(),
                "image_event": image_info.event,
                "ext": image_info.ext,
                "image_size": image_info.size,
            }
            payload.update(image_fields)
        encoded: str = json.dumps(payload)
        self.client.publish(topic, payload=encoded, qos=0, retain=True)
        log.debug("Published HA MQTT State message to %s: %s", topic, payload)
    except Exception as e:
        log.error("Failed to publish event %s: %s", payload, e, exc_info=1)
def post_image_message(self, topic: str, image: Image.Image | None, img_format: str = "JPEG") -> None:
    """Publish an image as a retained message, or publish an empty retained payload when there is none.

    Failures while encoding or publishing are logged and not re-raised.

    Args:
        topic: Topic to publish to.
        image: Image to encode and send; ``None`` publishes a ``None`` payload instead.
        img_format: Format name handed to ``image.save``.
    """
    try:
        if image is not None:
            buffer = BytesIO()
            image.save(buffer, format=img_format)
            encoded = buffer.getvalue()

            self.client.publish(topic, payload=encoded, qos=0, retain=True)
            log.debug("Published HA MQTT Image message to %s: %s bytes", topic, len(encoded))
        else:
            self.client.publish(topic, payload=None, qos=0, retain=True)
            log.debug("Cleared HA MQTT Image message at %s", topic)
    except Exception as e:
        log.error("Failed to publish image entity: %s", e, exc_info=1)