Coverage for src / updates2mqtt / integrations / docker.py: 78%

398 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 23:58 +0000

1import random 

2import re 

3import subprocess 

4import time 

5import typing 

6from collections.abc import AsyncGenerator, Callable 

7from enum import Enum 

8from pathlib import Path 

9from threading import Event 

10from typing import Any, cast 

11 

12import docker 

13import docker.errors 

14import structlog 

15from docker.models.containers import Container 

16 

17from updates2mqtt.config import ( 

18 SEMVER_RE, 

19 UNKNOWN_VERSION, 

20 VERSION_RE, 

21 DockerConfig, 

22 GitHubConfig, 

23 NodeConfig, 

24 PackageUpdateInfo, 

25 PublishPolicy, 

26 RegistryAPI, 

27 UpdatePolicy, 

28 VersionPolicy, 

29) 

30from updates2mqtt.helpers import Selection, Throttler 

31from updates2mqtt.integrations.docker_enrich import ( 

32 CommonPackageEnricher, 

33 ContainerDistributionAPIVersionLookup, 

34 DefaultPackageEnricher, 

35 DockerClientVersionLookup, 

36 DockerImageInfo, 

37 DockerServiceDetails, 

38 LinuxServerIOPackageEnricher, 

39 LocalContainerInfo, 

40 PackageEnricher, 

41 SourceReleaseEnricher, 

42) 

43from updates2mqtt.model import Discovery, ReleaseDetail, ReleaseProvider 

44 

45from .git_utils import git_check_update_available, git_iso_timestamp, git_local_digest, git_pull, git_trust 

46 

47if typing.TYPE_CHECKING: 

48 from docker.models.images import Image 

49 

50# distinguish docker build from docker pull? 

51 

52log = structlog.get_logger() 

53 

54 

55class DockerComposeCommand(Enum): 

56 BUILD = "build" 

57 UP = "up" 

58 

59 

60def safe_json_dt(t: float | None) -> str | None: 

61 return time.strftime("%Y-%m-%dT%H:%M:%S.0000", time.gmtime(t)) if t else None 

62 

63 

64class ContainerCustomization: 

65 """Local customization of a Docker container, by label or env var""" 

66 

67 label_prefix: str = "updates2mqtt." 

68 env_prefix: str = "UPD2MQTT_" 

69 

70 def __init__(self, container: Container) -> None: 

71 self.update: UpdatePolicy = UpdatePolicy.PASSIVE # was known as UPD2MQTT_UPDATE before policies and labels 

72 self.git_repo_path: str | None = None 

73 self.picture: str | None = None 

74 self.relnotes: str | None = None 

75 self.ignore: bool = False 

76 self.version_policy: VersionPolicy | None = None 

77 self.registry_token: str | None = None 

78 

79 if not container.attrs or container.attrs.get("Config") is None: 

80 return 

81 env_pairs: list[str] = container.attrs.get("Config", {}).get("Env") 

82 if env_pairs: 

83 c_env: dict[str, str] = dict(env.split("=", maxsplit=1) for env in env_pairs if "==" not in env) 

84 else: 

85 c_env = {} 

86 

87 for attr in dir(self): 

88 if "__" not in attr: 

89 label = f"{self.label_prefix}{attr.lower()}" 

90 env_var = f"{self.env_prefix}{attr.upper()}" 

91 v: Any = None 

92 if label in container.labels: 

93 # precedence to labels 

94 v = container.labels.get(label) 

95 log.debug( 

96 "%s set from label %s=%s", 

97 attr, 

98 label, 

99 v, 

100 integration="docker", 

101 container=container.name, 

102 action="customize", 

103 ) 

104 elif env_var in c_env: 

105 v = c_env[env_var] 

106 log.debug( 

107 "%s set from env var %s=%s", 

108 attr, 

109 env_var, 

110 v, 

111 integration="docker", 

112 container=container.name, 

113 action="customize", 

114 ) 

115 if v is not None: 

116 if isinstance(getattr(self, attr), bool): 

117 setattr(self, attr, v.upper() in ("TRUE", "YES", "1")) 

118 elif isinstance(getattr(self, attr), VersionPolicy): 118 ↛ 119line 118 didn't jump to line 119 because the condition on line 118 was never true

119 setattr(self, attr, VersionPolicy[v.upper()]) 

120 elif isinstance(getattr(self, attr), UpdatePolicy): 

