Coverage for src / updates2mqtt / integrations / docker_enrich.py: 75%

245 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-20 02:29 +0000

1import re 

2from typing import Any 

3 

4import structlog 

5from docker.auth import resolve_repository_name 

6from hishel.httpx import SyncCacheClient 

7from httpx import Response 

8from omegaconf import MissingMandatoryValue, OmegaConf, ValidationError 

9 

10from updates2mqtt.config import ( 

11 NO_KNOWN_IMAGE, 

12 PKG_INFO_FILE, 

13 DockerConfig, 

14 DockerPackageUpdateInfo, 

15 PackageUpdateInfo, 

16 UpdateInfoConfig, 

17) 

18 

# Module-level structlog logger used by the module-level helpers and enrichers below.
log = structlog.get_logger()

20 

21SOURCE_PLATFORM_GITHUB = "GitHub" 

22SOURCE_PLATFORM_CODEBERG = "CodeBerg" 

23SOURCE_PLATFORMS = {SOURCE_PLATFORM_GITHUB: r"https://github.com/.*"} 

24DIFF_URL_TEMPLATES = { 

25 SOURCE_PLATFORM_GITHUB: "{repo}/commit/{revision}", 

26} 

27RELEASE_URL_TEMPLATES = {SOURCE_PLATFORM_GITHUB: "{repo}/releases/tag/{version}"} 

28UNKNOWN_RELEASE_URL_TEMPLATES = {SOURCE_PLATFORM_GITHUB: "{repo}/releases"} 

29MISSING_VAL = "**MISSING**" 

30 

31TOKEN_URL_TEMPLATE = "https://{auth_host}/token?scope=repository:{image_name}:pull&service={service}" # noqa: S105 # nosec 

32REGISTRIES = { 

33 # registry: (auth_host, api_host, service, url_template) 

34 "docker.io": ("auth.docker.io", "registry-1.docker.io", "registry.docker.io", TOKEN_URL_TEMPLATE), 

35 "mcr.microsoft.com": (None, "mcr.microsoft.com", "mcr.microsoft.com", TOKEN_URL_TEMPLATE), 

36 "ghcr.io": ("ghcr.io", "ghcr.io", "ghcr.io", TOKEN_URL_TEMPLATE), 

37 "lscr.io": ("ghcr.io", "lscr.io", "ghcr.io", TOKEN_URL_TEMPLATE), 

38 "codeberg.org": ("codeberg.org", "codeberg.org", "container_registry", TOKEN_URL_TEMPLATE), 

39 "registry.gitlab.com": ( 

40 "www.gitlab.com", 

41 "registry.gitlab.com", 

42 "container_registry", 

43 "https://{auth_host}/jwt/auth?service={service}&scope=repository:{image_name}:pull&offline_token=true&client_id=docker", 

44 ), 

45} 

46 

47 

def id_source_platform(source: str | None) -> str | None:
    """Return the name of the first known platform whose URL pattern matches ``source``.

    Args:
        source: Source repository URL; ``None`` is treated as an empty string.

    Returns:
        The matching key of ``SOURCE_PLATFORMS``, or ``None`` when nothing matches.
    """
    subject = source or ""
    for platform_name, url_pattern in SOURCE_PLATFORMS.items():
        if re.match(url_pattern, subject):
            return platform_name
    return None

51 

52 

