Coverage for src / anpr2mqtt / tracker.py: 93%

155 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-30 16:07 +0000

1import datetime as dt 

2import json 

3import re 

4from dataclasses import dataclass 

5from typing import Any, cast 

6 

7import structlog 

8import tzlocal 

9 

10from anpr2mqtt.normalizers import Normalizer, fuzzy_match 

11from anpr2mqtt.settings import ( 

12 Target, 

13 TargetSettings, 

14 TrackerSettings, 

15) 

16 

17log = structlog.get_logger() 

18 

19 

@dataclass
class Sighting:
    """Outcome of looking up a detected target id.

    Wraps the resolved ``Target`` together with bookkeeping about how the
    raw detection was interpreted.
    """

    # The target this sighting resolved to (a registered target, or a placeholder).
    target: Target
    # The id as originally supplied to the lookup, before any normalisation/correction.
    uncorrected: str | None = None
    # True when the id matched one of the configured ignore patterns.
    ignore: bool = False
    # NOTE(review): not populated anywhere in this module — presumably filled in by callers.
    previous_sightings: list[str] | None = None

    def as_dict(self) -> dict[str, Any]:
        """Return the target's dict form extended with ``orig_target`` and ``ignore``."""
        payload = self.target.as_dict()
        payload["orig_target"] = self.uncorrected
        payload["ignore"] = self.ignore
        return payload

31 

32 

