Coverage for src / updates2mqtt / model.py: 97%
142 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-20 23:13 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-20 23:13 +0000
1import datetime as dt
2import json
3import time
4from abc import abstractmethod
5from collections.abc import AsyncGenerator, Callable
6from threading import Event
7from typing import Any
9import structlog
10from tzlocal import get_localzone
12from updates2mqtt.config import NodeConfig, PublishPolicy, UpdatePolicy, VersionPolicy
13from updates2mqtt.helpers import sanitize_name, timestamp
16class DiscoveryDetail:
17 def __init__(self) -> None:
18 self.captured: dt.datetime = dt.datetime.now(tz=get_localzone())
20 @abstractmethod
21 def as_dict(self) -> dict[str, str | list | dict | bool | int | None]:
22 return {}
24 def __str__(self) -> str:
25 """Log friendly"""
26 return ",".join(f"{k}:{v}" for k, v in self.as_dict().items())
class DiscoveryArtefactDetail(DiscoveryDetail):
    """Provider specific detail

    Adds no fields of its own here; it only runs the parent initializer.
    """

    def __init__(self) -> None:
        """Run the parent initializer."""
        super().__init__()
class DiscoveryInstallationDetail(DiscoveryDetail):
    """Provider specific detail

    Adds no fields of its own here; it only runs the parent initializer.
    """

    def __init__(self) -> None:
        """Run the parent initializer."""
        super().__init__()
class ReleaseDetail(DiscoveryDetail):
    """The artefact source details

    This may describe an actual software package, or the source of whatever wraps it.
    For example, some Docker images report the main source repo, while others report
    where the Dockerfile deploy project lives.
    """

    def __init__(
        self,
        name: str | None = None,
        notes_url: str | None = None,
        summary: str | None = None,
        source_repo_url: str | None = None,
    ) -> None:
        """Create a release detail.

        Only ``name``, ``notes_url``, ``summary`` and ``source_repo_url`` can be
        supplied here; every other field starts as ``None`` for the caller to fill in.
        """
        super().__init__()
        # Fields supplied by the caller
        self.name = name
        self.source_platform: str | None = None
        self.source_repo_url: str | None = source_repo_url
        self.source_url: str | None = None
        self.version: str | None = None
        self.revision: str | None = None
        self.diff_url: str | None = None
        self.notes_url: str | None = notes_url
        self.title: str | None = None
        self.summary: str | None = summary
        self.net_score: int | None = None

    def as_dict(self) -> dict[str, str | list | dict | bool | int | None]:
        """Return the release detail as a dictionary.

        When there is no summary but a diff URL is set, the summary becomes an HTML
        link to the diff, labelled with the version (or the revision if no version).
        ``net_score`` is emitted as a string when it is set.
        """
        summary: str | None = self.summary
        if not summary and self.diff_url:
            label = self.version or self.revision
            summary = f"<a href='{self.diff_url}'>{label} Diff</a>"

        net_score: str | None = None if self.net_score is None else str(self.net_score)

        return {
            "captured": self.captured.isoformat(),
            "title": self.title,
            "version": self.version,
            "source_platform": self.source_platform,
            "source_repo": self.source_repo_url,
            "source": self.source_url,
            "revision": self.revision,
            "diff_url": self.diff_url,
            "notes_url": self.notes_url,
            "summary": summary,
            "net_score": net_score,
        }
