Coverage for src / updates2mqtt / integrations / docker.py: 78%
406 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-20 23:13 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-20 23:13 +0000
1import random
2import re
3import subprocess
4import time
5import typing
6from collections.abc import AsyncGenerator, Callable
7from enum import Enum
8from pathlib import Path
9from threading import Event
10from typing import Any, cast
12import docker
13import docker.errors
14import structlog
15from docker.models.containers import Container
17from updates2mqtt.config import (
18 SEMVER_RE,
19 SOURCE_PLATFORM_GITHUB,
20 UNKNOWN_VERSION,
21 VERSION_RE,
22 DockerConfig,
23 GitHubConfig,
24 NodeConfig,
25 PackageUpdateInfo,
26 PublishPolicy,
27 RegistryAPI,
28 UpdatePolicy,
29 VersionPolicy,
30)
31from updates2mqtt.helpers import Selection, Throttler
32from updates2mqtt.integrations.docker_enrich import (
33 CommonPackageEnricher,
34 ContainerDistributionAPIVersionLookup,
35 DefaultPackageEnricher,
36 DockerClientVersionLookup,
37 DockerImageInfo,
38 DockerServiceDetails,
39 LinuxServerIOPackageEnricher,
40 LocalContainerInfo,
41 PackageEnricher,
42 SourceReleaseEnricher,
43)
44from updates2mqtt.integrations.github_enrich import GithubReleaseEnricher
45from updates2mqtt.model import Discovery, ReleaseDetail, ReleaseProvider
47from .git_utils import git_check_update_available, git_iso_timestamp, git_local_digest, git_pull, git_trust
49if typing.TYPE_CHECKING:
50 from docker.models.images import Image
52# distinguish docker build from docker pull?
54log = structlog.get_logger()
class DockerComposeCommand(Enum):
    """Docker Compose sub-commands that this integration runs.

    Each member's value is the literal sub-command word appended to the
    compose executable on the command line.
    """

    BUILD = "build"
    UP = "up"
62def safe_json_dt(t: float | None) -> str | None:
63 return time.strftime("%Y-%m-%dT%H:%M:%S.0000", time.gmtime(t)) if t else None
class ContainerCustomization:
    """Local customization of a Docker container, by label or env var.

    Each setting can be supplied as a container label named
    ``<label_prefix><attribute>`` (lower case) or as a container environment
    variable named ``<env_prefix><ATTRIBUTE>`` (upper case). When both are
    present the label takes precedence.
    """

    label_prefix: str = "updates2mqtt."
    env_prefix: str = "UPD2MQTT_"

    def __init__(self, container: Container) -> None:
        """Read customization settings from the container's labels and environment.

        Args:
            container: Docker container whose ``labels`` and ``attrs["Config"]["Env"]``
                are inspected. If ``attrs`` is empty or has no ``Config`` entry,
                all settings keep their defaults.
        """
        self.update: UpdatePolicy = UpdatePolicy.PASSIVE  # was known as UPD2MQTT_UPDATE before policies and labels
        self.git_repo_path: str | None = None
        self.picture: str | None = None
        self.relnotes: str | None = None
        self.ignore: bool = False
        self.version_policy: VersionPolicy | None = None
        self.registry_token: str | None = None

        if not container.attrs or container.attrs.get("Config") is None:
            return

        # Env entries are "NAME=value" strings. Split on the first "=" only, so
        # values may themselves contain "=" (e.g. base64 padding), and skip any
        # entry with no "=" rather than failing on it.
        env_pairs: list[str] | None = container.attrs.get("Config", {}).get("Env")
        c_env: dict[str, str] = {}
        for pair in env_pairs or []:
            name, sep, value = pair.partition("=")
            if sep:
                c_env[name] = value

        # Enum-valued settings are identified by name, not by current value,
        # because a setting whose default is None carries no type information.
        enum_types: dict[str, Any] = {"update": UpdatePolicy, "version_policy": VersionPolicy}

        for attr in dir(self):
            if "__" in attr:
                continue
            label = f"{self.label_prefix}{attr.lower()}"
            env_var = f"{self.env_prefix}{attr.upper()}"
            v: Any = None
            if label in container.labels:
                # precedence to labels
                v = container.labels.get(label)
                log.debug(
                    "%s set from label %s=%s",
                    attr,
                    label,
                    v,
                    integration="docker",
                    container=container.name,
                    action="customize",
                )
            elif env_var in c_env:
                v = c_env[env_var]
                log.debug(
                    "%s set from env var %s=%s",
                    attr,
                    env_var,
                    v,
                    integration="docker",
                    container=container.name,
                    action="customize",
                )
            if v is None:
                continue

            if isinstance(getattr(self, attr), bool):
                setattr(self, attr, v.upper() in ("TRUE", "YES", "1"))
            elif attr in enum_types:
                try:
                    setattr(self, attr, enum_types[attr][v.upper()])
                except KeyError:
                    # An unrecognised policy name keeps the default instead of
                    # aborting construction for this container.
                    log.warning(
                        "Ignoring invalid value for %s: %s",
                        attr,
                        v,
                        integration="docker",
                        container=container.name,
                        action="customize",
                    )
            else:
                setattr(self, attr, v)