class PackageEnricher:
    """Base enricher that looks images up in an in-memory table of known packages."""

    def __init__(self, docker_cfg: DockerConfig) -> None:
        self.cfg: DockerConfig = docker_cfg
        # Known packages; the base class leaves this empty.
        self.pkgs: dict[str, PackageUpdateInfo] = {}
        self.log: Any = structlog.get_logger().bind(integration="docker")

    def initialize(self) -> None:
        """Hook for populating ``self.pkgs``; the base implementation does nothing."""

    def enrich(self, image_name: str | None, image_ref: str | None, log: Any) -> PackageUpdateInfo | None:
        """Find the known package whose docker image name equals ``image_name`` or ``image_ref``.

        Args:
            image_name: Image name to look up.
            image_ref: Image reference to look up.
            log: Logger used to report a successful match.

        Returns:
            The first matching package, or ``None`` when either argument is ``None``
            or no package matches.
        """
        if image_name is None or image_ref is None:
            return None

        wanted = (image_name, image_ref)
        for candidate in self.pkgs.values():
            if candidate is None or candidate.docker is None:
                continue
            known_name = candidate.docker.image_name
            if known_name is None or known_name not in wanted:
                continue
            log.debug(
                "Found common package",
                image_name=known_name,
                logo_url=candidate.logo_url,
                relnotes_url=candidate.release_notes_url,
            )
            return candidate
        return None

82 

83 

class DefaultPackageEnricher(PackageEnricher):
    """Enricher that always produces a generic package entry."""

    def enrich(self, image_name: str | None, image_ref: str | None, log: Any) -> PackageUpdateInfo | None:
        """Build package info from the image name and the configured default picture.

        Args:
            image_name: Image name; ``NO_KNOWN_IMAGE`` is substituted when falsy.
            image_ref: Image reference, only logged.
            log: Logger used for the debug message.

        Returns:
            A new ``PackageUpdateInfo`` with no release notes URL.
        """
        log.debug("Default pkg info", image_name=image_name, image_ref=image_ref)
        docker_info = DockerPackageUpdateInfo(image_name or NO_KNOWN_IMAGE)
        fallback_logo = self.cfg.default_entity_picture_url
        return PackageUpdateInfo(docker_info, logo_url=fallback_logo, release_notes_url=None)

92 

93 

class CommonPackageEnricher(PackageEnricher):
    """Enricher whose package table is loaded from ``PKG_INFO_FILE``."""

    def initialize(self) -> None:
        """Populate ``self.pkgs`` from the ``common_packages`` section of the config.

        Loads ``PKG_INFO_FILE`` when it exists; otherwise starts from a structured
        ``UpdateInfoConfig`` and logs a warning.

        Raises:
            MissingMandatoryValue: Re-raised after logging when a mandatory value is missing.
            ValidationError: Re-raised after logging when the configuration fails validation.
        """
        if PKG_INFO_FILE.exists():
            log.debug("Loading common package update info", path=PKG_INFO_FILE)
            cfg = OmegaConf.load(PKG_INFO_FILE)
        else:
            # `warning` rather than the `warn` alias, matching the rest of this module.
            log.warning("No common package update info found", path=PKG_INFO_FILE)
            cfg = OmegaConf.structured(UpdateInfoConfig)
        try:
            # Work around omegaconf's handling of optional fields when converting to
            # dataclasses: build each PackageUpdateInfo by hand from its config entry.
            self.pkgs: dict[str, PackageUpdateInfo] = {
                pkg: PackageUpdateInfo(**pkg_cfg) for pkg, pkg_cfg in cfg.common_packages.items()
            }
        except (MissingMandatoryValue, ValidationError) as e:
            log.error("Configuration error %s", e, path=PKG_INFO_FILE.as_posix())
            raise

110 

111 

