Coverage for src/updates2mqtt/integrations/docker_enrich.py: 83%

521 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-23 07:50 +0000

1import re 

2import typing 

3from abc import abstractmethod 

4from typing import Any, cast 

5 

6import structlog 

7from docker.auth import resolve_repository_name 

8from docker.models.containers import Container 

9from omegaconf import MissingMandatoryValue, OmegaConf, ValidationError 

10 

11from updates2mqtt.helpers import ( 

12 APIStatsCounter, 

13 CacheMetadata, 

14 ThrottledError, 

15 Throttler, 

16 fetch_url, 

17 httpx_json_content, 

18 validate_url, 

19) 

20from updates2mqtt.model import DiscoveryArtefactDetail, DiscoveryInstallationDetail, ReleaseDetail 

21 

22if typing.TYPE_CHECKING: 

23 from docker.models.images import RegistryData 

24 from httpx import Response 

25 from omegaconf.dictconfig import DictConfig 

26from http import HTTPStatus 

27 

28import docker 

29import docker.errors 

30 

31from updates2mqtt.config import ( 

32 PKG_INFO_FILE, 

33 SOURCE_PLATFORM_GITHUB, 

34 SOURCE_PLATFORM_GITLAB, 

35 SOURCE_PLATFORMS, 

36 CommonPackages, 

37 DockerConfig, 

38 DockerPackageUpdateInfo, 

39 MetadataSourceConfig, 

40 PackageUpdateInfo, 

41 RegistryConfig, 

42 VersionPolicy, 

43 docker_image_names, 

44) 

45 

# Module-level structured logger for code that has no bound per-class logger.
log: Any = structlog.get_logger()

# URL templates keyed by source platform. They are filled with str.format using
# the keys "repo", "revision", "version" and "source".
DIFF_URL_TEMPLATES = {
    SOURCE_PLATFORM_GITHUB: "{repo}/commit/{revision}",
}
RELEASE_URL_TEMPLATES = {
    SOURCE_PLATFORM_GITHUB: "{repo}/releases/tag/{version}",
}
# Fallback release-notes locations, used when no versioned release notes URL was set.
UNKNOWN_RELEASE_URL_TEMPLATES = {
    SOURCE_PLATFORM_GITHUB: "{repo}/releases",
    SOURCE_PLATFORM_GITLAB: "{repo}/container_registry",
}
# Sentinel substituted for absent template values; a formatted URL that still
# contains it is discarded rather than used.
MISSING_VAL = "**MISSING**"
UNKNOWN_REGISTRY = "**UNKNOWN_REGISTRY**"
UNKNOWN_NAME = "**UNKNOWN_NAME**"

# Response header names read from registry API responses.
HEADER_DOCKER_DIGEST = "docker-content-digest"
HEADER_DOCKER_API = "docker-distribution-api-version"

# Default token endpoint, requesting pull scope on a single repository.
TOKEN_URL_TEMPLATE = "https://{auth_host}/token?scope=repository:{image_name}:pull&service={service}"  # noqa: S105 # nosec

# Registry index names, as they appear at the front of an image reference.
REGISTRY_GHCR = "ghcr.io"
REGISTRY_DOCKER = "docker.io"
REGISTRY_MCR = "mcr.microsoft.com"
REGISTRY_QUAY = "quay.io"
REGISTRY_LSCR = "lscr.io"
REGISTRY_CODEBERG = "codeberg.org"
REGISTRY_GITLAB = "registry.gitlab.com"

75 

class RegistryInfo(typing.NamedTuple):
    """Per-registry connection details.

    Instances are constructed positionally, so the field order is significant.
    """

    # Host substituted into the token URL template; None means no token is fetched.
    auth_host: str | None
    # NOTE(review): presumably the host serving the /v2 registry API - the code
    # that reads this field is outside the visible part of the file.
    api_host: str
    # Value substituted for {service} in the token URL template.
    service: str
    # Token URL template with {auth_host}, {image_name} and {service} placeholders.
    url_template: str | None
    # Template with an {image_name} placeholder used to imply a source repository URL.
    repo_template: str | None

82 

83 

# Known registries keyed by index name. Registries not listed here fall back to
# a default RegistryInfo built from the registry name itself.
REGISTRIES: dict[str, RegistryInfo] = {
    REGISTRY_DOCKER: RegistryInfo("auth.docker.io", "registry-1.docker.io", "registry.docker.io", TOKEN_URL_TEMPLATE, None),
    # No auth host, so no token is requested for this registry
    REGISTRY_MCR: RegistryInfo(None, "mcr.microsoft.com", "mcr.microsoft.com", None, None),
    REGISTRY_QUAY: RegistryInfo(None, "quay.io", "quay.io", TOKEN_URL_TEMPLATE, None),
    REGISTRY_GHCR: RegistryInfo("ghcr.io", "ghcr.io", "ghcr.io", TOKEN_URL_TEMPLATE, "https://github.com/{image_name}"),
    # Tokens for lscr.io are requested from ghcr.io
    REGISTRY_LSCR: RegistryInfo("ghcr.io", "lscr.io", "ghcr.io", TOKEN_URL_TEMPLATE, None),
    REGISTRY_CODEBERG: RegistryInfo(
        "codeberg.org",
        "codeberg.org",
        "container_registry",
        TOKEN_URL_TEMPLATE,
        "https://codeberg.org/{image_name}",
    ),
    REGISTRY_GITLAB: RegistryInfo(
        "www.gitlab.com",
        "registry.gitlab.com",
        "container_registry",
        "https://{auth_host}/jwt/auth?service={service}&scope=repository:{image_name}:pull&offline_token=true&client_id=docker",
        "https://gitlab.com/{image_name}",
    ),
}

105 

# source: https://specs.opencontainers.org/distribution-spec/?v=v1.0.0#pull
# Both patterns carry no ^/$ anchors of their own.
OCI_NAME_RE = r"[a-z0-9]+((\.|_|__|-+)[a-z0-9]+)*(\/[a-z0-9]+((\.|_|__|-+)[a-z0-9]+)*)*"
OCI_TAG_RE = r"[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}"

