Coverage for src / updates2mqtt / integrations / docker_enrich.py: 76%

539 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 23:58 +0000

1import re 

2import typing 

3from abc import abstractmethod 

4from typing import Any, cast 

5 

6import structlog 

7from docker.auth import resolve_repository_name 

8from docker.models.containers import Container 

9from httpx import Response 

10from omegaconf import MissingMandatoryValue, OmegaConf, ValidationError 

11 

12from updates2mqtt.helpers import APIStatsCounter, CacheMetadata, ThrottledError, Throttler, fetch_url, validate_url 

13from updates2mqtt.model import DiscoveryArtefactDetail, DiscoveryInstallationDetail, ReleaseDetail 

14 

15if typing.TYPE_CHECKING: 

16 from docker.models.images import RegistryData 

17 from omegaconf.dictconfig import DictConfig 

18from http import HTTPStatus 

19 

20import docker 

21import docker.errors 

22 

23from updates2mqtt.config import ( 

24 PKG_INFO_FILE, 

25 CommonPackages, 

26 DockerConfig, 

27 DockerPackageUpdateInfo, 

28 GitHubConfig, 

29 PackageUpdateInfo, 

30 RegistryConfig, 

31 VersionPolicy, 

32) 

33 

# Module-wide structlog logger. Typed Any because the bound-logger type depends on
# how structlog is configured elsewhere in the application.
log: Any = structlog.get_logger()

# Source-hosting platforms recognised from repository URLs.
SOURCE_PLATFORM_GITHUB = "GitHub"
SOURCE_PLATFORM_CODEBERG = "CodeBerg"
SOURCE_PLATFORM_GITLAB = "GitLab"
# Platform -> regex used to classify a source repository URL (see id_source_platform()).
SOURCE_PLATFORMS = {
    SOURCE_PLATFORM_GITHUB: r"https://github.com/.*",
    SOURCE_PLATFORM_GITLAB: r"https://gitlab.com/.*",
    SOURCE_PLATFORM_CODEBERG: r"https://codeberg.org/.*",
}
# URL templates below are str.format() templates; available vars: repo, revision, version, source.
DIFF_URL_TEMPLATES = {
    SOURCE_PLATFORM_GITHUB: "{repo}/commit/{revision}",
}
RELEASE_URL_TEMPLATES = {
    SOURCE_PLATFORM_GITHUB: "{repo}/releases/tag/{version}",
}
# Fallback pages used when an exact per-version release page cannot be derived.
UNKNOWN_RELEASE_URL_TEMPLATES = {
    SOURCE_PLATFORM_GITHUB: "{repo}/releases",
    SOURCE_PLATFORM_GITLAB: "{repo}/container_registry",
}
# Sentinel substituted for absent template vars; its presence in a rendered URL marks it unusable.
MISSING_VAL = "**MISSING**"
UNKNOWN_REGISTRY = "**UNKNOWN_REGISTRY**"

# Registry response headers of interest (OCI/Docker distribution API).
HEADER_DOCKER_DIGEST = "docker-content-digest"
HEADER_DOCKER_API = "docker-distribution-api-version"

# Default anonymous pull-token endpoint shared by most registries (not a hard-coded secret).
TOKEN_URL_TEMPLATE = "https://{auth_host}/token?scope=repository:{image_name}:pull&service={service}"  # noqa: S105 # nosec

# Known registries and how to talk to them; unknown registries fall back to a default tuple
# built from the registry host itself (see ContainerDistributionAPIVersionLookup.fetch_token).
REGISTRIES: dict[str, tuple[str | None, str, str, str | None, str | None]] = {
    # registry: (auth_host, api_host, service, url_template, repo_template)
    "docker.io": ("auth.docker.io", "registry-1.docker.io", "registry.docker.io", TOKEN_URL_TEMPLATE, None),
    "mcr.microsoft.com": (None, "mcr.microsoft.com", "mcr.microsoft.com", None, None),
    "quay.io": (None, "quay.io", "quay.io", TOKEN_URL_TEMPLATE, None),
    "ghcr.io": ("ghcr.io", "ghcr.io", "ghcr.io", TOKEN_URL_TEMPLATE, "https://github.com/{image_name}"),
    "lscr.io": ("ghcr.io", "lscr.io", "ghcr.io", TOKEN_URL_TEMPLATE, None),
    "codeberg.org": (
        "codeberg.org",
        "codeberg.org",
        "container_registry",
        TOKEN_URL_TEMPLATE,
        "https://codeberg.org/{image_name}",
    ),
    "registry.gitlab.com": (
        "www.gitlab.com",
        "registry.gitlab.com",
        "container_registry",
        "https://{auth_host}/jwt/auth?service={service}&scope=repository:{image_name}:pull&offline_token=true&client_id=docker",
        "https://gitlab.com/{image_name}",
    ),
}

# source: https://specs.opencontainers.org/distribution-spec/?v=v1.0.0#pull
OCI_NAME_RE = r"[a-z0-9]+((\.|_|__|-+)[a-z0-9]+)*(\/[a-z0-9]+((\.|_|__|-+)[a-z0-9]+)*)*"
OCI_TAG_RE = r"[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}"

88 

89 

