Coverage for src / updates2mqtt / app.py: 80%

156 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-06-02 10:03 +0000

1import asyncio 

2import logging 

3import sys 

4import time 

5import uuid 

6from collections.abc import Callable 

7from datetime import UTC, datetime 

8from pathlib import Path 

9from threading import Event 

10from typing import Any 

11 

12import structlog 

13import structlog.dev 

14 

15import updates2mqtt 

16from updates2mqtt.model import Discovery, ReleaseProvider 

17 

18from .config import Config, PublishPolicy, UpdatePolicy, load_app_config 

19from .integrations.docker import DockerProvider 

20from .mqtt import MqttPublisher 

21 

# Module-wide structured logger shared by everything in this file.
log = structlog.get_logger()

# Path of the YAML configuration file handed to load_app_config().
CONF_FILE = Path("conf") / "config.yaml"
# Minimum number of seconds between automatic update attempts (4 hours).
UPDATE_INTERVAL = 4 * 60 * 60

26 

# TODO:
# - Set install in progress
# - Support apt
# - Retry on registry fetch failure
# - Run the fetcher in a subprocess or thread
# - Clear command message after install
# - Use git hash as an alternative to image ref for builds, or daily builds

34 

35 

class App:
    """Application orchestrator.

    Loads the configuration, configures structlog, creates the MQTT publisher
    and the enabled release scanners, then runs the periodic scan loop and the
    optional healthcheck heartbeat.
    """

    def __init__(self) -> None:
        """Load configuration, set up logging, and build publisher and scanners.

        Raises:
            SystemExit: with code 1 when no valid configuration could be loaded.
        """
        self.startup_timestamp: str = datetime.now(UTC).isoformat()
        self.last_scan_timestamp: str | None = None
        app_config: Config | None = load_app_config(CONF_FILE)
        if app_config is None:
            log.error(f"Invalid configuration at {CONF_FILE}")
            log.error("Edit config to fix missing or invalid values and restart")
            log.error("Alternately supply correct MQTT_HOST,MQTT_USER,MQTT_PASSWORD environment variables")
            log.error("Exiting app")
            sys.exit(1)
        self.cfg: Config = app_config
        # Handed to the Docker provider; when set, shutdown() overrides the exit code to 1.
        self.self_bounce: Event = Event()

        if not self.cfg.log.json or sys.stderr.isatty():
            # Pretty printing when we run in a terminal session.
            # Automatically prints pretty tracebacks when "rich" is installed
            renderers: list[Any] = [
                structlog.dev.ConsoleRenderer(
                    colors=sys.stderr.isatty(),
                    exception_formatter=(
                        structlog.dev.RichTracebackFormatter() if sys.stderr.isatty() else structlog.dev.plain_traceback
                    ),
                ),
            ]
        else:
            # JSON output is used only when requested AND stderr is not a terminal.
            renderers = [
                structlog.processors.dict_tracebacks,
                structlog.processors.JSONRenderer(),
            ]
        structlog.configure(
            cache_logger_on_first_use=True,
            wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, str(self.cfg.log.level))),
            processors=[
                structlog.contextvars.merge_contextvars,
                structlog.processors.add_log_level,
                structlog.processors.StackInfoRenderer(),
                structlog.dev.set_exc_info,
                structlog.processors.TimeStamper(fmt="iso"),
                *renderers,
            ],
        )

        log.debug("Logging initialized", level=self.cfg.log.level, json=self.cfg.log.json, tty=sys.stderr.isatty())

        self.publisher = MqttPublisher(self.cfg.mqtt, self.cfg.node, self.cfg.homeassistant)

        self.scanners: list[ReleaseProvider] = []
        self.scan_count: int = 0
        self.last_scan: str | None = None
        if self.cfg.docker.enabled:
            self.scanners.append(
                DockerProvider(
                    self.cfg.docker,
                    self.cfg.node,
                    packages=self.cfg.packages,
                    github_cfg=self.cfg.github,
                    self_bounce=self.self_bounce,
                )
            )
        self.stopped = Event()
        # Always defined so callers can inspect it even when the healthcheck is disabled.
        self.healthcheck_loop_task: asyncio.Task[None] | None = None
        self.healthcheck_topic = self.cfg.node.healthcheck.topic_template.format(node_name=self.cfg.node.name)

        log.info(
            "App configured",
            node=self.cfg.node.name,
            scan_interval=self.cfg.scan_interval,
            healthcheck_topic=self.healthcheck_topic,
        )

    async def scan(self) -> None:
        """Run one scan pass over every configured scanner.

        Each discovery yielded by a scanner is handled concurrently in its own
        task (named ``discovery-<name>``) inside a TaskGroup. The pass stops
        early once the ``stopped`` event is set.
        """
        session = uuid.uuid4().hex
        for scanner in self.scanners:
            slog = log.bind(source_type=scanner.source_type, session=session)
            if self.stopped.is_set():
                break
            slog.info("Scanning ...")
            async with asyncio.TaskGroup() as tg:
                # xtype: ignore[attr-defined]
                async for discovery in scanner.scan(session):
                    tg.create_task(self.on_discovery(discovery), name=f"discovery-{discovery.name}")
            if self.stopped.is_set():
                slog.debug("Breaking scan loop on stopped event")
                break
            await self.publisher.clean_topics(scanner)
            self.scan_count += 1
            slog.info(f"Scan #{self.scan_count} complete")
        self.last_scan_timestamp = datetime.now(UTC).isoformat()

    async def main_loop(self) -> None:
        """Start the publisher, initialise scanners, and scan until stopped.

        The loop ends when the ``stopped`` event is set or the publisher reports
        itself unavailable; in the latter case ``shutdown(exit_code=1)`` is called.
        """
        log.debug("Starting run loop")
        self.publisher.start()

        if self.cfg.node.healthcheck.enabled:
            await self.healthcheck()  # initial eager healthcheck
            log.info(
                f"Setting up healthcheck every {self.cfg.node.healthcheck.interval} seconds to topic {self.healthcheck_topic}"
            )
            self.healthcheck_loop_task = asyncio.create_task(
                repeated_call(self.healthcheck, interval=self.cfg.node.healthcheck.interval), name="healthcheck"
            )

        for scanner in self.scanners:
            scanner.initialize()
            self.publisher.subscribe_hass_command(scanner)
            await self.publisher.clean_topics(scanner, initial=True)

        while not self.stopped.is_set() and self.publisher.is_available():
            await self.scan()
            if not self.stopped.is_set() and self.publisher.is_available():
                await asyncio.sleep(self.cfg.scan_interval)
            else:
                log.info("Stop requested, exiting run loop and skipping sleep")

        if not self.publisher.is_available():
            log.error("MQTT fatal connection error - check host,port,user,password in config")
            self.shutdown(exit_code=1)

        log.debug("Exiting run loop")

    async def on_discovery(self, discovery: Discovery) -> None:
        """Publish one discovery and, if policy allows, trigger an auto update.

        Args:
            discovery: The item reported by a scanner.

        Raises:
            asyncio.CancelledError: re-raised after logging so cancellation
                propagates to the awaiting TaskGroup / canceller.
            Exception: any other failure is logged and re-raised.
        """
        dlog = log.bind(name=discovery.name)
        try:
            if discovery.publish_policy == PublishPolicy.HOMEASSISTANT and self.cfg.homeassistant.discovery.enabled:
                # Switch off MQTT discovery if not Home Assistant enabled
                self.publisher.publish_hass_config(discovery)
            # Equality, not `in (X)`: parentheses without a trailing comma do not make a tuple.
            if discovery.publish_policy == PublishPolicy.HOMEASSISTANT:
                self.publisher.publish_hass_state(discovery)
            if discovery.publish_policy in (PublishPolicy.HOMEASSISTANT, PublishPolicy.MQTT):
                self.publisher.publish_discovery(discovery)
            if (
                discovery.update_policy == UpdatePolicy.AUTO
                and discovery.can_update
                and discovery.latest_version != discovery.current_version
            ):
                # TODO: review auto update, trigger by version, use update interval as throttle
                # -1 is the sentinel for "no previous attempt recorded".
                elapsed: float = (
                    time.time() - discovery.update_last_attempt if discovery.update_last_attempt is not None else -1
                )
                if elapsed == -1 or elapsed > UPDATE_INTERVAL:
                    dlog.info(
                        "Initiate auto update (last:%s, elapsed:%s, max:%s)",
                        discovery.update_last_attempt,
                        elapsed,
                        UPDATE_INTERVAL,
                    )
                    self.publisher.local_message(discovery, "install")
                else:
                    dlog.info("Skipping auto update")
        except asyncio.CancelledError:
            dlog.info("Discovery handling cancelled")
            # Swallowing CancelledError would make the task look successfully completed.
            raise
        except Exception:
            dlog.exception("Discovery handling failed")
            raise

    async def interrupt_tasks(self) -> None:
        """Cancel the healthcheck and discovery tasks and wait for them to finish.

        Only tasks named ``healthcheck`` or prefixed ``discovery-`` are
        cancelled, and only those are awaited: waiting on tasks that were never
        cancelled could block for as long as they keep running.
        """
        current = asyncio.current_task()
        to_cancel = [
            t
            for t in asyncio.all_tasks()
            if t is not current and (t.get_name() == "healthcheck" or t.get_name().startswith("discovery-"))
        ]
        log.info(f"Cancelling {len(to_cancel)} tasks")
        for t in to_cancel:
            log.debug("Cancelling task", task=t.get_name())
            t.cancel()
        # return_exceptions=True so the resulting CancelledErrors are collected, not raised here.
        await asyncio.gather(*to_cancel, return_exceptions=True)
        log.debug("Cancellation task completed")

    def shutdown(self, *args, exit_code: int = 143) -> None:  # noqa: ANN002, ARG002
        """Stop scanners and publisher, cancel background tasks, and exit.

        Args:
            *args: Ignored; accepted so this method can be registered directly
                as a signal handler.
            exit_code: Process exit code; overridden to 1 when ``self_bounce`` is set.

        Raises:
            SystemExit: always, carrying the final exit code.
        """
        if self.self_bounce.is_set():
            exit_code = 1
            log.info("Self bouncing, overriding exit_code: %s", exit_code)
        else:
            log.info("Shutting down, exit_code: %s", exit_code)
        self.stopped.set()
        for scanner in self.scanners:
            scanner.stop()
        interrupt_task = asyncio.get_event_loop().create_task(
            self.interrupt_tasks(),
            eager_start=True,  # pyright: ignore[reportCallIssue]
            name="interrupt",
        )
        for t in asyncio.all_tasks():
            log.debug("Tasks waiting = %s", t)
        self.publisher.stop()
        log.debug("Interrupt: %s", interrupt_task.done())
        log.info("Shutdown handling complete")
        sys.exit(exit_code)  # SIGTERM Graceful Exit = 143

    async def healthcheck(self) -> None:
        """Publish a heartbeat payload to the healthcheck topic.

        Does nothing when the publisher reports itself unavailable.
        """
        if not self.publisher.is_available():
            return
        heartbeat_stamp: str = datetime.now(UTC).isoformat()
        log.debug("Publishing health check", heartbeat_stamp=heartbeat_stamp)
        self.publisher.publish(
            topic=self.healthcheck_topic,
            payload={
                "version": updates2mqtt.version,  # pyright: ignore[reportAttributeAccessIssue]
                "node": self.cfg.node.name,
                "heartbeat_raw": time.time(),
                "heartbeat_stamp": heartbeat_stamp,
                "startup_stamp": self.startup_timestamp,
                "last_scan_stamp": self.last_scan_timestamp,
                "scan_count": self.scan_count,
            },
        )

