Coverage for src/updates2mqtt/app.py: 80%
155 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-14 15:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-14 15:07 +0000
1import asyncio
2import logging
3import sys
4import time
5import uuid
6from collections.abc import Callable
7from datetime import UTC, datetime
8from pathlib import Path
9from threading import Event
10from typing import Any
12import structlog
13import structlog.dev
15import updates2mqtt
16from updates2mqtt.model import Discovery, ReleaseProvider
18from .config import Config, PublishPolicy, UpdatePolicy, load_app_config
19from .integrations.docker import DockerProvider
20from .mqtt import MqttPublisher
# Module-wide structured logger shared by everything in this file.
log = structlog.get_logger()

# Path of the YAML configuration file handed to load_app_config().
CONF_FILE = Path("conf/config.yaml")
# Minimum number of seconds between automatic update attempts for one discovery (4 hours).
UPDATE_INTERVAL = 4 * 60 * 60
# TODO:
# - Set install in progress
# - Support apt
# - Retry on registry fetch failure
# - Run the fetcher in a subprocess or thread
# - Clear command message after install
# - Use git hash as an alternative to the image ref for builds, or daily builds
class App:
    """Top-level application object.

    Loads the configuration, configures structlog, creates the MQTT publisher
    and the enabled release scanners, and provides the coroutines that scan,
    publish discoveries, emit heartbeats and shut the process down.
    """

    def __init__(self) -> None:
        """Load configuration and wire up logging, publisher and scanners.

        Exits the process with status 1 when `load_app_config` returns None.
        """
        self.startup_timestamp: str = datetime.now(UTC).isoformat()
        self.last_scan_timestamp: str | None = None
        app_config: Config | None = load_app_config(CONF_FILE)
        if app_config is None:
            log.error(f"Invalid configuration at {CONF_FILE}")
            log.error("Edit config to fix missing or invalid values and restart")
            log.error("Alternately supply correct MQTT_HOST,MQTT_USER,MQTT_PASSWORD environment variables")
            log.error("Exiting app")
            sys.exit(1)
        self.cfg: Config = app_config
        # Set by a scanner (it is handed to DockerProvider below); checked in shutdown()
        self.self_bounce: Event = Event()

        is_tty: bool = sys.stderr.isatty()
        if not self.cfg.log.json or is_tty:
            # Pretty printing when we run in a terminal session.
            # Automatically prints pretty tracebacks when "rich" is installed
            renderers: list[Any] = [
                structlog.dev.ConsoleRenderer(
                    colors=is_tty,
                    exception_formatter=(
                        structlog.dev.RichTracebackFormatter() if is_tty else structlog.dev.plain_traceback
                    ),
                ),
            ]
        else:
            renderers = [
                structlog.processors.dict_tracebacks,
                structlog.processors.JSONRenderer(),
            ]
        structlog.configure(
            cache_logger_on_first_use=True,
            # The configured level name is resolved to the numeric constant on the logging module
            wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, str(self.cfg.log.level))),
            processors=[
                structlog.contextvars.merge_contextvars,
                structlog.processors.add_log_level,
                structlog.processors.StackInfoRenderer(),
                structlog.dev.set_exc_info,
                structlog.processors.TimeStamper(fmt="iso"),
                *renderers,
            ],
        )

        log.debug("Logging initialized", level=self.cfg.log.level, json=self.cfg.log.json, tty=is_tty)

        self.publisher = MqttPublisher(self.cfg.mqtt, self.cfg.node, self.cfg.homeassistant)

        self.scanners: list[ReleaseProvider] = []
        self.scan_count: int = 0
        self.last_scan: str | None = None
        if self.cfg.docker.enabled:
            self.scanners.append(
                DockerProvider(
                    self.cfg.docker,
                    self.cfg.node,
                    packages=self.cfg.packages,
                    github_cfg=self.cfg.github,
                    self_bounce=self.self_bounce,
                )
            )
        # Set by shutdown(); polled by scan() and main_loop() to stop early
        self.stopped = Event()
        self.heartbeat_topic = self.cfg.node.healthcheck.topic_template.format(node_name=self.cfg.node.name)

        log.info(
            "App configured",
            node=self.cfg.node.name,
            scan_interval=self.cfg.scan_interval,
            heartbeat_topic=self.heartbeat_topic,
        )

    async def scan(self) -> None:
        """Run every scanner once, passing each discovery to `on_discovery`.

        All scanners share one freshly generated session id. The loop is left
        early once the stopped event is set. `scan_count` is incremented once
        per scanner, after its topics have been cleaned.
        """
        session = uuid.uuid4().hex
        for scanner in self.scanners:
            slog = log.bind(source_type=scanner.source_type, session=session)
            if self.stopped.is_set():
                break
            slog.info("Scanning ...")
            async for discovery in scanner.scan(session):
                await self.on_discovery(discovery)
                if self.stopped.is_set():
                    slog.debug("Breaking scan loop on stopped event")
                    break
            await self.publisher.clean_topics(scanner)
            self.scan_count += 1
            slog.info(f"Scan #{self.scan_count} complete")
        self.last_scan_timestamp = datetime.now(UTC).isoformat()

    async def main_loop(self) -> None:
        """Start the publisher, the optional heartbeat task and the scan loop.

        Scans repeat every `cfg.scan_interval` seconds until the stopped event
        is set or the publisher reports itself unavailable; in the latter case
        `shutdown(exit_code=1)` is invoked.
        """
        log.debug("Starting run loop")
        self.publisher.start()

        if self.cfg.node.healthcheck.enabled:
            await self.heartbeat()  # initial eager heartbeat
            log.info(f"Setting up heartbeat every {self.cfg.node.healthcheck.interval} seconds to topic {self.heartbeat_topic}")
            # Keep a reference on self so the task object stays reachable while it runs
            self.heartbeat_loop_task = asyncio.create_task(
                repeated_call(self.heartbeat, interval=self.cfg.node.healthcheck.interval), name="heartbeat"
            )

        for scanner in self.scanners:
            scanner.initialize()
            self.publisher.subscribe_hass_command(scanner)
            await self.publisher.clean_topics(scanner, initial=True)

        while not self.stopped.is_set() and self.publisher.is_available():
            await self.scan()
            if not self.stopped.is_set() and self.publisher.is_available():
                await asyncio.sleep(self.cfg.scan_interval)
            else:
                log.info("Stop requested, exiting run loop and skipping sleep")

        if not self.publisher.is_available():
            log.error("MQTT fatal connection error - check host,port,user,password in config")
            self.shutdown(exit_code=1)

        log.debug("Exiting run loop")

    async def on_discovery(self, discovery: Discovery) -> None:
        """Publish one discovery and, when policy allows, request an automatic install.

        Raises:
            asyncio.CancelledError: re-raised after logging, so that cancellation
                reaches the caller instead of being silently absorbed.
            Exception: any other failure is logged with its traceback and re-raised.
        """
        dlog = log.bind(name=discovery.name)
        try:
            if discovery.publish_policy == PublishPolicy.HOMEASSISTANT and self.cfg.homeassistant.discovery.enabled:
                # Switch off MQTT discovery if not Home Assistant enabled
                self.publisher.publish_hass_config(discovery)
            if discovery.publish_policy == PublishPolicy.HOMEASSISTANT:
                self.publisher.publish_hass_state(discovery)
            if discovery.publish_policy in (PublishPolicy.HOMEASSISTANT, PublishPolicy.MQTT):
                self.publisher.publish_discovery(discovery)
            if (
                discovery.update_policy == UpdatePolicy.AUTO
                and discovery.can_update
                and discovery.latest_version != discovery.current_version
            ):
                # TODO: review auto update, trigger by version, use update interval as throttle
                # -1 is the sentinel for "no previous attempt recorded"
                elapsed: float = (
                    time.time() - discovery.update_last_attempt if discovery.update_last_attempt is not None else -1
                )
                if elapsed == -1 or elapsed > UPDATE_INTERVAL:
                    dlog.info(
                        "Initiate auto update (last:%s, elapsed:%s, max:%s)",
                        discovery.update_last_attempt,
                        elapsed,
                        UPDATE_INTERVAL,
                    )
                    self.publisher.local_message(discovery, "install")
                else:
                    dlog.info("Skipping auto update")
        except asyncio.CancelledError:
            dlog.info("Discovery handling cancelled")
            # Cancellation must keep propagating; absorbing it would let the scan carry on
            raise
        except Exception:
            dlog.exception("Discovery handling failed")
            raise

    async def interrupt_tasks(self) -> None:
        """Cancel the task named "heartbeat" and wait for all other tasks.

        Every task except the current one is awaited, but only the heartbeat
        task is cancelled; exceptions from the awaited tasks are collected
        rather than raised.
        """
        current = asyncio.current_task()
        running_tasks = [t for t in asyncio.all_tasks() if t is not current]
        log.info(f"Cancelling {len(running_tasks)} tasks")
        for t in running_tasks:
            log.debug("Cancelling task", task=t.get_name())
            if t.get_name() == "heartbeat":
                t.cancel()
        await asyncio.gather(*running_tasks, return_exceptions=True)
        log.debug("Cancellation task completed")

    def shutdown(self, *args, exit_code: int = 143) -> None:  # noqa: ANN002, ARG002
        """Stop scanners and publisher, then exit the process.

        Extra positional arguments are accepted and ignored, so the method can
        be registered directly as a signal handler.

        Args:
            exit_code: process exit status; replaced by 1 when the self-bounce
                event is set.

        Raises:
            SystemExit: always, via `sys.exit`.
        """
        if self.self_bounce.is_set():
            exit_code = 1
            log.info("Self bouncing, overriding exit_code: %s", exit_code)
        else:
            log.info("Shutting down, exit_code: %s", exit_code)
        self.stopped.set()
        for scanner in self.scanners:
            scanner.stop()
        interrupt_task = asyncio.get_event_loop().create_task(
            self.interrupt_tasks(),
            eager_start=True,  # pyright: ignore[reportCallIssue]
            name="interrupt",
        )
        for t in asyncio.all_tasks():
            log.debug("Tasks waiting = %s", t)
        self.publisher.stop()
        log.debug("Interrupt: %s", interrupt_task.done())
        log.info("Shutdown handling complete")
        sys.exit(exit_code)  # SIGTERM Graceful Exit = 143

    async def heartbeat(self) -> None:
        """Publish a health-check payload to the heartbeat topic.

        Does nothing while the publisher reports itself unavailable.
        """
        if not self.publisher.is_available():
            return
        heartbeat_stamp: str = datetime.now(UTC).isoformat()
        log.debug("Publishing health check", heartbeat_stamp=heartbeat_stamp)
        self.publisher.publish(
            topic=self.heartbeat_topic,
            payload={
                "version": updates2mqtt.version,  # pyright: ignore[reportAttributeAccessIssue]
                "node": self.cfg.node.name,
                "heartbeat_raw": time.time(),
                "heartbeat_stamp": heartbeat_stamp,
                "startup_stamp": self.startup_timestamp,
                "last_scan_stamp": self.last_scan_timestamp,
                "scan_count": self.scan_count,
            },
        )