109 

110 

class DockerImageInfo(DiscoveryArtefactDetail):
    """Normalize and carry around the parts of an image reference.

    index_name: registry index name, e.g. ghcr.io
    name: image ref without index name or tag, e.g. nginx, or librenms/librenms
    tag: tag, when the reference has one ("latest" for a bare name)
    pinned_digest: digest from a name@digest or name:tag@digest reference
    tag_or_digest: the digest when the reference is pinned, otherwise the tag
    untagged_ref: combined index name and package name, without tag or digest
    """

    def __init__(
        self,
        ref: str,  # ref with optional index name and tag or digest, index:name:tag_or_digest
        image_digest: str | None = None,
        tags: list[str] | None = None,
        attributes: dict[str, Any] | None = None,
        annotations: dict[str, Any] | None = None,
        platform: str | None = None,  # test harness simplification
        version: str | None = None,  # test harness simplification
        created: str | None = None,
    ) -> None:
        """Parse `ref` into index name, name, tag and digest and derive the dependent fields."""
        super().__init__()
        self.ref: str = ref
        self.version: str | None = version
        self.image_digest: str | None = image_digest
        self.short_digest: str | None = None
        self.repo_digest: str | None = None  # the single RepoDigest known to match registry
        self.git_digest: str | None = None
        self.index_name: str | None = None
        self.name: str | None = None
        self.tag: str | None = None
        self.pinned_digest: str | None = None
        # untagged ref using combined index and remote name used only for pattern matching common pkg info
        self.untagged_ref: str | None = None  # index_name/remote_name used for pkg match
        self.tag_or_digest: str | None = None  # index_name/remote_name:**tag_or_digest**
        self.tags = tags
        self.attributes: dict[str, Any] = attributes or {}
        self.annotations: dict[str, Any] = annotations or {}
        self.throttled: bool = False
        self.origin: str | None = None
        self.error: str | None = None
        self.platform: str | None = platform
        self.custom: dict[str, str | float | int | bool | None] = {}
        self.created: str | None = created

        # No RepoDigests means the image was never pulled from or pushed to a registry
        self.local_build: bool = not self.repo_digests
        self.index_name, remote_name = resolve_repository_name(ref)

        self.name = remote_name

        tag_sep: int = remote_name.find(":") if remote_name else -1
        digest_sep: int = remote_name.find("@") if remote_name else -1
        if tag_sep >= 0 and (digest_sep < 0 or digest_sep > tag_sep):
            # name:tag format
            self.name, self.tag_or_digest = remote_name.split(":", 1)
            # Strip the tag from the END of the ref: splitting on the first ":"
            # would cut at a registry port, e.g. localhost:5000/app:1.0
            suffix: str = f":{self.tag_or_digest}"
            self.untagged_ref = ref[: -len(suffix)] if ref.endswith(suffix) else ref.split(":", 1)[0]
            self.tag = self.tag_or_digest

        elif digest_sep >= 0:
            # name@digest format
            self.name, self.tag_or_digest = remote_name.split("@", 1)
            self.untagged_ref = ref.split("@", 1)[0]
            self.pinned_digest = self.tag_or_digest

        if self.tag and "@" in self.tag:
            # name:tag@digest format
            # for pinned tags, care only about the digest part
            self.tag, self.tag_or_digest = self.tag.split("@", 1)
            self.pinned_digest = self.tag_or_digest
        if self.tag_or_digest is None:
            self.tag_or_digest = "latest"
            self.untagged_ref = ref
            self.tag = self.tag_or_digest

        if self.repo_digest is None and len(self.repo_digests) == 1:
            # definite known RepoDigest
            # if its ambiguous, the final version selection will handle it
            self.repo_digest = self.repo_digests[0]

        if self.index_name == "docker.io" and "/" not in self.name:
            # "official Docker images have an abbreviated library/foo name"
            self.name = f"library/{self.name}"
        # fullmatch, since the patterns are unanchored and match() would accept
        # any string that merely starts with a valid name or tag
        if self.name is not None and not re.fullmatch(OCI_NAME_RE, self.name):
            log.warning("Invalid OCI image name: %s", self.name)
        if self.tag and not re.fullmatch(OCI_TAG_RE, self.tag):
            log.warning("Invalid OCI image tag: %s", self.tag)
        if "/" in self.name:
            self.unqualified_name: str = self.name.split("/", 1)[1]
        else:
            self.unqualified_name = self.name

        if self.os and self.arch:
            # os/arch[/variant], overriding any platform passed in
            self.platform = "/".join(
                filter(
                    None,
                    [self.os, self.arch, self.variant],
                ),
            )

        if self.image_digest is not None:
            self.image_digest = self.condense_digest(self.image_digest, short=False)
            self.short_digest = self.condense_digest(self.image_digest)  # type: ignore[arg-type]

    @property
    def repo_digests(self) -> list[str]:
        """Return the known repo digests, with any `repo@` prefix removed.

        When a single `repo_digest` has been settled on, only that one is returned.
        """
        if self.repo_digest:
            return [self.repo_digest]
        # RepoDigest in image inspect, Registry Config object
        digests = [v.split("@", 1)[1] if "@" in v else v for v in self.attributes.get("RepoDigests", [])]
        return digests or []

    @property
    def pinned(self) -> bool:
        """Check if this is pinned and installed version consistent with pin"""
        return bool(self.pinned_digest and self.pinned_digest in self.repo_digests)

    @property
    def os(self) -> str | None:
        """Return the `Os` image attribute, if present."""
        return self.attributes.get("Os")

    @property
    def arch(self) -> str | None:
        """Return the `Architecture` image attribute, if present."""
        return self.attributes.get("Architecture")

    @property
    def variant(self) -> str | None:
        """Return the `Variant` image attribute, if present."""
        return self.attributes.get("Variant")

    def condense_digest(self, digest: str, short: bool = True) -> str | None:
        """Strip the `repo@` prefix from a digest, optionally shortening it.

        With `short` set, the digest type prefix (e.g. `sha256:`) is also removed
        and only the first 12 characters are kept. Returns None if the value
        cannot be processed.
        """
        try:
            digest = digest.split("@")[1] if "@" in digest else digest  # fully qualified RepoDigest
            if short:
                digest = digest.split(":")[1] if ":" in digest else digest  # remove digest type prefix
                return digest[0:12]
            return digest
        except Exception as e:
            log.warning("Unable to condense digest %s: %s", digest, e)
            return None

    def reuse(self) -> "DockerImageInfo":
        """Return a fresh copy built from the same inputs, with origin marked as REUSED."""
        # Keyword arguments keep each value in its own slot: passed positionally,
        # version and created would land in the platform and version parameters.
        cloned = DockerImageInfo(
            self.ref,
            image_digest=self.image_digest,
            tags=self.tags,
            attributes=self.attributes,
            annotations=self.annotations,
            platform=self.platform,
            version=self.version,
            created=self.created,
        )
        cloned.origin = "REUSED"
        return cloned

    def as_dict(self, minimal: bool = True) -> dict[str, str | list | dict | bool | int | None]:
        """Return the image details as a plain dict.

        Raw `attributes` and `annotations` are included only when `minimal` is False.
        """
        result: dict[str, str | list | dict | bool | int | None] = {
            "captured": self.captured.isoformat(),
            "image_ref": self.ref,
            "name": self.name,
            "version": self.version,
            "image_digest": self.image_digest,
            "repo_digest": self.repo_digest,
            "repo_digests": self.repo_digests,
            "git_digest": self.git_digest,
            "index_name": self.index_name,
            "tag": self.tag,
            "pinned_digest": self.pinned_digest,
            "tag_or_digest": self.tag_or_digest,
            "tags": self.tags,
            "origin": self.origin,
            "platform": self.platform,
            "local_build": self.local_build,
            "error": self.error,
            "throttled": self.throttled,
            "custom": self.custom,
        }
        if not minimal:
            result["attributes"] = self.attributes
            result["annotations"] = self.annotations
        return result