class DockerProvider(ReleaseProvider):
    """Release provider backed by the local Docker daemon.

    Scans containers into ``Discovery`` records, and can pull or build images
    and restart services through Docker Compose.
    """

    def __init__(
        self,
        cfg: DockerConfig,
        node_cfg: NodeConfig,
        packages: dict[str, PackageUpdateInfo] | None = None,
        github_cfg: GitHubConfig | None = None,
        self_bounce: Event | None = None,
    ) -> None:
        """Create the Docker client, throttler, enrichers and version lookups.

        Args:
            cfg: Docker integration configuration.
            node_cfg: Node configuration, passed to the base provider.
            packages: Optional package metadata handed to ``CommonPackageEnricher``.
            github_cfg: Optional GitHub configuration; enables the GitHub release enricher.
            self_bounce: Optional event set by ``restart`` when the container being
                restarted appears to be updates2mqtt itself.
        """
        super().__init__(node_cfg, "docker")
        self.client: docker.DockerClient = docker.from_env()
        self.cfg: DockerConfig = cfg

        # TODO: refresh discovered packages periodically
        self.throttler = Throttler(self.cfg.default_api_backoff, self.log, self.stopped)
        self.self_bounce: Event | None = self_bounce

        # Order matters: default_metadata() returns the first non-None result.
        self.pkg_enrichers: list[PackageEnricher] = [
            CommonPackageEnricher(self.cfg, packages),
            LinuxServerIOPackageEnricher(self.cfg),
            DefaultPackageEnricher(self.cfg),
        ]
        self.docker_client_image_lookup = DockerClientVersionLookup(
            self.client, self.throttler, self.cfg.registry, self.cfg.default_api_backoff
        )
        self.registry_image_lookup = ContainerDistributionAPIVersionLookup(self.throttler, self.cfg.registry)
        self.release_enricher = SourceReleaseEnricher()
        self.github_enricher: GithubReleaseEnricher | None = GithubReleaseEnricher(github_cfg) if github_cfg else None
        self.local_info_builder = LocalContainerInfo()

    def initialize(self) -> None:
        """Initialize every package enricher."""
        for enricher in self.pkg_enrichers:
            enricher.initialize()
        self.log.debug("Docker provider initialized")

    def update(self, discovery: Discovery) -> bool:
        """Fetch the newest image or source for a discovery, then restart it.

        Records the attempt time on the discovery before fetching.

        Returns:
            The result of ``restart``.
        """
        logger: Any = self.log.bind(container=discovery.name, action="update")
        logger.info("Updating - last at %s", discovery.update_last_attempt)
        discovery.update_last_attempt = time.time()
        self.fetch(discovery)
        restarted = self.restart(discovery)
        logger.info("Updated - recorded at %s", discovery.update_last_attempt)
        return restarted

    def fetch(self, discovery: Discovery) -> None:
        """Pull the image, or git-pull and build, depending on the discovery flags.

        Pull takes priority when ``can_pull`` is set and an image ref is known;
        otherwise, when ``can_build`` is set, the git repo is pulled and a build
        is run only if ``git_pull`` reports success.
        """
        logger = self.log.bind(container=discovery.name, action="fetch")
        installed_info: DockerImageInfo | None = cast("DockerImageInfo|None", discovery.current_detail)
        service_info: DockerServiceDetails | None = cast("DockerServiceDetails|None", discovery.installation_detail)

        image_ref: str | None = installed_info.ref if installed_info else None
        platform: str | None = installed_info.platform if installed_info else None
        if discovery.can_pull and image_ref:
            logger.info("Pulling", image_ref=image_ref, platform=platform)
            image: Image = self.client.images.pull(image_ref, platform=platform, all_tags=False)
            if image:
                logger.info("Pulled", image_id=image.id, image_ref=image_ref, platform=platform)
            else:
                logger.warn("Unable to pull", image_ref=image_ref, platform=platform)
        elif discovery.can_build and service_info:
            compose_path: str | None = service_info.compose_path
            git_repo_path: str | None = service_info.git_repo_path
            logger.debug("can_build check", git_repo=git_repo_path)
            if not compose_path or not git_repo_path:
                logger.warn("No compose path or git repo path configured, skipped build")
                return

            full_repo_path: Path = self.full_repo_path(compose_path, git_repo_path)
            if git_pull(full_repo_path, Path(self.node_cfg.git_path)):
                self.build(discovery)
            else:
                logger.debug("Skipping git_pull, no update")

    def full_repo_path(self, compose_path: str, git_repo_path: str) -> Path:
        """Resolve the git repo path, treating a relative path as relative to the compose path.

        Raises:
            ValueError: If either argument is None.
        """
        if compose_path is None or git_repo_path is None:
            raise ValueError("Unexpected null paths")
        if compose_path and not Path(git_repo_path).is_absolute():
            return Path(compose_path) / git_repo_path
        return Path(git_repo_path)

    def build(self, discovery: Discovery) -> bool:
        """Run a compose build for the discovery's service.

        Returns:
            False when no compose path is known, otherwise the result of ``execute_compose``.
        """
        logger = self.log.bind(container=discovery.name, action="build")
        service_info: DockerServiceDetails | None = cast("DockerServiceDetails|None", discovery.installation_detail)

        if not service_info or not service_info.compose_path:
            logger.warn("No service_info available on compose")
            return False
        logger.info("Building", compose_path=service_info.compose_path, service=service_info.compose_service)
        return self.execute_compose(
            command=DockerComposeCommand.BUILD,
            args="",
            service=service_info.compose_service,
            cwd=service_info.compose_path,
            logger=logger,
        )

    def execute_compose(
        self, command: DockerComposeCommand, args: str, service: str | None, cwd: str | None, logger: structlog.BoundLogger
    ) -> bool:
        """Run a Docker Compose sub-command in the given directory.

        Args:
            command: Compose sub-command to run.
            args: Extra arguments appended after the sub-command; may be empty.
            service: Optional compose service name appended last.
            cwd: Directory to run in; must be an existing directory.
            logger: Bound logger for reporting.

        Returns:
            True when the process exits with code 0, otherwise False.
        """
        if not cwd or not Path(cwd).is_dir():
            logger.warn("Invalid compose path, skipped %s", command)
            return False

        cmd: str = "docker-compose" if self.cfg.compose_version == "v1" else "docker compose"
        logger.info(f"Executing {cmd} {command} {args} {service}")
        cmd = cmd + " " + command.value
        if args:
            cmd = cmd + " " + args
        if service:
            cmd = cmd + " " + service

        # NOTE(review): the command is a shell string and `service` comes from
        # container metadata; consider an argument list with shell=False.
        # stderr has to be captured: without it CompletedProcess.stderr is None
        # and the wrong-compose-version check below can never match.
        proc: subprocess.CompletedProcess[str] = subprocess.run(
            cmd, check=False, shell=True, cwd=cwd, text=True, stderr=subprocess.PIPE
        )
        if proc.stderr:
            # Captured output no longer reaches the parent's stderr, so log it.
            logger.debug("compose stderr output", stderr=proc.stderr.strip())
        if proc.returncode == 0:
            logger.info(f"{command} via compose successful")
            return True
        if proc.stderr and "unknown command: docker compose" in proc.stderr:
            logger.warning("docker compose set to wrong version, seems like v1 installed")
            # Takes effect on the next invocation; this attempt still fails.
            self.cfg.compose_version = "v1"
        logger.warn(
            f"{command} failed: %s",
            proc.returncode,
        )
        return False

    def restart(self, discovery: Discovery) -> bool:
        """Bring the discovery's service up via compose, flagging a self-restart first.

        Returns:
            False when no service details are available, otherwise the result
            of ``execute_compose``.
        """
        logger = self.log.bind(container=discovery.name, action="restart")
        installed_info: DockerImageInfo | None = cast("DockerImageInfo|None", discovery.current_detail)
        service_info: DockerServiceDetails | None = cast("DockerServiceDetails|None", discovery.installation_detail)

        if (
            self.self_bounce is not None
            and installed_info
            and service_info
            and (
                "ghcr.io/rhizomatics/updates2mqtt" in installed_info.ref
                or (service_info.git_repo_path and service_info.git_repo_path.endswith("updates2mqtt"))
            )
        ):
            logger.warning("Attempting to self-bounce")
            self.self_bounce.set()
        if service_info is None:
            return False
        return self.execute_compose(
            command=DockerComposeCommand.UP,
            args="--detach --yes",
            service=service_info.compose_service,
            cwd=service_info.compose_path,
            logger=logger,
        )

    def rescan(self, discovery: Discovery) -> Discovery | None:
        """Re-analyze a single container by name.

        Returns:
            The new discovery when analysis succeeds and is not throttled
            (it is also stored in ``self.discoveries``), otherwise None.
        """
        logger: Any = self.log.bind(container=discovery.name, action="rescan")
        try:
            c: Container = self.client.containers.get(discovery.name)
            if c:
                rediscovery: Discovery | None = self.analyze(c, discovery.session, previous_discovery=discovery)
                if rediscovery and not rediscovery.throttled:
                    self.discoveries[rediscovery.name] = rediscovery
                    return rediscovery
            logger.warn("Unable to find container for rescan")
        except docker.errors.NotFound:
            logger.warn("Container not found in Docker")
        except docker.errors.APIError:
            logger.exception("Docker API error retrieving container")
        return None

    def analyze(self, c: Container, session: str, previous_discovery: Discovery | None = None) -> Discovery | None:
        """Build a ``Discovery`` for one container.

        Args:
            c: Container to analyze.
            session: Scan session identifier stored on the discovery.
            previous_discovery: Earlier discovery for the same container, if any.

        Returns:
            The discovery, or None when the container has no attributes or name,
            is marked as ignored, or analysis raises an exception.
        """
        logger = self.log.bind(container=c.name, action="analyze")

        if c.attrs is None or not c.attrs:
            logger.warn("No container attributes found, discovery rejected")
            return None
        if c.name is None:
            logger.warn("No container name found, discovery rejected")
            return None

        customization: ContainerCustomization = ContainerCustomization(c)
        if customization.ignore:
            logger.info("Container ignored due to UPD2MQTT_IGNORE setting")
            return None

        if customization.update == UpdatePolicy.AUTO:
            logger.debug("Auto update policy detected")
        update_policy: UpdatePolicy = customization.update or UpdatePolicy.PASSIVE

        local_info: DockerImageInfo
        service_info: DockerServiceDetails
        local_info, service_info = self.local_info_builder.build_image_info(c)
        pkg_info: PackageUpdateInfo = self.default_metadata(local_info)

        # Version policy precedence: container customization, then package
        # level (only while config is AUTO), then the configured default.
        version_policy: VersionPolicy
        if customization.version_policy:
            logger.debug("Overriding version_policy to local customization: %s", customization.version_policy)
            version_policy = customization.version_policy
        else:
            if self.cfg.version_policy == VersionPolicy.AUTO and pkg_info.docker:
                logger.debug(
                    "Version policy, pkg level %s, config level: %s", pkg_info.docker.version_policy, self.cfg.version_policy
                )
                version_policy = pkg_info.docker.version_policy or self.cfg.version_policy
            else:
                logger.debug("Version policy, fixed config level: %s", self.cfg.version_policy)
                version_policy = self.cfg.version_policy

        try:
            service_info.git_repo_path = customization.git_repo_path

            registry_selection = Selection(self.cfg.registry_select, local_info.index_name)
            latest_info: DockerImageInfo
            if local_info.pinned:
                logger.debug("Skipping registry fetch for local pinned image, %s", local_info.ref)
                latest_info = local_info.reuse()
            elif registry_selection and local_info.ref and not local_info.local_build:
                if self.cfg.registry.api == RegistryAPI.DOCKER_CLIENT:
                    latest_info = self.docker_client_image_lookup.lookup(local_info)
                elif self.cfg.registry.api == RegistryAPI.OCI_V2:
                    latest_info = self.registry_image_lookup.lookup(local_info, token=customization.registry_token)
                elif self.cfg.registry.api == RegistryAPI.OCI_V2_MINIMAL:
                    latest_info = self.registry_image_lookup.lookup(
                        local_info, token=customization.registry_token, minimal=True
                    )
                else:  # assuming RegistryAPI.DISABLED
                    logger.debug(f"Skipping registry check, disabled in config {self.cfg.registry.api}")
                    latest_info = local_info.reuse()
            elif local_info.local_build:
                # assume its a locally built image if no RepoDigests available
                latest_info = local_info.reuse()
                latest_info.short_digest = None
                latest_info.image_digest = None
            else:
                logger.debug("Registry selection rules suppressed metadata lookup")
                latest_info = local_info.reuse()

            release_info: ReleaseDetail | None = self.release_enricher.enrich(
                latest_info,
                source_repo_url=pkg_info.source_repo_url,
                notes_url=customization.relnotes or pkg_info.release_notes_url,
            )
            logger.debug("Enriched release info: %s", release_info)

            if latest_info.image_digest and release_info:
                if self.github_enricher and release_info.source_platform == SOURCE_PLATFORM_GITHUB:
                    self.github_enricher.enrich(latest_info, release_info)
                else:
                    self.log.debug("Not a github release or no github configured")

            if service_info.git_repo_path and service_info.compose_path:
                full_repo_path: Path = Path(service_info.compose_path).joinpath(service_info.git_repo_path)

                git_trust(full_repo_path, Path(self.node_cfg.git_path))
                service_info.git_local_timestamp = git_iso_timestamp(full_repo_path, Path(self.node_cfg.git_path))

            can_pull: bool = (
                self.cfg.allow_pull
                and not local_info.local_build
                and local_info.ref is not None
                and local_info.ref != ""
                and (local_info.short_digest is not None or latest_info.short_digest is not None)
            )
            if self.cfg.allow_pull and not can_pull:
                logger.debug(
                    f"Pull unavailable, ref:{local_info.ref},local:{local_info.short_digest},latest:{latest_info.short_digest}"
                )

            can_build: bool = False
            if self.cfg.allow_build:
                can_build = service_info.git_repo_path is not None and service_info.compose_path is not None
                if not can_build:
                    if service_info.git_repo_path is not None:
                        logger.debug(
                            "Local build ignored for git_repo_path=%s because no compose_path", service_info.git_repo_path
                        )
                else:
                    full_repo_path = self.full_repo_path(
                        cast("str", service_info.compose_path), cast("str", service_info.git_repo_path)
                    )
                    if local_info.local_build and full_repo_path:
                        git_versionish = git_local_digest(full_repo_path, Path(self.node_cfg.git_path))
                        if git_versionish:
                            local_info.git_digest = git_versionish
                            logger.debug("Git digest for local code %s", git_versionish)

                            # Only derive a "latest" git version when a local digest
                            # exists, so it is never built from a missing value.
                            behind_count: int = git_check_update_available(full_repo_path, Path(self.node_cfg.git_path))
                            if behind_count > 0:
                                latest_info.git_digest = f"{git_versionish}+{behind_count}"
                                logger.info("Git update available, generating version %s", latest_info.git_digest)
                            else:
                                logger.debug(f"Git update not available, local repo:{full_repo_path}")
                                latest_info.git_digest = git_versionish

            can_restart: bool = self.cfg.allow_restart and service_info.compose_path is not None

            if can_pull:
                update_type = "Docker Image"
            elif can_build:
                update_type = "Docker Build"
            else:
                update_type = "Unavailable"

            # can_pull,can_build etc are only info flags
            # the HASS update process is driven by comparing current and available versions

            public_installed_version: str
            public_latest_version: str
            version_basis: str
            public_installed_version, public_latest_version, version_basis = select_versions(
                version_policy, local_info, latest_info
            )

            publish_policy: PublishPolicy = PublishPolicy.HOMEASSISTANT
            img_ref_selection = Selection(self.cfg.image_ref_select, local_info.ref)
            version_selection = Selection(self.cfg.version_select, latest_info.version)
            if not img_ref_selection or not version_selection:
                self.log.info(
                    "Excluding from HA Discovery for include/exclude rule: %s, %s", local_info.ref, latest_info.version
                )
                publish_policy = PublishPolicy.MQTT

            discovery: Discovery = Discovery(
                self,
                c.name,
                session,
                node=self.node_cfg.name,
                entity_picture_url=customization.picture or pkg_info.logo_url,
                current_version=public_installed_version,
                publish_policy=publish_policy,
                update_policy=update_policy,
                version_policy=version_policy,
                version_basis=version_basis,
                latest_version=public_latest_version,
                device_icon=self.cfg.device_icon,
                can_pull=can_pull,
                update_type=update_type,
                can_build=can_build,
                can_restart=can_restart,
                status="on" if c.status == "running" else "off",
                throttled=latest_info.throttled,
                previous=previous_discovery,
                release_detail=release_info,
                installation_detail=service_info,
                current_detail=local_info,
                latest_detail=latest_info,
            )
            logger.debug("Analyze generated discovery: %s", discovery)
            return discovery
        except Exception:
            logger.exception("Docker Discovery Failure", container_attrs=c.attrs)
        logger.debug("Analyze returned empty discovery")
        return None

    async def scan(self, session: str, shuffle: bool = True) -> AsyncGenerator[Discovery]:
        """Analyze every listed container and yield each resulting discovery.

        Args:
            session: Scan session identifier passed to ``analyze``.
            shuffle: Randomize container order before scanning.

        Yields:
            Each non-None discovery, after storing it in ``self.discoveries``.
            The scan stops early once ``self.stopped`` is set.
        """
        logger = self.log.bind(session=session, action="scan", source=self.source_type)
        containers: int = 0
        results: int = 0
        throttled: int = 0

        targets: list[Container] = self.client.containers.list()
        if shuffle:
            random.shuffle(targets)
        logger.debug("Starting scanning %s containers", len(targets))
        for c in targets:
            logger.debug("Analyzing container", container=c.name)
            if self.stopped.is_set():
                logger.info(f"Shutdown detected, aborting scan at {c}")
                break
            containers = containers + 1
            result: Discovery | None = self.analyze(c, session)
            if result:
                logger.debug("Analyzed container", result_name=result.name, throttled=result.throttled)
                self.discoveries[result.name] = result
                results = results + 1
                throttled += 1 if result.throttled else 0
                yield result
            else:
                logger.debug("No result from analysis", container=c.name)
        logger.info("Completed", container_count=containers, throttled_count=throttled, result_count=results)

    def command(self, discovery_name: str, command: str, on_update_start: Callable, on_update_end: Callable) -> bool:
        """Handle a command for a named discovery; only ``install`` is supported.

        Args:
            discovery_name: Name of a previously discovered container.
            command: Command name.
            on_update_start: Called with the discovery before the update starts.
            on_update_end: Called with the rescanned discovery (or the original
                one) when the update finishes, and with the original discovery
                if an exception is raised.

        Returns:
            True when the update ran and the rescan produced an unthrottled discovery.
        """
        logger = self.log.bind(container=discovery_name, action="command", command=command)
        logger.info("Executing Command")
        discovery: Discovery | None = None
        updated: bool = False
        try:
            discovery = self.resolve(discovery_name)
            if not discovery:
                logger.warn("Unknown entity", entity=discovery_name)
            elif command != "install":
                logger.warn("Unknown command")
            else:
                if discovery.can_update:
                    rediscovery: Discovery | None = None
                    logger.info("Starting update ...")
                    on_update_start(discovery)
                    if self.update(discovery):
                        logger.debug("Rescanning ...")
                        rediscovery = self.rescan(discovery)
                        updated = rediscovery is not None and not rediscovery.throttled
                        logger.info("Rescanned, updated:%s", updated)
                    else:
                        logger.info("Rescan with no result")
                    on_update_end(rediscovery or discovery)
                else:
                    logger.warning("Update not supported for this container")
        except Exception:
            logger.exception("Failed to handle", discovery_name=discovery_name, command=command)
            if discovery:
                on_update_end(discovery)
        return updated

    def resolve(self, discovery_name: str) -> Discovery | None:
        """Look up a stored discovery by name; None when unknown."""
        return self.discoveries.get(discovery_name)

    def default_metadata(self, image_info: DockerImageInfo) -> PackageUpdateInfo:
        """Return package metadata from the first enricher that provides any.

        Raises:
            ValueError: If no enricher returns metadata.
        """
        for enricher in self.pkg_enrichers:
            pkg_info: PackageUpdateInfo | None = enricher.enrich(image_info)
            if pkg_info is not None:
                return pkg_info
        raise ValueError("No enricher could provide metadata, not even default enricher")
