Coverage for src / updates2mqtt / integrations / docker_enrich.py: 78%
505 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:11 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:11 +0000
1import re
2import typing
3from abc import abstractmethod
4from typing import Any, cast
6import structlog
7from docker.auth import resolve_repository_name
8from docker.models.containers import Container
9from omegaconf import MissingMandatoryValue, OmegaConf, ValidationError
11from updates2mqtt.helpers import (
12 APIStatsCounter,
13 CacheMetadata,
14 ThrottledError,
15 Throttler,
16 fetch_url,
17 httpx_json_content,
18 validate_url,
19)
20from updates2mqtt.model import DiscoveryArtefactDetail, DiscoveryInstallationDetail, ReleaseDetail
22if typing.TYPE_CHECKING:
23 from docker.models.images import RegistryData
24 from httpx import Response
25 from omegaconf.dictconfig import DictConfig
26from http import HTTPStatus
28import docker
29import docker.errors
31from updates2mqtt.config import (
32 PKG_INFO_FILE,
33 SOURCE_PLATFORM_GITHUB,
34 SOURCE_PLATFORM_GITLAB,
35 SOURCE_PLATFORMS,
36 CommonPackages,
37 DockerConfig,
38 DockerPackageUpdateInfo,
39 PackageUpdateInfo,
40 RegistryConfig,
41 VersionPolicy,
42)
log: Any = structlog.get_logger()

# URL templates keyed by source platform. They are filled with str.format using
# the keys `repo`, `revision`, `version` and `source` (see SourceReleaseEnricher).
DIFF_URL_TEMPLATES = {
    SOURCE_PLATFORM_GITHUB: "{repo}/commit/{revision}",
}
RELEASE_URL_TEMPLATES = {
    SOURCE_PLATFORM_GITHUB: "{repo}/releases/tag/{version}",
}
# Fallback release-notes locations used when no version-specific URL can be built.
UNKNOWN_RELEASE_URL_TEMPLATES = {
    SOURCE_PLATFORM_GITHUB: "{repo}/releases",
    SOURCE_PLATFORM_GITLAB: "{repo}/container_registry",
}
# Placeholder substituted for absent template values; a formatted URL that still
# contains it is treated as unusable.
MISSING_VAL = "**MISSING**"
UNKNOWN_REGISTRY = "**UNKNOWN_REGISTRY**"
UNKNOWN_NAME = "**UNKNOWN_NAME**"

HEADER_DOCKER_DIGEST = "docker-content-digest"
HEADER_DOCKER_API = "docker-distribution-api-version"

TOKEN_URL_TEMPLATE = "https://{auth_host}/token?scope=repository:{image_name}:pull&service={service}"  # noqa: S105 # nosec

REGISTRY_GHCR = "ghcr.io"
REGISTRY_DOCKER = "docker.io"
REGISTRY_MCR = "mcr.microsoft.com"
REGISTRY_QUAY = "quay.io"
REGISTRY_LSCR = "lscr.io"
REGISTRY_CODEBERG = "codeberg.org"
REGISTRY_GITLAB = "registry.gitlab.com"

REGISTRIES: dict[str, tuple[str | None, str, str, str | None, str | None]] = {
    # registry: (auth_host, api_host, service, url_template, repo_template)
    REGISTRY_DOCKER: ("auth.docker.io", "registry-1.docker.io", "registry.docker.io", TOKEN_URL_TEMPLATE, None),
    REGISTRY_MCR: (None, "mcr.microsoft.com", "mcr.microsoft.com", None, None),
    REGISTRY_QUAY: (None, "quay.io", "quay.io", TOKEN_URL_TEMPLATE, None),
    REGISTRY_GHCR: ("ghcr.io", "ghcr.io", "ghcr.io", TOKEN_URL_TEMPLATE, "https://github.com/{image_name}"),
    # keyed by the named constant like every other entry (was a bare "lscr.io" literal)
    REGISTRY_LSCR: ("ghcr.io", "lscr.io", "ghcr.io", TOKEN_URL_TEMPLATE, None),
    REGISTRY_CODEBERG: (
        "codeberg.org",
        "codeberg.org",
        "container_registry",
        TOKEN_URL_TEMPLATE,
        "https://codeberg.org/{image_name}",
    ),
    REGISTRY_GITLAB: (
        "www.gitlab.com",
        "registry.gitlab.com",
        "container_registry",
        "https://{auth_host}/jwt/auth?service={service}&scope=repository:{image_name}:pull&offline_token=true&client_id=docker",
        "https://gitlab.com/{image_name}",
    ),
}