280 

281 

def id_source_platform(source: str | None) -> str | None:
    """Return the first source platform whose pattern matches `source`, or None."""
    text: str = source or ""
    for platform, pattern in SOURCE_PLATFORMS.items():
        if re.match(pattern, text):
            return platform
    return None

285 

286 

def _select_annotation(
    name: str, key: str, local_info: DockerImageInfo | None = None, registry_info: DockerImageInfo | None = None
) -> dict[str, str | None]:
    """Return `{name: value}` for annotation `key`, or an empty dict if it is unset.

    The registry info is consulted when given; the local info is used only when
    there is no registry info at all.
    """
    chosen: DockerImageInfo | None = registry_info or local_info
    if not chosen:
        return {}
    value: Any | None = chosen.annotations.get(key)
    return {} if value is None else {name: value}

300 

301 

def cherrypick_annotations(
    local_info: DockerImageInfo, registry_info: DockerImageInfo | None
) -> dict[str, str | float | int | bool | None]:
    """Collect selected OCI annotations under short names.

    See https://github.com/opencontainers/image-spec/blob/main/annotations.md
    """
    wanted: tuple[tuple[str, str], ...] = (
        ("documentation_url", "org.opencontainers.image.documentation"),
        ("description", "org.opencontainers.image.description"),
        ("licences", "org.opencontainers.image.licenses"),
        ("image_base", "org.opencontainers.image.base.name"),
        ("image_created", "org.opencontainers.image.created"),
        ("image_version", "org.opencontainers.image.version"),
        ("image_revision", "org.opencontainers.image.revision"),
        ("ref_name", "org.opencontainers.image.ref.name"),
        ("title", "org.opencontainers.image.title"),
        ("vendor", "org.opencontainers.image.vendor"),
        ("source", "org.opencontainers.image.source"),
    )
    picked: dict[str, str | float | int | bool | None] = {}
    for short_name, label in wanted:
        picked.update(_select_annotation(short_name, label, local_info, registry_info))

    # An image built on ubuntu can carry the base image's version (e.g. 24.04)
    # in its own version annotation; drop it rather than report it as the image version.
    version = picked.get("image_version")
    base_version_leaked = (
        picked.get("ref_name") == "ubuntu"
        and local_info.name != "ubuntu"
        and version
        and re.fullmatch(r"^2\d\.\d\d$", str(version))
    )
    if base_version_leaked:
        log.debug(
            "Suppressing %s base %s version leaking into image version: %s",
            local_info.name,
            picked["ref_name"],
            picked["image_version"],
        )
        del picked["image_version"]
    return picked

335 

336 

class DockerServiceDetails(DiscoveryInstallationDetail):
    """Where a container is installed: its name, compose project and git checkout."""

    def __init__(
        self,
        container_name: str | None = None,
        compose_path: str | None = None,
        compose_version: str | None = None,
        compose_service: str | None = None,
        git_repo_path: str | None = None,
    ) -> None:
        """Store the given details; the local git timestamp starts out unset."""
        self.container_name: str | None = container_name
        self.compose_path: str | None = compose_path
        self.compose_version: str | None = compose_version
        self.compose_service: str | None = compose_service
        self.git_repo_path: str | None = git_repo_path
        self.git_local_timestamp: str | None = None

    def as_dict(self) -> dict[str, str | list | dict | bool | int | None]:
        """Return the details as a dict; git fields appear only when they have a value."""
        summary: dict[str, str | list | dict | bool | int | None] = {
            "container_name": self.container_name,
            "compose_path": self.compose_path,
            "compose_service": self.compose_service,
            "compose_version": self.compose_version,
        }
        for optional_field in ("git_local_timestamp", "git_repo_path"):
            value = getattr(self, optional_field)
            if value:
                summary[optional_field] = value
        return summary

365 

366 