class DockerImageInfo(DiscoveryArtefactDetail):
    """Normalize and shlep around the bits of an image def

    index_name: aka index_name, e.g. ghcr.io
    name: image ref without index name or tag, e.g. nginx, or librenms/librenms
    tag: tag or digest
    untagged_ref: combined index name and package name
    """

    def __init__(
        self,
        ref: str,  # ref with optional index name and tag or digest, index:name:tag_or_digest
        image_digest: str | None = None,
        tags: list[str] | None = None,
        attributes: dict[str, Any] | None = None,
        annotations: dict[str, Any] | None = None,
        platform: str | None = None,  # test harness simplification
        version: str | None = None,  # test harness simplification
        created: str | None = None,
    ) -> None:
        super().__init__()
        self.ref: str = ref
        self.version: str | None = version
        self.image_digest: str | None = image_digest
        self.short_digest: str | None = None
        self.repo_digest: str | None = None  # the single RepoDigest known to match registry
        self.git_digest: str | None = None
        self.index_name: str | None = None
        self.name: str | None = None
        self.tag: str | None = None
        self.pinned_digest: str | None = None
        # untagged ref using combined index and remote name used only for pattern matching common pkg info
        self.untagged_ref: str | None = None  # index_name/remote_name used for pkg match
        self.tag_or_digest: str | None = None  # index_name/remote_name:**tag_or_digest**
        self.tags = tags
        self.attributes: dict[str, Any] = attributes or {}
        self.annotations: dict[str, Any] = annotations or {}
        self.throttled: bool = False
        self.origin: str | None = None
        self.error: str | None = None
        self.platform: str | None = platform
        self.custom: dict[str, str | float | int | bool | None] = {}
        self.created: str | None = created

        # No RepoDigests recorded => the image was built locally rather than pulled.
        self.local_build: bool = not self.repo_digests
        self.index_name, remote_name = resolve_repository_name(ref)

        self.name = remote_name

        if remote_name and ":" in remote_name and ("@" not in remote_name or remote_name.index("@") > remote_name.index(":")):
            # name:tag format (a trailing @digest, if any, is split off from self.tag below)
            self.name, self.tag_or_digest = remote_name.split(":", 1)
            # NOTE(review): splits the full ref on ":", so a registry with a port
            # (host:5000/img:tag) would truncate at the port — confirm against callers.
            self.untagged_ref = ref.split(":", 1)[0]
            self.tag = self.tag_or_digest

        elif remote_name and "@" in remote_name:
            # name@digest format
            self.name, self.tag_or_digest = remote_name.split("@", 1)
            self.untagged_ref = ref.split("@", 1)[0]
            self.pinned_digest = self.tag_or_digest

        if self.tag and "@" in self.tag:
            # name:tag@digest format
            # for pinned tags, care only about the digest part
            self.tag, self.tag_or_digest = self.tag.split("@", 1)
            self.pinned_digest = self.tag_or_digest
        if self.tag_or_digest is None:
            # bare name with no tag or digest => implicit "latest"
            self.tag_or_digest = "latest"
            self.untagged_ref = ref
            self.tag = self.tag_or_digest

        if self.repo_digest is None and len(self.repo_digests) == 1:
            # definite known RepoDigest
            # if its ambiguous, the final version selection will handle it
            self.repo_digest = self.repo_digests[0]

        if self.index_name == "docker.io" and "/" not in self.name:
            # "official Docker images have an abbreviated library/foo name"
            self.name = f"library/{self.name}"
        if self.name is not None and not re.match(OCI_NAME_RE, self.name):
            log.warning("Invalid OCI image name: %s", self.name)
        if self.tag and not re.match(OCI_TAG_RE, self.tag):
            log.warning("Invalid OCI image tag: %s", self.tag)

        if self.os and self.arch:
            # canonical platform string, e.g. linux/arm64/v8 (variant omitted when absent)
            self.platform = "/".join(
                filter(
                    None,
                    [self.os, self.arch, self.variant],
                ),
            )

        if self.image_digest is not None:
            self.image_digest = self.condense_digest(self.image_digest, short=False)
            self.short_digest = self.condense_digest(self.image_digest)  # type: ignore[arg-type]

    @property
    def repo_digests(self) -> list[str]:
        """All known registry digests; a resolved repo_digest takes precedence."""
        if self.repo_digest:
            return [self.repo_digest]
        # RepoDigest in image inspect, Registry Config object
        digests = [v.split("@", 1)[1] if "@" in v else v for v in self.attributes.get("RepoDigests", [])]
        return digests or []

    @property
    def pinned(self) -> bool:
        """Check if this is pinned and installed version consistent with pin"""
        return bool(self.pinned_digest and self.pinned_digest in self.repo_digests)

    @property
    def os(self) -> str | None:
        return self.attributes.get("Os")

    @property
    def arch(self) -> str | None:
        return self.attributes.get("Architecture")

    @property
    def variant(self) -> str | None:
        return self.attributes.get("Variant")

    def condense_digest(self, digest: str, short: bool = True) -> str | None:
        """Strip any repo prefix from a digest; when short, also drop the algorithm
        prefix and truncate to 12 chars. Returns None if the digest is malformed."""
        try:
            digest = digest.split("@")[1] if "@" in digest else digest  # fully qualified RepoDigest
            if short:
                digest = digest.split(":")[1] if ":" in digest else digest  # remove digest type prefix
                return digest[0:12]
            return digest
        except Exception as e:
            log.warning("Unable to condense digest %s: %s", digest, e)
            return None

    def reuse(self) -> "DockerImageInfo":
        """Clone this image info, re-running ref parsing, marked with origin REUSED.

        Fix: arguments are now passed by keyword — the previous positional call fed
        version into the constructor's platform slot and created into its version
        slot, dropping created entirely.
        """
        cloned = DockerImageInfo(
            self.ref,
            image_digest=self.image_digest,
            tags=self.tags,
            attributes=self.attributes,
            annotations=self.annotations,
            version=self.version,
            created=self.created,
        )
        cloned.origin = "REUSED"
        return cloned

    def as_dict(self, minimal: bool = True) -> dict[str, str | list | dict | bool | int | None]:
        """Serializable view; attributes/annotations included only when not minimal."""
        result: dict[str, str | list | dict | bool | int | None] = {
            "captured": self.captured.isoformat(),
            "image_ref": self.ref,
            "name": self.name,
            "version": self.version,
            "image_digest": self.image_digest,
            "repo_digest": self.repo_digest,
            # Fix: previously duplicated the singular repo_digest value under this key.
            "repo_digests": self.repo_digests,
            "git_digest": self.git_digest,
            "index_name": self.index_name,
            "tag": self.tag,
            "pinned_digest": self.pinned_digest,
            "tag_or_digest": self.tag_or_digest,
            "tags": self.tags,
            "origin": self.origin,
            "platform": self.platform,
            "local_build": self.local_build,
            "error": self.error,
            "throttled": self.throttled,
            "custom": self.custom,
        }
        if not minimal:
            result["attributes"] = self.attributes
            result["annotations"] = self.annotations
        return result

255 

256 

