Coverage for src / updates2mqtt / integrations / docker_enrich.py: 76%
539 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 23:58 +0000
1import re
2import typing
3from abc import abstractmethod
4from typing import Any, cast
6import structlog
7from docker.auth import resolve_repository_name
8from docker.models.containers import Container
9from httpx import Response
10from omegaconf import MissingMandatoryValue, OmegaConf, ValidationError
12from updates2mqtt.helpers import APIStatsCounter, CacheMetadata, ThrottledError, Throttler, fetch_url, validate_url
13from updates2mqtt.model import DiscoveryArtefactDetail, DiscoveryInstallationDetail, ReleaseDetail
15if typing.TYPE_CHECKING:
16 from docker.models.images import RegistryData
17 from omegaconf.dictconfig import DictConfig
18from http import HTTPStatus
20import docker
21import docker.errors
23from updates2mqtt.config import (
24 PKG_INFO_FILE,
25 CommonPackages,
26 DockerConfig,
27 DockerPackageUpdateInfo,
28 GitHubConfig,
29 PackageUpdateInfo,
30 RegistryConfig,
31 VersionPolicy,
32)
# Module-level logger; bound loggers with integration context are created per class.
log: Any = structlog.get_logger()

# Canonical display names for known source-hosting platforms.
SOURCE_PLATFORM_GITHUB = "GitHub"
SOURCE_PLATFORM_CODEBERG = "CodeBerg"
SOURCE_PLATFORM_GITLAB = "GitLab"
# Maps platform name -> regex matched (via re.match) against a repo URL to identify the platform.
SOURCE_PLATFORMS = {
    SOURCE_PLATFORM_GITHUB: r"https://github.com/.*",
    SOURCE_PLATFORM_GITLAB: r"https://gitlab.com/.*",
    SOURCE_PLATFORM_CODEBERG: r"https://codeberg.org/.*",
}
# URL templates for a commit diff page, keyed by platform; {repo}/{revision} are filled in later.
DIFF_URL_TEMPLATES = {
    SOURCE_PLATFORM_GITHUB: "{repo}/commit/{revision}",
}
# URL templates for a release page for a known version tag.
RELEASE_URL_TEMPLATES = {
    SOURCE_PLATFORM_GITHUB: "{repo}/releases/tag/{version}",
}
# Fallback release-listing URLs when the exact version's page is unknown/unavailable.
UNKNOWN_RELEASE_URL_TEMPLATES = {
    SOURCE_PLATFORM_GITHUB: "{repo}/releases",
    SOURCE_PLATFORM_GITLAB: "{repo}/container_registry",
}
# Sentinel substituted for absent template variables; its presence in a built URL disables that URL.
MISSING_VAL = "**MISSING**"
UNKNOWN_REGISTRY = "**UNKNOWN_REGISTRY**"

# Response headers defined by the Docker/OCI distribution API.
HEADER_DOCKER_DIGEST = "docker-content-digest"
HEADER_DOCKER_API = "docker-distribution-api-version"

# Default anonymous pull-token endpoint shape used by most registries.
TOKEN_URL_TEMPLATE = "https://{auth_host}/token?scope=repository:{image_name}:pull&service={service}"  # noqa: S105 # nosec

REGISTRIES: dict[str, tuple[str | None, str, str, str | None, str | None]] = {
    # registry: (auth_host, api_host, service, url_template, repo_template)
    "docker.io": ("auth.docker.io", "registry-1.docker.io", "registry.docker.io", TOKEN_URL_TEMPLATE, None),
    "mcr.microsoft.com": (None, "mcr.microsoft.com", "mcr.microsoft.com", None, None),
    "quay.io": (None, "quay.io", "quay.io", TOKEN_URL_TEMPLATE, None),
    "ghcr.io": ("ghcr.io", "ghcr.io", "ghcr.io", TOKEN_URL_TEMPLATE, "https://github.com/{image_name}"),
    "lscr.io": ("ghcr.io", "lscr.io", "ghcr.io", TOKEN_URL_TEMPLATE, None),
    "codeberg.org": (
        "codeberg.org",
        "codeberg.org",
        "container_registry",
        TOKEN_URL_TEMPLATE,
        "https://codeberg.org/{image_name}",
    ),
    "registry.gitlab.com": (
        "www.gitlab.com",
        "registry.gitlab.com",
        "container_registry",
        "https://{auth_host}/jwt/auth?service={service}&scope=repository:{image_name}:pull&offline_token=true&client_id=docker",
        "https://gitlab.com/{image_name}",
    ),
}

