Coverage for src / updates2mqtt / model.py: 97%

142 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-20 23:13 +0000

1import datetime as dt 

2import json 

3import time 

4from abc import abstractmethod 

5from collections.abc import AsyncGenerator, Callable 

6from threading import Event 

7from typing import Any 

8 

9import structlog 

10from tzlocal import get_localzone 

11 

12from updates2mqtt.config import NodeConfig, PublishPolicy, UpdatePolicy, VersionPolicy 

13from updates2mqtt.helpers import sanitize_name, timestamp 

14 

15 

16class DiscoveryDetail: 

17 def __init__(self) -> None: 

18 self.captured: dt.datetime = dt.datetime.now(tz=get_localzone()) 

19 

20 @abstractmethod 

21 def as_dict(self) -> dict[str, str | list | dict | bool | int | None]: 

22 return {} 

23 

24 def __str__(self) -> str: 

25 """Log friendly""" 

26 return ",".join(f"{k}:{v}" for k, v in self.as_dict().items()) 

27 

28 

class DiscoveryArtefactDetail(DiscoveryDetail):
    """Provider specific detail about an artefact

    `Discovery` carries one of these for the current artefact and one for
    the latest.
    """

    def __init__(self) -> None:
        """Nothing extra to set up; defer to the parent initialiser."""
        super().__init__()

34 

35 

class DiscoveryInstallationDetail(DiscoveryDetail):
    """Provider specific detail about an installation

    `Discovery` carries at most one of these, as `installation_detail`.
    """

    def __init__(self) -> None:
        """Nothing extra to set up; defer to the parent initialiser."""
        super().__init__()

41 

42 

class ReleaseDetail(DiscoveryDetail):
    """The artefact source details

    Note this may describe an actual software package, or the project that wraps it.
    For example, some Docker images report the main source repo, and others report
    where the Dockerfile deploy project lives.
    """

    def __init__(
        self,
        name: str | None = None,
        notes_url: str | None = None,
        summary: str | None = None,
        source_repo_url: str | None = None,
    ) -> None:
        """Create a release record; everything not passed here starts as None.

        Args:
            name: Name of the release.
            notes_url: URL of the release notes.
            summary: Summary text; when empty, `as_dict` derives one from `diff_url`.
            source_repo_url: URL of the source repository.
        """
        super().__init__()
        self.name = name
        self.source_platform: str | None = None
        self.source_repo_url: str | None = source_repo_url
        self.source_url: str | None = None
        self.version: str | None = None
        self.revision: str | None = None
        self.diff_url: str | None = None
        self.notes_url: str | None = notes_url
        self.title: str | None = None
        self.summary: str | None = summary
        self.net_score: int | None = None

    def as_dict(self) -> dict[str, str | list | dict | bool | int | None]:
        """Return the release as a plain dict.

        When there is no summary but a diff URL is known, the summary becomes an
        HTML link to the diff, labelled with the version (or, failing that, the
        revision). `net_score` is emitted as a string when set. `name` is not
        part of the output.
        """
        summary: str | None = self.summary
        if not summary and self.diff_url:
            # Label with whichever identifier is known; with neither, use a bare
            # "Diff" so the text never reads "None Diff".
            label = self.version or self.revision
            link_text = f"{label} Diff" if label else "Diff"
            # NOTE(review): diff_url and the label are interpolated unescaped.
            summary = f"<a href='{self.diff_url}'>{link_text}</a>"

        return {
            "captured": self.captured.isoformat(),
            "title": self.title,
            "version": self.version,
            "source_platform": self.source_platform,
            "source_repo": self.source_repo_url,
            "source": self.source_url,
            "revision": self.revision,
            "diff_url": self.diff_url,
            "notes_url": self.notes_url,
            "summary": summary,
            "net_score": str(self.net_score) if self.net_score is not None else None,
        }

89 

90 