121 setattr(self, attr, UpdatePolicy[v.upper()]) 

122 else: 

123 setattr(self, attr, v) 

124 

125 

126class DockerProvider(ReleaseProvider): 

127 def __init__( 

128 self, 

129 cfg: DockerConfig, 

130 node_cfg: NodeConfig, 

131 packages: dict[str, PackageUpdateInfo] | None = None, 

132 github_cfg: GitHubConfig | None = None, 

133 self_bounce: Event | None = None, 

134 ) -> None: 

135 super().__init__(node_cfg, "docker") 

136 self.client: docker.DockerClient = docker.from_env() 

137 self.cfg: DockerConfig = cfg 

138 

139 # TODO: refresh discovered packages periodically 

140 self.throttler = Throttler(self.cfg.default_api_backoff, self.log, self.stopped) 

141 self.self_bounce: Event | None = self_bounce 

142 

143 self.pkg_enrichers: list[PackageEnricher] = [ 

144 CommonPackageEnricher(self.cfg, packages), 

145 LinuxServerIOPackageEnricher(self.cfg), 

146 DefaultPackageEnricher(self.cfg), 

147 ] 

148 self.docker_client_image_lookup = DockerClientVersionLookup( 

149 self.client, self.throttler, self.cfg.registry, self.cfg.default_api_backoff 

150 ) 

151 self.registry_image_lookup = ContainerDistributionAPIVersionLookup(self.throttler, self.cfg.registry) 

152 self.release_enricher = SourceReleaseEnricher(github_cfg) 

153 self.local_info_builder = LocalContainerInfo() 

154 

155 def initialize(self) -> None: 

156 for enricher in self.pkg_enrichers: 

157 enricher.initialize() 

158 self.log.debug("Docker provider initialized") 

159 

160 def update(self, discovery: Discovery) -> bool: 

161 logger: Any = self.log.bind(container=discovery.name, action="update") 

162 logger.info("Updating - last at %s", discovery.update_last_attempt) 

163 discovery.update_last_attempt = time.time() 

164 self.fetch(discovery) 

165 restarted = self.restart(discovery) 

166 logger.info("Updated - recorded at %s", discovery.update_last_attempt) 

167 return restarted 

168 

169 def fetch(self, discovery: Discovery) -> None: 

170 logger = self.log.bind(container=discovery.name, action="fetch") 

171 installed_info: DockerImageInfo | None = cast("DockerImageInfo|None", discovery.current_detail) 

172 service_info: DockerServiceDetails | None = cast("DockerServiceDetails|None", discovery.installation_detail) 

173 

174 image_ref: str | None = installed_info.ref if installed_info else None 

175 platform: str | None = installed_info.platform if installed_info else None 

176 if discovery.can_pull and image_ref: 

177 logger.info("Pulling", image_ref=image_ref, platform=platform) 

178 image: Image = self.client.images.pull(image_ref, platform=platform, all_tags=False) 

179 if image: 179 ↛ 182line 179 didn't jump to line 182 because the condition on line 179 was always true

180 logger.info("Pulled", image_id=image.id, image_ref=image_ref, platform=platform) 

181 else: 

182 logger.warn("Unable to pull", image_ref=image_ref, platform=platform) 

183 elif discovery.can_build and service_info: 

184 compose_path: str | None = service_info.compose_path 

185 git_repo_path: str | None = service_info.git_repo_path 

186 logger.debug("can_build check", git_repo=git_repo_path) 

187 if not compose_path or not git_repo_path: 

188 logger.warn("No compose path or git repo path configured, skipped build") 

189 return 

190 

191 full_repo_path: Path = self.full_repo_path(compose_path, git_repo_path) 

192 if git_pull(full_repo_path, Path(self.node_cfg.git_path)): 

193 self.build(discovery) 

194 else: 

195 logger.debug("Skipping git_pull, no update") 

196 

197 def full_repo_path(self, compose_path: str, git_repo_path: str) -> Path: 

198 if compose_path is None or git_repo_path is None: 198 ↛ 199line 198 didn't jump to line 199 because the condition on line 198 was never true

199 raise ValueError("Unexpected null paths") 

200 if compose_path and not Path(git_repo_path).is_absolute(): 200 ↛ 202line 200 didn't jump to line 202 because the condition on line 200 was always true

201 return Path(compose_path) / git_repo_path 

202 return Path(git_repo_path) 

203 

204 def build(self, discovery: Discovery) -> bool: 

