Coverage for src/updates2mqtt/integrations/docker_enrich.py: 83%
521 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-23 07:50 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-23 07:50 +0000
1import re
2import typing
3from abc import abstractmethod
4from typing import Any, cast
6import structlog
7from docker.auth import resolve_repository_name
8from docker.models.containers import Container
9from omegaconf import MissingMandatoryValue, OmegaConf, ValidationError
11from updates2mqtt.helpers import (
12 APIStatsCounter,
13 CacheMetadata,
14 ThrottledError,
15 Throttler,
16 fetch_url,
17 httpx_json_content,
18 validate_url,
19)
20from updates2mqtt.model import DiscoveryArtefactDetail, DiscoveryInstallationDetail, ReleaseDetail
22if typing.TYPE_CHECKING:
23 from docker.models.images import RegistryData
24 from httpx import Response
25 from omegaconf.dictconfig import DictConfig
26from http import HTTPStatus
28import docker
29import docker.errors
31from updates2mqtt.config import (
32 PKG_INFO_FILE,
33 SOURCE_PLATFORM_GITHUB,
34 SOURCE_PLATFORM_GITLAB,
35 SOURCE_PLATFORMS,
36 CommonPackages,
37 DockerConfig,
38 DockerPackageUpdateInfo,
39 MetadataSourceConfig,
40 PackageUpdateInfo,
41 RegistryConfig,
42 VersionPolicy,
43 docker_image_names,
44)
# Module-level structured logger used by the free functions and DockerImageInfo in this file.
log: Any = structlog.get_logger()

# URL templates keyed by source platform; filled in with str.format using the
# placeholder names shown in each template.
DIFF_URL_TEMPLATES = {
    SOURCE_PLATFORM_GITHUB: "{repo}/commit/{revision}",
}
RELEASE_URL_TEMPLATES = {
    SOURCE_PLATFORM_GITHUB: "{repo}/releases/tag/{version}",
}
# Fallback release-notes locations, tried when no notes URL has been set from the templates above.
UNKNOWN_RELEASE_URL_TEMPLATES = {
    SOURCE_PLATFORM_GITHUB: "{repo}/releases",
    SOURCE_PLATFORM_GITLAB: "{repo}/container_registry",
}
# Placeholder substituted for absent template values; a formatted URL that still
# contains it is discarded by the release enrichment code.
MISSING_VAL = "**MISSING**"
UNKNOWN_REGISTRY = "**UNKNOWN_REGISTRY**"
# Used as the release detail name when the image has no name.
UNKNOWN_NAME = "**UNKNOWN_NAME**"

