Coverage for src / updates2mqtt / app.py: 80%
156 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-06-02 10:03 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-06-02 10:03 +0000
1import asyncio
2import logging
3import sys
4import time
5import uuid
6from collections.abc import Callable
7from datetime import UTC, datetime
8from pathlib import Path
9from threading import Event
10from typing import Any
12import structlog
13import structlog.dev
15import updates2mqtt
16from updates2mqtt.model import Discovery, ReleaseProvider
18from .config import Config, PublishPolicy, UpdatePolicy, load_app_config
19from .integrations.docker import DockerProvider
20from .mqtt import MqttPublisher
# Module-wide structured logger shared by everything in this file.
log = structlog.get_logger()

# Relative path of the YAML configuration file read at start-up.
CONF_FILE = Path("conf") / "config.yaml"
# Minimum gap, in seconds, between automatic update attempts (four hours).
UPDATE_INTERVAL = 4 * 60 * 60
# TODO:
# - Set an "install in progress" state
# - Support apt
# - Retry when a registry fetch fails
# - Run the fetcher in a subprocess or thread
# - Clear the command message after install
# - Use the git hash as an alternative to the image ref for builds, or daily builds
class App:
    """Application object tying configuration, logging, scanners and the MQTT publisher together.

    Construction loads the configuration from ``CONF_FILE`` (exiting the
    process with status 1 when it cannot be loaded), configures structlog,
    creates the MQTT publisher and registers the enabled scanners.
    ``main_loop`` then runs scans repeatedly until a stop is requested or the
    publisher reports that it is no longer available.
    """

    def __init__(self) -> None:
        """Load configuration, configure logging and build publisher and scanners."""
        self.startup_timestamp: str = datetime.now(UTC).isoformat()
        self.last_scan_timestamp: str | None = None
        app_config: Config | None = load_app_config(CONF_FILE)
        if app_config is None:
            log.error(f"Invalid configuration at {CONF_FILE}")
            log.error("Edit config to fix missing or invalid values and restart")
            log.error("Alternately supply correct MQTT_HOST,MQTT_USER,MQTT_PASSWORD environment variables")
            log.error("Exiting app")
            sys.exit(1)
        self.cfg: Config = app_config
        # Handed to the Docker provider; when set, shutdown() forces exit code 1.
        self.self_bounce: Event = Event()
        # Created by main_loop() only when healthchecks are enabled.
        self.healthcheck_loop_task: asyncio.Task[None] | None = None

        if not self.cfg.log.json or sys.stderr.isatty():
            # Pretty printing when we run in a terminal session.
            # Automatically prints pretty tracebacks when "rich" is installed
            renderers: list[Any] = [
                structlog.dev.ConsoleRenderer(
                    colors=sys.stderr.isatty(),
                    exception_formatter=(
                        structlog.dev.RichTracebackFormatter() if sys.stderr.isatty() else structlog.dev.plain_traceback
                    ),
                ),
            ]
        else:
            renderers = [
                structlog.processors.dict_tracebacks,
                structlog.processors.JSONRenderer(),
            ]
        structlog.configure(
            cache_logger_on_first_use=True,
            wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, str(self.cfg.log.level))),
            processors=[
                structlog.contextvars.merge_contextvars,
                structlog.processors.add_log_level,
                structlog.processors.StackInfoRenderer(),
                structlog.dev.set_exc_info,
                structlog.processors.TimeStamper(fmt="iso"),
                *renderers,
            ],
        )

        log.debug("Logging initialized", level=self.cfg.log.level, json=self.cfg.log.json, tty=sys.stderr.isatty())

        self.publisher = MqttPublisher(self.cfg.mqtt, self.cfg.node, self.cfg.homeassistant)

        self.scanners: list[ReleaseProvider] = []
        self.scan_count: int = 0
        self.last_scan: str | None = None
        if self.cfg.docker.enabled:
            self.scanners.append(
                DockerProvider(
                    self.cfg.docker,
                    self.cfg.node,
                    packages=self.cfg.packages,
                    github_cfg=self.cfg.github,
                    self_bounce=self.self_bounce,
                )
            )
        # Set by shutdown(); polled by scan() and main_loop() to stop early.
        self.stopped = Event()
        self.healthcheck_topic = self.cfg.node.healthcheck.topic_template.format(node_name=self.cfg.node.name)

        log.info(
            "App configured",
            node=self.cfg.node.name,
            scan_interval=self.cfg.scan_interval,
            healthcheck_topic=self.healthcheck_topic,
        )

    async def scan(self) -> None:
        """Run every scanner once, handling each discovery in its own task.

        All scanners share one session id. After a scanner's discoveries have
        been handled its topics are cleaned and ``scan_count`` is incremented;
        both steps are skipped when a stop has been requested.
        """
        session = uuid.uuid4().hex
        for scanner in self.scanners:
            slog = log.bind(source_type=scanner.source_type, session=session)
            if self.stopped.is_set():
                break
            slog.info("Scanning ...")
            # The TaskGroup waits for every discovery task before leaving the block.
            async with asyncio.TaskGroup() as tg:
                # xtype: ignore[attr-defined]
                async for discovery in scanner.scan(session):
                    tg.create_task(self.on_discovery(discovery), name=f"discovery-{discovery.name}")
            if self.stopped.is_set():
                slog.debug("Breaking scan loop on stopped event")
                break
            await self.publisher.clean_topics(scanner)
            self.scan_count += 1
            slog.info(f"Scan #{self.scan_count} complete")
        self.last_scan_timestamp = datetime.now(UTC).isoformat()

    async def main_loop(self) -> None:
        """Start the publisher, optional healthcheck and scanners, then scan until stopped.

        When the loop ends because the publisher is unavailable, ``shutdown``
        is called with exit code 1.
        """
        log.debug("Starting run loop")
        self.publisher.start()

        if self.cfg.node.healthcheck.enabled:
            await self.healthcheck()  # initial eager healthcheck
            log.info(
                f"Setting up healthcheck every {self.cfg.node.healthcheck.interval} seconds to topic {self.healthcheck_topic}"
            )
            self.healthcheck_loop_task = asyncio.create_task(
                repeated_call(self.healthcheck, interval=self.cfg.node.healthcheck.interval), name="healthcheck"
            )

        for scanner in self.scanners:
            scanner.initialize()
            self.publisher.subscribe_hass_command(scanner)
            await self.publisher.clean_topics(scanner, initial=True)

        while not self.stopped.is_set() and self.publisher.is_available():
            await self.scan()
            if not self.stopped.is_set() and self.publisher.is_available():
                await asyncio.sleep(self.cfg.scan_interval)
            else:
                log.info("Stop requested, exiting run loop and skipping sleep")

        if not self.publisher.is_available():
            log.error("MQTT fatal connection error - check host,port,user,password in config")
            self.shutdown(exit_code=1)

        log.debug("Exiting run loop")

    async def on_discovery(self, discovery: Discovery) -> None:
        """Publish one discovery and, where policy allows, trigger an automatic update.

        Raises:
            asyncio.CancelledError: re-raised after logging so the task is
                reported as cancelled.
            Exception: any failure while handling the discovery is logged
                and re-raised.
        """
        dlog = log.bind(name=discovery.name)
        try:
            if discovery.publish_policy == PublishPolicy.HOMEASSISTANT and self.cfg.homeassistant.discovery.enabled:
                # Switch off MQTT discovery if not Home Assistant enabled
                self.publisher.publish_hass_config(discovery)
            # Equality, not `in (X)`: a parenthesised single member is not a
            # tuple, so `in` would test membership against the member itself.
            if discovery.publish_policy == PublishPolicy.HOMEASSISTANT:
                self.publisher.publish_hass_state(discovery)
            if discovery.publish_policy in (PublishPolicy.HOMEASSISTANT, PublishPolicy.MQTT):
                self.publisher.publish_discovery(discovery)
            if (
                discovery.update_policy == UpdatePolicy.AUTO
                and discovery.can_update
                and discovery.latest_version != discovery.current_version
            ):
                # TODO: review auto update, trigger by version, use update interval as throttle
                # -1 is the sentinel for "no previous attempt recorded".
                elapsed: float = (
                    time.time() - discovery.update_last_attempt if discovery.update_last_attempt is not None else -1
                )
                if elapsed == -1 or elapsed > UPDATE_INTERVAL:
                    dlog.info(
                        "Initiate auto update (last:%s, elapsed:%s, max:%s)",
                        discovery.update_last_attempt,
                        elapsed,
                        UPDATE_INTERVAL,
                    )
                    self.publisher.local_message(discovery, "install")
                else:
                    dlog.info("Skipping auto update")
        except asyncio.CancelledError:
            dlog.info("Discovery handling cancelled")
            # Propagate so the task ends in the cancelled state.
            raise
        except Exception:
            dlog.exception("Discovery handling failed")
            raise

    async def interrupt_tasks(self) -> None:
        """Cancel the healthcheck and discovery tasks, then await the other running tasks."""
        current = asyncio.current_task()
        running_tasks = [t for t in asyncio.all_tasks() if t is not current]
        log.info(f"Cancelling {len(running_tasks)} tasks")
        for t in running_tasks:
            log.debug("Cancelling task", task=t.get_name())
            # Only tasks named "healthcheck" or "discovery-*" are actually cancelled.
            if t.get_name() == "healthcheck" or t.get_name().startswith("discovery-"):
                t.cancel()
        # NOTE(review): this also awaits tasks that were not cancelled above.
        await asyncio.gather(*running_tasks, return_exceptions=True)
        log.debug("Cancellation task completed")

    def shutdown(self, *args, exit_code: int = 143) -> None:  # noqa: ANN002, ARG002
        """Stop scanners and publisher, cancel background tasks and exit the process.

        Registered as the SIGTERM handler, hence the ignored positional
        arguments.

        Args:
            *args: ignored.
            exit_code: process exit status; overridden with 1 when
                ``self_bounce`` is set.

        Raises:
            SystemExit: always, via ``sys.exit``.
        """
        if self.self_bounce.is_set():
            exit_code = 1
            log.info("Self bouncing, overriding exit_code: %s", exit_code)
        else:
            log.info("Shutting down, exit_code: %s", exit_code)
        self.stopped.set()
        for scanner in self.scanners:
            scanner.stop()
        interrupt_task = asyncio.get_event_loop().create_task(
            self.interrupt_tasks(),
            eager_start=True,  # pyright: ignore[reportCallIssue]
            name="interrupt",
        )
        for t in asyncio.all_tasks():
            log.debug("Tasks waiting = %s", t)
        self.publisher.stop()
        log.debug("Interrupt: %s", interrupt_task.done())
        log.info("Shutdown handling complete")
        sys.exit(exit_code)  # SIGTERM Graceful Exit = 143

    async def healthcheck(self) -> None:
        """Publish a heartbeat payload to the healthcheck topic if the publisher is available."""
        if not self.publisher.is_available():
            return
        heartbeat_stamp: str = datetime.now(UTC).isoformat()
        log.debug("Publishing health check", heartbeat_stamp=heartbeat_stamp)
        self.publisher.publish(
            topic=self.healthcheck_topic,
            payload={
                "version": updates2mqtt.version,  # pyright: ignore[reportAttributeAccessIssue]
                "node": self.cfg.node.name,
                "heartbeat_raw": time.time(),
                "heartbeat_stamp": heartbeat_stamp,
                "startup_stamp": self.startup_timestamp,
                "last_scan_stamp": self.last_scan_timestamp,
                "scan_count": self.scan_count,
            },
        )
