Coverage for src / updates2mqtt / model.py: 97%

138 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 23:58 +0000

1import datetime as dt 

2import json 

3import time 

4from abc import abstractmethod 

5from collections.abc import AsyncGenerator, Callable 

6from threading import Event 

7from typing import Any 

8 

9import structlog 

10from tzlocal import get_localzone 

11 

12from updates2mqtt.config import NodeConfig, PublishPolicy, UpdatePolicy, VersionPolicy 

13from updates2mqtt.helpers import sanitize_name, timestamp 

14 

15 

16class DiscoveryDetail: 

17 def __init__(self) -> None: 

18 self.captured: dt.datetime = dt.datetime.now(tz=get_localzone()) 

19 

20 @abstractmethod 

21 def as_dict(self) -> dict[str, str | list | dict | bool | int | None]: 

22 return {} 

23 

24 def __str__(self) -> str: 

25 """Log friendly""" 

26 return ",".join(f"{k}:{v}" for k, v in self.as_dict().items()) 

27 

28 

29class DiscoveryArtefactDetail(DiscoveryDetail): 

30 """Provider specific detail""" 

31 

32 def __init__(self) -> None: 

33 super().__init__() 

34 

35 

36class DiscoveryInstallationDetail(DiscoveryDetail): 

37 """Provider specific detail""" 

38 

39 def __init__(self) -> None: 

40 super().__init__() 

41 

42 

43class ReleaseDetail(DiscoveryDetail): 

44 """The artefact source details 

45 

46 Note this may be an actual software package, or the source details of the wrapping of it 

47 For example, some Docker images report the main source repo, and others where the Dockerfile deploy project lives 

48 """ 

49 

50 def __init__(self, notes_url: str | None = None, summary: str | None = None) -> None: 

51 super().__init__() 

52 self.source_platform: str | None = None 

53 self.source_repo_url: str | None = None 

54 self.source_url: str | None = None 

55 self.version: str | None = None 

56 self.revision: str | None = None 

57 self.diff_url: str | None = None 

58 self.notes_url: str | None = notes_url 

59 self.title: str | None = None 

60 self.summary: str | None = summary 

61 self.net_score: int | None = None 

62 

63 def as_dict(self) -> dict[str, str | list | dict | bool | int | None]: 

64 return { 

65 "captured": self.captured.isoformat(), 

66 "title": self.title, 

67 "version": self.version, 

68 "source_platform": self.source_platform, 

69 "source_repo": self.source_repo_url, 

70 "source": self.source_url, 

71 "revision": self.revision, 

72 "diff_url": self.diff_url, 

73 "notes_url": self.notes_url, 

74 "summary": self.summary, 

75 "net_score": str(self.net_score) if self.net_score is not None else None, 

76 } 

77 

78 

79class Discovery: 

80 """Discovered component from a scan""" 

81 

82 def __init__( 

83 self, 

84 provider: "ReleaseProvider", 

85 name: str, 

86 session: str, 

87 node: str, 

88 entity_picture_url: str | None = None, 

89 current_version: str | None = None, 

90 latest_version: str | None = None, 

91 can_build: bool = False, 

92 can_restart: bool = False, 

93 can_pull: bool = False, 

94 status: str = "on", 

95 publish_policy: PublishPolicy = PublishPolicy.HOMEASSISTANT, 

96 update_type: str | None = "Update", 

97 update_policy: UpdatePolicy = UpdatePolicy.PASSIVE, 

98 version_policy: VersionPolicy = VersionPolicy.AUTO, 

99 version_basis: str | None = None, 

100 title_template: str = "{discovery.update_type} for {discovery.name} on {discovery.node}", 

101 device_icon: str | None = None, 

102 custom: dict[str, Any] | None = None, 

103 throttled: bool = False, 

104 previous: "Discovery|None" = None, 

105 release_detail: ReleaseDetail | None = None, 

106 installation_detail: DiscoveryInstallationDetail | None = None, 

107 current_detail: DiscoveryArtefactDetail | None = None, 

108 latest_detail: DiscoveryArtefactDetail | None = None, 

109 ) -> None: 

110 self.provider: ReleaseProvider = provider 

111 self.source_type: str = provider.source_type 

112 self.session: str = session 

113 self.name: str = sanitize_name(name) 

114 self.node: str = node 

115 self.entity_picture_url: str | None = entity_picture_url 

116 self.current_version: str | None = current_version 

117 self.latest_version: str | None = latest_version 

118 self.can_pull: bool = can_pull 

119 self.can_build: bool = can_build 

120 self.can_restart: bool = can_restart 

121 self.title_template: str | None = title_template 

122 self.device_icon: str | None = device_icon 

123 self.update_type: str | None = update_type 

124 self.status: str = status 

125 self.publish_policy: PublishPolicy = publish_policy 

126 self.update_policy: UpdatePolicy = update_policy 

127 self.version_policy: VersionPolicy = version_policy 

128 self.version_basis: str | None = version_basis 

129 self.update_last_attempt: float | None = None 

130 self.custom: dict[str, Any] = custom or {} 

131 self.throttled: bool = throttled 

132 self.scan_count: int 

133 self.first_timestamp: float 

134 self.last_timestamp: float = time.time() 

135 self.check_timestamp: float | None = time.time() 

136 self.release_detail: ReleaseDetail | None = release_detail 

137 self.current_detail: DiscoveryArtefactDetail | None = current_detail 