def id_source_platform(source: str | None) -> str | None:
    """Return the first known source platform whose URL pattern matches *source*, or None."""
    subject = source or ""
    for platform_name, url_pattern in SOURCE_PLATFORMS.items():
        if re.match(url_pattern, subject):
            return platform_name
    return None

260 

261 

def _select_annotation(
    name: str, key: str, local_info: DockerImageInfo | None = None, registry_info: DockerImageInfo | None = None
) -> dict[str, str | None]:
    """Pick annotation *key* from the registry image when present, else the local image.

    Returns {name: value} when a non-None value is found, otherwise an empty dict.
    Note: the local image is consulted only when no registry info was supplied at all.
    """
    value: Any | None
    if registry_info:
        value = registry_info.annotations.get(key)
    elif local_info:
        value = local_info.annotations.get(key)
    else:
        value = None
    return {} if value is None else {name: value}

275 

276 

def cherrypick_annotations(
    local_info: DockerImageInfo, registry_info: DockerImageInfo | None
) -> dict[str, str | float | int | bool | None]:
    """https://github.com/opencontainers/image-spec/blob/main/annotations.md"""
    selected: dict[str, str | float | int | bool | None] = {}
    # (our field name, OCI annotation label)
    annotation_map = (
        ("documentation_url", "org.opencontainers.image.documentation"),
        ("description", "org.opencontainers.image.description"),
        ("licences", "org.opencontainers.image.licenses"),
        ("image_base", "org.opencontainers.image.base.name"),
        ("image_created", "org.opencontainers.image.created"),
        ("image_version", "org.opencontainers.image.version"),
        ("image_revision", "org.opencontainers.image.revision"),
        ("ref_name", "org.opencontainers.image.ref.name"),
        ("title", "org.opencontainers.image.title"),
        ("vendor", "org.opencontainers.image.vendor"),
        ("source", "org.opencontainers.image.source"),
    )
    for field_name, oci_label in annotation_map:
        selected.update(_select_annotation(field_name, oci_label, local_info, registry_info))

    # An Ubuntu base image's version (e.g. "24.04") can leak into a derived image's
    # version annotation; drop it when it clearly belongs to the base, not the image.
    derived_from_ubuntu = selected.get("ref_name") == "ubuntu" and local_info.name != "ubuntu"
    if (
        derived_from_ubuntu
        and selected.get("image_version")
        and re.fullmatch(r"^2\d\.\d\d$", cast("str", selected["image_version"]))
    ):
        log.debug(
            "Suppressing %s base %s version leaking into image version: %s",
            local_info.name,
            selected["ref_name"],
            selected["image_version"],
        )
        del selected["image_version"]
    return selected

310 

311 

class DockerServiceDetails(DiscoveryInstallationDetail):
    """Where and how a container is deployed: container name plus docker-compose and git context."""

    def __init__(
        self,
        container_name: str | None = None,
        compose_path: str | None = None,
        compose_version: str | None = None,
        compose_service: str | None = None,
        git_repo_path: str | None = None,
    ) -> None:
        self.container_name: str | None = container_name
        self.compose_service: str | None = compose_service
        self.compose_path: str | None = compose_path
        self.compose_version: str | None = compose_version
        self.git_repo_path: str | None = git_repo_path
        # populated later by git discovery, never at construction time
        self.git_local_timestamp: str | None = None

    def as_dict(self) -> dict[str, str | list | dict | bool | int | None]:
        """Serializable view; git fields appear only when populated."""
        payload: dict[str, str | list | dict | bool | int | None] = {
            "container_name": self.container_name,
            "compose_path": self.compose_path,
            "compose_service": self.compose_service,
            "compose_version": self.compose_version,
        }
        if self.git_local_timestamp:
            payload["git_local_timestamp"] = self.git_local_timestamp
        if self.git_repo_path:
            payload["git_repo_path"] = self.git_repo_path
        return payload

340 

341 

class LocalContainerInfo:
    """Builds image and service detail objects from a locally-inspected container."""

    def build_image_info(self, container: Container) -> tuple[DockerImageInfo, DockerServiceDetails]:
        """Image contents equiv to `docker inspect image <image_ref>`"""
        # container image can be none if someone ran `docker rmi -f`
        # so although this could be sourced from image, like `container.image.tags[0]`
        # use the container ref instead, which survives monkeying about with images
        config_ref: str = container.attrs.get("Config", {}).get("Image") or ""
        local_digest = container.attrs.get("Image")
        image = container.image

        image_info: DockerImageInfo = DockerImageInfo(
            config_ref,
            image_digest=local_digest,
            tags=image.tags if container and image else None,
            annotations=image.labels if image else None,
            attributes=image.attrs if image else None,
        )

        compose_labels = container.labels
        service_info: DockerServiceDetails = DockerServiceDetails(
            container.name,
            compose_path=compose_labels.get("com.docker.compose.project.working_dir"),
            compose_service=compose_labels.get("com.docker.compose.service"),
            compose_version=compose_labels.get("com.docker.compose.version"),
        )

        # capture container labels/annotations, not image ones
        labels: dict[str, str | float | int | bool | None] = cherrypick_annotations(image_info, None) or {}
        if image and image.attrs:
            image_info.created = image.attrs.get("Created")
        image_info.custom = labels
        image_info.version = cast("str|None", labels.get("image_version"))

        return image_info, service_info

374 

375 

class PackageEnricher:
    """Base enricher: matches a discovered image against known package metadata."""

    def __init__(self, docker_cfg: DockerConfig, packages: dict[str, PackageUpdateInfo] | None = None) -> None:
        self.pkgs: dict[str, PackageUpdateInfo] = packages or {}
        self.cfg: DockerConfig = docker_cfg
        self.log: Any = structlog.get_logger().bind(integration="docker")

    def initialize(self) -> None:
        """Hook for subclasses to load their package catalogue; no-op by default."""

    def enrich(self, image_info: DockerImageInfo) -> PackageUpdateInfo | None:
        """Return the first configured package whose docker image name matches *image_info*, else None."""
        if image_info.untagged_ref is None or image_info.ref is None:
            return None

        def _matches(candidate: PackageUpdateInfo) -> bool:
            # match on either the untagged ref or the full ref
            if candidate is None or candidate.docker is None or candidate.docker.image_name is None:
                return False
            return candidate.docker.image_name in (image_info.untagged_ref, image_info.ref)

        for candidate in self.pkgs.values():
            if _matches(candidate):
                self.log.debug(
                    "Found common package",
                    image_name=candidate.docker.image_name,  # type: ignore [union-attr]
                    logo_url=candidate.logo_url,
                    relnotes_url=candidate.release_notes_url,
                )
                return candidate
        return None

