Coverage for src / updates2mqtt / integrations / docker_enrich.py: 78%

505 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:11 +0000

1import re 

2import typing 

3from abc import abstractmethod 

4from typing import Any, cast 

5 

6import structlog 

7from docker.auth import resolve_repository_name 

8from docker.models.containers import Container 

9from omegaconf import MissingMandatoryValue, OmegaConf, ValidationError 

10 

11from updates2mqtt.helpers import ( 

12 APIStatsCounter, 

13 CacheMetadata, 

14 ThrottledError, 

15 Throttler, 

16 fetch_url, 

17 httpx_json_content, 

18 validate_url, 

19) 

20from updates2mqtt.model import DiscoveryArtefactDetail, DiscoveryInstallationDetail, ReleaseDetail 

21 

22if typing.TYPE_CHECKING: 

23 from docker.models.images import RegistryData 

24 from httpx import Response 

25 from omegaconf.dictconfig import DictConfig 

26from http import HTTPStatus 

27 

28import docker 

29import docker.errors 

30 

31from updates2mqtt.config import ( 

32 PKG_INFO_FILE, 

33 SOURCE_PLATFORM_GITHUB, 

34 SOURCE_PLATFORM_GITLAB, 

35 SOURCE_PLATFORMS, 

36 CommonPackages, 

37 DockerConfig, 

38 DockerPackageUpdateInfo, 

39 PackageUpdateInfo, 

40 RegistryConfig, 

41 VersionPolicy, 

42) 

43 

44log: Any = structlog.get_logger() 

45 

46DIFF_URL_TEMPLATES = { 

47 SOURCE_PLATFORM_GITHUB: "{repo}/commit/{revision}", 

48} 

49RELEASE_URL_TEMPLATES = { 

50 SOURCE_PLATFORM_GITHUB: "{repo}/releases/tag/{version}", 

51} 

52UNKNOWN_RELEASE_URL_TEMPLATES = { 

53 SOURCE_PLATFORM_GITHUB: "{repo}/releases", 

54 SOURCE_PLATFORM_GITLAB: "{repo}/container_registry", 

55} 

56MISSING_VAL = "**MISSING**" 

57UNKNOWN_REGISTRY = "**UNKNOWN_REGISTRY**" 

58UNKNOWN_NAME = "**UNKNOWN_NAME**" 

59 

60HEADER_DOCKER_DIGEST = "docker-content-digest" 

61HEADER_DOCKER_API = "docker-distribution-api-version" 

62 

63TOKEN_URL_TEMPLATE = "https://{auth_host}/token?scope=repository:{image_name}:pull&service={service}" # noqa: S105 # nosec 

64 

65REGISTRY_GHCR = "ghcr.io" 

66REGISTRY_DOCKER = "docker.io" 

67REGISTRY_MCR = "mcr.microsoft.com" 

68REGISTRY_QUAY = "quay.io" 

69REGISTRY_LSCR = "lscr.io" 

70REGISTRY_CODEBERG = "codeberg.org" 

71REGISTRY_GITLAB = "registry.gitlab.com" 

72 

73REGISTRIES: dict[str, tuple[str | None, str, str, str | None, str | None]] = { 

74 # registry: (auth_host, api_host, service, url_template, repo_template) 

75 REGISTRY_DOCKER: ("auth.docker.io", "registry-1.docker.io", "registry.docker.io", TOKEN_URL_TEMPLATE, None), 

76 REGISTRY_MCR: (None, "mcr.microsoft.com", "mcr.microsoft.com", None, None), 

77 REGISTRY_QUAY: (None, "quay.io", "quay.io", TOKEN_URL_TEMPLATE, None), 

78 REGISTRY_GHCR: ("ghcr.io", "ghcr.io", "ghcr.io", TOKEN_URL_TEMPLATE, "https://github.com/{image_name}"), 

79 "lscr.io": ("ghcr.io", "lscr.io", "ghcr.io", TOKEN_URL_TEMPLATE, None), 

80 REGISTRY_CODEBERG: ( 

81 "codeberg.org", 

82 "codeberg.org", 

83 "container_registry", 

84 TOKEN_URL_TEMPLATE, 

85 "https://codeberg.org/{image_name}", 

86 ), 

87 REGISTRY_GITLAB: ( 

88 "www.gitlab.com", 

89 "registry.gitlab.com", 

90 "container_registry", 

91 "https://{auth_host}/jwt/auth?service={service}&scope=repository:{image_name}:pull&offline_token=true&client_id=docker", 

92 "https://gitlab.com/{image_name}", 

93 ), 

94} 

95 

96# source: https://specs.opencontainers.org/distribution-spec/?v=v1.0.0#pull 

97OCI_NAME_RE = r"[a-z0-9]+((\.|_|__|-+)[a-z0-9]+)*(\/[a-z0-9]+((\.|_|__|-+)[a-z0-9]+)*)*" 

98OCI_TAG_RE = r"[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}" 

99 

100 

101class DockerImageInfo(DiscoveryArtefactDetail): 

102 """Normalize and shlep around the bits of an image def 

103 

104 index_name: aka index_name, e.g. ghcr.io 

105 name: image ref without index name or tag, e.g. nginx, or librenms/librenms 

106 tag: tag or digest 

107 untagged_ref: combined index name and package name 

108 """ 

109 

110 def __init__( 

111 self, 

112 ref: str, # ref with optional index name and tag or digest, index:name:tag_or_digest 

113 image_digest: str | None = None, 

114 tags: list[str] | None = None, 

115 attributes: dict[str, Any] | None = None, 

116 annotations: dict[str, Any] | None = None, 

117 platform: str | None = None, # test harness simplification 

118 version: str | None = None, # test harness simplification 

119 created: str | None = None, 

120 ) -> None: 

