Coverage for src/updates2mqtt/app.py: 80%

155 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-14 15:07 +0000

1import asyncio 

2import logging 

3import sys 

4import time 

5import uuid 

6from collections.abc import Callable 

7from datetime import UTC, datetime 

8from pathlib import Path 

9from threading import Event 

10from typing import Any 

11 

12import structlog 

13import structlog.dev 

14 

15import updates2mqtt 

16from updates2mqtt.model import Discovery, ReleaseProvider 

17 

18from .config import Config, PublishPolicy, UpdatePolicy, load_app_config 

19from .integrations.docker import DockerProvider 

20from .mqtt import MqttPublisher 

21 

# Module-wide structlog logger; processors are configured later in App.__init__.
log = structlog.get_logger()

# Relative path of the YAML configuration file that App.__init__ loads.
CONF_FILE = Path("conf/config.yaml")
# Minimum number of seconds between automatic update attempts (4 hours).
UPDATE_INTERVAL = 4 * 60 * 60

26 

27# #TODO: 

28# - Set install in progress 

29# - Support apt 

30# - Retry on registry fetch fail 

31# - Fetcher in subproc or thread 

32# - Clear command message after install 

33# - use git hash as alt to img ref for builds, or daily builds 

34 

35 

class App:
    """Application object: scans release providers and publishes the results over MQTT.

    Construction loads the configuration, configures structlog and creates the
    MQTT publisher plus the enabled scanners. ``main_loop`` then runs scans
    repeatedly, and ``shutdown`` stops everything and exits the process.
    """

    def __init__(self) -> None:
        """Load configuration, set up logging and build publisher and scanners.

        Exits the process with status 1 when ``load_app_config`` returns ``None``.
        """
        self.startup_timestamp: str = datetime.now(UTC).isoformat()
        self.last_scan_timestamp: str | None = None
        app_config: Config | None = load_app_config(CONF_FILE)
        if app_config is None:
            log.error(f"Invalid configuration at {CONF_FILE}")
            log.error("Edit config to fix missing or invalid values and restart")
            log.error("Alternately supply correct MQTT_HOST,MQTT_USER,MQTT_PASSWORD environment variables")
            log.error("Exiting app")
            sys.exit(1)
        self.cfg: Config = app_config
        # Set by a scanner to request a restart; shutdown() then forces exit code 1.
        self.self_bounce: Event = Event()

        if not self.cfg.log.json or sys.stderr.isatty():
            # Pretty printing when we run in a terminal session.
            # Automatically prints pretty tracebacks when "rich" is installed
            renderers: list[Any] = [
                structlog.dev.ConsoleRenderer(
                    colors=sys.stderr.isatty(),
                    exception_formatter=(
                        structlog.dev.RichTracebackFormatter() if sys.stderr.isatty() else structlog.dev.plain_traceback
                    ),
                ),
            ]
        else:
            renderers = [
                structlog.processors.dict_tracebacks,
                structlog.processors.JSONRenderer(),
            ]
        structlog.configure(
            cache_logger_on_first_use=True,
            # The configured level name is looked up as an attribute of the logging module.
            wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, str(self.cfg.log.level))),
            processors=[
                structlog.contextvars.merge_contextvars,
                structlog.processors.add_log_level,
                structlog.processors.StackInfoRenderer(),
                structlog.dev.set_exc_info,
                structlog.processors.TimeStamper(fmt="iso"),
                *renderers,
            ],
        )

        log.debug("Logging initialized", level=self.cfg.log.level, json=self.cfg.log.json, tty=sys.stderr.isatty())

        self.publisher = MqttPublisher(self.cfg.mqtt, self.cfg.node, self.cfg.homeassistant)

        self.scanners: list[ReleaseProvider] = []
        self.scan_count: int = 0
        self.last_scan: str | None = None
        # Always defined so the attribute exists even when the healthcheck is
        # disabled; main_loop() replaces it with the running heartbeat task.
        self.heartbeat_loop_task: asyncio.Task[None] | None = None
        if self.cfg.docker.enabled:
            self.scanners.append(
                DockerProvider(
                    self.cfg.docker,
                    self.cfg.node,
                    packages=self.cfg.packages,
                    github_cfg=self.cfg.github,
                    self_bounce=self.self_bounce,
                )
            )
        self.stopped = Event()
        self.heartbeat_topic = self.cfg.node.healthcheck.topic_template.format(node_name=self.cfg.node.name)

        log.info(
            "App configured",
            node=self.cfg.node.name,
            scan_interval=self.cfg.scan_interval,
            heartbeat_topic=self.heartbeat_topic,
        )

    async def scan(self) -> None:
        """Run one scan pass over every scanner and handle each discovery.

        All scanners in a pass share one random session id. The pass stops
        early once ``self.stopped`` is set.
        """
        session = uuid.uuid4().hex
        for scanner in self.scanners:
            slog = log.bind(source_type=scanner.source_type, session=session)
            if self.stopped.is_set():
                break
            slog.info("Scanning ...")
            async for discovery in scanner.scan(session):
                await self.on_discovery(discovery)
                if self.stopped.is_set():
                    slog.debug("Breaking scan loop on stopped event")
                    break
            # NOTE(review): clean_topics also runs after an interrupted scan;
            # confirm it cannot drop topics for discoveries that were not reached.
            await self.publisher.clean_topics(scanner)
            self.scan_count += 1
            slog.info(f"Scan #{self.scan_count} complete")
        self.last_scan_timestamp = datetime.now(UTC).isoformat()

    async def main_loop(self) -> None:
        """Start the publisher, heartbeat and scanners, then scan until stopped.

        The loop ends when ``self.stopped`` is set or the publisher reports it
        is unavailable; in the latter case ``shutdown(exit_code=1)`` is called.
        """
        log.debug("Starting run loop")
        self.publisher.start()

        if self.cfg.node.healthcheck.enabled:
            await self.heartbeat()  # initial eager heartbeat
            log.info(f"Setting up heartbeat every {self.cfg.node.healthcheck.interval} seconds to topic {self.heartbeat_topic}")
            self.heartbeat_loop_task = asyncio.create_task(
                repeated_call(self.heartbeat, interval=self.cfg.node.healthcheck.interval), name="heartbeat"
            )

        for scanner in self.scanners:
            scanner.initialize()
            self.publisher.subscribe_hass_command(scanner)
            await self.publisher.clean_topics(scanner, initial=True)

        while not self.stopped.is_set() and self.publisher.is_available():
            await self.scan()
            if not self.stopped.is_set() and self.publisher.is_available():
                await asyncio.sleep(self.cfg.scan_interval)
            else:
                log.info("Stop requested, exiting run loop and skipping sleep")

        if not self.publisher.is_available():
            log.error("MQTT fatal connection error - check host,port,user,password in config")
            self.shutdown(exit_code=1)

        log.debug("Exiting run loop")

    async def on_discovery(self, discovery: Discovery) -> None:
        """Publish one discovery and, if its policy allows, trigger an auto update.

        Raises:
            asyncio.CancelledError: re-raised after logging, so cancellation
                reaches the caller instead of being silently absorbed.
            Exception: any other failure is logged and re-raised.
        """
        dlog = log.bind(name=discovery.name)
        try:
            if discovery.publish_policy == PublishPolicy.HOMEASSISTANT and self.cfg.homeassistant.discovery.enabled:
                # Switch off MQTT discovery if not Home Assistant enabled
                self.publisher.publish_hass_config(discovery)
            if discovery.publish_policy == PublishPolicy.HOMEASSISTANT:
                self.publisher.publish_hass_state(discovery)
            if discovery.publish_policy in (PublishPolicy.HOMEASSISTANT, PublishPolicy.MQTT):
                self.publisher.publish_discovery(discovery)
            if (
                discovery.update_policy == UpdatePolicy.AUTO
                and discovery.can_update
                and discovery.latest_version != discovery.current_version
            ):
                # TODO: review auto update, trigger by version, use update interval as throttle
                # -1 is the sentinel for "no previous attempt recorded".
                elapsed: float = (
                    time.time() - discovery.update_last_attempt if discovery.update_last_attempt is not None else -1
                )
                if elapsed == -1 or elapsed > UPDATE_INTERVAL:
                    dlog.info(
                        "Initiate auto update (last:%s, elapsed:%s, max:%s)",
                        discovery.update_last_attempt,
                        elapsed,
                        UPDATE_INTERVAL,
                    )
                    self.publisher.local_message(discovery, "install")
                else:
                    dlog.info("Skipping auto update")
        except asyncio.CancelledError:
            dlog.info("Discovery handling cancelled")
            # Propagate: swallowing the cancellation would let the caller carry on.
            raise
        except Exception:
            dlog.exception("Discovery handling failed")
            raise

    async def interrupt_tasks(self) -> None:
        """Cancel the task named "heartbeat" and wait for all other tasks to finish.

        Every task except the current one is awaited; exceptions they raise are
        collected by ``gather`` rather than propagated.
        """
        running_tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
        log.info(f"Cancelling {len(running_tasks)} tasks")
        for t in running_tasks:
            log.debug("Cancelling task", task=t.get_name())
            # NOTE(review): only the heartbeat task is actually cancelled, although
            # the log lines mention every task - confirm this is intended.
            if t.get_name() == "heartbeat":
                t.cancel()
        await asyncio.gather(*running_tasks, return_exceptions=True)
        log.debug("Cancellation task completed")

    def shutdown(self, *args, exit_code: int = 143) -> None:  # noqa: ANN002, ARG002
        """Stop scanners, tasks and the publisher, then exit the process.

        Extra positional arguments are accepted and ignored, which lets the
        method be registered directly as a signal handler.

        Args:
            exit_code: process exit status; replaced by 1 when ``self_bounce`` is set.

        Raises:
            SystemExit: always, via ``sys.exit``.
        """
        if self.self_bounce.is_set():
            exit_code = 1
            log.info("Self bouncing, overriding exit_code: %s", exit_code)
        else:
            log.info("Shutting down, exit_code: %s", exit_code)
        self.stopped.set()
        for scanner in self.scanners:
            scanner.stop()
        interrupt_task = asyncio.get_event_loop().create_task(
            self.interrupt_tasks(),
            eager_start=True,  # pyright: ignore[reportCallIssue]
            name="interrupt",
        )
        for t in asyncio.all_tasks():
            log.debug("Tasks waiting = %s", t)
        self.publisher.stop()
        log.debug("Interrupt: %s", interrupt_task.done())
        log.info("Shutdown handling complete")
        sys.exit(exit_code)  # SIGTERM Graceful Exit = 143

    async def heartbeat(self) -> None:
        """Publish a health-check payload to the heartbeat topic.

        Does nothing while the publisher reports it is unavailable.
        """
        if not self.publisher.is_available():
            return
        heartbeat_stamp: str = datetime.now(UTC).isoformat()
        log.debug("Publishing health check", heartbeat_stamp=heartbeat_stamp)
        self.publisher.publish(
            topic=self.heartbeat_topic,
            payload={
                "version": updates2mqtt.version,  # pyright: ignore[reportAttributeAccessIssue]
                "node": self.cfg.node.name,
                "heartbeat_raw": time.time(),
                "heartbeat_stamp": heartbeat_stamp,
                "startup_stamp": self.startup_timestamp,
                "last_scan_stamp": self.last_scan_timestamp,
                "scan_count": self.scan_count,
            },
        )