# source: https://specs.opencontainers.org/distribution-spec/?v=v1.0.0#pull
# NOTE: used with re.match, so only the *start* of the string is validated.
OCI_NAME_RE = r"[a-z0-9]+((\.|_|__|-+)[a-z0-9]+)*(\/[a-z0-9]+((\.|_|__|-+)[a-z0-9]+)*)*"
OCI_TAG_RE = r"[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}"
class DockerImageInfo(DiscoveryArtefactDetail):
    """Normalize and shlep around the bits of an image def

    index_name: aka index_name, e.g. ghcr.io
    name: image ref without index name or tag, e.g. nginx, or librenms/librenms
    tag: tag or digest
    untagged_ref: combined index name and package name
    """

    def __init__(
        self,
        ref: str,  # ref with optional index name and tag or digest, index:name:tag_or_digest
        image_digest: str | None = None,
        tags: list[str] | None = None,
        attributes: dict[str, Any] | None = None,
        annotations: dict[str, Any] | None = None,
        platform: str | None = None,  # test harness simplification
        version: str | None = None,  # test harness simplification
        created: str | None = None,
    ) -> None:
        super().__init__()
        self.ref: str = ref
        self.version: str | None = version
        self.image_digest: str | None = image_digest
        self.short_digest: str | None = None
        self.repo_digest: str | None = None  # the single RepoDigest known to match registry
        self.git_digest: str | None = None
        self.index_name: str | None = None
        self.name: str | None = None
        self.tag: str | None = None
        self.pinned_digest: str | None = None
        # untagged ref using combined index and remote name used only for pattern matching common pkg info
        self.untagged_ref: str | None = None  # index_name/remote_name used for pkg match
        self.tag_or_digest: str | None = None  # index_name/remote_name:**tag_or_digest**
        self.tags = tags
        self.attributes: dict[str, Any] = attributes or {}
        self.annotations: dict[str, Any] = annotations or {}
        self.throttled: bool = False
        self.origin: str | None = None
        self.error: str | None = None
        self.platform: str | None = platform
        self.custom: dict[str, str | float | int | bool | None] = {}
        self.created: str | None = created

        # no RepoDigests at all implies the image was built locally, never pulled/pushed
        self.local_build: bool = not self.repo_digests
        self.index_name, remote_name = resolve_repository_name(ref)

        self.name = remote_name

        if remote_name and ":" in remote_name and ("@" not in remote_name or remote_name.index("@") > remote_name.index(":")):
            # name:tag format
            self.name, self.tag_or_digest = remote_name.split(":", 1)
            self.untagged_ref = ref.split(":", 1)[0]
            self.tag = self.tag_or_digest

        elif remote_name and "@" in remote_name:
            # name@digest format
            self.name, self.tag_or_digest = remote_name.split("@", 1)
            self.untagged_ref = ref.split("@", 1)[0]
            self.pinned_digest = self.tag_or_digest

        if self.tag and "@" in self.tag:
            # name:tag@digest format
            # for pinned tags, care only about the digest part
            self.tag, self.tag_or_digest = self.tag.split("@", 1)
            self.pinned_digest = self.tag_or_digest
        if self.tag_or_digest is None:
            # bare name: Docker semantics default the tag to "latest"
            self.tag_or_digest = "latest"
            self.untagged_ref = ref
            self.tag = self.tag_or_digest

        if self.repo_digest is None and len(self.repo_digests) == 1:
            # definite known RepoDigest
            # if its ambiguous, the final version selection will handle it
            self.repo_digest = self.repo_digests[0]

        if self.index_name == "docker.io" and "/" not in self.name:
            # "official Docker images have an abbreviated library/foo name"
            self.name = f"library/{self.name}"
        if self.name is not None and not re.match(OCI_NAME_RE, self.name):
            log.warning("Invalid OCI image name: %s", self.name)
        if self.tag and not re.match(OCI_TAG_RE, self.tag):
            log.warning("Invalid OCI image tag: %s", self.tag)

        if self.os and self.arch:
            # e.g. "linux/arm/v7"; filter(None, ...) drops a missing variant
            self.platform = "/".join(
                filter(
                    None,
                    [self.os, self.arch, self.variant],
                ),
            )

        if self.image_digest is not None:
            self.image_digest = self.condense_digest(self.image_digest, short=False)
            self.short_digest = self.condense_digest(self.image_digest)  # type: ignore[arg-type]

    @property
    def repo_digests(self) -> list[str]:
        """All known RepoDigests with any leading name@ prefix stripped."""
        if self.repo_digest:
            return [self.repo_digest]
        # RepoDigest in image inspect, Registry Config object
        digests = [v.split("@", 1)[1] if "@" in v else v for v in self.attributes.get("RepoDigests", [])]
        return digests or []

    @property
    def pinned(self) -> bool:
        """Check if this is pinned and installed version consistent with pin"""
        return bool(self.pinned_digest and self.pinned_digest in self.repo_digests)

    @property
    def os(self) -> str | None:
        return self.attributes.get("Os")

    @property
    def arch(self) -> str | None:
        return self.attributes.get("Architecture")

    @property
    def variant(self) -> str | None:
        return self.attributes.get("Variant")

    def condense_digest(self, digest: str, short: bool = True) -> str | None:
        """Strip name@/type prefixes from *digest*; truncate to 12 chars when *short*.

        Returns None (after logging) if the digest cannot be parsed.
        """
        try:
            digest = digest.split("@")[1] if "@" in digest else digest  # fully qualified RepoDigest
            if short:
                digest = digest.split(":")[1] if ":" in digest else digest  # remove digest type prefix
                return digest[0:12]
            return digest
        except Exception as e:
            log.warning("Unable to condense digest %s: %s", digest, e)
            return None

    def reuse(self) -> "DockerImageInfo":
        """Clone this info (marked origin="REUSED") for reuse in a later cycle.

        BUGFIX: arguments are now passed by keyword — previously version/created
        were passed positionally into the platform/version parameter slots, so
        version landed in platform, created landed in version, and created was lost.
        """
        cloned = DockerImageInfo(
            self.ref,
            image_digest=self.image_digest,
            tags=self.tags,
            attributes=self.attributes,
            annotations=self.annotations,
            platform=self.platform,
            version=self.version,
            created=self.created,
        )
        cloned.origin = "REUSED"
        return cloned

    def as_dict(self, minimal: bool = True) -> dict[str, str | list | dict | bool | int | None]:
        """Serializable summary; pass minimal=False to include raw attributes/annotations."""
        result: dict[str, str | list | dict | bool | int | None] = {
            "captured": self.captured.isoformat(),
            "image_ref": self.ref,
            "name": self.name,
            "version": self.version,
            "image_digest": self.image_digest,
            "repo_digest": self.repo_digest,
            # BUGFIX: previously emitted the singular repo_digest under this plural key
            "repo_digests": self.repo_digests,
            "git_digest": self.git_digest,
            "index_name": self.index_name,
            "tag": self.tag,
            "pinned_digest": self.pinned_digest,
            "tag_or_digest": self.tag_or_digest,
            "tags": self.tags,
            "origin": self.origin,
            "platform": self.platform,
            "local_build": self.local_build,
            "error": self.error,
            "throttled": self.throttled,
            "custom": self.custom,
        }
        if not minimal:
            result["attributes"] = self.attributes
            result["annotations"] = self.annotations
        return result