# source: https://specs.opencontainers.org/distribution-spec/?v=v1.0.0#pull
OCI_NAME_RE = r"[a-z0-9]+((\.|_|__|-+)[a-z0-9]+)*(\/[a-z0-9]+((\.|_|__|-+)[a-z0-9]+)*)*"
OCI_TAG_RE = r"[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}"
class DockerImageInfo(DiscoveryArtefactDetail):
    """Normalize and carry around the parts of an image reference.

    index_name: registry index name, e.g. ghcr.io
    name: image ref without index name or tag, e.g. nginx, or librenms/librenms
    tag: the tag; None when the ref is pinned by digest alone
    pinned_digest: digest taken from a name@digest or name:tag@digest ref
    tag_or_digest: the digest when one is pinned, otherwise the tag ("latest" if the ref has neither)
    untagged_ref: ref with the tag/digest suffix removed, used for matching common package info
    """

    def __init__(
        self,
        ref: str,  # ref with optional index name and tag or digest, index:name:tag_or_digest
        image_digest: str | None = None,
        tags: list[str] | None = None,
        attributes: dict[str, Any] | None = None,
        annotations: dict[str, Any] | None = None,
        platform: str | None = None,  # test harness simplification
        version: str | None = None,  # test harness simplification
        created: str | None = None,
    ) -> None:
        super().__init__()
        self.ref: str = ref
        self.version: str | None = version
        self.image_digest: str | None = image_digest
        self.short_digest: str | None = None
        self.repo_digest: str | None = None  # the single RepoDigest known to match registry
        self.git_digest: str | None = None
        self.index_name: str | None = None
        self.name: str | None = None
        self.tag: str | None = None
        self.pinned_digest: str | None = None
        # untagged ref using combined index and remote name used only for pattern matching common pkg info
        self.untagged_ref: str | None = None  # index_name/remote_name used for pkg match
        self.tag_or_digest: str | None = None  # index_name/remote_name:**tag_or_digest**
        self.tags = tags
        self.attributes: dict[str, Any] = attributes or {}
        self.annotations: dict[str, Any] = annotations or {}
        self.throttled: bool = False
        self.origin: str | None = None
        self.error: str | None = None
        self.platform: str | None = platform
        self.custom: dict[str, str | float | int | bool | None] = {}
        self.created: str | None = created

        # no RepoDigests in the image attributes is taken to mean a locally built image
        self.local_build: bool = not self.repo_digests
        self.index_name, remote_name = resolve_repository_name(ref)

        self.name = remote_name

        if remote_name and ":" in remote_name and ("@" not in remote_name or remote_name.index("@") > remote_name.index(":")):
            # name:tag format
            self.name, self.tag_or_digest = remote_name.split(":", 1)
            # Strip only the trailing ":tag" part; splitting ref on its first ":"
            # would cut a registry port, e.g. localhost:5000/foo:1.0 -> localhost
            self.untagged_ref = ref.removesuffix(f":{self.tag_or_digest}")
            self.tag = self.tag_or_digest

        elif remote_name and "@" in remote_name:
            # name@digest format
            self.name, self.tag_or_digest = remote_name.split("@", 1)
            self.untagged_ref = ref.split("@", 1)[0]
            self.pinned_digest = self.tag_or_digest

        if self.tag and "@" in self.tag:
            # name:tag@digest format
            # for pinned tags, care only about the digest part
            self.tag, self.tag_or_digest = self.tag.split("@", 1)
            self.pinned_digest = self.tag_or_digest
        if self.tag_or_digest is None:
            self.tag_or_digest = "latest"
            self.untagged_ref = ref
            self.tag = self.tag_or_digest

        if self.repo_digest is None and len(self.repo_digests) == 1:
            # definite known RepoDigest
            # if its ambiguous, the final version selection will handle it
            self.repo_digest = self.repo_digests[0]

        if self.index_name == "docker.io" and "/" not in self.name:
            # "official Docker images have an abbreviated library/foo name"
            self.name = f"library/{self.name}"
        # fullmatch: re.match only anchors at the start, so an invalid tail would pass unnoticed
        if self.name is not None and not re.fullmatch(OCI_NAME_RE, self.name):
            log.warning("Invalid OCI image name: %s", self.name)
        if self.tag and not re.fullmatch(OCI_TAG_RE, self.tag):
            log.warning("Invalid OCI image tag: %s", self.tag)
        if "/" in self.name:
            self.unqualified_name: str = self.name.split("/", 1)[1]
        else:
            self.unqualified_name = self.name

        # platform derived from image attributes overrides any explicitly supplied value
        if self.os and self.arch:
            self.platform = "/".join(
                filter(
                    None,
                    [self.os, self.arch, self.variant],
                ),
            )

        if self.image_digest is not None:
            self.image_digest = self.condense_digest(self.image_digest, short=False)
            self.short_digest = self.condense_digest(self.image_digest)  # type: ignore[arg-type]

    @property
    def repo_digests(self) -> list[str]:
        """The selected repo digest if set, else the digest parts of the `RepoDigests` attribute."""
        if self.repo_digest:
            return [self.repo_digest]
        # RepoDigest in image inspect, Registry Config object
        digests = [v.split("@", 1)[1] if "@" in v else v for v in self.attributes.get("RepoDigests", [])]
        return digests or []

    @property
    def pinned(self) -> bool:
        """Check if this is pinned and installed version consistent with pin"""
        return bool(self.pinned_digest and self.pinned_digest in self.repo_digests)

    @property
    def os(self) -> str | None:
        return self.attributes.get("Os")

    @property
    def arch(self) -> str | None:
        return self.attributes.get("Architecture")

    @property
    def variant(self) -> str | None:
        return self.attributes.get("Variant")

    def condense_digest(self, digest: str, short: bool = True) -> str | None:
        """Strip any `repo@` prefix from a digest.

        With `short` set, the digest type prefix (text before the first ":") is
        also removed and the result cut to 12 characters. Returns None, after
        logging a warning, if the value cannot be processed.
        """
        try:
            digest = digest.split("@")[1] if "@" in digest else digest  # fully qualified RepoDigest
            if short:
                digest = digest.split(":")[1] if ":" in digest else digest  # remove digest type prefix
                return digest[0:12]
            return digest
        except Exception as e:
            # deliberate best-effort: a malformed digest must not break discovery
            log.warning("Unable to condense digest %s: %s", digest, e)
            return None

    def reuse(self) -> "DockerImageInfo":
        """Build a fresh instance from this one's constructor inputs, marked with origin "REUSED"."""
        # Keyword arguments: passed positionally, version landed in the platform
        # slot and created in the version slot.
        cloned = DockerImageInfo(
            self.ref,
            image_digest=self.image_digest,
            tags=self.tags,
            attributes=self.attributes,
            annotations=self.annotations,
            platform=self.platform,
            version=self.version,
            created=self.created,
        )
        cloned.origin = "REUSED"
        return cloned

    def as_dict(self, minimal: bool = True) -> dict[str, str | list | dict | bool | int | None]:
        """Return the image details as a dict; attributes and annotations are added unless `minimal`."""
        result: dict[str, str | list | dict | bool | int | None] = {
            "captured": self.captured.isoformat(),
            "image_ref": self.ref,
            "name": self.name,
            "version": self.version,
            "image_digest": self.image_digest,
            "repo_digest": self.repo_digest,
            "repo_digests": self.repo_digests,  # the list, not a repeat of the single repo_digest
            "git_digest": self.git_digest,
            "index_name": self.index_name,
            "tag": self.tag,
            "pinned_digest": self.pinned_digest,
            "tag_or_digest": self.tag_or_digest,
            "tags": self.tags,
            "origin": self.origin,
            "platform": self.platform,
            "local_build": self.local_build,
            "error": self.error,
            "throttled": self.throttled,
            "custom": self.custom,
        }
        if not minimal:
            result["attributes"] = self.attributes
            result["annotations"] = self.annotations
        return result
