Coverage for src / updates2mqtt / cli.py: 0%

93 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 23:58 +0000

1import json 

2from typing import TYPE_CHECKING 

3 

4import structlog 

5from omegaconf import DictConfig, OmegaConf 

6from rich import print_json 

7from rich.console import Console 

8 

9from updates2mqtt.config import DockerConfig, GitHubConfig, NodeConfig, RegistryConfig 

10from updates2mqtt.helpers import Throttler 

11from updates2mqtt.integrations.docker import DockerProvider 

12from updates2mqtt.integrations.docker_enrich import ( 

13 REGISTRIES, 

14 ContainerDistributionAPIVersionLookup, 

15 DockerImageInfo, 

16 fetch_url, 

17) 

18from updates2mqtt.model import Discovery 

19 

20if TYPE_CHECKING: 

21 from httpx import Response 

22 

23log = structlog.get_logger() 

24 

25 

26HELP = """ 

27Super simple CLI 

28 

29Command can be `container`,`tags`,`manifest` or `blob` 

30 

31* `container=container-name` 

32* `container=hash` 

33* `dump=csv` 

34* `dump=json` 

35* `tags=ghcr.io/ 

36* `blob=mcr.microsoft.com/dotnet/sdk:latest` 

37* `tags=quay.io/linuxserver.io/babybuddy` 

38* `blob=ghcr.io/blakeblackshear/frigate@sha256:759c36ee869e3e60258350a2e221eae1a4ba1018613e0334f1bc84eb09c4bbbc` 

39 

40In addition, a `log_level=DEBUG` or other level can be added, `github_token` to try a personal access 

41token for GitHub release info retrieval, or `api=docker_client` to use the older API (defaults to `api=OCI_V2`) 

42 

43 

44""" 

45 

46OCI_MANIFEST_TYPES: list[str] = [ 

47 "application/vnd.oci.image.manifest.v1+json", 

48 "application/vnd.oci.image.index.v1+json", 

49 "application/vnd.oci.descriptor.v1+json", 

50 "application/vnd.oci.empty.v1+json", 

51] 

52 

53OCI_CONFIG_TYPES: list[str] = [ 

54 "application/vnd.oci.image.config.v1+json", 

55] 

56 

57OCI_LAYER_TYPES: list[str] = [ 

58 "application/vnd.oci.image.layer.v1.tar", 

59 "application/vnd.oci.image.layer.v1.tar+gzip", 

60 "application/vnd.oci.image.layer.v1.tar+zstd", 

61] 

62 

63OCI_NONDISTRIBUTABLE_LAYER_TYPES: list[str] = [ 

64 "application/vnd.oci.image.layer.nondistributable.v1.tar", 

65 "application/vnd.oci.image.layer.nondistributable.v1.tar+gzip", 

66 "application/vnd.oci.image.layer.nondistributable.v1.tar+zstd", 

67] 

68 

69# Docker Compatibility MIME Types 

70DOCKER_MANIFEST_TYPES: list[str] = [ 

71 "application/vnd.docker.distribution.manifest.v2+json", 

72 "application/vnd.docker.distribution.manifest.list.v2+json", 

73 "application/vnd.docker.distribution.manifest.v1+json", 

74 "application/vnd.docker.distribution.manifest.v1+prettyjws", 

75] 

76 

77DOCKER_CONFIG_TYPES: list[str] = [ 

78 "application/vnd.docker.container.image.v1+json", 

79] 

80 

81DOCKER_LAYER_TYPES: list[str] = [ 

82 "application/vnd.docker.image.rootfs.diff.tar.gzip", 

83 "application/vnd.docker.image.rootfs.foreign.diff.tar.gzip", 

84] 

85 

86# Combined constants 

87ALL_MANIFEST_TYPES: list[str] = OCI_MANIFEST_TYPES + DOCKER_MANIFEST_TYPES 

88ALL_CONFIG_TYPES: list[str] = OCI_CONFIG_TYPES + DOCKER_CONFIG_TYPES 

89ALL_LAYER_TYPES: list[str] = OCI_LAYER_TYPES + OCI_NONDISTRIBUTABLE_LAYER_TYPES + DOCKER_LAYER_TYPES 

90 

91# All content types that might be returned by the API 

92ALL_OCI_MEDIA_TYPES: list[str] = ( 

93 ALL_MANIFEST_TYPES 

94 + ALL_CONFIG_TYPES 

95 + ALL_LAYER_TYPES 

96 + ["application/octet-stream", "application/json"] # Error responses 

97) 

98 

99 

100def dump_url(doc_type: str, img_ref: str, cli_conf: DictConfig) -> None: 

101 structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(cli_conf.get("log_level", "ERROR"))) 

102 

103 lookup = ContainerDistributionAPIVersionLookup(Throttler(), RegistryConfig()) 

104 img_info = DockerImageInfo(img_ref) 

105 if not img_info.index_name or not img_info.name: 

106 log.error("Unable to parse %ss", img_ref) 

107 return 

108 