405 

406 

class DefaultPackageEnricher(PackageEnricher):
    """Fallback enricher: always returns generic package info with the configured default logo."""

    def enrich(self, image_info: DockerImageInfo) -> PackageUpdateInfo | None:
        self.log.debug("Default pkg info", image_name=image_info.untagged_ref, image_ref=image_info.ref)
        docker_info = DockerPackageUpdateInfo(image_info.untagged_ref or image_info.ref, version_policy=VersionPolicy.AUTO)
        return PackageUpdateInfo(
            docker_info,
            logo_url=self.cfg.default_entity_picture_url,
            release_notes_url=None,
        )

415 

416 

class CommonPackageEnricher(PackageEnricher):
    """Enricher backed by the curated common-packages file (PKG_INFO_FILE)."""

    def initialize(self) -> None:
        # Build the structured schema first so the merged file is validated against CommonPackages.
        base_cfg: DictConfig = OmegaConf.structured(CommonPackages)
        if PKG_INFO_FILE.exists():
            self.log.debug("Loading common package update info", path=PKG_INFO_FILE)
            cfg: DictConfig = typing.cast("DictConfig", OmegaConf.merge(base_cfg, OmegaConf.load(PKG_INFO_FILE)))

            # Force full resolution so missing mandatory values surface here, not at first use.
            OmegaConf.to_container(cfg, throw_on_missing=True)
            OmegaConf.set_readonly(cfg, True)
        else:
            self.log.warn("No common package update info found", path=PKG_INFO_FILE)
            cfg = base_cfg
        try:
            common_config: CommonPackages = typing.cast("CommonPackages", cfg)
            # omegaconf broken-ness on optional fields and converting to dataclasses
            self.pkgs = common_config.common_packages
            # self.pkgs: dict[str, PackageUpdateInfo] = {
            #     pkg: PackageUpdateInfo(**pkg_cfg) for pkg, pkg_cfg in cfg.common_packages.items() if pkg not in self.pkgs
            # }
        except (MissingMandatoryValue, ValidationError) as e:
            # NOTE(review): "serror" is not a stock structlog method — presumably a
            # project-specific logger extension; confirm it exists on the bound logger.
            self.log.serror("Configuration error %s", e, path=PKG_INFO_FILE.as_posix())
            raise

439 

440 

class LinuxServerIOPackageEnricher(PackageEnricher):
    """Loads logo and release-notes metadata for linuxserver.io images from their public API."""

    def initialize(self) -> None:
        cfg = self.cfg.discover_metadata.get("linuxserver.io")
        if cfg is None or not cfg.enabled:
            return

        self.log.debug(f"Fetching linuxserver.io metadata from API, cache_ttl={cfg.cache_ttl}")
        response: Response | None = fetch_url(
            "https://api.linuxserver.io/api/v1/images?include_config=false&include_deprecated=false",
            cache_ttl=cfg.cache_ttl,
        )
        if not response or not response.is_success:
            return
        api_data: Any = response.json()
        repos: list = api_data.get("data", {}).get("repositories", {}).get("linuxserver", [])

        added = 0
        for repo in repos:
            image_name = repo.get("name")
            # never clobber packages supplied by earlier enrichment sources
            if not image_name or image_name in self.pkgs:
                continue
            self.pkgs[image_name] = PackageUpdateInfo(
                DockerPackageUpdateInfo(f"lscr.io/linuxserver/{image_name}"),
                logo_url=repo["project_logo"],
                release_notes_url=f"{repo['github_url']}/releases",
            )
            added += 1
        self.log.debug(f"Added {added} linuxserver.io package details")

470 