class Discovery:
    """Discovered component from a scan

    Holds the versions, capabilities, policies and scan bookkeeping for a single
    component, plus optional detail records about its release and installation.
    """

    def __init__(
        self,
        provider: "ReleaseProvider",
        name: str,
        session: str,
        node: str,
        entity_picture_url: str | None = None,
        current_version: str | None = None,
        latest_version: str | None = None,
        can_build: bool = False,
        can_restart: bool = False,
        can_pull: bool = False,
        status: str = "on",
        publish_policy: PublishPolicy = PublishPolicy.HOMEASSISTANT,
        update_type: str | None = "Update",
        update_policy: UpdatePolicy = UpdatePolicy.PASSIVE,
        version_policy: VersionPolicy = VersionPolicy.AUTO,
        version_basis: str | None = None,
        title_template: str = "{discovery.update_type} for {discovery.name} on {discovery.node}",
        device_icon: str | None = None,
        custom: dict[str, Any] | None = None,
        throttled: bool = False,
        previous: "Discovery|None" = None,
        release_detail: ReleaseDetail | None = None,
        installation_detail: DiscoveryInstallationDetail | None = None,
        current_detail: DiscoveryArtefactDetail | None = None,
        latest_detail: DiscoveryArtefactDetail | None = None,
    ) -> None:
        """Record the outcome of scanning one component.

        Most arguments are stored unchanged on the instance. The exceptions:

        - ``name`` is passed through ``sanitize_name`` before being stored.
        - ``provider`` also supplies ``source_type``.
        - ``custom`` defaults to an empty dict when not given.
        - ``previous``, when given, carries forward ``update_last_attempt`` and
          ``first_timestamp``, and ``scan_count`` becomes the previous count plus one.
          Without it this is treated as the first scan.
        - ``throttled`` together with ``previous`` rolls ``check_timestamp`` forward
          from the previous discovery instead of using the current time.
        """
        # Read the clock once so every timestamp set by this scan is identical,
        # rather than drifting by the time taken between separate reads.
        now: float = time.time()

        self.provider: ReleaseProvider = provider
        self.source_type: str = provider.source_type
        self.session: str = session
        self.name: str = sanitize_name(name)
        self.node: str = node
        self.entity_picture_url: str | None = entity_picture_url
        self.current_version: str | None = current_version
        self.latest_version: str | None = latest_version
        self.can_pull: bool = can_pull
        self.can_build: bool = can_build
        self.can_restart: bool = can_restart
        self.title_template: str | None = title_template
        self.device_icon: str | None = device_icon
        self.update_type: str | None = update_type
        self.status: str = status
        self.publish_policy: PublishPolicy = publish_policy
        self.update_policy: UpdatePolicy = update_policy
        self.version_policy: VersionPolicy = version_policy
        self.version_basis: str | None = version_basis
        self.update_last_attempt: float | None = None
        self.custom: dict[str, Any] = custom or {}
        self.throttled: bool = throttled
        self.scan_count: int
        self.first_timestamp: float
        self.last_timestamp: float = now
        self.check_timestamp: float | None = now
        self.release_detail: ReleaseDetail | None = release_detail
        self.current_detail: DiscoveryArtefactDetail | None = current_detail
        self.latest_detail: DiscoveryArtefactDetail | None = latest_detail
        self.installation_detail: DiscoveryInstallationDetail | None = installation_detail

        if previous:
            self.update_last_attempt = previous.update_last_attempt
            self.first_timestamp = previous.first_timestamp
            self.scan_count = previous.scan_count + 1
            if throttled:
                # roll forward last non-throttled check
                self.check_timestamp = previous.check_timestamp
        else:
            self.first_timestamp = now
            self.scan_count = 1

    def __repr__(self) -> str:
        """Build a custom string representation"""
        return f"Discovery('{self.name}','{self.source_type}',current={self.current_version},latest={self.latest_version})"

    def __str__(self) -> str:
        """Dump the attrs as a JSON object.

        Values that are not str, int, float or bool are converted with ``str()``.
        """

        def stringify(v: Any) -> str | int | float | bool:
            return str(v) if not isinstance(v, (str, int, float, bool)) else v

        dump = {k: stringify(v) for k, v in self.__dict__.items()}
        return json.dumps(dump)

    @property
    def can_update(self) -> bool:
        """True when the component can be pulled, built or restarted."""
        return self.can_pull or self.can_build or self.can_restart

    @property
    def features(self) -> list[str]:
        """List the feature names that apply to this discovery.

        ``INSTALL`` and ``PROGRESS`` are included when ``can_update`` is true, and
        ``RELEASE_NOTES`` when the release detail has a notes URL.
        """
        results = []
        if self.can_update:
            # public install-neutral capabilities and Home Assistant features
            results.append("INSTALL")
            results.append("PROGRESS")
        if self.release_detail and self.release_detail.notes_url:
            results.append("RELEASE_NOTES")
        return results

    @property
    def title(self) -> str:
        """Format ``title_template`` with this discovery, or fall back to the name if there is no template."""
        if self.title_template:
            return self.title_template.format(discovery=self)
        return self.name

    def as_dict(self) -> dict[str, str | list | dict | bool | int | None]:
        """Return the discovery as a dictionary.

        The ``release`` key is only present when a release detail is set, and the
        ``custom`` dict, when non-empty, is added under a key named after the source type.
        """
        results: dict[str, str | list | dict | bool | int | None] = {
            "name": self.name,
            "node": self.node,
            "provider": {"source_type": self.provider.source_type},
            "first_scan": {"timestamp": timestamp(self.first_timestamp)},
            "last_scan": {"timestamp": timestamp(self.last_timestamp), "session": self.session, "throttled": self.throttled},
            "scan_count": self.scan_count,
            "installed_version": self.current_version,
            "latest_version": self.latest_version,
            "version_basis": self.version_basis,
            "title": self.title,
            "can_update": self.can_update,
            "can_build": self.can_build,
            "can_restart": self.can_restart,
            "device_icon": self.device_icon,
            "update_type": self.update_type,
            "status": self.status,
            "features": self.features,
            "entity_picture_url": self.entity_picture_url,
            "update_policy": str(self.update_policy),
            "publish_policy": str(self.publish_policy),
            "version_policy": str(self.version_policy),
            "update": {"last_attempt": timestamp(self.update_last_attempt), "in_progress": False},
            "installation_detail": self.installation_detail.as_dict() if self.installation_detail else None,
            "current_detail": self.current_detail.as_dict() if self.current_detail else None,
            "latest_detail": self.latest_detail.as_dict() if self.latest_detail else None,
        }
        if self.release_detail:
            # Already known to be truthy here, so no fallback value is needed.
            results["release"] = self.release_detail.as_dict()
        if self.custom:
            results[self.source_type] = self.custom
        return results