class Discovery:
    """Discovered component from a scan"""

    def __init__(
        self,
        provider: "ReleaseProvider",
        name: str,
        session: str,
        node: str,
        entity_picture_url: str | None = None,
        current_version: str | None = None,
        latest_version: str | None = None,
        can_build: bool = False,
        can_restart: bool = False,
        can_pull: bool = False,
        status: str = "on",
        publish_policy: PublishPolicy = PublishPolicy.HOMEASSISTANT,
        update_type: str | None = "Update",
        update_policy: UpdatePolicy = UpdatePolicy.PASSIVE,
        version_policy: VersionPolicy = VersionPolicy.AUTO,
        version_basis: str | None = None,
        title_template: str = "{discovery.update_type} for {discovery.name} on {discovery.node}",
        device_icon: str | None = None,
        custom: dict[str, Any] | None = None,
        throttled: bool = False,
        previous: "Discovery|None" = None,
        release_detail: ReleaseDetail | None = None,
        installation_detail: DiscoveryInstallationDetail | None = None,
        current_detail: DiscoveryArtefactDetail | None = None,
        latest_detail: DiscoveryArtefactDetail | None = None,
    ) -> None:
        """Record one scan result, carrying history over from `previous` if given.

        The name is passed through `sanitize_name`, and `source_type` is copied
        from the provider. With a `previous` discovery, its first-scan timestamp
        and last update attempt are inherited and the scan count is incremented;
        without one, this is scan number 1. A throttled scan keeps the previous
        check timestamp when there is one.
        """
        # Attribute assignment order is deliberate: __str__ dumps __dict__ in
        # insertion order.
        self.provider: ReleaseProvider = provider
        self.source_type: str = provider.source_type
        self.session: str = session
        self.name: str = sanitize_name(name)
        self.node: str = node
        self.entity_picture_url: str | None = entity_picture_url
        self.current_version: str | None = current_version
        self.latest_version: str | None = latest_version
        self.can_pull: bool = can_pull
        self.can_build: bool = can_build
        self.can_restart: bool = can_restart
        self.title_template: str | None = title_template
        self.device_icon: str | None = device_icon
        self.update_type: str | None = update_type
        self.status: str = status
        self.publish_policy: PublishPolicy = publish_policy
        self.update_policy: UpdatePolicy = update_policy
        self.version_policy: VersionPolicy = version_policy
        self.version_basis: str | None = version_basis
        self.update_last_attempt: float | None = None
        self.custom: dict[str, Any] = custom or {}
        self.throttled: bool = throttled
        # Declared here, assigned below once the history is known
        self.scan_count: int
        self.first_timestamp: float
        self.last_timestamp: float = time.time()
        self.check_timestamp: float | None = time.time()
        self.release_detail: ReleaseDetail | None = release_detail
        self.current_detail: DiscoveryArtefactDetail | None = current_detail
        self.latest_detail: DiscoveryArtefactDetail | None = latest_detail
        self.installation_detail: DiscoveryInstallationDetail | None = installation_detail

        if not previous:
            # First sighting of this component
            self.first_timestamp = time.time()
            self.scan_count = 1
        else:
            self.update_last_attempt = previous.update_last_attempt
            self.first_timestamp = previous.first_timestamp
            self.scan_count = previous.scan_count + 1

        if not throttled:
            self.check_timestamp = time.time()
        elif previous:
            # roll forward last non-throttled check
            self.check_timestamp = previous.check_timestamp

    def __repr__(self) -> str:
        """Compact summary: name, source type, current and latest versions"""
        identity = f"'{self.name}','{self.source_type}'"
        versions = f"current={self.current_version},latest={self.latest_version}"
        return f"Discovery({identity},{versions})"

    def __str__(self) -> str:
        """Dump every instance attribute as JSON, stringifying non-primitive values"""
        primitives = (str, int, float, bool)
        dump: dict[str, str | int | float | bool] = {}
        for attr, value in self.__dict__.items():
            dump[attr] = value if isinstance(value, primitives) else str(value)
        return json.dumps(dump)

    @property
    def can_update(self) -> bool:
        """True when the component can be pulled, built or restarted"""
        return self.can_pull or self.can_build or self.can_restart

    @property
    def features(self) -> list[str]:
        """Feature names to advertise for this discovery"""
        offered: list[str] = []
        if self.can_update:
            # public install-neutral capabilities and Home Assistant features
            offered.extend(("INSTALL", "PROGRESS"))
        release = self.release_detail
        if release and release.notes_url:
            offered.append("RELEASE_NOTES")
        return offered

    @property
    def title(self) -> str:
        """Title rendered from `title_template`, or the bare name without a template"""
        template = self.title_template
        return template.format(discovery=self) if template else self.name

    def as_dict(self) -> dict[str, str | list | dict | bool | int | None]:
        """Return the discovery as a plain dict.

        A "release" entry is added only when release detail exists, and the
        provider's custom data, when non-empty, is added under the source type.
        """
        installation = self.installation_detail
        current = self.current_detail
        latest = self.latest_detail
        results: dict[str, str | list | dict | bool | int | None] = {
            "name": self.name,
            "node": self.node,
            "provider": {"source_type": self.provider.source_type},
            "first_scan": {"timestamp": timestamp(self.first_timestamp)},
            "last_scan": {
                "timestamp": timestamp(self.last_timestamp),
                "session": self.session,
                "throttled": self.throttled,
            },
            "scan_count": self.scan_count,
            "installed_version": self.current_version,
            "latest_version": self.latest_version,
            "version_basis": self.version_basis,
            "title": self.title,
            "can_update": self.can_update,
            "can_build": self.can_build,
            "can_restart": self.can_restart,
            "device_icon": self.device_icon,
            "update_type": self.update_type,
            "status": self.status,
            "features": self.features,
            "entity_picture_url": self.entity_picture_url,
            "update_policy": str(self.update_policy),
            "publish_policy": str(self.publish_policy),
            "version_policy": str(self.version_policy),
            "update": {"last_attempt": timestamp(self.update_last_attempt), "in_progress": False},
            "installation_detail": installation.as_dict() if installation else None,
            "current_detail": current.as_dict() if current else None,
            "latest_detail": latest.as_dict() if latest else None,
        }
        if self.release_detail:
            results["release"] = self.release_detail.as_dict()
        if self.custom:
            results[self.source_type] = self.custom
        return results