def id_source_platform(source: str | None) -> str | None:
    """Return the first platform in SOURCE_PLATFORMS whose pattern matches the start of `source`.

    A missing source is treated as an empty string; None is returned when nothing matches.
    """
    text: str = source or ""
    for platform, pattern in SOURCE_PLATFORMS.items():
        if re.match(pattern, text):
            return platform
    return None
def _select_annotation(
    name: str, key: str, local_info: DockerImageInfo | None = None, registry_info: DockerImageInfo | None = None
) -> dict[str, str | None]:
    """Look up annotation `key` and return it as `{name: value}`, or `{}` if absent.

    The registry info is used exclusively when it is provided; the local info is
    only consulted when there is no registry info at all.
    """
    if registry_info:
        chosen = registry_info
    elif local_info:
        chosen = local_info
    else:
        return {}
    value: Any | None = chosen.annotations.get(key)
    return {} if value is None else {name: value}
def cherrypick_annotations(
    local_info: DockerImageInfo, registry_info: DockerImageInfo | None
) -> dict[str, str | float | int | bool | None]:
    """Collect a fixed set of OCI annotations under friendlier keys.

    https://github.com/opencontainers/image-spec/blob/main/annotations.md
    """
    wanted: dict[str, str] = {
        "documentation_url": "org.opencontainers.image.documentation",
        "description": "org.opencontainers.image.description",
        "licences": "org.opencontainers.image.licenses",
        "image_base": "org.opencontainers.image.base.name",
        "image_created": "org.opencontainers.image.created",
        "image_version": "org.opencontainers.image.version",
        "image_revision": "org.opencontainers.image.revision",
        "ref_name": "org.opencontainers.image.ref.name",
        "title": "org.opencontainers.image.title",
        "vendor": "org.opencontainers.image.vendor",
        "source": "org.opencontainers.image.source",
    }
    picked: dict[str, str | float | int | bool | None] = {}
    for field_name, label in wanted.items():
        picked.update(_select_annotation(field_name, label, local_info, registry_info))

    # An image built on ubuntu can carry the base's ref name and a YY.MM style
    # version; drop that version so it is not reported as the image's own.
    base_version_leaked = (
        picked.get("ref_name") == "ubuntu"
        and local_info.name != "ubuntu"
        and picked.get("image_version")
        and re.fullmatch(r"^2\d\.\d\d$", cast("str", picked["image_version"]))
    )
    if base_version_leaked:
        log.debug(
            "Suppressing %s base %s version leaking into image version: %s",
            local_info.name,
            picked["ref_name"],
            picked["image_version"],
        )
        del picked["image_version"]
    return picked
class DockerServiceDetails(DiscoveryInstallationDetail):
    """Container name plus compose and git details for an installed container."""

    def __init__(
        self,
        container_name: str | None = None,
        compose_path: str | None = None,
        compose_version: str | None = None,
        compose_service: str | None = None,
        git_repo_path: str | None = None,
    ) -> None:
        self.container_name: str | None = container_name
        self.compose_path: str | None = compose_path
        self.compose_version: str | None = compose_version
        self.compose_service: str | None = compose_service
        self.git_repo_path: str | None = git_repo_path
        # not a constructor argument; starts unset
        self.git_local_timestamp: str | None = None

    def as_dict(self) -> dict[str, str | list | dict | bool | int | None]:
        """Return the details as a dict; the git entries appear only when they have a value."""
        results: dict[str, str | list | dict | bool | int | None] = {
            "container_name": self.container_name,
            "compose_path": self.compose_path,
            "compose_service": self.compose_service,
            "compose_version": self.compose_version,
        }
        optional: dict[str, str | None] = {
            "git_local_timestamp": self.git_local_timestamp,
            "git_repo_path": self.git_repo_path,
        }
        results.update({key: value for key, value in optional.items() if value})
        return results
class LocalContainerInfo:
    """Builds image and service details from a local Docker container."""

    def build_image_info(self, container: Container) -> tuple[DockerImageInfo, DockerServiceDetails]:
        """Image contents equiv to `docker inspect image <image_ref>`

        Returns the image info (with cherry-picked annotations in `custom`, plus
        `version` and `created`) and the compose-related service details taken
        from the container labels.
        """
        # container image can be none if someone ran `docker rmi -f`
        # so although this could be sourced from image, like `container.image.tags[0]`
        # use the container ref instead, which survives monkeying about with images
        image_ref: str = container.attrs.get("Config", {}).get("Image") or ""
        image_digest = container.attrs.get("Image")

        # Read the property once: in the Docker SDK `Container.image` looks the
        # image up on every access, so repeated reads repeat that lookup.
        image = container.image

        image_info: DockerImageInfo = DockerImageInfo(
            image_ref,
            image_digest=image_digest,
            tags=image.tags if image else None,
            annotations=image.labels if image else None,
            attributes=image.attrs if image else None,
        )
        service_info: DockerServiceDetails = DockerServiceDetails(
            container.name,
            compose_path=container.labels.get("com.docker.compose.project.working_dir"),
            compose_service=container.labels.get("com.docker.compose.service"),
            compose_version=container.labels.get("com.docker.compose.version"),
        )

        # no registry info here, so only the local image's labels are consulted
        labels: dict[str, str | float | int | bool | None] = cherrypick_annotations(image_info, None)
        if image and image.attrs:
            image_info.created = image.attrs.get("Created")
        image_info.custom = labels
        image_info.version = cast("str|None", labels.get("image_version"))

        return image_info, service_info
