Coverage for src / anpr2mqtt / hass.py: 100%
121 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-08 17:29 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-08 17:29 +0000
1import datetime as dt
2import json
3import random
4import time
5from io import BytesIO
6from pathlib import Path
7from typing import Any
9import paho.mqtt.client as mqtt
10import structlog
11from paho.mqtt.properties import Properties
12from paho.mqtt.reasoncodes import ReasonCode
13from PIL import Image
15import anpr2mqtt
16from anpr2mqtt.settings import CameraSettings, EventSettings, HomeAssistantSettings
18from .const import ImageInfo
20log = structlog.get_logger()
23class HomeAssistantPublisher:
24 def __init__(self, client: mqtt.Client, cfg: HomeAssistantSettings) -> None:
25 self.client: mqtt.Client = client
26 self.hass_status_topic: str = cfg.status_topic
27 self.discovery_topic_prefix: str = cfg.discovery_topic_root
28 self.device_creation: bool = cfg.device_creation
29 self.republish: dict[str, Any] = {}
30 self.hass_online: bool | None = None
32 def start(self) -> None:
33 log.info("Subscribing to Home Assistant birth and last will at %s", self.hass_status_topic)
34 self.client.on_message = self.on_message
35 self.client.on_subscribe = self.on_subscribe
36 self.client.on_unsubscribe = self.on_unsubscribe
37 self.client.subscribe(self.hass_status_topic)
39 def on_subscribe(
40 self,
41 _client: mqtt.Client,
42 userdata: Any,
43 mid: int,
44 reason_code_list: list[ReasonCode],
45 properties: Properties | None = None,
46 ) -> None:
47 log.debug("on_subscribe, userdata=%s, mid=%s, reasons=%s, properties=%s", userdata, mid, reason_code_list, properties)
49 def on_unsubscribe(
50 self,
51 _client: mqtt.Client,
52 userdata: Any,
53 mid: int,
54 reason_code_list: list[ReasonCode],
55 properties: Properties | None = None,
56 ) -> None:
57 log.debug("on_unsubscribe, userdata=%s, mid=%s, reasons=%s, properties=%s", userdata, mid, reason_code_list, properties)
59 def on_message(self, _client: mqtt.Client, _userdata: Any, msg: mqtt.MQTTMessage) -> None:
60 """Callback for incoming MQTT messages""" # noqa: D401
61 if msg.topic == self.hass_status_topic:
62 decoded: str | None = msg.payload.decode("utf-8") if msg.payload else None
63 if decoded == "offline":
64 log.warn("Home Assistant gone offline")
65 self.hass_online = False
66 elif decoded == "online":
67 if self.hass_online is False:
68 log.info("Home Assistant back online")
69 self.hass_online = True
70 self.republish_discovery()
71 else:
72 log.info("Home Assistant online")
73 self.hass_online = True
74 else:
75 log.warn("Unknown Home Assistant status payload: %s", msg.payload)
76 else:
77 log.debug("Unknown message on %s", msg.topic)
79 def republish_discovery(self) -> None:
80 for topic, payload in self.republish.items():
81 log.debug("Republishing to %s", topic)
82 # add jitter to republish to reduce herd load on HA after restart
83 time.sleep(random.randint(1, 10)) # noqa: S311
84 self.client.publish(topic, payload)
86 def publish_sensor_discovery(self, state_topic: str, event_config: EventSettings, camera: CameraSettings) -> None:
87 name: str = event_config.description or f"{event_config.event} {camera.name}"
88 payload: dict[str, Any] = {
89 "o": {
90 "name": "anpr2mqtt",
91 "sw": anpr2mqtt.version, # pyright: ignore[reportAttributeAccessIssue]
92 "url": "https://anpr2mqtt.rhizomatics.org.uk",
93 },
94 "device_class": None,
95 "value_template": "{{ value_json.target }}",
96 "default_entity_id": f"sensor.{event_config.event}_{event_config.camera}",
97 "unique_id": f"{event_config.event}_{event_config.camera}",
98 "state_topic": state_topic,
99 "json_attributes_topic": state_topic,
100 "icon": "mdi:car-back",
101 "name": name,
102 }
103 if self.device_creation:
104 self.add_device_info(payload, camera)
105 topic: str = f"{self.discovery_topic_prefix}/sensor/{event_config.camera}/{event_config.event}/config"
106 msg: str = json.dumps(payload)
107 self.client.publish(topic, payload=msg, qos=0, retain=True)
108 self.republish[topic] = msg
109 log.info("Published HA MQTT sensor Discovery message to %s", topic)
111 def publish_image_discovery(
112 self, state_topic: str, image_topic: str, event_config: EventSettings, camera: CameraSettings
113 ) -> None:
114 name: str = event_config.description or f"{event_config.event} {event_config.camera}"
115 payload = {
116 "o": {
117 "name": "anpr2mqtt",
118 "sw": anpr2mqtt.version, # pyright: ignore[reportAttributeAccessIssue]
119 "url": "https://anpr2mqtt.rhizomatics.org.uk",
120 },
121 "device_class": None,
122 "unique_id": f"{event_config.event}_{event_config.camera}",
123 "default_entity_id": f"image.{event_config.event}_{event_config.camera}",
124 "image_topic": image_topic,
125 "json_attributes_topic": state_topic,
126 "icon": "mdi:car-back",
127 "name": name,
128 }
129 if self.device_creation:
130 self.add_device_info(payload, camera)
131 topic = f"{self.discovery_topic_prefix}/image/{event_config.camera}/{event_config.event}/config"
132 msg = json.dumps(payload)
133 self.client.publish(topic, payload=msg, qos=0, retain=True)
134 self.republish[topic] = msg
135 log.info("Published HA MQTT Discovery message to %s", topic)
137 def publish_camera_discovery(
138 self, state_topic: str, image_topic: str, event_config: EventSettings, camera: CameraSettings
139 ) -> None:
140 name: str = event_config.description or f"{event_config.event} {event_config.camera}"
141 payload = {
142 "o": {
143 "name": "anpr2mqtt",
144 "sw": anpr2mqtt.version, # pyright: ignore[reportAttributeAccessIssue]
145 "url": "https://anpr2mqtt.rhizomatics.org.uk",
146 },
147 "unique_id": f"{event_config.event}_{camera.name}",
148 "default_entity_id": f"camera.{event_config.event}_{camera.name}_anpr",
149 "topic": image_topic,
150 "json_attributes_topic": state_topic,
151 "icon": "mdi:car-back",
152 "name": name,
153 }
154 if self.device_creation:
155 self.add_device_info(payload, camera)
156 topic = f"{self.discovery_topic_prefix}/camera/{camera.name}/{event_config.event}/config"
157 msg = json.dumps(payload)
158 self.client.publish(topic, payload=msg, qos=0, retain=True)
159 self.republish[topic] = msg
160 log.info("Published HA MQTT Discovery message to %s", topic)
162 def add_device_info(self, payload: dict[str, Any], camera: CameraSettings) -> None:
163 payload["dev"] = {
164 "name": f"anpr2mqtt on {camera.name}",
165 "sw_version": anpr2mqtt.version, # pyright: ignore[reportAttributeAccessIssue]
166 "manufacturer": "rhizomatics",
167 "identifiers": [f"{camera.name}.anpr2mqtt"],
168 }
169 if camera.area:
170 payload["dev"]["suggested_area"] = camera.area
172 def post_state_message(
173 self,
174 topic: str,
175 target: str | None,
176 event_config: EventSettings,
177 camera: CameraSettings,
178 ocr_fields: dict[str, str | None] | None = None,
179 image_info: ImageInfo | None = None,
180 classification: dict[str, Any] | None = None,
181 previous_sightings: int | None = None,
182 last_sighting: dt.datetime | None = None,
183 url: str | None = None,
184 error: str | None = None,
185 file_path: Path | None = None,
186 reg_info: Any = None,
187 ) -> None:
188 payload: dict[str, Any] = {
189 "target": target,
190 "target_type": event_config.target_type,
191 event_config.target_type: target,
192 "event": event_config.event,
193 "camera": camera.name or "UNKNOWN",
194 "area": camera.area,
195 "live_url": camera.live_url,
196 "reg_info": reg_info,
197 }
198 if ocr_fields:
199 payload.update(ocr_fields)
200 if error:
201 payload["error"] = error
202 if url is not None:
203 payload["event_image_url"] = url
204 if file_path is not None:
205 payload["file_path"] = str(file_path)
206 if classification is not None:
207 payload.update(classification)
208 if previous_sightings is not None:
209 payload["previous_sightings"] = previous_sightings
210 if last_sighting is not None:
211 payload["last_sighting"] = last_sighting.isoformat()
213 try:
214 if image_info:
215 payload.update(
216 {
217 "event_time": image_info.timestamp.isoformat(),
218 "image_event": image_info.event,
219 "ext": image_info.ext,
220 "image_size": image_info.size,
221 }
222 )
223 msg: str = json.dumps(payload)
224 self.client.publish(topic, payload=msg, qos=0, retain=True)
225 log.debug("Published HA MQTT State message to %s: %s", topic, payload)
226 except Exception as e:
227 log.error("Failed to publish event %s: %s", payload, e, exc_info=1)
229 def post_image_message(self, topic: str, image: Image.Image, img_format: str = "JPEG") -> None:
230 try:
231 img_byte_arr = BytesIO()
232 image.save(img_byte_arr, format=img_format)
233 img_bytes = img_byte_arr.getvalue()
235 self.client.publish(topic, payload=img_bytes, qos=0, retain=True)
236 log.debug("Published HA MQTT Image message to %s: %s bytes", topic, len(img_bytes))
237 except Exception as e:
238 log.error("Failed to publish image entity: %s", e, exc_info=1)