class SourceReleaseEnricher:
    """Derives release metadata (source repo, notes/diff URLs, GitHub release body)
    from a registry image's OCI annotations."""

    def __init__(self, gh_cfg: GitHubConfig | None = None) -> None:
        self.log: Any = structlog.get_logger().bind(integration="docker")
        self.gh_cfg: GitHubConfig | None = gh_cfg

    def enrich(
        self, registry_info: DockerImageInfo, source_repo_url: str | None = None, notes_url: str | None = None
    ) -> ReleaseDetail | None:
        """Build a ReleaseDetail for *registry_info*.

        source_repo_url / notes_url override values derived from the image annotations.
        Returns None when no source URL, notes URL, revision or version can be established.

        Fix: the latest-release tag match no longer raises TypeError when the GitHub
        "latest" payload has no tag_name, and the version is regex-escaped so dots
        in versions match literally.
        """
        detail = ReleaseDetail()

        detail.notes_url = notes_url
        detail.version = registry_info.annotations.get("org.opencontainers.image.version")
        detail.revision = registry_info.annotations.get("org.opencontainers.image.revision")
        # explicit source_repo_url overrides container, e.g. where container source is only the docker wrapper
        detail.source_url = source_repo_url or registry_info.annotations.get("org.opencontainers.image.source")

        if detail.source_url is None and registry_info is not None and registry_info.index_name is not None:
            # some registries (ghcr, codeberg, gitlab) imply the source repo from the image name
            registry_config: tuple[str | None, str, str, str | None, str | None] | None = REGISTRIES.get(
                registry_info.index_name
            )
            repo_template: str | None = registry_config[4] if registry_config else None
            if repo_template:
                source_url = repo_template.format(image_name=registry_info.name)
                if validate_url(source_url, cache_ttl=86400):
                    detail.source_url = source_url
                    self.log.info("Implied source from registry: %s", detail.source_url)

        if detail.source_url is None and detail.notes_url is None and detail.revision is None and detail.version is None:
            return None

        if detail.source_url and "#" in detail.source_url:
            # strip URL fragment (e.g. readme anchor) to get the bare repo URL
            detail.source_repo_url = detail.source_url.split("#", 1)[0]
            self.log.debug("Simplifying %s from %s", detail.source_repo_url, detail.source_url)
        else:
            detail.source_repo_url = detail.source_url

        detail.source_platform = id_source_platform(detail.source_repo_url)
        if not detail.source_platform:
            self.log.debug("No known source platform found on container", source=detail.source_repo_url)
            return detail

        # MISSING_VAL sentinels keep str.format happy; any URL containing one is discarded.
        template_vars: dict[str, str | None] = {
            "version": detail.version or MISSING_VAL,
            "revision": detail.revision or MISSING_VAL,
            "repo": detail.source_repo_url or MISSING_VAL,
            "source": detail.source_url or MISSING_VAL,
        }

        diff_url_template: str | None = DIFF_URL_TEMPLATES.get(detail.source_platform)
        diff_url: str | None = diff_url_template.format(**template_vars) if diff_url_template else None
        if diff_url and MISSING_VAL not in diff_url and validate_url(diff_url, cache_ttl=3600):
            detail.diff_url = diff_url
        else:
            diff_url = None

        if detail.notes_url is None and detail.source_platform in RELEASE_URL_TEMPLATES:
            # exact per-version release page, if it exists
            platform_notes_url: str | None = RELEASE_URL_TEMPLATES[detail.source_platform].format(**template_vars)
            if (
                platform_notes_url
                and MISSING_VAL not in platform_notes_url
                and validate_url(platform_notes_url, cache_ttl=86400)
            ):
                self.log.debug("Setting default known release notes url: %s", platform_notes_url)
                detail.notes_url = platform_notes_url

        if detail.notes_url is None and detail.source_platform in UNKNOWN_RELEASE_URL_TEMPLATES:
            # fall back to the platform's generic releases page
            platform_notes_url = UNKNOWN_RELEASE_URL_TEMPLATES[detail.source_platform].format(**template_vars)
            if (
                platform_notes_url
                and MISSING_VAL not in platform_notes_url
                and validate_url(platform_notes_url, cache_ttl=86400)
            ):
                self.log.debug("Setting default unknown release notes url: %s", platform_notes_url)
                detail.notes_url = platform_notes_url

        if detail.source_platform == SOURCE_PLATFORM_GITHUB and detail.source_repo_url and detail.version is not None:
            access_token: str | None = self.gh_cfg.access_token if self.gh_cfg else None
            if access_token:
                self.log.debug("Using configured bearer token (%s chars) for GitHub API", len(access_token))
            base_api = detail.source_repo_url.replace("https://github.com", "https://api.github.com/repos")

            api_response: Response | None = fetch_url(
                f"{base_api}/releases/tags/{detail.version}", bearer_token=access_token, allow_stale=True
            )
            if api_response and api_response.status_code == HTTPStatus.NOT_FOUND:
                # possible that source version doesn't match release tag
                alt_api_response: Response | None = fetch_url(f"{base_api}/releases/latest", bearer_token=access_token)
                if alt_api_response and alt_api_response.is_success:
                    alt_api_results = httpx_json_content(alt_api_response, {})
                    # the latest release may carry no tag_name; treat that as a non-match
                    latest_tag: str = (alt_api_results.get("tag_name") or "") if alt_api_results else ""
                    if alt_api_results and re.fullmatch(f"(V|v|r|R)?{re.escape(detail.version)}", latest_tag):
                        self.log.info(
                            f"Matched {registry_info.name} {detail.version} to latest release {alt_api_results['tag_name']}"
                        )
                        api_response = alt_api_response
                    elif alt_api_results:
                        self.log.debug(
                            "Failed to match %s release %s on GitHub, found tag %s for name %s published at %s",
                            registry_info.name,
                            detail.version,
                            alt_api_results.get("tag_name"),
                            alt_api_results.get("name"),
                            alt_api_results.get("published_at"),
                        )

            if api_response and api_response.is_success:
                api_results: Any = httpx_json_content(api_response, {})
                detail.summary = api_results.get("body")  # ty:ignore[possibly-missing-attribute]
                reactions = api_results.get("reactions")  # ty:ignore[possibly-missing-attribute]
                if reactions:
                    # community sentiment: thumbs-up minus thumbs-down
                    detail.net_score = reactions.get("+1", 0) - reactions.get("-1", 0)
            elif api_response:
                api_results = httpx_json_content(api_response, default={})
                self.log.debug(
                    "Failed to find %s release %s on GitHub, status %s, errors; %s",
                    registry_info.name,
                    detail.version,
                    api_response.status_code,
                    api_results.get("errors"),
                )
            else:
                self.log.debug(
                    "Failed to fetch GitHub release info",
                    url=f"{base_api}/releases/tags/{detail.version}",
                    status_code=(api_response and api_response.status_code) or None,
                )
        if not detail.summary and detail.diff_url:
            # minimal HTML summary linking the diff when no release body was found
            detail.summary = f"<a href='{detail.diff_url}'>{detail.version or detail.revision} Diff</a>"
        return detail

599 

600 

class AuthError(Exception):
    """Raised when a registry pull token cannot be obtained."""

603 

604 

def httpx_json_content(response: Response, default: Any = None) -> Any | None:
    """Best-effort JSON decode of *response*; returns *default* on failure.

    Accepts any json-ish content type, plus application/octet-stream (some
    registries serve manifests/configs as octet-stream).
    """
    if not response:
        return default
    content_type = response.headers.get("content-type", "")
    if "json" in content_type:
        try:
            return response.json()
        except Exception:
            log.debug("Failed to parse JSON response: %s", response.text)
    elif content_type == "application/octet-stream":
        # blob could return a gzip layer tarball, however assumed only index, manifest or config requested
        try:
            return response.json()
        except Exception:
            log.debug("Failed to parse assumed JSON response: %s", response.text)
    return default

618 

619 