class LinuxServerIOPackageEnricher(PackageEnricher):
    """Enricher whose package table is filled from the linuxserver.io images API."""

    def initialize(self) -> None:
        """Fetch linuxserver.io image metadata and add one package per repository.

        Does nothing when the ``linuxserver.io`` entry of ``discover_metadata`` is
        absent or disabled. Fetch failures and non-200 responses are logged and
        leave ``self.pkgs`` untouched; they never raise.
        """
        cfg = self.cfg.discover_metadata.get("linuxserver.io")
        if cfg is None or not cfg.enabled:
            return

        try:
            with SyncCacheClient(headers=[("cache-control", f"max-age={cfg.cache_ttl}")]) as client:
                log.debug(f"Fetching linuxserver.io metadata from API, cache_ttl={cfg.cache_ttl}")
                response: Response = client.get(
                    "https://api.linuxserver.io/api/v1/images?include_config=false&include_deprecated=false"
                )
                if response.status_code != 200:
                    log.error("Failed to fetch linuxserver.io metadata, non-200 response", status_code=response.status_code)
                    return
                api_data: Any = response.json()
                # `or []` guards against an explicit null where the list is expected.
                repos: list = api_data.get("data", {}).get("repositories", {}).get("linuxserver", []) or []
        except Exception:
            log.exception("Failed to fetch linuxserver.io metadata")
            return

        added = 0
        for repo in repos:
            image_name = repo.get("name")
            if not image_name or image_name in self.pkgs:
                continue
            # Optional fields are read with .get() so one incomplete entry cannot
            # abort initialization with a KeyError.
            github_url = repo.get("github_url")
            self.pkgs[image_name] = PackageUpdateInfo(
                DockerPackageUpdateInfo(f"lscr.io/linuxserver/{image_name}"),
                # NOTE(review): assumes PackageUpdateInfo accepts logo_url=None — confirm.
                logo_url=repo.get("project_logo"),
                # Avoid producing a bogus "None/releases" link when no GitHub URL is given.
                release_notes_url=f"{github_url}/releases" if github_url else None,
            )
            added += 1
            log.debug("Added linuxserver.io package", pkg=image_name)
        log.info(f"Added {added} linuxserver.io package details")

145 

146 

def fetch_url(
    url: str,
    cache_ttl: int = 300,
    bearer_token: str | None = None,
    response_type: str | None = None,
    follow_redirects: bool = False,
) -> Response | None:
    """GET ``url`` through a caching HTTP client, never raising.

    Args:
        url: Address to fetch.
        cache_ttl: Value sent as ``max-age`` in the ``cache-control`` request header.
        bearer_token: When truthy, sent as an ``Authorization: Bearer`` header.
        response_type: When truthy, sent as the ``Accept`` header.
        follow_redirects: Passed through to the HTTP client.

    Returns:
        The response, whatever its status code, or ``None`` if any exception occurred.
    """
    try:
        request_headers: list[tuple[str, str]] = [("cache-control", f"max-age={cache_ttl}")]
        request_headers += [("Authorization", f"Bearer {bearer_token}")] if bearer_token else []
        request_headers += [("Accept", response_type)] if response_type else []
        with SyncCacheClient(headers=request_headers, follow_redirects=follow_redirects) as client:
            log.debug(f"Fetching URL {url}, cache_ttl={cache_ttl}")
            result: Response = client.get(url)
            if not result.is_success:
                log.debug("URL %s fetch returned non-success status: %s", url, result.status_code)
            return result
    except Exception as e:
        # Best effort: failures are reported at debug level and signalled by None.
        log.debug("URL %s failed to fetch: %s", url, e)
    return None

169 

170 

def validate_url(url: str, cache_ttl: int = 300) -> bool:
    """Report whether ``url`` could be fetched and did not answer 404.

    Any status other than 404 counts as valid; a failed fetch counts as invalid.
    """
    result: Response | None = fetch_url(url, cache_ttl=cache_ttl)
    if result is None:
        return False
    return result.status_code != 404

174 

175 