235 

236 

async def repeated_call(func: Callable, interval: int = 60, *args: Any, **kwargs: Any) -> None:
    """Await ``func(*args, **kwargs)`` every ``interval`` seconds until cancelled.

    A failing call is logged and the schedule continues after the normal
    interval, so a persistently failing ``func`` is not retried in a tight loop.

    Args:
        func: coroutine function to call on each cycle.
        interval: seconds to sleep after each call.
        *args: positional arguments forwarded to ``func``.
        **kwargs: keyword arguments forwarded to ``func``.

    Raises:
        asyncio.CancelledError: re-raised after logging, so the task really
            ends when it is cancelled.
    """
    try:
        while True:
            try:
                await func(*args, **kwargs)
            except Exception:
                # One failed call must not end the periodic schedule.
                log.exception("Periodic task failed")
            # Sleep outside the inner try so a failure is followed by the same pause.
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        log.debug("Periodic task cancelled", func=func)
        # Propagate: catching this inside the loop would make the task uncancellable.
        raise

247 

248 

def run() -> None:
    """Create the App, register its shutdown as the SIGTERM handler and run the main loop."""
    import asyncio
    import signal

    log.debug(f"Starting updates2mqtt v{updates2mqtt.version}")  # pyright: ignore[reportAttributeAccessIssue]
    application = App()

    # SIGTERM goes to App.shutdown, which stops the app and exits the process.
    signal.signal(signal.SIGTERM, application.shutdown)
    try:
        asyncio.run(application.main_loop(), debug=False)
    except asyncio.CancelledError:
        log.debug("App exited on cancelled task")
    else:
        log.debug("App exited gracefully")

263 

264 

if __name__ == "__main__":
    # Executed as a script rather than imported: start the application.
    run()