121 super().__init__() 

122 self.ref: str = ref 

123 self.version: str | None = version 

124 self.image_digest: str | None = image_digest 

125 self.short_digest: str | None = None 

126 self.repo_digest: str | None = None # the single RepoDigest known to match registry 

127 self.git_digest: str | None = None 

128 self.index_name: str | None = None 

129 self.name: str | None = None 

130 self.tag: str | None = None 

131 self.pinned_digest: str | None = None 

132 # untagged ref using combined index and remote name used only for pattern matching common pkg info 

133 self.untagged_ref: str | None = None # index_name/remote_name used for pkg match 

134 self.tag_or_digest: str | None = None # index_name/remote_name:**tag_or_digest** 

135 self.tags = tags 

136 self.attributes: dict[str, Any] = attributes or {} 

137 self.annotations: dict[str, Any] = annotations or {} 

138 self.throttled: bool = False 

139 self.origin: str | None = None 

140 self.error: str | None = None 

141 self.platform: str | None = platform 

142 self.custom: dict[str, str | float | int | bool | None] = {} 

143 self.created: str | None = created 

144 

145 self.local_build: bool = not self.repo_digests 

146 self.index_name, remote_name = resolve_repository_name(ref) 

147 

148 self.name = remote_name 

149 

150 if remote_name and ":" in remote_name and ("@" not in remote_name or remote_name.index("@") > remote_name.index(":")): 

151 # name:tag format 

152 self.name, self.tag_or_digest = remote_name.split(":", 1) 

153 self.untagged_ref = ref.split(":", 1)[0] 

154 self.tag = self.tag_or_digest 

155 

156 elif remote_name and "@" in remote_name: 

157 # name@digest format 

158 self.name, self.tag_or_digest = remote_name.split("@", 1) 

159 self.untagged_ref = ref.split("@", 1)[0] 

160 self.pinned_digest = self.tag_or_digest 

161 

162 if self.tag and "@" in self.tag: 

163 # name:tag@digest format 

164 # for pinned tags, care only about the digest part 

165 self.tag, self.tag_or_digest = self.tag.split("@", 1) 

166 self.pinned_digest = self.tag_or_digest 

167 if self.tag_or_digest is None: 

168 self.tag_or_digest = "latest" 

169 self.untagged_ref = ref 

170 self.tag = self.tag_or_digest 

171 

172 if self.repo_digest is None and len(self.repo_digests) == 1: 

173 # definite known RepoDigest 

174 # if its ambiguous, the final version selection will handle it 

175 self.repo_digest = self.repo_digests[0] 

176 

177 if self.index_name == "docker.io" and "/" not in self.name: 

178 # "official Docker images have an abbreviated library/foo name" 

179 self.name = f"library/{self.name}" 

180 if self.name is not None and not re.match(OCI_NAME_RE, self.name): 180 ↛ 181line 180 didn't jump to line 181 because the condition on line 180 was never true

181 log.warning("Invalid OCI image name: %s", self.name) 

182 if self.tag and not re.match(OCI_TAG_RE, self.tag): 182 ↛ 183line 182 didn't jump to line 183 because the condition on line 182 was never true

183 log.warning("Invalid OCI image tag: %s", self.tag) 

184 if "/" in self.name: 

185 self.unqualified_name: str = self.name.split("/", 1)[1] 

186 else: 

187 self.unqualified_name = self.name 

188 

189 if self.os and self.arch: 

190 self.platform = "/".join( 

191 filter( 

192 None, 

193 [self.os, self.arch, self.variant], 

194 ), 

195 ) 

196 

197 if self.image_digest is not None: 

198 self.image_digest = self.condense_digest(self.image_digest, short=False) 

199 self.short_digest = self.condense_digest(self.image_digest) # type: ignore[arg-type] 

200 

201 @property 

202 def repo_digests(self) -> list[str]: 

203 if self.repo_digest: 

204 return [self.repo_digest] 

205 # RepoDigest in image inspect, Registry Config object 

206 digests = [v.split("@", 1)[1] if "@" in v else v for v in self.attributes.get("RepoDigests", [])] 

207 return digests or [] 

208 

209 @property 

210 def pinned(self) -> bool: 

211 """Check if this is pinned and installed version consistent with pin""" 

212 return bool(self.pinned_digest and self.pinned_digest in self.repo_digests) 

213 

214 @property 

215 def os(self) -> str | None: 

216 return self.attributes.get("Os") 

217 

218 @property 

219 def arch(self) -> str | None: 

220 return self.attributes.get("Architecture") 

221 

222 @property 

223 def variant(self) -> str | None: 

224 return self.attributes.get("Variant") 

225 

226 def condense_digest(self, digest: str, short: bool = True) -> str | None: 

227 try: 

228 digest = digest.split("@")[1] if "@" in digest else digest # fully qualified RepoDigest 

229 if short: 

230 digest = digest.split(":")[1] if ":" in digest else digest # remove digest type prefix 

231 return digest[0:12] 

232 return digest 

233 except Exception as e: 

234 log.warning("Unable to condense digest %s: %s", digest, e) 

235 return None 

236 

237 def reuse(self) -> "DockerImageInfo": 

238 cloned = DockerImageInfo( 

239 self.ref, self.image_digest, self.tags, self.attributes, self.annotations, self.version, self.created 

240 ) 

241 cloned.origin = "REUSED" 

242 return cloned 

243 

244 def as_dict(self, minimal: bool = True) -> dict[str, str | list | dict | bool | int | None]: 