class SourceReleaseEnricher:
    """Derives release metadata from OCI image annotations and the source platform."""

    def __init__(self) -> None:
        self.log: Any = structlog.get_logger().bind(integration="docker")

    def record(self, results: dict[str, str], k: str, v: str | None) -> None:
        """Store ``v`` under ``k`` in ``results`` unless ``v`` is ``None``."""
        if v is not None:
            results[k] = v

    def enrich(
        self, annotations: dict[str, str], source_repo_url: str | None = None, release_url: str | None = None
    ) -> dict[str, str]:
        """Build a dict of release details for an image.

        Args:
            annotations: Image annotations keyed by ``org.opencontainers.image.*`` names.
            source_repo_url: Fallback source URL used when the annotations carry none.
            release_url: Preferred release notes URL; a platform URL is derived when
                it is ``None`` or does not validate.

        Returns:
            The values that could be determined. Only annotation-derived entries are
            present when the source is not on a known platform.
        """
        results: dict[str, str] = {}

        self.record(results, "latest_image_created", annotations.get("org.opencontainers.image.created"))
        self.record(results, "documentation_url", annotations.get("org.opencontainers.image.documentation"))
        self.record(results, "description", annotations.get("org.opencontainers.image.description"))
        self.record(results, "vendor", annotations.get("org.opencontainers.image.vendor"))

        release_version: str | None = annotations.get("org.opencontainers.image.version")
        self.record(results, "latest_image_version", release_version)
        release_revision: str | None = annotations.get("org.opencontainers.image.revision")
        self.record(results, "latest_release_revision", release_revision)
        release_source: str | None = annotations.get("org.opencontainers.image.source") or source_repo_url
        self.record(results, "source", release_source)

        # Keep the full value for the "source" template variable, but drop any
        # "#fragment" before using the URL as a repository base.
        release_source_deep: str | None = release_source
        if release_source and "#" in release_source:
            release_source = release_source.split("#", 1)[0]
            self.log.debug("Simplifying %s from %s", release_source, release_source_deep)

        source_platform = id_source_platform(release_source)
        if not source_platform:
            self.log.debug("No known source platform found on container", source=release_source)
            return results

        results["source_platform"] = source_platform

        # Missing values become MISSING_VAL so incomplete URLs can be detected below.
        template_vars: dict[str, str] = {
            "version": release_version or MISSING_VAL,
            "revision": release_revision or MISSING_VAL,
            "repo": release_source or MISSING_VAL,
            "source": release_source_deep or MISSING_VAL,
        }

        diff_url = DIFF_URL_TEMPLATES[source_platform].format(**template_vars)
        if MISSING_VAL not in diff_url and validate_url(diff_url):
            results["diff_url"] = diff_url

        if release_url is None:
            release_url = RELEASE_URL_TEMPLATES[source_platform].format(**template_vars)

        # Fall back from the version-specific page to the generic releases page,
        # and to no URL at all if that does not validate either.
        if MISSING_VAL in release_url or not validate_url(release_url):
            release_url = UNKNOWN_RELEASE_URL_TEMPLATES[source_platform].format(**template_vars)
            if MISSING_VAL in release_url or not validate_url(release_url):
                release_url = None

        self.record(results, "release_url", release_url)

        # Without a version there is no tag to look up, so skip the API call rather
        # than requesting ".../releases/tags/None".
        if source_platform == SOURCE_PLATFORM_GITHUB and release_source and release_version:
            base_api = release_source.replace("https://github.com", "https://api.github.com/repos")
            api_url = f"{base_api}/releases/tags/{release_version}"

            api_response: Response | None = fetch_url(api_url)
            if api_response and api_response.is_success:
                api_results: Any = httpx_json_content(api_response, {}) or {}
                # record() keeps a missing release body from being stored as None.
                self.record(results, "release_summary", api_results.get("body"))
                reactions = api_results.get("reactions")
                if reactions:
                    # NOTE(review): stores a number in a dict annotated as str -> str;
                    # presumably the reaction counts are ints — confirm with callers.
                    results["net_score"] = reactions.get("+1", 0) - reactions.get("-1", 0)
            else:
                self.log.debug(
                    "Failed to fetch GitHub release info",
                    url=api_url,
                    status_code=api_response.status_code if api_response else None,
                )
        return results

251 

252 

class AuthError(Exception):
    """Raised when a registry access token cannot be obtained."""

255 

256 

