Coverage for src/updates2mqtt/helpers.py: 87%
196 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-14 15:07 +0000
1import datetime as dt
2import re
3import time
4from threading import Event
5from typing import Any
6from urllib.parse import urlparse
8import structlog
9from hishel import CacheOptions, SpecificationPolicy # pyright: ignore[reportAttributeAccessIssue]
10from hishel.httpx import SyncCacheClient
11from httpx import Response
12from tzlocal import get_localzone
14from updates2mqtt.config import Selector
# Module-level structlog logger; used by fetch_url, httpx_json_content, sanitize_name
# and as Throttler's fallback when no logger is supplied.
log = structlog.get_logger()
19def timestamp(time_value: float | None) -> str | None:
20 if time_value is None:
21 return None
22 try:
23 return dt.datetime.fromtimestamp(time_value, tz=get_localzone()).isoformat()
24 except: # noqa: E722
25 return None
28def humanize_timespan(span_secs: float | None) -> str:
29 if span_secs is None: 29 ↛ 30line 29 didn't jump to line 30 because the condition on line 29 was never true
30 return ""
31 days: int = int(span_secs // 86400)
32 hours: int = int((span_secs - (days * 86400)) // 3600)
33 mins: int = int((span_secs - (days * 86400) - (hours * 3600)) // 60)
34 secs: float = span_secs % 60
35 if days > 0:
36 return f"{days} days, {hours} hours"
37 if hours > 0:
38 return f"{hours} hours, {mins} mins"
39 return f"{mins} mins, {secs} secs"
class Selection:
    """Outcome of testing a value against a Selector's regex include/exclude lists.

    Instances are truthy when the value is selected.

    - ``value`` None: selected only when the selector has no include list.
    - Include list present: selected iff some include pattern matches (``re.search``);
      the exclude list then does not affect the result.
    - Otherwise: deselected iff some exclude pattern matches.

    ``matched`` is set to the value whenever any exclude or include pattern matched it.
    """

    def __init__(self, selector: Selector, value: str | None) -> None:
        self.result: bool = True
        self.matched: str | None = None
        if value is None:
            self.result = selector.include is None
            return

        # Exclude patterns are evaluated before include patterns.
        excluded = selector.exclude is not None and any(re.search(p, value) for p in selector.exclude)
        included = selector.include is not None and any(re.search(p, value) for p in selector.include)

        if excluded or included:
            self.matched = value

        if selector.include is not None:
            self.result = included
        else:
            self.result = not excluded

    def __bool__(self) -> bool:
        """Truthiness mirrors the selection result"""
        return self.result
class ThrottledError(Exception):
    """Signals that requests to a site have been throttled.

    Raised by ``Throttler.throttle`` when called with ``raise_exception=True``.
    """

    def __init__(self, message: str, retry_secs: int) -> None:
        super().__init__(message)
        # number of seconds the throttle pause was set for
        self.retry_secs: int = retry_secs
71class Throttler:
72 DEFAULT_SITE = "DEFAULT_SITE"
74 def __init__(self, api_throttle_pause: int = 30, logger: Any | None = None, semaphore: Event | None = None) -> None:
75 self.log: Any = logger or log
76 self.pause_api_until: dict[str, float] = {}
77 self.api_throttle_pause: int = api_throttle_pause
78 self.semaphore = semaphore
80 def check_throttle(self, index_name: str | None = None) -> bool:
81 if self.semaphore and self.semaphore.is_set(): 81 ↛ 82line 81 didn't jump to line 82 because the condition on line 81 was never true
82 return True
83 index_name = index_name or self.DEFAULT_SITE
84 if self.pause_api_until.get(index_name) is not None:
85 if self.pause_api_until[index_name] < time.time():
86 del self.pause_api_until[index_name]
87 self.log.info("%s throttling wait complete", index_name)
88 else:
89 self.log.debug("%s throttling has %0.3f secs left", index_name, self.pause_api_until[index_name] - time.time())
90 return True
91 return False
93 def throttle(
94 self,
95 index_name: str | None = None,
96 retry_secs: int | None = None,
97 explanation: str | None = None,
98 raise_exception: bool = False,
99 ) -> None:
100 index_name = index_name or self.DEFAULT_SITE
101 retry_secs = retry_secs if retry_secs and retry_secs > 0 else self.api_throttle_pause
102 self.log.warn("%s throttling requests for %s seconds, %s", index_name, retry_secs, explanation)
103 self.pause_api_until[index_name] = time.time() + retry_secs
104 if raise_exception: 104 ↛ 105line 104 didn't jump to line 105 because the condition on line 104 was never true
105 raise ThrottledError(explanation or f"{index_name} throttled request", retry_secs)
class CacheMetadata:
    """Cache bookkeeping read from the ``hishel_*`` keys of a response's ``extensions``"""

    def __init__(self, response: Response) -> None:
        ext = response.extensions
        self.from_cache: bool = ext.get("hishel_from_cache", False)
        self.revalidated: bool = ext.get("hishel_revalidated", False)
        self.created_at: float | None = ext.get("hishel_created_at")
        self.stored: bool = ext.get("hishel_stored", False)
        # seconds between created_at and now (time.time()); None when no creation time was reported
        self.age: float | None = None if self.created_at is None else time.time() - self.created_at

    def __str__(self) -> str:
        """One-line summary of the cache flags and age"""
        return (
            f"cached: {self.from_cache}, revalidated: {self.revalidated}, "
            f"age:{self.age}, stored:{self.stored}"
        )
class APIStats:
    """Running request statistics for a single API host"""

    def __init__(self) -> None:
        self.fetches: int = 0  # every tick(), including failures
        self.cached: int = 0  # responses hishel reported as served from cache
        self.revalidated: int = 0  # responses hishel reported as revalidated
        self.failed: dict[int, int] = {}  # HTTP status -> count; key 0 = no response at all
        self.elapsed: float = 0  # summed Response.elapsed, in seconds
        self.max_cache_age: float = 0  # largest cache age seen, in seconds

    def tick(self, response: Response | None) -> None:
        """Record one fetch attempt; ``response`` is None when the fetch raised"""
        self.fetches += 1
        if response is None:
            self.failed[0] = self.failed.get(0, 0) + 1
            return
        cache_metadata: CacheMetadata = CacheMetadata(response)
        if cache_metadata.from_cache:
            self.cached += 1
        if cache_metadata.revalidated:
            self.revalidated += 1
        if response.elapsed:
            # total_seconds() includes the days component, which seconds+microseconds dropped
            self.elapsed += response.elapsed.total_seconds()
        if not response.is_success:
            self.failed[response.status_code] = self.failed.get(response.status_code, 0) + 1
        if cache_metadata.age is not None and cache_metadata.age > self.max_cache_age:
            self.max_cache_age = cache_metadata.age

    def hit_ratio(self) -> float:
        """Fraction of fetches served from cache, rounded to 2 places (0 when nothing fetched)"""
        return round(self.cached / self.fetches, 2) if self.cached and self.fetches else 0

    def average_elapsed(self) -> float:
        """Mean elapsed seconds per fetch, rounded to 2 places (0 when nothing recorded)"""
        return round(self.elapsed / self.fetches, 2) if self.elapsed and self.fetches else 0

    def __str__(self) -> str:
        """Log line friendly string summary"""
        return (
            f"fetches: {self.fetches}, cache ratio: {self.hit_ratio():.2%}, revalidated: {self.revalidated}, "
            + f"errors: {', '.join(f'{status_code}:{fails}' for status_code, fails in self.failed.items()) or '0'}, "
            + f"oldest cache hit: {humanize_timespan(self.max_cache_age)}, avg elapsed: {self.average_elapsed()}s"
        )
class APIStatsCounter:
    """Aggregates APIStats per URL host and periodically logs a summary of all hosts"""

    def __init__(self) -> None:
        self.stats_report_interval: int = 100  # log a summary every N recorded fetches (all hosts)
        self.host_stats: dict[str, APIStats] = {}
        self.fetches: int = 0
        self.log: Any = structlog.get_logger().bind()

    def stats(self, url: str, response: Response | None) -> None:
        """Record one fetch of ``url``; never raises, any failure is logged as a warning"""
        try:
            hostname: str = urlparse(url).hostname or "UNKNOWN"
            if hostname not in self.host_stats:
                self.host_stats[hostname] = APIStats()
            self.host_stats[hostname].tick(response)
            self.fetches += 1
            if self.fetches % self.stats_report_interval:
                return
            summary = "\n".join(f"{name} {host_summary}" for name, host_summary in self.host_stats.items())
            self.log.info("OCI_V2 API Stats Summary\n%s", summary)
        except Exception as e:
            self.log.warning("Failed to tick stats: %s", e)
def fetch_url(
    url: str,
    cache_ttl: int | None = None,  # default to server responses for cache ttl
    bearer_token: str | None = None,
    response_type: str | list[str] | None = None,
    follow_redirects: bool = False,
    allow_stale: bool = False,
    method: str = "GET",
    api_stats_counter: APIStatsCounter | None = None,
) -> Response | None:
    """Fetch ``url`` through a hishel caching client (private cache).

    Args:
        url: address to request
        cache_ttl: seconds; when given, it is sent as request ``cache-control: max-age`` and as
            the ``hishel_ttl`` extension. When None, neither is sent and the server's
            caching headers apply.
        bearer_token: sent as ``Authorization: Bearer ...``; never logged
        response_type: one MIME type or a list of them, each sent as its own ``Accept`` header
        follow_redirects: passed to the client
        allow_stale: passed to hishel ``CacheOptions``
        method: HTTP method
        api_stats_counter: if given, every attempt (including failures) is recorded on it

    Returns:
        The Response whatever its status code, or None if the request raised.
    """
    try:
        headers: list[tuple[str, str]] = _request_headers(cache_ttl, bearer_token, response_type)
        cache_policy = SpecificationPolicy(
            cache_options=CacheOptions(
                shared=False,  # Private browser cache
                allow_stale=allow_stale,
            )
        )
        # never put the bearer token in the logs
        log_headers: list[tuple[str, str]] = [h for h in headers if h[0] != "Authorization"]
        extensions: dict[str, Any] = {"hishel_ttl": cache_ttl} if cache_ttl is not None else {}
        with SyncCacheClient(headers=headers, follow_redirects=follow_redirects, policy=cache_policy) as client:
            log.debug(
                "Fetching URL %s, redirects=%s, headers=%s, cache_ttl=%s", url, follow_redirects, log_headers, cache_ttl
            )
            response: Response = client.request(method=method, url=url, extensions=extensions)
            cache_metadata: CacheMetadata = CacheMetadata(response)
            if not response.is_success:
                log.debug("URL %s fetch returned non-success status: %s, %s", url, response.status_code, cache_metadata.stored)
            else:
                log.debug(
                    "URL response: status: %s, cached: %s, revalidated: %s, cache age: %s, stored: %s",
                    response.status_code,
                    cache_metadata.from_cache,
                    cache_metadata.revalidated,
                    humanize_timespan(cache_metadata.age),
                    cache_metadata.stored,
                )
            if api_stats_counter:
                api_stats_counter.stats(url, response)
            return response
    except Exception as e:
        # best-effort fetch: callers treat None as "unavailable"
        log.debug("URL %s failed to fetch: %s", url, e)
        if api_stats_counter:
            api_stats_counter.stats(url, None)
        return None


def _request_headers(
    cache_ttl: int | None, bearer_token: str | None, response_type: str | list[str] | None
) -> list[tuple[str, str]]:
    """Build fetch_url's request headers as a list of pairs (repeated names allowed, e.g. Accept)"""
    headers: list[tuple[str, str]] = []
    if cache_ttl is not None:
        # "max-age=None" is not a valid Cache-Control directive, so only constrain when asked
        headers.append(("cache-control", f"max-age={cache_ttl}"))
    if bearer_token:
        headers.append(("Authorization", f"Bearer {bearer_token}"))
    if response_type:
        mime_types = [response_type] if isinstance(response_type, str) else response_type
        headers.extend(("Accept", mime_type) for mime_type in mime_types)
    return headers
def validate_url(url: str, cache_ttl: int = 1500) -> bool:
    """Check ``url`` with a HEAD request (following redirects, cached for ``cache_ttl``).

    Returns False when the request failed outright or answered 404; any other
    status code counts as valid.
    """
    head = fetch_url(url, method="HEAD", cache_ttl=cache_ttl, follow_redirects=True)
    if head is None:
        return False
    return head.status_code != 404
def httpx_json_content(response: Response, default: Any = None) -> Any | None:
    """Decode a response body as JSON when its content type suggests JSON.

    Bodies typed ``application/octet-stream`` are also attempted. A blob endpoint could
    return a gzip layer tarball, but only index, manifest or config blobs are assumed
    to be requested.

    Args:
        response: the HTTP response (may be None)
        default: value returned when no JSON can be extracted

    Returns:
        The parsed JSON. Returns ``default`` when ``response`` is falsy, the content type
        is neither JSON-like nor octet-stream, or parsing fails (logged at debug level).
    """
    if not response:
        return default
    content_type: str = response.headers.get("content-type", "").lower()
    # media type without parameters, e.g. "application/octet-stream; charset=binary" -> "application/octet-stream"
    media_type: str = content_type.split(";", 1)[0].strip()
    if "json" in content_type:
        failure_message = "Failed to parse JSON response: %s"
    elif media_type == "application/octet-stream":
        failure_message = "Failed to parse assumed JSON response: %s"
    else:
        return default
    try:
        return response.json()
    except Exception:
        log.debug(failure_message, response.text)
    return default
def sanitize_name(name: str, replacement: str = "_", max_len: int = 64) -> str:
    """Make a component name safe for MQTT topics and Home Assistant identifiers.

    Processing steps:
    - Every run of characters outside ``A-Z a-z 0-9 _ - .`` is replaced by a single
      ``replacement``; this includes spaces and control characters.
    - Consecutive occurrences of ``replacement`` are collapsed into one. For the default
      "_", this also collapses underscores already present in the name.
    - The result is cut to at most ``max_len`` UTF-8 bytes.
    - Any change is logged at info level.

    Args:
        name: The component name to sanitize
        replacement: String substituted for invalid characters (default: "_")
        max_len: Largest acceptable size of the result, in UTF-8 bytes

    Returns:
        The sanitized name

    Raises:
        ValueError: if ``name`` is empty, or nothing is left after sanitization
    """
    if not name:
        raise ValueError("Name cannot be empty")
    orig_name: str = name
    name = re.sub(r"[^A-Za-z0-9_\-\.]+", replacement, name)

    # Replace multiple consecutive replacements with a single one. The group makes
    # this work for multi-character replacements too ("ab+" would only repeat the "b").
    if replacement:
        pattern = "(?:" + re.escape(replacement) + ")+"
        name = re.sub(pattern, replacement, name)

    # Trim to max length in bytes; a multi-byte character split by the cut (only possible
    # with a non-ASCII replacement) is dropped rather than raising
    name_bytes = name.encode("utf-8")
    if len(name_bytes) > max_len:
        name = name_bytes[:max_len].decode("utf-8", errors="ignore")

    if not name:
        raise ValueError("Topic became empty after sanitization")
    if name != orig_name:
        log.info("Component name %s changed to %s for MQTT/HA compatibility", orig_name, name)

    return name