Coverage for src / updates2mqtt / app.py: 72%

155 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-20 02:29 +0000

1import asyncio 

2import logging 

3import sys 

4import time 

5import uuid 

6from collections.abc import Callable 

7from datetime import UTC, datetime 

8from pathlib import Path 

9from threading import Event 

10from typing import Any 

11 

12import structlog 

13 

14import updates2mqtt 

15from updates2mqtt.model import Discovery, ReleaseProvider 

16 

17from .config import Config, PublishPolicy, UpdatePolicy, load_app_config 

18from .integrations.docker import DockerProvider 

19from .mqtt import MqttPublisher 

20 

# Module-wide structlog logger. Its level filtering is configured later,
# via structlog.configure(), once App.__init__ has loaded the config.
log = structlog.get_logger()

22 

# Location of the application configuration file, relative to the working directory.
CONF_FILE = Path("conf/config.yaml")
# Location of the common packages YAML file (not referenced elsewhere in this module).
PKG_INFO_FILE = Path("./common_packages.yaml")
# Minimum number of seconds between automatic update attempts for a
# discovery (4 hours); see App.on_discovery.
UPDATE_INTERVAL = 4 * 60 * 60

# TODO:
# - Set install in progress
# - Support apt
# - Retry on registry fetch fail
# - Fetcher in subproc or thread
# - Clear command message after install
# - use git hash as alt to img ref for builds, or daily builds

34 

35 

