Coverage for src / updates2mqtt / model.py: 96%

143 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-20 02:29 +0000

1import datetime as dt 

2import json 

3import re 

4import time 

5from abc import abstractmethod 

6from collections.abc import AsyncGenerator, Callable 

7from enum import StrEnum 

8from threading import Event 

9from typing import Any 

10 

11import structlog 

12from tzlocal import get_localzone 

13 

14from updates2mqtt.config import NO_KNOWN_IMAGE, NodeConfig, PublishPolicy, Selector, UpdatePolicy 

15 

16 

17def timestamp(time_value: float | None) -> str | None: 

18 if time_value is None: 

19 return None 

20 try: 

21 return dt.datetime.fromtimestamp(time_value, tz=get_localzone()).isoformat() 

22 except: # noqa: E722 

23 return None 

24 

25 

26class VersionPolicy(StrEnum): 

27 AUTO = "AUTO" 

28 VERSION = "VERSION" 

29 DIGEST = "DIGEST" 

30 VERSION_DIGEST = "VERSION_DIGEST" 

31 

32 

class Discovery:
    """State of one component as found by a release provider during a scan"""

    def __init__(
        self,
        provider: "ReleaseProvider",
        name: str,
        session: str,
        node: str,
        entity_picture_url: str | None = None,
        current_version: str | None = None,
        latest_version: str | None = None,
        can_update: bool = False,
        can_build: bool = False,
        can_restart: bool = False,
        status: str = "on",
        publish_policy: PublishPolicy = PublishPolicy.HOMEASSISTANT,
        update_type: str | None = "Update",
        update_policy: UpdatePolicy = UpdatePolicy.PASSIVE,
        version_policy: VersionPolicy = VersionPolicy.AUTO,
        release_url: str | None = None,
        release_summary: str | None = None,
        title_template: str = "{discovery.update_type} for {discovery.name} on {discovery.node}",
        device_icon: str | None = None,
        custom: dict[str, Any] | None = None,
        features: list[str] | None = None,
        throttled: bool = False,
        previous: "Discovery|None" = None,
    ) -> None:
        """Capture the scanned state, carrying history forward from `previous` if given

        With a `previous` discovery, its first-seen timestamp and last update attempt are
        inherited and its scan count is incremented; without one this is the first scan.
        A throttled scan keeps the check timestamp of the previous discovery.
        """
        # NOTE: attribute assignment order is significant, __str__ dumps __dict__ in order
        self.provider: ReleaseProvider = provider
        self.source_type: str = provider.source_type
        self.session: str = session
        self.name: str = name
        self.node: str = node
        self.entity_picture_url: str | None = entity_picture_url
        self.current_version: str | None = current_version
        self.latest_version: str | None = latest_version
        self.can_update: bool = can_update
        self.can_build: bool = can_build
        self.can_restart: bool = can_restart
        self.release_url: str | None = release_url
        self.release_summary: str | None = release_summary
        self.title_template: str | None = title_template
        self.device_icon: str | None = device_icon
        self.update_type: str | None = update_type
        self.status: str = status
        self.publish_policy: PublishPolicy = publish_policy
        self.update_policy: UpdatePolicy = update_policy
        self.version_policy: VersionPolicy = version_policy
        self.update_last_attempt: float | None = None
        self.custom: dict[str, Any] = custom or {}
        self.features: list[str] = features or []
        self.throttled: bool = throttled
        self.scan_count: int
        self.first_timestamp: float
        self.last_timestamp: float = time.time()
        self.check_timestamp: float | None = time.time()

        if not previous:
            # First sighting of this component
            self.first_timestamp = time.time()
            self.scan_count = 1
        else:
            self.update_last_attempt = previous.update_last_attempt
            self.first_timestamp = previous.first_timestamp
            self.scan_count = previous.scan_count + 1

        if not throttled:
            self.check_timestamp = time.time()
        elif previous:
            # roll forward last non-throttled check
            self.check_timestamp = previous.check_timestamp

    def __repr__(self) -> str:
        """Short identifying representation with name, source type and versions"""
        return (
            f"Discovery('{self.name}','{self.source_type}',"
            f"current={self.current_version},latest={self.latest_version})"
        )

    def __str__(self) -> str:
        """Serialize every instance attribute to a JSON object"""
        json_native = (str, int, float, bool)
        rendered: dict[str, str | int | float | bool] = {}
        for attr, value in self.__dict__.items():
            # Anything that is not a plain scalar (None included) is stringified
            rendered[attr] = value if isinstance(value, json_native) else str(value)
        return json.dumps(rendered)

    @property
    def title(self) -> str:
        """Title built from the template, or the plain name when no template is set"""
        template = self.title_template
        return template.format(discovery=self) if template else self.name

    def as_dict(self) -> dict[str, str | list | dict | bool | int | None]:
        """Build a dictionary view of the discovery, with timestamps as ISO strings

        The provider-specific `custom` data is stored under a key named after the
        source type.
        """
        first_scan = {"timestamp": timestamp(self.first_timestamp)}
        last_scan = {"timestamp": timestamp(self.last_timestamp), "session": self.session, "throttled": self.throttled}
        update_state = {"last_attempt": timestamp(self.update_last_attempt), "in_progress": False}

        payload: dict[str, str | list | dict | bool | int | None] = {
            "name": self.name,
            "node": self.node,
            "provider": {"source_type": self.provider.source_type},
            "first_scan": first_scan,
            "last_scan": last_scan,
            "scan_count": self.scan_count,
            "installed_version": self.current_version,
            "latest_version": self.latest_version,
            "title": self.title,
            "release_summary": self.release_summary,
            "release_url": self.release_url,
            "entity_picture_url": self.entity_picture_url,
            "can_update": self.can_update,
            "can_build": self.can_build,
            "can_restart": self.can_restart,
            "device_icon": self.device_icon,
            "update_type": self.update_type,
            "status": self.status,
            "features": self.features,
            "update_policy": str(self.update_policy),
            "publish_policy": str(self.publish_policy),
            "version_policy": str(self.version_policy),
            "update": update_state,
        }
        payload[self.source_type] = self.custom
        return payload