class LocalContainerInfo:
    """Builds image and service details from a local Docker container."""

    def build_image_info(self, container: Container) -> tuple[DockerImageInfo, DockerServiceDetails]:
        """Describe a container's image and how the container is deployed.

        Image contents equiv to `docker image inspect <image_ref>`.

        Returns:
            The image info (with cherry-picked labels in `custom`, and `version`
            taken from the image version label) and the compose/service details
            read from the container labels.
        """
        # container image can be none if someone ran `docker rmi -f`
        # so although this could be sourced from image, like `container.image.tags[0]`
        # use the container ref instead, which survives monkeying about with images
        image_ref: str = container.attrs.get("Config", {}).get("Image") or ""
        image_digest = container.attrs.get("Image")

        # In docker-py, `Container.image` is a property that looks the image up
        # on every access, so read it once and reuse the result.
        image = container.image

        image_info: DockerImageInfo = DockerImageInfo(
            image_ref,
            image_digest=image_digest,
            tags=image.tags if image else None,
            annotations=image.labels if image else None,
            attributes=image.attrs if image else None,
        )
        service_info: DockerServiceDetails = DockerServiceDetails(
            container.name,
            compose_path=container.labels.get("com.docker.compose.project.working_dir"),
            compose_service=container.labels.get("com.docker.compose.service"),
            compose_version=container.labels.get("com.docker.compose.version"),
        )

        # Selected from the image labels passed to DockerImageInfo as annotations
        labels: dict[str, str | float | int | bool | None] = cherrypick_annotations(image_info, None)
        if image and image.attrs:
            image_info.created = image.attrs.get("Created")
        image_info.custom = labels
        image_info.version = cast("str|None", labels.get("image_version"))

        return image_info, service_info

399 

400 

class PackageEnricher:
    """Looks up package update info for an image from a table of known packages."""

    def __init__(self, docker_cfg: DockerConfig, packages: dict[str, PackageUpdateInfo] | None = None) -> None:
        """Keep the docker config and the (possibly empty) starting package table."""
        self.pkgs: dict[str, PackageUpdateInfo] = packages or {}
        self.cfg: DockerConfig = docker_cfg
        self.log: Any = structlog.get_logger().bind(integration="docker")

    def initialize(self) -> None:
        """Hook for subclasses to populate the package table; does nothing here."""

    def enrich(self, image_info: DockerImageInfo) -> PackageUpdateInfo | None:
        """Return the first known package whose image names include this image, else None.

        Both the untagged ref and the full ref are tried against each package.
        """
        if image_info.untagged_ref is None or image_info.ref is None:
            return None

        candidate_refs = (image_info.untagged_ref, image_info.ref)
        for pkg in self.pkgs.values():
            if pkg is None or pkg.docker is None or pkg.docker.image_name is None:
                continue
            image_names = docker_image_names(pkg.docker)
            if any(candidate in image_names for candidate in candidate_refs):
                self.log.debug(
                    "Found common package",
                    image_name=pkg.docker.image_name,
                    logo_url=pkg.logo_url,
                    relnotes_url=pkg.release_notes_url,
                )
                return pkg
        return None

431 

432 

class DefaultPackageEnricher(PackageEnricher):
    """Fallback enricher that always produces generic package info."""

    def enrich(self, image_info: DockerImageInfo) -> PackageUpdateInfo | None:
        """Return package info built from the image ref, the default logo and no release notes."""
        self.log.debug("Default pkg info", image_name=image_info.untagged_ref, image_ref=image_info.ref)
        image_name = image_info.untagged_ref or image_info.ref
        docker_info = DockerPackageUpdateInfo(image_name, version_policy=VersionPolicy.AUTO)
        return PackageUpdateInfo(
            docker_info,
            logo_url=self.cfg.default_entity_picture_url,
            release_notes_url=None,
        )

441 

442 

class CommonPackageEnricher(PackageEnricher):
    """Enricher whose package table comes from the bundled common packages file."""

    def initialize(self) -> None:
        """Load the common package table from `PKG_INFO_FILE`.

        When the file does not exist, the defaults of the `CommonPackages`
        schema are used instead.

        Raises:
            MissingMandatoryValue, ValidationError: the file does not fit the
                schema; the error is logged before being re-raised.
        """
        base_cfg: DictConfig = OmegaConf.structured(CommonPackages)
        try:
            # Merging and resolving are the steps that raise on a bad file, so
            # they must sit inside the try for the error to be logged.
            if PKG_INFO_FILE.exists():
                self.log.debug("Loading common package update info", path=PKG_INFO_FILE)
                cfg: DictConfig = typing.cast("DictConfig", OmegaConf.merge(base_cfg, OmegaConf.load(PKG_INFO_FILE)))

                # Result is discarded: called only to fail fast on missing mandatory values
                OmegaConf.to_container(cfg, throw_on_missing=True)
                OmegaConf.set_readonly(cfg, True)
            else:
                self.log.warning("No common package update info found", path=PKG_INFO_FILE)
                cfg = base_cfg

            common_config: CommonPackages = typing.cast("CommonPackages", cfg)
            # omegaconf broken-ness on optional fields and converting to dataclasses
            self.pkgs = common_config.common_packages
        except (MissingMandatoryValue, ValidationError) as e:
            self.log.error("Configuration error %s", e, path=PKG_INFO_FILE.as_posix())
            raise

465 

466 

class LinuxServerIOPackageEnricher(PackageEnricher):
    """Enricher whose package table is filled from the linuxserver.io images API."""

    def initialize(self) -> None:
        """Fetch the linuxserver.io image list and add entries for images not already known.

        Does nothing when the "linuxserver.io" metadata source is absent or
        disabled, or when the API call does not succeed.
        """
        cfg: MetadataSourceConfig | None = self.cfg.discover_metadata.get("linuxserver.io")
        if cfg is None or not cfg.enabled:
            return

        self.log.debug(f"Fetching linuxserver.io metadata from API, cache_ttl={cfg.cache_ttl}")
        response: Response | None = fetch_url(
            "https://api.linuxserver.io/api/v1/images?include_config=false&include_deprecated=false",
            cache_ttl=cfg.cache_ttl,
        )
        if not (response and response.is_success):
            return

        api_data: Any = response.json()
        repos: list = api_data.get("data", {}).get("repositories", {}).get("linuxserver", [])

        added = 0
        for repo in repos:
            image_name = repo.get("name")
            if not image_name or image_name in self.pkgs:
                continue
            github_url: str | None = repo.get("github_url")
            notes_url = f"{github_url}/releases" if github_url else None
            self.pkgs[image_name] = PackageUpdateInfo(
                DockerPackageUpdateInfo(f"lscr.io/linuxserver/{image_name}"),
                logo_url=repo.get("project_logo"),
                release_notes_url=notes_url,
            )
            added += 1
        self.log.debug(f"Added {added} linuxserver.io package details")