109 api_host: str | None = REGISTRIES.get(img_info.index_name, (img_info.index_name, img_info.index_name))[1] 

110 

111 if doc_type == "blob": 

112 if not img_info.pinned_digest: 

113 log.warning("No digest found in %s", img_ref) 

114 return 

115 url: str = f"https://{api_host}/v2/{img_info.name}/blobs/{img_info.pinned_digest}" 

116 elif doc_type == "manifest": 

117 if not img_info.tag_or_digest: 

118 log.warning("No tag or digest found in %s", img_ref) 

119 return 

120 url = f"https://{api_host}/v2/{img_info.name}/manifests/{img_info.tag_or_digest}" 

121 elif doc_type == "tags": 

122 url = f"https://{api_host}/v2/{img_info.name}/tags/list" 

123 else: 

124 return 

125 

126 token: str | None = lookup.fetch_token(img_info.index_name, img_info.name) 

127 

128 response: Response | None = fetch_url(url, bearer_token=token, follow_redirects=True, response_type=ALL_OCI_MEDIA_TYPES) 

129 if response and response.is_error: 

130 log.warning(f"{response.status_code}: {url}") 

131 log.warning(response.text) 

132 elif response and response.is_success: 

133 log.debug(f"{response.status_code}: {url}") 

134 log.debug("HEADERS") 

135 for k, v in response.headers.items(): 

136 log.debug(f"{k}: {v}") 

137 log.debug("CONTENTS") 

138 

139 print_json(response.text) 

140 

141 

142def docker_provider(cli_conf: DictConfig) -> DockerProvider: 

143 docker_scanner = DockerProvider( 

144 DockerConfig(registry=RegistryConfig(api=cli_conf.get("api", "OCI_V2"))), 

145 NodeConfig(), 

146 packages={}, 

147 github_cfg=GitHubConfig(access_token=cli_conf.get("github_token")), 

148 self_bounce=None, 

149 ) 

150 docker_scanner.initialize() 

151 return docker_scanner 

152 

153 

154async def dump(fmt: str, cli_conf: DictConfig) -> None: 

155 structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(cli_conf.get("log_level", "ERROR"))) 

156 console = Console() 

157 docker_scanner: DockerProvider = docker_provider(cli_conf) 

158 if fmt == "csv": 

159 console.print( 

160 ",".join( 

161 f'"{v}"' 

162 for v in ( 

163 "name", 

164 "ref", 

165 "registry", 

166 "installed_version", 

167 "latest_version", 

168 "version_basis", 

169 "title", 

170 "can_update", 

171 "can_build", 

172 "can_restart", 

173 "update_type", 

174 "source", 

175 "throttled", 

176 ) 

177 ), 

178 style="bold white on black", 

179 ) 

180 async for discovery in docker_scanner.scan("cli", False): 

181 v = discovery.as_dict() 

182 console.print( 

183 ",".join( 

184 f'"{v}"' 

185 for v in ( 

186 v["name"], 

187 v["current_detail"].get("image_ref"), # type: ignore[union-attr] 

188 v["current_detail"].get("index_name"), # type: ignore[union-attr] 

189 v["installed_version"], 

190 v["latest_version"], 

191 v["version_basis"], 

192 v["title"], 

193 v["can_update"], 

194 v["can_build"], 

195 v["can_restart"], 

196 v["update_type"], 

197 v.get("release", {}).get("source"), # type: ignore[union-attr] 

198 v.get("last_scan", {}).get("throttled"), # type: ignore[union-attr] 

199 ) 

200 ) 

201 ) 

202 elif fmt == "json": 

203 print_json(json.dumps([v.as_dict() async for v in docker_scanner.scan("cli", False)])) 

204 else: 

205 log.warning(f"Unsupported dump format {fmt}") 

206 

207 

208def main() -> None: 

209 # will be a proper cli someday 

210 cli_conf: DictConfig = OmegaConf.from_cli() 

211 

212 if "help" in cli_conf or "--help" in cli_conf: 

213 log.info(HELP) 

214 elif cli_conf.get("blob"): 

215 dump_url("blob", cli_conf.get("blob"), cli_conf) 

216 elif cli_conf.get("manifest"): 

217 dump_url("manifest", cli_conf.get("manifest"), cli_conf) 

218 elif cli_conf.get("tags"): 

219 dump_url("tags", cli_conf.get("tags"), cli_conf) 

220 elif cli_conf.get("dump"): 

221 import asyncio 

222 

223 asyncio.run(dump(cli_conf.get("dump"), cli_conf)) 

224 

225 else: 

226 structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(cli_conf.get("log_level", "INFO"))) 

227 

228 docker_scanner = docker_provider(cli_conf) 

229 discovery: Discovery | None = docker_scanner.rescan( 

230 Discovery(docker_scanner, cli_conf.get("container", "frigate"), "cli", "manual") 

231 ) 

232 if discovery: 

233 log.info(discovery.as_dict()) 

234 

235 

236if __name__ == "__main__": 

237 main()