def id_source_platform(source: str | None) -> str | None:
    """Return the first known platform whose URL pattern matches *source*, else None."""
    target = source or ""
    for platform, pattern in SOURCE_PLATFORMS.items():
        if re.match(pattern, target):
            return platform
    return None
def _select_annotation(
    name: str, key: str, local_info: DockerImageInfo | None = None, registry_info: DockerImageInfo | None = None
) -> dict[str, str | None]:
    """Pick OCI annotation *key*, preferring registry metadata over local image metadata.

    Returns a dict of {name: value} when found, otherwise an empty dict.
    NOTE(review): when registry_info is present but lacks *key*, local_info is never
    consulted — confirm that registry presence is meant to fully shadow local.
    """
    picked: dict[str, str | None] = {}
    source = registry_info if registry_info else local_info
    if source is not None:
        value: Any | None = source.annotations.get(key)
        if value is not None:
            picked[name] = value
    return picked
def cherrypick_annotations(
    local_info: DockerImageInfo, registry_info: DockerImageInfo | None
) -> dict[str, str | float | int | bool | None]:
    """https://github.com/opencontainers/image-spec/blob/main/annotations.md"""
    # Friendly field name -> OCI annotation label; insertion order preserved.
    oci_labels: dict[str, str] = {
        "documentation_url": "org.opencontainers.image.documentation",
        "description": "org.opencontainers.image.description",
        "licences": "org.opencontainers.image.licenses",
        "image_base": "org.opencontainers.image.base.name",
        "image_created": "org.opencontainers.image.created",
        "image_version": "org.opencontainers.image.version",
        "image_revision": "org.opencontainers.image.revision",
        "ref_name": "org.opencontainers.image.ref.name",
        "title": "org.opencontainers.image.title",
        "vendor": "org.opencontainers.image.vendor",
        "source": "org.opencontainers.image.source",
    }
    picked: dict[str, str | float | int | bool | None] = {}
    for field_name, label in oci_labels.items():
        picked.update(_select_annotation(field_name, label, local_info, registry_info))

    # An "ubuntu" base image can leak its own YY.MM release into the app's version field.
    ubuntu_leak = (
        picked.get("ref_name") == "ubuntu"
        and local_info.name != "ubuntu"
        and bool(picked.get("image_version"))
        and re.fullmatch(r"^2\d\.\d\d$", cast("str", picked["image_version"])) is not None
    )
    if ubuntu_leak:
        log.debug(
            "Suppressing %s base %s version leaking into image version: %s",
            local_info.name,
            picked["ref_name"],
            picked["image_version"],
        )
        del picked["image_version"]
    return picked
class DockerServiceDetails(DiscoveryInstallationDetail):
    """How a container is installed: compose project coordinates plus optional git info."""

    def __init__(
        self,
        container_name: str | None = None,
        compose_path: str | None = None,
        compose_version: str | None = None,
        compose_service: str | None = None,
        git_repo_path: str | None = None,
    ) -> None:
        # NOTE: base-class __init__ is intentionally not invoked (matches original behavior).
        self.container_name: str | None = container_name
        self.compose_path: str | None = compose_path
        self.compose_version: str | None = compose_version
        self.compose_service: str | None = compose_service
        self.git_repo_path: str | None = git_repo_path
        # populated later by git discovery, never via the constructor
        self.git_local_timestamp: str | None = None

    def as_dict(self) -> dict[str, str | list | dict | bool | int | None]:
        """Serializable summary; git fields are included only when set."""
        summary: dict[str, str | list | dict | bool | int | None] = {
            "container_name": self.container_name,
            "compose_path": self.compose_path,
            "compose_service": self.compose_service,
            "compose_version": self.compose_version,
        }
        if self.git_local_timestamp:
            summary["git_local_timestamp"] = self.git_local_timestamp
        if self.git_repo_path:
            summary["git_repo_path"] = self.git_repo_path
        return summary
class LocalContainerInfo:
    """Build image/service detail objects from a locally running container."""

    def build_image_info(self, container: Container) -> tuple[DockerImageInfo, DockerServiceDetails]:
        """Image contents equiv to `docker inspect image <image_ref>`"""
        # container image can be none if someone ran `docker rmi -f`
        # so although this could be sourced from image, like `container.image.tags[0]`
        # use the container ref instead, which survives monkeying about with images
        ref: str = container.attrs.get("Config", {}).get("Image") or ""
        digest = container.attrs.get("Image")

        img = container.image
        image_info: DockerImageInfo = DockerImageInfo(
            ref,
            image_digest=digest,
            tags=img.tags if container and img else None,
            annotations=img.labels if img else None,
            attributes=img.attrs if img else None,
        )
        service_info: DockerServiceDetails = DockerServiceDetails(
            container.name,
            compose_path=container.labels.get("com.docker.compose.project.working_dir"),
            compose_service=container.labels.get("com.docker.compose.service"),
            compose_version=container.labels.get("com.docker.compose.version"),
        )

        # capture container labels/annotations, not image ones
        labels: dict[str, str | float | int | bool | None] = cherrypick_annotations(image_info, None) or {}
        if img and img.attrs:
            image_info.created = img.attrs.get("Created")
        image_info.custom = labels
        image_info.version = cast("str|None", labels.get("image_version"))

        return image_info, service_info