# Registry API response headers read when fetching manifests.
HEADER_DOCKER_DIGEST = "docker-content-digest"
HEADER_DOCKER_API = "docker-distribution-api-version"
65TOKEN_URL_TEMPLATE = "https://{auth_host}/token?scope=repository:{image_name}:pull&service={service}" # noqa: S105 # nosec
67REGISTRY_GHCR = "ghcr.io"
68REGISTRY_DOCKER = "docker.io"
69REGISTRY_MCR = "mcr.microsoft.com"
70REGISTRY_QUAY = "quay.io"
71REGISTRY_LSCR = "lscr.io"
72REGISTRY_CODEBERG = "codeberg.org"
73REGISTRY_GITLAB = "registry.gitlab.com"
76class RegistryInfo(typing.NamedTuple):
77 auth_host: str | None
78 api_host: str
79 service: str
80 url_template: str | None
81 repo_template: str | None
84REGISTRIES: dict[str, RegistryInfo] = {
85 REGISTRY_DOCKER: RegistryInfo("auth.docker.io", "registry-1.docker.io", "registry.docker.io", TOKEN_URL_TEMPLATE, None),
86 REGISTRY_MCR: RegistryInfo(None, "mcr.microsoft.com", "mcr.microsoft.com", None, None),
87 REGISTRY_QUAY: RegistryInfo(None, "quay.io", "quay.io", TOKEN_URL_TEMPLATE, None),
88 REGISTRY_GHCR: RegistryInfo("ghcr.io", "ghcr.io", "ghcr.io", TOKEN_URL_TEMPLATE, "https://github.com/{image_name}"),
89 "lscr.io": RegistryInfo("ghcr.io", "lscr.io", "ghcr.io", TOKEN_URL_TEMPLATE, None),
90 REGISTRY_CODEBERG: RegistryInfo(
91 "codeberg.org",
92 "codeberg.org",
93 "container_registry",
94 TOKEN_URL_TEMPLATE,
95 "https://codeberg.org/{image_name}",
96 ),
97 REGISTRY_GITLAB: RegistryInfo(
98 "www.gitlab.com",
99 "registry.gitlab.com",
100 "container_registry",
101 "https://{auth_host}/jwt/auth?service={service}&scope=repository:{image_name}:pull&offline_token=true&client_id=docker",
102 "https://gitlab.com/{image_name}",
103 ),
104}
# source: https://specs.opencontainers.org/distribution-spec/?v=v1.0.0#pull
# Pattern strings (not compiled) used to sanity-check image names and tags;
# a failed check only produces a log warning.
OCI_NAME_RE = r"[a-z0-9]+((\.|_|__|-+)[a-z0-9]+)*(\/[a-z0-9]+((\.|_|__|-+)[a-z0-9]+)*)*"
OCI_TAG_RE = r"[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}"
class DockerImageInfo(DiscoveryArtefactDetail):
    """Normalize and carry around the parts of an image reference.

    index_name: registry index name, e.g. ghcr.io
    name: image ref without index name or tag, e.g. nginx, or librenms/librenms
    tag: tag part of the ref; "latest" when the ref has neither tag nor digest
    pinned_digest: digest part of a name@digest or name:tag@digest ref
    tag_or_digest: the digest when the ref carries one, otherwise the tag
    untagged_ref: the ref with tag/digest removed (combined index name and package name)
    """

    def __init__(
        self,
        ref: str,  # ref with optional index name and tag or digest, index:name:tag_or_digest
        image_digest: str | None = None,
        tags: list[str] | None = None,
        attributes: dict[str, Any] | None = None,
        annotations: dict[str, Any] | None = None,
        platform: str | None = None,  # test harness simplification
        version: str | None = None,  # test harness simplification
        created: str | None = None,
    ) -> None:
        """Parse ``ref`` into index name, name, tag and digest parts.

        ``attributes`` is read for "RepoDigests", "Os", "Architecture" and
        "Variant"; ``annotations`` is stored as given. When both OS and
        architecture are present in ``attributes`` they replace ``platform``.
        """
        super().__init__()
        self.ref: str = ref
        self.version: str | None = version
        self.image_digest: str | None = image_digest
        self.short_digest: str | None = None
        self.repo_digest: str | None = None  # the single RepoDigest known to match registry
        self.git_digest: str | None = None
        self.index_name: str | None = None
        self.name: str | None = None
        self.tag: str | None = None
        self.pinned_digest: str | None = None
        # untagged ref using combined index and remote name used only for pattern matching common pkg info
        self.untagged_ref: str | None = None  # index_name/remote_name used for pkg match
        self.tag_or_digest: str | None = None  # index_name/remote_name:**tag_or_digest**
        self.tags = tags
        self.attributes: dict[str, Any] = attributes or {}
        self.annotations: dict[str, Any] = annotations or {}
        self.throttled: bool = False
        self.origin: str | None = None
        self.error: str | None = None
        self.platform: str | None = platform
        self.custom: dict[str, str | float | int | bool | None] = {}
        self.created: str | None = created

        # no RepoDigests at all is taken to mean the image was built locally
        self.local_build: bool = not self.repo_digests
        self.index_name, remote_name = resolve_repository_name(ref)

        self.name = remote_name

        if remote_name and ":" in remote_name and ("@" not in remote_name or remote_name.index("@") > remote_name.index(":")):
            # name:tag format
            self.name, self.tag_or_digest = remote_name.split(":", 1)
            # Strip the tag suffix instead of splitting the whole ref on the first ":",
            # so a registry port (host:5000/name:tag) stays part of the untagged ref.
            self.untagged_ref = ref.removesuffix(f":{self.tag_or_digest}")
            self.tag = self.tag_or_digest

        elif remote_name and "@" in remote_name:
            # name@digest format
            self.name, self.tag_or_digest = remote_name.split("@", 1)
            self.untagged_ref = ref.split("@", 1)[0]
            self.pinned_digest = self.tag_or_digest

        if self.tag and "@" in self.tag:
            # name:tag@digest format
            # for pinned tags, care only about the digest part
            self.tag, self.tag_or_digest = self.tag.split("@", 1)
            self.pinned_digest = self.tag_or_digest
        if self.tag_or_digest is None:
            self.tag_or_digest = "latest"
            self.untagged_ref = ref
            self.tag = self.tag_or_digest

        if self.repo_digest is None and len(self.repo_digests) == 1:
            # definite known RepoDigest
            # if its ambiguous, the final version selection will handle it
            self.repo_digest = self.repo_digests[0]

        if self.index_name == "docker.io" and "/" not in self.name:
            # "official Docker images have an abbreviated library/foo name"
            self.name = f"library/{self.name}"
        # fullmatch: the whole name/tag must conform, not just a leading prefix
        if self.name is not None and not re.fullmatch(OCI_NAME_RE, self.name):
            log.warning("Invalid OCI image name: %s", self.name)
        if self.tag and not re.fullmatch(OCI_TAG_RE, self.tag):
            log.warning("Invalid OCI image tag: %s", self.tag)
        if "/" in self.name:
            self.unqualified_name: str = self.name.split("/", 1)[1]
        else:
            self.unqualified_name = self.name

        if self.os and self.arch:
            self.platform = "/".join(
                filter(
                    None,
                    [self.os, self.arch, self.variant],
                ),
            )

        if self.image_digest is not None:
            self.image_digest = self.condense_digest(self.image_digest, short=False)
            self.short_digest = self.condense_digest(self.image_digest)  # type: ignore[arg-type]

    @property
    def repo_digests(self) -> list[str]:
        """Known repo digests: the selected ``repo_digest`` alone, else those in ``attributes``."""
        if self.repo_digest:
            return [self.repo_digest]
        # RepoDigest in image inspect, Registry Config object
        digests = [v.split("@", 1)[1] if "@" in v else v for v in self.attributes.get("RepoDigests", [])]
        return digests or []

    @property
    def pinned(self) -> bool:
        """Check if this is pinned and installed version consistent with pin"""
        return bool(self.pinned_digest and self.pinned_digest in self.repo_digests)

    @property
    def os(self) -> str | None:
        """The "Os" entry of ``attributes``, if any."""
        return self.attributes.get("Os")

    @property
    def arch(self) -> str | None:
        """The "Architecture" entry of ``attributes``, if any."""
        return self.attributes.get("Architecture")

    @property
    def variant(self) -> str | None:
        """The "Variant" entry of ``attributes``, if any."""
        return self.attributes.get("Variant")

    def condense_digest(self, digest: str, short: bool = True) -> str | None:
        """Strip any ``name@`` prefix from ``digest``.

        With ``short`` the algorithm prefix (text before ":") is removed too and
        only the first 12 characters are returned. Returns None, after logging a
        warning, if the value cannot be processed.
        """
        try:
            digest = digest.split("@")[1] if "@" in digest else digest  # fully qualified RepoDigest
            if short:
                digest = digest.split(":")[1] if ":" in digest else digest  # remove digest type prefix
                return digest[0:12]
            return digest
        except Exception as e:
            log.warning("Unable to condense digest %s: %s", digest, e)
            return None

    def reuse(self) -> "DockerImageInfo":
        """Build a fresh instance from this one's constructor inputs, marked with origin "REUSED"."""
        # Keyword arguments: passed positionally, version and created landed in the
        # platform and version parameters.
        cloned = DockerImageInfo(
            self.ref,
            image_digest=self.image_digest,
            tags=self.tags,
            attributes=self.attributes,
            annotations=self.annotations,
            platform=self.platform,
            version=self.version,
            created=self.created,
        )
        cloned.origin = "REUSED"
        return cloned

    def as_dict(self, minimal: bool = True) -> dict[str, str | list | dict | bool | int | None]:
        """Return the image details as a plain dict.

        ``attributes`` and ``annotations`` are included only when ``minimal`` is False.
        """
        result: dict[str, str | list | dict | bool | int | None] = {
            "captured": self.captured.isoformat(),
            "image_ref": self.ref,
            "name": self.name,
            "version": self.version,
            "image_digest": self.image_digest,
            "repo_digest": self.repo_digest,
            "repo_digests": self.repo_digests,  # the list, not the single selected digest
            "git_digest": self.git_digest,
            "index_name": self.index_name,
            "tag": self.tag,
            "pinned_digest": self.pinned_digest,
            "tag_or_digest": self.tag_or_digest,
            "tags": self.tags,
            "origin": self.origin,
            "platform": self.platform,
            "local_build": self.local_build,
            "error": self.error,
            "throttled": self.throttled,
            "custom": self.custom,
        }
        if not minimal:
            result["attributes"] = self.attributes
            result["annotations"] = self.annotations
        return result
