Coverage for src / updates2mqtt / model.py: 97%
138 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 23:58 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 23:58 +0000
1import datetime as dt
2import json
3import time
4from abc import abstractmethod
5from collections.abc import AsyncGenerator, Callable
6from threading import Event
7from typing import Any
9import structlog
10from tzlocal import get_localzone
12from updates2mqtt.config import NodeConfig, PublishPolicy, UpdatePolicy, VersionPolicy
13from updates2mqtt.helpers import sanitize_name, timestamp
16class DiscoveryDetail:
17 def __init__(self) -> None:
18 self.captured: dt.datetime = dt.datetime.now(tz=get_localzone())
20 @abstractmethod
21 def as_dict(self) -> dict[str, str | list | dict | bool | int | None]:
22 return {}
24 def __str__(self) -> str:
25 """Log friendly"""
26 return ",".join(f"{k}:{v}" for k, v in self.as_dict().items())
class DiscoveryArtefactDetail(DiscoveryDetail):
    """Provider specific detail

    Adds no fields of its own; construction is delegated entirely to the parent class.
    """

    def __init__(self) -> None:
        """Initialise via the parent constructor only"""
        super().__init__()
class DiscoveryInstallationDetail(DiscoveryDetail):
    """Provider specific detail

    Adds no fields of its own; construction is delegated entirely to the parent class.
    """

    def __init__(self) -> None:
        """Initialise via the parent constructor only"""
        super().__init__()
class ReleaseDetail(DiscoveryDetail):
    """The artefact source details

    This may describe an actual software package, or the source of whatever wraps it.
    For example, some Docker images report the main source repo, while others report
    the project where the Dockerfile deployment lives.
    """

    def __init__(self, notes_url: str | None = None, summary: str | None = None) -> None:
        """Create a release detail; only ``notes_url`` and ``summary`` can be set up front.

        All other fields start as ``None`` and are assigned by the caller afterwards.
        """
        super().__init__()
        # Where the release comes from
        self.source_platform: str | None = None
        self.source_repo_url: str | None = None
        self.source_url: str | None = None
        # What the release is
        self.version: str | None = None
        self.revision: str | None = None
        # Supporting links and descriptive text
        self.diff_url: str | None = None
        self.notes_url: str | None = notes_url
        self.title: str | None = None
        self.summary: str | None = summary
        self.net_score: int | None = None

    def as_dict(self) -> dict[str, str | list | dict | bool | int | None]:
        """Return the release fields as a dictionary, with the capture time in ISO format"""
        # The score is emitted as text; an unset score stays None
        score_text: str | None = None
        if self.net_score is not None:
            score_text = str(self.net_score)

        captured_at = self.captured.isoformat()
        return {
            "captured": captured_at,
            "title": self.title,
            "version": self.version,
            "source_platform": self.source_platform,
            "source_repo": self.source_repo_url,
            "source": self.source_url,
            "revision": self.revision,
            "diff_url": self.diff_url,
            "notes_url": self.notes_url,
            "summary": self.summary,
            "net_score": score_text,
        }