205 logger = self.log.bind(container=discovery.name, action="build") 

206 service_info: DockerServiceDetails | None = cast("DockerServiceDetails|None", discovery.installation_detail) 

207 

208 if not service_info or not service_info.compose_path: 208 ↛ 209line 208 didn't jump to line 209 because the condition on line 208 was never true

209 logger.warn("No service_info available on compose") 

210 return False 

211 logger.info("Building", compose_path=service_info.compose_path, service=service_info.compose_service) 

212 return self.execute_compose( 

213 command=DockerComposeCommand.BUILD, 

214 args="", 

215 service=service_info.compose_service, 

216 cwd=service_info.compose_path, 

217 logger=logger, 

218 ) 

219 

220 def execute_compose( 

221 self, command: DockerComposeCommand, args: str, service: str | None, cwd: str | None, logger: structlog.BoundLogger 

222 ) -> bool: 

223 if not cwd or not Path(cwd).is_dir(): 223 ↛ 224line 223 didn't jump to line 224 because the condition on line 223 was never true

224 logger.warn("Invalid compose path, skipped %s", command) 

225 return False 

226 

227 cmd: str = "docker-compose" if self.cfg.compose_version == "v1" else "docker compose" 

228 logger.info(f"Executing {cmd} {command} {args} {service}") 

229 cmd = cmd + " " + command.value 

230 if args: 230 ↛ 231line 230 didn't jump to line 231 because the condition on line 230 was never true

231 cmd = cmd + " " + args 

232 if service: 232 ↛ 233line 232 didn't jump to line 233 because the condition on line 232 was never true

233 cmd = cmd + " " + service 

234 

235 proc: subprocess.CompletedProcess[str] = subprocess.run(cmd, check=False, shell=True, cwd=cwd, text=True) 

236 if proc.returncode == 0: 

237 logger.info(f"{command} via compose successful") 

238 return True 

239 if proc.stderr and "unknown command: docker compose" in proc.stderr: 239 ↛ 240line 239 didn't jump to line 240 because the condition on line 239 was never true

240 logger.warning("docker compose set to wrong version, seems like v1 installed") 

241 self.cfg.compose_version = "v1" 

242 logger.warn( 

243 f"{command} failed: %s", 

244 proc.returncode, 

245 ) 

246 return False 

247 

248 def restart(self, discovery: Discovery) -> bool: 

249 logger = self.log.bind(container=discovery.name, action="restart") 

250 installed_info: DockerImageInfo | None = cast("DockerImageInfo|None", discovery.current_detail) 

251 service_info: DockerServiceDetails | None = cast("DockerServiceDetails|None", discovery.installation_detail) 

252 

253 if ( 

254 self.self_bounce is not None 

255 and installed_info 

256 and service_info 

257 and ( 

258 "ghcr.io/rhizomatics/updates2mqtt" in installed_info.ref 

259 or (service_info.git_repo_path and service_info.git_repo_path.endswith("updates2mqtt")) 

260 ) 

261 ): 

262 logger.warning("Attempting to self-bounce") 

263 self.self_bounce.set() 

264 if service_info is None: 

265 return False 

266 return self.execute_compose( 

267 command=DockerComposeCommand.UP, 

268 args="--detach --yes", 

269 service=service_info.compose_service, 

270 cwd=service_info.compose_path, 

271 logger=logger, 

272 ) 

273 

274 def rescan(self, discovery: Discovery) -> Discovery | None: 

275 logger: Any = self.log.bind(container=discovery.name, action="rescan") 

276 try: 

277 c: Container = self.client.containers.get(discovery.name) 

278 if c: 278 ↛ 283line 278 didn't jump to line 283 because the condition on line 278 was always true

279 rediscovery: Discovery | None = self.analyze(c, discovery.session, previous_discovery=discovery) 

280 if rediscovery and not rediscovery.throttled: 280 ↛ 283line 280 didn't jump to line 283 because the condition on line 280 was always true

281 self.discoveries[rediscovery.name] = rediscovery 

282 return rediscovery 

283 logger.warn("Unable to find container for rescan") 

284 except docker.errors.NotFound: 

285 logger.warn("Container not found in Docker") 

286 except docker.errors.APIError: 

287 logger.exception("Docker API error retrieving container") 

288 return None 

289 

290 def analyze(self, c: Container, session: str, previous_discovery: Discovery | None = None) -> Discovery | None: 

