Coverage for src/updates2mqtt/helpers.py: 87%

196 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-14 15:07 +0000

1import datetime as dt 

2import re 

3import time 

4from threading import Event 

5from typing import Any 

6from urllib.parse import urlparse 

7 

8import structlog 

9from hishel import CacheOptions, SpecificationPolicy # pyright: ignore[reportAttributeAccessIssue] 

10from hishel.httpx import SyncCacheClient 

11from httpx import Response 

12from tzlocal import get_localzone 

13 

14from updates2mqtt.config import Selector 

15 

16log = structlog.get_logger() 

17 

18 

19def timestamp(time_value: float | None) -> str | None: 

20 if time_value is None: 

21 return None 

22 try: 

23 return dt.datetime.fromtimestamp(time_value, tz=get_localzone()).isoformat() 

24 except: # noqa: E722 

25 return None 

26 

27 

28def humanize_timespan(span_secs: float | None) -> str: 

29 if span_secs is None: 29 ↛ 30line 29 didn't jump to line 30 because the condition on line 29 was never true

30 return "" 

31 days: int = int(span_secs // 86400) 

32 hours: int = int((span_secs - (days * 86400)) // 3600) 

33 mins: int = int((span_secs - (days * 86400) - (hours * 3600)) // 60) 

34 secs: float = span_secs % 60 

35 if days > 0: 

36 return f"{days} days, {hours} hours" 

37 if hours > 0: 

38 return f"{hours} hours, {mins} mins" 

39 return f"{mins} mins, {secs} secs" 

40 

41 

class Selection:
    """Outcome of testing a value against a selector's include and exclude patterns.

    `result` holds the verdict and `matched` holds the value whenever one of
    the pattern lists matched it. When an include list is configured it alone
    decides the verdict; the exclude list then only influences `matched`.
    """

    def __init__(self, selector: Selector, value: str | None) -> None:
        self.result: bool = True
        self.matched: str | None = None

        if value is None:
            # Nothing to test: selected only if no include list is configured
            self.result = selector.include is None
            return

        excluded: bool = selector.exclude is not None and any(
            re.search(pattern, value) for pattern in selector.exclude
        )
        if excluded:
            self.matched = value
            self.result = False

        if selector.include is not None:
            included: bool = any(re.search(pattern, value) for pattern in selector.include)
            # An include list overrides whatever the exclude check decided
            self.result = included
            if included:
                self.matched = value

    def __bool__(self) -> bool:
        """Make the object truthy exactly when the value was selected"""
        return self.result

63 

64 

class ThrottledError(Exception):
    """Exception carrying a message together with a retry delay in seconds"""

    def __init__(self, message: str, retry_secs: int) -> None:
        # Retry delay, in seconds, kept alongside the exception message
        self.retry_secs = retry_secs
        super().__init__(message)

69 

70 

71class Throttler: 

72 DEFAULT_SITE = "DEFAULT_SITE" 

73 

74 def __init__(self, api_throttle_pause: int = 30, logger: Any | None = None, semaphore: Event | None = None) -> None: 

75 self.log: Any = logger or log 

76 self.pause_api_until: dict[str, float] = {} 

77 self.api_throttle_pause: int = api_throttle_pause 

78 self.semaphore = semaphore 

79 

80 def check_throttle(self, index_name: str | None = None) -> bool: 

81 if self.semaphore and self.semaphore.is_set(): 81 ↛ 82line 81 didn't jump to line 82 because the condition on line 81 was never true

82 return True 

83 index_name = index_name or self.DEFAULT_SITE 

84 if self.pause_api_until.get(index_name) is not None: 

85 if self.pause_api_until[index_name] < time.time(): 

86 del self.pause_api_until[index_name] 

87 self.log.info("%s throttling wait complete", index_name) 

88 else: 

89 self.log.debug("%s throttling has %0.3f secs left", index_name, self.pause_api_until[index_name] - time.time()) 

90 return True 

91 return False 

92 

93 def throttle( 

94 self, 

95 index_name: str | None = None, 

96 retry_secs: int | None = None, 

97 explanation: str | None = None, 

98 raise_exception: bool = False, 

99 ) -> None: 

100 index_name = index_name or self.DEFAULT_SITE 

101 retry_secs = retry_secs if retry_secs and retry_secs > 0 else self.api_throttle_pause 

102 self.log.warn("%s throttling requests for %s seconds, %s", index_name, retry_secs, explanation) 

103 self.pause_api_until[index_name] = time.time() + retry_secs 

104 if raise_exception: 104 ↛ 105line 104 didn't jump to line 105 because the condition on line 104 was never true

105 raise ThrottledError(explanation or f"{index_name} throttled request", retry_secs) 

106 

107 

class CacheMetadata:
    """Snapshot of the hishel_* cache entries found in a response's extensions"""

    def __init__(self, response: Response) -> None:
        extensions = response.extensions
        self.from_cache: bool = extensions.get("hishel_from_cache", False)
        self.revalidated: bool = extensions.get("hishel_revalidated", False)
        self.created_at: float | None = extensions.get("hishel_created_at")
        self.stored: bool = extensions.get("hishel_stored", False)
        # Age is measured against the current clock, and only when a creation time is present
        self.age: float | None = None if self.created_at is None else time.time() - self.created_at

    def __str__(self) -> str:
        """One-line rendering of the cache flags and age"""
        return f"cached: {self.from_cache}, revalidated: {self.revalidated}, age:{self.age}, stored:{self.stored}"

123 

124 

class APIStats:
    """Running totals for fetches: cache use, failures by status code, and elapsed time."""

    def __init__(self) -> None:
        self.fetches: int = 0
        self.cached: int = 0
        self.revalidated: int = 0
        # Failure counts keyed by HTTP status code; key 0 counts fetches that produced no response
        self.failed: dict[int, int] = {}
        # Total elapsed seconds across all responses
        self.elapsed: float = 0
        self.max_cache_age: float = 0

    def tick(self, response: Response | None) -> None:
        """Record one fetch.

        Args:
            response: The response received, or None if the fetch produced none

        """
        self.fetches += 1
        if response is None:
            self.failed[0] = self.failed.get(0, 0) + 1
            return
        cache_metadata: CacheMetadata = CacheMetadata(response)
        self.cached += 1 if cache_metadata.from_cache else 0
        self.revalidated += 1 if cache_metadata.revalidated else 0
        elapsed = response.elapsed
        if elapsed:
            # total_seconds() covers days, seconds and microseconds; summing only the
            # .seconds and .microseconds fields would drop any whole days
            self.elapsed += elapsed.total_seconds()
        if not response.is_success:
            self.failed[response.status_code] = self.failed.get(response.status_code, 0) + 1
        if cache_metadata.age is not None and (self.max_cache_age is None or cache_metadata.age > self.max_cache_age):
            self.max_cache_age = cache_metadata.age

    def hit_ratio(self) -> float:
        """Return the fraction of fetches served from cache, rounded to 2 places (0 when nothing was cached)"""
        return round(self.cached / self.fetches, 2) if self.cached and self.fetches else 0

    def average_elapsed(self) -> float:
        """Return the mean elapsed seconds per fetch, rounded to 2 places (0 when nothing was timed)"""
        return round(self.elapsed / self.fetches, 2) if self.elapsed and self.fetches else 0

    def __str__(self) -> str:
        """Log line friendly string summary"""
        return (
            f"fetches: {self.fetches}, cache ratio: {self.hit_ratio():.2%}, revalidated: {self.revalidated}, "
            + f"errors: {', '.join(f'{status_code}:{fails}' for status_code, fails in self.failed.items()) or '0'}, "
            + f"oldest cache hit: {humanize_timespan(self.max_cache_age)}, avg elapsed: {self.average_elapsed()}s"
        )

165 

166 

class APIStatsCounter:
    """Collects an APIStats per host and logs a summary every `stats_report_interval` fetches."""

    def __init__(self) -> None:
        self.stats_report_interval: int = 100
        self.host_stats: dict[str, APIStats] = {}
        self.fetches: int = 0
        self.log: Any = structlog.get_logger().bind()

    def stats(self, url: str, response: Response | None) -> None:
        """Record one fetch against the URL's host; any failure while doing so is logged, not raised.

        Args:
            url: URL that was fetched; its hostname selects the bucket ("UNKNOWN" if it has none)
            response: The response received, or None if the fetch produced none

        """
        try:
            hostname: str = urlparse(url).hostname or "UNKNOWN"
            bucket: APIStats | None = self.host_stats.get(hostname)
            if bucket is None:
                bucket = APIStats()
                self.host_stats[hostname] = bucket
            bucket.tick(response)
            self.fetches += 1
            if self.fetches % self.stats_report_interval != 0:
                return
            summary_lines: list[str] = [f"{host} {stats}" for host, stats in self.host_stats.items()]
            self.log.info("OCI_V2 API Stats Summary\n%s", "\n".join(summary_lines))
        except Exception as e:
            self.log.warning("Failed to tick stats: %s", e)

186 

187 

def fetch_url(
    url: str,
    cache_ttl: int | None = None,  # default to server responses for cache ttl
    bearer_token: str | None = None,
    response_type: str | list[str] | None = None,
    follow_redirects: bool = False,
    allow_stale: bool = False,
    method: str = "GET",
    api_stats_counter: APIStatsCounter | None = None,
) -> Response | None:
    """Fetch a URL through a hishel caching client.

    Args:
        url: Address to request
        cache_ttl: Seconds passed as the `hishel_ttl` request extension and, when set,
            as a request `cache-control: max-age` header
        bearer_token: Token sent as an `Authorization: Bearer` header (left out of the debug log)
        response_type: One MIME type or a list of them, each sent as an `Accept` header
        follow_redirects: Passed through to the client
        allow_stale: Passed through to the cache options
        method: HTTP method to use
        api_stats_counter: Optional counter told about every outcome, including failures

    Returns:
        The response, including non-success statuses, or None if an exception occurred

    """
    try:
        headers: list[tuple[str, str]] = []
        if cache_ttl is not None:
            # Without a TTL there is no max-age to send; formatting None into the
            # header would produce the malformed directive "max-age=None"
            headers.append(("cache-control", f"max-age={cache_ttl}"))
        if bearer_token:
            headers.append(("Authorization", f"Bearer {bearer_token}"))
        if response_type:
            response_type = [response_type] if isinstance(response_type, str) else response_type
        if response_type and isinstance(response_type, (tuple, list)):
            headers.extend(("Accept", mime_type) for mime_type in response_type)

        cache_policy = SpecificationPolicy(
            cache_options=CacheOptions(
                shared=False,  # Private browser cache
                allow_stale=allow_stale,
            )
        )
        # Never write the bearer token to the log
        log_headers: list[tuple[str, str]] = [h for h in headers if len(h) > 1 and h[0] != "Authorization"]
        with SyncCacheClient(headers=headers, follow_redirects=follow_redirects, policy=cache_policy) as client:
            log.debug(f"Fetching URL {url}, redirects={follow_redirects}, headers={log_headers}, cache_ttl={cache_ttl}")
            response: Response = client.request(method=method, url=url, extensions={"hishel_ttl": cache_ttl})
            cache_metadata: CacheMetadata = CacheMetadata(response)
            if not response.is_success:
                log.debug("URL %s fetch returned non-success status: %s, %s", url, response.status_code, cache_metadata.stored)
            else:
                log.debug(
                    "URL response: status: %s, cached: %s, revalidated: %s, cache age: %s, stored: %s",
                    response.status_code,
                    cache_metadata.from_cache,
                    cache_metadata.revalidated,
                    humanize_timespan(cache_metadata.age),
                    cache_metadata.stored,
                )
            if api_stats_counter:
                api_stats_counter.stats(url, response)
            return response
    except Exception as e:
        # Best effort: every failure is logged and reported as None
        log.debug("URL %s failed to fetch: %s", url, e)
        if api_stats_counter:
            api_stats_counter.stats(url, None)
    return None

237 

238 

def validate_url(url: str, cache_ttl: int = 1500) -> bool:
    """Check a URL with a HEAD request that follows redirects.

    Returns False when the fetch yields no response or the status is 404;
    every other status counts as valid.
    """
    head_response: Response | None = fetch_url(url, method="HEAD", cache_ttl=cache_ttl, follow_redirects=True)
    if head_response is None:
        return False
    return head_response.status_code != 404

242 

243 

def httpx_json_content(response: Response, default: Any = None) -> Any | None:
    """Decode a response body as JSON when its content type allows it.

    Decoding is attempted when the content type contains "json" or is exactly
    "application/octet-stream". A falsy response, any other content type, or a
    decoding failure gives `default`.
    """
    if not response:
        return default

    content_type = response.headers.get("content-type", "")
    if "json" in content_type:
        failure_message = "Failed to parse JSON response: %s"
    elif content_type == "application/octet-stream":
        # blob could return a gzip layer tarball, however assumed only index, manifest or config requested
        failure_message = "Failed to parse assumed JSON response: %s"
    else:
        return default

    try:
        return response.json()
    except Exception:
        log.debug(failure_message, response.text)
    return default

257 

258 

def sanitize_name(name: str, replacement: str = "_", max_len: int = 64) -> str:
    """Strict sanitization that removes/replaces common problematic characters for MQTT or HA

    - Replaces every run of characters outside `A-Z a-z 0-9 _ - .` with the replacement
    - Collapses consecutive occurrences of the replacement into one
    - Trims the result to `max_len` UTF-8 bytes

    Args:
        name: The topic component string to sanitize
        replacement: Text to replace invalid characters with (default: "_"), inserted literally
        max_len: Largest acceptable name size, in UTF-8 bytes

    Returns:
        Sanitized topic string safe for most MQTT brokers

    Raises:
        ValueError: If the name is empty, or becomes empty after sanitization

    """
    if not name:
        raise ValueError("Name cannot be empty")
    orig_name: str = name

    def _literal(_match: re.Match[str]) -> str:
        # A callable keeps `re.sub` from treating backslashes in the replacement as a template
        return replacement

    name = re.sub(r"[^A-Za-z0-9_\-\.]+", _literal, name)

    # Replace multiple consecutive replacement chars with single one. The group makes
    # the repeat apply to the whole replacement, not just its last character, so a
    # multi-character replacement collapses correctly.
    if replacement:
        pattern = f"(?:{re.escape(replacement)})+"
        name = re.sub(pattern, _literal, name)

    # Trim to max length, dropping any multi-byte character cut in half
    topic_bytes = name.encode("utf-8")
    if len(topic_bytes) > max_len:
        name = topic_bytes[:max_len].decode("utf-8", errors="ignore")

    if not name:
        raise ValueError("Topic became empty after sanitization")
    if name != orig_name:
        log.info("Component name %s changed to %s for MQTT/HA compatibility", orig_name, name)

    return name