class PackageEnricher:
    """Base enricher that matches an image against a dict of known packages."""

    def __init__(self, docker_cfg: DockerConfig, packages: dict[str, PackageUpdateInfo] | None = None) -> None:
        self.pkgs: dict[str, PackageUpdateInfo] = packages or {}
        self.cfg: DockerConfig = docker_cfg
        self.log: Any = structlog.get_logger().bind(integration="docker")

    def initialize(self) -> None:
        """Hook for subclasses to load package data; does nothing here."""

    def enrich(self, image_info: DockerImageInfo) -> PackageUpdateInfo | None:
        """Return the first known package whose docker image name equals the image's untagged or full ref."""
        if image_info.untagged_ref is None or image_info.ref is None:
            return None

        candidates: tuple[str, str] = (image_info.untagged_ref, image_info.ref)
        for pkg in self.pkgs.values():
            if pkg is None or pkg.docker is None:
                continue
            image_name = pkg.docker.image_name
            if image_name is None or image_name not in candidates:
                continue
            self.log.debug(
                "Found common package",
                image_name=image_name,
                logo_url=pkg.logo_url,
                relnotes_url=pkg.release_notes_url,
            )
            return pkg
        return None
class DefaultPackageEnricher(PackageEnricher):
    """Enricher that always produces generic package info for an image."""

    def enrich(self, image_info: DockerImageInfo) -> PackageUpdateInfo | None:
        """Build package info from the untagged ref (or full ref) with the configured default picture."""
        self.log.debug("Default pkg info", image_name=image_info.untagged_ref, image_ref=image_info.ref)
        image_name = image_info.untagged_ref or image_info.ref
        docker_info = DockerPackageUpdateInfo(image_name, version_policy=VersionPolicy.AUTO)
        return PackageUpdateInfo(
            docker_info,
            logo_url=self.cfg.default_entity_picture_url,
            release_notes_url=None,
        )
class CommonPackageEnricher(PackageEnricher):
    """Enricher whose packages come from the common package info file."""

    def initialize(self) -> None:
        """Load `PKG_INFO_FILE` over the structured `CommonPackages` defaults into `self.pkgs`.

        When the file does not exist, only the structured defaults are used.

        Raises:
            MissingMandatoryValue, ValidationError: re-raised after logging when
                reading the packages from the configuration fails.
        """
        base_cfg: DictConfig = OmegaConf.structured(CommonPackages)
        if PKG_INFO_FILE.exists():
            self.log.debug("Loading common package update info", path=PKG_INFO_FILE)
            cfg: DictConfig = typing.cast("DictConfig", OmegaConf.merge(base_cfg, OmegaConf.load(PKG_INFO_FILE)))

            # converting with throw_on_missing forces missing mandatory values to surface now
            OmegaConf.to_container(cfg, throw_on_missing=True)
            OmegaConf.set_readonly(cfg, True)
        else:
            self.log.warning("No common package update info found", path=PKG_INFO_FILE)
            cfg = base_cfg
        try:
            common_config: CommonPackages = typing.cast("CommonPackages", cfg)
            # the mapping is used as loaded rather than rebuilt into PackageUpdateInfo
            # objects, because of omegaconf trouble with optional fields
            self.pkgs = common_config.common_packages
        except (MissingMandatoryValue, ValidationError) as e:
            # was `self.log.serror`, which is not a logger method and would have
            # replaced the configuration error with an attribute failure
            self.log.error("Configuration error %s", e, path=PKG_INFO_FILE.as_posix())
            raise
class LinuxServerIOPackageEnricher(PackageEnricher):
    """Enricher populated from the linuxserver.io images API."""

    def initialize(self) -> None:
        """Fetch linuxserver.io image metadata and add any packages not already known.

        Does nothing when the "linuxserver.io" metadata source is not configured
        or not enabled, or when the API call does not succeed.
        """
        cfg = self.cfg.discover_metadata.get("linuxserver.io")
        if cfg is None or not cfg.enabled:
            return

        self.log.debug(f"Fetching linuxserver.io metadata from API, cache_ttl={cfg.cache_ttl}")
        response: Response | None = fetch_url(
            "https://api.linuxserver.io/api/v1/images?include_config=false&include_deprecated=false",
            cache_ttl=cfg.cache_ttl,
        )
        if not (response and response.is_success):
            return

        api_data: Any = response.json()
        repos: list = api_data.get("data", {}).get("repositories", {}).get("linuxserver", [])

        added = 0
        for repo in repos:
            image_name = repo.get("name")
            if not image_name or image_name in self.pkgs:
                # skip unnamed entries and never overwrite an existing package
                continue
            docker_info = DockerPackageUpdateInfo(f"lscr.io/linuxserver/{image_name}")
            self.pkgs[image_name] = PackageUpdateInfo(
                docker_info,
                logo_url=repo["project_logo"],
                release_notes_url=f"{repo['github_url']}/releases",
            )
            added += 1
        self.log.debug(f"Added {added} linuxserver.io package details")