291 logger = self.log.bind(container=c.name, action="analyze") 

292 

293 if c.attrs is None or not c.attrs: 293 ↛ 294line 293 didn't jump to line 294 because the condition on line 293 was never true

294 logger.warn("No container attributes found, discovery rejected") 

295 return None 

296 if c.name is None: 296 ↛ 297line 296 didn't jump to line 297 because the condition on line 296 was never true

297 logger.warn("No container name found, discovery rejected") 

298 return None 

299 

300 customization: ContainerCustomization = ContainerCustomization(c) 

301 if customization.ignore: 301 ↛ 302line 301 didn't jump to line 302 because the condition on line 301 was never true

302 logger.info("Container ignored due to UPD2MQTT_IGNORE setting") 

303 return None 

304 

305 if customization.update == UpdatePolicy.AUTO: 305 ↛ 306line 305 didn't jump to line 306 because the condition on line 305 was never true

306 logger.debug("Auto update policy detected") 

307 update_policy: UpdatePolicy = customization.update or UpdatePolicy.PASSIVE 

308 

309 local_info: DockerImageInfo 

310 service_info: DockerServiceDetails 

311 local_info, service_info = self.local_info_builder.build_image_info(c) 

312 pkg_info: PackageUpdateInfo = self.default_metadata(local_info) 

313 

314 version_policy: VersionPolicy 

315 if customization.version_policy: 315 ↛ 316line 315 didn't jump to line 316 because the condition on line 315 was never true

316 logger.debug("Overriding version_policy to local customization: %s", customization.version_policy) 

317 version_policy = customization.version_policy 

318 else: 

319 if self.cfg.version_policy == VersionPolicy.AUTO and pkg_info.docker: 319 ↛ 325line 319 didn't jump to line 325 because the condition on line 319 was always true

320 logger.debug( 

321 "Version policy, pkg level %s, config level: %s", pkg_info.docker.version_policy, self.cfg.version_policy 

322 ) 

323 version_policy = pkg_info.docker.version_policy or self.cfg.version_policy 

324 else: 

325 logger.debug("Version policy, fixed config level: %s", self.cfg.version_policy) 

326 version_policy = self.cfg.version_policy 

327 

328 try: 

329 service_info.git_repo_path = customization.git_repo_path 

330 

331 registry_selection = Selection(self.cfg.registry_select, local_info.index_name) 

332 latest_info: DockerImageInfo 

333 if local_info.pinned: 333 ↛ 334line 333 didn't jump to line 334 because the condition on line 333 was never true

334 logger.debug("Skipping registry fetch for local pinned image, %s", local_info.ref) 

335 latest_info = local_info.reuse() 

336 elif registry_selection and local_info.ref and not local_info.local_build: 

337 if self.cfg.registry.api == RegistryAPI.DOCKER_CLIENT: 

338 latest_info = self.docker_client_image_lookup.lookup(local_info) 

339 elif self.cfg.registry.api == RegistryAPI.OCI_V2: 339 ↛ 341line 339 didn't jump to line 341 because the condition on line 339 was always true

340 latest_info = self.registry_image_lookup.lookup(local_info, token=customization.registry_token) 

341 elif self.cfg.registry.api == RegistryAPI.OCI_V2_MINIMAL: 

342 latest_info = self.registry_image_lookup.lookup( 

343 local_info, token=customization.registry_token, minimal=True 

344 ) 

345 else: # assuming RegistryAPI.DISABLED 

346 logger.debug(f"Skipping registry check, disabled in config {self.cfg.registry.api}") 

347 latest_info = local_info.reuse() 

348 elif local_info.local_build: 348 ↛ 354line 348 didn't jump to line 354 because the condition on line 348 was always true

349 # assume its a locally built image if no RepoDigests available 

350 latest_info = local_info.reuse() 

351 latest_info.short_digest = None 

352 latest_info.image_digest = None 

353 else: 

354 logger.debug("Registry selection rules suppressed metadata lookup") 

355 latest_info = local_info.reuse() 

356 

357 release_info: ReleaseDetail | None = self.release_enricher.enrich( 

358 latest_info, 

359 source_repo_url=pkg_info.source_repo_url, 

360 notes_url=customization.relnotes or pkg_info.release_notes_url, 

361 ) 

362 logger.debug("Enriched release info: %s", release_info) 

363 

364 if service_info.git_repo_path and service_info.compose_path: 