245 result: dict[str, str | list | dict | bool | int | None] = { 

246 "captured": self.captured.isoformat(), 

247 "image_ref": self.ref, 

248 "name": self.name, 

249 "version": self.version, 

250 "image_digest": self.image_digest, 

251 "repo_digest": self.repo_digest, 

252 "repo_digests": self.repo_digest, 

253 "git_digest": self.git_digest, 

254 "index_name": self.index_name, 

255 "tag": self.tag, 

256 "pinned_digest": self.pinned_digest, 

257 "tag_or_digest": self.tag_or_digest, 

258 "tags": self.tags, 

259 "origin": self.origin, 

260 "platform": self.platform, 

261 "local_build": self.local_build, 

262 "error": self.error, 

263 "throttled": self.throttled, 

264 "custom": self.custom, 

265 } 

266 if not minimal: 

267 result["attributes"] = self.attributes 

268 result["annotations"] = self.annotations 

269 return result 

270 

271 

272def id_source_platform(source: str | None) -> str | None: 

273 candidates: list[str] = [platform for platform, pattern in SOURCE_PLATFORMS.items() if re.match(pattern, source or "")] 

274 return candidates[0] if candidates else None 

275 

276 

277def _select_annotation( 

278 name: str, key: str, local_info: DockerImageInfo | None = None, registry_info: DockerImageInfo | None = None 

279) -> dict[str, str | None]: 

280 result: dict[str, str | None] = {} 

281 if registry_info: 

282 v: Any | None = registry_info.annotations.get(key) 

283 if v is not None: 

284 result[name] = v 

285 elif local_info: 285 ↛ 289line 285 didn't jump to line 289 because the condition on line 285 was always true

286 v = local_info.annotations.get(key) 

287 if v is not None: 287 ↛ 288line 287 didn't jump to line 288 because the condition on line 287 was never true

288 result[name] = v 

289 return result 

290 

291 

292def cherrypick_annotations( 

293 local_info: DockerImageInfo, registry_info: DockerImageInfo | None 

294) -> dict[str, str | float | int | bool | None]: 

295 """https://github.com/opencontainers/image-spec/blob/main/annotations.md""" 

296 results: dict[str, str | float | int | bool | None] = {} 

297 for either_name, either_label in [ 

298 ("documentation_url", "org.opencontainers.image.documentation"), 

299 ("description", "org.opencontainers.image.description"), 

300 ("licences", "org.opencontainers.image.licenses"), 

301 ("image_base", "org.opencontainers.image.base.name"), 

302 ("image_created", "org.opencontainers.image.created"), 

303 ("image_version", "org.opencontainers.image.version"), 

304 ("image_revision", "org.opencontainers.image.revision"), 

305 ("ref_name", "org.opencontainers.image.ref.name"), 

306 ("title", "org.opencontainers.image.title"), 

307 ("vendor", "org.opencontainers.image.vendor"), 

308 ("source", "org.opencontainers.image.source"), 

309 ]: 

310 results.update(_select_annotation(either_name, either_label, local_info, registry_info)) 

311 if ( 311 ↛ 317line 311 didn't jump to line 317 because the condition on line 311 was never true

312 results.get("ref_name") == "ubuntu" 

313 and local_info.name != "ubuntu" 

314 and results.get("image_version") 

315 and re.fullmatch(r"^2\d\.\d\d$", cast("str", results["image_version"])) 

316 ): 

317 log.debug( 

318 "Suppressing %s base %s version leaking into image version: %s", 

319 local_info.name, 

320 results["ref_name"], 

321 results["image_version"], 

322 ) 

323 del results["image_version"] 

324 return results 

325 

326 

327class DockerServiceDetails(DiscoveryInstallationDetail): 

328 def __init__( 

329 self, 

330 container_name: str | None = None, 

331 compose_path: str | None = None, 

332 compose_version: str | None = None, 

333 compose_service: str | None = None, 

334 git_repo_path: str | None = None, 

335 ) -> None: 

336 self.container_name: str | None = container_name 

337 self.compose_path: str | None = compose_path 

338 self.compose_version: str | None = compose_version 

339 self.compose_service: str | None = compose_service 

340 self.git_repo_path: str | None = git_repo_path 

341 self.git_local_timestamp: str | None = None 

342 

343 def as_dict(self) -> dict[str, str | list | dict | bool | int | None]: 

344 results: dict[str, str | list | dict | bool | int | None] = { 

345 "container_name": self.container_name, 

346 "compose_path": self.compose_path, 

347 "compose_service": self.compose_service, 

348 "compose_version": self.compose_version, 

349 } 

350 if self.git_local_timestamp: 

351 results["git_local_timestamp"] = self.git_local_timestamp 

352 if self.git_repo_path: 

353 results["git_repo_path"] = self.git_repo_path 

354 return results 

355 

356 

357class LocalContainerInfo: 

358 def build_image_info(self, container: Container) -> tuple[DockerImageInfo, DockerServiceDetails]: 

359 """Image contents equiv to `docker inspect image <image_ref>`""" 

360 # container image can be none if someone ran `docker rmi -f` 

361 # so although this could be sourced from image, like `container.image.tags[0]` 

362 # use the container ref instead, which survives monkeying about with images 

363 image_ref: str = container.attrs.get("Config", {}).get("Image") or "" 

364 image_digest = container.attrs.get("Image") 

365 

366 image_info: DockerImageInfo = DockerImageInfo( 

367 image_ref, 

368 image_digest=image_digest, 

369 tags=container.image.tags if container and container.image else None, 

370 annotations=container.image.labels if container.image else None, 

371 attributes=container.image.attrs if container.image else None, 

372 ) 