def id_source_platform(source: str | None) -> str | None:
    """Return the first platform in SOURCE_PLATFORMS whose pattern matches ``source``.

    A missing ``source`` is matched as the empty string; None is returned when
    no pattern matches.
    """
    text = source or ""
    matched: list[str] = []
    for platform, pattern in SOURCE_PLATFORMS.items():
        if re.match(pattern, text):
            matched.append(platform)
    if not matched:
        return None
    return matched[0]
def _select_annotation(
    name: str, key: str, local_info: DockerImageInfo | None = None, registry_info: DockerImageInfo | None = None
) -> dict[str, str | None]:
    """Return ``{name: value}`` for annotation ``key``, or an empty dict.

    The registry info is the only source consulted when it is supplied; the
    local info is used only when there is no registry info.
    """
    chosen = registry_info if registry_info else local_info
    if not chosen:
        return {}
    value = chosen.annotations.get(key)
    if value is None:
        return {}
    return {name: value}
def cherrypick_annotations(
    local_info: DockerImageInfo, registry_info: DockerImageInfo | None
) -> dict[str, str | float | int | bool | None]:
    """Collect a fixed set of OCI annotations under short result names.

    Annotation keys follow
    https://github.com/opencontainers/image-spec/blob/main/annotations.md
    """
    wanted: tuple[tuple[str, str], ...] = (
        ("documentation_url", "org.opencontainers.image.documentation"),
        ("description", "org.opencontainers.image.description"),
        ("licences", "org.opencontainers.image.licenses"),
        ("image_base", "org.opencontainers.image.base.name"),
        ("image_created", "org.opencontainers.image.created"),
        ("image_version", "org.opencontainers.image.version"),
        ("image_revision", "org.opencontainers.image.revision"),
        ("ref_name", "org.opencontainers.image.ref.name"),
        ("title", "org.opencontainers.image.title"),
        ("vendor", "org.opencontainers.image.vendor"),
        ("source", "org.opencontainers.image.source"),
    )
    picked: dict[str, str | float | int | bool | None] = {}
    for result_name, label in wanted:
        picked.update(_select_annotation(result_name, label, local_info, registry_info))

    # An image that is not ubuntu itself but carries ubuntu's ref name together
    # with a 2x.yy style version is reporting its base image's version.
    base_version_leaked = (
        picked.get("ref_name") == "ubuntu"
        and local_info.name != "ubuntu"
        and picked.get("image_version")
        and re.fullmatch(r"^2\d\.\d\d$", str(picked["image_version"]))
    )
    if base_version_leaked:
        log.debug(
            "Suppressing %s base %s version leaking into image version: %s",
            local_info.name,
            picked["ref_name"],
            picked["image_version"],
        )
        del picked["image_version"]
    return picked
class DockerServiceDetails(DiscoveryInstallationDetail):
    """Container name plus compose and git details for an installed service."""

    def __init__(
        self,
        container_name: str | None = None,
        compose_path: str | None = None,
        compose_version: str | None = None,
        compose_service: str | None = None,
        git_repo_path: str | None = None,
    ) -> None:
        """Store the given details; the git timestamp starts out unset."""
        self.container_name: str | None = container_name
        self.compose_path: str | None = compose_path
        self.compose_version: str | None = compose_version
        self.compose_service: str | None = compose_service
        self.git_repo_path: str | None = git_repo_path
        self.git_local_timestamp: str | None = None

    def as_dict(self) -> dict[str, str | list | dict | bool | int | None]:
        """Return the details as a dict; git entries appear only when they have a value."""
        details: dict[str, str | list | dict | bool | int | None] = {}
        details["container_name"] = self.container_name
        details["compose_path"] = self.compose_path
        details["compose_service"] = self.compose_service
        details["compose_version"] = self.compose_version
        for optional_key in ("git_local_timestamp", "git_repo_path"):
            optional_value = getattr(self, optional_key)
            if optional_value:
                details[optional_key] = optional_value
        return details
