Coverage for src / updates2mqtt / app.py: 72%
152 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 23:58 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 23:58 +0000
1import asyncio
2import logging
3import sys
4import time
5import uuid
6from collections.abc import Callable
7from datetime import UTC, datetime
8from pathlib import Path
9from threading import Event
10from typing import Any
12import structlog
14import updates2mqtt
15from updates2mqtt.model import Discovery, ReleaseProvider
17from .config import Config, PublishPolicy, UpdatePolicy, load_app_config
18from .integrations.docker import DockerProvider
19from .mqtt import MqttPublisher
21log = structlog.get_logger()
23CONF_FILE = Path("conf/config.yaml")
24UPDATE_INTERVAL = 60 * 60 * 4
26# #TODO:
27# - Set install in progress
28# - Support apt
29# - Retry on registry fetch fail
30# - Fetcher in subproc or thread
31# - Clear command message after install
32# - use git hash as alt to img ref for builds, or daily builds
35class App:
36 def __init__(self) -> None:
37 self.startup_timestamp: str = datetime.now(UTC).isoformat()
38 self.last_scan_timestamp: str | None = None
39 app_config: Config | None = load_app_config(CONF_FILE)
40 if app_config is None: 40 ↛ 41line 40 didn't jump to line 41 because the condition on line 40 was never true
41 log.error(f"Invalid configuration at {CONF_FILE}")
42 log.error("Edit config to fix missing or invalid values and restart")
43 log.error("Alternately supply correct MQTT_HOST,MQTT_USER,MQTT_PASSWORD environment variables")
44 log.error("Exiting app")
45 sys.exit(1)
46 self.cfg: Config = app_config
47 self.self_bounce: Event = Event()
49 structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, str(self.cfg.log.level))))
50 log.debug("Logging initialized", level=self.cfg.log.level)
52 self.publisher = MqttPublisher(self.cfg.mqtt, self.cfg.node, self.cfg.homeassistant)
54 self.scanners: list[ReleaseProvider] = []
55 self.scan_count: int = 0
56 self.last_scan: str | None = None
57 if self.cfg.docker.enabled: 57 ↛ 67line 57 didn't jump to line 67 because the condition on line 57 was always true
58 self.scanners.append(
59 DockerProvider(
60 self.cfg.docker,
61 self.cfg.node,
62 packages=self.cfg.packages,
63 github_cfg=self.cfg.github,
64 self_bounce=self.self_bounce,
65 )
66 )
67 self.stopped = Event()
68 self.healthcheck_topic = self.cfg.node.healthcheck.topic_template.format(node_name=self.cfg.node.name)
70 log.info(
71 "App configured",
72 node=self.cfg.node.name,
73 scan_interval=self.cfg.scan_interval,
74 healthcheck_topic=self.healthcheck_topic,
75 )
77 async def scan(self) -> None:
78 session = uuid.uuid4().hex
79 for scanner in self.scanners:
80 slog = log.bind(source_type=scanner.source_type, session=session)
81 if self.stopped.is_set(): 81 ↛ 82line 81 didn't jump to line 82 because the condition on line 81 was never true
82 break
83 slog.info("Scanning ...")
84 async with asyncio.TaskGroup() as tg:
85 # xtype: ignore[attr-defined]
86 async for discovery in scanner.scan(session):
87 tg.create_task(self.on_discovery(discovery), name=f"discovery-{discovery.name}")
88 if self.stopped.is_set(): 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true
89 slog.debug("Breaking scan loop on stopped event")
90 break
91 await self.publisher.clean_topics(scanner)
92 self.scan_count += 1
93 slog.info(f"Scan #{self.scan_count} complete")
94 self.last_scan_timestamp = datetime.now(UTC).isoformat()
96 async def main_loop(self) -> None:
97 log.debug("Starting run loop")
98 self.publisher.start()
100 if self.cfg.node.healthcheck.enabled: 100 ↛ 109line 100 didn't jump to line 109 because the condition on line 100 was always true
101 await self.healthcheck() # initial eager healthcheck
102 log.info(
103 f"Setting up healthcheck every {self.cfg.node.healthcheck.interval} seconds to topic {self.healthcheck_topic}"
104 )
105 self.healthcheck_loop_task = asyncio.create_task(
106 repeated_call(self.healthcheck, interval=self.cfg.node.healthcheck.interval), name="healthcheck"
107 )
109 for scanner in self.scanners:
110 scanner.initialize()
111 self.publisher.subscribe_hass_command(scanner)
112 await self.publisher.clean_topics(scanner, initial=True)
114 while not self.stopped.is_set() and self.publisher.is_available(): 114 ↛ 115line 114 didn't jump to line 115 because the condition on line 114 was never true
115 await self.scan()
116 if not self.stopped.is_set() and self.publisher.is_available():
117 await asyncio.sleep(self.cfg.scan_interval)
118 else:
119 log.info("Stop requested, exiting run loop and skipping sleep")
121 if not self.publisher.is_available(): 121 ↛ 125line 121 didn't jump to line 125 because the condition on line 121 was always true
122 log.error("MQTT fatal connection error - check host,port,user,password in config")
123 self.shutdown(exit_code=1)
125 log.debug("Exiting run loop")
127 async def on_discovery(self, discovery: Discovery) -> None:
128 dlog = log.bind(name=discovery.name)
129 try:
130 if discovery.publish_policy == PublishPolicy.HOMEASSISTANT and self.cfg.homeassistant.discovery.enabled: 130 ↛ 133line 130 didn't jump to line 133 because the condition on line 130 was always true
131 # Switch off MQTT discovery if not Home Assistant enabled
132 self.publisher.publish_hass_config(discovery)
133 if discovery.publish_policy in (PublishPolicy.HOMEASSISTANT): 133 ↛ 135line 133 didn't jump to line 135 because the condition on line 133 was always true
134 self.publisher.publish_hass_state(discovery)
135 if discovery.publish_policy in (PublishPolicy.HOMEASSISTANT, PublishPolicy.MQTT): 135 ↛ 137line 135 didn't jump to line 137 because the condition on line 135 was always true
136 self.publisher.publish_discovery(discovery)
137 if ( 137 ↛ 143line 137 didn't jump to line 143 because the condition on line 137 was never true
138 discovery.update_policy == UpdatePolicy.AUTO
139 and discovery.can_update
140 and discovery.latest_version != discovery.current_version
141 ):
142 # TODO: review auto update, trigger by version, use update interval as throttle
143 elapsed: float = (
144 time.time() - discovery.update_last_attempt if discovery.update_last_attempt is not None else -1
145 )
146 if elapsed == -1 or elapsed > UPDATE_INTERVAL:
147 dlog.info(
148 "Initiate auto update (last:%s, elapsed:%s, max:%s)",
149 discovery.update_last_attempt,
150 elapsed,
151 UPDATE_INTERVAL,
152 )
153 self.publisher.local_message(discovery, "install")
154 else:
155 dlog.info("Skipping auto update")
156 except asyncio.CancelledError:
157 dlog.info("Discovery handling cancelled")
158 except Exception:
159 dlog.exception("Discovery handling failed")
160 raise
162 async def interrupt_tasks(self) -> None:
163 running_tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
164 log.info(f"Cancelling {len(running_tasks)} tasks")
165 for t in running_tasks:
166 log.debug("Cancelling task", task=t.get_name())
167 if t.get_name() == "healthcheck" or t.get_name().startswith("discovery-"):
168 t.cancel()
169 await asyncio.gather(*running_tasks, return_exceptions=True)
170 log.debug("Cancellation task completed")
172 def shutdown(self, *args, exit_code: int = 143) -> None: # noqa: ANN002, ARG002
173 if self.self_bounce.is_set(): 173 ↛ 174line 173 didn't jump to line 174 because the condition on line 173 was never true
174 exit_code = 1
175 log.info("Self bouncing, overriding exit_code: %s", exit_code)
176 else:
177 log.info("Shutting down, exit_code: %s", exit_code)
178 self.stopped.set()
179 for scanner in self.scanners:
180 scanner.stop()
181 interrupt_task = asyncio.get_event_loop().create_task(
182 self.interrupt_tasks(),
183 eager_start=True, # type: ignore[call-arg] # pyright: ignore[reportCallIssue]
184 name="interrupt",
185 )
186 for t in asyncio.all_tasks():
187 log.debug("Tasks waiting = %s", t)
188 self.publisher.stop()
189 log.debug("Interrupt: %s", interrupt_task.done())
190 log.info("Shutdown handling complete")
191 sys.exit(exit_code) # SIGTERM Graceful Exit = 143
193 async def healthcheck(self) -> None:
194 if not self.publisher.is_available(): 194 ↛ 196line 194 didn't jump to line 196 because the condition on line 194 was always true
195 return
196 heartbeat_stamp: str = datetime.now(UTC).isoformat()
197 log.debug("Publishing health check", heartbeat_stamp=heartbeat_stamp)
198 self.publisher.publish(
199 topic=self.healthcheck_topic,
200 payload={
201 "version": updates2mqtt.version, # pyright: ignore[reportAttributeAccessIssue]
202 "node": self.cfg.node.name,
203 "heartbeat_raw": time.time(),
204 "heartbeat_stamp": heartbeat_stamp,
205 "startup_stamp": self.startup_timestamp,
206 "last_scan_stamp": self.last_scan_timestamp,
207 "scan_count": self.scan_count,
208 },
209 )
212async def repeated_call(func: Callable, interval: int = 60, *args: Any, **kwargs: Any) -> None:
213 # run a task periodically indefinitely
214 while True:
215 try:
216 await func(*args, **kwargs)
217 await asyncio.sleep(interval)
218 except asyncio.CancelledError:
219 log.debug("Periodic task cancelled", func=func)
220 except Exception:
221 log.exception("Periodic task failed")
224def run() -> None:
225 import asyncio
226 import signal
228 # pyright: ignore[reportAttributeAccessIssue]
229 log.debug(f"Starting updates2mqtt v{updates2mqtt.version}") # pyright: ignore[reportAttributeAccessIssue]
230 app = App()
232 signal.signal(signal.SIGTERM, app.shutdown)
233 try:
234 asyncio.run(app.main_loop(), debug=False)
235 log.debug("App exited gracefully")
236 except asyncio.CancelledError:
237 log.debug("App exited on cancelled task")
240if __name__ == "__main__": 240 ↛ 241line 240 didn't jump to line 241 because the condition on line 240 was never true
241 run()