373 service_info: DockerServiceDetails = DockerServiceDetails( 

374 container.name, 

375 compose_path=container.labels.get("com.docker.compose.project.working_dir"), 

376 compose_service=container.labels.get("com.docker.compose.service"), 

377 compose_version=container.labels.get("com.docker.compose.version"), 

378 ) 

379 

380 labels: dict[str, str | float | int | bool | None] = cherrypick_annotations(image_info, None) 

381 # capture container labels/annotations, not image ones 

382 labels = labels or {} 

383 if container.image and container.image.attrs: 383 ↛ 385line 383 didn't jump to line 385 because the condition on line 383 was always true

384 image_info.created = container.image.attrs.get("Created") 

385 image_info.custom = labels 

386 image_info.version = cast("str|None", labels.get("image_version")) 

387 

388 return image_info, service_info 

389 

390 

391class PackageEnricher: 

392 def __init__(self, docker_cfg: DockerConfig, packages: dict[str, PackageUpdateInfo] | None = None) -> None: 

393 self.pkgs: dict[str, PackageUpdateInfo] = packages or {} 

394 self.cfg: DockerConfig = docker_cfg 

395 self.log: Any = structlog.get_logger().bind(integration="docker") 

396 

397 def initialize(self) -> None: 

398 pass 

399 

400 def enrich(self, image_info: DockerImageInfo) -> PackageUpdateInfo | None: 

401 def match(pkg: PackageUpdateInfo) -> bool: 

402 if pkg is not None and pkg.docker is not None and pkg.docker.image_name is not None: 402 ↛ 407line 402 didn't jump to line 407 because the condition on line 402 was always true

403 if image_info.untagged_ref is not None and image_info.untagged_ref == pkg.docker.image_name: 

404 return True 

405 if image_info.ref is not None and image_info.ref == pkg.docker.image_name: 

406 return True 

407 return False 

408 

409 if image_info.untagged_ref is not None and image_info.ref is not None: 409 ↛ 419line 409 didn't jump to line 419 because the condition on line 409 was always true

410 for pkg in self.pkgs.values(): 

411 if match(pkg): 

412 self.log.debug( 

413 "Found common package", 

414 image_name=pkg.docker.image_name, # type: ignore [union-attr] 

415 logo_url=pkg.logo_url, 

416 relnotes_url=pkg.release_notes_url, 

417 ) 

418 return pkg 

419 return None 

420 

421 

422class DefaultPackageEnricher(PackageEnricher): 

423 def enrich(self, image_info: DockerImageInfo) -> PackageUpdateInfo | None: 

424 self.log.debug("Default pkg info", image_name=image_info.untagged_ref, image_ref=image_info.ref) 

425 return PackageUpdateInfo( 

426 DockerPackageUpdateInfo(image_info.untagged_ref or image_info.ref, version_policy=VersionPolicy.AUTO), 

427 logo_url=self.cfg.default_entity_picture_url, 

428 release_notes_url=None, 

429 ) 

430 

431 

432class CommonPackageEnricher(PackageEnricher): 

433 def initialize(self) -> None: 

434 base_cfg: DictConfig = OmegaConf.structured(CommonPackages) 

435 if PKG_INFO_FILE.exists(): 435 ↛ 442line 435 didn't jump to line 442 because the condition on line 435 was always true

436 self.log.debug("Loading common package update info", path=PKG_INFO_FILE) 

437 cfg: DictConfig = typing.cast("DictConfig", OmegaConf.merge(base_cfg, OmegaConf.load(PKG_INFO_FILE))) 

438 

439 OmegaConf.to_container(cfg, throw_on_missing=True) 

440 OmegaConf.set_readonly(cfg, True) 

441 else: 

442 self.log.warn("No common package update info found", path=PKG_INFO_FILE) 

443 cfg = base_cfg 

444 try: 

445 common_config: CommonPackages = typing.cast("CommonPackages", cfg) 

446 # omegaconf broken-ness on optional fields and converting to backclasses 

447 self.pkgs = common_config.common_packages 

448 # self.pkgs: dict[str, PackageUpdateInfo] = { 

449 # pkg: PackageUpdateInfo(**pkg_cfg) for pkg, pkg_cfg in cfg.common_packages.items() if pkg not in self.pkgs 

450 # } 

451 except (MissingMandatoryValue, ValidationError) as e: 

452 self.log.serror("Configuration error %s", e, path=PKG_INFO_FILE.as_posix()) 

453 raise 

454 

455 

456class LinuxServerIOPackageEnricher(PackageEnricher): 

457 def initialize(self) -> None: 

458 cfg = self.cfg.discover_metadata.get("linuxserver.io") 

459 if cfg is None or not cfg.enabled: 

460 return 

461 

462 self.log.debug(f"Fetching linuxserver.io metadata from API, cache_ttl={cfg.cache_ttl}") 

463 response: Response | None = fetch_url( 

464 "https://api.linuxserver.io/api/v1/images?include_config=false&include_deprecated=false", 

465 cache_ttl=cfg.cache_ttl, 

466 ) 

467 if response and response.is_success: 

468 api_data: Any = response.json() 

469 repos: list = api_data.get("data", {}).get("repositories", {}).get("linuxserver", []) 

470 else: 

471 return 

472 

473 added = 0 

474 for repo in repos: 

475 image_name = repo.get("name") 

476 if image_name and image_name not in self.pkgs: 476 ↛ 474line 476 didn't jump to line 474 because the condition on line 476 was always true

477 self.pkgs[image_name] = PackageUpdateInfo( 

478 DockerPackageUpdateInfo(f"lscr.io/linuxserver/{image_name}"), 

479 logo_url=repo["project_logo"], 

480 release_notes_url=f"{repo['github_url']}/releases", 

481 ) 