class LocalContainerInfo:
    """Builds image and service descriptions from a local Docker container."""

    def build_image_info(self, container: Container) -> tuple[DockerImageInfo, DockerServiceDetails]:
        """Image contents equiv to `docker inspect image <image_ref>`

        Returns the image info (ref, digest, tags, labels and attributes, with
        selected annotations copied into ``custom`` and ``version``) together
        with the compose details read from the container labels.
        """
        # container image can be none if someone ran `docker rmi -f`
        # so although this could be sourced from image, like `container.image.tags[0]`
        # use the container ref instead, which survives monkeying about with images
        image_ref: str = container.attrs.get("Config", {}).get("Image") or ""
        image_digest = container.attrs.get("Image")

        # `container.image` is a property that looks the image up through the Docker
        # client, so read it once rather than once per field; a removed image is
        # reported by the SDK as ImageNotFound and is treated like "no image".
        try:
            image = container.image
        except docker.errors.ImageNotFound:
            image = None

        image_info: DockerImageInfo = DockerImageInfo(
            image_ref,
            image_digest=image_digest,
            tags=image.tags if image else None,
            annotations=image.labels if image else None,
            attributes=image.attrs if image else None,
        )
        service_info: DockerServiceDetails = DockerServiceDetails(
            container.name,
            compose_path=container.labels.get("com.docker.compose.project.working_dir"),
            compose_service=container.labels.get("com.docker.compose.service"),
            compose_version=container.labels.get("com.docker.compose.version"),
        )

        labels: dict[str, str | float | int | bool | None] = cherrypick_annotations(image_info, None)
        if image and image.attrs:
            image_info.created = image.attrs.get("Created")
        image_info.custom = labels
        image_info.version = cast("str|None", labels.get("image_version"))

        return image_info, service_info
class PackageEnricher:
    """Looks up known package details for an image by its ref."""

    def __init__(self, docker_cfg: DockerConfig, packages: dict[str, PackageUpdateInfo] | None = None) -> None:
        """Keep the Docker config and the initial package table (empty by default)."""
        self.pkgs: dict[str, PackageUpdateInfo] = packages or {}
        self.cfg: DockerConfig = docker_cfg
        self.log: Any = structlog.get_logger().bind(integration="docker")

    def initialize(self) -> None:
        """Hook for subclasses to populate ``pkgs``; does nothing here."""

    def enrich(self, image_info: DockerImageInfo) -> PackageUpdateInfo | None:
        """Return the first known package whose image names include the image's ref.

        Both the untagged ref and the full ref are tried, in that order. Returns
        None when nothing matches or when either ref is missing.
        """
        if image_info.untagged_ref is None or image_info.ref is None:
            return None

        for candidate in self.pkgs.values():
            if candidate is None or candidate.docker is None or candidate.docker.image_name is None:
                continue
            known_names = docker_image_names(candidate.docker)
            if image_info.untagged_ref not in known_names and image_info.ref not in known_names:
                continue
            self.log.debug(
                "Found common package",
                image_name=candidate.docker.image_name,  # type: ignore [union-attr]
                logo_url=candidate.logo_url,
                relnotes_url=candidate.release_notes_url,
            )
            return candidate
        return None
class DefaultPackageEnricher(PackageEnricher):
    """Enricher that always answers, using configured defaults."""

    def enrich(self, image_info: DockerImageInfo) -> PackageUpdateInfo | None:
        """Build package info for the image from its ref and the default picture URL."""
        self.log.debug("Default pkg info", image_name=image_info.untagged_ref, image_ref=image_info.ref)
        docker_details = DockerPackageUpdateInfo(
            image_info.untagged_ref or image_info.ref,
            version_policy=VersionPolicy.AUTO,
        )
        return PackageUpdateInfo(
            docker_details,
            logo_url=self.cfg.default_entity_picture_url,
            release_notes_url=None,
        )
class CommonPackageEnricher(PackageEnricher):
    """Enricher whose package table is loaded from the common package info file."""

    def initialize(self) -> None:
        """Load ``pkgs`` from PKG_INFO_FILE, merged over the structured defaults.

        When the file does not exist a warning is logged and the defaults are
        used on their own.

        Raises:
            MissingMandatoryValue, ValidationError: logged as a configuration
                error and re-raised.
        """
        base_cfg: DictConfig = OmegaConf.structured(CommonPackages)
        try:
            if PKG_INFO_FILE.exists():
                self.log.debug("Loading common package update info", path=PKG_INFO_FILE)
                cfg: DictConfig = typing.cast("DictConfig", OmegaConf.merge(base_cfg, OmegaConf.load(PKG_INFO_FILE)))

                # Converting with throw_on_missing surfaces missing mandatory values now;
                # kept inside the try so such failures are reported as configuration errors.
                OmegaConf.to_container(cfg, throw_on_missing=True)
                OmegaConf.set_readonly(cfg, True)
            else:
                self.log.warning("No common package update info found", path=PKG_INFO_FILE)
                cfg = base_cfg

            common_config: CommonPackages = typing.cast("CommonPackages", cfg)
            # the merged config is used directly rather than converted to dataclass
            # instances (omegaconf handles optional fields poorly in that conversion)
            self.pkgs = common_config.common_packages
        except (MissingMandatoryValue, ValidationError) as e:
            self.log.error("Configuration error %s", e, path=PKG_INFO_FILE.as_posix())
            raise
class LinuxServerIOPackageEnricher(PackageEnricher):
    """Enricher whose package table is filled from the linuxserver.io images API."""

    def initialize(self) -> None:
        """Fetch the linuxserver.io image list and add entries not already known.

        Does nothing when the "linuxserver.io" metadata source is not configured
        or not enabled, or when the API request does not succeed.
        """
        source_cfg: MetadataSourceConfig | None = self.cfg.discover_metadata.get("linuxserver.io")
        if source_cfg is None or not source_cfg.enabled:
            return

        self.log.debug(f"Fetching linuxserver.io metadata from API, cache_ttl={source_cfg.cache_ttl}")
        response: Response | None = fetch_url(
            "https://api.linuxserver.io/api/v1/images?include_config=false&include_deprecated=false",
            cache_ttl=source_cfg.cache_ttl,
        )
        if not (response and response.is_success):
            return

        payload: Any = response.json()
        repos: list = payload.get("data", {}).get("repositories", {}).get("linuxserver", [])

        added = 0
        for repo in repos:
            image_name = repo.get("name")
            if not image_name or image_name in self.pkgs:
                continue
            github_url: str | None = repo.get("github_url")
            notes_url = f"{github_url}/releases" if github_url else None
            self.pkgs[image_name] = PackageUpdateInfo(
                DockerPackageUpdateInfo(f"lscr.io/linuxserver/{image_name}"),
                logo_url=repo.get("project_logo"),
                release_notes_url=notes_url,
            )
            added += 1
        self.log.debug(f"Added {added} linuxserver.io package details")