class App:
    """Application root: wires configuration, the MQTT publisher and release scanners.

    Owns the asyncio run loop (``main_loop``), the periodic healthcheck task and
    shutdown handling (``shutdown``), which is also installed as a SIGTERM handler
    by ``run()``.
    """

    def __init__(self) -> None:
        """Load config, configure logging, and build the publisher and scanners.

        Exits the process with status 1 if ``load_app_config(CONF_FILE)`` returns None.
        """
        self.startup_timestamp: str = datetime.now(UTC).isoformat()
        self.last_scan_timestamp: str | None = None
        app_config: Config | None = load_app_config(CONF_FILE)
        if app_config is None:
            log.error(f"Invalid configuration at {CONF_FILE}")
            log.error("Edit config to fix missing or invalid values and restart")
            log.error("Alternately supply correct MQTT_HOST,MQTT_USER,MQTT_PASSWORD environment variables")
            log.error("Exiting app")
            sys.exit(1)
        self.cfg: Config = app_config
        # Shared with DockerProvider; when set, shutdown() forces exit code 1
        # (presumably so a supervisor restarts the process - confirm).
        self.self_bounce: Event = Event()

        structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, str(self.cfg.log.level))))
        log.debug("Logging initialized", level=self.cfg.log.level)

        self.publisher = MqttPublisher(self.cfg.mqtt, self.cfg.node, self.cfg.homeassistant)

        self.scanners: list[ReleaseProvider] = []
        self.scan_count: int = 0
        self.last_scan: str | None = None
        # Always defined, even when healthchecks are disabled and no task is created.
        self.healthcheck_loop_task: asyncio.Task | None = None
        if self.cfg.docker.enabled:
            self.scanners.append(DockerProvider(self.cfg.docker, self.cfg.node, self.self_bounce))
        self.stopped = Event()
        self.healthcheck_topic = self.cfg.node.healthcheck.topic_template.format(node_name=self.cfg.node.name)

        log.info(
            "App configured",
            node=self.cfg.node.name,
            scan_interval=self.cfg.scan_interval,
            healthcheck_topic=self.healthcheck_topic,
        )

    async def scan(self) -> None:
        """Run one scan pass over every configured scanner.

        On the very first pass each scanner's topics are force-cleaned before
        scanning. Every discovery yielded by a scanner is handled concurrently in
        a TaskGroup. Afterwards ``clean_topics`` is called with this pass's session
        id (presumably to drop topics not refreshed in this session - confirm in
        MqttPublisher).
        """
        session = uuid.uuid4().hex
        # Decide once per pass: scan_count is incremented per scanner below, so
        # re-checking it inside the loop would force-clean only the first scanner.
        initial_scan: bool = self.scan_count == 0
        for scanner in self.scanners:
            slog = log.bind(source_type=scanner.source_type, session=session)
            if initial_scan:
                slog.info("Cleaning topics before scan")
                await self.publisher.clean_topics(scanner, None, force=True)
            if self.stopped.is_set():
                break
            slog.info("Scanning ...")
            async with asyncio.TaskGroup() as tg:
                async for discovery in scanner.scan(session):
                    tg.create_task(self.on_discovery(discovery), name=f"discovery-{discovery.name}")
                    if self.stopped.is_set():
                        slog.debug("Breaking scan loop on stopped event")
                        break
            await self.publisher.clean_topics(scanner, session, force=False)
            self.scan_count += 1
            slog.info(f"Scan #{self.scan_count} complete")
            self.last_scan_timestamp = datetime.now(UTC).isoformat()

    async def main_loop(self) -> None:
        """Start publishing, optional healthchecks, and scan until stopped or MQTT is lost.

        If the publisher becomes unavailable the app is shut down with exit code 1.
        """
        log.debug("Starting run loop")
        self.publisher.start()

        if self.cfg.node.healthcheck.enabled:
            await self.healthcheck()  # initial eager healthcheck
            log.info(
                f"Setting up healthcheck every {self.cfg.node.healthcheck.interval} seconds to topic {self.healthcheck_topic}"
            )
            self.healthcheck_loop_task = asyncio.create_task(
                repeated_call(self.healthcheck, interval=self.cfg.node.healthcheck.interval), name="healthcheck"
            )

        for scanner in self.scanners:
            scanner.initialize()
            self.publisher.subscribe_hass_command(scanner)

        while not self.stopped.is_set() and self.publisher.is_available():
            await self.scan()
            # Re-check before sleeping so a stop during the scan exits promptly.
            if not self.stopped.is_set() and self.publisher.is_available():
                await asyncio.sleep(self.cfg.scan_interval)
            else:
                log.info("Stop requested, exiting run loop and skipping sleep")

        if not self.publisher.is_available():
            log.error("MQTT fatal connection error - check host,port,user,password in config")
            self.shutdown(exit_code=1)

        log.debug("Exiting run loop")

    async def on_discovery(self, discovery: Discovery) -> None:
        """Publish one discovery according to its publish policy; maybe trigger auto-update.

        Raises:
            asyncio.CancelledError: re-raised after logging so the task ends cancelled.
            Exception: any other failure is logged and re-raised.
        """
        dlog = log.bind(name=discovery.name)
        try:
            if discovery.publish_policy == PublishPolicy.HOMEASSISTANT and self.cfg.homeassistant.discovery.enabled:
                # Switch off MQTT discovery if not Home Assistant enabled
                self.publisher.publish_hass_config(discovery)
            # Equality, not ``in (X)``: ``(X)`` is not a tuple, so ``in`` tested
            # substring/iteration of the policy value rather than membership.
            if discovery.publish_policy == PublishPolicy.HOMEASSISTANT:
                self.publisher.publish_hass_state(discovery)
            if discovery.publish_policy in (PublishPolicy.HOMEASSISTANT, PublishPolicy.MQTT):
                self.publisher.publish_discovery(discovery)
            if (
                discovery.update_policy == UpdatePolicy.AUTO
                and discovery.can_update
                and discovery.latest_version != discovery.current_version
            ):
                # TODO: review auto update, trigger by version, use update interval as throttle
                # -1 means "never attempted", which always allows an attempt.
                elapsed: float = (
                    time.time() - discovery.update_last_attempt if discovery.update_last_attempt is not None else -1
                )
                if elapsed == -1 or elapsed > UPDATE_INTERVAL:
                    dlog.info(
                        "Initiate auto update (last:%s, elapsed:%s, max:%s)",
                        discovery.update_last_attempt,
                        elapsed,
                        UPDATE_INTERVAL,
                    )
                    self.publisher.local_message(discovery, "install")
                else:
                    dlog.info("Skipping auto update")
        except asyncio.CancelledError:
            dlog.info("Discovery handling cancelled")
            # Propagate so the task is marked cancelled; TaskGroup ignores cancelled children.
            raise
        except Exception:
            dlog.exception("Discovery handling failed")
            raise

    async def interrupt_tasks(self) -> None:
        """Cancel the healthcheck and per-discovery tasks and wait for them to finish."""
        this_task = asyncio.current_task()
        to_cancel = [
            t
            for t in asyncio.all_tasks()
            if t is not this_task and (t.get_name() == "healthcheck" or t.get_name().startswith("discovery-"))
        ]
        log.info(f"Cancelling {len(to_cancel)} tasks")
        for t in to_cancel:
            log.debug("Cancelling task", task=t.get_name())
            t.cancel()
        # Only await what was cancelled: awaiting other tasks (e.g. the main loop)
        # could keep this coroutine pending indefinitely.
        await asyncio.gather(*to_cancel, return_exceptions=True)
        log.debug("Cancellation task completed")

    def shutdown(self, *args, exit_code: int = 143) -> None:  # noqa: ANN002, ARG002
        """Stop scanners and publisher, cancel background tasks, then ``sys.exit``.

        Extra positional args are accepted so this can be a ``signal.signal`` handler.
        Exit code defaults to 143 (SIGTERM graceful exit) and is forced to 1 when
        ``self_bounce`` is set.
        """
        if self.self_bounce.is_set():
            exit_code = 1
            log.info("Self bouncing, overriding exit_code: %s", exit_code)
        else:
            log.info("Shutting down, exit_code: %s", exit_code)
        self.stopped.set()
        for scanner in self.scanners:
            scanner.stop()
        interrupt_task = asyncio.get_event_loop().create_task(
            self.interrupt_tasks(),
            eager_start=True,  # type: ignore[call-arg] # pyright: ignore[reportCallIssue]
            name="interrupt",
        )
        for t in asyncio.all_tasks():
            log.debug("Tasks waiting = %s", t)
        self.publisher.stop()
        log.debug("Interrupt: %s", interrupt_task.done())
        log.info("Shutdown handling complete")
        sys.exit(exit_code)  # SIGTERM Graceful Exit = 143

    async def healthcheck(self) -> None:
        """Publish a heartbeat payload to ``healthcheck_topic`` if the publisher is available."""
        if not self.publisher.is_available():
            return
        heartbeat_stamp: str = datetime.now(UTC).isoformat()
        log.debug("Publishing health check", heartbeat_stamp=heartbeat_stamp)
        self.publisher.publish(
            topic=self.healthcheck_topic,
            payload={
                "version": updates2mqtt.version,  # pyright: ignore[reportAttributeAccessIssue]
                "node": self.cfg.node.name,
                "heartbeat_raw": time.time(),
                "heartbeat_stamp": heartbeat_stamp,
                "startup_stamp": self.startup_timestamp,
                "last_scan_stamp": self.last_scan_timestamp,
                "scan_count": self.scan_count,
            },
        )