482 added += 1 

483 self.log.debug(f"Added {added} linuxserver.io package details") 

484 

485 

486class SourceReleaseEnricher: 

487 def __init__(self) -> None: 

488 self.log: Any = structlog.get_logger().bind(integration="docker") 

489 

490 def enrich( 

491 self, registry_info: DockerImageInfo, source_repo_url: str | None = None, notes_url: str | None = None 

492 ) -> ReleaseDetail | None: 

493 detail = ReleaseDetail(registry_info.name or UNKNOWN_NAME) 

494 

495 detail.notes_url = notes_url 

496 detail.version = registry_info.annotations.get("org.opencontainers.image.version") 

497 detail.revision = registry_info.annotations.get("org.opencontainers.image.revision") 

498 # explicit source_repo_url overrides container, e.g. where container source is only the docker wrapper 

499 detail.source_url = source_repo_url or registry_info.annotations.get("org.opencontainers.image.source") 

500 

501 if detail.source_url is None and registry_info is not None and registry_info.index_name is not None: 

502 registry_config: tuple[str | None, str, str, str | None, str | None] | None = REGISTRIES.get( 

503 registry_info.index_name 

504 ) 

505 repo_template: str | None = registry_config[4] if registry_config else None 

506 if repo_template: 

507 source_url = repo_template.format(image_name=registry_info.name) 

508 if validate_url(source_url, cache_ttl=86400): 508 ↛ 512line 508 didn't jump to line 512 because the condition on line 508 was always true

509 detail.source_url = source_url 

510 self.log.info("Implied source from registry: %s", detail.source_url) 

511 

512 if detail.source_url is None and detail.notes_url is None and detail.revision is None and detail.version is None: 

513 return None 

514 

515 if detail.source_url and "#" in detail.source_url: 

516 detail.source_repo_url = detail.source_url.split("#", 1)[0] 

517 self.log.debug("Simplifying %s from %s", detail.source_repo_url, detail.source_url) 

518 else: 

519 detail.source_repo_url = detail.source_url 

520 

521 detail.source_platform = id_source_platform(detail.source_repo_url) 

522 if not detail.source_platform: 

523 self.log.debug("No known source platform found on container", source=detail.source_repo_url) 

524 return detail 

525 

526 template_vars: dict[str, str | None] = { 

527 "version": detail.version or MISSING_VAL, 

528 "revision": detail.revision or MISSING_VAL, 

529 "repo": detail.source_repo_url or MISSING_VAL, 

530 "source": detail.source_url or MISSING_VAL, 

531 } 

532 

533 diff_url_template: str | None = DIFF_URL_TEMPLATES.get(detail.source_platform) 

534 diff_url: str | None = diff_url_template.format(**template_vars) if diff_url_template else None 

535 if diff_url and MISSING_VAL not in diff_url and validate_url(diff_url, cache_ttl=3600): 535 ↛ 536line 535 didn't jump to line 536 because the condition on line 535 was never true

536 detail.diff_url = diff_url 

537 else: 

538 diff_url = None 

539 

540 if detail.notes_url is None and detail.source_platform in RELEASE_URL_TEMPLATES: 

541 platform_notes_url: str | None = RELEASE_URL_TEMPLATES[detail.source_platform].format(**template_vars) 

542 if ( 542 ↛ 547line 542 didn't jump to line 547 because the condition on line 542 was never true

543 platform_notes_url 

544 and MISSING_VAL not in platform_notes_url 

545 and validate_url(platform_notes_url, cache_ttl=86400) 

546 ): 

547 self.log.debug("Setting default known release notes url: %s", platform_notes_url) 

548 detail.notes_url = platform_notes_url 

549 

550 if detail.notes_url is None and detail.source_platform in UNKNOWN_RELEASE_URL_TEMPLATES: 

551 platform_notes_url = UNKNOWN_RELEASE_URL_TEMPLATES[detail.source_platform].format(**template_vars) 

552 if ( 

553 platform_notes_url 

554 and MISSING_VAL not in platform_notes_url 

555 and validate_url(platform_notes_url, cache_ttl=86400) 

556 ): 

557 self.log.debug("Setting default unknown release notes url: %s", platform_notes_url) 

558 detail.notes_url = platform_notes_url 

559 

560 return detail 

561 

562 

563class AuthError(Exception): 

564 pass 

565 

566 

567class VersionLookup: 

568 def __init__(self) -> None: 

569 self.log: Any = structlog.get_logger().bind(integration="docker", tool="version_lookup") 

570 

571 @abstractmethod 

572 def lookup(self, local_image_info: DockerImageInfo, **kwargs) -> DockerImageInfo: # noqa: ANN003 

573 pass 

574 

575 

576class ContainerDistributionAPIVersionLookup(VersionLookup): 

577 def __init__(self, throttler: Throttler, cfg: RegistryConfig) -> None: 

578 self.throttler: Throttler = throttler 

579 self.cfg: RegistryConfig = cfg 

580 self.log: Any = structlog.get_logger().bind(integration="docker", tool="version_lookup") 

581 self.api_stats = APIStatsCounter() 

582 

583 def fetch_token(self, registry: str, image_name: str) -> str | None: 

584 default_host: tuple[str, str, str, str, None] = (registry, registry, registry, TOKEN_URL_TEMPLATE, None) 

585 auth_host: str | None = REGISTRIES.get(registry, default_host)[0] 

586 if auth_host is None: 586 ↛ 587line 586 didn't jump to line 587 because the condition on line 586 was never true

587 return None 

588 

589 service: str = REGISTRIES.get(registry, default_host)[2] 