class SourceReleaseEnricher:
    """Derives release details (source, diff and notes URLs) from image annotations."""

    def __init__(self) -> None:
        """Bind a logger tagged with the docker integration."""
        self.log: Any = structlog.get_logger().bind(integration="docker")

    def enrich(
        self, registry_info: DockerImageInfo, source_repo_url: str | None = None, notes_url: str | None = None
    ) -> ReleaseDetail | None:
        """Build a ReleaseDetail for ``registry_info``.

        Version, revision and source come from the OCI annotations; an explicit
        ``source_repo_url`` takes precedence over the annotated source, and a
        registry's repo template is tried when neither is present. Returns None
        when no source, notes URL, revision or version is available. Diff and
        notes URLs are only filled in for recognised source platforms and only
        when the candidate URL validates.
        """
        annotations = registry_info.annotations
        detail = ReleaseDetail(registry_info.name or UNKNOWN_NAME)

        detail.notes_url = notes_url
        detail.version = annotations.get("org.opencontainers.image.version")
        detail.revision = annotations.get("org.opencontainers.image.revision")
        # explicit source_repo_url overrides container, e.g. where container source is only the docker wrapper
        detail.source_url = source_repo_url or annotations.get("org.opencontainers.image.source")

        if detail.source_url is None and registry_info.index_name is not None:
            known_registry: RegistryInfo | None = REGISTRIES.get(registry_info.index_name)
            repo_template: str | None = known_registry.repo_template if known_registry else None
            if repo_template:
                implied_url = repo_template.format(image_name=registry_info.name)
                if validate_url(implied_url, cache_ttl=86400):
                    detail.source_url = implied_url
                    self.log.info("Implied source from registry: %s", detail.source_url)

        if detail.source_url is None and detail.notes_url is None and detail.revision is None and detail.version is None:
            return None

        # drop any "#fragment" to get the plain repository URL
        if detail.source_url and "#" in detail.source_url:
            detail.source_repo_url = detail.source_url.split("#", 1)[0]
            self.log.debug("Simplifying %s from %s", detail.source_repo_url, detail.source_url)
        else:
            detail.source_repo_url = detail.source_url

        detail.source_platform = id_source_platform(detail.source_repo_url)
        if not detail.source_platform:
            self.log.debug("No known source platform found on container", source=detail.source_repo_url)
            return detail

        template_vars: dict[str, str | None] = {
            "version": detail.version or MISSING_VAL,
            "revision": detail.revision or MISSING_VAL,
            "repo": detail.source_repo_url or MISSING_VAL,
            "source": detail.source_url or MISSING_VAL,
        }

        # version-specific notes first, then the generic per-platform location
        notes_sources = (
            (RELEASE_URL_TEMPLATES, "Setting default known release notes url: %s"),
            (UNKNOWN_RELEASE_URL_TEMPLATES, "Setting default unknown release notes url: %s"),
        )
        try:
            diff_template: str | None = DIFF_URL_TEMPLATES.get(detail.source_platform)
            diff_url: str | None = diff_template.format(**template_vars) if diff_template else None
            if diff_url and MISSING_VAL not in diff_url and validate_url(diff_url, cache_ttl=3600):
                detail.diff_url = diff_url

            for templates, message in notes_sources:
                if detail.notes_url is not None or detail.source_platform not in templates:
                    continue
                candidate: str | None = templates[detail.source_platform].format(**template_vars)
                if candidate and MISSING_VAL not in candidate and validate_url(candidate, cache_ttl=86400):
                    self.log.debug(message, candidate)
                    detail.notes_url = candidate
        except Exception as e:
            self.log.error("Failed formatting enriched URLs with %s: %s", template_vars, e)

        return detail
class AuthError(Exception):
    """Raised when a registry token cannot be obtained."""

    pass
class VersionLookup:
    """Base class for lookups that produce a DockerImageInfo for a local image."""

    def __init__(self) -> None:
        """Bind a logger tagged with the docker integration and version_lookup tool."""
        self.log: Any = structlog.get_logger().bind(integration="docker", tool="version_lookup")

    @abstractmethod
    def lookup(self, local_image_info: DockerImageInfo, **kwargs) -> DockerImageInfo:  # noqa: ANN003
        """Return image info looked up for ``local_image_info``; subclasses provide the implementation."""
        # NOTE(review): the class does not derive from abc.ABC, so @abstractmethod
        # is not enforced at instantiation time.
        pass