238 ) 

239 

240 

async def repeated_call(func: Callable, interval: int = 60, *args: Any, **kwargs: Any) -> None:
    """Await ``func(*args, **kwargs)`` every ``interval`` seconds until cancelled.

    Args:
        func: Callable returning an awaitable; invoked once per cycle.
        interval: Seconds to sleep between invocations.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Raises:
        asyncio.CancelledError: re-raised when the surrounding task is
            cancelled, so the loop actually terminates.
    """
    try:
        while True:
            try:
                await func(*args, **kwargs)
            except Exception:
                # A failing call must not end the periodic task; log and try again next cycle.
                log.exception("Periodic task failed")
            # Sleep after failures too, otherwise a persistently failing func would spin.
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        log.debug("Periodic task cancelled", func=func)
        raise

251 

252 

def run() -> None:
    """Entry point: build the App, register SIGTERM handling, run the main loop."""
    import signal

    log.debug(f"Starting updates2mqtt v{updates2mqtt.version}")  # pyright: ignore[reportAttributeAccessIssue]
    app = App()

    # SIGTERM is routed to App.shutdown.
    signal.signal(signal.SIGTERM, app.shutdown)
    try:
        asyncio.run(app.main_loop(), debug=False)
    except asyncio.CancelledError:
        log.debug("App exited on cancelled task")
    else:
        log.debug("App exited gracefully")


if __name__ == "__main__":
    run()