diff --git a/changelog/66993.fixed.md b/changelog/66993.fixed.md new file mode 100644 index 00000000000..775a4c4bf6c --- /dev/null +++ b/changelog/66993.fixed.md @@ -0,0 +1 @@ +Salt master waits for publish servers while starting up. diff --git a/salt/master.py b/salt/master.py index f39e2c0a802..8614b2f457a 100644 --- a/salt/master.py +++ b/salt/master.py @@ -813,6 +813,10 @@ class Master(SMaster): for _, opts in iter_transport_opts(self.opts): chan = salt.channel.server.PubServerChannel.factory(opts) chan.pre_fork(self.process_manager, kwargs={"secrets": SMaster.secrets}) + if not chan.transport.started.wait(30): + raise salt.exceptions.SaltMasterError( + "Publish server did not start within 30 seconds. Something went wrong." + ) pub_channels.append(chan) log.info("Creating master event publisher process") @@ -820,6 +824,10 @@ class Master(SMaster): self.opts ) ipc_publisher.pre_fork(self.process_manager) + if not ipc_publisher.transport.started.wait(30): + raise salt.exceptions.SaltMasterError( + "IPC publish server did not start within 30 seconds. Something went wrong." + ) self.process_manager.add_process( EventMonitor, args=[self.opts, ipc_publisher], diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py index 673c000e6a0..b45deebd965 100644 --- a/salt/transport/tcp.py +++ b/salt/transport/tcp.py @@ -1331,6 +1331,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): pull_path_perms=0o600, pub_path_perms=0o600, ssl=None, + started=None, ): self.opts = opts self.pub_sock = None @@ -1343,6 +1344,10 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): self.pull_path_perms = pull_path_perms self.pub_path_perms = pub_path_perms self.ssl = ssl + if started is None: + self.started = multiprocessing.Event() + else: + self.started = started @property def topic_support(self): @@ -1362,6 +1367,8 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): "pull_path": self.pull_path, "pub_path_perms": self.pub_path_perms, "pull_path_perms": self.pull_path_perms, + "ssl": self.ssl, + "started": self.started, } def publish_daemon( @@ -1456,6 +1463,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): with salt.utils.files.set_umask(0o177): self.pull_sock.start() os.chmod(self.pull_path, self.pull_path_perms) + self.started.set() def pre_fork(self, process_manager): """ diff --git a/salt/transport/ws.py b/salt/transport/ws.py index 20e55efe36b..65a2bb420e0 100644 --- a/salt/transport/ws.py +++ b/salt/transport/ws.py @@ -263,6 +263,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): pull_path_perms=0o600, pub_path_perms=0o600, ssl=None, + started=None, ): self.opts = opts self.pub_host = pub_host @@ -279,6 +280,10 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): self.pub_writer = None self.pub_reader = None self._connecting = None + if started is None: + self.started = multiprocessing.Event() + else: + self.started = started @property def topic_support(self): @@ -298,6 +303,8 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): "pull_path": self.pull_path, "pull_path_perms": self.pull_path_perms, "pub_path_perms": self.pub_path_perms, + "ssl": self.ssl, + "started": self.started, } def publish_daemon( @@ -305,6 +312,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): publish_payload, presence_callback=None, remove_presence_callback=None, + event=None, ): """ Bind to the interface specified in the configuration file @@ -375,6 +383,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): self.puller = await asyncio.start_server( self.pull_handler, self.pull_host, self.pull_port ) + self.started.set() while self._run.is_set(): await asyncio.sleep(0.3) await self.server.stop() diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py index 19b41f7e273..3a0b225d378 100644 --- a/salt/transport/zeromq.py +++ b/salt/transport/zeromq.py @@ -7,6 +7,7 @@ import asyncio.exceptions import errno import hashlib import logging +import multiprocessing import os import signal import sys @@ -854,6 +855,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): pull_path=None, pull_path_perms=0o600, pub_path_perms=0o600, + started=None, ): self.opts = opts self.pub_host = pub_host @@ -878,10 +880,31 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): self.daemon_pub_sock = None self.daemon_pull_sock = None self.daemon_monitor = None + if started is None: + self.started = multiprocessing.Event() + else: + self.started = started def __repr__(self): return f"" + def __setstate__(self, state): + self.__init__(**state) + + def __getstate__(self): + return { + "opts": self.opts, + "pub_host": self.pub_host, + "pub_port": self.pub_port, + "pub_path": self.pub_path, + "pull_host": self.pull_host, + "pull_port": self.pull_port, + "pull_path": self.pull_path, + "pub_path_perms": self.pub_path_perms, + "pull_path_perms": self.pull_path_perms, + "started": self.started, + } + def publish_daemon( self, publish_payload, @@ -954,6 +977,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): self.daemon_pub_sock, self.daemon_monitor, ) = self._get_sockets(self.daemon_context, ioloop) + self.started.set() while True: try: package = await self.daemon_pull_sock.recv() diff --git a/tests/pytests/functional/channel/test_server.py b/tests/pytests/functional/channel/test_server.py index cd2a828e41e..32f71068ac0 100644 --- a/tests/pytests/functional/channel/test_server.py +++ b/tests/pytests/functional/channel/test_server.py @@ -59,6 +59,8 @@ def transport_ids(value): @pytest.fixture( params=[ "ws", + "tcp", + "zeromq", ], ids=transport_ids, ) @@ -173,6 +175,8 @@ def test_pub_server_channel( master_config, ) server_channel.pre_fork(process_manager) + if not server_channel.transport.started.wait(30): + pytest.fail("Server channel did not start within 30 seconds.") req_server_channel = salt.channel.server.ReqServerChannel.factory(master_config) req_server_channel.pre_fork(process_manager)