589class ContainerDistributionAPIVersionLookup(VersionLookup):
590 def __init__(self, throttler: Throttler, cfg: RegistryConfig) -> None:
591 self.throttler: Throttler = throttler
592 self.cfg: RegistryConfig = cfg
593 self.log: Any = structlog.get_logger().bind(integration="docker", tool="version_lookup")
594 self.api_stats = APIStatsCounter()
    def fetch_token(self, registry: str, image_name: str) -> str | None:
        """Fetch a pull token for ``image_name`` from ``registry``.

        Registries not listed in REGISTRIES are assumed to serve tokens from the
        registry host itself using the default token URL template.

        Returns:
            The token, or None when the registry has no auth host or no token
            URL template.

        Raises:
            AuthError: when no token could be obtained.
        """
        default_info = RegistryInfo(registry, registry, registry, TOKEN_URL_TEMPLATE, None)
        registry_info_: RegistryInfo = REGISTRIES.get(registry, default_info)
        auth_host: str | None = registry_info_.auth_host
        if auth_host is None:
            return None

        service: str = registry_info_.service
        url_template: str | None = registry_info_.url_template
        auth_url: str | None = (
            url_template.format(auth_host=auth_host, image_name=image_name, service=service) if url_template else None
        )
        if auth_url is None:
            return None
        response: Response | None = fetch_url(
            auth_url, cache_ttl=self.cfg.token_cache_ttl, follow_redirects=True, api_stats_counter=self.api_stats
        )

        if response and response.is_success:
            api_data = httpx_json_content(response, {})
            token: str | None = api_data.get("token") if api_data else None
            if token:
                return token
            self.log.warning("No token found in response for %s", auth_url)
            raise AuthError(f"No token found in response for {image_name}")

        self.log.debug(
            "Non-success response at %s fetching token: %s",
            auth_url,
            (response and response.status_code) or None,
        )
        if response and response.status_code == 404:
            self.log.debug(
                "Default token URL %s not found, calling /v2 endpoint to validate OCI API and provoke auth", auth_url
            )
            response = fetch_url(
                f"https://{auth_host}/v2",
                follow_redirects=True,
                allow_stale=False,
                cache_ttl=0,
                api_stats_counter=self.api_stats,
            )

            # NOTE(review): confirm whether this 401 handling is meant to apply only
            # after the /v2 probe above, or to any failed token request.
            if response and response.status_code == 401:
                auth = response.headers.get("www-authenticate")
                if not auth:
                    self.log.warning("No www-authenticate header found in 401 response for %s", auth_url)
                    raise AuthError(f"No www-authenticate header found on 401 for {image_name}")
                # expects realm, service and scope in exactly this order, comma separated
                match = re.search(r'realm="([^"]+)",service="([^"]+)",scope="([^"]+)"', auth)
                if not match:
                    self.log.warning("No realm/service/scope found in www-authenticate header for %s", auth_url)
                    raise AuthError(f"No realm/service/scope found on 401 headers for {image_name}")

                # retry against the token endpoint advertised by the registry
                realm, service, scope = match.groups()
                auth_url = f"{realm}?service={service}&scope={scope}"
                response = fetch_url(auth_url, follow_redirects=True, api_stats_counter=self.api_stats)

                if response and response.is_success:
                    token_data = response.json()
                    self.log.debug("Fetched registry token from %s", auth_url)
                    return token_data.get("token")
                self.log.warning(
                    "Alternative auth %s with status %s has no token", auth_url, (response and response.status_code) or None
                )
            elif response:
                self.log.warning("Auth %s failed with status %s", auth_url, (response and response.status_code) or None)

        raise AuthError(f"Failed to fetch token for {image_name} at {auth_url}")