async def repeated_call(func: Callable, interval: int = 60, *args: Any, **kwargs: Any) -> None:
    """Await `func(*args, **kwargs)` repeatedly, pausing `interval` seconds between calls.

    A failing call is logged and does not end the loop; the pause still applies
    afterwards, so a persistently failing `func` is retried once per interval
    rather than in a tight loop.

    Args:
        func: callable returning an awaitable.
        interval: seconds to sleep after each call.
        *args: positional arguments forwarded to `func`.
        **kwargs: keyword arguments forwarded to `func`.

    Raises:
        asyncio.CancelledError: when the surrounding task is cancelled. It is
            logged and re-raised so that the task actually terminates.
    """
    try:
        while True:
            try:
                await func(*args, **kwargs)
            except Exception:
                log.exception("Periodic task failed")
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        log.debug("Periodic task cancelled", func=func)
        # Swallowing the cancellation here would restart the loop and make the task uncancellable
        raise
def run() -> None:
    """Process entry point: create the App, register its SIGTERM handler and run the main loop."""
    import signal

    log.debug(f"Starting updates2mqtt v{updates2mqtt.version}")  # pyright: ignore[reportAttributeAccessIssue]
    application = App()

    # shutdown() takes and ignores the (signum, frame) arguments a signal handler receives
    signal.signal(signal.SIGTERM, application.shutdown)
    try:
        asyncio.run(application.main_loop(), debug=False)
    except asyncio.CancelledError:
        log.debug("App exited on cancelled task")
    else:
        log.debug("App exited gracefully")
261 except asyncio.CancelledError:
262 log.debug("App exited on cancelled task")
# Start the application only when this module is executed as a script.
if __name__ == "__main__":
    run()