Coverage for src / updates2mqtt / model.py: 96%
143 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-20 02:29 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-20 02:29 +0000
1import datetime as dt
2import json
3import re
4import time
5from abc import abstractmethod
6from collections.abc import AsyncGenerator, Callable
7from enum import StrEnum
8from threading import Event
9from typing import Any
11import structlog
12from tzlocal import get_localzone
14from updates2mqtt.config import NO_KNOWN_IMAGE, NodeConfig, PublishPolicy, Selector, UpdatePolicy
17def timestamp(time_value: float | None) -> str | None:
18 if time_value is None:
19 return None
20 try:
21 return dt.datetime.fromtimestamp(time_value, tz=get_localzone()).isoformat()
22 except: # noqa: E722
23 return None
26class VersionPolicy(StrEnum):
27 AUTO = "AUTO"
28 VERSION = "VERSION"
29 DIGEST = "DIGEST"
30 VERSION_DIGEST = "VERSION_DIGEST"
33class Discovery:
34 """Discovered component from a scan"""
36 def __init__(
37 self,
38 provider: "ReleaseProvider",
39 name: str,
40 session: str,
41 node: str,
42 entity_picture_url: str | None = None,
43 current_version: str | None = None,
44 latest_version: str | None = None,
45 can_update: bool = False,
46 can_build: bool = False,
47 can_restart: bool = False,
48 status: str = "on",
49 publish_policy: PublishPolicy = PublishPolicy.HOMEASSISTANT,
50 update_type: str | None = "Update",
51 update_policy: UpdatePolicy = UpdatePolicy.PASSIVE,
52 version_policy: VersionPolicy = VersionPolicy.AUTO,
53 release_url: str | None = None,
54 release_summary: str | None = None,
55 title_template: str = "{discovery.update_type} for {discovery.name} on {discovery.node}",
56 device_icon: str | None = None,
57 custom: dict[str, Any] | None = None,
58 features: list[str] | None = None,
59 throttled: bool = False,
60 previous: "Discovery|None" = None,
61 ) -> None:
62 self.provider: ReleaseProvider = provider
63 self.source_type: str = provider.source_type
64 self.session: str = session
65 self.name: str = name
66 self.node: str = node
67 self.entity_picture_url: str | None = entity_picture_url
68 self.current_version: str | None = current_version
69 self.latest_version: str | None = latest_version
70 self.can_update: bool = can_update
71 self.can_build: bool = can_build
72 self.can_restart: bool = can_restart
73 self.release_url: str | None = release_url
74 self.release_summary: str | None = release_summary
75 self.title_template: str | None = title_template
76 self.device_icon: str | None = device_icon
77 self.update_type: str | None = update_type
78 self.status: str = status
79 self.publish_policy: PublishPolicy = publish_policy
80 self.update_policy: UpdatePolicy = update_policy
81 self.version_policy: VersionPolicy = version_policy
82 self.update_last_attempt: float | None = None
83 self.custom: dict[str, Any] = custom or {}
84 self.features: list[str] = features or []
85 self.throttled: bool = throttled
86 self.scan_count: int
87 self.first_timestamp: float
88 self.last_timestamp: float = time.time()
89 self.check_timestamp: float | None = time.time()
91 if previous:
92 self.update_last_attempt = previous.update_last_attempt
93 self.first_timestamp = previous.first_timestamp
94 self.scan_count = previous.scan_count + 1
95 else:
96 self.first_timestamp = time.time()
97 self.scan_count = 1
98 if throttled and previous:
99 # roll forward last non-throttled check
100 self.check_timestamp = previous.check_timestamp
101 elif not throttled:
102 self.check_timestamp = time.time()
104 def __repr__(self) -> str:
105 """Build a custom string representation"""
106 return f"Discovery('{self.name}','{self.source_type}',current={self.current_version},latest={self.latest_version})"
108 def __str__(self) -> str:
109 """Dump the attrs"""
111 def stringify(v: Any) -> str | int | float | bool:
112 return str(v) if not isinstance(v, (str, int, float, bool)) else v
114 dump = {k: stringify(v) for k, v in self.__dict__.items()}
115 return json.dumps(dump)
117 @property
118 def title(self) -> str:
119 if self.title_template:
120 return self.title_template.format(discovery=self)
121 return self.name
123 def as_dict(self) -> dict[str, str | list | dict | bool | int | None]:
124 return {
125 "name": self.name,
126 "node": self.node,
127 "provider": {"source_type": self.provider.source_type},
128 "first_scan": {"timestamp": timestamp(self.first_timestamp)},
129 "last_scan": {"timestamp": timestamp(self.last_timestamp), "session": self.session, "throttled": self.throttled},
130 "scan_count": self.scan_count,
131 "installed_version": self.current_version,
132 "latest_version": self.latest_version,
133 "title": self.title,
134 "release_summary": self.release_summary,
135 "release_url": self.release_url,
136 "entity_picture_url": self.entity_picture_url,
137 "can_update": self.can_update,
138 "can_build": self.can_build,
139 "can_restart": self.can_restart,
140 "device_icon": self.device_icon,
141 "update_type": self.update_type,
142 "status": self.status,
143 "features": self.features,
144 "update_policy": str(self.update_policy),
145 "publish_policy": str(self.publish_policy),
146 "version_policy": str(self.version_policy),
147 "update": {"last_attempt": timestamp(self.update_last_attempt), "in_progress": False},
148 self.source_type: self.custom,
149 }
152class ReleaseProvider:
153 """Abstract base class for release providers, such as container scanners or package managers API calls"""
155 def __init__(self, node_cfg: NodeConfig, source_type: str = "base") -> None:
156 self.source_type: str = source_type
157 self.discoveries: dict[str, Discovery] = {}
158 self.node_cfg: NodeConfig = node_cfg
159 self.log: Any = structlog.get_logger().bind(integration=self.source_type)
160 self.stopped = Event()
162 def initialize(self) -> None:
163 """Initialize any loops or background tasks, make any startup API calls"""
164 pass
166 def stop(self) -> None:
167 """Stop any loops or background tasks"""
168 self.log.info("Asking release provider to stop", source_type=self.source_type)
169 self.stopped.set()
171 def __str__(self) -> str:
172 """Stringify"""
173 return f"{self.source_type} Discovery"
175 @abstractmethod
176 def update(self, discovery: Discovery) -> bool:
177 """Attempt to update the component version"""
179 @abstractmethod
180 def rescan(self, discovery: Discovery) -> Discovery | None:
181 """Rescan a previously discovered component"""
183 @abstractmethod
184 async def scan(self, session: str) -> AsyncGenerator[Discovery]:
185 """Scan for components to monitor"""
186 # force recognition as an async generator
187 if False:
188 yield 0 # type: ignore[unreachable]
190 @abstractmethod
191 def command(self, discovery_name: str, command: str, on_update_start: Callable, on_update_end: Callable) -> bool:
192 """Execute a command on a discovered component"""
194 @abstractmethod
195 def resolve(self, discovery_name: str) -> Discovery | None:
196 """Resolve a discovered component by name"""
199class Selection:
200 def __init__(self, selector: Selector, value: str | None) -> None:
201 self.result: bool = True
202 self.matched: str | None = None
203 if value is None:
204 self.result = selector.include is None
205 return
206 if selector.exclude is not None:
207 self.result = True
208 if any(re.search(pat, value) for pat in selector.exclude):
209 self.matched = value
210 self.result = False
211 if selector.include is not None:
212 self.result = False
213 if any(re.search(pat, value) for pat in selector.include):
214 self.matched = value
215 self.result = True
217 def __bool__(self) -> bool:
218 """Expose the actual boolean so objects can be appropriately truthy"""
219 return self.result
222VERSION_RE = r"[vVr]?[0-9]+(\.[0-9]+)*"
223# source: https://semver.org/#is-there-a-suggested-regular-expression-regex-to-check-a-semver-string
224SEMVER_RE = r"^(?P<major>0|[1-9]\d*)\.(?P<minor>0|[1-9]\d*)\.(?P<patch>0|[1-9]\d*)(?:-(?P<prerelease>(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+(?P<buildmetadata>[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$" # noqa: E501
227def select_version(
228 version_policy: VersionPolicy,
229 version: str | None,
230 digest: str | None,
231 other_version: str | None = None,
232 other_digest: str | None = None,
233) -> str:
234 """Pick the best version string to display based on the version policy and available data
236 Falls back to digest if version not reliable or not consistent with current/available version
237 """
238 if version_policy == VersionPolicy.VERSION and version:
239 return version
240 if version_policy == VersionPolicy.DIGEST and digest and digest != NO_KNOWN_IMAGE:
241 return digest
242 if version_policy == VersionPolicy.VERSION_DIGEST and version and digest and digest != NO_KNOWN_IMAGE:
243 return f"{version}:{digest}"
244 # AUTO or fallback
245 if version_policy == VersionPolicy.AUTO and version and re.match(VERSION_RE, version or ""):
246 # Smells like semver
247 if other_version is None and other_digest is None: 247 ↛ 248line 247 didn't jump to line 248 because the condition on line 247 was never true
248 return version
249 if any((re.match(VERSION_RE, other_version or ""), re.match(SEMVER_RE, other_version or ""))) and (
250 (version == other_version and digest == other_digest) or (version != other_version and digest != other_digest)
251 ):
252 # Only semver if versions and digest consistently same or different
253 return version
255 if ( 255 ↛ 261line 255 didn't jump to line 261 because the condition on line 255 was never true
256 version
257 and digest
258 and digest != NO_KNOWN_IMAGE
259 and ((other_digest is None and other_version is None) or (other_digest is not None and other_version is not None))
260 ):
261 return f"{version}:{digest}"
262 if version and other_version: 262 ↛ 263line 262 didn't jump to line 263 because the condition on line 262 was never true
263 return version
264 if digest and digest != NO_KNOWN_IMAGE:
265 return digest
267 return other_digest or NO_KNOWN_IMAGE