class PackageEnricher:
    """Base enricher: matches an image against a curated package-info catalogue."""

    def __init__(self, docker_cfg: DockerConfig, packages: dict[str, PackageUpdateInfo] | None = None) -> None:
        self.pkgs: dict[str, PackageUpdateInfo] = packages or {}
        self.cfg: DockerConfig = docker_cfg
        self.log: Any = structlog.get_logger().bind(integration="docker")

    def initialize(self) -> None:
        """Hook for subclasses to load their catalogue; base class has nothing to do."""

    def enrich(self, image_info: DockerImageInfo) -> PackageUpdateInfo | None:
        """Return the catalogue entry whose docker image name matches this image, if any."""
        if image_info.untagged_ref is None or image_info.ref is None:
            return None
        for candidate in self.pkgs.values():
            if candidate is None or candidate.docker is None or candidate.docker.image_name is None:
                continue
            catalogue_name = candidate.docker.image_name
            # match either the untagged ref or the full ref against the catalogue name
            if image_info.untagged_ref == catalogue_name or image_info.ref == catalogue_name:
                self.log.debug(
                    "Found common package",
                    image_name=candidate.docker.image_name,  # type: ignore [union-attr]
                    logo_url=candidate.logo_url,
                    relnotes_url=candidate.release_notes_url,
                )
                return candidate
        return None
class DefaultPackageEnricher(PackageEnricher):
    """Fallback enricher: synthesizes minimal package info for any image."""

    def enrich(self, image_info: DockerImageInfo) -> PackageUpdateInfo | None:
        self.log.debug("Default pkg info", image_name=image_info.untagged_ref, image_ref=image_info.ref)
        docker_info = DockerPackageUpdateInfo(
            image_info.untagged_ref or image_info.ref, version_policy=VersionPolicy.AUTO
        )
        return PackageUpdateInfo(
            docker_info,
            logo_url=self.cfg.default_entity_picture_url,
            release_notes_url=None,
        )
class CommonPackageEnricher(PackageEnricher):
    """Load curated package metadata from the bundled common-packages config file."""

    def initialize(self) -> None:
        """Merge PKG_INFO_FILE over the structured CommonPackages schema into self.pkgs.

        Raises:
            MissingMandatoryValue, ValidationError: when the merged config is invalid.
        """
        base_cfg: DictConfig = OmegaConf.structured(CommonPackages)
        if PKG_INFO_FILE.exists():
            self.log.debug("Loading common package update info", path=PKG_INFO_FILE)
            cfg: DictConfig = typing.cast("DictConfig", OmegaConf.merge(base_cfg, OmegaConf.load(PKG_INFO_FILE)))
            # eagerly validate: throw_on_missing surfaces MISSING fields now, not at first access
            OmegaConf.to_container(cfg, throw_on_missing=True)
            OmegaConf.set_readonly(cfg, True)
        else:
            # `warn` is a deprecated alias on stdlib-style loggers; use `warning`
            self.log.warning("No common package update info found", path=PKG_INFO_FILE)
            cfg = base_cfg
        try:
            common_config: CommonPackages = typing.cast("CommonPackages", cfg)
            # omegaconf broken-ness on optional fields and converting to dataclasses
            self.pkgs = common_config.common_packages
        except (MissingMandatoryValue, ValidationError) as e:
            # BUGFIX: was `self.log.serror(...)` — structlog loggers have no `serror`
            # method, so the AttributeError would have masked the real config error.
            self.log.error("Configuration error %s", e, path=PKG_INFO_FILE.as_posix())
            raise
class LinuxServerIOPackageEnricher(PackageEnricher):
    """Populate package metadata for linuxserver.io images from their public API."""

    def initialize(self) -> None:
        cfg = self.cfg.discover_metadata.get("linuxserver.io")
        if cfg is None or not cfg.enabled:
            return

        self.log.debug(f"Fetching linuxserver.io metadata from API, cache_ttl={cfg.cache_ttl}")
        response: Response | None = fetch_url(
            "https://api.linuxserver.io/api/v1/images?include_config=false&include_deprecated=false",
            cache_ttl=cfg.cache_ttl,
        )
        if not (response and response.is_success):
            return
        payload: Any = response.json()
        repos: list = payload.get("data", {}).get("repositories", {}).get("linuxserver", [])

        added = 0
        for repo in repos:
            image_name = repo.get("name")
            # keep any pre-existing entry; only add images we have not seen
            if not image_name or image_name in self.pkgs:
                continue
            self.pkgs[image_name] = PackageUpdateInfo(
                DockerPackageUpdateInfo(f"lscr.io/linuxserver/{image_name}"),
                logo_url=repo["project_logo"],
                release_notes_url=f"{repo['github_url']}/releases",
            )
            added += 1
        self.log.debug(f"Added {added} linuxserver.io package details")