async def repeated_call(func: Callable, interval: int = 60, *args: Any, **kwargs: Any) -> None:
    """Await ``func(*args, **kwargs)`` every ``interval`` seconds until cancelled.

    A failure in ``func`` is logged and does not end the loop; the next call
    is still made only after the interval has elapsed.

    Args:
        func: callable returning an awaitable.
        interval: seconds to sleep after each call.
        *args: positional arguments forwarded to ``func``.
        **kwargs: keyword arguments forwarded to ``func``.

    Raises:
        asyncio.CancelledError: re-raised after logging when the surrounding
            task is cancelled, so cancellation actually ends the loop.
    """
    try:
        while True:
            try:
                await func(*args, **kwargs)
            except Exception:
                log.exception("Periodic task failed")
            # Sleep after every iteration, including failed ones, so a
            # persistently failing func cannot spin without yielding.
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        log.debug("Periodic task cancelled", func=func)
        raise
def run() -> None:
    """Create the App, register its shutdown as the SIGTERM handler and run the main loop."""
    import signal

    log.debug(f"Starting updates2mqtt v{updates2mqtt.version}")  # pyright: ignore[reportAttributeAccessIssue]
    application = App()

    # App.shutdown accepts and ignores the positional arguments a signal handler receives.
    signal.signal(signal.SIGTERM, application.shutdown)
    try:
        asyncio.run(application.main_loop(), debug=False)
    except asyncio.CancelledError:
        log.debug("App exited on cancelled task")
    else:
        log.debug("App exited gracefully")
# Script entry point: start the application only when executed directly.
if __name__ == "__main__":
    run()