Coverage for src/updates2mqtt/integrations/docker.py: 77%
416 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-14 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-14 15:07 +0000
1import random
2import re
3import subprocess
4import time
5import typing
6from collections.abc import AsyncGenerator, Callable
7from enum import Enum
8from pathlib import Path
9from threading import Event, Timer
10from typing import Any, cast
12import docker
13import docker.errors
14import structlog
15from docker.models.containers import Container
17from updates2mqtt.config import (
18 SEMVER_RE,
19 SOURCE_PLATFORM_GITHUB,
20 UNKNOWN_VERSION,
21 VERSION_RE,
22 DockerConfig,
23 GitHubConfig,
24 NodeConfig,
25 PackageUpdateInfo,
26 PublishPolicy,
27 RegistryAPI,
28 UpdatePolicy,
29 VersionPolicy,
30)
31from updates2mqtt.helpers import Selection, Throttler
32from updates2mqtt.integrations.docker_enrich import (
33 CommonPackageEnricher,
34 ContainerDistributionAPIVersionLookup,
35 DefaultPackageEnricher,
36 DockerClientVersionLookup,
37 DockerImageInfo,
38 DockerServiceDetails,
39 LinuxServerIOPackageEnricher,
40 LocalContainerInfo,
41 PackageEnricher,
42 SourceReleaseEnricher,
43)
44from updates2mqtt.integrations.github_enrich import GithubReleaseEnricher
45from updates2mqtt.model import Discovery, ReleaseDetail, ReleaseProvider
47from .git_utils import git_check_update_available, git_iso_timestamp, git_local_digest, git_pull, git_trust
49if typing.TYPE_CHECKING:
50 from docker.models.images import Image
52# distinguish docker build from docker pull?
54log = structlog.get_logger()
57class DockerComposeCommand(Enum):
58 BUILD = "build"
59 UP = "up"
62# Seconds to wait before recreating ourselves on self-bounce, giving the in-flight
63# update command time to finish and publish its final (non in-progress) state.
64SELF_BOUNCE_DELAY = 5
67def safe_json_dt(t: float | None) -> str | None:
68 return time.strftime("%Y-%m-%dT%H:%M:%S.0000", time.gmtime(t)) if t else None
71class ContainerCustomization:
72 """Local customization of a Docker container, by label or env var"""
74 label_prefix: str = "updates2mqtt."
75 env_prefix: str = "UPD2MQTT_"
77 def __init__(self, container: Container) -> None:
78 self.update: UpdatePolicy = UpdatePolicy.PASSIVE # was known as UPD2MQTT_UPDATE before policies and labels
79 self.git_repo_path: str | None = None
80 self.picture: str | None = None
81 self.relnotes: str | None = None
82 self.ignore: bool = False
83 self.version_policy: VersionPolicy | None = None
84 self.registry_token: str | None = None
86 if not container.attrs or container.attrs.get("Config") is None:
87 return
88 env_pairs: list[str] = container.attrs.get("Config", {}).get("Env")
89 if env_pairs:
90 c_env: dict[str, str] = dict(env.split("=", maxsplit=1) for env in env_pairs if "==" not in env)
91 else:
92 c_env = {}
94 for attr in dir(self):
95 if "__" not in attr:
96 label = f"{self.label_prefix}{attr.lower()}"
97 env_var = f"{self.env_prefix}{attr.upper()}"
98 v: Any = None
99 if label in container.labels:
100 # precedence to labels
101 v = container.labels.get(label)
102 log.debug(
103 "%s set from label %s=%s",
104 attr,
105 label,
106 v,
107 integration="docker",
108 container=container.name,
109 action="customize",
110 )
111 elif env_var in c_env:
112 v = c_env[env_var]
113 log.debug(
114 "%s set from env var %s=%s",
115 attr,
116 env_var,
117 v,
118 integration="docker",
119 container=container.name,
120 action="customize",
121 )
122 if v is not None:
123 if isinstance(getattr(self, attr), bool):
124 setattr(self, attr, v.upper() in ("TRUE", "YES", "1"))
125 elif isinstance(getattr(self, attr), VersionPolicy): 125 ↛ 126line 125 didn't jump to line 126 because the condition on line 125 was never true
126 setattr(self, attr, VersionPolicy[v.upper()])
127 elif isinstance(getattr(self, attr), UpdatePolicy):
128 setattr(self, attr, UpdatePolicy[v.upper()])
129 else:
130 setattr(self, attr, v)
133class DockerProvider(ReleaseProvider):
134 def __init__(
135 self,
136 cfg: DockerConfig,
137 node_cfg: NodeConfig,
138 packages: dict[str, PackageUpdateInfo] | None = None,
139 github_cfg: GitHubConfig | None = None,
140 self_bounce: Event | None = None,
141 ) -> None:
142 super().__init__(node_cfg, "docker")
143 self.client: docker.DockerClient = docker.from_env()
144 self.cfg: DockerConfig = cfg
146 # TODO: refresh discovered packages periodically
147 self.throttler = Throttler(self.cfg.default_api_backoff, self.log, self.stopped)
148 self.self_bounce: Event | None = self_bounce
150 self.pkg_enrichers: list[PackageEnricher] = [
151 CommonPackageEnricher(self.cfg, packages),
152 LinuxServerIOPackageEnricher(self.cfg),
153 DefaultPackageEnricher(self.cfg),
154 ]
155 self.docker_client_image_lookup = DockerClientVersionLookup(
156 self.client, self.throttler, self.cfg.registry, self.cfg.default_api_backoff
157 )
158 self.registry_image_lookup = ContainerDistributionAPIVersionLookup(self.throttler, self.cfg.registry)
159 self.release_enricher = SourceReleaseEnricher()
160 if github_cfg: 160 ↛ 161line 160 didn't jump to line 161 because the condition on line 160 was never true
161 self.github_enricher: GithubReleaseEnricher | None = GithubReleaseEnricher(github_cfg)
162 else:
163 self.github_enricher = None
164 self.local_info_builder = LocalContainerInfo()
166 def initialize(self) -> None:
167 for enricher in self.pkg_enrichers:
168 enricher.initialize()
169 self.log.debug("Docker provider initialized")
171 def update(self, discovery: Discovery) -> bool:
172 logger: Any = self.log.bind(container=discovery.name, action="update")
173 if discovery.update_last_attempt:
174 logger.info("Updating - last at %s", discovery.update_last_attempt)
175 else:
176 logger.info("Updating - first this session")
177 discovery.update_last_attempt = time.time()
178 self.fetch(discovery)
179 restarted = self.restart(discovery)
180 logger.info("Updated - recorded at %s", discovery.update_last_attempt)
181 return restarted
183 def fetch(self, discovery: Discovery) -> None:
184 logger = self.log.bind(container=discovery.name, action="fetch")
185 installed_info: DockerImageInfo | None = cast("DockerImageInfo|None", discovery.current_detail)
186 service_info: DockerServiceDetails | None = cast("DockerServiceDetails|None", discovery.installation_detail)
188 image_ref: str | None = installed_info.ref if installed_info else None
189 platform: str | None = installed_info.platform if installed_info else None
190 if discovery.can_pull and image_ref:
191 logger.info("Pulling", image_ref=image_ref, platform=platform)
192 image: Image = self.client.images.pull(image_ref, platform=platform, all_tags=False)
193 if image: 193 ↛ 196line 193 didn't jump to line 196 because the condition on line 193 was always true
194 logger.info("Pulled", image_id=image.id, image_ref=image_ref, platform=platform)
195 else:
196 logger.warn("Unable to pull", image_ref=image_ref, platform=platform)
197 elif discovery.can_build and service_info:
198 compose_path: str | None = service_info.compose_path
199 git_repo_path: str | None = service_info.git_repo_path
200 logger.debug("can_build check", git_repo=git_repo_path)
201 if not compose_path or not git_repo_path:
202 logger.warn("No compose path or git repo path configured, skipped build")
203 return
205 full_repo_path: Path = self.full_repo_path(compose_path, git_repo_path)
206 if git_pull(full_repo_path, Path(self.node_cfg.git_path)):
207 self.build(discovery)
208 else:
209 logger.debug("Skipping git_pull, no update")
211 def full_repo_path(self, compose_path: str, git_repo_path: str) -> Path:
212 if compose_path is None or git_repo_path is None: 212 ↛ 213line 212 didn't jump to line 213 because the condition on line 212 was never true
213 raise ValueError("Unexpected null paths")
214 if compose_path and not Path(git_repo_path).is_absolute(): 214 ↛ 216line 214 didn't jump to line 216 because the condition on line 214 was always true
215 return Path(compose_path) / git_repo_path
216 return Path(git_repo_path)
218 def build(self, discovery: Discovery) -> bool:
219 logger = self.log.bind(container=discovery.name, action="build")
220 service_info: DockerServiceDetails | None = cast("DockerServiceDetails|None", discovery.installation_detail)
222 if not service_info or not service_info.compose_path: 222 ↛ 223line 222 didn't jump to line 223 because the condition on line 222 was never true
223 logger.warn("No service_info available on compose")
224 return False
225 logger.info("Building", compose_path=service_info.compose_path, service=service_info.compose_service)
226 return self.execute_compose(
227 command=DockerComposeCommand.BUILD,
228 args="",
229 service=service_info.compose_service,
230 cwd=service_info.compose_path,
231 logger=logger,
232 )
234 def execute_compose(
235 self, command: DockerComposeCommand, args: str, service: str | None, cwd: str | None, logger: structlog.BoundLogger
236 ) -> bool:
237 if not cwd or not Path(cwd).is_dir(): 237 ↛ 238line 237 didn't jump to line 238 because the condition on line 237 was never true
238 logger.warn("Invalid compose path, skipped %s", command)
239 return False
241 cmd: str = "docker-compose" if self.cfg.compose_version == "v1" else "docker compose"
242 logger.info(f"Executing {cmd} {command} {args} {service}")
243 cmd = cmd + " " + command.value
244 if args: 244 ↛ 245line 244 didn't jump to line 245 because the condition on line 244 was never true
245 cmd = cmd + " " + args
246 if service: 246 ↛ 247line 246 didn't jump to line 247 because the condition on line 246 was never true
247 cmd = cmd + " " + service
249 proc: subprocess.CompletedProcess[str] = subprocess.run(cmd, check=False, shell=True, cwd=cwd, text=True)
250 if proc.returncode == 0:
251 logger.info(f"{command} via compose successful")
252 return True
253 if proc.stderr and "unknown command: docker compose" in proc.stderr: 253 ↛ 254line 253 didn't jump to line 254 because the condition on line 253 was never true
254 logger.warning("docker compose set to wrong version, seems like v1 installed")
255 self.cfg.compose_version = "v1"
256 logger.warn(
257 f"{command} failed: %s",
258 proc.returncode,
259 )
260 return False
262 def restart(self, discovery: Discovery) -> bool:
263 logger = self.log.bind(container=discovery.name, action="restart")
264 installed_info: DockerImageInfo | None = cast("DockerImageInfo|None", discovery.current_detail)
265 service_info: DockerServiceDetails | None = cast("DockerServiceDetails|None", discovery.installation_detail)
267 if service_info is None:
268 return False
270 is_self_bounce: bool = bool(
271 self.self_bounce is not None
272 and installed_info
273 and (
274 "ghcr.io/rhizomatics/updates2mqtt" in installed_info.ref
275 or (service_info.git_repo_path and service_info.git_repo_path.endswith("updates2mqtt"))
276 )
277 )
278 if is_self_bounce and self.self_bounce is not None:
279 logger.warning("Attempting to self-bounce, deferring recreation by %ss", SELF_BOUNCE_DELAY)
280 self.self_bounce.set()
281 # Defer the recreate so that the in-flight command handling can finish and publish
282 # the final (non in-progress) state before this container is stopped/replaced.
283 Timer(
284 SELF_BOUNCE_DELAY,
285 self.execute_compose,
286 kwargs={
287 "command": DockerComposeCommand.UP,
288 "args": "--detach --yes",
289 "service": service_info.compose_service,
290 "cwd": service_info.compose_path,
291 "logger": logger,
292 },
293 ).start()
294 return True
296 return self.execute_compose(
297 command=DockerComposeCommand.UP,
298 args="--detach --yes",
299 service=service_info.compose_service,
300 cwd=service_info.compose_path,
301 logger=logger,
302 )
304 def rescan(self, discovery: Discovery) -> Discovery | None:
305 logger: Any = self.log.bind(container=discovery.name, action="rescan")
306 try:
307 c: Container = self.client.containers.get(discovery.name)
308 if c: 308 ↛ 313line 308 didn't jump to line 313 because the condition on line 308 was always true
309 rediscovery: Discovery | None = self.analyze(c, discovery.session, previous_discovery=discovery)
310 if rediscovery and not rediscovery.throttled: 310 ↛ 313line 310 didn't jump to line 313 because the condition on line 310 was always true
311 self.discoveries[rediscovery.name] = rediscovery
312 return rediscovery
313 logger.warn("Unable to find container for rescan")
314 except docker.errors.NotFound:
315 logger.warn("Container not found in Docker")
316 except docker.errors.APIError:
317 logger.exception("Docker API error retrieving container")
318 return None
320 def analyze(self, c: Container, session: str, previous_discovery: Discovery | None = None) -> Discovery | None:
321 logger = self.log.bind(container=c.name, action="analyze")
323 if c.attrs is None or not c.attrs: 323 ↛ 324line 323 didn't jump to line 324 because the condition on line 323 was never true
324 logger.warn("No container attributes found, discovery rejected")
325 return None
326 if c.name is None: 326 ↛ 327line 326 didn't jump to line 327 because the condition on line 326 was never true
327 logger.warn("No container name found, discovery rejected")
328 return None
330 customization: ContainerCustomization = ContainerCustomization(c)
331 if customization.ignore: 331 ↛ 332line 331 didn't jump to line 332 because the condition on line 331 was never true
332 logger.info("Container ignored due to UPD2MQTT_IGNORE setting")
333 return None
335 if customization.update == UpdatePolicy.AUTO: 335 ↛ 336line 335 didn't jump to line 336 because the condition on line 335 was never true
336 logger.debug("Auto update policy detected")
337 update_policy: UpdatePolicy = customization.update or UpdatePolicy.PASSIVE
339 local_info: DockerImageInfo
340 service_info: DockerServiceDetails
341 local_info, service_info = self.local_info_builder.build_image_info(c)
342 pkg_info: PackageUpdateInfo = self.default_metadata(local_info)
344 version_policy: VersionPolicy
345 if customization.version_policy: 345 ↛ 346line 345 didn't jump to line 346 because the condition on line 345 was never true
346 logger.debug("Overriding version_policy to local customization: %s", customization.version_policy)
347 version_policy = customization.version_policy
348 else:
349 if self.cfg.version_policy == VersionPolicy.AUTO and pkg_info.docker: 349 ↛ 355line 349 didn't jump to line 355 because the condition on line 349 was always true
350 logger.debug(
351 "Version policy, pkg level %s, config level: %s", pkg_info.docker.version_policy, self.cfg.version_policy
352 )
353 version_policy = pkg_info.docker.version_policy or self.cfg.version_policy
354 else:
355 logger.debug("Version policy, fixed config level: %s", self.cfg.version_policy)
356 version_policy = self.cfg.version_policy
358 try:
359 service_info.git_repo_path = customization.git_repo_path
361 registry_selection = Selection(self.cfg.registry_select, local_info.index_name)
362 latest_info: DockerImageInfo
363 if local_info.pinned: 363 ↛ 364line 363 didn't jump to line 364 because the condition on line 363 was never true
364 logger.debug("Skipping registry fetch for local pinned image, %s", local_info.ref)
365 latest_info = local_info.reuse()
366 elif registry_selection and local_info.ref and not local_info.local_build:
367 if self.cfg.registry.api == RegistryAPI.DOCKER_CLIENT:
368 latest_info = self.docker_client_image_lookup.lookup(local_info)
369 elif self.cfg.registry.api == RegistryAPI.OCI_V2: 369 ↛ 371line 369 didn't jump to line 371 because the condition on line 369 was always true
370 latest_info = self.registry_image_lookup.lookup(local_info, token=customization.registry_token)
371 elif self.cfg.registry.api == RegistryAPI.OCI_V2_MINIMAL:
372 latest_info = self.registry_image_lookup.lookup(
373 local_info, token=customization.registry_token, minimal=True
374 )
375 else: # assuming RegistryAPI.DISABLED
376 logger.debug(f"Skipping registry check, disabled in config {self.cfg.registry.api}")
377 latest_info = local_info.reuse()
378 elif local_info.local_build: 378 ↛ 384line 378 didn't jump to line 384 because the condition on line 378 was always true
379 # assume its a locally built image if no RepoDigests available
380 latest_info = local_info.reuse()
381 latest_info.short_digest = None
382 latest_info.image_digest = None
383 else:
384 logger.debug("Registry selection rules suppressed metadata lookup")
385 latest_info = local_info.reuse()
387 release_info: ReleaseDetail | None = self.release_enricher.enrich(
388 latest_info,
389 source_repo_url=pkg_info.source_repo_url,
390 notes_url=customization.relnotes or pkg_info.release_notes_url,
391 )
392 logger.debug("Enriched release info: %s", release_info)
394 if latest_info.image_digest and release_info:
395 if self.github_enricher and release_info.source_platform == SOURCE_PLATFORM_GITHUB: 395 ↛ 396line 395 didn't jump to line 396 because the condition on line 395 was never true
396 self.github_enricher.enrich(latest_info, release_info)
397 else:
398 self.log.debug("Not a github release or no github configured")
400 if service_info.git_repo_path and service_info.compose_path:
401 full_repo_path: Path = Path(service_info.compose_path).joinpath(service_info.git_repo_path)
403 git_trust(full_repo_path, Path(self.node_cfg.git_path))
404 service_info.git_local_timestamp = git_iso_timestamp(full_repo_path, Path(self.node_cfg.git_path))
406 can_pull: bool = (
407 self.cfg.allow_pull
408 and not local_info.local_build
409 and local_info.ref is not None
410 and local_info.ref != ""
411 and (local_info.short_digest is not None or latest_info.short_digest is not None)
412 )
413 if self.cfg.allow_pull and not can_pull:
414 logger.debug(
415 f"Pull unavailable, ref:{local_info.ref},local:{local_info.short_digest},latest:{latest_info.short_digest}"
416 )
418 can_build: bool = False
419 if self.cfg.allow_build: 419 ↛ 444line 419 didn't jump to line 444 because the condition on line 419 was always true
420 can_build = service_info.git_repo_path is not None and service_info.compose_path is not None
421 if not can_build:
422 if service_info.git_repo_path is not None: 422 ↛ 423line 422 didn't jump to line 423 because the condition on line 422 was never true
423 logger.debug(
424 "Local build ignored for git_repo_path=%s because no compose_path", service_info.git_repo_path
425 )
426 else:
427 full_repo_path = self.full_repo_path(
428 cast("str", service_info.compose_path), cast("str", service_info.git_repo_path)
429 )
430 if local_info.local_build and full_repo_path: 430 ↛ 444line 430 didn't jump to line 444 because the condition on line 430 was always true
431 git_versionish = git_local_digest(full_repo_path, Path(self.node_cfg.git_path))
432 if git_versionish:
433 local_info.git_digest = git_versionish
434 logger.debug("Git digest for local code %s", git_versionish)
436 behind_count: int = git_check_update_available(full_repo_path, Path(self.node_cfg.git_path))
437 if behind_count > 0:
438 latest_info.git_digest = f"{git_versionish}+{behind_count}"
439 logger.info("Git update available, generating version %s", latest_info.git_digest)
440 else:
441 logger.debug(f"Git update not available, local repo:{full_repo_path}")
442 latest_info.git_digest = git_versionish
444 can_restart: bool = self.cfg.allow_restart and service_info.compose_path is not None
446 if can_pull:
447 update_type = "Docker Image"
448 elif can_build: 448 ↛ 451line 448 didn't jump to line 451 because the condition on line 448 was always true
449 update_type = "Docker Build"
450 else:
451 update_type = "Unavailable"
453 # can_pull,can_build etc are only info flags
454 # the HASS update process is driven by comparing current and available versions
456 public_installed_version: str
457 public_latest_version: str
458 version_basis: str
459 public_installed_version, public_latest_version, version_basis = select_versions(
460 version_policy, local_info, latest_info
461 )
463 publish_policy: PublishPolicy = PublishPolicy.HOMEASSISTANT
464 img_ref_selection = Selection(self.cfg.image_ref_select, local_info.ref)
465 version_selection = Selection(self.cfg.version_select, latest_info.version)
466 if not img_ref_selection or not version_selection: 466 ↛ 467line 466 didn't jump to line 467 because the condition on line 466 was never true
467 self.log.info(
468 "Excluding from HA Discovery for include/exclude rule: %s, %s", local_info.ref, latest_info.version
469 )
470 publish_policy = PublishPolicy.MQTT
472 discovery: Discovery = Discovery(
473 self,
474 c.name,
475 session,
476 node=self.node_cfg.name,
477 entity_picture_url=customization.picture or pkg_info.logo_url,
478 current_version=public_installed_version,
479 publish_policy=publish_policy,
480 update_policy=update_policy,
481 version_policy=version_policy,
482 version_basis=version_basis,
483 latest_version=public_latest_version,
484 device_icon=self.cfg.device_icon,
485 can_pull=can_pull,
486 update_type=update_type,
487 can_build=can_build,
488 can_restart=can_restart,
489 status=(c.status == "running" and "on") or "off",
490 throttled=latest_info.throttled,
491 previous=previous_discovery,
492 release_detail=release_info,
493 installation_detail=service_info,
494 current_detail=local_info,
495 latest_detail=latest_info,
496 )
497 logger.debug("Analyze generated discovery: %s", discovery)
498 return discovery
499 except Exception:
500 logger.exception("Docker Discovery Failure", container_attrs=c.attrs)
501 logger.debug("Analyze returned empty discovery")
502 return None
504 # def version(self, c: Container, version_type: str):
505 # metadata_version: str = c.labels.get("org.opencontainers.image.version")
506 # metadata_revision: str = c.labels.get("org.opencontainers.image.revision")
508 async def scan(self, session: str, shuffle: bool = True) -> AsyncGenerator[Discovery]:
509 logger = self.log.bind(session=session, action="scan", source=self.source_type)
510 containers: int = 0
511 results: int = 0
512 throttled: int = 0
514 targets: list[Container] = self.client.containers.list()
515 if shuffle: 515 ↛ 517line 515 didn't jump to line 517 because the condition on line 515 was always true
516 random.shuffle(targets)
517 logger.debug("Starting scanning %s containers", len(targets))
518 for c in targets:
519 logger.debug("Analyzing container", container=c.name)
520 if self.stopped.is_set(): 520 ↛ 521line 520 didn't jump to line 521 because the condition on line 520 was never true
521 logger.info(f"Shutdown detected, aborting scan at {c}")
522 break
523 containers = containers + 1
524 result: Discovery | None = self.analyze(c, session)
525 if result: 525 ↛ 532line 525 didn't jump to line 532 because the condition on line 525 was always true
526 logger.debug("Analyzed container", result_name=result.name, throttled=result.throttled)
527 self.discoveries[result.name] = result
528 results = results + 1
529 throttled += 1 if result.throttled else 0
530 yield result
531 else:
532 logger.debug("No result from analysis", container=c.name)
533 logger.info("Completed", container_count=containers, throttled_count=throttled, result_count=results)
535 def command(self, discovery_name: str, command: str, on_update_start: Callable, on_update_end: Callable) -> bool:
536 logger = self.log.bind(container=discovery_name, action="command", command=command)
537 logger.info("Executing Command")
538 discovery: Discovery | None = None
539 updated: bool = False
540 in_progress: Discovery | None = None
541 try:
542 discovery = self.resolve(discovery_name)
543 if not discovery:
544 logger.warn("Unknown entity", entity=discovery_name)
545 elif command != "install":
546 logger.warn("Unknown command")
547 else:
548 if discovery.can_update:
549 rediscovery: Discovery | None = None
550 logger.info("Starting update ...")
551 on_update_start(discovery)
552 in_progress = discovery
553 if self.update(discovery):
554 logger.debug("Rescanning ...")
555 rediscovery = self.rescan(discovery)
556 updated = rediscovery is not None and not rediscovery.throttled
557 logger.info("Rescanned, updated:%s", updated)
558 else:
559 logger.info("Rescan with no result")
560 in_progress = rediscovery or discovery
561 on_update_end(in_progress)
562 in_progress = None
563 else:
564 logger.warning("Update not supported for this container")
565 except Exception:
566 logger.exception("Failed to handle", discovery_name=discovery_name, command=command)
567 finally:
568 if in_progress:
569 on_update_end(in_progress)
570 return updated
572 def resolve(self, discovery_name: str) -> Discovery | None:
573 return self.discoveries.get(discovery_name)
575 def default_metadata(self, image_info: DockerImageInfo) -> PackageUpdateInfo:
576 for enricher in self.pkg_enrichers: 576 ↛ 580line 576 didn't jump to line 580 because the loop on line 576 didn't complete
577 pkg_info: PackageUpdateInfo | None = enricher.enrich(image_info)
578 if pkg_info is not None:
579 return pkg_info
580 raise ValueError("No enricher could provide metadata, not even default enricher")
583def select_versions(version_policy: VersionPolicy, installed: DockerImageInfo, latest: DockerImageInfo) -> tuple[str, str, str]:
584 """Pick the best version string to display based on the version policy and available data
586 Ensures that both local installed and remote latest versions are derived in same way
587 Falls back to digest if version not reliable or not consistent with current/available version
588 """
589 phase: int = 0
590 shortcircuit: str | None = None
592 def basis(rule: str) -> str:
593 return f"{rule}-{phase}" if not shortcircuit else f"{rule}-{phase}-{shortcircuit}"
595 #
596 # Detect No Update Available
597 # --------------------------
598 #
599 # shortcircuit the logic if there's nothing to compare
600 #
601 if latest.throttled:
602 log.debug("Flattening versions for throttled update %s", installed.ref)
603 shortcircuit = "THR"
604 latest = installed
605 elif not any((latest.short_digest, latest.repo_digest, latest.git_digest, latest.version)):
606 log.debug("Flattening versions for empty update %s", installed.ref)
607 shortcircuit = "NUP"
608 latest = installed
609 elif latest.short_digest == installed.short_digest and latest.short_digest is not None:
610 log.debug("Flattening versions for identical update %s", installed.ref)
611 shortcircuit = "SDM"
612 latest = installed
613 elif installed.image_digest in latest.repo_digests: 613 ↛ 615line 613 didn't jump to line 615 because the condition on line 613 was never true
614 # TODO: avoid this by better adaptations for different registries and single/multi manifests
615 log.debug(
616 "Matching new repo_digest against installed image digest for %s image %s", installed.index_name, installed.name
617 )
618 shortcircuit = "FGA"
619 latest = installed
620 elif latest.image_digest in installed.repo_digests: 620 ↛ 622line 620 didn't jump to line 622 because the condition on line 620 was never true
621 # TODO: avoid this by better adaptations for different registries and single/multi manifests
622 log.debug(
623 "Matching new image_digest against installed repo digest for %s image %s", installed.index_name, installed.name
624 )
625 shortcircuit = "FGB"
626 latest = installed
628 #
629 # Explicit Policy Choice
630 # ----------------------
631 #
633 if version_policy == VersionPolicy.VERSION and installed.version and latest.version:
634 return installed.version, latest.version, basis("version")
636 installed_digest_available: bool = installed.short_digest is not None and installed.short_digest != ""
637 latest_digest_available: bool = latest.short_digest is not None and latest.short_digest != ""
638 matching_digest: bool = (
639 installed_digest_available and latest_digest_available and installed.short_digest == latest.short_digest
640 )
641 changed_digest: bool = (
642 installed_digest_available and latest_digest_available and installed.short_digest != latest.short_digest
643 )
645 if version_policy == VersionPolicy.DIGEST and installed_digest_available and latest_digest_available:
646 return installed.short_digest, latest.short_digest, basis("digest") # type: ignore[return-value]
647 if (
648 version_policy == VersionPolicy.VERSION_DIGEST
649 and installed.version
650 and latest.version
651 and installed_digest_available
652 and latest_digest_available
653 ):
654 return (
655 f"{installed.version}:{installed.short_digest}",
656 f"{latest.version}:{latest.short_digest}",
657 basis("version-digest"),
658 )
660 if (
661 version_policy == VersionPolicy.TIMESTAMP
662 and installed.created
663 and latest.created
664 and (
665 (latest.created > installed.created and changed_digest) or (latest.created == installed.created and matching_digest)
666 )
667 ):
668 return installed.created, latest.created, basis("timestamp")
670 #
671 # Auto Policy - Humane Versions
672 # -----------------------------
673 #
674 phase = 1
675 if (
676 version_policy == VersionPolicy.AUTO
677 and installed.version
678 and latest.version
679 and (
680 (installed.version == latest.version and matching_digest)
681 or (installed.version != latest.version and changed_digest)
682 )
683 ):
684 # detect semver, or v semver (e.g. v1.030)
685 # only use this if both version and digest are consistently agreeing or disagreeing
686 # if the strict conditions work, people see nice version numbers on screen rather than hashes
687 if re.fullmatch(SEMVER_RE, installed.version or "") and re.fullmatch(SEMVER_RE, latest.version or ""):
688 # Smells like semver, override if not using version_policy
689 return installed.version, latest.version, basis("semver")
690 if re.fullmatch(VERSION_RE, installed.version or "") and re.fullmatch(VERSION_RE, latest.version or ""): 690 ↛ 694line 690 didn't jump to line 694 because the condition on line 690 was always true
691 # Smells like casual semver, override if not using version_policy
692 return installed.version, latest.version, basis("casualver")
694 if (
695 version_policy == VersionPolicy.AUTO
696 and installed.tag
697 and latest.tag
698 and ((installed.tag == latest.tag and matching_digest) or (installed.tag != latest.tag and changed_digest))
699 ):
700 if re.fullmatch(SEMVER_RE, installed.tag) and re.fullmatch(SEMVER_RE, latest.tag):
701 return installed.tag, latest.tag, basis("semver-tag")
702 if re.fullmatch(SEMVER_RE, installed.tag) and re.fullmatch(SEMVER_RE, latest.tag): 702 ↛ 703line 702 didn't jump to line 703 because the condition on line 702 was never true
703 return installed.tag, latest.tag, basis("semver-tag")
705 #
706 # Local Builds
707 # ------------
708 #
709 phase = 2
710 if installed.git_digest and latest.git_digest:
711 return f"git:{installed.git_digest}", f"git:{latest.git_digest}", basis("git")
713 #
714 # Fall Back - Qualified Versions
715 # --------------------------------
716 #
717 phase = 3
718 if ( 718 ↛ 726line 718 didn't jump to line 726 because the condition on line 718 was never true
719 installed.version
720 and latest.version
721 and (
722 (installed.version == latest.version and matching_digest)
723 or (installed.version != latest.version and changed_digest)
724 )
725 ):
726 return (
727 f"{installed.version}:{installed.short_digest}",
728 f"{latest.version}:{latest.short_digest}",
729 basis("version-digest"),
730 )
732 #
733 # Fall Back - Timestamp, Digest, Version
734 # --------------------------------------
736 phase = 4
737 if (
738 installed.created
739 and latest.created
740 and (
741 (latest.created > installed.created and changed_digest) or (latest.created == installed.created and matching_digest)
742 )
743 ):
744 return installed.created, latest.created, basis("timestamp")
745 if installed_digest_available and latest_digest_available:
746 return installed.short_digest, latest.short_digest, basis("digest") # type: ignore[return-value]
747 if installed.version and not latest.version and not latest.short_digest and not latest.repo_digest: 747 ↛ 748line 747 didn't jump to line 748 because the condition on line 747 was never true
748 return installed.version, installed.version, basis("version")
750 #
751 # Fall Back - Missing Digests
752 # ---------------------------
753 phase = 5
754 if not installed_digest_available and latest_digest_available: 754 ↛ 756line 754 didn't jump to line 756 because the condition on line 754 was never true
755 # odd condition if local image has no identity, even out versions so no update alert
756 return latest.short_digest, latest.short_digest, basis("digest") # type: ignore[return-value]
758 #
759 # Fall Back - Repo Digests
760 # ---------------------------
761 phase = 6
763 def condense_repo_id(i: DockerImageInfo) -> str:
764 v: str | None = i.condense_digest(i.repo_digest) if i.repo_digest else None
765 return v or ""
767 if installed.repo_digest and latest.repo_digest:
768 # where the image digest isn't available, fall back to a repo digest
769 return condense_repo_id(installed), condense_repo_id(latest), basis("repo-digest")
771 phase = 7
772 if latest.repo_digest and latest.repo_digest in installed.repo_digests: 772 ↛ 776line 772 didn't jump to line 776 because the condition on line 772 was always true
773 # installed has multiple RepoDigests from multiple pulls and one of them matches latest current repo digest
774 return condense_repo_id(latest), condense_repo_id(latest), basis("repo-digest")
776 if installed_digest_available and not latest_digest_available:
777 # no new digest, so latest is the current
778 return installed.short_digest, installed.short_digest, basis("digest") # type: ignore[return-value]
780 #
781 # Failure to Find Any Version
782 # ---------------------------
783 #
784 log.warn("No versions can be determined for %s", installed.ref)
785 phase = 999
786 return UNKNOWN_VERSION, UNKNOWN_VERSION, basis("failure")