Coverage for src / updates2mqtt / app.py: 72%
155 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-20 02:29 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-20 02:29 +0000
1import asyncio
2import logging
3import sys
4import time
5import uuid
6from collections.abc import Callable
7from datetime import UTC, datetime
8from pathlib import Path
9from threading import Event
10from typing import Any
12import structlog
14import updates2mqtt
15from updates2mqtt.model import Discovery, ReleaseProvider
17from .config import Config, PublishPolicy, UpdatePolicy, load_app_config
18from .integrations.docker import DockerProvider
19from .mqtt import MqttPublisher
21log = structlog.get_logger()
23CONF_FILE = Path("conf/config.yaml")
24PKG_INFO_FILE = Path("./common_packages.yaml")
25UPDATE_INTERVAL = 60 * 60 * 4
27# #TODO:
28# - Set install in progress
29# - Support apt
30# - Retry on registry fetch fail
31# - Fetcher in subproc or thread
32# - Clear command message after install
33# - use git hash as alt to img ref for builds, or daily builds
36class App:
37 def __init__(self) -> None:
38 self.startup_timestamp: str = datetime.now(UTC).isoformat()
39 self.last_scan_timestamp: str | None = None
40 app_config: Config | None = load_app_config(CONF_FILE)
41 if app_config is None: 41 ↛ 42line 41 didn't jump to line 42 because the condition on line 41 was never true
42 log.error(f"Invalid configuration at {CONF_FILE}")
43 log.error("Edit config to fix missing or invalid values and restart")
44 log.error("Alternately supply correct MQTT_HOST,MQTT_USER,MQTT_PASSWORD environment variables")
45 log.error("Exiting app")
46 sys.exit(1)
47 self.cfg: Config = app_config
48 self.self_bounce: Event = Event()
50 structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, str(self.cfg.log.level))))
51 log.debug("Logging initialized", level=self.cfg.log.level)
53 self.publisher = MqttPublisher(self.cfg.mqtt, self.cfg.node, self.cfg.homeassistant)
55 self.scanners: list[ReleaseProvider] = []
56 self.scan_count: int = 0
57 self.last_scan: str | None = None
58 if self.cfg.docker.enabled: 58 ↛ 60line 58 didn't jump to line 60 because the condition on line 58 was always true
59 self.scanners.append(DockerProvider(self.cfg.docker, self.cfg.node, self.self_bounce))
60 self.stopped = Event()
61 self.healthcheck_topic = self.cfg.node.healthcheck.topic_template.format(node_name=self.cfg.node.name)
63 log.info(
64 "App configured",
65 node=self.cfg.node.name,
66 scan_interval=self.cfg.scan_interval,
67 healthcheck_topic=self.healthcheck_topic,
68 )
70 async def scan(self) -> None:
71 session = uuid.uuid4().hex
72 for scanner in self.scanners:
73 slog = log.bind(source_type=scanner.source_type, session=session)
74 slog.info("Cleaning topics before scan")
75 if self.scan_count == 0: 75 ↛ 77line 75 didn't jump to line 77 because the condition on line 75 was always true
76 await self.publisher.clean_topics(scanner, None, force=True)
77 if self.stopped.is_set(): 77 ↛ 78line 77 didn't jump to line 78 because the condition on line 77 was never true
78 break
79 slog.info("Scanning ...")
80 async with asyncio.TaskGroup() as tg:
81 # xtype: ignore[attr-defined]
82 async for discovery in scanner.scan(session):
83 tg.create_task(self.on_discovery(discovery), name=f"discovery-{discovery.name}")
84 if self.stopped.is_set(): 84 ↛ 85line 84 didn't jump to line 85 because the condition on line 84 was never true
85 slog.debug("Breaking scan loop on stopped event")
86 break
87 await self.publisher.clean_topics(scanner, session, force=False)
88 self.scan_count += 1
89 slog.info(f"Scan #{self.scan_count} complete")
90 self.last_scan_timestamp = datetime.now(UTC).isoformat()
92 async def main_loop(self) -> None:
93 log.debug("Starting run loop")
94 self.publisher.start()
96 if self.cfg.node.healthcheck.enabled: 96 ↛ 105line 96 didn't jump to line 105 because the condition on line 96 was always true
97 await self.healthcheck() # initial eager healthcheck
98 log.info(
99 f"Setting up healthcheck every {self.cfg.node.healthcheck.interval} seconds to topic {self.healthcheck_topic}"
100 )
101 self.healthcheck_loop_task = asyncio.create_task(
102 repeated_call(self.healthcheck, interval=self.cfg.node.healthcheck.interval), name="healthcheck"
103 )
105 for scanner in self.scanners:
106 scanner.initialize()
107 self.publisher.subscribe_hass_command(scanner)
109 while not self.stopped.is_set() and self.publisher.is_available(): 109 ↛ 110line 109 didn't jump to line 110 because the condition on line 109 was never true
110 await self.scan()
111 if not self.stopped.is_set() and self.publisher.is_available():
112 await asyncio.sleep(self.cfg.scan_interval)
113 else:
114 log.info("Stop requested, exiting run loop and skipping sleep")
116 if not self.publisher.is_available(): 116 ↛ 120line 116 didn't jump to line 120 because the condition on line 116 was always true
117 log.error("MQTT fatal connection error - check host,port,user,password in config")
118 self.shutdown(exit_code=1)
120 log.debug("Exiting run loop")
122 async def on_discovery(self, discovery: Discovery) -> None:
123 dlog = log.bind(name=discovery.name)
124 try:
125 if discovery.publish_policy == PublishPolicy.HOMEASSISTANT and self.cfg.homeassistant.discovery.enabled: 125 ↛ 128line 125 didn't jump to line 128 because the condition on line 125 was always true
126 # Switch off MQTT discovery if not Home Assistant enabled
127 self.publisher.publish_hass_config(discovery)
128 if discovery.publish_policy in (PublishPolicy.HOMEASSISTANT): 128 ↛ 130line 128 didn't jump to line 130 because the condition on line 128 was always true
129 self.publisher.publish_hass_state(discovery)
130 if discovery.publish_policy in (PublishPolicy.HOMEASSISTANT, PublishPolicy.MQTT): 130 ↛ 132line 130 didn't jump to line 132 because the condition on line 130 was always true
131 self.publisher.publish_discovery(discovery)
132 if ( 132 ↛ 138line 132 didn't jump to line 138 because the condition on line 132 was never true
133 discovery.update_policy == UpdatePolicy.AUTO
134 and discovery.can_update
135 and discovery.latest_version != discovery.current_version
136 ):
137 # TODO: review auto update, trigger by version, use update interval as throttle
138 elapsed: float = (
139 time.time() - discovery.update_last_attempt if discovery.update_last_attempt is not None else -1
140 )
141 if elapsed == -1 or elapsed > UPDATE_INTERVAL:
142 dlog.info(
143 "Initiate auto update (last:%s, elapsed:%s, max:%s)",
144 discovery.update_last_attempt,
145 elapsed,
146 UPDATE_INTERVAL,
147 )
148 self.publisher.local_message(discovery, "install")
149 else:
150 dlog.info("Skipping auto update")
151 except asyncio.CancelledError:
152 dlog.info("Discovery handling cancelled")
153 except Exception:
154 dlog.exception("Discovery handling failed")
155 raise
157 async def interrupt_tasks(self) -> None:
158 running_tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
159 log.info(f"Cancelling {len(running_tasks)} tasks")
160 for t in running_tasks:
161 log.debug("Cancelling task", task=t.get_name())
162 if t.get_name() == "healthcheck" or t.get_name().startswith("discovery-"):
163 t.cancel()
164 await asyncio.gather(*running_tasks, return_exceptions=True)
165 log.debug("Cancellation task completed")
167 def shutdown(self, *args, exit_code: int = 143) -> None: # noqa: ANN002, ARG002
168 if self.self_bounce.is_set(): 168 ↛ 169line 168 didn't jump to line 169 because the condition on line 168 was never true
169 exit_code = 1
170 log.info("Self bouncing, overriding exit_code: %s", exit_code)
171 else:
172 log.info("Shutting down, exit_code: %s", exit_code)
173 self.stopped.set()
174 for scanner in self.scanners:
175 scanner.stop()
176 interrupt_task = asyncio.get_event_loop().create_task(
177 self.interrupt_tasks(),
178 eager_start=True, # type: ignore[call-arg] # pyright: ignore[reportCallIssue]
179 name="interrupt",
180 )
181 for t in asyncio.all_tasks():
182 log.debug("Tasks waiting = %s", t)
183 self.publisher.stop()
184 log.debug("Interrupt: %s", interrupt_task.done())
185 log.info("Shutdown handling complete")
186 sys.exit(exit_code) # SIGTERM Graceful Exit = 143
188 async def healthcheck(self) -> None:
189 if not self.publisher.is_available(): 189 ↛ 191line 189 didn't jump to line 191 because the condition on line 189 was always true
190 return
191 heartbeat_stamp: str = datetime.now(UTC).isoformat()
192 log.debug("Publishing health check", heartbeat_stamp=heartbeat_stamp)
193 self.publisher.publish(
194 topic=self.healthcheck_topic,
195 payload={
196 "version": updates2mqtt.version, # pyright: ignore[reportAttributeAccessIssue]
197 "node": self.cfg.node.name,
198 "heartbeat_raw": time.time(),
199 "heartbeat_stamp": heartbeat_stamp,
200 "startup_stamp": self.startup_timestamp,
201 "last_scan_stamp": self.last_scan_timestamp,
202 "scan_count": self.scan_count,
203 },
204 )
207async def repeated_call(func: Callable, interval: int = 60, *args: Any, **kwargs: Any) -> None:
208 # run a task periodically indefinitely
209 while True:
210 try:
211 await func(*args, **kwargs)
212 await asyncio.sleep(interval)
213 except asyncio.CancelledError:
214 log.debug("Periodic task cancelled", func=func)
215 except Exception:
216 log.exception("Periodic task failed")
219def run() -> None:
220 import asyncio
221 import signal
223 # pyright: ignore[reportAttributeAccessIssue]
224 log.debug(f"Starting updates2mqtt v{updates2mqtt.version}") # pyright: ignore[reportAttributeAccessIssue]
225 app = App()
227 signal.signal(signal.SIGTERM, app.shutdown)
228 try:
229 asyncio.run(app.main_loop(), debug=False)
230 log.debug("App exited gracefully")
231 except asyncio.CancelledError:
232 log.debug("App exited on cancelled task")
235if __name__ == "__main__": 235 ↛ 236line 235 didn't jump to line 236 because the condition on line 235 was never true
236 run()