Coverage for src / updates2mqtt / app.py: 72%

152 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 23:58 +0000

1import asyncio 

2import logging 

3import sys 

4import time 

5import uuid 

6from collections.abc import Callable 

7from datetime import UTC, datetime 

8from pathlib import Path 

9from threading import Event 

10from typing import Any 

11 

12import structlog 

13 

14import updates2mqtt 

15from updates2mqtt.model import Discovery, ReleaseProvider 

16 

17from .config import Config, PublishPolicy, UpdatePolicy, load_app_config 

18from .integrations.docker import DockerProvider 

19from .mqtt import MqttPublisher 

20 

log = structlog.get_logger()

# Relative path: resolved against the process working directory when opened.
CONF_FILE = Path("conf/config.yaml")
# Minimum gap, in seconds, between automatic update attempts for one discovery
# (compared against time.time() deltas in App.on_discovery).
UPDATE_INTERVAL = 4 * 60 * 60

25 

# TODO:
# - Set install in progress
# - Support apt
# - Retry on registry fetch fail
# - Fetcher in subproc or thread
# - Clear command message after install
# - Use git hash as an alternative to the image ref for local builds, or daily builds

33 

34 

class App:
    """Application root: wires configuration, the MQTT publisher and release providers together.

    Constructed once by ``run()``, which registers ``shutdown`` as the SIGTERM
    handler and then runs ``main_loop`` under ``asyncio.run``.
    """

    def __init__(self) -> None:
        """Load configuration, configure logging and build the publisher and scanners.

        Calls ``sys.exit(1)`` if ``load_app_config`` returns ``None``.
        """
        self.startup_timestamp: str = datetime.now(UTC).isoformat()
        self.last_scan_timestamp: str | None = None
        app_config: Config | None = load_app_config(CONF_FILE)
        if app_config is None:
            log.error(f"Invalid configuration at {CONF_FILE}")
            log.error("Edit config to fix missing or invalid values and restart")
            log.error("Alternately supply correct MQTT_HOST,MQTT_USER,MQTT_PASSWORD environment variables")
            log.error("Exiting app")
            sys.exit(1)
        self.cfg: Config = app_config
        # Shared with providers; when set, shutdown() logs "Self bouncing" and forces exit code 1.
        self.self_bounce: Event = Event()

        # Apply the configured level via structlog's filtering bound logger.
        structlog.configure(
            wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, str(self.cfg.log.level)))
        )
        log.debug("Logging initialized", level=self.cfg.log.level)

        self.publisher = MqttPublisher(self.cfg.mqtt, self.cfg.node, self.cfg.homeassistant)

        self.scanners: list[ReleaseProvider] = []
        self.scan_count: int = 0
        # NOTE(review): not read anywhere in this module (last_scan_timestamp is used instead);
        # kept in case other code reads it.
        self.last_scan: str | None = None
        if self.cfg.docker.enabled:
            self.scanners.append(
                DockerProvider(
                    self.cfg.docker,
                    self.cfg.node,
                    packages=self.cfg.packages,
                    github_cfg=self.cfg.github,
                    self_bounce=self.self_bounce,
                )
            )
        self.stopped = Event()
        self.healthcheck_topic = self.cfg.node.healthcheck.topic_template.format(node_name=self.cfg.node.name)

        log.info(
            "App configured",
            node=self.cfg.node.name,
            scan_interval=self.cfg.scan_interval,
            healthcheck_topic=self.healthcheck_topic,
        )

    async def scan(self) -> None:
        """Run one scan pass over every configured scanner.

        Discoveries yielded by ``scanner.scan`` are handled concurrently in a
        TaskGroup. The group is fully awaited before that scanner's topics are
        cleaned and ``scan_count`` is incremented. Stops early once
        ``self.stopped`` is set.
        """
        session = uuid.uuid4().hex
        for scanner in self.scanners:
            slog = log.bind(source_type=scanner.source_type, session=session)
            if self.stopped.is_set():
                break
            slog.info("Scanning ...")
            async with asyncio.TaskGroup() as tg:
                async for discovery in scanner.scan(session):
                    # The "discovery-" name prefix is what interrupt_tasks() matches on.
                    tg.create_task(self.on_discovery(discovery), name=f"discovery-{discovery.name}")
                    if self.stopped.is_set():
                        slog.debug("Breaking scan loop on stopped event")
                        break
            await self.publisher.clean_topics(scanner)
            self.scan_count += 1
            slog.info(f"Scan #{self.scan_count} complete")
        self.last_scan_timestamp = datetime.now(UTC).isoformat()

    async def main_loop(self) -> None:
        """Start the publisher and scanners, then scan every ``scan_interval`` seconds until stopped.

        If the publisher is no longer available when the loop ends, this logs an
        error and calls ``shutdown(exit_code=1)``.
        """
        log.debug("Starting run loop")
        self.publisher.start()

        if self.cfg.node.healthcheck.enabled:
            await self.healthcheck()  # initial eager healthcheck
            log.info(
                f"Setting up healthcheck every {self.cfg.node.healthcheck.interval} seconds to topic {self.healthcheck_topic}"
            )
            # The "healthcheck" task name is what interrupt_tasks() matches on.
            self.healthcheck_loop_task = asyncio.create_task(
                repeated_call(self.healthcheck, interval=self.cfg.node.healthcheck.interval), name="healthcheck"
            )

        for scanner in self.scanners:
            scanner.initialize()
            self.publisher.subscribe_hass_command(scanner)
            await self.publisher.clean_topics(scanner, initial=True)

        while not self.stopped.is_set() and self.publisher.is_available():
            await self.scan()
            # Re-check before sleeping so a stop request does not wait out a full interval.
            if not self.stopped.is_set() and self.publisher.is_available():
                await asyncio.sleep(self.cfg.scan_interval)
            else:
                log.info("Stop requested, exiting run loop and skipping sleep")

        if not self.publisher.is_available():
            log.error("MQTT fatal connection error - check host,port,user,password in config")
            self.shutdown(exit_code=1)

        log.debug("Exiting run loop")

    async def on_discovery(self, discovery: Discovery) -> None:
        """Publish one discovery according to its publish policy and, if eligible, trigger an auto update.

        Auto update is requested via ``publisher.local_message(discovery, "install")``.
        That happens only when the update policy is AUTO, the discovery can update,
        the versions differ, and the last attempt is unknown or older than
        ``UPDATE_INTERVAL`` seconds.

        Cancellation is logged and swallowed. Any other exception is logged and
        re-raised, which fails the enclosing TaskGroup in ``scan``.
        """
        dlog = log.bind(name=discovery.name)
        try:
            if discovery.publish_policy == PublishPolicy.HOMEASSISTANT and self.cfg.homeassistant.discovery.enabled:
                # Switch off MQTT discovery if not Home Assistant enabled
                self.publisher.publish_hass_config(discovery)
            # Plain equality: the previous `in (PublishPolicy.HOMEASSISTANT)` was not a tuple
            # membership test, just `in` applied to the enum member itself.
            if discovery.publish_policy == PublishPolicy.HOMEASSISTANT:
                self.publisher.publish_hass_state(discovery)
            if discovery.publish_policy in (PublishPolicy.HOMEASSISTANT, PublishPolicy.MQTT):
                self.publisher.publish_discovery(discovery)
            if (
                discovery.update_policy == UpdatePolicy.AUTO
                and discovery.can_update
                and discovery.latest_version != discovery.current_version
            ):
                # TODO: review auto update, trigger by version, use update interval as throttle
                # -1 marks "never attempted"; otherwise seconds since the last attempt.
                elapsed: float = (
                    time.time() - discovery.update_last_attempt if discovery.update_last_attempt is not None else -1
                )
                if elapsed == -1 or elapsed > UPDATE_INTERVAL:
                    dlog.info(
                        "Initiate auto update (last:%s, elapsed:%s, max:%s)",
                        discovery.update_last_attempt,
                        elapsed,
                        UPDATE_INTERVAL,
                    )
                    self.publisher.local_message(discovery, "install")
                else:
                    dlog.info("Skipping auto update")
        except asyncio.CancelledError:
            dlog.info("Discovery handling cancelled")
        except Exception:
            dlog.exception("Discovery handling failed")
            raise

    async def interrupt_tasks(self) -> None:
        """Cancel the healthcheck and per-discovery tasks and wait for them to finish.

        Only tasks named ``healthcheck`` or ``discovery-*`` (the names assigned in
        ``main_loop`` and ``scan``) are cancelled, and only those are awaited.
        Awaiting tasks that were never cancelled, such as the one running
        ``main_loop``, could block indefinitely.
        """
        current = asyncio.current_task()
        to_cancel = [
            t
            for t in asyncio.all_tasks()
            if t is not current and (t.get_name() == "healthcheck" or t.get_name().startswith("discovery-"))
        ]
        log.info(f"Cancelling {len(to_cancel)} tasks")
        for t in to_cancel:
            log.debug("Cancelling task", task=t.get_name())
            t.cancel()
        # return_exceptions=True so the CancelledError of each task is collected, not raised here.
        await asyncio.gather(*to_cancel, return_exceptions=True)
        log.debug("Cancellation task completed")

    def shutdown(self, *args, exit_code: int = 143) -> None:  # noqa: ANN002, ARG002
        """Stop scanning, cancel background tasks, stop the publisher and exit the process.

        Registered as the SIGTERM handler in ``run()``; ``*args`` absorbs the
        ``(signum, frame)`` arguments of a signal handler. Always ends with
        ``sys.exit``: exit code 1 when ``self_bounce`` is set, otherwise
        ``exit_code``.
        """
        if self.self_bounce.is_set():
            exit_code = 1
            log.info("Self bouncing, overriding exit_code: %s", exit_code)
        else:
            log.info("Shutting down, exit_code: %s", exit_code)
        self.stopped.set()
        for scanner in self.scanners:
            scanner.stop()
        # eager_start runs interrupt_tasks synchronously up to its first await, so the
        # cancellations are issued before sys.exit below.
        interrupt_task = asyncio.get_event_loop().create_task(
            self.interrupt_tasks(),
            eager_start=True,  # type: ignore[call-arg] # pyright: ignore[reportCallIssue]
            name="interrupt",
        )
        for t in asyncio.all_tasks():
            log.debug("Tasks waiting = %s", t)
        self.publisher.stop()
        log.debug("Interrupt: %s", interrupt_task.done())
        log.info("Shutdown handling complete")
        sys.exit(exit_code)  # SIGTERM Graceful Exit = 143

    async def healthcheck(self) -> None:
        """Publish a heartbeat payload to ``healthcheck_topic``; does nothing if the publisher is unavailable."""
        if not self.publisher.is_available():
            return
        heartbeat_stamp: str = datetime.now(UTC).isoformat()
        log.debug("Publishing health check", heartbeat_stamp=heartbeat_stamp)
        self.publisher.publish(
            topic=self.healthcheck_topic,
            payload={
                "version": updates2mqtt.version,  # pyright: ignore[reportAttributeAccessIssue]
                "node": self.cfg.node.name,
                "heartbeat_raw": time.time(),
                "heartbeat_stamp": heartbeat_stamp,
                "startup_stamp": self.startup_timestamp,
                "last_scan_stamp": self.last_scan_timestamp,
                "scan_count": self.scan_count,
            },
        )