def httpx_json_content(response: Response, default: Any = None) -> Any | None:
    """Return the parsed JSON body of ``response``, or ``default``.

    Args:
        response: HTTP response to read; a falsy value yields ``default``.
        default: Value returned when the body is not declared as JSON or cannot be parsed.

    Returns:
        The decoded JSON when the ``content-type`` header mentions ``json`` and the
        body parses; otherwise ``default``.
    """
    if not response:
        return default
    # A response without a content-type header must yield the default rather than
    # raise TypeError from testing `in None`.
    content_type = response.headers.get("content-type") or ""
    if "json" in content_type.lower():
        try:
            return response.json()
        except Exception:
            log.debug("Failed to parse JSON response: %s", response.text)
    return default

264 

265 

class LabelEnricher:
    """Reads image annotations from a container registry's HTTP API."""

    def __init__(self) -> None:
        self.log: Any = structlog.get_logger().bind(integration="docker")

    @staticmethod
    def _split_image_ref(ref: str) -> tuple[str, str]:
        """Split a registry-less image reference into ``(repository, tag)``.

        Any ``@digest`` suffix is removed before looking for the tag, so a
        digest-only reference such as ``repo@sha256:abc`` is not mistaken for
        repository ``repo@sha256`` with tag ``abc``. A missing or empty tag
        becomes ``latest``, and a repository without a ``/`` is prefixed with
        ``library/``.
        """
        name_part = ref.split("@", 1)[0]
        img_name, _, img_tag = name_part.partition(":")
        if "/" not in img_name:
            img_name = f"library/{img_name}"
        return img_name, img_tag or "latest"

    def fetch_token(self, registry: str, image_name: str) -> str | None:
        """Obtain a pull token for ``image_name`` from the registry's auth service.

        Args:
            registry: Registry host; hosts not in ``REGISTRIES`` use the host itself
                for auth, API and service with the default token URL template.
            image_name: Repository name used in the token scope.

        Returns:
            The token, or ``None`` when the registry is configured without an auth
            host. The fallback flow may also return ``None`` if its response has no token.

        Raises:
            AuthError: When no token could be obtained.
        """
        logger = self.log.bind(image_name=image_name, action="auth_registry")

        default_host: tuple[str, str, str, str] = (registry, registry, registry, TOKEN_URL_TEMPLATE)
        # One lookup, unpacked, instead of repeating the same lookup per field.
        auth_host, _api_host, service, url_template = REGISTRIES.get(registry, default_host)
        if auth_host is None:
            return None

        auth_url: str = url_template.format(auth_host=auth_host, image_name=image_name, service=service)
        response: Response | None = fetch_url(auth_url, cache_ttl=30, follow_redirects=True)
        if response and response.is_success:
            api_data = httpx_json_content(response, {})
            token: str | None = api_data.get("token") if api_data else None
            if token:
                return token
            logger.warning("No token found in response for %s", auth_url)
            raise AuthError(f"No token found in response for {image_name}")

        logger.debug(
            "Non-success response at %s fetching token: %s",
            auth_url,
            response.status_code if response else None,
        )
        if response and response.status_code == 404:
            # The conventional token URL does not exist: call the API root and use the
            # www-authenticate challenge of the 401 reply to find the real auth endpoint.
            logger.debug("Default token URL %s not found, calling /v2 endpoint to validate OCI API and provoke auth", auth_url)
            response = fetch_url(f"https://{auth_host}/v2", follow_redirects=True)
            if response and response.status_code == 401:
                auth = response.headers.get("www-authenticate")
                if not auth:
                    logger.warning("No www-authenticate header found in 401 response for %s", auth_url)
                    raise AuthError(f"No www-authenticate header found on 401 for {image_name}")
                match = re.search(r'realm="([^"]+)",service="([^"]+)",scope="([^"]+)"', auth)
                if not match:
                    logger.warning("No realm/service/scope found in www-authenticate header for %s", auth_url)
                    raise AuthError(f"No realm/service/scope found on 401 headers for {image_name}")

                realm, service, scope = match.groups()
                auth_url = f"{realm}?service={service}&scope={scope}"
                response = fetch_url(auth_url, follow_redirects=True)
                if response and response.is_success:
                    token_data = response.json()
                    logger.debug("Fetched registry token from %s", auth_url)
                    return token_data.get("token")
                logger.warning(
                    "Alternative auth %s with status %s has no token",
                    auth_url,
                    response.status_code if response else None,
                )
        elif response:
            logger.warning("Auth %s failed with status %s", auth_url, response.status_code)

        raise AuthError(f"Failed to fetch token for {image_name} at {auth_url}")

    def fetch_annotations(
        self,
        image_ref: str,
        os: str,
        arch: str,
        token: str | None = None,
        mutable_cache_ttl: int = 600,
        immutable_cache_ttl: int = 86400,
    ) -> dict[str, str]:
        """Collect annotations for an image from its index and its platform manifest.

        Args:
            image_ref: Image reference, optionally with registry, tag and digest.
            os: Operating system used to select platform manifests.
            arch: Architecture used to select platform manifests.
            token: Bearer token to use; one is fetched when ``None``.
            mutable_cache_ttl: Cache lifetime for the tag lookup.
            immutable_cache_ttl: Cache lifetime for lookups by digest.

        Returns:
            Index-level annotations updated with those of each matching platform
            manifest; empty when the index could not be fetched or parsed.

        Raises:
            AuthError: Propagated from ``fetch_token`` when no token is supplied.
        """
        logger = self.log.bind(image_ref=image_ref, action="enrich_registry")
        annotations: dict[str, str] = {}
        if token:
            logger.debug("Using provided token to fetch manifest for image %s", image_ref)
        registry, ref = resolve_repository_name(image_ref)

        img_name, img_tag = self._split_image_ref(ref)
        if token is None:
            token = self.fetch_token(registry, img_name)

        api_host: str | None = REGISTRIES.get(registry, (registry, registry))[1]
        api_url: str = f"https://{api_host}/v2/{img_name}/manifests/{img_tag}"
        response: Response | None = fetch_url(
            api_url,
            cache_ttl=mutable_cache_ttl,
            bearer_token=token,
            response_type="application/vnd.oci.image.index.v1+json",
        )
        if response is None:
            logger.warning("Empty response for manifest for image at %s", api_url)
            return annotations
        if not response.is_success:
            api_data = httpx_json_content(response, {})
            logger.warning(
                "Failed to fetch manifest from %s: %s",
                api_url,
                api_data.get("errors") if api_data else response.text,
            )
            return annotations
        try:
            index = response.json()
        except ValueError:
            # A successful reply with an unparseable body is treated like a failed fetch.
            logger.warning("Invalid JSON in manifest response from %s", api_url)
            return annotations

        # `or` guards against explicit nulls where a list or mapping is expected.
        manifests = index.get("manifests") or []
        annotations = index.get("annotations") or {}
        logger.debug(
            "INDEX %s manifests, %s annotations",
            len(manifests),
            len(annotations),
        )
        for m in manifests:
            platform_info = m.get("platform", {})
            if platform_info.get("os") == os and platform_info.get("architecture") == arch:
                digest = m.get("digest")
                media_type = m.get("mediaType")
                response = fetch_url(
                    f"https://{api_host}/v2/{img_name}/manifests/{digest}",
                    cache_ttl=immutable_cache_ttl,
                    bearer_token=token,
                    response_type=media_type,
                )
                if response and response.is_success:
                    api_data = httpx_json_content(response, None)
                    if api_data:
                        logger.debug(
                            "MANIFEST %s layers, %s annotations",
                            len(api_data.get("layers", [])),
                            len(api_data.get("annotations") or []),
                        )
                        if api_data.get("annotations"):
                            annotations.update(api_data.get("annotations", {}))
                        else:
                            logger.debug("No annotations found in manifest: %s", api_data)

        if not annotations:
            logger.debug("No annotations found from registry data")
        return annotations

397 return annotations