665 def fetch_index(
666 self, api_host: str, local_image_info: DockerImageInfo, token: str | None
667 ) -> tuple[Any | None, str | None, CacheMetadata | None]:
668 if local_image_info.tag: 668 ↛ 672line 668 didn't jump to line 672 because the condition on line 668 was always true
669 api_url: str = f"https://{api_host}/v2/{local_image_info.name}/manifests/{local_image_info.tag}"
670 cache_ttl: int | None = self.cfg.mutable_cache_ttl
671 else:
672 api_url = f"https://{api_host}/v2/{local_image_info.name}/manifests/{local_image_info.pinned_digest}"
673 cache_ttl = self.cfg.immutable_cache_ttl
675 response: Response | None = fetch_url(
676 api_url,
677 cache_ttl=cache_ttl,
678 bearer_token=token,
679 response_type=[
680 "application/vnd.oci.image.index.v1+json",
681 "application/vnd.docker.distribution.manifest.list.v2+json",
682 ],
683 api_stats_counter=self.api_stats,
684 )
686 if response is None: 686 ↛ 687line 686 didn't jump to line 687 because the condition on line 686 was never true
687 self.log.warning("Empty response for manifest for image at %s", api_url)
688 elif response.status_code == 429: 688 ↛ 689line 688 didn't jump to line 689 because the condition on line 688 was never true
689 self.throttler.throttle(local_image_info.index_name, raise_exception=True)
690 elif not response.is_success:
691 api_data = httpx_json_content(response, {})
692 self.log.warning(
693 "Failed to fetch index from %s: %s",
694 api_url,
695 api_data.get("errors") if api_data else response.text,
696 )
697 else:
698 index = response.json()
699 self.log.debug(
700 "INDEX %s manifests, %s annotations, api: %s, header digest: %s",
701 len(index.get("manifests", [])),
702 len(index.get("annotations", [])),
703 response.headers.get(HEADER_DOCKER_API, "N/A"),
704 response.headers.get(HEADER_DOCKER_DIGEST, "N/A"),
705 )
706 return index, response.headers.get(HEADER_DOCKER_DIGEST), CacheMetadata(response)
707 return None, None, None
709 def fetch_object(
710 self,
711 api_host: str,
712 local_image_info: DockerImageInfo,
713 media_type: str,
714 digest: str,
715 token: str | None,
716 follow_redirects: bool = False,
717 api_type: str = "manifests",
718 ) -> tuple[Any | None, CacheMetadata | None]:
719 api_url = f"https://{api_host}/v2/{local_image_info.name}/{api_type}/{digest}"
720 response = fetch_url(
721 api_url,
722 cache_ttl=self.cfg.immutable_cache_ttl,
723 bearer_token=token,
724 response_type=media_type,
725 allow_stale=True,
726 follow_redirects=follow_redirects,
727 api_stats_counter=self.api_stats,
728 )
730 if response and response.is_success:
731 obj = httpx_json_content(response, None)
732 if obj: 732 ↛ 759line 732 didn't jump to line 759 because the condition on line 732 was always true
733 self.log.debug(
734 "%s, header digest:%s, api: %s, %s annotations",
735 api_type.upper(),
736 response.headers.get(HEADER_DOCKER_DIGEST, "N/A"),
737 response.headers.get(HEADER_DOCKER_API, "N/A"),
738 len(obj.get("annotations", [])),
739 )
740 return obj, CacheMetadata(response)
741 elif response and response.status_code == 429: 741 ↛ 742line 741 didn't jump to line 742 because the condition on line 741 was never true
742 self.throttler.throttle(local_image_info.index_name, raise_exception=True)
743 elif response and not response.is_success: 743 ↛ 758line 743 didn't jump to line 758 because the condition on line 743 was always true
744 api_data = httpx_json_content(response, {})
745 if response: 745 ↛ 753line 745 didn't jump to line 753 because the condition on line 745 was always true
746 self.log.warning(
747 "Failed to fetch obj from %s: %s %s",
748 api_url,
749 response.status_code,
750 api_data.get("errors") if api_data else response.text,
751 )
752 else:
753 self.log.warning(
754 "Failed to fetch obj from %s: No Response, %s", api_url, api_data.get("errors") if api_data else None
755 )
757 else:
758 self.log.error("Empty response from %s", api_url)
759 return None, None
def lookup(
    self,
    local_image_info: DockerImageInfo,
    token: str | None = None,
    minimal: bool = False,
    **kwargs,  # noqa: ANN003, ARG002
) -> DockerImageInfo:
    """Look up the remote counterpart of a local image through the registry HTTP API.

    Walks index -> platform manifest -> image config blob, collecting digests,
    annotations/labels and cache metadata along the way.

    Args:
        local_image_info: Local image description; its ``ref``, ``name``,
            ``index_name``, ``os``, ``arch`` and ``variant`` are read here.
        token: Optional pre-fetched registry token. When falsy, one is obtained
            via ``self.fetch_token``.
        minimal: When true, skip fetching the image config blob.
        **kwargs: Accepted and ignored.

    Returns:
        A new ``DockerImageInfo`` built from ``local_image_info.ref``. It is returned
        unpopulated (optionally with ``throttled`` or ``error`` set) when the lookup
        cannot proceed; otherwise ``origin`` is ``"OCI_V2"`` or ``"OCI_V2_MINIMAL"``.
    """
    result: DockerImageInfo = DockerImageInfo(local_image_info.ref)
    if not local_image_info.name or not local_image_info.index_name:
        self.log.debug("No local pkg name or registry index name to check")
        return result

    if self.throttler.check_throttle(local_image_info.index_name):
        result.throttled = True
        return result

    if token:
        self.log.debug("Using provided token to fetch manifest for image %s", local_image_info.ref)
    else:
        try:
            token = self.fetch_token(local_image_info.index_name, local_image_info.name)
        except AuthError as e:
            self.log.warning("Authentication error prevented Docker Registry enrichment: %s", e)
            result.error = str(e)
            return result

    index: Any | None = None
    index_digest: str | None = None  # fetched from header, should be the image digest
    index_cache_metadata: CacheMetadata | None = None
    manifest_cache_metadata: CacheMetadata | None = None
    config_cache_metadata: CacheMetadata | None = None
    idx = local_image_info.index_name
    # Registries without an explicit entry fall back to using the index name as the host
    api_host: str | None = REGISTRIES.get(
        idx,
        RegistryInfo(idx, idx, idx, TOKEN_URL_TEMPLATE, None),
    ).api_host
    if api_host is None:
        # The logger object itself is not a log method; call an explicit level
        self.log.warning("No API host can be determined for %s", local_image_info.index_name)
        return result
    try:
        index, index_digest, index_cache_metadata = self.fetch_index(api_host, local_image_info, token)
    except ThrottledError:
        result.throttled = True
        index = None

    if index:
        # `or {}` / `or []` guard against keys that are present but JSON null
        result.annotations = index.get("annotations") or {}
        for m in index.get("manifests") or []:
            try:
                platform_info = m.get("platform") or {}
            except Exception as e:
                self.log.warning("Failed analyzing manifest data: %s: %s", m, e)
                continue

            if (
                platform_info.get("os") != local_image_info.os
                or platform_info.get("architecture") != local_image_info.arch
            ):
                continue
            # The index platform object uses a lowercase `variant` key; the capitalised
            # spelling is still accepted. Only reject when both sides name a variant
            # and they disagree, so images without variant information keep matching.
            remote_variant = platform_info.get("variant") or platform_info.get("Variant")
            if remote_variant and local_image_info.variant and remote_variant != local_image_info.variant:
                continue

            if index_digest:
                result.image_digest = index_digest
                result.short_digest = result.condense_digest(index_digest)
                self.log.debug("Setting %s image digest %s", result.name, result.short_digest)

            digest: str | None = m.get("digest")
            media_type = m.get("mediaType")
            manifest: Any | None = None

            if digest:
                try:
                    manifest, manifest_cache_metadata = self.fetch_object(
                        api_host, local_image_info, media_type, digest, token
                    )
                except ThrottledError:
                    result.throttled = True

            if not manifest:
                continue

            manifest_config: dict[str, Any] = manifest.get("config") or {}
            digest = manifest_config.get("digest")
            if digest is None:
                self.log.warning("Empty digest for %s %s %s", api_host, digest, media_type)
            else:
                result.repo_digest = result.condense_digest(digest, short=False)
                self.log.debug("Setting %s repo digest: %s", result.name, result.repo_digest)

            if manifest.get("annotations"):
                result.annotations.update(manifest.get("annotations", {}))
            else:
                self.log.debug("No annotations found in manifest: %s", manifest)

            if minimal or not (manifest_config.get("mediaType") and manifest_config.get("digest")):
                continue

            try:
                img_config, config_cache_metadata = self.fetch_object(
                    api_host=api_host,
                    local_image_info=local_image_info,
                    media_type=manifest_config["mediaType"],
                    digest=manifest_config["digest"],
                    token=token,
                    follow_redirects=True,
                    api_type="blobs",
                )
                if img_config:
                    # Default to an empty mapping so a blob without a config section
                    # no longer fails with AttributeError on None below
                    config = img_config.get("config") or img_config.get("Config") or {}
                    try:
                        if "Labels" in config:
                            result.annotations.update(config.get("Labels") or {})
                        result.annotations.update(img_config.get("annotations") or {})
                    except Exception as e:
                        self.log.warning("Failure handling labels/annotations %s: %s", config, e)
                    # Keep the previous lookup order, then fall back to the top-level
                    # field, which is where the image config document carries `created`
                    result.created = (
                        config.get("created")
                        or config.get("Created")
                        or img_config.get("created")
                        or img_config.get("Created")
                    )
                else:
                    self.log.debug("No config found: %s", manifest)
            except Exception as e:
                self.log.warning("Failed to extract %s image info from config: %s", local_image_info.ref, e)

    if not result.annotations:
        self.log.debug("No annotations found from registry data")

    labels: dict[str, str | float | int | bool | None] = cherrypick_annotations(local_image_info, result)
    result.custom = labels or {}
    if index_cache_metadata:
        result.custom["index_cache_age"] = index_cache_metadata.age
    if manifest_cache_metadata:
        result.custom["manifest_cache_age"] = manifest_cache_metadata.age
    if config_cache_metadata:
        result.custom["config_cache_age"] = config_cache_metadata.age
    # Read through result.custom so a falsy `labels` cannot raise here
    result.version = cast("str|None", result.custom.get("image_version"))
    result.origin = "OCI_V2" if not minimal else "OCI_V2_MINIMAL"

    self.log.debug(
        "OCI_V2 Lookup for %s: short_digest:%s, repo_digest:%s, version: %s",
        local_image_info.name,
        result.short_digest,
        result.repo_digest,
        result.version,
    )
    return result