365 full_repo_path: Path = Path(service_info.compose_path).joinpath(service_info.git_repo_path) 

366 

367 git_trust(full_repo_path, Path(self.node_cfg.git_path)) 

368 service_info.git_local_timestamp = git_iso_timestamp(full_repo_path, Path(self.node_cfg.git_path)) 

369 

370 can_pull: bool = ( 

371 self.cfg.allow_pull 

372 and not local_info.local_build 

373 and local_info.ref is not None 

374 and local_info.ref != "" 

375 and (local_info.short_digest is not None or latest_info.short_digest is not None) 

376 ) 

377 if self.cfg.allow_pull and not can_pull: 

378 logger.debug( 

379 f"Pull unavailable, ref:{local_info.ref},local:{local_info.short_digest},latest:{latest_info.short_digest}" 

380 ) 

381 

382 can_build: bool = False 

383 if self.cfg.allow_build: 383 ↛ 408line 383 didn't jump to line 408 because the condition on line 383 was always true

384 can_build = service_info.git_repo_path is not None and service_info.compose_path is not None 

385 if not can_build: 

386 if service_info.git_repo_path is not None: 386 ↛ 387line 386 didn't jump to line 387 because the condition on line 386 was never true

387 logger.debug( 

388 "Local build ignored for git_repo_path=%s because no compose_path", service_info.git_repo_path 

389 ) 

390 else: 

391 full_repo_path = self.full_repo_path( 

392 cast("str", service_info.compose_path), cast("str", service_info.git_repo_path) 

393 ) 

394 if local_info.local_build and full_repo_path: 394 ↛ 408line 394 didn't jump to line 408 because the condition on line 394 was always true

395 git_versionish = git_local_digest(full_repo_path, Path(self.node_cfg.git_path)) 

396 if git_versionish: 

397 local_info.git_digest = git_versionish 

398 logger.debug("Git digest for local code %s", git_versionish) 

399 

400 behind_count: int = git_check_update_available(full_repo_path, Path(self.node_cfg.git_path)) 

401 if behind_count > 0: 

402 latest_info.git_digest = f"{git_versionish}+{behind_count}" 

403 logger.info("Git update available, generating version %s", latest_info.git_digest) 

404 else: 

405 logger.debug(f"Git update not available, local repo:{full_repo_path}") 

406 latest_info.git_digest = git_versionish 

407 

408 can_restart: bool = self.cfg.allow_restart and service_info.compose_path is not None 

409 

410 if can_pull: 

411 update_type = "Docker Image" 

412 elif can_build: 412 ↛ 415line 412 didn't jump to line 415 because the condition on line 412 was always true

413 update_type = "Docker Build" 

414 else: 

415 update_type = "Unavailable" 

416 

417 # can_pull,can_build etc are only info flags 

418 # the HASS update process is driven by comparing current and available versions 

419 

420 public_installed_version: str 

421 public_latest_version: str 

422 version_basis: str 

423 public_installed_version, public_latest_version, version_basis = select_versions( 

424 version_policy, local_info, latest_info 

425 ) 

426 

427 publish_policy: PublishPolicy = PublishPolicy.HOMEASSISTANT 

428 img_ref_selection = Selection(self.cfg.image_ref_select, local_info.ref) 

429 version_selection = Selection(self.cfg.version_select, latest_info.version) 

430 if not img_ref_selection or not version_selection: 430 ↛ 431line 430 didn't jump to line 431 because the condition on line 430 was never true

431 self.log.info( 

432 "Excluding from HA Discovery for include/exclude rule: %s, %s", local_info.ref, latest_info.version 

433 ) 

434 publish_policy = PublishPolicy.MQTT 

435 

436 discovery: Discovery = Discovery( 

437 self, 

438 c.name, 

439 session, 

440 node=self.node_cfg.name, 

441 entity_picture_url=customization.picture or pkg_info.logo_url, 

442 current_version=public_installed_version, 

443 publish_policy=publish_policy, 

444 update_policy=update_policy, 

445 version_policy=version_policy, 

446 version_basis=version_basis, 

447 latest_version=public_latest_version, 

448 device_icon=self.cfg.device_icon, 

449 can_pull=can_pull, 

450 update_type=update_type, 

451 can_build=can_build, 

452 can_restart=can_restart, 

453 status=(c.status == "running" and "on") or "off", 

454 throttled=latest_info.throttled, 

455 previous=previous_discovery, 

456 release_detail=release_info, 

457 installation_detail=service_info, 

458 current_detail=local_info, 

459 latest_detail=latest_info, 

460 ) 