class VersionLookup:
    """Abstract strategy for resolving the registry-side version of a local image."""

    def __init__(self) -> None:
        self.log: Any = structlog.get_logger().bind(integration="docker", tool="version_lookup")

    @abstractmethod
    def lookup(self, local_image_info: DockerImageInfo, **kwargs) -> DockerImageInfo:  # noqa: ANN003
        """Return registry-side image info for *local_image_info*; implemented by subclasses."""

627 

628 

629class ContainerDistributionAPIVersionLookup(VersionLookup): 

630 def __init__(self, throttler: Throttler, cfg: RegistryConfig) -> None: 

631 self.throttler: Throttler = throttler 

632 self.cfg: RegistryConfig = cfg 

633 self.log: Any = structlog.get_logger().bind(integration="docker", tool="version_lookup") 

634 self.api_stats = APIStatsCounter() 

635 

    def fetch_token(self, registry: str, image_name: str) -> str | None:
        """Obtain an anonymous pull token for *image_name* from *registry*.

        Strategy: try the registry's known (or default) token URL; on 404 probe the
        /v2 endpoint to provoke a 401 whose www-authenticate header reveals the real
        auth realm, then retry there.

        Returns the token string, None when the registry needs no token, or raises
        AuthError when no token can be obtained.
        """
        # Unknown registries fall back to using the registry host for auth/api/service
        # with the standard token URL template.
        default_host: tuple[str, str, str, str, None] = (registry, registry, registry, TOKEN_URL_TEMPLATE, None)
        auth_host: str | None = REGISTRIES.get(registry, default_host)[0]
        if auth_host is None:
            # registry explicitly configured with no auth host => anonymous access
            return None

        service: str = REGISTRIES.get(registry, default_host)[2]
        url_template: str | None = REGISTRIES.get(registry, default_host)[3]
        auth_url: str | None = (
            url_template.format(auth_host=auth_host, image_name=image_name, service=service) if url_template else None
        )
        if auth_url is None:
            return None
        # Tokens are cached (token_cache_ttl) to limit auth traffic per discovery run.
        response: Response | None = fetch_url(
            auth_url, cache_ttl=self.cfg.token_cache_ttl, follow_redirects=True, api_stats_counter=self.api_stats
        )

        if response and response.is_success:
            api_data = httpx_json_content(response, {})
            token: str | None = api_data.get("token") if api_data else None
            if token:
                return token
            self.log.warning("No token found in response for %s", auth_url)
            raise AuthError(f"No token found in response for {image_name}")

        self.log.debug(
            "Non-success response at %s fetching token: %s",
            auth_url,
            (response and response.status_code) or None,
        )
        if response and response.status_code == 404:
            # Token endpoint not where expected: hit the bare /v2 endpoint, which for
            # OCI-compliant registries returns 401 with auth instructions.
            self.log.debug(
                "Default token URL %s not found, calling /v2 endpoint to validate OCI API and provoke auth", auth_url
            )
            response = fetch_url(
                f"https://{auth_host}/v2",
                follow_redirects=True,
                allow_stale=False,
                cache_ttl=0,
                api_stats_counter=self.api_stats,
            )

        if response and response.status_code == 401:
            # Parse the advertised realm/service/scope from the challenge header.
            auth = response.headers.get("www-authenticate")
            if not auth:
                self.log.warning("No www-authenticate header found in 401 response for %s", auth_url)
                raise AuthError(f"No www-authenticate header found on 401 for {image_name}")
            # NOTE(review): assumes no whitespace between the challenge parameters —
            # confirm against registries that format the header with spaces.
            match = re.search(r'realm="([^"]+)",service="([^"]+)",scope="([^"]+)"', auth)
            if not match:
                self.log.warning("No realm/service/scope found in www-authenticate header for %s", auth_url)
                raise AuthError(f"No realm/service/scope found on 401 headers for {image_name}")

            realm, service, scope = match.groups()
            auth_url = f"{realm}?service={service}&scope={scope}"
            response = fetch_url(auth_url, follow_redirects=True, api_stats_counter=self.api_stats)

            if response and response.is_success:
                token_data = response.json()
                self.log.debug("Fetched registry token from %s", auth_url)
                return token_data.get("token")
            self.log.warning(
                "Alternative auth %s with status %s has no token", auth_url, (response and response.status_code) or None
            )
        elif response:
            self.log.warning("Auth %s failed with status %s", auth_url, (response and response.status_code) or None)

        raise AuthError(f"Failed to fetch token for {image_name} at {auth_url}")

703 

704 def fetch_index( 

705 self, api_host: str, local_image_info: DockerImageInfo, token: str | None 

706 ) -> tuple[Any | None, str | None, CacheMetadata | None]: 

707 if local_image_info.tag: 707 ↛ 711line 707 didn't jump to line 711 because the condition on line 707 was always true

708 api_url: str = f"https://{api_host}/v2/{local_image_info.name}/manifests/{local_image_info.tag}" 

709 cache_ttl: int | None = self.cfg.mutable_cache_ttl 

710 else: 

711 api_url = f"https://{api_host}/v2/{local_image_info.name}/manifests/{local_image_info.pinned_digest}" 

712 cache_ttl = self.cfg.immutable_cache_ttl 

713 

714 response: Response | None = fetch_url( 

715 api_url, 

716 cache_ttl=cache_ttl, 

717 bearer_token=token, 

718 response_type=[ 

719 "application/vnd.oci.image.index.v1+json", 

720 "application/vnd.docker.distribution.manifest.list.v2+json", 

721 ], 

722 api_stats_counter=self.api_stats, 

723 ) 

724 

725 if response is None: 725 ↛ 726line 725 didn't jump to line 726 because the condition on line 725 was never true

726 self.log.warning("Empty response for manifest for image at %s", api_url) 

727 elif response.status_code == 429: 727 ↛ 728line 727 didn't jump to line 728 because the condition on line 727 was never true

728 self.throttler.throttle(local_image_info.index_name, raise_exception=True) 

729 elif not response.is_success: 

730 api_data = httpx_json_content(response, {}) 

731 self.log.warning( 

732 "Failed to fetch index from %s: %s", 

733 api_url, 

734 api_data.get("errors") if api_data else response.text, 

735 ) 

736 else: 

737 index = response.json() 