class Tracker:
    """Resolve detected target ids against configured targets and persist sighting history.

    Sightings are stored as JSON lists of ISO-8601 timestamps, one file per target,
    under ``<tracker_config.data_dir>/<target_type>/<target>.json``.
    """

    def __init__(
        self,
        target_type: str,
        tracker_config: TrackerSettings,
        region: str | None = None,
        target_config: TargetSettings | None = None,
        auto_match_tolerance: int = 0,
    ) -> None:
        """Create a tracker for one target type.

        Args:
            target_type: Kind of target handled; also passed to the normalizer.
            tracker_config: Supplies ``data_dir`` and ``min_visit_gap_seconds``.
            region: Optional region; a ``Normalizer`` is only created when both
                ``region`` and ``target_type`` are truthy.
            target_config: Optional registered targets, corrections and ignore patterns.
            auto_match_tolerance: Maximum distance handed to ``fuzzy_match``;
                ``0`` disables fuzzy matching.
        """
        self.target_type: str = target_type
        self.tracker_config: TrackerSettings = tracker_config
        self.entities: dict[str, list[Target]] = {}
        self.ids: dict[str, Target] = {}
        self._target_config: TargetSettings | None = None
        # Assigning through the property (re)builds ``ids`` and ``entities``.
        self.target_config = target_config
        self.auto_match_tolerance = auto_match_tolerance
        self.region: str | None = region
        self.normalizer: Normalizer | None = None
        if region and target_type:
            self.normalizer = Normalizer(target_type=target_type, region=region)

    @property
    def target_config(self) -> TargetSettings | None:
        """The currently active target configuration, if any."""
        return self._target_config

    @target_config.setter
    def target_config(self, value: TargetSettings | None) -> None:
        """Replace the target configuration and rebuild the id/entity indexes."""
        self._target_config = value
        self.ids = {}
        self.entities = {}
        if value:
            for target_group in value.groups:
                for target in target_group.members:
                    self.ids[target.id] = target
                    entity_id: str | None = target.entity_id
                    if entity_id is not None:
                        # Several targets may share one entity id.
                        self.entities.setdefault(entity_id, []).append(target)

    def history(self, target_id: str, target_type: str) -> list[str]:
        """Return the stored sighting timestamps for a target.

        Best-effort: any failure (directory creation, read, JSON decode) is
        logged and an empty list is returned.

        Args:
            target_id: Target identifier; a falsy value is stored as ``"UNKNOWN"``.
            target_type: Sub-directory of ``data_dir`` holding this type's files.

        Returns:
            The decoded JSON content of the target's file, or ``[]`` when the
            file does not exist or could not be read.
        """
        target_id = target_id or "UNKNOWN"
        try:
            # NOTE(review): target_id is interpolated into the file name unsanitised —
            # confirm callers never pass path separators.
            target_type_path = self.tracker_config.data_dir / target_type
            # parents=True so a not-yet-created data_dir does not raise; kept inside
            # the try so directory problems are logged like any other read failure.
            target_type_path.mkdir(parents=True, exist_ok=True)
            target_file = target_type_path / f"{target_id}.json"
            if target_file.exists():
                with target_file.open("r", encoding="utf-8") as f:
                    return cast("list[str]", json.load(f))
        except Exception as e:
            log.exception("Failed to find sightings for %s:%s", target_id, e)
        return []

    def _seen_within_visit_gap(self, target: str, sightings: list[str], event_dt: dt.datetime | None) -> bool:
        """Return True when the latest stored sighting is closer than the minimum visit gap.

        A failure while evaluating the gap is logged as a warning and treated
        as "not within the gap", so the sighting is still recorded.
        """
        min_gap = self.tracker_config.min_visit_gap_seconds
        if not (min_gap > 0 and sightings):
            return False
        try:
            last_ts = dt.datetime.fromisoformat(sightings[-1])
            current_ts = event_dt if event_dt is not None else dt.datetime.now(tz=tzlocal.get_localzone())
            # Naive timestamps are interpreted as local time so the subtraction
            # never mixes naive and aware datetimes.
            if last_ts.tzinfo is None:
                last_ts = last_ts.replace(tzinfo=tzlocal.get_localzone())
            if current_ts.tzinfo is None:
                current_ts = current_ts.replace(tzinfo=tzlocal.get_localzone())
            elapsed = (current_ts - last_ts).total_seconds()
            # A negative elapsed time (event older than the last sighting) is not
            # treated as a repeat.
            if 0 <= elapsed < min_gap:
                log.info("Visit gap: %s seen %.1fs ago (gap=%ds), not recording", target, elapsed, min_gap)
                return True
        except Exception as gap_err:
            log.warning("Visit gap check failed for %s: %s", target, gap_err)
        return False

    def record(self, target: str, target_type: str, event_dt: dt.datetime | None) -> dict[str, Any]:
        """Append a sighting to the target's history and return its time analysis.

        The analysis is computed from the history *before* the new sighting is
        appended. When the previous sighting is closer than
        ``min_visit_gap_seconds`` nothing is written and ``is_new_visit`` is
        False. Best-effort: failures are logged and whatever analysis was
        computed so far is returned.

        Args:
            target: Target identifier; a falsy value is stored as ``"UNKNOWN"``.
            target_type: Sub-directory of ``data_dir`` holding this type's files.
            event_dt: Time of the sighting; the current local time is used when None.

        Returns:
            The dict produced by ``compute_time_analysis`` plus ``is_new_visit``
            (absent only if recording failed before it could be set).
        """
        target = target or "UNKNOWN"
        sightings = self.history(target, target_type)
        time_analysis: dict[str, Any] = {}
        try:
            time_analysis = compute_time_analysis(sightings, event_dt)
            if self._seen_within_visit_gap(target, sightings, event_dt):
                time_analysis["is_new_visit"] = False
                return time_analysis
            target_type_path = self.tracker_config.data_dir / target_type
            # Inside the try (and with parents=True) so an unusable data_dir is
            # logged instead of escaping this best-effort method.
            target_type_path.mkdir(parents=True, exist_ok=True)
            target_file = target_type_path / f"{target}.json"
            sightings.append(event_dt.isoformat() if event_dt else dt.datetime.now(tz=tzlocal.get_localzone()).isoformat())
            with target_file.open("w", encoding="utf-8") as f:
                json.dump(sightings, f)
            time_analysis["is_new_visit"] = True
        except Exception as e:
            log.exception("Failed to record sightings for %s:%s", target, e)
        return time_analysis

    def find(self, target_id: str) -> Sighting:
        """Resolve a detected id to a ``Sighting``.

        Steps, in order: normalise the id (if a normalizer is configured),
        apply the global correction patterns, then the per-target correction
        patterns, flag ignore-pattern matches, and finally look the id up among
        the registered targets (exactly, or fuzzily when
        ``auto_match_tolerance`` > 0).

        Args:
            target_id: The id as detected.

        Returns:
            A ``Sighting`` whose ``target`` is the registered target when one
            matched, otherwise a placeholder target carrying the
            (possibly normalised/corrected) id.
        """
        result: Sighting = Sighting(
            target=Target(id=target_id, target_type=self.target_type, priority="high", lookup=True), uncorrected=target_id
        )
        if not target_id or self.target_config is None:
            # Nothing to look up against: return the placeholder sighting unchanged.
            return result

        if self.normalizer:
            normalised = self.normalizer.normalize(target_id)
            if normalised:
                log.info("%s %s normalised %s -> %s", self.region, self.target_type, target_id, normalised)
                target_id = normalised
                result.target.id = normalised

        lookup_id = target_id
        # Global corrections: first pattern set that matches wins.
        for corrected_target, patterns in self.target_config.correction.items():
            if any(re.match(pat, target_id) for pat in patterns) and corrected_target != target_id:
                result.target.id = corrected_target
                lookup_id = corrected_target
                log.info("Corrected target %s -> %s", target_id, lookup_id)
                break
        if lookup_id == target_id:
            # No global correction applied: try each registered target's own patterns.
            for registered_target in self.ids.values():
                if any(re.match(pat, target_id) for pat in registered_target.correction):
                    lookup_id = registered_target.id
                    result.target.id = registered_target.id
                    log.info("Corrected target %s -> %s (per-target)", target_id, lookup_id)
                    break
        # Ignore patterns are matched against the (normalised) uncorrected id.
        for pat in self.target_config.ignore:
            if re.match(pat, target_id):
                log.info("Ignoring %s matching ignore pattern %s", target_id, pat)
                result.ignore = True
                result.target.priority = "low"
                if result.target.group is None:  # not yet found in registered lists
                    result.target.description = "Ignored"
                break
        max_dist = self.auto_match_tolerance
        target: Target | None = None
        registered_match: str | None = None
        if lookup_id in self.ids:
            registered_match = lookup_id
        elif max_dist > 0:
            registered_match = fuzzy_match(lookup_id, max_dist, list(self.ids))
        if registered_match:
            target = self.ids[registered_match]

        if target:
            if registered_match != lookup_id:
                log.info("Fuzzy-matched %s to registered plate %s", lookup_id, registered_match)
            result.target = target
        return result