205 

206 

async def repeated_call(func: Callable, interval: int = 60, *args: Any, **kwargs: Any) -> None:
    """Await ``func(*args, **kwargs)`` every ``interval`` seconds until cancelled.

    Exceptions raised by ``func`` are logged and the loop continues after the
    normal sleep (no tight retry loop). Cancellation is logged and re-raised so
    the owning task actually terminates as cancelled.
    """
    try:
        while True:
            try:
                await func(*args, **kwargs)
            except Exception:
                log.exception("Periodic task failed")
            # Sleep outside the inner try so a failing func is still throttled.
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        # CancelledError is a BaseException, so `except Exception` above never eats it.
        log.debug("Periodic task cancelled", func=func)
        raise

217 

218 

def run() -> None:
    """Process entry point: build the App, install the SIGTERM handler, run the main loop."""
    import signal  # asyncio is already imported at module level

    log.debug(f"Starting updates2mqtt v{updates2mqtt.version}")  # pyright: ignore[reportAttributeAccessIssue]
    app = App()

    # SIGTERM is routed to App.shutdown, which exits with 143 unless overridden.
    signal.signal(signal.SIGTERM, app.shutdown)
    try:
        asyncio.run(app.main_loop(), debug=False)
        log.debug("App exited gracefully")
    except asyncio.CancelledError:
        log.debug("App exited on cancelled task")

233 

234 

if __name__ == "__main__":
    # Allow executing this module directly rather than via an installed entry point.
    run()