496 

497 

class SourceReleaseEnricher:
    """Derives release details (source, diff and release notes URLs) for an image."""

    def __init__(self) -> None:
        """Bind a logger for the docker integration."""
        self.log: Any = structlog.get_logger().bind(integration="docker")

    def enrich(
        self, registry_info: DockerImageInfo, source_repo_url: str | None = None, notes_url: str | None = None
    ) -> ReleaseDetail | None:
        """Build release details from image annotations and known URL patterns.

        Returns None when no source URL, notes URL, revision or version can be
        found. Candidate URLs are only used if `validate_url` accepts them.
        """
        annotations = registry_info.annotations
        detail = ReleaseDetail(registry_info.name or UNKNOWN_NAME)

        detail.notes_url = notes_url
        detail.version = annotations.get("org.opencontainers.image.version")
        detail.revision = annotations.get("org.opencontainers.image.revision")
        # explicit source_repo_url overrides container, e.g. where container source is only the docker wrapper
        detail.source_url = source_repo_url or annotations.get("org.opencontainers.image.source")

        if detail.source_url is None and registry_info is not None and registry_info.index_name is not None:
            # Some registries imply the source repository from the image name
            known_registry: RegistryInfo | None = REGISTRIES.get(registry_info.index_name)
            repo_template: str | None = known_registry.repo_template if known_registry else None
            if repo_template:
                implied_url = repo_template.format(image_name=registry_info.name)
                if validate_url(implied_url, cache_ttl=86400):
                    detail.source_url = implied_url
                    self.log.info("Implied source from registry: %s", detail.source_url)

        nothing_known = (
            detail.source_url is None and detail.notes_url is None and detail.revision is None and detail.version is None
        )
        if nothing_known:
            return None

        if detail.source_url and "#" in detail.source_url:
            # Drop the fragment to get the bare repository URL
            detail.source_repo_url = detail.source_url.split("#", 1)[0]
            self.log.debug("Simplifying %s from %s", detail.source_repo_url, detail.source_url)
        else:
            detail.source_repo_url = detail.source_url

        detail.source_platform = id_source_platform(detail.source_repo_url)
        if not detail.source_platform:
            self.log.debug("No known source platform found on container", source=detail.source_repo_url)
            return detail

        template_vars: dict[str, str | None] = {
            "version": detail.version or MISSING_VAL,
            "revision": detail.revision or MISSING_VAL,
            "repo": detail.source_repo_url or MISSING_VAL,
            "source": detail.source_url or MISSING_VAL,
        }

        try:
            diff_template: str | None = DIFF_URL_TEMPLATES.get(detail.source_platform)
            diff_url: str | None = diff_template.format(**template_vars) if diff_template else None
            if diff_url and MISSING_VAL not in diff_url and validate_url(diff_url, cache_ttl=3600):
                detail.diff_url = diff_url

            # Versioned release notes first, then the platform's general releases page
            notes_sources = (
                (RELEASE_URL_TEMPLATES, "Setting default known release notes url: %s"),
                (UNKNOWN_RELEASE_URL_TEMPLATES, "Setting default unknown release notes url: %s"),
            )
            for templates, message in notes_sources:
                if detail.notes_url is not None or detail.source_platform not in templates:
                    continue
                candidate: str | None = templates[detail.source_platform].format(**template_vars)
                if candidate and MISSING_VAL not in candidate and validate_url(candidate, cache_ttl=86400):
                    self.log.debug(message, candidate)
                    detail.notes_url = candidate
        except Exception as e:
            self.log.error("Failed formatting enriched URLs with %s: %s", template_vars, e)

        return detail

574 

575 

class AuthError(Exception):
    """Raised when a registry access token cannot be obtained."""

578 

579 

class VersionLookup:
    """Base for strategies that look up registry-side details of a local image.

    `lookup` is marked abstract, but the class does not derive from `ABC`, so
    that marker is not enforced at instantiation time.
    """

    def __init__(self) -> None:
        """Bind a logger tagged for the docker integration's version lookup."""
        self.log: Any = structlog.get_logger().bind(integration="docker", tool="version_lookup")

    @abstractmethod
    def lookup(self, local_image_info: DockerImageInfo, **kwargs) -> DockerImageInfo:  # noqa: ANN003
        """Return image info for the given local image; implemented by subclasses."""

587 

588 

589class ContainerDistributionAPIVersionLookup(VersionLookup): 

590 def __init__(self, throttler: Throttler, cfg: RegistryConfig) -> None: 

591 self.throttler: Throttler = throttler 

592 self.cfg: RegistryConfig = cfg 

593 self.log: Any = structlog.get_logger().bind(integration="docker", tool="version_lookup") 

594 self.api_stats = APIStatsCounter() 

595 

596 def fetch_token(self, registry: str, image_name: str) -> str | None: 

597 default_info = RegistryInfo(registry, registry, registry, TOKEN_URL_TEMPLATE, None) 

598 registry_info_: RegistryInfo = REGISTRIES.get(registry, default_info) 

599 auth_host: str | None = registry_info_.auth_host 

600 if auth_host is None: 

601 return None 

602 

603 service: str = registry_info_.service 

604 url_template: str | None = registry_info_.url_template 

605 auth_url: str | None = ( 

606 url_template.format(auth_host=auth_host, image_name=image_name, service=service) if url_template else None 

607 ) 

608 if auth_url is None: 608 ↛ 609line 608 didn't jump to line 609 because the condition on line 608 was never true

609 return None 

610 response: Response | None = fetch_url( 

611 auth_url, cache_ttl=self.cfg.token_cache_ttl, follow_redirects=True, api_stats_counter=self.api_stats 

612 ) 

613 