738 self.log.debug( 

739 "INDEX %s manifests, %s annotations, api: %s, header digest: %s", 

740 len(index.get("manifests", [])), 

741 len(index.get("annotations", [])), 

742 response.headers.get(HEADER_DOCKER_API, "N/A"), 

743 response.headers.get(HEADER_DOCKER_DIGEST, "N/A"), 

744 ) 

745 return index, response.headers.get(HEADER_DOCKER_DIGEST), CacheMetadata(response) 

746 return None, None, None 

747 

748 def fetch_object( 

749 self, 

750 api_host: str, 

751 local_image_info: DockerImageInfo, 

752 media_type: str, 

753 digest: str, 

754 token: str | None, 

755 follow_redirects: bool = False, 

756 api_type: str = "manifests", 

757 ) -> tuple[Any | None, CacheMetadata | None]: 

758 api_url = f"https://{api_host}/v2/{local_image_info.name}/{api_type}/{digest}" 

759 response = fetch_url( 

760 api_url, 

761 cache_ttl=self.cfg.immutable_cache_ttl, 

762 bearer_token=token, 

763 response_type=media_type, 

764 allow_stale=True, 

765 follow_redirects=follow_redirects, 

766 api_stats_counter=self.api_stats, 

767 ) 

768 

769 if response and response.is_success: 

770 obj = httpx_json_content(response, None) 

771 if obj: 771 ↛ 798line 771 didn't jump to line 798 because the condition on line 771 was always true

772 self.log.debug( 

773 "%s, header digest:%s, api: %s, %s annotations", 

774 api_type.upper(), 

775 response.headers.get(HEADER_DOCKER_DIGEST, "N/A"), 

776 response.headers.get(HEADER_DOCKER_API, "N/A"), 

777 len(obj.get("annotations", [])), 

778 ) 

779 return obj, CacheMetadata(response) 

780 elif response and response.status_code == 429: 780 ↛ 781line 780 didn't jump to line 781 because the condition on line 780 was never true

781 self.throttler.throttle(local_image_info.index_name, raise_exception=True) 

782 elif response and not response.is_success: 782 ↛ 797line 782 didn't jump to line 797 because the condition on line 782 was always true

783 api_data = httpx_json_content(response, {}) 

784 if response: 784 ↛ 792line 784 didn't jump to line 792 because the condition on line 784 was always true

785 self.log.warning( 

786 "Failed to fetch obj from %s: %s %s", 

787 api_url, 

788 response.status_code, 

789 api_data.get("errors") if api_data else response.text, 

790 ) 

791 else: 

792 self.log.warning( 

793 "Failed to fetch obj from %s: No Response, %s", api_url, api_data.get("errors") if api_data else None 

794 ) 

795 

796 else: 

797 self.log.error("Empty response from %s", api_url) 

798 return None, None 

799 

800 def lookup( 

801 self, 

802 local_image_info: DockerImageInfo, 

803 token: str | None = None, 

804 minimal: bool = False, 

805 **kwargs, # noqa: ANN003, ARG002 

806 ) -> DockerImageInfo: 

807 result: DockerImageInfo = DockerImageInfo(local_image_info.ref) 

808 if not local_image_info.name or not local_image_info.index_name: 808 ↛ 809line 808 didn't jump to line 809 because the condition on line 808 was never true

809 self.log.debug("No local pkg name or registry index name to check") 

810 return result 

811 

812 if self.throttler.check_throttle(local_image_info.index_name): 812 ↛ 813line 812 didn't jump to line 813 because the condition on line 812 was never true

813 result.throttled = True 

814 return result 

815 

816 if token: 816 ↛ 817line 816 didn't jump to line 817 because the condition on line 816 was never true

817 self.log.debug("Using provided token to fetch manifest for image %s", local_image_info.ref) 

818 else: 

819 try: 

820 token = self.fetch_token(local_image_info.index_name, local_image_info.name) 

821 except AuthError as e: 

822 self.log.warning("Authentication error prevented Docker Registry enrichment: %s", e) 

823 result.error = str(e) 

824 return result 

825 

826 index: Any | None = None 

827 index_digest: str | None = None # fetched from header, should be the image digest 

828 index_cache_metadata: CacheMetadata | None = None 

829 manifest_cache_metadata: CacheMetadata | None = None 

830 config_cache_metadata: CacheMetadata | None = None 

831 api_host: str | None = REGISTRIES.get( 

832 local_image_info.index_name, (local_image_info.index_name, local_image_info.index_name) 

833 )[1] 

834 if api_host is None: 834 ↛ 835line 834 didn't jump to line 835 because the condition on line 834 was never true

835 self.log("No API host can be determined for %s", local_image_info.index_name) 

836 return result 

837 try: 

838 index, index_digest, index_cache_metadata = self.fetch_index(api_host, local_image_info, token) 

839 except ThrottledError: 

840 result.throttled = True 

841 index = None 

842 

843 if index: 

844 result.annotations = index.get("annotations", {}) 

845 for m in index.get("manifests", []): 

846 platform_info = m.get("platform", {}) 

847 if ( 

848 platform_info.get("os") == local_image_info.os 

849 and platform_info.get("architecture") == local_image_info.arch 

850 and ("Variant" not in platform_info or platform_info.get("Variant") == local_image_info.variant) 

851 ): 

852 if index_digest: 

853 result.image_digest = index_digest 

854 result.short_digest = result.condense_digest(index_digest) 

855 self.log.debug("Setting %s image digest %s", result.name, result.short_digest) 

856 

857 digest: str | None = m.get("digest") 

858 media_type = m.get("mediaType") 

859 manifest: Any | None = None 

860 

861 if digest: 861 ↛ 869line 861 didn't jump to line 869 because the condition on line 861 was always true

862 try: 

863 manifest, manifest_cache_metadata = self.fetch_object( 

864 api_host, local_image_info, media_type, digest, token 

865 ) 

866 except ThrottledError: 

867 result.throttled = True 

868 

869 if manifest: 

870 digest = manifest.get("config", {}).get("digest") 

871 if digest is None: 871 ↛ 872line 871 didn't jump to line 872 because the condition on line 871 was never true