461 logger.debug("Analyze generated discovery: %s", discovery) 

462 return discovery 

463 except Exception: 

464 logger.exception("Docker Discovery Failure", container_attrs=c.attrs) 

465 logger.debug("Analyze returned empty discovery") 

466 return None 

467 

468 # def version(self, c: Container, version_type: str): 

469 # metadata_version: str = c.labels.get("org.opencontainers.image.version") 

470 # metadata_revision: str = c.labels.get("org.opencontainers.image.revision") 

471 

472 async def scan(self, session: str, shuffle: bool = True) -> AsyncGenerator[Discovery]: 

473 logger = self.log.bind(session=session, action="scan", source=self.source_type) 

474 containers: int = 0 

475 results: int = 0 

476 throttled: int = 0 

477 

478 targets: list[Container] = self.client.containers.list() 

479 if shuffle: 479 ↛ 481line 479 didn't jump to line 481 because the condition on line 479 was always true

480 random.shuffle(targets) 

481 logger.debug("Starting scanning %s containers", len(targets)) 

482 for c in targets: 

483 logger.debug("Analyzing container", container=c.name) 

484 if self.stopped.is_set(): 484 ↛ 485line 484 didn't jump to line 485 because the condition on line 484 was never true

485 logger.info(f"Shutdown detected, aborting scan at {c}") 

486 break 

487 containers = containers + 1 

488 result: Discovery | None = self.analyze(c, session) 

489 if result: 489 ↛ 496line 489 didn't jump to line 496 because the condition on line 489 was always true

490 logger.debug("Analyzed container", result_name=result.name, throttled=result.throttled) 

491 self.discoveries[result.name] = result 

492 results = results + 1 

493 throttled += 1 if result.throttled else 0 

494 yield result 

495 else: 

496 logger.debug("No result from analysis", container=c.name) 

497 logger.info("Completed", container_count=containers, throttled_count=throttled, result_count=results) 

498 

499 def command(self, discovery_name: str, command: str, on_update_start: Callable, on_update_end: Callable) -> bool: 

500 logger = self.log.bind(container=discovery_name, action="command", command=command) 

501 logger.info("Executing Command") 

502 discovery: Discovery | None = None 

503 updated: bool = False 

504 try: 

505 discovery = self.resolve(discovery_name) 

506 if not discovery: 

507 logger.warn("Unknown entity", entity=discovery_name) 

508 elif command != "install": 

509 logger.warn("Unknown command") 

510 else: 

511 if discovery.can_update: 

512 rediscovery: Discovery | None = None 

513 logger.info("Starting update ...") 

514 on_update_start(discovery) 

515 if self.update(discovery): 

516 logger.debug("Rescanning ...") 

517 rediscovery = self.rescan(discovery) 

518 updated = rediscovery is not None and not rediscovery.throttled 

519 logger.info("Rescanned, updated:%s", updated) 

520 else: 

521 logger.info("Rescan with no result") 

522 on_update_end(rediscovery or discovery) 

523 else: 

524 logger.warning("Update not supported for this container") 

525 except Exception: 

526 logger.exception("Failed to handle", discovery_name=discovery_name, command=command) 

527 if discovery: 527 ↛ 529line 527 didn't jump to line 529 because the condition on line 527 was always true

528 on_update_end(discovery) 

529 return updated 

530 

531 def resolve(self, discovery_name: str) -> Discovery | None: 

532 return self.discoveries.get(discovery_name) 

533 

534 def default_metadata(self, image_info: DockerImageInfo) -> PackageUpdateInfo: 

535 for enricher in self.pkg_enrichers: 535 ↛ 539line 535 didn't jump to line 539 because the loop on line 535 didn't complete

536 pkg_info: PackageUpdateInfo | None = enricher.enrich(image_info) 

537 if pkg_info is not None: 

538 return pkg_info 

539 raise ValueError("No enricher could provide metadata, not even default enricher") 

540 

541 

542def select_versions(version_policy: VersionPolicy, installed: DockerImageInfo, latest: DockerImageInfo) -> tuple[str, str, str]: 

543 """Pick the best version string to display based on the version policy and available data 

544 

545 Ensures that both local installed and remote latest versions are derived in same way 

546 Falls back to digest if version not reliable or not consistent with current/available version 

547 """ 

548 phase: int = 0 