233 

234 

class ReleaseProvider:
    """Abstract base class for release providers, such as container scanners or package managers API calls

    The abstract markers below are advisory: the class does not use
    ``ABCMeta``, so Python does not enforce them at instantiation.
    """

    def __init__(self, node_cfg: NodeConfig, source_type: str = "base") -> None:
        """Keep the node config, start with no discoveries and bind a logger to the source type."""
        self.source_type: str = source_type
        self.discoveries: dict[str, Discovery] = {}
        self.node_cfg: NodeConfig = node_cfg
        logger = structlog.get_logger()
        self.log: Any = logger.bind(integration=self.source_type)
        # Set by stop()
        self.stopped = Event()

    def initialize(self) -> None:
        """Initialize any loops or background tasks, make any startup API calls

        The base implementation does nothing.
        """

    def stop(self) -> None:
        """Log the stop request and set the `stopped` event"""
        self.log.info("Asking release provider to stop", source_type=self.source_type)
        self.stopped.set()

    def __str__(self) -> str:
        """Source type followed by the word Discovery"""
        return f"{self.source_type} Discovery"

    @abstractmethod
    def update(self, discovery: Discovery) -> bool:
        """Attempt to update the version of the given discovered component"""

    @abstractmethod
    def rescan(self, discovery: Discovery) -> Discovery | None:
        """Rescan a component found earlier, returning a discovery or None"""

    @abstractmethod
    async def scan(self, session: str) -> AsyncGenerator[Discovery]:
        """Scan for components to monitor, yielding each discovery

        The base implementation yields nothing.
        """
        # force recognition as an async generator
        if False:
            yield 0  # type: ignore[unreachable]

    @abstractmethod
    def command(self, discovery_name: str, command: str, on_update_start: Callable, on_update_end: Callable) -> bool:
        """Execute a command on the discovered component with the given name"""

    @abstractmethod
    def resolve(self, discovery_name: str) -> Discovery | None:
        """Look up a discovered component by name, or None"""