Coverage for src / updates2mqtt / cli.py: 0%

101 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-20 23:13 +0000

1import json 

2from collections.abc import AsyncGenerator 

3from typing import TYPE_CHECKING 

4 

5import structlog 

6from omegaconf import DictConfig, OmegaConf 

7from rich import print_json 

8from rich.console import Console 

9 

10from updates2mqtt.config import DockerConfig, GitHubConfig, NodeConfig, RegistryConfig 

11from updates2mqtt.helpers import Throttler 

12from updates2mqtt.integrations.docker import DockerProvider 

13from updates2mqtt.integrations.docker_enrich import ( 

14 REGISTRIES, 

15 ContainerDistributionAPIVersionLookup, 

16 DockerImageInfo, 

17 fetch_url, 

18) 

19from updates2mqtt.model import Discovery 

20 

21if TYPE_CHECKING: 

22 from httpx import Response 

23 

24log = structlog.get_logger() 

25 

26 

# Usage text that main() logs when `help` or `--help` is present on the command line.
# Every example below corresponds to a key that main() dispatches on:
# `container`, `dump`, `tags`, `manifest` and `blob`.
# NOTE: `blob` lookups need a digest-pinned reference (dump_url refuses to build a
# blob URL without one), so tag-based references are shown with `manifest` instead.
HELP = """
Super simple CLI

Command can be `container`, `dump`, `tags`, `manifest` or `blob`

* `container=container-name`
* `container=hash`
* `dump=csv`
* `dump=json`
* `dump=json container=frigate`
* `tags=ghcr.io/blakeblackshear/frigate`
* `manifest=mcr.microsoft.com/dotnet/sdk:latest`
* `tags=quay.io/linuxserver.io/babybuddy`
* `blob=ghcr.io/blakeblackshear/frigate@sha256:759c36ee869e3e60258350a2e221eae1a4ba1018613e0334f1bc84eb09c4bbbc`

In addition, a `log_level=DEBUG` or other level can be added, `github_token` to try a personal access
token for GitHub release info retrieval, or `api=docker_client` to use the older API (defaults to `api=OCI_V2`)


"""

47 

# ---------------------------------------------------------------------------
# Media (MIME) types, grouped by family and by the kind of document they
# describe. The combined lists at the bottom are plain concatenations, so
# ordering within each family is preserved.
# ---------------------------------------------------------------------------

# OCI media types
OCI_MANIFEST_TYPES: list[str] = [
    "application/vnd.oci.image.manifest.v1+json",
    "application/vnd.oci.image.index.v1+json",
    "application/vnd.oci.descriptor.v1+json",
    "application/vnd.oci.empty.v1+json",
]
OCI_CONFIG_TYPES: list[str] = ["application/vnd.oci.image.config.v1+json"]
OCI_LAYER_TYPES: list[str] = [
    "application/vnd.oci.image.layer.v1.tar",
    "application/vnd.oci.image.layer.v1.tar+gzip",
    "application/vnd.oci.image.layer.v1.tar+zstd",
]
OCI_NONDISTRIBUTABLE_LAYER_TYPES: list[str] = [
    "application/vnd.oci.image.layer.nondistributable.v1.tar",
    "application/vnd.oci.image.layer.nondistributable.v1.tar+gzip",
    "application/vnd.oci.image.layer.nondistributable.v1.tar+zstd",
]

# Docker compatibility media types
DOCKER_MANIFEST_TYPES: list[str] = [
    "application/vnd.docker.distribution.manifest.v2+json",
    "application/vnd.docker.distribution.manifest.list.v2+json",
    "application/vnd.docker.distribution.manifest.v1+json",
    "application/vnd.docker.distribution.manifest.v1+prettyjws",
]
DOCKER_CONFIG_TYPES: list[str] = ["application/vnd.docker.container.image.v1+json"]
DOCKER_LAYER_TYPES: list[str] = [
    "application/vnd.docker.image.rootfs.diff.tar.gzip",
    "application/vnd.docker.image.rootfs.foreign.diff.tar.gzip",
]

# Per-kind unions: OCI entries first, Docker entries after
ALL_MANIFEST_TYPES: list[str] = [*OCI_MANIFEST_TYPES, *DOCKER_MANIFEST_TYPES]
ALL_CONFIG_TYPES: list[str] = [*OCI_CONFIG_TYPES, *DOCKER_CONFIG_TYPES]
ALL_LAYER_TYPES: list[str] = [
    *OCI_LAYER_TYPES,
    *OCI_NONDISTRIBUTABLE_LAYER_TYPES,
    *DOCKER_LAYER_TYPES,
]