class SourceReleaseEnricher:
    """Derives release details (source, diff and release-notes URLs) from image annotations."""

    def __init__(self) -> None:
        self.log: Any = structlog.get_logger().bind(integration="docker")

    def enrich(
        self, registry_info: DockerImageInfo, source_repo_url: str | None = None, notes_url: str | None = None
    ) -> ReleaseDetail | None:
        """Build a ReleaseDetail for the image, or None when nothing at all is known.

        Version, revision and source come from the OCI annotations; an explicit
        `source_repo_url` takes precedence over the annotated source. Candidate
        URLs built from templates are only kept if `validate_url` accepts them.
        """
        annotations = registry_info.annotations
        detail = ReleaseDetail(registry_info.name or UNKNOWN_NAME)

        detail.notes_url = notes_url
        detail.version = annotations.get("org.opencontainers.image.version")
        detail.revision = annotations.get("org.opencontainers.image.revision")
        # explicit source_repo_url overrides container, e.g. where container source is only the docker wrapper
        detail.source_url = source_repo_url or annotations.get("org.opencontainers.image.source")

        if detail.source_url is None and registry_info.index_name is not None:
            # no declared source: try the repo URL implied by the registry, if it has one
            registry_config: tuple[str | None, str, str, str | None, str | None] | None = REGISTRIES.get(
                registry_info.index_name
            )
            repo_template: str | None = registry_config[4] if registry_config else None
            if repo_template:
                implied_url = repo_template.format(image_name=registry_info.name)
                if validate_url(implied_url, cache_ttl=86400):
                    detail.source_url = implied_url
                    self.log.info("Implied source from registry: %s", detail.source_url)

        known = (detail.source_url, detail.notes_url, detail.revision, detail.version)
        if all(item is None for item in known):
            return None

        if detail.source_url and "#" in detail.source_url:
            # keep only the part before the fragment as the repo URL
            detail.source_repo_url = detail.source_url.split("#", 1)[0]
            self.log.debug("Simplifying %s from %s", detail.source_repo_url, detail.source_url)
        else:
            detail.source_repo_url = detail.source_url

        detail.source_platform = id_source_platform(detail.source_repo_url)
        if not detail.source_platform:
            self.log.debug("No known source platform found on container", source=detail.source_repo_url)
            return detail

        template_vars: dict[str, str | None] = {
            "version": detail.version or MISSING_VAL,
            "revision": detail.revision or MISSING_VAL,
            "repo": detail.source_repo_url or MISSING_VAL,
            "source": detail.source_url or MISSING_VAL,
        }

        diff_url_template: str | None = DIFF_URL_TEMPLATES.get(detail.source_platform)
        diff_url: str | None = diff_url_template.format(**template_vars) if diff_url_template else None
        if diff_url and MISSING_VAL not in diff_url and validate_url(diff_url, cache_ttl=3600):
            detail.diff_url = diff_url

        # version-specific release notes first, then the platform's generic location
        notes_sources = (
            (RELEASE_URL_TEMPLATES, "Setting default known release notes url: %s"),
            (UNKNOWN_RELEASE_URL_TEMPLATES, "Setting default unknown release notes url: %s"),
        )
        for templates, message in notes_sources:
            if detail.notes_url is not None or detail.source_platform not in templates:
                continue
            platform_notes_url: str | None = templates[detail.source_platform].format(**template_vars)
            if (
                platform_notes_url
                and MISSING_VAL not in platform_notes_url
                and validate_url(platform_notes_url, cache_ttl=86400)
            ):
                self.log.debug(message, platform_notes_url)
                detail.notes_url = platform_notes_url

        return detail
class AuthError(Exception):
    """Raised when a registry token cannot be obtained."""
class VersionLookup:
    """Base for lookups that produce image info for a locally known image."""

    def __init__(self) -> None:
        self.log: Any = structlog.get_logger().bind(integration="docker", tool="version_lookup")

    @abstractmethod
    def lookup(self, local_image_info: DockerImageInfo, **kwargs) -> DockerImageInfo:  # noqa: ANN003
        """Return image info for `local_image_info`; subclasses provide the implementation."""