614 if response and response.is_success: 

615 api_data = httpx_json_content(response, {}) 

616 token: str | None = api_data.get("token") if api_data else None 

617 if token: 617 ↛ 619line 617 didn't jump to line 619 because the condition on line 617 was always true

618 return token 

619 self.log.warning("No token found in response for %s", auth_url) 

620 raise AuthError(f"No token found in response for {image_name}") 

621 

622 self.log.debug( 

623 "Non-success response at %s fetching token: %s", 

624 auth_url, 

625 (response and response.status_code) or None, 

626 ) 

627 if response and response.status_code == 404: 

628 self.log.debug( 

629 "Default token URL %s not found, calling /v2 endpoint to validate OCI API and provoke auth", auth_url 

630 ) 

631 response = fetch_url( 

632 f"https://{auth_host}/v2", 

633 follow_redirects=True, 

634 allow_stale=False, 

635 cache_ttl=0, 

636 api_stats_counter=self.api_stats, 

637 ) 

638 

639 if response and response.status_code == 401: 639 ↛ 660line 639 didn't jump to line 660 because the condition on line 639 was always true

640 auth = response.headers.get("www-authenticate") 

641 if not auth: 

642 self.log.warning("No www-authenticate header found in 401 response for %s", auth_url) 

643 raise AuthError(f"No www-authenticate header found on 401 for {image_name}") 

644 match = re.search(r'realm="([^"]+)",service="([^"]+)",scope="([^"]+)"', auth) 

645 if not match: 645 ↛ 646line 645 didn't jump to line 646 because the condition on line 645 was never true

646 self.log.warning("No realm/service/scope found in www-authenticate header for %s", auth_url) 

647 raise AuthError(f"No realm/service/scope found on 401 headers for {image_name}") 

648 

649 realm, service, scope = match.groups() 

650 auth_url = f"{realm}?service={service}&scope={scope}" 

651 response = fetch_url(auth_url, follow_redirects=True, api_stats_counter=self.api_stats) 

652 

653 if response and response.is_success: 653 ↛ 657line 653 didn't jump to line 657 because the condition on line 653 was always true

654 token_data = response.json() 

655 self.log.debug("Fetched registry token from %s", auth_url) 

656 return token_data.get("token") 

657 self.log.warning( 

658 "Alternative auth %s with status %s has no token", auth_url, (response and response.status_code) or None 

659 ) 

660 elif response: 

661 self.log.warning("Auth %s failed with status %s", auth_url, (response and response.status_code) or None) 

662 

663 raise AuthError(f"Failed to fetch token for {image_name} at {auth_url}") 

664 

665 def fetch_index( 

666 self, api_host: str, local_image_info: DockerImageInfo, token: str | None 

667 ) -> tuple[Any | None, str | None, CacheMetadata | None]: 

668 if local_image_info.tag: 668 ↛ 672line 668 didn't jump to line 672 because the condition on line 668 was always true

669 api_url: str = f"https://{api_host}/v2/{local_image_info.name}/manifests/{local_image_info.tag}" 

670 cache_ttl: int | None = self.cfg.mutable_cache_ttl 

671 else: 

672 api_url = f"https://{api_host}/v2/{local_image_info.name}/manifests/{local_image_info.pinned_digest}" 

673 cache_ttl = self.cfg.immutable_cache_ttl 

674 

675 response: Response | None = fetch_url( 

676 api_url, 

677 cache_ttl=cache_ttl, 

678 bearer_token=token, 

679 response_type=[ 

680 "application/vnd.oci.image.index.v1+json", 

681 "application/vnd.docker.distribution.manifest.list.v2+json", 

682 ], 

683 api_stats_counter=self.api_stats, 

684 ) 

685 

686 if response is None: 686 ↛ 687line 686 didn't jump to line 687 because the condition on line 686 was never true

687 self.log.warning("Empty response for manifest for image at %s", api_url) 

688 elif response.status_code == 429: 688 ↛ 689line 688 didn't jump to line 689 because the condition on line 688 was never true

689 self.throttler.throttle(local_image_info.index_name, raise_exception=True) 

690 elif not response.is_success: 

691 api_data = httpx_json_content(response, {}) 

692 self.log.warning( 

693 "Failed to fetch index from %s: %s", 

694 api_url, 

695 api_data.get("errors") if api_data else response.text, 

696 ) 

697 else: 

698 index = response.json() 

699 self.log.debug( 

700 "INDEX %s manifests, %s annotations, api: %s, header digest: %s", 

701 len(index.get("manifests", [])), 

702 len(index.get("annotations", [])), 

703 response.headers.get(HEADER_DOCKER_API, "N/A"), 

704 response.headers.get(HEADER_DOCKER_DIGEST, "N/A"), 

705 ) 

706 return index, response.headers.get(HEADER_DOCKER_DIGEST), CacheMetadata(response) 

707 return None, None, None 

708 