class SourceReleaseEnricher:
    """Derive release details (source repo, notes, diff, summary) from image annotations."""

    def __init__(self, gh_cfg: GitHubConfig | None = None) -> None:
        self.log: Any = structlog.get_logger().bind(integration="docker")
        # optional GitHub config; only access_token is read, for API auth
        self.gh_cfg: GitHubConfig | None = gh_cfg

    def enrich(
        self, registry_info: DockerImageInfo, source_repo_url: str | None = None, notes_url: str | None = None
    ) -> ReleaseDetail | None:
        """Build a ReleaseDetail from OCI annotations plus platform URL templates.

        Returns None when no source/notes/version/revision information exists at all;
        otherwise returns a (possibly partially filled) ReleaseDetail.
        """
        detail = ReleaseDetail()

        detail.notes_url = notes_url
        detail.version = registry_info.annotations.get("org.opencontainers.image.version")
        detail.revision = registry_info.annotations.get("org.opencontainers.image.revision")
        # explicit source_repo_url overrides container, e.g. where container source is only the docker wrapper
        detail.source_url = source_repo_url or registry_info.annotations.get("org.opencontainers.image.source")

        # no annotation-based source: infer a repo URL from the registry's repo_template, if any
        if detail.source_url is None and registry_info is not None and registry_info.index_name is not None:
            registry_config: tuple[str | None, str, str, str | None, str | None] | None = REGISTRIES.get(
                registry_info.index_name
            )
            repo_template: str | None = registry_config[4] if registry_config else None
            if repo_template:
                source_url = repo_template.format(image_name=registry_info.name)
                if validate_url(source_url, cache_ttl=86400):
                    detail.source_url = source_url
                    self.log.info("Implied source from registry: %s", detail.source_url)

        if detail.source_url is None and detail.notes_url is None and detail.revision is None and detail.version is None:
            return None

        # strip any URL fragment (e.g. readme anchors) to get the bare repo URL
        if detail.source_url and "#" in detail.source_url:
            detail.source_repo_url = detail.source_url.split("#", 1)[0]
            self.log.debug("Simplifying %s from %s", detail.source_repo_url, detail.source_url)
        else:
            detail.source_repo_url = detail.source_url

        detail.source_platform = id_source_platform(detail.source_repo_url)
        if not detail.source_platform:
            self.log.debug("No known source platform found on container", source=detail.source_repo_url)
            return detail

        # MISSING_VAL marks absent vars; any built URL containing it is discarded below
        template_vars: dict[str, str | None] = {
            "version": detail.version or MISSING_VAL,
            "revision": detail.revision or MISSING_VAL,
            "repo": detail.source_repo_url or MISSING_VAL,
            "source": detail.source_url or MISSING_VAL,
        }

        diff_url_template: str | None = DIFF_URL_TEMPLATES.get(detail.source_platform)
        diff_url: str | None = diff_url_template.format(**template_vars) if diff_url_template else None
        if diff_url and MISSING_VAL not in diff_url and validate_url(diff_url, cache_ttl=3600):
            detail.diff_url = diff_url
        else:
            diff_url = None

        # prefer the exact release page for the known version...
        if detail.notes_url is None and detail.source_platform in RELEASE_URL_TEMPLATES:
            platform_notes_url: str | None = RELEASE_URL_TEMPLATES[detail.source_platform].format(**template_vars)
            if (
                platform_notes_url
                and MISSING_VAL not in platform_notes_url
                and validate_url(platform_notes_url, cache_ttl=86400)
            ):
                self.log.debug("Setting default known release notes url: %s", platform_notes_url)
                detail.notes_url = platform_notes_url

        # ...falling back to the platform's generic release-listing page
        if detail.notes_url is None and detail.source_platform in UNKNOWN_RELEASE_URL_TEMPLATES:
            platform_notes_url = UNKNOWN_RELEASE_URL_TEMPLATES[detail.source_platform].format(**template_vars)
            if (
                platform_notes_url
                and MISSING_VAL not in platform_notes_url
                and validate_url(platform_notes_url, cache_ttl=86400)
            ):
                self.log.debug("Setting default unknown release notes url: %s", platform_notes_url)
                detail.notes_url = platform_notes_url

        # GitHub only: fetch the release body/reactions for a richer summary
        if detail.source_platform == SOURCE_PLATFORM_GITHUB and detail.source_repo_url and detail.version is not None:
            access_token: str | None = self.gh_cfg.access_token if self.gh_cfg else None
            if access_token:
                self.log.debug("Using configured bearer token (%s chars) for GitHub API", len(access_token))
            base_api = detail.source_repo_url.replace("https://github.com", "https://api.github.com/repos")

            api_response: Response | None = fetch_url(
                f"{base_api}/releases/tags/{detail.version}", bearer_token=access_token, allow_stale=True
            )
            if api_response and api_response.status_code == 404:
                # possible that source version doesn't match release tag
                # so check whether the *latest* release's tag matches, modulo a v/r prefix
                alt_api_response: Response | None = fetch_url(f"{base_api}/releases/latest", bearer_token=access_token)
                if alt_api_response and alt_api_response.is_success:
                    alt_api_results = httpx_json_content(alt_api_response, {})
                    # NOTE(review): tag_name may be absent (None) -> re.fullmatch would raise
                    # TypeError; and detail.version is interpolated unescaped into the regex.
                    if alt_api_results and re.fullmatch(f"(V|v|r|R)?{detail.version}", alt_api_results.get("tag_name")):
                        self.log.info(
                            f"Matched {registry_info.name} {detail.version} to latest release {alt_api_results['tag_name']}"
                        )
                        api_response = alt_api_response
                    elif alt_api_results:
                        self.log.debug(
                            "Failed to match %s release %s on GitHub, found tag %s for name %s published at %s",
                            registry_info.name,
                            detail.version,
                            alt_api_results.get("tag_name"),
                            alt_api_results.get("name"),
                            alt_api_results.get("published_at"),
                        )

            if api_response and api_response.is_success:
                api_results: Any = httpx_json_content(api_response, {})
                detail.summary = api_results.get("body")  # ty:ignore[possibly-missing-attribute]
                reactions = api_results.get("reactions")  # ty:ignore[possibly-missing-attribute]
                if reactions:
                    # net community sentiment: thumbs-up minus thumbs-down
                    detail.net_score = reactions.get("+1", 0) - reactions.get("-1", 0)
            elif api_response:
                api_results = httpx_json_content(api_response, default={})
                self.log.debug(
                    "Failed to find %s release %s on GitHub, status %s, errors; %s",
                    registry_info.name,
                    detail.version,
                    api_response.status_code,
                    api_results.get("errors"),
                )
            else:
                # NOTE(review): api_response is falsy here, so status_code always logs as None
                self.log.debug(
                    "Failed to fetch GitHub release info",
                    url=f"{base_api}/releases/tags/{detail.version}",
                    status_code=(api_response and api_response.status_code) or None,
                )
        # last resort: link the commit diff as the summary
        if not detail.summary and detail.diff_url:
            detail.summary = f"<a href='{detail.diff_url}'>{detail.version or detail.revision} Diff</a>"
        return detail
class AuthError(Exception):
    """Raised when a registry auth token cannot be obtained for an image."""
def httpx_json_content(response: Response, default: Any = None) -> Any | None:
    """Decode a JSON body from *response*, returning *default* when absent or unparseable."""
    if not response:
        return default
    content_type = response.headers.get("content-type", "")
    if "json" in content_type:
        try:
            return response.json()
        except Exception:
            log.debug("Failed to parse JSON response: %s", response.text)
    elif content_type == "application/octet-stream":
        # blob could return a gzip layer tarball, however assumed only index, manifest or config requested
        try:
            return response.json()
        except Exception:
            log.debug("Failed to parse assumed JSON response: %s", response.text)
    return default