590 url_template: str | None = REGISTRIES.get(registry, default_host)[3] 

591 auth_url: str | None = ( 

592 url_template.format(auth_host=auth_host, image_name=image_name, service=service) if url_template else None 

593 ) 

594 if auth_url is None: 594 ↛ 595line 594 didn't jump to line 595 because the condition on line 594 was never true

595 return None 

596 response: Response | None = fetch_url( 

597 auth_url, cache_ttl=self.cfg.token_cache_ttl, follow_redirects=True, api_stats_counter=self.api_stats 

598 ) 

599 

600 if response and response.is_success: 600 ↛ 608line 600 didn't jump to line 608 because the condition on line 600 was always true

601 api_data = httpx_json_content(response, {}) 

602 token: str | None = api_data.get("token") if api_data else None 

603 if token: 603 ↛ 605line 603 didn't jump to line 605 because the condition on line 603 was always true

604 return token 

605 self.log.warning("No token found in response for %s", auth_url) 

606 raise AuthError(f"No token found in response for {image_name}") 

607 

608 self.log.debug( 

609 "Non-success response at %s fetching token: %s", 

610 auth_url, 

611 (response and response.status_code) or None, 

612 ) 

613 if response and response.status_code == 404: 

614 self.log.debug( 

615 "Default token URL %s not found, calling /v2 endpoint to validate OCI API and provoke auth", auth_url 

616 ) 

617 response = fetch_url( 

618 f"https://{auth_host}/v2", 

619 follow_redirects=True, 

620 allow_stale=False, 

621 cache_ttl=0, 

622 api_stats_counter=self.api_stats, 

623 ) 

624 

625 if response and response.status_code == 401: 

626 auth = response.headers.get("www-authenticate") 

627 if not auth: 

628 self.log.warning("No www-authenticate header found in 401 response for %s", auth_url) 

629 raise AuthError(f"No www-authenticate header found on 401 for {image_name}") 

630 match = re.search(r'realm="([^"]+)",service="([^"]+)",scope="([^"]+)"', auth) 

631 if not match: 

632 self.log.warning("No realm/service/scope found in www-authenticate header for %s", auth_url) 

633 raise AuthError(f"No realm/service/scope found on 401 headers for {image_name}") 

634 

635 realm, service, scope = match.groups() 

636 auth_url = f"{realm}?service={service}&scope={scope}" 

637 response = fetch_url(auth_url, follow_redirects=True, api_stats_counter=self.api_stats) 

638 

639 if response and response.is_success: 

640 token_data = response.json() 

641 self.log.debug("Fetched registry token from %s", auth_url) 

642 return token_data.get("token") 

643 self.log.warning( 

644 "Alternative auth %s with status %s has no token", auth_url, (response and response.status_code) or None 

645 ) 

646 elif response: 

647 self.log.warning("Auth %s failed with status %s", auth_url, (response and response.status_code) or None) 

648 

649 raise AuthError(f"Failed to fetch token for {image_name} at {auth_url}") 

650 

651 def fetch_index( 

652 self, api_host: str, local_image_info: DockerImageInfo, token: str | None 

653 ) -> tuple[Any | None, str | None, CacheMetadata | None]: 

654 if local_image_info.tag: 654 ↛ 658line 654 didn't jump to line 658 because the condition on line 654 was always true

655 api_url: str = f"https://{api_host}/v2/{local_image_info.name}/manifests/{local_image_info.tag}" 

656 cache_ttl: int | None = self.cfg.mutable_cache_ttl 

657 else: 

658 api_url = f"https://{api_host}/v2/{local_image_info.name}/manifests/{local_image_info.pinned_digest}" 

659 cache_ttl = self.cfg.immutable_cache_ttl 

660 

661 response: Response | None = fetch_url( 

662 api_url, 

663 cache_ttl=cache_ttl, 

664 bearer_token=token, 

665 response_type=[ 

666 "application/vnd.oci.image.index.v1+json", 

667 "application/vnd.docker.distribution.manifest.list.v2+json", 

668 ], 

669 api_stats_counter=self.api_stats, 

670 ) 

671 

672 if response is None: 672 ↛ 673line 672 didn't jump to line 673 because the condition on line 672 was never true

673 self.log.warning("Empty response for manifest for image at %s", api_url) 

674 elif response.status_code == 429: 674 ↛ 675line 674 didn't jump to line 675 because the condition on line 674 was never true

675 self.throttler.throttle(local_image_info.index_name, raise_exception=True) 

676 elif not response.is_success: 

677 api_data = httpx_json_content(response, {}) 

678 self.log.warning( 

679 "Failed to fetch index from %s: %s", 

680 api_url, 

681 api_data.get("errors") if api_data else response.text, 

682 ) 

683 else: 

684 index = response.json() 

685 self.log.debug( 

686 "INDEX %s manifests, %s annotations, api: %s, header digest: %s", 

687 len(index.get("manifests", [])), 

688 len(index.get("annotations", [])), 

689 response.headers.get(HEADER_DOCKER_API, "N/A"), 

690 response.headers.get(HEADER_DOCKER_DIGEST, "N/A"), 

691 ) 

692 return index, response.headers.get(HEADER_DOCKER_DIGEST), CacheMetadata(response) 

693 return None, None, None 

694 

695 def fetch_object( 

696 self, 

697 api_host: str, 

698 local_image_info: DockerImageInfo, 

699 media_type: str, 

700 digest: str, 

701 token: str | None, 

702 follow_redirects: bool = False, 

703 api_type: str = "manifests", 

704 ) -> tuple[Any | None, CacheMetadata | None]: 