def fetch_object(
    self,
    api_host: str,
    local_image_info: DockerImageInfo,
    media_type: str,
    digest: str,
    token: str | None,
    follow_redirects: bool = False,
    api_type: str = "manifests",
) -> tuple[Any | None, CacheMetadata | None]:
    """Fetch a single digest-addressed object (manifest or blob) from a registry.

    The request goes to ``https://{api_host}/v2/{name}/{api_type}/{digest}`` and is
    cached using the configured ``immutable_cache_ttl``, with stale entries allowed.

    Args:
        api_host: Registry API host name.
        local_image_info: Image whose ``name`` builds the URL and whose
            ``index_name`` is passed to the throttler on HTTP 429.
        media_type: Passed to ``fetch_url`` as the ``response_type``.
        digest: Digest of the object to fetch.
        token: Optional bearer token.
        follow_redirects: Whether redirects are followed by ``fetch_url``.
        api_type: URL path segment, e.g. ``"manifests"`` or ``"blobs"``.

    Returns:
        ``(parsed_json, CacheMetadata)`` on success, otherwise ``(None, None)``.

    Raises:
        Whatever ``throttler.throttle(..., raise_exception=True)`` raises on an
        HTTP 429 response (callers in this file catch ``ThrottledError``).
    """
    api_url = f"https://{api_host}/v2/{local_image_info.name}/{api_type}/{digest}"
    response = fetch_url(
        api_url,
        cache_ttl=self.cfg.immutable_cache_ttl,
        bearer_token=token,
        response_type=media_type,
        allow_stale=True,
        follow_redirects=follow_redirects,
        api_stats_counter=self.api_stats,
    )

    if not response:
        self.log.error("Empty response from %s", api_url)
        return None, None

    if response.status_code == HTTPStatus.TOO_MANY_REQUESTS:
        # Registers the throttle for this registry; expected to raise
        self.throttler.throttle(local_image_info.index_name, raise_exception=True)
        return None, None

    if not response.is_success:
        api_data = httpx_json_content(response, {})
        # Error bodies are not guaranteed to be JSON objects, fall back to raw text
        error_detail = api_data.get("errors") if isinstance(api_data, dict) else None
        self.log.warning(
            "Failed to fetch obj from %s: %s %s",
            api_url,
            response.status_code,
            error_detail or response.text,
        )
        return None, None

    obj = httpx_json_content(response, None)
    if not obj:
        # Previously this case returned silently, hiding empty or unparseable bodies
        self.log.warning("Empty or unparseable %s content from %s", api_type, api_url)
        return None, None

    self.log.debug(
        "%s, header digest:%s, api: %s, %s annotations",
        api_type.upper(),
        response.headers.get(HEADER_DOCKER_DIGEST, "N/A"),
        response.headers.get(HEADER_DOCKER_API, "N/A"),
        len(obj.get("annotations", [])),
    )
    return obj, CacheMetadata(response)

760 

def lookup(
    self,
    local_image_info: DockerImageInfo,
    token: str | None = None,
    minimal: bool = False,
    **kwargs,  # noqa: ANN003, ARG002
) -> DockerImageInfo:
    """Look up remote details for an image via the registry v2 HTTP API.

    Fetches the image index, selects the manifests matching the local image's
    os / architecture (and variant where both sides declare one), then fetches
    the manifest and, unless ``minimal``, the image config blob. Annotations and
    labels found along the way are merged into ``result.annotations``.

    Args:
        local_image_info: The locally known image to look up.
        token: Bearer token to use; when empty one is requested via ``fetch_token``.
        minimal: When true the image config blob is not fetched.
        **kwargs: Ignored.

    Returns:
        A new ``DockerImageInfo`` for the same ref. ``throttled`` is set if the
        registry is throttled, ``error`` if authentication failed.
    """
    result: DockerImageInfo = DockerImageInfo(local_image_info.ref)
    if not local_image_info.name or not local_image_info.index_name:
        self.log.debug("No local pkg name or registry index name to check")
        return result

    if self.throttler.check_throttle(local_image_info.index_name):
        result.throttled = True
        return result

    if token:
        self.log.debug("Using provided token to fetch manifest for image %s", local_image_info.ref)
    else:
        try:
            token = self.fetch_token(local_image_info.index_name, local_image_info.name)
        except AuthError as e:
            self.log.warning("Authentication error prevented Docker Registry enrichment: %s", e)
            result.error = str(e)
            return result

    index: Any | None = None
    index_digest: str | None = None  # fetched from header, should be the image digest
    index_cache_metadata: CacheMetadata | None = None
    manifest_cache_metadata: CacheMetadata | None = None
    config_cache_metadata: CacheMetadata | None = None
    idx = local_image_info.index_name
    # Unknown registries fall back to using the index name itself as the API host
    api_host: str | None = REGISTRIES.get(
        idx,
        RegistryInfo(idx, idx, idx, TOKEN_URL_TEMPLATE, None),
    ).api_host
    if api_host is None:
        # The logger object itself is not a log method, so use warning() explicitly
        self.log.warning("No API host can be determined for %s", local_image_info.index_name)
        return result
    try:
        index, index_digest, index_cache_metadata = self.fetch_index(api_host, local_image_info, token)
    except ThrottledError:
        result.throttled = True
        index = None

    if index:
        result.annotations = index.get("annotations") or {}
        for m in index.get("manifests", []):
            try:
                platform_info = m.get("platform") or {}
            except Exception as e:
                self.log.warning("Failed analyzing manifest data: %s: %s", m, e)
                continue

            if (
                platform_info.get("os") != local_image_info.os
                or platform_info.get("architecture") != local_image_info.arch
            ):
                continue
            # The OCI image index spells this key "variant"; "Variant" is still accepted.
            # A variant only excludes a manifest when both sides declare one and they differ.
            manifest_variant = platform_info.get("variant", platform_info.get("Variant"))
            if manifest_variant and local_image_info.variant and manifest_variant != local_image_info.variant:
                continue

            if index_digest:
                result.image_digest = index_digest
                result.short_digest = result.condense_digest(index_digest)
                self.log.debug("Setting %s image digest %s", result.name, result.short_digest)

            digest: str | None = m.get("digest")
            media_type = m.get("mediaType")
            manifest: Any | None = None

            if digest:
                try:
                    manifest, manifest_cache_metadata = self.fetch_object(
                        api_host, local_image_info, media_type, digest, token
                    )
                except ThrottledError:
                    result.throttled = True
                    # No point sending further requests to a registry that is throttling
                    break

            if not manifest:
                continue

            manifest_config: dict[str, Any] = manifest.get("config") or {}
            digest = manifest_config.get("digest")
            if digest is None:
                self.log.warning("Empty digest for %s %s %s", api_host, digest, media_type)
            else:
                result.repo_digest = result.condense_digest(digest, short=False)
                self.log.debug("Setting %s repo digest: %s", result.name, result.repo_digest)

            if manifest.get("annotations"):
                result.annotations.update(manifest.get("annotations", {}))
            else:
                self.log.debug("No annotations found in manifest: %s", manifest)

            if minimal or not manifest_config.get("mediaType") or not manifest_config.get("digest"):
                continue

            try:
                img_config, config_cache_metadata = self.fetch_object(
                    api_host=api_host,
                    local_image_info=local_image_info,
                    media_type=manifest_config["mediaType"],
                    digest=manifest_config["digest"],
                    token=token,
                    follow_redirects=True,
                    api_type="blobs",
                )
                if img_config:
                    config = img_config.get("config") or img_config.get("Config")
                    try:
                        if config and "Labels" in config:
                            result.annotations.update(config.get("Labels") or {})
                        result.annotations.update(img_config.get("annotations") or {})
                    except Exception as e:
                        self.log.warning("Failure handling labels/annotations %s: %s", config, e)
                    # The OCI image configuration carries "created" at the top level; the
                    # nested config object is kept as a fallback and may be absent.
                    nested = config or {}
                    result.created = (
                        img_config.get("created")
                        or img_config.get("Created")
                        or nested.get("created")
                        or nested.get("Created")
                    )
                else:
                    self.log.debug("No config found: %s", manifest)
            except ThrottledError:
                # Report throttling explicitly rather than as a generic extraction failure
                result.throttled = True
                break
            except Exception as e:
                self.log.warning("Failed to extract %s image info from config: %s", local_image_info.ref, e)

    if not result.annotations:
        self.log.debug("No annotations found from registry data")

    labels: dict[str, str | float | int | bool | None] = cherrypick_annotations(local_image_info, result)
    result.custom = labels or {}
    if index_cache_metadata:
        result.custom["index_cache_age"] = index_cache_metadata.age
    if manifest_cache_metadata:
        result.custom["manifest_cache_age"] = manifest_cache_metadata.age
    if config_cache_metadata:
        result.custom["config_cache_age"] = config_cache_metadata.age
    # Read through result.custom so an empty labels result cannot raise
    result.version = cast("str|None", result.custom.get("image_version"))
    result.origin = "OCI_V2" if not minimal else "OCI_V2_MINIMAL"

    self.log.debug(
        "OCI_V2 Lookup for %s: short_digest:%s, repo_digest:%s, version: %s",
        local_image_info.name,
        result.short_digest,
        result.repo_digest,
        result.version,
    )
    return result