549 shortcircuit: str | None = None 

550 

551 def basis(rule: str) -> str: 

552 return f"{rule}-{phase}" if not shortcircuit else f"{rule}-{phase}-{shortcircuit}" 

553 

554 # 

555 # Detect No Update Available 

556 # -------------------------- 

557 # 

558 # shortcircuit the logic if there's nothing to compare 

559 # 

560 if latest.throttled: 

561 log.debug("Flattening versions for throttled update %s", installed.ref) 

562 shortcircuit = "THR" 

563 latest = installed 

564 elif not any((latest.short_digest, latest.repo_digest, latest.git_digest, latest.version)): 

565 log.debug("Flattening versions for empty update %s", installed.ref) 

566 shortcircuit = "NUP" 

567 latest = installed 

568 elif latest.short_digest == installed.short_digest and latest.short_digest is not None: 

569 log.debug("Flattening versions for identical update %s", installed.ref) 

570 shortcircuit = "SDM" 

571 latest = installed 

572 elif installed.image_digest in latest.repo_digests: 572 ↛ 574line 572 didn't jump to line 574 because the condition on line 572 was never true

573 # TODO: avoid this by better adaptations for different registries and single/multi manifests 

574 log.debug( 

575 "Matching new repo_digest against installed image digest for %s image %s", installed.index_name, installed.name 

576 ) 

577 shortcircuit = "FGA" 

578 latest = installed 

579 elif latest.image_digest in installed.repo_digests: 579 ↛ 581line 579 didn't jump to line 581 because the condition on line 579 was never true

580 # TODO: avoid this by better adaptations for different registries and single/multi manifests 

581 log.debug( 

582 "Matching new image_digest against installed repo digest for %s image %s", installed.index_name, installed.name 

583 ) 

584 shortcircuit = "FGB" 

585 latest = installed 

586 

587 # 

588 # Explicit Policy Choice 

589 # ---------------------- 

590 # 

591 

592 if version_policy == VersionPolicy.VERSION and installed.version and latest.version: 

593 return installed.version, latest.version, basis("version") 

594 

595 installed_digest_available: bool = installed.short_digest is not None and installed.short_digest != "" 

596 latest_digest_available: bool = latest.short_digest is not None and latest.short_digest != "" 

597 matching_digest: bool = ( 

598 installed_digest_available and latest_digest_available and installed.short_digest == latest.short_digest 

599 ) 

600 changed_digest: bool = ( 

601 installed_digest_available and latest_digest_available and installed.short_digest != latest.short_digest 

602 ) 

603 

604 if version_policy == VersionPolicy.DIGEST and installed_digest_available and latest_digest_available: 

605 return installed.short_digest, latest.short_digest, basis("digest") # type: ignore[return-value] 

606 if ( 

607 version_policy == VersionPolicy.VERSION_DIGEST 

608 and installed.version 

609 and latest.version 

610 and installed_digest_available 

611 and latest_digest_available 

612 ): 

613 return ( 

614 f"{installed.version}:{installed.short_digest}", 

615 f"{latest.version}:{latest.short_digest}", 

616 basis("version-digest"), 

617 ) 

618 

619 if ( 

620 version_policy == VersionPolicy.TIMESTAMP 

621 and installed.created 

622 and latest.created 

623 and ( 

624 (latest.created > installed.created and changed_digest) or (latest.created == installed.created and matching_digest) 

625 ) 

626 ): 

627 return installed.created, latest.created, basis("timestamp") 

628 

629 # 

630 # Auto Policy - Humane Versions 

631 # ----------------------------- 

632 # 

633 phase = 1 

634 if ( 

635 version_policy == VersionPolicy.AUTO 

636 and installed.version 

637 and latest.version 

638 and ( 

639 (installed.version == latest.version and matching_digest) 

640 or (installed.version != latest.version and changed_digest) 

641 ) 

642 ): 

643 # detect semver, or v semver (e.g. v1.030) 

644 # only use this if both version and digest are consistently agreeing or disagreeing 

645 # if the strict conditions work, people see nice version numbers on screen rather than hashes 

646 if re.fullmatch(SEMVER_RE, installed.version or "") and re.fullmatch(SEMVER_RE, latest.version or ""): 

647 # Smells like semver, override if not using version_policy 

648 return installed.version, latest.version, basis("semver") 