210 

211 

async def repeated_call(func: Callable, interval: int = 60, *args: Any, **kwargs: Any) -> None:
    """Await ``func(*args, **kwargs)`` every ``interval`` seconds until the task is cancelled.

    Args:
        func: coroutine function to call on each iteration.
        interval: seconds to sleep between calls.
        *args, **kwargs: forwarded to ``func`` on every call.

    An ``Exception`` raised by ``func`` is logged and does not stop the loop.
    The sleep still happens afterwards, so a persistently failing ``func`` is
    not retried in a tight busy loop.

    Raises:
        asyncio.CancelledError: re-raised after logging, so cancelling the task
            actually ends it and anything awaiting it (e.g. ``asyncio.gather``)
            can complete.
    """
    try:
        while True:
            try:
                await func(*args, **kwargs)
            except Exception:
                # CancelledError is a BaseException, so it is not caught here.
                log.exception("Periodic task failed")
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        log.debug("Periodic task cancelled", func=func)
        raise

222 

223 

def run() -> None:
    """Process entry point: build the App, install the SIGTERM handler and run its main loop.

    ``asyncio`` comes from the module-level import; only ``signal`` is imported locally.
    """
    import signal

    log.debug(f"Starting updates2mqtt v{updates2mqtt.version}")  # pyright: ignore[reportAttributeAccessIssue]
    application = App()

    # App.shutdown accepts and ignores the (signum, frame) arguments via *args.
    signal.signal(signal.SIGTERM, application.shutdown)
    try:
        asyncio.run(application.main_loop(), debug=False)
    except asyncio.CancelledError:
        log.debug("App exited on cancelled task")
    else:
        log.debug("App exited gracefully")

238 

239 

if __name__ == "__main__":
    # Only start the app when executed as a script, never on import.
    run()