Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions supervisor/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,8 @@ class ServerOptions(Options):
unlink_pidfile = False
unlink_socketfiles = False
mood = states.SupervisorStates.RUNNING
notify_sock_path = None
notify_sock = None

def __init__(self):
Options.__init__(self)
Expand Down Expand Up @@ -555,6 +557,8 @@ def realize(self, *arg, **kw):

self.server_configs = sconfigs = section.server_configs

self.notify_sock_path = section.notify_sock_path

# we need to set a fallback serverurl that process.spawn can use

# prefer a unix domain socket
Expand Down Expand Up @@ -651,6 +655,11 @@ def get(opt, default, **kwargs):
section.identifier = get('identifier', 'supervisor')
section.nodaemon = boolean(get('nodaemon', 'false'))
section.silent = boolean(get('silent', 'false'))
section.notify_sock_path = get('notifysock', None)
if section.notify_sock_path is not None:
if sys.platform != 'linux':
raise ValueError("notifysock is only supported on Linux platform")
section.notify_sock_path = existing_dirpath(section.notify_sock_path)

tempdir = tempfile.gettempdir()
section.childlogdir = existing_directory(get('childlogdir', tempdir))
Expand Down Expand Up @@ -1252,6 +1261,12 @@ def close_httpservers(self):
# also https://web.archive.org/web/20160729222427/http://www.plope.com/software/collector/253
server.close()

def close_notify_socket(self):
if self.notify_sock is None:
return
self.notify_sock.close()
self.notify_sock = None

def close_logger(self):
self.logger.close()

Expand Down Expand Up @@ -1288,6 +1303,14 @@ def openhttpservers(self, supervisord):
except ValueError as why:
self.usage(why.args[0])

def open_notify_socket(self):
if self.notify_sock_path is None:
return
self._try_unlink(self.notify_sock_path)
self.notify_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
self.notify_sock.bind(self.notify_sock_path)
self.notify_sock.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1)

def get_autochildlog_name(self, name, identifier, channel):
prefix='%s-%s---%s-' % (name, channel, identifier)
logfile = self.mktempfile(
Expand Down
23 changes: 23 additions & 0 deletions supervisor/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ def _spawn_as_child(self, filename, argv):
# set environment
env = os.environ.copy()
env['SUPERVISOR_ENABLED'] = '1'
notify_sock_path = self.config.options.notify_sock_path
if notify_sock_path is not None:
env['NOTIFY_SOCKET'] = notify_sock_path
serverurl = self.config.serverurl
if serverurl is None: # unset
serverurl = self.config.options.serverurl # might still be None
Expand Down Expand Up @@ -717,6 +720,17 @@ def transition(self):
self.pid))
self.kill(signal.SIGKILL)

def handle_sd_notify(self, msg):
for kv in msg.splitlines():
if kv == "READY=1":
if self.state == ProcessStates.STARTING:
self.delay = 0
self.backoff = 0
self.change_state(ProcessStates.RUNNING)
msg = 'entered RUNNING state, process sent READY=1'
self.config.options.logger.info('success: %s %s' % (self.config.name, msg))


class FastCGISubprocess(Subprocess):
"""Extends Subprocess class to handle FastCGI subprocesses"""

Expand Down Expand Up @@ -841,11 +855,20 @@ def get_dispatchers(self):
def before_remove(self):
pass

def handle_sd_notify(self, _msg, _pid):
pass

class ProcessGroup(ProcessGroupBase):
def transition(self):
for proc in self.processes.values():
proc.transition()

def handle_sd_notify(self, msg, pid):
for proc in self.processes.values():
if proc.pid == pid:
proc.handle_sd_notify(msg)
break

class FastCGIProcessGroup(ProcessGroup):

def __init__(self, config, **kwargs):
Expand Down
17 changes: 17 additions & 0 deletions supervisor/supervisord.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def run(self):
for config in self.options.process_group_configs:
self.add_process_group(config)
self.options.openhttpservers(self)
self.options.open_notify_socket()
self.options.setsignals()
if (not self.options.nodaemon) and self.options.first:
self.options.daemonize()
Expand Down Expand Up @@ -176,6 +177,7 @@ def runforever(self):
timeout = 1 # this cannot be fewer than the smallest TickEvent (5)

socket_map = self.options.get_socket_map()
notify_sock = self.options.notify_sock

while 1:
combined_map = {}
Expand All @@ -200,6 +202,9 @@ def runforever(self):
# killing everything), it's OK to shutdown or reload
raise asyncore.ExitNow

if self.options.notify_sock:
self.options.poller.register_readable(notify_sock.fileno())

for fd, dispatcher in combined_map.items():
if dispatcher.readable():
self.options.poller.register_readable(fd)
Expand Down Expand Up @@ -252,6 +257,17 @@ def runforever(self):
except:
pass

if notify_sock and notify_sock.fileno() in r:
import socket
import struct
data, ancdata, _, _ = notify_sock.recvmsg(4096, socket.CMSG_SPACE(struct.calcsize("3i")))
msg = data.decode("utf-8")
for cmsg_level, cmsg_type, cmsg_data in ancdata:
if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_CREDENTIALS:
pid, _, _ = struct.unpack("3i", cmsg_data)
for group in pgroups:
group.handle_sd_notify(msg, pid)

for group in pgroups:
group.transition()

Expand Down Expand Up @@ -372,6 +388,7 @@ def main(args=None, test=False):
else:
go(options)
options.close_httpservers()
options.close_notify_socket()
options.close_logger()
first = False
if test or (options.mood < SupervisorStates.RESTARTING):
Expand Down
5 changes: 5 additions & 0 deletions supervisor/tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class DummyOptions:
make_pipes_exception = None
remove_exception = None
write_exception = None
notify_sock_path = None
notify_sock = None

def __init__(self):
self.identifier = 'supervisor'
Expand Down Expand Up @@ -117,6 +119,9 @@ def set_uid_or_exit(self):
def openhttpservers(self, supervisord):
self.httpservers_opened = True

def open_notify_socket(self):
pass

def daemonize(self):
self.daemonized = True

Expand Down