649 if re.fullmatch(VERSION_RE, installed.version or "") and re.fullmatch(VERSION_RE, latest.version or ""): 649 ↛ 653line 649 didn't jump to line 653 because the condition on line 649 was always true

650 # Smells like casual semver, override if not using version_policy 

651 return installed.version, latest.version, basis("casualver") 

652 

653 if ( 

654 version_policy == VersionPolicy.AUTO 

655 and installed.tag 

656 and latest.tag 

657 and ((installed.tag == latest.tag and matching_digest) or (installed.tag != latest.tag and changed_digest)) 

658 ): 

659 if re.fullmatch(SEMVER_RE, installed.tag) and re.fullmatch(SEMVER_RE, latest.tag): 

660 return installed.tag, latest.tag, basis("semver-tag") 

661 if re.fullmatch(SEMVER_RE, installed.tag) and re.fullmatch(SEMVER_RE, latest.tag): 661 ↛ 662line 661 didn't jump to line 662 because the condition on line 661 was never true

662 return installed.tag, latest.tag, basis("semver-tag") 

663 

664 # 

665 # Local Builds 

666 # ------------ 

667 # 

668 phase = 2 

669 if installed.git_digest and latest.git_digest: 

670 return f"git:{installed.git_digest}", f"git:{latest.git_digest}", basis("git") 

671 

672 # 

673 # Fall Back - Qualified Versions 

674 # -------------------------------- 

675 # 

676 phase = 3 

677 if ( 677 ↛ 685line 677 didn't jump to line 685 because the condition on line 677 was never true

678 installed.version 

679 and latest.version 

680 and ( 

681 (installed.version == latest.version and matching_digest) 

682 or (installed.version != latest.version and changed_digest) 

683 ) 

684 ): 

685 return ( 

686 f"{installed.version}:{installed.short_digest}", 

687 f"{latest.version}:{latest.short_digest}", 

688 basis("version-digest"), 

689 ) 

690 

691 # 

692 # Fall Back - Timestamp, Digest, Version 

693 # -------------------------------------- 

694 

695 phase = 4 

696 if ( 

697 installed.created 

698 and latest.created 

699 and ( 

700 (latest.created > installed.created and changed_digest) or (latest.created == installed.created and matching_digest) 

701 ) 

702 ): 

703 return installed.created, latest.created, basis("timestamp") 

704 if installed_digest_available and latest_digest_available: 

705 return installed.short_digest, latest.short_digest, basis("digest") # type: ignore[return-value] 

706 if installed.version and not latest.version and not latest.short_digest and not latest.repo_digest: 706 ↛ 707line 706 didn't jump to line 707 because the condition on line 706 was never true

707 return installed.version, installed.version, basis("version") 

708 

709 # 

710 # Fall Back - Missing Digests 

711 # --------------------------- 

712 phase = 5 

713 if not installed_digest_available and latest_digest_available: 713 ↛ 715line 713 didn't jump to line 715 because the condition on line 713 was never true

714 # odd condition if local image has no identity, even out versions so no update alert 

715 return latest.short_digest, latest.short_digest, basis("digest") # type: ignore[return-value] 

716 

717 # 

718 # Fall Back - Repo Digests 

719 # --------------------------- 

720 phase = 6 

721 

722 def condense_repo_id(i: DockerImageInfo) -> str: 

723 v: str | None = i.condense_digest(i.repo_digest) if i.repo_digest else None 

724 return v or "" 

725 

726 if installed.repo_digest and latest.repo_digest: 

727 # where the image digest isn't available, fall back to a repo digest 

728 return condense_repo_id(installed), condense_repo_id(latest), basis("repo-digest") 

729 

730 phase = 7 

731 if latest.repo_digest and latest.repo_digest in installed.repo_digests: 731 ↛ 735line 731 didn't jump to line 735 because the condition on line 731 was always true

732 # installed has multiple RepoDigests from multiple pulls and one of them matches latest current repo digest 

733 return condense_repo_id(latest), condense_repo_id(latest), basis("repo-digest") 

734 

735 if installed_digest_available and not latest_digest_available: 

736 # no new digest, so latest is the current 

737 return installed.short_digest, installed.short_digest, basis("digest") # type: ignore[return-value] 

738 

739 # 

740 # Failure to Find Any Version 

741 # --------------------------- 

742 # 

743 log.warn("No versions can be determined for %s", installed.ref) 

744 phase = 999 

745 return UNKNOWN_VERSION, UNKNOWN_VERSION, basis("failure")