class VersionLookup:
    """Strategy base for resolving registry-side version info for a local image.

    NOTE(review): @abstractmethod is used without an ABC metaclass, so
    instantiating an incomplete subclass is not actually prevented.
    """

    def __init__(self) -> None:
        self.log: Any = structlog.get_logger().bind(integration="docker", tool="version_lookup")

    @abstractmethod
    def lookup(self, local_image_info: DockerImageInfo, **kwargs) -> DockerImageInfo:  # noqa: ANN003
        """Return registry image info for *local_image_info*; implemented by subclasses."""
629class ContainerDistributionAPIVersionLookup(VersionLookup):
630 def __init__(self, throttler: Throttler, cfg: RegistryConfig) -> None:
631 self.throttler: Throttler = throttler
632 self.cfg: RegistryConfig = cfg
633 self.log: Any = structlog.get_logger().bind(integration="docker", tool="version_lookup")
634 self.api_stats = APIStatsCounter()
    def fetch_token(self, registry: str, image_name: str) -> str | None:
        """Obtain an anonymous pull token for *image_name* at *registry*.

        Tries the registry's known token endpoint first; on 404 probes /v2 to provoke
        a 401 www-authenticate challenge and follows it. Returns None when the
        registry needs no token; raises AuthError when a token cannot be obtained.
        """
        # unknown registries fall back to using the registry host itself for auth/service
        default_host: tuple[str, str, str, str, None] = (registry, registry, registry, TOKEN_URL_TEMPLATE, None)
        auth_host: str | None = REGISTRIES.get(registry, default_host)[0]
        if auth_host is None:
            # registry (e.g. mcr.microsoft.com) serves manifests without auth
            return None

        service: str = REGISTRIES.get(registry, default_host)[2]
        url_template: str | None = REGISTRIES.get(registry, default_host)[3]
        auth_url: str | None = (
            url_template.format(auth_host=auth_host, image_name=image_name, service=service) if url_template else None
        )
        if auth_url is None:
            return None
        response: Response | None = fetch_url(
            auth_url, cache_ttl=self.cfg.token_cache_ttl, follow_redirects=True, api_stats_counter=self.api_stats
        )

        if response and response.is_success:
            api_data = httpx_json_content(response, {})
            token: str | None = api_data.get("token") if api_data else None
            if token:
                return token
            # 2xx but no token in body: treat as fatal rather than proceeding unauthenticated
            self.log.warning("No token found in response for %s", auth_url)
            raise AuthError(f"No token found in response for {image_name}")

        self.log.debug(
            "Non-success response at %s fetching token: %s",
            auth_url,
            (response and response.status_code) or None,
        )
        if response and response.status_code == 404:
            # token endpoint absent: hit /v2 uncached to trigger the registry's own auth challenge
            self.log.debug(
                "Default token URL %s not found, calling /v2 endpoint to validate OCI API and provoke auth", auth_url
            )
            response = fetch_url(
                f"https://{auth_host}/v2",
                follow_redirects=True,
                allow_stale=False,
                cache_ttl=0,
                api_stats_counter=self.api_stats,
            )

        if response and response.status_code == 401:
            auth = response.headers.get("www-authenticate")
            if not auth:
                self.log.warning("No www-authenticate header found in 401 response for %s", auth_url)
                raise AuthError(f"No www-authenticate header found on 401 for {image_name}")
            # NOTE(review): assumes the challenge lists realm,service,scope in exactly
            # this order with no spaces — confirm against registries in the wild
            match = re.search(r'realm="([^"]+)",service="([^"]+)",scope="([^"]+)"', auth)
            if not match:
                self.log.warning("No realm/service/scope found in www-authenticate header for %s", auth_url)
                raise AuthError(f"No realm/service/scope found on 401 headers for {image_name}")

            # retry token fetch at the realm advertised by the challenge
            realm, service, scope = match.groups()
            auth_url = f"{realm}?service={service}&scope={scope}"
            response = fetch_url(auth_url, follow_redirects=True, api_stats_counter=self.api_stats)

            if response and response.is_success:
                token_data = response.json()
                self.log.debug("Fetched registry token from %s", auth_url)
                return token_data.get("token")
            self.log.warning(
                "Alternative auth %s with status %s has no token", auth_url, (response and response.status_code) or None
            )
        elif response:
            self.log.warning("Auth %s failed with status %s", auth_url, (response and response.status_code) or None)

        raise AuthError(f"Failed to fetch token for {image_name} at {auth_url}")
    def fetch_index(
        self, api_host: str, local_image_info: DockerImageInfo, token: str | None
    ) -> tuple[Any | None, str | None, CacheMetadata | None]:
        """Fetch the OCI image index (manifest list) for the image's tag or pinned digest.

        Returns (index, docker-content-digest header, cache metadata), or
        (None, None, None) on any failure. Raises via the throttler on HTTP 429.
        """
        if local_image_info.tag:
            # tags are mutable, so cache them for the shorter mutable TTL
            api_url: str = f"https://{api_host}/v2/{local_image_info.name}/manifests/{local_image_info.tag}"
            cache_ttl: int | None = self.cfg.mutable_cache_ttl
        else:
            # digests are content-addressed and immutable, so cache longer
            api_url = f"https://{api_host}/v2/{local_image_info.name}/manifests/{local_image_info.pinned_digest}"
            cache_ttl = self.cfg.immutable_cache_ttl

        response: Response | None = fetch_url(
            api_url,
            cache_ttl=cache_ttl,
            bearer_token=token,
            # accept either the OCI index or the legacy Docker manifest-list media type
            response_type=[
                "application/vnd.oci.image.index.v1+json",
                "application/vnd.docker.distribution.manifest.list.v2+json",
            ],
            api_stats_counter=self.api_stats,
        )

        if response is None:
            self.log.warning("Empty response for manifest for image at %s", api_url)
        elif response.status_code == 429:
            # rate limited: record against this registry and abort via ThrottledError
            self.throttler.throttle(local_image_info.index_name, raise_exception=True)
        elif not response.is_success:
            api_data = httpx_json_content(response, {})
            self.log.warning(
                "Failed to fetch index from %s: %s",
                api_url,
                api_data.get("errors") if api_data else response.text,
            )
        else:
            index = response.json()
            self.log.debug(
                "INDEX %s manifests, %s annotations, api: %s, header digest: %s",
                len(index.get("manifests", [])),
                len(index.get("annotations", [])),
                response.headers.get(HEADER_DOCKER_API, "N/A"),
                response.headers.get(HEADER_DOCKER_DIGEST, "N/A"),
            )
            return index, response.headers.get(HEADER_DOCKER_DIGEST), CacheMetadata(response)
        return None, None, None