# Every content type that might be returned by the API; the two generic
# types at the end cover error responses.
ALL_OCI_MEDIA_TYPES: list[str] = [
    *ALL_MANIFEST_TYPES,
    *ALL_CONFIG_TYPES,
    *ALL_LAYER_TYPES,
    "application/octet-stream",
    "application/json",
]

100 

101 

def dump_url(doc_type: str, img_ref: str, cli_conf: DictConfig) -> None:
    """Fetch one registry document for an image reference and print it as JSON.

    Args:
        doc_type: Which document to fetch: ``"blob"``, ``"manifest"`` or ``"tags"``.
            Any other value is reported and ignored.
        img_ref: Image reference as given on the command line. A ``blob`` lookup
            needs a pinned digest and a ``manifest`` lookup needs a tag or digest.
        cli_conf: Parsed command line options; only ``log_level`` is read here
            (defaults to ``ERROR``).

    Returns:
        None. The document body is printed with ``print_json``; status, headers
        and problems are reported through the module logger.
    """
    structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(cli_conf.get("log_level", "ERROR")))

    lookup = ContainerDistributionAPIVersionLookup(Throttler(), RegistryConfig())
    img_info = DockerImageInfo(img_ref)
    if not img_info.index_name or not img_info.name:
        # Fixed format string: the original "%ss" appended a stray "s" to the reference.
        log.error("Unable to parse %s", img_ref)
        return

    # The second element of a REGISTRIES entry is used as the API host; registries
    # without an entry fall back to the index name itself.
    api_host: str | None = REGISTRIES.get(img_info.index_name, (img_info.index_name, img_info.index_name))[1]

    if doc_type == "blob":
        if not img_info.pinned_digest:
            log.warning("No digest found in %s", img_ref)
            return
        url: str = f"https://{api_host}/v2/{img_info.name}/blobs/{img_info.pinned_digest}"
    elif doc_type == "manifest":
        if not img_info.tag_or_digest:
            log.warning("No tag or digest found in %s", img_ref)
            return
        url = f"https://{api_host}/v2/{img_info.name}/manifests/{img_info.tag_or_digest}"
    elif doc_type == "tags":
        url = f"https://{api_host}/v2/{img_info.name}/tags/list"
    else:
        # Previously a silent return; say why nothing happened.
        log.warning("Unsupported document type %s", doc_type)
        return

    token: str | None = lookup.fetch_token(img_info.index_name, img_info.name)

    response: Response | None = fetch_url(url, bearer_token=token, follow_redirects=True, response_type=ALL_OCI_MEDIA_TYPES)
    if response is None:
        # Previously ignored without any feedback.
        log.warning("No response from %s", url)
        return

    if response.is_error:
        log.warning(f"{response.status_code}: {url}")
        log.warning(response.text)
    elif response.is_success:
        log.debug(f"{response.status_code}: {url}")
        log.debug("HEADERS")
        for k, v in response.headers.items():
            log.debug(f"{k}: {v}")
        log.debug("CONTENTS")

        try:
            print_json(response.text)
        except ValueError:
            # print_json parses the text first; a non-JSON body (for example a
            # binary layer blob) would otherwise end the CLI with a traceback.
            log.warning("Response body from %s is not JSON (content-type: %s)", url, response.headers.get("content-type"))

142 

143 

def docker_provider(cli_conf: DictConfig) -> DockerProvider:
    """Create and initialize a ``DockerProvider`` from command line options.

    Args:
        cli_conf: Parsed command line options. ``api`` selects the registry API
            (defaults to ``OCI_V2``) and ``github_token`` is passed through as
            the GitHub access token (``None`` when absent).

    Returns:
        The provider, after ``initialize()`` has been called on it.
    """
    registry_cfg = RegistryConfig(api=cli_conf.get("api", "OCI_V2"))
    docker_cfg = DockerConfig(registry=registry_cfg)
    node_cfg = NodeConfig()
    github_cfg = GitHubConfig(access_token=cli_conf.get("github_token"))

    provider = DockerProvider(
        docker_cfg,
        node_cfg,
        packages={},
        github_cfg=github_cfg,
        self_bounce=None,
    )
    provider.initialize()
    return provider