705 api_url = f"https://{api_host}/v2/{local_image_info.name}/{api_type}/{digest}" 

706 response = fetch_url( 

707 api_url, 

708 cache_ttl=self.cfg.immutable_cache_ttl, 

709 bearer_token=token, 

710 response_type=media_type, 

711 allow_stale=True, 

712 follow_redirects=follow_redirects, 

713 api_stats_counter=self.api_stats, 

714 ) 

715 

716 if response and response.is_success: 

717 obj = httpx_json_content(response, None) 

718 if obj: 718 ↛ 745line 718 didn't jump to line 745 because the condition on line 718 was always true

719 self.log.debug( 

720 "%s, header digest:%s, api: %s, %s annotations", 

721 api_type.upper(), 

722 response.headers.get(HEADER_DOCKER_DIGEST, "N/A"), 

723 response.headers.get(HEADER_DOCKER_API, "N/A"), 

724 len(obj.get("annotations", [])), 

725 ) 

726 return obj, CacheMetadata(response) 

727 elif response and response.status_code == 429: 727 ↛ 728line 727 didn't jump to line 728 because the condition on line 727 was never true

728 self.throttler.throttle(local_image_info.index_name, raise_exception=True) 

729 elif response and not response.is_success: 729 ↛ 744line 729 didn't jump to line 744 because the condition on line 729 was always true

730 api_data = httpx_json_content(response, {}) 

731 if response: 731 ↛ 739line 731 didn't jump to line 739 because the condition on line 731 was always true

732 self.log.warning( 

733 "Failed to fetch obj from %s: %s %s", 

734 api_url, 

735 response.status_code, 

736 api_data.get("errors") if api_data else response.text, 

737 ) 

738 else: 

739 self.log.warning( 

740 "Failed to fetch obj from %s: No Response, %s", api_url, api_data.get("errors") if api_data else None 

741 ) 

742 

743 else: 

744 self.log.error("Empty response from %s", api_url) 

745 return None, None 

746 

747 def lookup( 

748 self, 

749 local_image_info: DockerImageInfo, 

750 token: str | None = None, 

751 minimal: bool = False, 

752 **kwargs, # noqa: ANN003, ARG002 

753 ) -> DockerImageInfo: 

754 result: DockerImageInfo = DockerImageInfo(local_image_info.ref) 

755 if not local_image_info.name or not local_image_info.index_name: 755 ↛ 756line 755 didn't jump to line 756 because the condition on line 755 was never true

756 self.log.debug("No local pkg name or registry index name to check") 

757 return result 

758 

759 if self.throttler.check_throttle(local_image_info.index_name): 759 ↛ 760line 759 didn't jump to line 760 because the condition on line 759 was never true

760 result.throttled = True 

761 return result 

762 

763 if token: 763 ↛ 764line 763 didn't jump to line 764 because the condition on line 763 was never true

764 self.log.debug("Using provided token to fetch manifest for image %s", local_image_info.ref) 

765 else: 

766 try: 

767 token = self.fetch_token(local_image_info.index_name, local_image_info.name) 

768 except AuthError as e: 

769 self.log.warning("Authentication error prevented Docker Registry enrichment: %s", e) 

770 result.error = str(e) 

771 return result 

772 

773 index: Any | None = None 

774 index_digest: str | None = None # fetched from header, should be the image digest 

775 index_cache_metadata: CacheMetadata | None = None 

776 manifest_cache_metadata: CacheMetadata | None = None 

777 config_cache_metadata: CacheMetadata | None = None 

778 api_host: str | None = REGISTRIES.get( 

779 local_image_info.index_name, (local_image_info.index_name, local_image_info.index_name) 

780 )[1] 

781 if api_host is None: 781 ↛ 782line 781 didn't jump to line 782 because the condition on line 781 was never true

782 self.log("No API host can be determined for %s", local_image_info.index_name) 

783 return result 

784 try: 

785 index, index_digest, index_cache_metadata = self.fetch_index(api_host, local_image_info, token) 

786 except ThrottledError: 

787 result.throttled = True 

788 index = None 

789 

790 if index: 

791 result.annotations = index.get("annotations", {}) 

792 for m in index.get("manifests", []): 

793 platform_info = m.get("platform", {}) 

794 if ( 

795 platform_info.get("os") == local_image_info.os 

796 and platform_info.get("architecture") == local_image_info.arch 

797 and ("Variant" not in platform_info or platform_info.get("Variant") == local_image_info.variant) 

798 ): 

799 if index_digest: 

800 result.image_digest = index_digest 

801 result.short_digest = result.condense_digest(index_digest) 

802 self.log.debug("Setting %s image digest %s", result.name, result.short_digest) 

803 

804 digest: str | None = m.get("digest") 

805 media_type = m.get("mediaType") 

806 manifest: Any | None = None 

807 

808 if digest: 808 ↛ 816line 808 didn't jump to line 816 because the condition on line 808 was always true

809 try: 

810 manifest, manifest_cache_metadata = self.fetch_object( 

811 api_host, local_image_info, media_type, digest, token 

812 ) 

813 except ThrottledError: 

814 result.throttled = True 

815 

816 if manifest: 

817 digest = manifest.get("config", {}).get("digest") 

818 if digest is None: 818 ↛ 819line 818 didn't jump to line 819 because the condition on line 818 was never true

819 self.log.warning("Empty digest for %s %s %s", api_host, digest, media_type) 

820 else: 

821 result.repo_digest = result.condense_digest(digest, short=False) 

822 self.log.debug("Setting %s repo digest: %s", result.name, result.repo_digest) 

823 

824 if manifest.get("annotations"): 

825 result.annotations.update(manifest.get("annotations", {})) 