748 def fetch_object(
749 self,
750 api_host: str,
751 local_image_info: DockerImageInfo,
752 media_type: str,
753 digest: str,
754 token: str | None,
755 follow_redirects: bool = False,
756 api_type: str = "manifests",
757 ) -> tuple[Any | None, CacheMetadata | None]:
758 api_url = f"https://{api_host}/v2/{local_image_info.name}/{api_type}/{digest}"
759 response = fetch_url(
760 api_url,
761 cache_ttl=self.cfg.immutable_cache_ttl,
762 bearer_token=token,
763 response_type=media_type,
764 allow_stale=True,
765 follow_redirects=follow_redirects,
766 api_stats_counter=self.api_stats,
767 )
769 if response and response.is_success:
770 obj = httpx_json_content(response, None)
771 if obj: 771 ↛ 798line 771 didn't jump to line 798 because the condition on line 771 was always true
772 self.log.debug(
773 "%s, header digest:%s, api: %s, %s annotations",
774 api_type.upper(),
775 response.headers.get(HEADER_DOCKER_DIGEST, "N/A"),
776 response.headers.get(HEADER_DOCKER_API, "N/A"),
777 len(obj.get("annotations", [])),
778 )
779 return obj, CacheMetadata(response)
780 elif response and response.status_code == 429: 780 ↛ 781line 780 didn't jump to line 781 because the condition on line 780 was never true
781 self.throttler.throttle(local_image_info.index_name, raise_exception=True)
782 elif response and not response.is_success: 782 ↛ 797line 782 didn't jump to line 797 because the condition on line 782 was always true
783 api_data = httpx_json_content(response, {})
784 if response: 784 ↛ 792line 784 didn't jump to line 792 because the condition on line 784 was always true
785 self.log.warning(
786 "Failed to fetch obj from %s: %s %s",
787 api_url,
788 response.status_code,
789 api_data.get("errors") if api_data else response.text,
790 )
791 else:
792 self.log.warning(
793 "Failed to fetch obj from %s: No Response, %s", api_url, api_data.get("errors") if api_data else None
794 )
796 else:
797 self.log.error("Empty response from %s", api_url)
798 return None, None
800 def lookup(
801 self,
802 local_image_info: DockerImageInfo,
803 token: str | None = None,
804 minimal: bool = False,
805 **kwargs, # noqa: ANN003, ARG002
806 ) -> DockerImageInfo:
807 result: DockerImageInfo = DockerImageInfo(local_image_info.ref)
808 if not local_image_info.name or not local_image_info.index_name: 808 ↛ 809line 808 didn't jump to line 809 because the condition on line 808 was never true
809 self.log.debug("No local pkg name or registry index name to check")
810 return result
812 if self.throttler.check_throttle(local_image_info.index_name): 812 ↛ 813line 812 didn't jump to line 813 because the condition on line 812 was never true
813 result.throttled = True
814 return result
816 if token: 816 ↛ 817line 816 didn't jump to line 817 because the condition on line 816 was never true
817 self.log.debug("Using provided token to fetch manifest for image %s", local_image_info.ref)
818 else:
819 try:
820 token = self.fetch_token(local_image_info.index_name, local_image_info.name)
821 except AuthError as e:
822 self.log.warning("Authentication error prevented Docker Registry enrichment: %s", e)
823 result.error = str(e)
824 return result
826 index: Any | None = None
827 index_digest: str | None = None # fetched from header, should be the image digest
828 index_cache_metadata: CacheMetadata | None = None
829 manifest_cache_metadata: CacheMetadata | None = None
830 config_cache_metadata: CacheMetadata | None = None
831 api_host: str | None = REGISTRIES.get(
832 local_image_info.index_name, (local_image_info.index_name, local_image_info.index_name)
833 )[1]
834 if api_host is None: 834 ↛ 835line 834 didn't jump to line 835 because the condition on line 834 was never true
835 self.log("No API host can be determined for %s", local_image_info.index_name)
836 return result
837 try:
838 index, index_digest, index_cache_metadata = self.fetch_index(api_host, local_image_info, token)
839 except ThrottledError:
840 result.throttled = True
841 index = None
843 if index:
844 result.annotations = index.get("annotations", {})
845 for m in index.get("manifests", []):
846 platform_info = m.get("platform", {})
847 if (
848 platform_info.get("os") == local_image_info.os
849 and platform_info.get("architecture") == local_image_info.arch
850 and ("Variant" not in platform_info or platform_info.get("Variant") == local_image_info.variant)
851 ):
852 if index_digest:
853 result.image_digest = index_digest
854 result.short_digest = result.condense_digest(index_digest)
855 self.log.debug("Setting %s image digest %s", result.name, result.short_digest)
857 digest: str | None = m.get("digest")
858 media_type = m.get("mediaType")
859 manifest: Any | None = None
861 if digest: 861 ↛ 869line 861 didn't jump to line 869 because the condition on line 861 was always true
862 try:
863 manifest, manifest_cache_metadata = self.fetch_object(
864 api_host, local_image_info, media_type, digest, token
865 )
866 except ThrottledError:
867 result.throttled = True
869 if manifest:
870 digest = manifest.get("config", {}).get("digest")
871 if digest is None: 871 ↛ 872line 871 didn't jump to line 872 because the condition on line 871 was never true
872 self.log.warning("Empty digest for %s %s %s", api_host, digest, media_type)
873 else:
874 result.repo_digest = result.condense_digest(digest, short=False)
875 self.log.debug("Setting %s repo digest: %s", result.name, result.repo_digest)
877 if manifest.get("annotations"):
878 result.annotations.update(manifest.get("annotations", {}))
879 else:
880 self.log.debug("No annotations found in manifest: %s", manifest)
882 if not minimal and manifest.get("config"): 882 ↛ 845line 882 didn't jump to line 845 because the condition on line 882 was always true
883 try:
884 img_config, config_cache_metadata = self.fetch_object(
885 api_host=api_host,
886 local_image_info=local_image_info,
887 media_type=manifest["config"].get("mediaType"),
888 digest=manifest["config"].get("digest"),
889 token=token,
890 follow_redirects=True,
891 api_type="blobs",
892 )
893 if img_config:
894 config = img_config.get("config") or img_config.get("Config")
895 if config and "Labels" in config: 895 ↛ 897line 895 didn't jump to line 897 because the condition on line 895 was always true
896 result.annotations.update(config.get("Labels") or {})
897 result.annotations.update(img_config.get("annotations") or {})
898 result.created = config.get("created") or config.get("Created")
899 else:
900 self.log.debug("No config found: %s", manifest)
901 except Exception as e:
902 self.log.warning("Failed to extract %s image info from config: %s", local_image_info.ref, e)
904 if not result.annotations:
905 self.log.debug("No annotations found from registry data")
907 labels: dict[str, str | float | int | bool | None] = cherrypick_annotations(local_image_info, result)
908 result.custom = labels or {}
909 if index_cache_metadata:
910 result.custom["index_cache_age"] = index_cache_metadata.age
911 if manifest_cache_metadata:
912 result.custom["manifest_cache_age"] = manifest_cache_metadata.age
913 if config_cache_metadata:
914 result.custom["config_cache_age"] = config_cache_metadata.age
915 result.version = cast("str|None", labels.get("image_version"))
916 result.origin = "OCI_V2" if not minimal else "OCI_V2_MINIMAL"
918 self.log.debug(
919 "OCI_V2 Lookup for %s: short_digest:%s, repo_digest:%s, version: %s",
920 local_image_info.name,
921 result.short_digest,
922 result.repo_digest,
923 result.version,
924 )
925 return result
928class DockerClientVersionLookup(VersionLookup):
929 """Query remote registry via local Docker API
931 No auth needed, however uses the old v1 APIs, and only Index available via API
932 """
934 def __init__(self, client: docker.DockerClient, throttler: Throttler, cfg: RegistryConfig, api_backoff: int = 30) -> None:
935 self.client: docker.DockerClient = client
936 self.throttler: Throttler = throttler
937 self.cfg: RegistryConfig = cfg
938 self.api_backoff: int = api_backoff
939 self.log: Any = structlog.get_logger().bind(integration="docker", tool="version_lookup")
941 def lookup(self, local_image_info: DockerImageInfo, retries: int = 3, **kwargs) -> DockerImageInfo: # noqa: ANN003, ARG002
942 retries_left = retries
943 retry_secs: int = self.api_backoff
944 reg_data: RegistryData | None = None
946 result = DockerImageInfo(local_image_info.ref)
947 if local_image_info.index_name is None or local_image_info.ref is None: 947 ↛ 948line 947 didn't jump to line 948 because the condition on line 947 was never true
948 return result
950 while reg_data is None and retries_left > 0:
951 if self.throttler.check_throttle(local_image_info.index_name):
952 result.throttled = True
953 break
954 try:
955 self.log.debug("Fetching registry data", image_ref=local_image_info.ref)
956 reg_data = self.client.images.get_registry_data(local_image_info.ref)
957 self.log.debug(
958 "Registry Data: id:%s,image:%s, attrs:%s",
959 reg_data.id,
960 reg_data.image_name,
961 reg_data.attrs,
962 )
963 if reg_data: 963 ↛ 950line 963 didn't jump to line 950 because the condition on line 963 was always true
964 result.short_digest = result.condense_digest(reg_data.short_id)
965 result.image_digest = result.condense_digest(reg_data.id, short=False)
966 # result.name = reg_data.image_name
967 result.attributes = reg_data.attrs
968 result.annotations = reg_data.attrs.get("Config", {}).get("Labels") or {}
969 result.error = None
971 except docker.errors.APIError as e:
972 if e.status_code == HTTPStatus.TOO_MANY_REQUESTS: 972 ↛ 981line 972 didn't jump to line 981 because the condition on line 972 was always true
973 retry_secs = round(retry_secs**1.5)
974 try:
975 retry_secs = int(e.response.headers.get("Retry-After", -1)) # type: ignore[union-attr]
976 except Exception as e2:
977 self.log.debug("Failed to access headers for retry info: %s", e2)
978 self.throttler.throttle(local_image_info.index_name, retry_secs, e.explanation)
979 result.throttled = True
980 return result
981 result.error = str(e)
982 retries_left -= 1
983 if retries_left == 0 or e.is_client_error():
984 self.log.warn("Failed to fetch registry data: [%s] %s", e.errno, e.explanation)
985 else:
986 self.log.debug("Failed to fetch registry data, retrying: %s", e)
988 labels: dict[str, str | float | int | bool | None] = cherrypick_annotations(local_image_info, result)
989 result.custom = labels or {}
990 result.version = cast("str|None", labels.get("image_version"))
991 result.created = cast("str|None", labels.get("image_created"))
992 result.origin = "DOCKER_CLIENT"
993 return result