154 

155 

def _csv_line(values: tuple[object, ...]) -> str:
    """Render values as one comma-separated line with every field double-quoted."""
    # Embedded double quotes are doubled so a quoted field stays well-formed CSV.
    return ",".join('"' + str(value).replace('"', '""') + '"' for value in values)


async def dump(fmt: str, cli_conf: DictConfig) -> None:
    """Print discoveries from the Docker provider as CSV or JSON.

    Args:
        fmt: Output format, ``"csv"`` or ``"json"``. Anything else is logged as
            unsupported and nothing is scanned.
        cli_conf: Parsed command line options. ``log_level`` sets the log filter
            (defaults to ``ERROR``); when ``container`` is set only that container
            is rescanned, otherwise the provider's full ``scan`` is used. The
            remaining options are consumed by ``docker_provider``.

    Returns:
        None. Output goes to the console.
    """
    structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(cli_conf.get("log_level", "ERROR")))

    # Reject unknown formats before the Docker provider is built and initialized.
    if fmt not in ("csv", "json"):
        log.warning(f"Unsupported dump format {fmt}")
        return

    docker_scanner: DockerProvider = docker_provider(cli_conf)
    if cli_conf.get("container"):

        async def single_discovery() -> AsyncGenerator[Discovery]:
            # Wrap the one-off rescan so both branches expose an async generator.
            result = docker_scanner.rescan(Discovery(docker_scanner, cli_conf["container"], "cli", "manual"))
            if result:
                yield result

        source: AsyncGenerator[Discovery] = single_discovery()
    else:
        source = docker_scanner.scan("cli", False)

    if fmt == "json":
        print_json(json.dumps([discovery.as_dict() async for discovery in source]))
        return

    console = Console()
    console.print(
        _csv_line(
            (
                "name",
                "ref",
                "registry",
                "installed_version",
                "latest_version",
                "version_basis",
                "title",
                "can_update",
                "can_build",
                "can_restart",
                "update_type",
                "source",
                "throttled",
            )
        ),
        style="bold white on black",
    )
    async for discovery in source:
        row = discovery.as_dict()
        # A nested section that is present but empty/None is treated as an empty
        # mapping, so the row is still printed with None in those columns.
        current = row["current_detail"] or {}
        release = row.get("release") or {}
        last_scan = row.get("last_scan") or {}
        console.print(
            _csv_line(
                (
                    row["name"],
                    current.get("image_ref"),  # type: ignore[union-attr]
                    current.get("index_name"),  # type: ignore[union-attr]
                    row["installed_version"],
                    row["latest_version"],
                    row["version_basis"],
                    row["title"],
                    row["can_update"],
                    row["can_build"],
                    row["can_restart"],
                    row["update_type"],
                    release.get("source"),  # type: ignore[union-attr]
                    last_scan.get("throttled"),  # type: ignore[union-attr]
                )
            )
        )

219 

220 

def main() -> None:
    """Entry point: parse ``key=value`` arguments and run the matching command.

    Precedence is ``help``/``--help``, then ``blob``, ``manifest``, ``tags``,
    then ``dump``. With none of those, a single container (``container``
    option, defaulting to ``frigate``) is rescanned and logged at INFO level.
    """
    # will be a proper cli someday
    cli_conf: DictConfig = OmegaConf.from_cli()

    if "help" in cli_conf or "--help" in cli_conf:
        log.info(HELP)
        return

    # Registry document lookups; the first option that is set wins.
    for doc_type in ("blob", "manifest", "tags"):
        if cli_conf.get(doc_type):
            dump_url(doc_type, cli_conf.get(doc_type), cli_conf)
            return

    if cli_conf.get("dump"):
        import asyncio

        asyncio.run(dump(cli_conf.get("dump"), cli_conf))
        return

    # Fallback: rescan one container and log what was found.
    structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(cli_conf.get("log_level", "INFO")))

    docker_scanner = docker_provider(cli_conf)
    target = Discovery(docker_scanner, cli_conf.get("container", "frigate"), "cli", "manual")
    discovery: Discovery | None = docker_scanner.rescan(target)
    if discovery:
        log.info(discovery.as_dict())


if __name__ == "__main__":
    main()