class DockerClientVersionLookup(VersionLookup):
    """Query remote registry via local Docker API

    No auth needed, however uses the old v1 APIs, and only Index available via API
    """

    def __init__(self, client: docker.DockerClient, throttler: Throttler, cfg: RegistryConfig, api_backoff: int = 30) -> None:
        """Store the Docker client, throttler and registry config used by ``lookup``.

        Args:
            client: Docker client used to request registry data.
            throttler: Throttler consulted before each call and updated on HTTP 429.
            cfg: Registry configuration (stored, not read within this class).
            api_backoff: Base number of seconds for the backoff computed on HTTP 429.
        """
        self.client: docker.DockerClient = client
        self.throttler: Throttler = throttler
        self.cfg: RegistryConfig = cfg
        self.api_backoff: int = api_backoff
        self.log: Any = structlog.get_logger().bind(integration="docker", tool="version_lookup")

    def lookup(self, local_image_info: DockerImageInfo, retries: int = 3, **kwargs) -> DockerImageInfo:  # noqa: ANN003, ARG002
        """Fetch registry data for an image through the Docker client.

        Args:
            local_image_info: Local image description; ``ref`` and ``index_name`` are read.
            retries: Maximum number of attempts for retryable API errors.
            **kwargs: Accepted and ignored.

        Returns:
            A new ``DockerImageInfo`` for ``local_image_info.ref``. On HTTP 429 it is
            returned immediately with ``throttled`` set; otherwise ``origin`` is
            ``"DOCKER_CLIENT"`` and ``error`` holds the last API error, if any.
        """
        retries_left = retries
        retry_secs: int = self.api_backoff
        reg_data: RegistryData | None = None

        result = DockerImageInfo(local_image_info.ref)
        if local_image_info.index_name is None or local_image_info.ref is None:
            return result

        while reg_data is None and retries_left > 0:
            if self.throttler.check_throttle(local_image_info.index_name):
                result.throttled = True
                break
            try:
                self.log.debug("Fetching registry data", image_ref=local_image_info.ref)
                reg_data = self.client.images.get_registry_data(local_image_info.ref)
                self.log.debug(
                    "Registry Data: id:%s,image:%s, attrs:%s",
                    reg_data.id,
                    reg_data.image_name,
                    reg_data.attrs,
                )
                if reg_data:
                    result.short_digest = result.condense_digest(reg_data.short_id)
                    result.image_digest = result.condense_digest(reg_data.id, short=False)
                    # result.name = reg_data.image_name
                    result.attributes = reg_data.attrs
                    # `or {}` also covers a "Config" key that is present but null
                    result.annotations = (reg_data.attrs.get("Config") or {}).get("Labels") or {}
                    result.error = None

            except docker.errors.APIError as e:
                if e.status_code == HTTPStatus.TOO_MANY_REQUESTS:
                    # Computed backoff is the fallback; a usable Retry-After header wins
                    retry_secs = round(retry_secs**1.5)
                    try:
                        retry_after = e.response.headers.get("Retry-After")  # type: ignore[union-attr]
                        if retry_after is not None:
                            advertised_secs = int(retry_after)
                            # A missing, non-numeric or non-positive header must not
                            # overwrite the computed backoff
                            if advertised_secs > 0:
                                retry_secs = advertised_secs
                    except Exception as e2:
                        self.log.debug("Failed to access headers for retry info: %s", e2)
                    self.throttler.throttle(local_image_info.index_name, retry_secs, e.explanation)
                    result.throttled = True
                    return result
                result.error = str(e)
                retries_left -= 1
                if retries_left == 0 or e.is_client_error():
                    self.log.warning("Failed to fetch registry data: [%s] %s", e.status_code, e.explanation)
                    # Already reported as a final failure above, so stop rather than
                    # repeating the same client-error request
                    break
                self.log.debug("Failed to fetch registry data, retrying: %s", e)

        labels: dict[str, str | float | int | bool | None] = cherrypick_annotations(local_image_info, result)
        result.custom = labels or {}
        # Read through result.custom so a falsy `labels` cannot raise here
        result.version = cast("str|None", result.custom.get("image_version"))
        result.created = cast("str|None", result.custom.get("image_created"))
        result.origin = "DOCKER_CLIENT"
        return result