class ReleaseProvider:
    """Base class for release providers, such as container scanners or package managers API calls

    NOTE(review): the abstractmethod markers below are not enforced, because this
    class does not use ABCMeta; confirm whether direct instantiation is intended.
    """

    def __init__(self, node_cfg: NodeConfig, source_type: str = "base") -> None:
        """Store the node configuration and source type, and set up the logger and stop flag."""
        self.source_type: str = source_type
        self.discoveries: dict[str, Discovery] = {}
        self.node_cfg: NodeConfig = node_cfg
        self.stopped = Event()
        # Every log line from this provider is tagged with its source type.
        base_logger = structlog.get_logger()
        self.log: Any = base_logger.bind(integration=source_type)

    def initialize(self) -> None:
        """Initialize any loops or background tasks, make any startup API calls"""

    def stop(self) -> None:
        """Stop any loops or background tasks"""
        self.log.info("Asking release provider to stop", source_type=self.source_type)
        self.stopped.set()

    def __str__(self) -> str:
        """Stringify"""
        return "{} Discovery".format(self.source_type)

    @abstractmethod
    def update(self, discovery: Discovery) -> bool:
        """Attempt to update the component version"""

    @abstractmethod
    def rescan(self, discovery: Discovery) -> Discovery | None:
        """Rescan a previously discovered component"""

    @abstractmethod
    async def scan(self, session: str) -> AsyncGenerator[Discovery]:
        """Scan for components to monitor"""
        # The never-executed yield makes this an async generator function
        if False:
            yield 0  # type: ignore[unreachable]

    @abstractmethod
    def command(self, discovery_name: str, command: str, on_update_start: Callable, on_update_end: Callable) -> bool:
        """Execute a command on a discovered component"""

    @abstractmethod
    def resolve(self, discovery_name: str) -> Discovery | None:
        """Resolve a discovered component by name"""