826 else: 

827 self.log.debug("No annotations found in manifest: %s", manifest) 

828 

829 if not minimal and manifest.get("config"): 829 ↛ 792line 829 didn't jump to line 792 because the condition on line 829 was always true

830 try: 

831 img_config, config_cache_metadata = self.fetch_object( 

832 api_host=api_host, 

833 local_image_info=local_image_info, 

834 media_type=manifest["config"].get("mediaType"), 

835 digest=manifest["config"].get("digest"), 

836 token=token, 

837 follow_redirects=True, 

838 api_type="blobs", 

839 ) 

840 if img_config: 

841 config = img_config.get("config") or img_config.get("Config") 

842 if config and "Labels" in config: 842 ↛ 844line 842 didn't jump to line 844 because the condition on line 842 was always true

843 result.annotations.update(config.get("Labels") or {}) 

844 result.annotations.update(img_config.get("annotations") or {}) 

845 result.created = config.get("created") or config.get("Created") 

846 else: 

847 self.log.debug("No config found: %s", manifest) 

848 except Exception as e: 

849 self.log.warning("Failed to extract %s image info from config: %s", local_image_info.ref, e) 

850 

851 if not result.annotations: 

852 self.log.debug("No annotations found from registry data") 

853 

854 labels: dict[str, str | float | int | bool | None] = cherrypick_annotations(local_image_info, result) 

855 result.custom = labels or {} 

856 if index_cache_metadata: 

857 result.custom["index_cache_age"] = index_cache_metadata.age 

858 if manifest_cache_metadata: 

859 result.custom["manifest_cache_age"] = manifest_cache_metadata.age 

860 if config_cache_metadata: 

861 result.custom["config_cache_age"] = config_cache_metadata.age 

862 result.version = cast("str|None", labels.get("image_version")) 

863 result.origin = "OCI_V2" if not minimal else "OCI_V2_MINIMAL" 

864 

865 self.log.debug( 

866 "OCI_V2 Lookup for %s: short_digest:%s, repo_digest:%s, version: %s", 

867 local_image_info.name, 

868 result.short_digest, 

869 result.repo_digest, 

870 result.version, 

871 ) 

872 return result 

873 

874 

875class DockerClientVersionLookup(VersionLookup): 

876 """Query remote registry via local Docker API 

877 

878 No auth needed, however uses the old v1 APIs, and only Index available via API 

879 """ 

880 

881 def __init__(self, client: docker.DockerClient, throttler: Throttler, cfg: RegistryConfig, api_backoff: int = 30) -> None: 

882 self.client: docker.DockerClient = client 

883 self.throttler: Throttler = throttler 

884 self.cfg: RegistryConfig = cfg 

885 self.api_backoff: int = api_backoff 

886 self.log: Any = structlog.get_logger().bind(integration="docker", tool="version_lookup") 

887 

888 def lookup(self, local_image_info: DockerImageInfo, retries: int = 3, **kwargs) -> DockerImageInfo: # noqa: ANN003, ARG002 

889 retries_left = retries 

890 retry_secs: int = self.api_backoff 

891 reg_data: RegistryData | None = None 

892 

893 result = DockerImageInfo(local_image_info.ref) 

894 if local_image_info.index_name is None or local_image_info.ref is None: 894 ↛ 895line 894 didn't jump to line 895 because the condition on line 894 was never true

895 return result 

896 

897 while reg_data is None and retries_left > 0: 

898 if self.throttler.check_throttle(local_image_info.index_name): 

899 result.throttled = True 

900 break 

901 try: 

902 self.log.debug("Fetching registry data", image_ref=local_image_info.ref) 

903 reg_data = self.client.images.get_registry_data(local_image_info.ref) 

904 self.log.debug( 

905 "Registry Data: id:%s,image:%s, attrs:%s", 

906 reg_data.id, 

907 reg_data.image_name, 

908 reg_data.attrs, 

909 ) 

910 if reg_data: 910 ↛ 897line 910 didn't jump to line 897 because the condition on line 910 was always true

911 result.short_digest = result.condense_digest(reg_data.short_id) 

912 result.image_digest = result.condense_digest(reg_data.id, short=False) 

913 # result.name = reg_data.image_name 

914 result.attributes = reg_data.attrs 

915 result.annotations = reg_data.attrs.get("Config", {}).get("Labels") or {} 

916 result.error = None 

917 

918 except docker.errors.APIError as e: 

919 if e.status_code == HTTPStatus.TOO_MANY_REQUESTS: 919 ↛ 928line 919 didn't jump to line 928 because the condition on line 919 was always true

920 retry_secs = round(retry_secs**1.5) 

921 try: 

922 retry_secs = int(e.response.headers.get("Retry-After", -1)) # type: ignore[union-attr] 

923 except Exception as e2: 

924 self.log.debug("Failed to access headers for retry info: %s", e2) 

925 self.throttler.throttle(local_image_info.index_name, retry_secs, e.explanation) 

926 result.throttled = True 

927 return result 

928 result.error = str(e) 

929 retries_left -= 1 

930 if retries_left == 0 or e.is_client_error(): 

931 self.log.warn("Failed to fetch registry data: [%s] %s", e.errno, e.explanation) 

932 else: 

933 self.log.debug("Failed to fetch registry data, retrying: %s", e) 

934 

935 labels: dict[str, str | float | int | bool | None] = cherrypick_annotations(local_image_info, result) 

936 result.custom = labels or {} 

937 result.version = cast("str|None", labels.get("image_version")) 

938 result.created = cast("str|None", labels.get("image_created")) 

939 result.origin = "DOCKER_CLIENT" 

940 return result