def select_versions(version_policy: VersionPolicy, installed: DockerImageInfo, latest: DockerImageInfo) -> tuple[str, str, str]:
    """Pick the best version string to display based on the version policy and available data

    Ensures that both local installed and remote latest versions are derived in same way
    Falls back to digest if version not reliable or not consistent with current/available version

    Args:
        version_policy: Policy selecting which kind of version string is preferred.
        installed: Details of the locally installed image.
        latest: Details of the newest image found; replaced by ``installed``
            when there is nothing to compare.

    Returns:
        A tuple of (installed version, latest version, basis). The basis has the
        form ``<rule>-<phase>``, with ``-<shortcircuit code>`` appended when the
        latest details were flattened to the installed ones.
    """
    phase: int = 0
    shortcircuit: str | None = None

    def basis(rule: str) -> str:
        # Reads phase/shortcircuit at call time, so it reflects the current stage.
        return f"{rule}-{phase}" if not shortcircuit else f"{rule}-{phase}-{shortcircuit}"

    #
    # Detect No Update Available
    # --------------------------
    #
    # shortcircuit the logic if there's nothing to compare
    #
    if latest.throttled:
        log.debug("Flattening versions for throttled update %s", installed.ref)
        shortcircuit = "THR"
        latest = installed
    elif not any((latest.short_digest, latest.repo_digest, latest.git_digest, latest.version)):
        log.debug("Flattening versions for empty update %s", installed.ref)
        shortcircuit = "NUP"
        latest = installed
    elif latest.short_digest == installed.short_digest and latest.short_digest is not None:
        log.debug("Flattening versions for identical update %s", installed.ref)
        shortcircuit = "SDM"
        latest = installed
    elif installed.image_digest in latest.repo_digests:
        # TODO: avoid this by better adaptations for different registries and single/multi manifests
        log.debug(
            "Matching new repo_digest against installed image digest for %s image %s", installed.index_name, installed.name
        )
        shortcircuit = "FGA"
        latest = installed
    elif latest.image_digest in installed.repo_digests:
        # TODO: avoid this by better adaptations for different registries and single/multi manifests
        log.debug(
            "Matching new image_digest against installed repo digest for %s image %s", installed.index_name, installed.name
        )
        shortcircuit = "FGB"
        latest = installed

    #
    # Explicit Policy Choice
    # ----------------------
    #

    if version_policy == VersionPolicy.VERSION and installed.version and latest.version:
        return installed.version, latest.version, basis("version")

    installed_digest_available: bool = installed.short_digest is not None and installed.short_digest != ""
    latest_digest_available: bool = latest.short_digest is not None and latest.short_digest != ""
    matching_digest: bool = (
        installed_digest_available and latest_digest_available and installed.short_digest == latest.short_digest
    )
    changed_digest: bool = (
        installed_digest_available and latest_digest_available and installed.short_digest != latest.short_digest
    )

    if version_policy == VersionPolicy.DIGEST and installed_digest_available and latest_digest_available:
        return installed.short_digest, latest.short_digest, basis("digest")  # type: ignore[return-value]
    if (
        version_policy == VersionPolicy.VERSION_DIGEST
        and installed.version
        and latest.version
        and installed_digest_available
        and latest_digest_available
    ):
        return (
            f"{installed.version}:{installed.short_digest}",
            f"{latest.version}:{latest.short_digest}",
            basis("version-digest"),
        )

    if (
        version_policy == VersionPolicy.TIMESTAMP
        and installed.created
        and latest.created
        and (
            (latest.created > installed.created and changed_digest) or (latest.created == installed.created and matching_digest)
        )
    ):
        return installed.created, latest.created, basis("timestamp")

    #
    # Auto Policy - Humane Versions
    # -----------------------------
    #
    phase = 1
    if (
        version_policy == VersionPolicy.AUTO
        and installed.version
        and latest.version
        and (
            (installed.version == latest.version and matching_digest)
            or (installed.version != latest.version and changed_digest)
        )
    ):
        # detect semver, or v semver (e.g. v1.030)
        # only use this if both version and digest are consistently agreeing or disagreeing
        # if the strict conditions work, people see nice version numbers on screen rather than hashes
        if re.fullmatch(SEMVER_RE, installed.version or "") and re.fullmatch(SEMVER_RE, latest.version or ""):
            # Smells like semver, override if not using version_policy
            return installed.version, latest.version, basis("semver")
        if re.fullmatch(VERSION_RE, installed.version or "") and re.fullmatch(VERSION_RE, latest.version or ""):
            # Smells like casual semver, override if not using version_policy
            return installed.version, latest.version, basis("casualver")

    if (
        version_policy == VersionPolicy.AUTO
        and installed.tag
        and latest.tag
        and ((installed.tag == latest.tag and matching_digest) or (installed.tag != latest.tag and changed_digest))
    ):
        if re.fullmatch(SEMVER_RE, installed.tag) and re.fullmatch(SEMVER_RE, latest.tag):
            return installed.tag, latest.tag, basis("semver-tag")
        # Mirror the version branch above: after strict semver, accept the
        # looser version pattern. This used to repeat the SEMVER_RE test
        # verbatim and so could never match.
        if re.fullmatch(VERSION_RE, installed.tag) and re.fullmatch(VERSION_RE, latest.tag):
            return installed.tag, latest.tag, basis("casualver-tag")

    #
    # Local Builds
    # ------------
    #
    phase = 2
    if installed.git_digest and latest.git_digest:
        return f"git:{installed.git_digest}", f"git:{latest.git_digest}", basis("git")

    #
    # Fall Back - Qualified Versions
    # --------------------------------
    #
    phase = 3
    if (
        installed.version
        and latest.version
        and (
            (installed.version == latest.version and matching_digest)
            or (installed.version != latest.version and changed_digest)
        )
    ):
        return (
            f"{installed.version}:{installed.short_digest}",
            f"{latest.version}:{latest.short_digest}",
            basis("version-digest"),
        )

    #
    # Fall Back - Timestamp, Digest, Version
    # --------------------------------------

    phase = 4
    if (
        installed.created
        and latest.created
        and (
            (latest.created > installed.created and changed_digest) or (latest.created == installed.created and matching_digest)
        )
    ):
        return installed.created, latest.created, basis("timestamp")
    if installed_digest_available and latest_digest_available:
        return installed.short_digest, latest.short_digest, basis("digest")  # type: ignore[return-value]
    if installed.version and not latest.version and not latest.short_digest and not latest.repo_digest:
        return installed.version, installed.version, basis("version")

    #
    # Fall Back - Missing Digests
    # ---------------------------
    phase = 5
    if not installed_digest_available and latest_digest_available:
        # odd condition if local image has no identity, even out versions so no update alert
        return latest.short_digest, latest.short_digest, basis("digest")  # type: ignore[return-value]

    #
    # Fall Back - Repo Digests
    # ---------------------------
    phase = 6

    def condense_repo_id(i: DockerImageInfo) -> str:
        # Shortened repo digest, or an empty string when there is none.
        v: str | None = i.condense_digest(i.repo_digest) if i.repo_digest else None
        return v or ""

    if installed.repo_digest and latest.repo_digest:
        # where the image digest isn't available, fall back to a repo digest
        return condense_repo_id(installed), condense_repo_id(latest), basis("repo-digest")

    phase = 7
    if latest.repo_digest and latest.repo_digest in installed.repo_digests:
        # installed has multiple RepoDigests from multiple pulls and one of them matches latest current repo digest
        return condense_repo_id(latest), condense_repo_id(latest), basis("repo-digest")

    if installed_digest_available and not latest_digest_available:
        # no new digest, so latest is the current
        return installed.short_digest, installed.short_digest, basis("digest")  # type: ignore[return-value]

    #
    # Failure to Find Any Version
    # ---------------------------
    #
    log.warn("No versions can be determined for %s", installed.ref)
    phase = 999
    return UNKNOWN_VERSION, UNKNOWN_VERSION, basis("failure")