150 

151 

class ReleaseProvider:
    """Base class for release providers, such as container scanners or package manager API clients

    NOTE(review): this class does not derive from `abc.ABC`, so the `@abstractmethod`
    markers below document intent but are not enforced when instantiating.
    """

    def __init__(self, node_cfg: NodeConfig, source_type: str = "base") -> None:
        """Set up the provider state, a logger bound to the source type, and the stop event"""
        self.source_type: str = source_type
        self.discoveries: dict[str, Discovery] = {}
        self.node_cfg: NodeConfig = node_cfg
        self.log: Any = structlog.get_logger().bind(integration=source_type)
        self.stopped = Event()

    def initialize(self) -> None:
        """Start loops or background tasks and make startup API calls; no-op by default"""

    def stop(self) -> None:
        """Log the stop request and set the `stopped` event"""
        self.log.info("Asking release provider to stop", source_type=self.source_type)
        self.stopped.set()

    def __str__(self) -> str:
        """Describe the provider by its source type"""
        return f"{self.source_type} Discovery"

    @abstractmethod
    def update(self, discovery: Discovery) -> bool:
        """Try to update the version of the given component"""

    @abstractmethod
    def rescan(self, discovery: Discovery) -> Discovery | None:
        """Scan again a component that was discovered earlier"""

    @abstractmethod
    async def scan(self, session: str) -> AsyncGenerator[Discovery]:
        """Yield the components to monitor for the given scan session"""
        # The unreachable yield makes Python treat this method as an async generator
        if False:
            yield 0  # type: ignore[unreachable]

    @abstractmethod
    def command(self, discovery_name: str, command: str, on_update_start: Callable, on_update_end: Callable) -> bool:
        """Run a command against a discovered component"""

    @abstractmethod
    def resolve(self, discovery_name: str) -> Discovery | None:
        """Look up a discovered component by its name"""

197 

198 

class Selection:
    """Result of testing a value against the include and exclude patterns of a selector

    Patterns are applied with `re.search`. Include patterns are evaluated after exclude
    patterns, so when an include list is present it decides the final result.
    """

    def __init__(self, selector: Selector, value: str | None) -> None:
        """Evaluate `value`; a missing value is selected only if there is no include list"""
        self.result: bool = True
        self.matched: str | None = None

        if value is None:
            self.result = selector.include is None
            return

        exclusions = selector.exclude
        if exclusions is not None and any(re.search(pattern, value) for pattern in exclusions):
            self.matched = value
            self.result = False

        inclusions = selector.include
        if inclusions is not None:
            included = any(re.search(pattern, value) for pattern in inclusions)
            if included:
                self.matched = value
            self.result = included

    def __bool__(self) -> bool:
        """Make the object truthy exactly when the value was selected"""
        return self.result

220 

221 

# Loose version pattern: optional v/V/r prefix, then dot-separated groups of digits.
# It has no end anchor and is applied with re.match in select_version, so only the
# start of the string has to look like a version.
VERSION_RE = r"[vVr]?[0-9]+(\.[0-9]+)*"
# Strict semantic version pattern, anchored at both ends
# source: https://semver.org/#is-there-a-suggested-regular-expression-regex-to-check-a-semver-string
SEMVER_RE = r"^(?P<major>0|[1-9]\d*)\.(?P<minor>0|[1-9]\d*)\.(?P<patch>0|[1-9]\d*)(?:-(?P<prerelease>(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+(?P<buildmetadata>[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$"  # noqa: E501

225 

226 

def select_version(
    version_policy: VersionPolicy,
    version: str | None,
    digest: str | None,
    other_version: str | None = None,
    other_digest: str | None = None,
) -> str:
    """Choose the version string to display for the given policy and available data

    `version` and `digest` describe the side being rendered; `other_version` and
    `other_digest` describe the counterpart it is compared against. Explicit policies
    are honoured when their data is present. AUTO prefers a version-like label when it
    is consistent with the counterpart, and everything else falls back through
    "<version>:<digest>", version, digest, the counterpart digest, then NO_KNOWN_IMAGE.
    """
    # A digest is only usable when it is set and is not the "no known image" marker
    known_digest: str | None = digest if digest and digest != NO_KNOWN_IMAGE else None

    if version_policy == VersionPolicy.VERSION and version:
        return version
    if version_policy == VersionPolicy.DIGEST and known_digest:
        return known_digest
    if version_policy == VersionPolicy.VERSION_DIGEST and version and known_digest:
        return f"{version}:{known_digest}"

    # AUTO or fallback
    if version_policy == VersionPolicy.AUTO and version and re.match(VERSION_RE, version):
        # Smells like semver
        if other_version is None and other_digest is None:
            return version
        counterpart = other_version or ""
        counterpart_is_versionish = re.match(VERSION_RE, counterpart) or re.match(SEMVER_RE, counterpart)
        # Only semver if versions and digest consistently same or different
        consistent = (version == other_version) == (digest == other_digest)
        if counterpart_is_versionish and consistent:
            return version

    # The counterpart must be either fully described or entirely absent
    counterpart_all_or_nothing = (other_digest is None) == (other_version is None)
    if version and known_digest and counterpart_all_or_nothing:
        return f"{version}:{known_digest}"
    if version and other_version:
        return version
    if known_digest:
        return known_digest

    return other_digest or NO_KNOWN_IMAGE