From 86033fc76e03ee3b1b4d505265446554bea99f82 Mon Sep 17 00:00:00 2001 From: Stepan Blyschak Date: Fri, 24 Oct 2025 19:13:13 +0300 Subject: [PATCH] Implement `sd_notify` Linux daemons (lldpd, rsyslogd, frr) use sd_notify mechanism to let systemd know when they are ready. This PR implements this for supervisord and transitions processes into READY state when READY=1 is received. Signed-off-by: Stepan Blyschak --- supervisor/options.py | 23 +++++++++++++++++++++++ supervisor/process.py | 23 +++++++++++++++++++++++ supervisor/supervisord.py | 17 +++++++++++++++++ supervisor/tests/base.py | 5 +++++ 4 files changed, 68 insertions(+) diff --git a/supervisor/options.py b/supervisor/options.py index 5f1bc5ae5..19a91ae16 100644 --- a/supervisor/options.py +++ b/supervisor/options.py @@ -447,6 +447,8 @@ class ServerOptions(Options): unlink_pidfile = False unlink_socketfiles = False mood = states.SupervisorStates.RUNNING + notify_sock_path = None + notify_sock = None def __init__(self): Options.__init__(self) @@ -555,6 +557,8 @@ def realize(self, *arg, **kw): self.server_configs = sconfigs = section.server_configs + self.notify_sock_path = section.notify_sock_path + # we need to set a fallback serverurl that process.spawn can use # prefer a unix domain socket @@ -651,6 +655,11 @@ def get(opt, default, **kwargs): section.identifier = get('identifier', 'supervisor') section.nodaemon = boolean(get('nodaemon', 'false')) section.silent = boolean(get('silent', 'false')) + section.notify_sock_path = get('notifysock', None) + if section.notify_sock_path is not None: + if sys.platform != 'linux': + raise ValueError("notifysock is only supported on Linux platform") + section.notify_sock_path = existing_dirpath(section.notify_sock_path) tempdir = tempfile.gettempdir() section.childlogdir = existing_directory(get('childlogdir', tempdir)) @@ -1252,6 +1261,12 @@ def close_httpservers(self): # also https://web.archive.org/web/20160729222427/http://www.plope.com/software/collector/253 server.close() + def close_notify_socket(self): + if self.notify_sock is None: + return + self.notify_sock.close() + self.notify_sock = None + def close_logger(self): self.logger.close() @@ -1288,6 +1303,14 @@ def openhttpservers(self, supervisord): except ValueError as why: self.usage(why.args[0]) + def open_notify_socket(self): + if self.notify_sock_path is None: + return + self._try_unlink(self.notify_sock_path) + self.notify_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + self.notify_sock.bind(self.notify_sock_path) + self.notify_sock.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1) + def get_autochildlog_name(self, name, identifier, channel): prefix='%s-%s---%s-' % (name, channel, identifier) logfile = self.mktempfile( diff --git a/supervisor/process.py b/supervisor/process.py index b394be812..bb6a69cf1 100644 --- a/supervisor/process.py +++ b/supervisor/process.py @@ -310,6 +310,9 @@ def _spawn_as_child(self, filename, argv): # set environment env = os.environ.copy() env['SUPERVISOR_ENABLED'] = '1' + notify_sock_path = self.config.options.notify_sock_path + if notify_sock_path is not None: + env['NOTIFY_SOCKET'] = notify_sock_path serverurl = self.config.serverurl if serverurl is None: # unset serverurl = self.config.options.serverurl # might still be None @@ -717,6 +720,17 @@ def transition(self): self.pid)) self.kill(signal.SIGKILL) + def handle_sd_notify(self, msg): + for kv in msg.splitlines(): + if kv == "READY=1": + if self.state == ProcessStates.STARTING: + self.delay = 0 + self.backoff = 0 + self.change_state(ProcessStates.RUNNING) + msg = 'entered RUNNING state, process sent READY=1' + self.config.options.logger.info('success: %s %s' % (self.config.name, msg)) + + class FastCGISubprocess(Subprocess): """Extends Subprocess class to handle FastCGI subprocesses""" @@ -841,11 +855,20 @@ def get_dispatchers(self): def before_remove(self): pass + def handle_sd_notify(self, _msg, _pid): + pass + class ProcessGroup(ProcessGroupBase): def transition(self): for proc in self.processes.values(): proc.transition() + def handle_sd_notify(self, msg, pid): + for proc in self.processes.values(): + if proc.pid == pid: + proc.handle_sd_notify(msg) + break + class FastCGIProcessGroup(ProcessGroup): def __init__(self, config, **kwargs): diff --git a/supervisor/supervisord.py b/supervisor/supervisord.py index 8d9ebe30f..13c192a11 100755 --- a/supervisor/supervisord.py +++ b/supervisor/supervisord.py @@ -85,6 +85,7 @@ def run(self): for config in self.options.process_group_configs: self.add_process_group(config) self.options.openhttpservers(self) + self.options.open_notify_socket() self.options.setsignals() if (not self.options.nodaemon) and self.options.first: self.options.daemonize() @@ -176,6 +177,7 @@ def runforever(self): timeout = 1 # this cannot be fewer than the smallest TickEvent (5) socket_map = self.options.get_socket_map() + notify_sock = self.options.notify_sock while 1: combined_map = {} @@ -200,6 +202,9 @@ def runforever(self): # killing everything), it's OK to shutdown or reload raise asyncore.ExitNow + if self.options.notify_sock: + self.options.poller.register_readable(notify_sock.fileno()) + for fd, dispatcher in combined_map.items(): if dispatcher.readable(): self.options.poller.register_readable(fd) @@ -252,6 +257,17 @@ def runforever(self): except: pass + if notify_sock and notify_sock.fileno() in r: + import socket + import struct + data, ancdata, _, _ = notify_sock.recvmsg(4096, socket.CMSG_SPACE(struct.calcsize("3i"))) + msg = data.decode("utf-8") + for cmsg_level, cmsg_type, cmsg_data in ancdata: + if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_CREDENTIALS: + pid, _, _ = struct.unpack("3i", cmsg_data) + for group in pgroups: + group.handle_sd_notify(msg, pid) + for group in pgroups: group.transition() @@ -372,6 +388,7 @@ def main(args=None, test=False): else: go(options) options.close_httpservers() + options.close_notify_socket() options.close_logger() first = False if test or (options.mood < SupervisorStates.RESTARTING): diff --git a/supervisor/tests/base.py b/supervisor/tests/base.py index f608b2bea..ed795db27 100644 --- a/supervisor/tests/base.py +++ b/supervisor/tests/base.py @@ -31,6 +31,8 @@ class DummyOptions: make_pipes_exception = None remove_exception = None write_exception = None + notify_sock_path = None + notify_sock = None def __init__(self): self.identifier = 'supervisor' @@ -117,6 +119,9 @@ def set_uid_or_exit(self): def openhttpservers(self, supervisord): self.httpservers_opened = True + def open_notify_socket(self): + pass + def daemonize(self): self.daemonized = True