Coverage for src/updates2mqtt/integrations/docker_enrich.py: 75% (245 statements)
coverage.py v7.13.1, created at 2026-01-20 02:29 +0000
import re
from typing import Any

import structlog
from docker.auth import resolve_repository_name
from hishel.httpx import SyncCacheClient
from httpx import Response
from omegaconf import MissingMandatoryValue, OmegaConf, ValidationError

from updates2mqtt.config import (
    NO_KNOWN_IMAGE,
    PKG_INFO_FILE,
    DockerConfig,
    DockerPackageUpdateInfo,
    PackageUpdateInfo,
    UpdateInfoConfig,
)

log = structlog.get_logger()

SOURCE_PLATFORM_GITHUB = "GitHub"
SOURCE_PLATFORM_CODEBERG = "CodeBerg"
SOURCE_PLATFORMS = {SOURCE_PLATFORM_GITHUB: r"https://github.com/.*"}
DIFF_URL_TEMPLATES = {
    SOURCE_PLATFORM_GITHUB: "{repo}/commit/{revision}",
}
RELEASE_URL_TEMPLATES = {SOURCE_PLATFORM_GITHUB: "{repo}/releases/tag/{version}"}
UNKNOWN_RELEASE_URL_TEMPLATES = {SOURCE_PLATFORM_GITHUB: "{repo}/releases"}
MISSING_VAL = "**MISSING**"

TOKEN_URL_TEMPLATE = "https://{auth_host}/token?scope=repository:{image_name}:pull&service={service}"  # noqa: S105 # nosec
REGISTRIES = {
    # registry: (auth_host, api_host, service, url_template)
    "docker.io": ("auth.docker.io", "registry-1.docker.io", "registry.docker.io", TOKEN_URL_TEMPLATE),
    "mcr.microsoft.com": (None, "mcr.microsoft.com", "mcr.microsoft.com", TOKEN_URL_TEMPLATE),
    "ghcr.io": ("ghcr.io", "ghcr.io", "ghcr.io", TOKEN_URL_TEMPLATE),
    "lscr.io": ("ghcr.io", "lscr.io", "ghcr.io", TOKEN_URL_TEMPLATE),
    "codeberg.org": ("codeberg.org", "codeberg.org", "container_registry", TOKEN_URL_TEMPLATE),
    "registry.gitlab.com": (
        "www.gitlab.com",
        "registry.gitlab.com",
        "container_registry",
        "https://{auth_host}/jwt/auth?service={service}&scope=repository:{image_name}:pull&offline_token=true&client_id=docker",
    ),
}
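
# Illustrative note (not part of the module): for the "docker.io" entry above and a hypothetical
# image "library/nginx", TOKEN_URL_TEMPLATE renders as
#   https://auth.docker.io/token?scope=repository:library/nginx:pull&service=registry.docker.io
# Registries whose auth_host is None (e.g. mcr.microsoft.com) are fetched without a bearer token.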

def id_source_platform(source: str | None) -> str | None:
    candidates: list[str] = [platform for platform, pattern in SOURCE_PLATFORMS.items() if re.match(pattern, source or "")]
    return candidates[0] if candidates else None
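
# For example (hypothetical repository URL), id_source_platform("https://github.com/example/app")
# returns "GitHub"; only GitHub is registered in SOURCE_PLATFORMS, so Codeberg and other forges
# currently fall through to None.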

class PackageEnricher:
    def __init__(self, docker_cfg: DockerConfig) -> None:
        self.pkgs: dict[str, PackageUpdateInfo] = {}
        self.cfg: DockerConfig = docker_cfg
        self.log: Any = structlog.get_logger().bind(integration="docker")

    def initialize(self) -> None:
        pass

    def enrich(self, image_name: str | None, image_ref: str | None, log: Any) -> PackageUpdateInfo | None:
        def match(pkg: PackageUpdateInfo) -> bool:
            if pkg is not None and pkg.docker is not None and pkg.docker.image_name is not None:  # coverage: partial branch, condition always true
                if image_name is not None and image_name == pkg.docker.image_name:
                    return True
                if image_ref is not None and image_ref == pkg.docker.image_name:
                    return True
            return False

        if image_name is not None and image_ref is not None:
            for pkg in self.pkgs.values():
                if match(pkg):
                    log.debug(
                        "Found common package",
                        image_name=pkg.docker.image_name,  # type: ignore [union-attr]
                        logo_url=pkg.logo_url,
                        relnotes_url=pkg.release_notes_url,
                    )
                    return pkg
        return None


class DefaultPackageEnricher(PackageEnricher):
    def enrich(self, image_name: str | None, image_ref: str | None, log: Any) -> PackageUpdateInfo | None:
        log.debug("Default pkg info", image_name=image_name, image_ref=image_ref)
        return PackageUpdateInfo(
            DockerPackageUpdateInfo(image_name or NO_KNOWN_IMAGE),
            logo_url=self.cfg.default_entity_picture_url,
            release_notes_url=None,
        )

class CommonPackageEnricher(PackageEnricher):
    def initialize(self) -> None:
        if PKG_INFO_FILE.exists():  # coverage: partial branch, condition always true
            log.debug("Loading common package update info", path=PKG_INFO_FILE)
            cfg = OmegaConf.load(PKG_INFO_FILE)
        else:
            log.warning("No common package update info found", path=PKG_INFO_FILE)
            cfg = OmegaConf.structured(UpdateInfoConfig)
        try:
            # work around omegaconf limitations with optional fields when converting to dataclasses
            self.pkgs: dict[str, PackageUpdateInfo] = {
                pkg: PackageUpdateInfo(**pkg_cfg) for pkg, pkg_cfg in cfg.common_packages.items()
            }
        except (MissingMandatoryValue, ValidationError) as e:
            log.error("Configuration error %s", e, path=PKG_INFO_FILE.as_posix())
            raise
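
# Hedged note: judging from the constructor call above, each entry under "common_packages" in
# PKG_INFO_FILE is expected to expand directly into PackageUpdateInfo keyword arguments
# (e.g. docker, logo_url, release_notes_url), with UpdateInfoConfig defining the schema.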

class LinuxServerIOPackageEnricher(PackageEnricher):
    def initialize(self) -> None:
        cfg = self.cfg.discover_metadata.get("linuxserver.io")
        if cfg is None or not cfg.enabled:
            return

        try:
            with SyncCacheClient(headers=[("cache-control", f"max-age={cfg.cache_ttl}")]) as client:
                log.debug(f"Fetching linuxserver.io metadata from API, cache_ttl={cfg.cache_ttl}")
                response: Response = client.get(
                    "https://api.linuxserver.io/api/v1/images?include_config=false&include_deprecated=false"
                )
                if response.status_code != 200:
                    log.error("Failed to fetch linuxserver.io metadata, non-200 response", status_code=response.status_code)
                    return
                api_data: Any = response.json()
                repos: list = api_data.get("data", {}).get("repositories", {}).get("linuxserver", [])
        except Exception:
            log.exception("Failed to fetch linuxserver.io metadata")
            return

        added = 0
        for repo in repos:
            image_name = repo.get("name")
            if image_name and image_name not in self.pkgs:  # coverage: partial branch, condition always true
                self.pkgs[image_name] = PackageUpdateInfo(
                    DockerPackageUpdateInfo(f"lscr.io/linuxserver/{image_name}"),
                    logo_url=repo["project_logo"],
                    release_notes_url=f"{repo['github_url']}/releases",
                )
                added += 1
                log.debug("Added linuxserver.io package", pkg=image_name)
        log.info(f"Added {added} linuxserver.io package details")
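
# Hedged sketch of the API shape this parser assumes: the linuxserver.io response is read as
#   {"data": {"repositories": {"linuxserver": [{"name": ..., "project_logo": ..., "github_url": ...}, ...]}}}
# i.e. one entry per image, keyed off exactly the fields accessed in the loop above.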

def fetch_url(
    url: str,
    cache_ttl: int = 300,
    bearer_token: str | None = None,
    response_type: str | None = None,
    follow_redirects: bool = False,
) -> Response | None:
    try:
        headers = [("cache-control", f"max-age={cache_ttl}")]
        if bearer_token:
            headers.append(("Authorization", f"Bearer {bearer_token}"))
        if response_type:
            headers.append(("Accept", response_type))
        with SyncCacheClient(headers=headers, follow_redirects=follow_redirects) as client:
            log.debug(f"Fetching URL {url}, cache_ttl={cache_ttl}")
            response: Response = client.get(url)
            if not response.is_success:
                log.debug("URL %s fetch returned non-success status: %s", url, response.status_code)
            return response
    except Exception as e:
        log.debug("URL %s failed to fetch: %s", url, e)
        return None


def validate_url(url: str, cache_ttl: int = 300) -> bool:
    response: Response | None = fetch_url(url, cache_ttl=cache_ttl)
    return response is not None and response.status_code != 404
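
# Minimal usage sketch (hypothetical URL): fetch_url("https://example.org/v2/", cache_ttl=60,
# response_type="application/json") returns the httpx Response even on 4xx/5xx, or None on a
# transport error; validate_url() only treats an explicit 404 as invalid.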

class SourceReleaseEnricher:
    def __init__(self) -> None:
        self.log: Any = structlog.get_logger().bind(integration="docker")

    def record(self, results: dict[str, str], k: str, v: str | None) -> None:
        if v is not None:
            results[k] = v

    def enrich(
        self, annotations: dict[str, str], source_repo_url: str | None = None, release_url: str | None = None
    ) -> dict[str, str]:
        results: dict[str, str] = {}

        self.record(results, "latest_image_created", annotations.get("org.opencontainers.image.created"))
        self.record(results, "documentation_url", annotations.get("org.opencontainers.image.documentation"))
        self.record(results, "description", annotations.get("org.opencontainers.image.description"))
        self.record(results, "vendor", annotations.get("org.opencontainers.image.vendor"))

        release_version: str | None = annotations.get("org.opencontainers.image.version")
        self.record(results, "latest_image_version", release_version)
        release_revision: str | None = annotations.get("org.opencontainers.image.revision")
        self.record(results, "latest_release_revision", release_revision)
        release_source: str | None = annotations.get("org.opencontainers.image.source") or source_repo_url
        self.record(results, "source", release_source)

        release_source_deep: str | None = release_source
        if release_source and "#" in release_source:
            release_source = release_source.split("#", 1)[0]
            self.log.debug("Simplifying %s from %s", release_source, release_source_deep)

        source_platform = id_source_platform(release_source)
        if not source_platform:
            self.log.debug("No known source platform found on container", source=release_source)
            return results

        results["source_platform"] = source_platform

        template_vars: dict[str, str | None] = {
            "version": release_version or MISSING_VAL,
            "revision": release_revision or MISSING_VAL,
            "repo": release_source or MISSING_VAL,
            "source": release_source_deep or MISSING_VAL,
        }

        diff_url = DIFF_URL_TEMPLATES[source_platform].format(**template_vars)
        if MISSING_VAL not in diff_url and validate_url(diff_url):  # coverage: partial branch, condition never true
            results["diff_url"] = diff_url

        if release_url is None:
            release_url = RELEASE_URL_TEMPLATES[source_platform].format(**template_vars)

        if MISSING_VAL in release_url or not validate_url(release_url):  # coverage: partial branch, condition always true
            release_url = UNKNOWN_RELEASE_URL_TEMPLATES[source_platform].format(**template_vars)
            if MISSING_VAL in release_url or not validate_url(release_url):  # coverage: partial branch, condition always true
                release_url = None

        self.record(results, "release_url", release_url)

        if source_platform == SOURCE_PLATFORM_GITHUB and release_source:  # coverage: partial branch, condition always true
            base_api = release_source.replace("https://github.com", "https://api.github.com/repos")

            api_response: Response | None = fetch_url(f"{base_api}/releases/tags/{release_version}")
            if api_response and api_response.is_success:  # coverage: partial branch, condition never true
                api_results: Any = httpx_json_content(api_response, {})
                results["release_summary"] = api_results.get("body")  # ty:ignore[possibly-missing-attribute]
                reactions = api_results.get("reactions")  # ty:ignore[possibly-missing-attribute]
                if reactions:
                    results["net_score"] = reactions.get("+1", 0) - reactions.get("-1", 0)
            else:
                self.log.debug(
                    "Failed to fetch GitHub release info",
                    url=f"{base_api}/releases/tags/{release_version}",
                    status_code=(api_response and api_response.status_code) or None,
                )
        return results
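
# Hedged illustration (hypothetical values): given annotations such as
#   {"org.opencontainers.image.source": "https://github.com/example/app",
#    "org.opencontainers.image.version": "1.2.3",
#    "org.opencontainers.image.revision": "abc123"}
# enrich() records "source", "source_platform", "latest_image_version" and "latest_release_revision",
# then builds "{repo}/commit/{revision}" and "{repo}/releases/tag/{version}" (falling back to
# "{repo}/releases") as diff/release URLs, keeping only those that validate.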

class AuthError(Exception):
    pass

def httpx_json_content(response: Response, default: Any = None) -> Any | None:
    if response and "json" in response.headers.get("content-type", ""):  # coverage: partial branch, condition always true
        try:
            return response.json()
        except Exception:
            log.debug("Failed to parse JSON response: %s", response.text)
    return default

class LabelEnricher:
    def __init__(self) -> None:
        self.log: Any = structlog.get_logger().bind(integration="docker")

    def fetch_token(self, registry: str, image_name: str) -> str | None:
        logger = self.log.bind(image_name=image_name, action="auth_registry")

        default_host: tuple[str, str, str, str] = (registry, registry, registry, TOKEN_URL_TEMPLATE)
        auth_host: str | None = REGISTRIES.get(registry, default_host)[0]
        if auth_host is None:  # coverage: partial branch, condition never true
            return None

        service: str = REGISTRIES.get(registry, default_host)[2]
        url_template: str = REGISTRIES.get(registry, default_host)[3]
        auth_url: str = url_template.format(auth_host=auth_host, image_name=image_name, service=service)
        response: Response | None = fetch_url(auth_url, cache_ttl=30, follow_redirects=True)
        if response and response.is_success:  # coverage: partial branch, condition always true
            api_data = httpx_json_content(response, {})
            token: str | None = api_data.get("token") if api_data else None
            if token:  # coverage: partial branch, condition always true
                return token
            logger.warning("No token found in response for %s", auth_url)
            raise AuthError(f"No token found in response for {image_name}")

        logger.debug(
            "Non-success response at %s fetching token: %s",
            auth_url,
            (response and response.status_code) or None,
        )
        if response and response.status_code == 404:
            logger.debug("Default token URL %s not found, calling /v2 endpoint to validate OCI API and provoke auth", auth_url)
            response = fetch_url(f"https://{auth_host}/v2", follow_redirects=True)
            if response and response.status_code == 401:
                auth = response.headers.get("www-authenticate")
                if not auth:
                    logger.warning("No www-authenticate header found in 401 response for %s", auth_url)
                    raise AuthError(f"No www-authenticate header found on 401 for {image_name}")
                match = re.search(r'realm="([^"]+)",service="([^"]+)",scope="([^"]+)"', auth)
                if not match:
                    logger.warning("No realm/service/scope found in www-authenticate header for %s", auth_url)
                    raise AuthError(f"No realm/service/scope found on 401 headers for {image_name}")

                realm, service, scope = match.groups()
                auth_url = f"{realm}?service={service}&scope={scope}"
                response = fetch_url(auth_url, follow_redirects=True)
                if response and response.is_success:
                    token_data = response.json()
                    logger.debug("Fetched registry token from %s", auth_url)
                    return token_data.get("token")
                logger.warning(
                    "Alternative auth %s with status %s has no token", auth_url, (response and response.status_code) or None
                )
        elif response:
            logger.warning("Auth %s failed with status %s", auth_url, (response and response.status_code) or None)

        raise AuthError(f"Failed to fetch token for {image_name} at {auth_url}")
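
    # Hedged note: the fallback path above relies on the standard OCI/Docker registry auth challenge,
    # i.e. a 401 from /v2 carrying a header shaped like
    #   www-authenticate: Bearer realm="https://auth.example.io/token",service="registry.example.io",scope="repository:org/app:pull"
    # from which realm/service/scope are re-assembled into a token request; the example values are illustrative only.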

    def fetch_annotations(
        self,
        image_ref: str,
        os: str,
        arch: str,
        token: str | None = None,
        mutable_cache_ttl: int = 600,
        immutable_cache_ttl: int = 86400,
    ) -> dict[str, str]:
        logger = self.log.bind(image_ref=image_ref, action="enrich_registry")
        annotations: dict[str, str] = {}
        if token:  # coverage: partial branch, condition never true
            logger.debug("Using provided token to fetch manifest for image %s", image_ref)
        registry, ref = resolve_repository_name(image_ref)

        img_name = ref.split(":")[0] if ":" in ref else ref
        img_name = img_name if "/" in img_name else f"library/{img_name}"
        if token is None:  # coverage: partial branch, condition always true
            token = self.fetch_token(registry, img_name)

        img_tag = ref.split(":")[1] if ":" in ref else "latest"
        img_tag = img_tag.split("@")[0] if "@" in img_tag else img_tag
        api_host: str | None = REGISTRIES.get(registry, (registry, registry))[1]
        api_url: str = f"https://{api_host}/v2/{img_name}/manifests/{img_tag}"
        response: Response | None = fetch_url(
            api_url,
            cache_ttl=mutable_cache_ttl,
            bearer_token=token,
            response_type="application/vnd.oci.image.index.v1+json",
        )
        if response is None:  # coverage: partial branch, condition never true
            logger.warning("Empty response for manifest for image at %s", api_url)
            return annotations
        if not response.is_success:
            api_data = httpx_json_content(response, {})
            logger.warning(
                "Failed to fetch manifest from %s: %s",
                api_url,
                api_data.get("errors") if api_data else response.text,
            )
            return annotations
        index = response.json()
        logger.debug(
            "INDEX %s manifests, %s annotations",
            len(index.get("manifests", [])),
            len(index.get("annotations", [])),
        )
        annotations = index.get("annotations", {})
        for m in index.get("manifests", []):
            platform_info = m.get("platform", {})
            if platform_info.get("os") == os and platform_info.get("architecture") == arch:
                digest = m.get("digest")
                media_type = m.get("mediaType")
                response = fetch_url(
                    f"https://{api_host}/v2/{img_name}/manifests/{digest}",
                    cache_ttl=immutable_cache_ttl,
                    bearer_token=token,
                    response_type=media_type,
                )
                if response and response.is_success:  # coverage: partial branch, condition always true
                    api_data = httpx_json_content(response, None)
                    if api_data:  # coverage: partial branch, condition always true
                        logger.debug(
                            "MANIFEST %s layers, %s annotations",
                            len(api_data.get("layers", [])),
                            len(api_data.get("annotations", [])),
                        )
                        if api_data.get("annotations"):
                            annotations.update(api_data.get("annotations", {}))
                        else:
                            logger.debug("No annotations found in manifest: %s", api_data)

        if not annotations:
            logger.debug("No annotations found from registry data")
        return annotations
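
# End-to-end usage sketch (hypothetical image reference, hedged): fetch the OCI index plus the
# matching platform manifest's annotations, then derive release metadata from them.
#   enricher = LabelEnricher()
#   annotations = enricher.fetch_annotations("ghcr.io/example/app:1.2.3", os="linux", arch="amd64")
#   details = SourceReleaseEnricher().enrich(annotations)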