576class ContainerDistributionAPIVersionLookup(VersionLookup):
577 def __init__(self, throttler: Throttler, cfg: RegistryConfig) -> None:
578 self.throttler: Throttler = throttler
579 self.cfg: RegistryConfig = cfg
580 self.log: Any = structlog.get_logger().bind(integration="docker", tool="version_lookup")
581 self.api_stats = APIStatsCounter()
    def fetch_token(self, registry: str, image_name: str) -> str | None:
        """Fetch a pull token for `image_name` from the registry's auth endpoint.

        Registries absent from REGISTRIES use the registry host itself with the
        default token URL template.

        Returns:
            The token, or None when the registry has no auth host or no token
            URL template.

        Raises:
            AuthError: when no token can be obtained.
        """
        # NOTE(review): SOURCE's indentation was lost in extraction; the nesting
        # below is reconstructed from the code's syntax - confirm against the file.
        default_host: tuple[str, str, str, str, None] = (registry, registry, registry, TOKEN_URL_TEMPLATE, None)
        auth_host: str | None = REGISTRIES.get(registry, default_host)[0]
        if auth_host is None:
            return None

        service: str = REGISTRIES.get(registry, default_host)[2]
        url_template: str | None = REGISTRIES.get(registry, default_host)[3]
        auth_url: str | None = (
            url_template.format(auth_host=auth_host, image_name=image_name, service=service) if url_template else None
        )
        if auth_url is None:
            return None
        response: Response | None = fetch_url(
            auth_url, cache_ttl=self.cfg.token_cache_ttl, follow_redirects=True, api_stats_counter=self.api_stats
        )

        if response and response.is_success:
            api_data = httpx_json_content(response, {})
            token: str | None = api_data.get("token") if api_data else None
            if token:
                return token
            # a successful response without a token is treated as an auth failure
            self.log.warning("No token found in response for %s", auth_url)
            raise AuthError(f"No token found in response for {image_name}")

        self.log.debug(
            "Non-success response at %s fetching token: %s",
            auth_url,
            (response and response.status_code) or None,
        )
        if response and response.status_code == 404:
            self.log.debug(
                "Default token URL %s not found, calling /v2 endpoint to validate OCI API and provoke auth", auth_url
            )
            # uncached request; its response replaces the token URL response checked below
            response = fetch_url(
                f"https://{auth_host}/v2",
                follow_redirects=True,
                allow_stale=False,
                cache_ttl=0,
                api_stats_counter=self.api_stats,
            )

        # NOTE(review): assumed to sit at function level (not nested under the
        # 404 branch), so a 401 from the token URL itself is handled here too.
        if response and response.status_code == 401:
            auth = response.headers.get("www-authenticate")
            if not auth:
                self.log.warning("No www-authenticate header found in 401 response for %s", auth_url)
                raise AuthError(f"No www-authenticate header found on 401 for {image_name}")
            # expects realm, service and scope in exactly this order, comma separated
            match = re.search(r'realm="([^"]+)",service="([^"]+)",scope="([^"]+)"', auth)
            if not match:
                self.log.warning("No realm/service/scope found in www-authenticate header for %s", auth_url)
                raise AuthError(f"No realm/service/scope found on 401 headers for {image_name}")

            realm, service, scope = match.groups()
            # retry against the endpoint advertised by the challenge header
            auth_url = f"{realm}?service={service}&scope={scope}"
            response = fetch_url(auth_url, follow_redirects=True, api_stats_counter=self.api_stats)

            if response and response.is_success:
                token_data = response.json()
                self.log.debug("Fetched registry token from %s", auth_url)
                return token_data.get("token")
            self.log.warning(
                "Alternative auth %s with status %s has no token", auth_url, (response and response.status_code) or None
            )
        elif response:
            self.log.warning("Auth %s failed with status %s", auth_url, (response and response.status_code) or None)

        # every path that reaches this point has failed to produce a token
        raise AuthError(f"Failed to fetch token for {image_name} at {auth_url}")
