Coverage for src / updates2mqtt / integrations / docker.py: 78%
398 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 23:58 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 23:58 +0000
1import random
2import re
3import subprocess
4import time
5import typing
6from collections.abc import AsyncGenerator, Callable
7from enum import Enum
8from pathlib import Path
9from threading import Event
10from typing import Any, cast
12import docker
13import docker.errors
14import structlog
15from docker.models.containers import Container
17from updates2mqtt.config import (
18 SEMVER_RE,
19 UNKNOWN_VERSION,
20 VERSION_RE,
21 DockerConfig,
22 GitHubConfig,
23 NodeConfig,
24 PackageUpdateInfo,
25 PublishPolicy,
26 RegistryAPI,
27 UpdatePolicy,
28 VersionPolicy,
29)
30from updates2mqtt.helpers import Selection, Throttler
31from updates2mqtt.integrations.docker_enrich import (
32 CommonPackageEnricher,
33 ContainerDistributionAPIVersionLookup,
34 DefaultPackageEnricher,
35 DockerClientVersionLookup,
36 DockerImageInfo,
37 DockerServiceDetails,
38 LinuxServerIOPackageEnricher,
39 LocalContainerInfo,
40 PackageEnricher,
41 SourceReleaseEnricher,
42)
43from updates2mqtt.model import Discovery, ReleaseDetail, ReleaseProvider
45from .git_utils import git_check_update_available, git_iso_timestamp, git_local_digest, git_pull, git_trust
47if typing.TYPE_CHECKING:
48 from docker.models.images import Image
50# distinguish docker build from docker pull?
52log = structlog.get_logger()
55class DockerComposeCommand(Enum):
56 BUILD = "build"
57 UP = "up"
60def safe_json_dt(t: float | None) -> str | None:
61 return time.strftime("%Y-%m-%dT%H:%M:%S.0000", time.gmtime(t)) if t else None
64class ContainerCustomization:
65 """Local customization of a Docker container, by label or env var"""
67 label_prefix: str = "updates2mqtt."
68 env_prefix: str = "UPD2MQTT_"
70 def __init__(self, container: Container) -> None:
71 self.update: UpdatePolicy = UpdatePolicy.PASSIVE # was known as UPD2MQTT_UPDATE before policies and labels
72 self.git_repo_path: str | None = None
73 self.picture: str | None = None
74 self.relnotes: str | None = None
75 self.ignore: bool = False
76 self.version_policy: VersionPolicy | None = None
77 self.registry_token: str | None = None
79 if not container.attrs or container.attrs.get("Config") is None:
80 return
81 env_pairs: list[str] = container.attrs.get("Config", {}).get("Env")
82 if env_pairs:
83 c_env: dict[str, str] = dict(env.split("=", maxsplit=1) for env in env_pairs if "==" not in env)
84 else:
85 c_env = {}
87 for attr in dir(self):
88 if "__" not in attr:
89 label = f"{self.label_prefix}{attr.lower()}"
90 env_var = f"{self.env_prefix}{attr.upper()}"
91 v: Any = None
92 if label in container.labels:
93 # precedence to labels
94 v = container.labels.get(label)
95 log.debug(
96 "%s set from label %s=%s",
97 attr,
98 label,
99 v,
100 integration="docker",
101 container=container.name,
102 action="customize",
103 )
104 elif env_var in c_env:
105 v = c_env[env_var]
106 log.debug(
107 "%s set from env var %s=%s",
108 attr,
109 env_var,
110 v,
111 integration="docker",
112 container=container.name,
113 action="customize",
114 )
115 if v is not None:
116 if isinstance(getattr(self, attr), bool):
117 setattr(self, attr, v.upper() in ("TRUE", "YES", "1"))
118 elif isinstance(getattr(self, attr), VersionPolicy): 118 ↛ 119line 118 didn't jump to line 119 because the condition on line 118 was never true
119 setattr(self, attr, VersionPolicy[v.upper()])
120 elif isinstance(getattr(self, attr), UpdatePolicy):
121 setattr(self, attr, UpdatePolicy[v.upper()])
122 else:
123 setattr(self, attr, v)
126class DockerProvider(ReleaseProvider):
127 def __init__(
128 self,
129 cfg: DockerConfig,
130 node_cfg: NodeConfig,
131 packages: dict[str, PackageUpdateInfo] | None = None,
132 github_cfg: GitHubConfig | None = None,
133 self_bounce: Event | None = None,
134 ) -> None:
135 super().__init__(node_cfg, "docker")
136 self.client: docker.DockerClient = docker.from_env()
137 self.cfg: DockerConfig = cfg
139 # TODO: refresh discovered packages periodically
140 self.throttler = Throttler(self.cfg.default_api_backoff, self.log, self.stopped)
141 self.self_bounce: Event | None = self_bounce
143 self.pkg_enrichers: list[PackageEnricher] = [
144 CommonPackageEnricher(self.cfg, packages),
145 LinuxServerIOPackageEnricher(self.cfg),
146 DefaultPackageEnricher(self.cfg),
147 ]
148 self.docker_client_image_lookup = DockerClientVersionLookup(
149 self.client, self.throttler, self.cfg.registry, self.cfg.default_api_backoff
150 )
151 self.registry_image_lookup = ContainerDistributionAPIVersionLookup(self.throttler, self.cfg.registry)
152 self.release_enricher = SourceReleaseEnricher(github_cfg)
153 self.local_info_builder = LocalContainerInfo()
155 def initialize(self) -> None:
156 for enricher in self.pkg_enrichers:
157 enricher.initialize()
158 self.log.debug("Docker provider initialized")
160 def update(self, discovery: Discovery) -> bool:
161 logger: Any = self.log.bind(container=discovery.name, action="update")
162 logger.info("Updating - last at %s", discovery.update_last_attempt)
163 discovery.update_last_attempt = time.time()
164 self.fetch(discovery)
165 restarted = self.restart(discovery)
166 logger.info("Updated - recorded at %s", discovery.update_last_attempt)
167 return restarted
169 def fetch(self, discovery: Discovery) -> None:
170 logger = self.log.bind(container=discovery.name, action="fetch")
171 installed_info: DockerImageInfo | None = cast("DockerImageInfo|None", discovery.current_detail)
172 service_info: DockerServiceDetails | None = cast("DockerServiceDetails|None", discovery.installation_detail)
174 image_ref: str | None = installed_info.ref if installed_info else None
175 platform: str | None = installed_info.platform if installed_info else None
176 if discovery.can_pull and image_ref:
177 logger.info("Pulling", image_ref=image_ref, platform=platform)
178 image: Image = self.client.images.pull(image_ref, platform=platform, all_tags=False)
179 if image: 179 ↛ 182line 179 didn't jump to line 182 because the condition on line 179 was always true
180 logger.info("Pulled", image_id=image.id, image_ref=image_ref, platform=platform)
181 else:
182 logger.warn("Unable to pull", image_ref=image_ref, platform=platform)
183 elif discovery.can_build and service_info:
184 compose_path: str | None = service_info.compose_path
185 git_repo_path: str | None = service_info.git_repo_path
186 logger.debug("can_build check", git_repo=git_repo_path)
187 if not compose_path or not git_repo_path:
188 logger.warn("No compose path or git repo path configured, skipped build")
189 return
191 full_repo_path: Path = self.full_repo_path(compose_path, git_repo_path)
192 if git_pull(full_repo_path, Path(self.node_cfg.git_path)):
193 self.build(discovery)
194 else:
195 logger.debug("Skipping git_pull, no update")
197 def full_repo_path(self, compose_path: str, git_repo_path: str) -> Path:
198 if compose_path is None or git_repo_path is None: 198 ↛ 199line 198 didn't jump to line 199 because the condition on line 198 was never true
199 raise ValueError("Unexpected null paths")
200 if compose_path and not Path(git_repo_path).is_absolute(): 200 ↛ 202line 200 didn't jump to line 202 because the condition on line 200 was always true
201 return Path(compose_path) / git_repo_path
202 return Path(git_repo_path)
204 def build(self, discovery: Discovery) -> bool:
205 logger = self.log.bind(container=discovery.name, action="build")
206 service_info: DockerServiceDetails | None = cast("DockerServiceDetails|None", discovery.installation_detail)
208 if not service_info or not service_info.compose_path: 208 ↛ 209line 208 didn't jump to line 209 because the condition on line 208 was never true
209 logger.warn("No service_info available on compose")
210 return False
211 logger.info("Building", compose_path=service_info.compose_path, service=service_info.compose_service)
212 return self.execute_compose(
213 command=DockerComposeCommand.BUILD,
214 args="",
215 service=service_info.compose_service,
216 cwd=service_info.compose_path,
217 logger=logger,
218 )
220 def execute_compose(
221 self, command: DockerComposeCommand, args: str, service: str | None, cwd: str | None, logger: structlog.BoundLogger
222 ) -> bool:
223 if not cwd or not Path(cwd).is_dir(): 223 ↛ 224line 223 didn't jump to line 224 because the condition on line 223 was never true
224 logger.warn("Invalid compose path, skipped %s", command)
225 return False
227 cmd: str = "docker-compose" if self.cfg.compose_version == "v1" else "docker compose"
228 logger.info(f"Executing {cmd} {command} {args} {service}")
229 cmd = cmd + " " + command.value
230 if args: 230 ↛ 231line 230 didn't jump to line 231 because the condition on line 230 was never true
231 cmd = cmd + " " + args
232 if service: 232 ↛ 233line 232 didn't jump to line 233 because the condition on line 232 was never true
233 cmd = cmd + " " + service
235 proc: subprocess.CompletedProcess[str] = subprocess.run(cmd, check=False, shell=True, cwd=cwd, text=True)
236 if proc.returncode == 0:
237 logger.info(f"{command} via compose successful")
238 return True
239 if proc.stderr and "unknown command: docker compose" in proc.stderr: 239 ↛ 240line 239 didn't jump to line 240 because the condition on line 239 was never true
240 logger.warning("docker compose set to wrong version, seems like v1 installed")
241 self.cfg.compose_version = "v1"
242 logger.warn(
243 f"{command} failed: %s",
244 proc.returncode,
245 )
246 return False
248 def restart(self, discovery: Discovery) -> bool:
249 logger = self.log.bind(container=discovery.name, action="restart")
250 installed_info: DockerImageInfo | None = cast("DockerImageInfo|None", discovery.current_detail)
251 service_info: DockerServiceDetails | None = cast("DockerServiceDetails|None", discovery.installation_detail)
253 if (
254 self.self_bounce is not None
255 and installed_info
256 and service_info
257 and (
258 "ghcr.io/rhizomatics/updates2mqtt" in installed_info.ref
259 or (service_info.git_repo_path and service_info.git_repo_path.endswith("updates2mqtt"))
260 )
261 ):
262 logger.warning("Attempting to self-bounce")
263 self.self_bounce.set()
264 if service_info is None:
265 return False
266 return self.execute_compose(
267 command=DockerComposeCommand.UP,
268 args="--detach --yes",
269 service=service_info.compose_service,
270 cwd=service_info.compose_path,
271 logger=logger,
272 )
274 def rescan(self, discovery: Discovery) -> Discovery | None:
275 logger: Any = self.log.bind(container=discovery.name, action="rescan")
276 try:
277 c: Container = self.client.containers.get(discovery.name)
278 if c: 278 ↛ 283line 278 didn't jump to line 283 because the condition on line 278 was always true
279 rediscovery: Discovery | None = self.analyze(c, discovery.session, previous_discovery=discovery)
280 if rediscovery and not rediscovery.throttled: 280 ↛ 283line 280 didn't jump to line 283 because the condition on line 280 was always true
281 self.discoveries[rediscovery.name] = rediscovery
282 return rediscovery
283 logger.warn("Unable to find container for rescan")
284 except docker.errors.NotFound:
285 logger.warn("Container not found in Docker")
286 except docker.errors.APIError:
287 logger.exception("Docker API error retrieving container")
288 return None
290 def analyze(self, c: Container, session: str, previous_discovery: Discovery | None = None) -> Discovery | None:
291 logger = self.log.bind(container=c.name, action="analyze")
293 if c.attrs is None or not c.attrs: 293 ↛ 294line 293 didn't jump to line 294 because the condition on line 293 was never true
294 logger.warn("No container attributes found, discovery rejected")
295 return None
296 if c.name is None: 296 ↛ 297line 296 didn't jump to line 297 because the condition on line 296 was never true
297 logger.warn("No container name found, discovery rejected")
298 return None
300 customization: ContainerCustomization = ContainerCustomization(c)
301 if customization.ignore: 301 ↛ 302line 301 didn't jump to line 302 because the condition on line 301 was never true
302 logger.info("Container ignored due to UPD2MQTT_IGNORE setting")
303 return None
305 if customization.update == UpdatePolicy.AUTO: 305 ↛ 306line 305 didn't jump to line 306 because the condition on line 305 was never true
306 logger.debug("Auto update policy detected")
307 update_policy: UpdatePolicy = customization.update or UpdatePolicy.PASSIVE
309 local_info: DockerImageInfo
310 service_info: DockerServiceDetails
311 local_info, service_info = self.local_info_builder.build_image_info(c)
312 pkg_info: PackageUpdateInfo = self.default_metadata(local_info)
314 version_policy: VersionPolicy
315 if customization.version_policy: 315 ↛ 316line 315 didn't jump to line 316 because the condition on line 315 was never true
316 logger.debug("Overriding version_policy to local customization: %s", customization.version_policy)
317 version_policy = customization.version_policy
318 else:
319 if self.cfg.version_policy == VersionPolicy.AUTO and pkg_info.docker: 319 ↛ 325line 319 didn't jump to line 325 because the condition on line 319 was always true
320 logger.debug(
321 "Version policy, pkg level %s, config level: %s", pkg_info.docker.version_policy, self.cfg.version_policy
322 )
323 version_policy = pkg_info.docker.version_policy or self.cfg.version_policy
324 else:
325 logger.debug("Version policy, fixed config level: %s", self.cfg.version_policy)
326 version_policy = self.cfg.version_policy
328 try:
329 service_info.git_repo_path = customization.git_repo_path
331 registry_selection = Selection(self.cfg.registry_select, local_info.index_name)
332 latest_info: DockerImageInfo
333 if local_info.pinned: 333 ↛ 334line 333 didn't jump to line 334 because the condition on line 333 was never true
334 logger.debug("Skipping registry fetch for local pinned image, %s", local_info.ref)
335 latest_info = local_info.reuse()
336 elif registry_selection and local_info.ref and not local_info.local_build:
337 if self.cfg.registry.api == RegistryAPI.DOCKER_CLIENT:
338 latest_info = self.docker_client_image_lookup.lookup(local_info)
339 elif self.cfg.registry.api == RegistryAPI.OCI_V2: 339 ↛ 341line 339 didn't jump to line 341 because the condition on line 339 was always true
340 latest_info = self.registry_image_lookup.lookup(local_info, token=customization.registry_token)
341 elif self.cfg.registry.api == RegistryAPI.OCI_V2_MINIMAL:
342 latest_info = self.registry_image_lookup.lookup(
343 local_info, token=customization.registry_token, minimal=True
344 )
345 else: # assuming RegistryAPI.DISABLED
346 logger.debug(f"Skipping registry check, disabled in config {self.cfg.registry.api}")
347 latest_info = local_info.reuse()
348 elif local_info.local_build: 348 ↛ 354line 348 didn't jump to line 354 because the condition on line 348 was always true
349 # assume its a locally built image if no RepoDigests available
350 latest_info = local_info.reuse()
351 latest_info.short_digest = None
352 latest_info.image_digest = None
353 else:
354 logger.debug("Registry selection rules suppressed metadata lookup")
355 latest_info = local_info.reuse()
357 release_info: ReleaseDetail | None = self.release_enricher.enrich(
358 latest_info,
359 source_repo_url=pkg_info.source_repo_url,
360 notes_url=customization.relnotes or pkg_info.release_notes_url,
361 )
362 logger.debug("Enriched release info: %s", release_info)
364 if service_info.git_repo_path and service_info.compose_path:
365 full_repo_path: Path = Path(service_info.compose_path).joinpath(service_info.git_repo_path)
367 git_trust(full_repo_path, Path(self.node_cfg.git_path))
368 service_info.git_local_timestamp = git_iso_timestamp(full_repo_path, Path(self.node_cfg.git_path))
370 can_pull: bool = (
371 self.cfg.allow_pull
372 and not local_info.local_build
373 and local_info.ref is not None
374 and local_info.ref != ""
375 and (local_info.short_digest is not None or latest_info.short_digest is not None)
376 )
377 if self.cfg.allow_pull and not can_pull:
378 logger.debug(
379 f"Pull unavailable, ref:{local_info.ref},local:{local_info.short_digest},latest:{latest_info.short_digest}"
380 )
382 can_build: bool = False
383 if self.cfg.allow_build: 383 ↛ 408line 383 didn't jump to line 408 because the condition on line 383 was always true
384 can_build = service_info.git_repo_path is not None and service_info.compose_path is not None
385 if not can_build:
386 if service_info.git_repo_path is not None: 386 ↛ 387line 386 didn't jump to line 387 because the condition on line 386 was never true
387 logger.debug(
388 "Local build ignored for git_repo_path=%s because no compose_path", service_info.git_repo_path
389 )
390 else:
391 full_repo_path = self.full_repo_path(
392 cast("str", service_info.compose_path), cast("str", service_info.git_repo_path)
393 )
394 if local_info.local_build and full_repo_path: 394 ↛ 408line 394 didn't jump to line 408 because the condition on line 394 was always true
395 git_versionish = git_local_digest(full_repo_path, Path(self.node_cfg.git_path))
396 if git_versionish:
397 local_info.git_digest = git_versionish
398 logger.debug("Git digest for local code %s", git_versionish)
400 behind_count: int = git_check_update_available(full_repo_path, Path(self.node_cfg.git_path))
401 if behind_count > 0:
402 latest_info.git_digest = f"{git_versionish}+{behind_count}"
403 logger.info("Git update available, generating version %s", latest_info.git_digest)
404 else:
405 logger.debug(f"Git update not available, local repo:{full_repo_path}")
406 latest_info.git_digest = git_versionish
408 can_restart: bool = self.cfg.allow_restart and service_info.compose_path is not None
410 if can_pull:
411 update_type = "Docker Image"
412 elif can_build: 412 ↛ 415line 412 didn't jump to line 415 because the condition on line 412 was always true
413 update_type = "Docker Build"
414 else:
415 update_type = "Unavailable"
417 # can_pull,can_build etc are only info flags
418 # the HASS update process is driven by comparing current and available versions
420 public_installed_version: str
421 public_latest_version: str
422 version_basis: str
423 public_installed_version, public_latest_version, version_basis = select_versions(
424 version_policy, local_info, latest_info
425 )
427 publish_policy: PublishPolicy = PublishPolicy.HOMEASSISTANT
428 img_ref_selection = Selection(self.cfg.image_ref_select, local_info.ref)
429 version_selection = Selection(self.cfg.version_select, latest_info.version)
430 if not img_ref_selection or not version_selection: 430 ↛ 431line 430 didn't jump to line 431 because the condition on line 430 was never true
431 self.log.info(
432 "Excluding from HA Discovery for include/exclude rule: %s, %s", local_info.ref, latest_info.version
433 )
434 publish_policy = PublishPolicy.MQTT
436 discovery: Discovery = Discovery(
437 self,
438 c.name,
439 session,
440 node=self.node_cfg.name,
441 entity_picture_url=customization.picture or pkg_info.logo_url,
442 current_version=public_installed_version,
443 publish_policy=publish_policy,
444 update_policy=update_policy,
445 version_policy=version_policy,
446 version_basis=version_basis,
447 latest_version=public_latest_version,
448 device_icon=self.cfg.device_icon,
449 can_pull=can_pull,
450 update_type=update_type,
451 can_build=can_build,
452 can_restart=can_restart,
453 status=(c.status == "running" and "on") or "off",
454 throttled=latest_info.throttled,
455 previous=previous_discovery,
456 release_detail=release_info,
457 installation_detail=service_info,
458 current_detail=local_info,
459 latest_detail=latest_info,
460 )
461 logger.debug("Analyze generated discovery: %s", discovery)
462 return discovery
463 except Exception:
464 logger.exception("Docker Discovery Failure", container_attrs=c.attrs)
465 logger.debug("Analyze returned empty discovery")
466 return None
468 # def version(self, c: Container, version_type: str):
469 # metadata_version: str = c.labels.get("org.opencontainers.image.version")
470 # metadata_revision: str = c.labels.get("org.opencontainers.image.revision")
472 async def scan(self, session: str, shuffle: bool = True) -> AsyncGenerator[Discovery]:
473 logger = self.log.bind(session=session, action="scan", source=self.source_type)
474 containers: int = 0
475 results: int = 0
476 throttled: int = 0
478 targets: list[Container] = self.client.containers.list()
479 if shuffle: 479 ↛ 481line 479 didn't jump to line 481 because the condition on line 479 was always true
480 random.shuffle(targets)
481 logger.debug("Starting scanning %s containers", len(targets))
482 for c in targets:
483 logger.debug("Analyzing container", container=c.name)
484 if self.stopped.is_set(): 484 ↛ 485line 484 didn't jump to line 485 because the condition on line 484 was never true
485 logger.info(f"Shutdown detected, aborting scan at {c}")
486 break
487 containers = containers + 1
488 result: Discovery | None = self.analyze(c, session)
489 if result: 489 ↛ 496line 489 didn't jump to line 496 because the condition on line 489 was always true
490 logger.debug("Analyzed container", result_name=result.name, throttled=result.throttled)
491 self.discoveries[result.name] = result
492 results = results + 1
493 throttled += 1 if result.throttled else 0
494 yield result
495 else:
496 logger.debug("No result from analysis", container=c.name)
497 logger.info("Completed", container_count=containers, throttled_count=throttled, result_count=results)
499 def command(self, discovery_name: str, command: str, on_update_start: Callable, on_update_end: Callable) -> bool:
500 logger = self.log.bind(container=discovery_name, action="command", command=command)
501 logger.info("Executing Command")
502 discovery: Discovery | None = None
503 updated: bool = False
504 try:
505 discovery = self.resolve(discovery_name)
506 if not discovery:
507 logger.warn("Unknown entity", entity=discovery_name)
508 elif command != "install":
509 logger.warn("Unknown command")
510 else:
511 if discovery.can_update:
512 rediscovery: Discovery | None = None
513 logger.info("Starting update ...")
514 on_update_start(discovery)
515 if self.update(discovery):
516 logger.debug("Rescanning ...")
517 rediscovery = self.rescan(discovery)
518 updated = rediscovery is not None and not rediscovery.throttled
519 logger.info("Rescanned, updated:%s", updated)
520 else:
521 logger.info("Rescan with no result")
522 on_update_end(rediscovery or discovery)
523 else:
524 logger.warning("Update not supported for this container")
525 except Exception:
526 logger.exception("Failed to handle", discovery_name=discovery_name, command=command)
527 if discovery: 527 ↛ 529line 527 didn't jump to line 529 because the condition on line 527 was always true
528 on_update_end(discovery)
529 return updated
531 def resolve(self, discovery_name: str) -> Discovery | None:
532 return self.discoveries.get(discovery_name)
534 def default_metadata(self, image_info: DockerImageInfo) -> PackageUpdateInfo:
535 for enricher in self.pkg_enrichers: 535 ↛ 539line 535 didn't jump to line 539 because the loop on line 535 didn't complete
536 pkg_info: PackageUpdateInfo | None = enricher.enrich(image_info)
537 if pkg_info is not None:
538 return pkg_info
539 raise ValueError("No enricher could provide metadata, not even default enricher")
542def select_versions(version_policy: VersionPolicy, installed: DockerImageInfo, latest: DockerImageInfo) -> tuple[str, str, str]:
543 """Pick the best version string to display based on the version policy and available data
545 Ensures that both local installed and remote latest versions are derived in same way
546 Falls back to digest if version not reliable or not consistent with current/available version
547 """
548 phase: int = 0
549 shortcircuit: str | None = None
551 def basis(rule: str) -> str:
552 return f"{rule}-{phase}" if not shortcircuit else f"{rule}-{phase}-{shortcircuit}"
554 #
555 # Detect No Update Available
556 # --------------------------
557 #
558 # shortcircuit the logic if there's nothing to compare
559 #
560 if latest.throttled:
561 log.debug("Flattening versions for throttled update %s", installed.ref)
562 shortcircuit = "THR"
563 latest = installed
564 elif not any((latest.short_digest, latest.repo_digest, latest.git_digest, latest.version)):
565 log.debug("Flattening versions for empty update %s", installed.ref)
566 shortcircuit = "NUP"
567 latest = installed
568 elif latest.short_digest == installed.short_digest and latest.short_digest is not None:
569 log.debug("Flattening versions for identical update %s", installed.ref)
570 shortcircuit = "SDM"
571 latest = installed
572 elif installed.image_digest in latest.repo_digests: 572 ↛ 574line 572 didn't jump to line 574 because the condition on line 572 was never true
573 # TODO: avoid this by better adaptations for different registries and single/multi manifests
574 log.debug(
575 "Matching new repo_digest against installed image digest for %s image %s", installed.index_name, installed.name
576 )
577 shortcircuit = "FGA"
578 latest = installed
579 elif latest.image_digest in installed.repo_digests: 579 ↛ 581line 579 didn't jump to line 581 because the condition on line 579 was never true
580 # TODO: avoid this by better adaptations for different registries and single/multi manifests
581 log.debug(
582 "Matching new image_digest against installed repo digest for %s image %s", installed.index_name, installed.name
583 )
584 shortcircuit = "FGB"
585 latest = installed
587 #
588 # Explicit Policy Choice
589 # ----------------------
590 #
592 if version_policy == VersionPolicy.VERSION and installed.version and latest.version:
593 return installed.version, latest.version, basis("version")
595 installed_digest_available: bool = installed.short_digest is not None and installed.short_digest != ""
596 latest_digest_available: bool = latest.short_digest is not None and latest.short_digest != ""
597 matching_digest: bool = (
598 installed_digest_available and latest_digest_available and installed.short_digest == latest.short_digest
599 )
600 changed_digest: bool = (
601 installed_digest_available and latest_digest_available and installed.short_digest != latest.short_digest
602 )
604 if version_policy == VersionPolicy.DIGEST and installed_digest_available and latest_digest_available:
605 return installed.short_digest, latest.short_digest, basis("digest") # type: ignore[return-value]
606 if (
607 version_policy == VersionPolicy.VERSION_DIGEST
608 and installed.version
609 and latest.version
610 and installed_digest_available
611 and latest_digest_available
612 ):
613 return (
614 f"{installed.version}:{installed.short_digest}",
615 f"{latest.version}:{latest.short_digest}",
616 basis("version-digest"),
617 )
619 if (
620 version_policy == VersionPolicy.TIMESTAMP
621 and installed.created
622 and latest.created
623 and (
624 (latest.created > installed.created and changed_digest) or (latest.created == installed.created and matching_digest)
625 )
626 ):
627 return installed.created, latest.created, basis("timestamp")
629 #
630 # Auto Policy - Humane Versions
631 # -----------------------------
632 #
633 phase = 1
634 if (
635 version_policy == VersionPolicy.AUTO
636 and installed.version
637 and latest.version
638 and (
639 (installed.version == latest.version and matching_digest)
640 or (installed.version != latest.version and changed_digest)
641 )
642 ):
643 # detect semver, or v semver (e.g. v1.030)
644 # only use this if both version and digest are consistently agreeing or disagreeing
645 # if the strict conditions work, people see nice version numbers on screen rather than hashes
646 if re.fullmatch(SEMVER_RE, installed.version or "") and re.fullmatch(SEMVER_RE, latest.version or ""):
647 # Smells like semver, override if not using version_policy
648 return installed.version, latest.version, basis("semver")
649 if re.fullmatch(VERSION_RE, installed.version or "") and re.fullmatch(VERSION_RE, latest.version or ""): 649 ↛ 653line 649 didn't jump to line 653 because the condition on line 649 was always true
650 # Smells like casual semver, override if not using version_policy
651 return installed.version, latest.version, basis("casualver")
653 if (
654 version_policy == VersionPolicy.AUTO
655 and installed.tag
656 and latest.tag
657 and ((installed.tag == latest.tag and matching_digest) or (installed.tag != latest.tag and changed_digest))
658 ):
659 if re.fullmatch(SEMVER_RE, installed.tag) and re.fullmatch(SEMVER_RE, latest.tag):
660 return installed.tag, latest.tag, basis("semver-tag")
661 if re.fullmatch(SEMVER_RE, installed.tag) and re.fullmatch(SEMVER_RE, latest.tag): 661 ↛ 662line 661 didn't jump to line 662 because the condition on line 661 was never true
662 return installed.tag, latest.tag, basis("semver-tag")
664 #
665 # Local Builds
666 # ------------
667 #
668 phase = 2
669 if installed.git_digest and latest.git_digest:
670 return f"git:{installed.git_digest}", f"git:{latest.git_digest}", basis("git")
672 #
673 # Fall Back - Qualified Versions
674 # --------------------------------
675 #
676 phase = 3
677 if ( 677 ↛ 685line 677 didn't jump to line 685 because the condition on line 677 was never true
678 installed.version
679 and latest.version
680 and (
681 (installed.version == latest.version and matching_digest)
682 or (installed.version != latest.version and changed_digest)
683 )
684 ):
685 return (
686 f"{installed.version}:{installed.short_digest}",
687 f"{latest.version}:{latest.short_digest}",
688 basis("version-digest"),
689 )
691 #
692 # Fall Back - Timestamp, Digest, Version
693 # --------------------------------------
695 phase = 4
696 if (
697 installed.created
698 and latest.created
699 and (
700 (latest.created > installed.created and changed_digest) or (latest.created == installed.created and matching_digest)
701 )
702 ):
703 return installed.created, latest.created, basis("timestamp")
704 if installed_digest_available and latest_digest_available:
705 return installed.short_digest, latest.short_digest, basis("digest") # type: ignore[return-value]
706 if installed.version and not latest.version and not latest.short_digest and not latest.repo_digest: 706 ↛ 707line 706 didn't jump to line 707 because the condition on line 706 was never true
707 return installed.version, installed.version, basis("version")
709 #
710 # Fall Back - Missing Digests
711 # ---------------------------
712 phase = 5
713 if not installed_digest_available and latest_digest_available: 713 ↛ 715line 713 didn't jump to line 715 because the condition on line 713 was never true
714 # odd condition if local image has no identity, even out versions so no update alert
715 return latest.short_digest, latest.short_digest, basis("digest") # type: ignore[return-value]
717 #
718 # Fall Back - Repo Digests
719 # ---------------------------
720 phase = 6
722 def condense_repo_id(i: DockerImageInfo) -> str:
723 v: str | None = i.condense_digest(i.repo_digest) if i.repo_digest else None
724 return v or ""
726 if installed.repo_digest and latest.repo_digest:
727 # where the image digest isn't available, fall back to a repo digest
728 return condense_repo_id(installed), condense_repo_id(latest), basis("repo-digest")
730 phase = 7
731 if latest.repo_digest and latest.repo_digest in installed.repo_digests: 731 ↛ 735line 731 didn't jump to line 735 because the condition on line 731 was always true
732 # installed has multiple RepoDigests from multiple pulls and one of them matches latest current repo digest
733 return condense_repo_id(latest), condense_repo_id(latest), basis("repo-digest")
735 if installed_digest_available and not latest_digest_available:
736 # no new digest, so latest is the current
737 return installed.short_digest, installed.short_digest, basis("digest") # type: ignore[return-value]
739 #
740 # Failure to Find Any Version
741 # ---------------------------
742 #
743 log.warn("No versions can be determined for %s", installed.ref)
744 phase = 999
745 return UNKNOWN_VERSION, UNKNOWN_VERSION, basis("failure")