872 self.log.warning("Empty digest for %s %s %s", api_host, digest, media_type) 

873 else: 

874 result.repo_digest = result.condense_digest(digest, short=False) 

875 self.log.debug("Setting %s repo digest: %s", result.name, result.repo_digest) 

876 

877 if manifest.get("annotations"): 

878 result.annotations.update(manifest.get("annotations", {})) 

879 else: 

880 self.log.debug("No annotations found in manifest: %s", manifest) 

881 

882 if not minimal and manifest.get("config"): 882 ↛ 845line 882 didn't jump to line 845 because the condition on line 882 was always true

883 try: 

884 img_config, config_cache_metadata = self.fetch_object( 

885 api_host=api_host, 

886 local_image_info=local_image_info, 

887 media_type=manifest["config"].get("mediaType"), 

888 digest=manifest["config"].get("digest"), 

889 token=token, 

890 follow_redirects=True, 

891 api_type="blobs", 

892 ) 

893 if img_config: 

894 config = img_config.get("config") or img_config.get("Config") 

895 if config and "Labels" in config: 895 ↛ 897line 895 didn't jump to line 897 because the condition on line 895 was always true

896 result.annotations.update(config.get("Labels") or {}) 

897 result.annotations.update(img_config.get("annotations") or {}) 

898 result.created = config.get("created") or config.get("Created") 

899 else: 

900 self.log.debug("No config found: %s", manifest) 

901 except Exception as e: 

902 self.log.warning("Failed to extract %s image info from config: %s", local_image_info.ref, e) 

903 

904 if not result.annotations: 

905 self.log.debug("No annotations found from registry data") 

906 

907 labels: dict[str, str | float | int | bool | None] = cherrypick_annotations(local_image_info, result) 

908 result.custom = labels or {} 

909 if index_cache_metadata: 

910 result.custom["index_cache_age"] = index_cache_metadata.age 

911 if manifest_cache_metadata: 

912 result.custom["manifest_cache_age"] = manifest_cache_metadata.age 

913 if config_cache_metadata: 

914 result.custom["config_cache_age"] = config_cache_metadata.age 

915 result.version = cast("str|None", labels.get("image_version")) 

916 result.origin = "OCI_V2" if not minimal else "OCI_V2_MINIMAL" 

917 

918 self.log.debug( 

919 "OCI_V2 Lookup for %s: short_digest:%s, repo_digest:%s, version: %s", 

920 local_image_info.name, 

921 result.short_digest, 

922 result.repo_digest, 

923 result.version, 

924 ) 

925 return result 

926 

927 

class DockerClientVersionLookup(VersionLookup):
    """Query remote registry via local Docker API.

    No auth needed, however uses the old v1 APIs, and only the Index is
    available via this API.
    """

    def __init__(self, client: docker.DockerClient, throttler: Throttler, cfg: RegistryConfig, api_backoff: int = 30) -> None:
        self.client: docker.DockerClient = client
        self.throttler: Throttler = throttler
        self.cfg: RegistryConfig = cfg
        # Base number of seconds to back off when the registry rate-limits us.
        self.api_backoff: int = api_backoff
        self.log: Any = structlog.get_logger().bind(integration="docker", tool="version_lookup")

    def lookup(self, local_image_info: DockerImageInfo, retries: int = 3, **kwargs) -> DockerImageInfo:  # noqa: ANN003, ARG002
        """Fetch remote registry data for an image via the local Docker client.

        Args:
            local_image_info: parsed info for the locally running image.
            retries: number of attempts before giving up on transient errors.

        Returns:
            A ``DockerImageInfo`` with digests/annotations populated on
            success; ``throttled`` or ``error`` set otherwise.
        """
        retries_left = retries
        retry_secs: int = self.api_backoff
        reg_data: RegistryData | None = None

        result = DockerImageInfo(local_image_info.ref)
        if local_image_info.index_name is None or local_image_info.ref is None:
            return result

        while reg_data is None and retries_left > 0:
            if self.throttler.check_throttle(local_image_info.index_name):
                result.throttled = True
                break
            try:
                self.log.debug("Fetching registry data", image_ref=local_image_info.ref)
                reg_data = self.client.images.get_registry_data(local_image_info.ref)
                self.log.debug(
                    "Registry Data: id:%s,image:%s, attrs:%s",
                    reg_data.id,
                    reg_data.image_name,
                    reg_data.attrs,
                )
                if reg_data:
                    result.short_digest = result.condense_digest(reg_data.short_id)
                    result.image_digest = result.condense_digest(reg_data.id, short=False)
                    result.attributes = reg_data.attrs
                    result.annotations = reg_data.attrs.get("Config", {}).get("Labels") or {}
                    result.error = None

            except docker.errors.APIError as e:
                if e.status_code == HTTPStatus.TOO_MANY_REQUESTS:
                    # Exponential backoff as the fallback delay...
                    retry_secs = round(retry_secs**1.5)
                    try:
                        # ...overridden by the server's Retry-After when present.
                        # BUG FIX: the default was -1, which silently replaced
                        # the computed backoff whenever the header was absent.
                        retry_secs = int(e.response.headers.get("Retry-After", retry_secs))  # type: ignore[union-attr]
                    except Exception as e2:
                        self.log.debug("Failed to access headers for retry info: %s", e2)
                    self.throttler.throttle(local_image_info.index_name, retry_secs, e.explanation)
                    result.throttled = True
                    return result
                result.error = str(e)
                retries_left -= 1
                if retries_left == 0 or e.is_client_error():
                    # FIX: `warning` replaces deprecated `warn`; `status_code`
                    # replaces `errno`, which APIError only inherits from
                    # OSError and which is None here, making the log useless.
                    self.log.warning("Failed to fetch registry data: [%s] %s", e.status_code, e.explanation)
                else:
                    self.log.debug("Failed to fetch registry data, retrying: %s", e)

        labels: dict[str, str | float | int | bool | None] = cherrypick_annotations(local_image_info, result)
        result.custom = labels or {}
        result.version = cast("str|None", labels.get("image_version"))
        result.created = cast("str|None", labels.get("image_created"))
        result.origin = "DOCKER_CLIENT"
        return result