651 def fetch_index(
652 self, api_host: str, local_image_info: DockerImageInfo, token: str | None
653 ) -> tuple[Any | None, str | None, CacheMetadata | None]:
654 if local_image_info.tag: 654 ↛ 658line 654 didn't jump to line 658 because the condition on line 654 was always true
655 api_url: str = f"https://{api_host}/v2/{local_image_info.name}/manifests/{local_image_info.tag}"
656 cache_ttl: int | None = self.cfg.mutable_cache_ttl
657 else:
658 api_url = f"https://{api_host}/v2/{local_image_info.name}/manifests/{local_image_info.pinned_digest}"
659 cache_ttl = self.cfg.immutable_cache_ttl
661 response: Response | None = fetch_url(
662 api_url,
663 cache_ttl=cache_ttl,
664 bearer_token=token,
665 response_type=[
666 "application/vnd.oci.image.index.v1+json",
667 "application/vnd.docker.distribution.manifest.list.v2+json",
668 ],
669 api_stats_counter=self.api_stats,
670 )
672 if response is None: 672 ↛ 673line 672 didn't jump to line 673 because the condition on line 672 was never true
673 self.log.warning("Empty response for manifest for image at %s", api_url)
674 elif response.status_code == 429: 674 ↛ 675line 674 didn't jump to line 675 because the condition on line 674 was never true
675 self.throttler.throttle(local_image_info.index_name, raise_exception=True)
676 elif not response.is_success:
677 api_data = httpx_json_content(response, {})
678 self.log.warning(
679 "Failed to fetch index from %s: %s",
680 api_url,
681 api_data.get("errors") if api_data else response.text,
682 )
683 else:
684 index = response.json()
685 self.log.debug(
686 "INDEX %s manifests, %s annotations, api: %s, header digest: %s",
687 len(index.get("manifests", [])),
688 len(index.get("annotations", [])),
689 response.headers.get(HEADER_DOCKER_API, "N/A"),
690 response.headers.get(HEADER_DOCKER_DIGEST, "N/A"),
691 )
692 return index, response.headers.get(HEADER_DOCKER_DIGEST), CacheMetadata(response)
693 return None, None, None
695 def fetch_object(
696 self,
697 api_host: str,
698 local_image_info: DockerImageInfo,
699 media_type: str,
700 digest: str,
701 token: str | None,
702 follow_redirects: bool = False,
703 api_type: str = "manifests",
704 ) -> tuple[Any | None, CacheMetadata | None]:
705 api_url = f"https://{api_host}/v2/{local_image_info.name}/{api_type}/{digest}"
706 response = fetch_url(
707 api_url,
708 cache_ttl=self.cfg.immutable_cache_ttl,
709 bearer_token=token,
710 response_type=media_type,
711 allow_stale=True,
712 follow_redirects=follow_redirects,
713 api_stats_counter=self.api_stats,
714 )
716 if response and response.is_success:
717 obj = httpx_json_content(response, None)
718 if obj: 718 ↛ 745line 718 didn't jump to line 745 because the condition on line 718 was always true
719 self.log.debug(
720 "%s, header digest:%s, api: %s, %s annotations",
721 api_type.upper(),
722 response.headers.get(HEADER_DOCKER_DIGEST, "N/A"),
723 response.headers.get(HEADER_DOCKER_API, "N/A"),
724 len(obj.get("annotations", [])),
725 )
726 return obj, CacheMetadata(response)
727 elif response and response.status_code == 429: 727 ↛ 728line 727 didn't jump to line 728 because the condition on line 727 was never true
728 self.throttler.throttle(local_image_info.index_name, raise_exception=True)
729 elif response and not response.is_success: 729 ↛ 744line 729 didn't jump to line 744 because the condition on line 729 was always true
730 api_data = httpx_json_content(response, {})
731 if response: 731 ↛ 739line 731 didn't jump to line 739 because the condition on line 731 was always true
732 self.log.warning(
733 "Failed to fetch obj from %s: %s %s",
734 api_url,
735 response.status_code,
736 api_data.get("errors") if api_data else response.text,
737 )
738 else:
739 self.log.warning(
740 "Failed to fetch obj from %s: No Response, %s", api_url, api_data.get("errors") if api_data else None
741 )
743 else:
744 self.log.error("Empty response from %s", api_url)
745 return None, None
def lookup(
    self,
    local_image_info: DockerImageInfo,
    token: str | None = None,
    minimal: bool = False,
    **kwargs,  # noqa: ANN003, ARG002
) -> DockerImageInfo:
    """Look up the remote counterpart of a local image via the registry HTTP API.

    Fetches the image index, picks the manifest(s) matching the local image's
    os / architecture (and variant where both sides declare one), and, unless
    ``minimal`` is set, also fetches the image config blob to collect labels
    and the creation timestamp.

    Args:
        local_image_info: Description of the locally installed image; its ``name``,
            ``index_name``, ``ref``, ``os``, ``arch`` and ``variant`` are read.
        token: Optional pre-fetched bearer token; when absent one is requested
            through ``self.fetch_token``.
        minimal: When true, skip the config blob fetch.
        **kwargs: Accepted for interface compatibility and ignored.

    Returns:
        A new ``DockerImageInfo`` for the same ref. ``throttled`` is set when the
        registry is (or becomes) throttled, ``error`` when authentication fails;
        in those early-exit cases the remaining fields are left at their defaults.
    """
    result: DockerImageInfo = DockerImageInfo(local_image_info.ref)
    if not local_image_info.name or not local_image_info.index_name:
        self.log.debug("No local pkg name or registry index name to check")
        return result

    if self.throttler.check_throttle(local_image_info.index_name):
        result.throttled = True
        return result

    if token:
        self.log.debug("Using provided token to fetch manifest for image %s", local_image_info.ref)
    else:
        try:
            token = self.fetch_token(local_image_info.index_name, local_image_info.name)
        except AuthError as e:
            self.log.warning("Authentication error prevented Docker Registry enrichment: %s", e)
            result.error = str(e)
            return result

    index: Any | None = None
    index_digest: str | None = None  # fetched from header, should be the image digest
    index_cache_metadata: CacheMetadata | None = None
    manifest_cache_metadata: CacheMetadata | None = None
    config_cache_metadata: CacheMetadata | None = None
    # Registries without an explicit entry fall back to using the index name as the API host
    api_host: str | None = REGISTRIES.get(
        local_image_info.index_name, (local_image_info.index_name, local_image_info.index_name)
    )[1]
    if api_host is None:
        # Previously `self.log(...)`, which calls the logger object itself rather than a log method
        self.log.warning("No API host can be determined for %s", local_image_info.index_name)
        return result
    try:
        index, index_digest, index_cache_metadata = self.fetch_index(api_host, local_image_info, token)
    except ThrottledError:
        result.throttled = True
        index = None

    if index:
        # Copy, so later `.update()` calls never mutate the (possibly cached) index document
        result.annotations = dict(index.get("annotations") or {})
        local_variant = local_image_info.variant
        for m in index.get("manifests") or []:
            platform_info = m.get("platform") or {}
            if (
                platform_info.get("os") != local_image_info.os
                or platform_info.get("architecture") != local_image_info.arch
            ):
                continue
            # OCI image indexes spell the key `variant`; the capitalised form is kept
            # as a fallback. Only reject when both sides declare a variant and they differ.
            remote_variant = platform_info.get("variant") or platform_info.get("Variant")
            if remote_variant and local_variant and remote_variant != local_variant:
                continue

            if index_digest:
                result.image_digest = index_digest
                result.short_digest = result.condense_digest(index_digest)
                self.log.debug("Setting %s image digest %s", result.name, result.short_digest)

            digest: str | None = m.get("digest")
            media_type = m.get("mediaType")
            manifest: Any | None = None

            if digest:
                try:
                    manifest, manifest_cache_metadata = self.fetch_object(
                        api_host, local_image_info, media_type, digest, token
                    )
                except ThrottledError:
                    result.throttled = True

            if not manifest:
                continue

            config_descriptor = manifest.get("config") or {}
            config_digest = config_descriptor.get("digest")
            if config_digest is None:
                # Log the manifest digest, since the config digest is by definition empty here
                self.log.warning("Empty digest for %s %s %s", api_host, digest, media_type)
            else:
                result.repo_digest = result.condense_digest(config_digest, short=False)
                self.log.debug("Setting %s repo digest: %s", result.name, result.repo_digest)

            manifest_annotations = manifest.get("annotations")
            if manifest_annotations:
                result.annotations.update(manifest_annotations)
            else:
                self.log.debug("No annotations found in manifest: %s", manifest)

            if minimal or not config_descriptor:
                continue

            try:
                img_config, config_cache_metadata = self.fetch_object(
                    api_host=api_host,
                    local_image_info=local_image_info,
                    media_type=config_descriptor.get("mediaType"),
                    digest=config_digest,
                    token=token,
                    follow_redirects=True,
                    api_type="blobs",
                )
                if img_config:
                    config = img_config.get("config") or img_config.get("Config") or {}
                    result.annotations.update(config.get("Labels") or {})
                    result.annotations.update(img_config.get("annotations") or {})
                    # `created` is a top-level property of the image config document;
                    # the nested lookup is retained only as a fallback.
                    result.created = (
                        img_config.get("created")
                        or img_config.get("Created")
                        or config.get("created")
                        or config.get("Created")
                    )
                else:
                    self.log.debug("No config found: %s", manifest)
            except ThrottledError:
                # Same handling as the index and manifest fetches above
                result.throttled = True
            except Exception as e:
                # Best effort: config enrichment must not fail the whole lookup
                self.log.warning("Failed to extract %s image info from config: %s", local_image_info.ref, e)

    if not result.annotations:
        self.log.debug("No annotations found from registry data")

    labels: dict[str, str | float | int | bool | None] = cherrypick_annotations(local_image_info, result)
    result.custom = labels or {}
    if index_cache_metadata:
        result.custom["index_cache_age"] = index_cache_metadata.age
    if manifest_cache_metadata:
        result.custom["manifest_cache_age"] = manifest_cache_metadata.age
    if config_cache_metadata:
        result.custom["config_cache_age"] = config_cache_metadata.age
    # Read through `result.custom` so a falsy `labels` cannot raise here
    result.version = cast("str|None", result.custom.get("image_version"))
    result.origin = "OCI_V2" if not minimal else "OCI_V2_MINIMAL"

    self.log.debug(
        "OCI_V2 Lookup for %s: short_digest:%s, repo_digest:%s, version: %s",
        local_image_info.name,
        result.short_digest,
        result.repo_digest,
        result.version,
    )
    return result