138 self.latest_detail: DiscoveryArtefactDetail | None = latest_detail 

139 self.installation_detail: DiscoveryInstallationDetail | None = installation_detail 

140 

141 if previous: 

142 self.update_last_attempt = previous.update_last_attempt 

143 self.first_timestamp = previous.first_timestamp 

144 self.scan_count = previous.scan_count + 1 

145 else: 

146 self.first_timestamp = time.time() 

147 self.scan_count = 1 

148 if throttled and previous: 

149 # roll forward last non-throttled check 

150 self.check_timestamp = previous.check_timestamp 

151 elif not throttled: 

152 self.check_timestamp = time.time() 

153 

154 def __repr__(self) -> str: 

155 """Build a custom string representation""" 

156 return f"Discovery('{self.name}','{self.source_type}',current={self.current_version},latest={self.latest_version})" 

157 

158 def __str__(self) -> str: 

159 """Dump the attrs""" 

160 

161 def stringify(v: Any) -> str | int | float | bool: 

162 return str(v) if not isinstance(v, (str, int, float, bool)) else v 

163 

164 dump = {k: stringify(v) for k, v in self.__dict__.items()} 

165 return json.dumps(dump) 

166 

167 @property 

168 def can_update(self) -> bool: 

169 return self.can_pull or self.can_build or self.can_restart 

170 

171 @property 

172 def features(self) -> list[str]: 

173 results = [] 

174 if self.can_update: 

175 # public install-neutral capabilities and Home Assistant features 

176 results.append("INSTALL") 

177 results.append("PROGRESS") 

178 if self.release_detail and self.release_detail.notes_url: 

179 results.append("RELEASE_NOTES") 

180 return results 

181 

182 @property 

183 def title(self) -> str: 

184 if self.title_template: 

185 return self.title_template.format(discovery=self) 

186 return self.name 

187 

188 def as_dict(self) -> dict[str, str | list | dict | bool | int | None]: 

189 results: dict[str, str | list | dict | bool | int | None] = { 

190 "name": self.name, 

191 "node": self.node, 

192 "provider": {"source_type": self.provider.source_type}, 

193 "first_scan": {"timestamp": timestamp(self.first_timestamp)}, 

194 "last_scan": {"timestamp": timestamp(self.last_timestamp), "session": self.session, "throttled": self.throttled}, 

195 "scan_count": self.scan_count, 

196 "installed_version": self.current_version, 

197 "latest_version": self.latest_version, 

198 "version_basis": self.version_basis, 

199 "title": self.title, 

200 "can_update": self.can_update, 

201 "can_build": self.can_build, 

202 "can_restart": self.can_restart, 

203 "device_icon": self.device_icon, 

204 "update_type": self.update_type, 

205 "status": self.status, 

206 "features": self.features, 

207 "entity_picture_url": self.entity_picture_url, 

208 "update_policy": str(self.update_policy), 

209 "publish_policy": str(self.publish_policy), 

210 "version_policy": str(self.version_policy), 

211 "update": {"last_attempt": timestamp(self.update_last_attempt), "in_progress": False}, 

212 "installation_detail": self.installation_detail.as_dict() if self.installation_detail else None, 

213 "current_detail": self.current_detail.as_dict() if self.current_detail else None, 

214 "latest_detail": self.latest_detail.as_dict() if self.latest_detail else None, 

215 } 

216 if self.release_detail: 

217 results["release"] = self.release_detail.as_dict() if self.release_detail else None 

218 if self.custom: 

219 results[self.source_type] = self.custom 

220 return results 

221 

222 

223class ReleaseProvider: 

224 """Abstract base class for release providers, such as container scanners or package managers API calls""" 

225 

226 def __init__(self, node_cfg: NodeConfig, source_type: str = "base") -> None: 

227 self.source_type: str = source_type 

228 self.discoveries: dict[str, Discovery] = {} 

229 self.node_cfg: NodeConfig = node_cfg 

230 self.log: Any = structlog.get_logger().bind(integration=self.source_type) 

231 self.stopped = Event() 

232 

233 def initialize(self) -> None: 

234 """Initialize any loops or background tasks, make any startup API calls""" 

235 pass 

236 

237 def stop(self) -> None: 

238 """Stop any loops or background tasks""" 

239 self.log.info("Asking release provider to stop", source_type=self.source_type) 

240 self.stopped.set() 

241 

242 def __str__(self) -> str: 

243 """Stringify""" 

244 return f"{self.source_type} Discovery" 

245 

246 @abstractmethod 

247 def update(self, discovery: Discovery) -> bool: 

248 """Attempt to update the component version""" 

249 

250 @abstractmethod 

251 def rescan(self, discovery: Discovery) -> Discovery | None: 

252 """Rescan a previously discovered component""" 

253 

254 @abstractmethod 

255 async def scan(self, session: str) -> AsyncGenerator[Discovery]: 

256 """Scan for components to monitor""" 

257 # force recognition as an async generator 

258 if False: 

259 yield 0 # type: ignore[unreachable] 

260 

261 @abstractmethod 

262 def command(self, discovery_name: str, command: str, on_update_start: Callable, on_update_end: Callable) -> bool: 

263 """Execute a command on a discovered component""" 

264 

265 @abstractmethod 

266 def resolve(self, discovery_name: str) -> Discovery | None: 

267 """Resolve a discovered component by name"""