902 

903 

class DockerClientVersionLookup(VersionLookup):
    """Query remote registry via local Docker API

    No auth needed, however uses the old v1 APIs, and only Index available via API
    """

    def __init__(self, client: docker.DockerClient, throttler: Throttler, cfg: RegistryConfig, api_backoff: int = 30) -> None:
        """Store the Docker client, the shared throttler, registry config and base backoff seconds."""
        self.client: docker.DockerClient = client
        self.throttler: Throttler = throttler
        self.cfg: RegistryConfig = cfg
        self.api_backoff: int = api_backoff
        self.log: Any = structlog.get_logger().bind(integration="docker", tool="version_lookup")

    def lookup(self, local_image_info: DockerImageInfo, retries: int = 3, **kwargs) -> DockerImageInfo:  # noqa: ANN003, ARG002
        """Fetch registry data for an image through the Docker client.

        Args:
            local_image_info: The locally known image to look up.
            retries: Maximum number of attempts for retryable API errors.
            **kwargs: Ignored.

        Returns:
            A new ``DockerImageInfo`` for the same ref. ``throttled`` is set when the
            registry is throttled or answers HTTP 429 (in which case the result is
            returned immediately), ``error`` holds the last API error if no attempt
            succeeded.
        """
        retries_left = retries
        retry_secs: int = self.api_backoff
        reg_data: RegistryData | None = None

        result = DockerImageInfo(local_image_info.ref)
        if local_image_info.index_name is None or local_image_info.ref is None:
            return result

        while reg_data is None and retries_left > 0:
            if self.throttler.check_throttle(local_image_info.index_name):
                result.throttled = True
                break
            try:
                self.log.debug("Fetching registry data", image_ref=local_image_info.ref)
                reg_data = self.client.images.get_registry_data(local_image_info.ref)
                if reg_data:
                    self.log.debug(
                        "Registry Data: id:%s,image:%s, attrs:%s",
                        reg_data.id,
                        reg_data.image_name,
                        reg_data.attrs,
                    )
                    result.short_digest = result.condense_digest(reg_data.short_id)
                    result.image_digest = result.condense_digest(reg_data.id, short=False)
                    # result.name = reg_data.image_name
                    result.attributes = reg_data.attrs
                    # "Config" may be present but null, so guard before reading labels
                    result.annotations = (reg_data.attrs.get("Config") or {}).get("Labels") or {}
                    result.error = None
                else:
                    # Count an empty answer as an attempt so the loop always terminates
                    retries_left -= 1

            except docker.errors.APIError as e:
                if e.status_code == HTTPStatus.TOO_MANY_REQUESTS:
                    retry_secs = round(retry_secs**1.5)
                    try:
                        # Only a present, positive integer Retry-After replaces the computed
                        # backoff; a missing header or an HTTP-date value leaves it untouched
                        retry_after = e.response.headers.get("Retry-After")  # type: ignore[union-attr]
                        if retry_after is not None and int(retry_after) > 0:
                            retry_secs = int(retry_after)
                    except (AttributeError, TypeError, ValueError) as e2:
                        self.log.debug("Failed to access headers for retry info: %s", e2)
                    self.throttler.throttle(local_image_info.index_name, retry_secs, e.explanation)
                    result.throttled = True
                    return result
                result.error = str(e)
                retries_left -= 1
                if e.is_client_error():
                    # A 4xx answer will not change on retry, so stop here
                    self.log.warning("Failed to fetch registry data: [%s] %s", e.status_code, e.explanation)
                    break
                if retries_left == 0:
                    self.log.warning("Failed to fetch registry data: [%s] %s", e.status_code, e.explanation)
                else:
                    self.log.debug("Failed to fetch registry data, retrying: %s", e)

        labels: dict[str, str | float | int | bool | None] = cherrypick_annotations(local_image_info, result)
        result.custom = labels or {}
        # Read through result.custom so an empty labels result cannot raise
        result.version = cast("str|None", result.custom.get("image_version"))
        result.created = cast("str|None", result.custom.get("image_created"))
        result.origin = "DOCKER_CLIENT"
        return result