172 

173 

174def compute_time_analysis(sightings: list[str], current_dt: dt.datetime | None = None) -> dict[str, Any]: 

175 """Derive visit history and time-of-day statistics from previous sightings. 

176 

177 Called before the current visit is appended, so all counts/times reflect prior history only. 

178 

179 Returns a dict with: 

180 - previous_sightings: int — number of times seen before the current visit 

181 - last_seen: ISO datetime string of the most recent prior sighting, or None 

182 - hourly_counts: dict[int,int] of 24 ints, index = hour (0-23) 

183 - earliest_time: "HH:MM:SS" of the earliest time-of-day previously seen, or None 

184 - latest_time: "HH:MM:SS" of the latest time-of-day previously seen, or None 

185 - within_time_range: bool — current time falls within [earliest, latest], or None if no history 

186 """ 

187 hourly_counts: dict[int, int] = {} 

188 times: list[dt.time] = [] 

189 last_seen: str | None = sightings[-1] if sightings else None 

190 for s in sightings: 

191 try: 

192 ts: dt.datetime = dt.datetime.fromisoformat(s) 

193 hourly_counts.setdefault(ts.hour, 0) 

194 hourly_counts[ts.hour] += 1 

195 times.append(ts.replace(tzinfo=None).time()) 

196 except Exception as e: 

197 log.warning("Skipping unparsable sighting timestamp %r: %s", s, e) 

198 

199 earliest = min(times) if times else None 

200 latest = max(times) if times else None 

201 

202 result = { 

203 "previous_sightings": len(sightings), 

204 "last_seen": last_seen, 

205 "hourly_counts": hourly_counts, 

206 "earliest_time": earliest.isoformat() if earliest else None, 

207 "latest_time": latest.isoformat() if latest else None, 

208 } 

209 if current_dt is not None: 

210 if earliest is not None and latest is not None: 

211 current_t = current_dt.replace(tzinfo=None).time() 

212 result["within_time_range"] = earliest <= current_t <= latest 

213 else: 

214 result["within_time_range"] = None 

215 

216 return result