class Discovery:
    """Discovered component from a scan

    Holds the versions, capabilities, policies and scan bookkeeping for one component,
    and renders them as a dictionary through ``as_dict``.
    """

    def __init__(
        self,
        provider: "ReleaseProvider",
        name: str,
        session: str,
        node: str,
        entity_picture_url: str | None = None,
        current_version: str | None = None,
        latest_version: str | None = None,
        can_build: bool = False,
        can_restart: bool = False,
        can_pull: bool = False,
        status: str = "on",
        publish_policy: PublishPolicy = PublishPolicy.HOMEASSISTANT,
        update_type: str | None = "Update",
        update_policy: UpdatePolicy = UpdatePolicy.PASSIVE,
        version_policy: VersionPolicy = VersionPolicy.AUTO,
        version_basis: str | None = None,
        title_template: str = "{discovery.update_type} for {discovery.name} on {discovery.node}",
        device_icon: str | None = None,
        custom: dict[str, Any] | None = None,
        throttled: bool = False,
        previous: "Discovery|None" = None,
        release_detail: ReleaseDetail | None = None,
        installation_detail: DiscoveryInstallationDetail | None = None,
        current_detail: DiscoveryArtefactDetail | None = None,
        latest_detail: DiscoveryArtefactDetail | None = None,
    ) -> None:
        """Record a scan result, carrying history forward from ``previous`` when given.

        With a ``previous`` discovery, its ``update_last_attempt`` and ``first_timestamp``
        are reused and ``scan_count`` is incremented; without one this is scan number 1.
        A throttled scan that has a ``previous`` keeps that discovery's ``check_timestamp``
        rather than taking the current time.
        """
        # NOTE: __str__ dumps __dict__ in insertion order, so the order in which
        # attributes are first assigned below is part of the observable output.
        self.provider: ReleaseProvider = provider
        self.source_type: str = provider.source_type
        self.session: str = session
        self.name: str = sanitize_name(name)
        self.node: str = node
        self.entity_picture_url: str | None = entity_picture_url
        self.current_version: str | None = current_version
        self.latest_version: str | None = latest_version
        self.can_pull: bool = can_pull
        self.can_build: bool = can_build
        self.can_restart: bool = can_restart
        self.title_template: str | None = title_template
        self.device_icon: str | None = device_icon
        self.update_type: str | None = update_type
        self.status: str = status
        self.publish_policy: PublishPolicy = publish_policy
        self.update_policy: UpdatePolicy = update_policy
        self.version_policy: VersionPolicy = version_policy
        self.version_basis: str | None = version_basis
        self.update_last_attempt: float | None = None
        self.custom: dict[str, Any] = custom or {}
        self.throttled: bool = throttled
        # Declared here, assigned further down once the history is known
        self.scan_count: int
        self.first_timestamp: float
        self.last_timestamp: float = time.time()
        self.check_timestamp: float | None = time.time()
        self.release_detail: ReleaseDetail | None = release_detail
        self.current_detail: DiscoveryArtefactDetail | None = current_detail
        self.latest_detail: DiscoveryArtefactDetail | None = latest_detail
        self.installation_detail: DiscoveryInstallationDetail | None = installation_detail

        if not previous:
            # First sighting of this component
            self.first_timestamp = time.time()
            self.scan_count = 1
        else:
            self.update_last_attempt = previous.update_last_attempt
            self.first_timestamp = previous.first_timestamp
            self.scan_count = previous.scan_count + 1

        if not throttled:
            self.check_timestamp = time.time()
        elif previous:
            # roll forward last non-throttled check
            self.check_timestamp = previous.check_timestamp

    def __repr__(self) -> str:
        """Build a custom string representation"""
        return (
            f"Discovery('{self.name}','{self.source_type}',"
            f"current={self.current_version},latest={self.latest_version})"
        )

    def __str__(self) -> str:
        """Dump the attrs as JSON, stringifying anything that is not a str, int, float or bool"""
        passthrough_types = (str, int, float, bool)
        dump: dict[str, str | int | float | bool] = {}
        for attr_name, attr_value in vars(self).items():
            dump[attr_name] = attr_value if isinstance(attr_value, passthrough_types) else str(attr_value)
        return json.dumps(dump)

    @property
    def can_update(self) -> bool:
        """Whether any of pull, build or restart is possible"""
        return self.can_pull or self.can_build or self.can_restart

    @property
    def features(self) -> list[str]:
        """List the feature names that apply to this discovery"""
        # public install-neutral capabilities and Home Assistant features
        results = ["INSTALL", "PROGRESS"] if self.can_update else []
        detail = self.release_detail
        if detail and detail.notes_url:
            results.append("RELEASE_NOTES")
        return results

    @property
    def title(self) -> str:
        """Render the title template against this discovery, or fall back to the name"""
        if not self.title_template:
            return self.name
        return self.title_template.format(discovery=self)

    def as_dict(self) -> dict[str, str | list | dict | bool | int | None]:
        """Return the discovery as a dictionary.

        The ``release`` key is present only when there is a release detail, and the
        ``custom`` data, when non-empty, is stored under a key named after ``source_type``.
        """
        results: dict[str, str | list | dict | bool | int | None] = {
            "name": self.name,
            "node": self.node,
            "provider": {"source_type": self.provider.source_type},
            "first_scan": {"timestamp": timestamp(self.first_timestamp)},
            "last_scan": {"timestamp": timestamp(self.last_timestamp), "session": self.session, "throttled": self.throttled},
            "scan_count": self.scan_count,
            "installed_version": self.current_version,
            "latest_version": self.latest_version,
            "version_basis": self.version_basis,
            "title": self.title,
            "can_update": self.can_update,
            "can_build": self.can_build,
            "can_restart": self.can_restart,
            "device_icon": self.device_icon,
            "update_type": self.update_type,
            "status": self.status,
            "features": self.features,
            "entity_picture_url": self.entity_picture_url,
            "update_policy": str(self.update_policy),
            "publish_policy": str(self.publish_policy),
            "version_policy": str(self.version_policy),
            "update": {"last_attempt": timestamp(self.update_last_attempt), "in_progress": False},
            "installation_detail": None if not self.installation_detail else self.installation_detail.as_dict(),
            "current_detail": None if not self.current_detail else self.current_detail.as_dict(),
            "latest_detail": None if not self.latest_detail else self.latest_detail.as_dict(),
        }
        release = self.release_detail
        if release:
            results["release"] = release.as_dict()
        if self.custom:
            results[self.source_type] = self.custom
        return results
class ReleaseProvider:
    """Abstract base class for release providers, such as container scanners or package managers API calls"""

    def __init__(self, node_cfg: NodeConfig, source_type: str = "base") -> None:
        """Store the node config, set up an empty discovery map, a bound logger and a stop event"""
        self.source_type: str = source_type
        self.discoveries: dict[str, Discovery] = {}
        self.node_cfg: NodeConfig = node_cfg
        # Every log line from this provider carries its source type as `integration`
        base_logger = structlog.get_logger()
        self.log: Any = base_logger.bind(integration=self.source_type)
        self.stopped = Event()

    def initialize(self) -> None:
        """Initialize any loops or background tasks, make any startup API calls"""

    def stop(self) -> None:
        """Log the stop request and set the ``stopped`` event"""
        self.log.info("Asking release provider to stop", source_type=self.source_type)
        self.stopped.set()

    def __str__(self) -> str:
        """Stringify"""
        return f"{self.source_type} Discovery"

    @abstractmethod
    def update(self, discovery: Discovery) -> bool:
        """Attempt to update the component version"""

    @abstractmethod
    def rescan(self, discovery: Discovery) -> Discovery | None:
        """Rescan a previously discovered component"""

    @abstractmethod
    async def scan(self, session: str) -> AsyncGenerator[Discovery]:
        """Scan for components to monitor"""
        # The never-executed yield makes this an async generator rather than a coroutine
        if False:
            yield 0  # type: ignore[unreachable]

    @abstractmethod
    def command(self, discovery_name: str, command: str, on_update_start: Callable, on_update_end: Callable) -> bool:
        """Execute a command on a discovered component"""

    @abstractmethod
    def resolve(self, discovery_name: str) -> Discovery | None:
        """Resolve a discovered component by name"""