class DockerClientVersionLookup(VersionLookup):
    """Query remote registry via local Docker API

    No auth needed, however uses the old v1 APIs, and only Index available via API
    """

    def __init__(self, client: docker.DockerClient, throttler: Throttler, cfg: RegistryConfig, api_backoff: int = 30) -> None:
        """Store the collaborators used by `lookup`.

        Args:
            client: Docker client used to call ``images.get_registry_data``.
            throttler: Shared throttle state, consulted before and updated after registry calls.
            cfg: Registry configuration, stored as ``self.cfg``.
            api_backoff: Base value, in seconds, from which the throttle period is
                derived when the registry answers 429 without a usable ``Retry-After``.
        """
        self.client: docker.DockerClient = client
        self.throttler: Throttler = throttler
        self.cfg: RegistryConfig = cfg
        self.api_backoff: int = api_backoff
        self.log: Any = structlog.get_logger().bind(integration="docker", tool="version_lookup")

    def lookup(self, local_image_info: DockerImageInfo, retries: int = 3, **kwargs) -> DockerImageInfo:  # noqa: ANN003, ARG002
        """Fetch registry data for the image through the Docker client.

        Args:
            local_image_info: Local image description; ``ref`` and ``index_name`` are required.
            retries: Maximum number of attempts for retryable API errors.
            **kwargs: Accepted for interface compatibility and ignored.

        Returns:
            A new ``DockerImageInfo``. On HTTP 429 the registry is throttled and the
            result is returned immediately with ``throttled`` set; on other API errors
            ``error`` holds the last error text.
        """
        retries_left = retries
        retry_secs: int = self.api_backoff
        reg_data: RegistryData | None = None

        result = DockerImageInfo(local_image_info.ref)
        if local_image_info.index_name is None or local_image_info.ref is None:
            return result

        while reg_data is None and retries_left > 0:
            if self.throttler.check_throttle(local_image_info.index_name):
                result.throttled = True
                break
            try:
                self.log.debug("Fetching registry data", image_ref=local_image_info.ref)
                reg_data = self.client.images.get_registry_data(local_image_info.ref)
                if reg_data is None:
                    # Count an empty answer as a failed attempt so the loop always terminates
                    retries_left -= 1
                    continue
                self.log.debug(
                    "Registry Data: id:%s,image:%s, attrs:%s",
                    reg_data.id,
                    reg_data.image_name,
                    reg_data.attrs,
                )
                result.short_digest = result.condense_digest(reg_data.short_id)
                result.image_digest = result.condense_digest(reg_data.id, short=False)
                # result.name = reg_data.image_name
                result.attributes = reg_data.attrs
                # Tolerate a null `Config` as well as a missing one
                result.annotations = (reg_data.attrs.get("Config") or {}).get("Labels") or {}
                result.error = None

            except docker.errors.APIError as e:
                if e.status_code == HTTPStatus.TOO_MANY_REQUESTS:
                    retry_secs = round(retry_secs**1.5)
                    try:
                        retry_after = int(e.response.headers.get("Retry-After", -1))  # type: ignore[union-attr]
                    except Exception as e2:
                        self.log.debug("Failed to access headers for retry info: %s", e2)
                    else:
                        # Only a header that was actually supplied may replace the computed
                        # backoff; the -1 placeholder for a missing header must not
                        if retry_after >= 0:
                            retry_secs = retry_after
                    self.throttler.throttle(local_image_info.index_name, retry_secs, e.explanation)
                    result.throttled = True
                    return result
                result.error = str(e)
                retries_left -= 1
                if e.is_client_error():
                    # A client error is reported as final, so stop instead of repeating the request
                    self.log.warning("Failed to fetch registry data: [%s] %s", e.status_code, e.explanation)
                    break
                if retries_left == 0:
                    self.log.warning("Failed to fetch registry data: [%s] %s", e.status_code, e.explanation)
                else:
                    self.log.debug("Failed to fetch registry data, retrying: %s", e)

        labels: dict[str, str | float | int | bool | None] = cherrypick_annotations(local_image_info, result)
        result.custom = labels or {}
        # Read through `result.custom` so a falsy `labels` cannot raise here
        result.version = cast("str|None", result.custom.get("image_version"))
        result.created = cast("str|None", result.custom.get("image_created"))
        result.origin = "DOCKER_CLIENT"
        return result