Add started event to PublishServer

- Add a started event that gets set after the server transport is ready
for clients to connect.
- Wait on publish servers start while the master process is starting up.
This commit is contained in:
Daniel A. Wozniak 2024-10-23 18:09:14 -07:00 committed by Daniel Wozniak
parent 8b0d609ec6
commit d4ebb63638
6 changed files with 54 additions and 0 deletions

1
changelog/66993.fixed.md Normal file
View file

@ -0,0 +1 @@
Salt master waits for publish servers while starting up.

View file

@ -813,6 +813,10 @@ class Master(SMaster):
for _, opts in iter_transport_opts(self.opts):
chan = salt.channel.server.PubServerChannel.factory(opts)
chan.pre_fork(self.process_manager, kwargs={"secrets": SMaster.secrets})
if not chan.transport.started.wait(30):
raise salt.exceptions.SaltMasterError(
"Publish server did not start within 30 seconds. Something went wrong."
)
pub_channels.append(chan)
log.info("Creating master event publisher process")
@ -820,6 +824,10 @@ class Master(SMaster):
self.opts
)
ipc_publisher.pre_fork(self.process_manager)
if not ipc_publisher.transport.started.wait(30):
raise salt.exceptions.SaltMasterError(
"IPC publish server did not start within 30 seconds. Something went wrong."
)
self.process_manager.add_process(
EventMonitor,
args=[self.opts, ipc_publisher],

View file

@ -1331,6 +1331,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
pull_path_perms=0o600,
pub_path_perms=0o600,
ssl=None,
started=None,
):
self.opts = opts
self.pub_sock = None
@ -1343,6 +1344,10 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
self.pull_path_perms = pull_path_perms
self.pub_path_perms = pub_path_perms
self.ssl = ssl
if started is None:
self.started = multiprocessing.Event()
else:
self.started = started
@property
def topic_support(self):
@ -1362,6 +1367,8 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
"pull_path": self.pull_path,
"pub_path_perms": self.pub_path_perms,
"pull_path_perms": self.pull_path_perms,
"ssl": self.ssl,
"started": self.started,
}
def publish_daemon(
@ -1456,6 +1463,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
with salt.utils.files.set_umask(0o177):
self.pull_sock.start()
os.chmod(self.pull_path, self.pull_path_perms)
self.started.set()
def pre_fork(self, process_manager):
"""

View file

@ -263,6 +263,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
pull_path_perms=0o600,
pub_path_perms=0o600,
ssl=None,
started=None,
):
self.opts = opts
self.pub_host = pub_host
@ -279,6 +280,10 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
self.pub_writer = None
self.pub_reader = None
self._connecting = None
if started is None:
self.started = multiprocessing.Event()
else:
self.started = started
@property
def topic_support(self):
@ -298,6 +303,8 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
"pull_path": self.pull_path,
"pull_path_perms": self.pull_path_perms,
"pub_path_perms": self.pub_path_perms,
"ssl": self.ssl,
"started": self.started,
}
def publish_daemon(
@ -305,6 +312,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
publish_payload,
presence_callback=None,
remove_presence_callback=None,
event=None,
):
"""
Bind to the interface specified in the configuration file
@ -375,6 +383,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
self.puller = await asyncio.start_server(
self.pull_handler, self.pull_host, self.pull_port
)
self.started.set()
while self._run.is_set():
await asyncio.sleep(0.3)
await self.server.stop()

View file

@ -7,6 +7,7 @@ import asyncio.exceptions
import errno
import hashlib
import logging
import multiprocessing
import os
import signal
import sys
@ -854,6 +855,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
pull_path=None,
pull_path_perms=0o600,
pub_path_perms=0o600,
started=None,
):
self.opts = opts
self.pub_host = pub_host
@ -878,10 +880,31 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
self.daemon_pub_sock = None
self.daemon_pull_sock = None
self.daemon_monitor = None
if started is None:
self.started = multiprocessing.Event()
else:
self.started = started
def __repr__(self):
return f"<PublishServer pub_uri={self.pub_uri} pull_uri={self.pull_uri} at {hex(id(self))}>"
def __setstate__(self, state):
self.__init__(**state)
def __getstate__(self):
return {
"opts": self.opts,
"pub_host": self.pub_host,
"pub_port": self.pub_port,
"pub_path": self.pub_path,
"pull_host": self.pull_host,
"pull_port": self.pull_port,
"pull_path": self.pull_path,
"pub_path_perms": self.pub_path_perms,
"pull_path_perms": self.pull_path_perms,
"started": self.started,
}
def publish_daemon(
self,
publish_payload,
@ -954,6 +977,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
self.daemon_pub_sock,
self.daemon_monitor,
) = self._get_sockets(self.daemon_context, ioloop)
self.started.set()
while True:
try:
package = await self.daemon_pull_sock.recv()

View file

@ -59,6 +59,8 @@ def transport_ids(value):
@pytest.fixture(
params=[
"ws",
"tcp",
"zeromq",
],
ids=transport_ids,
)
@ -173,6 +175,8 @@ def test_pub_server_channel(
master_config,
)
server_channel.pre_fork(process_manager)
if not server_channel.transport.started.wait(30):
pytest.fail("Server channel did not start within 30 seconds.")
req_server_channel = salt.channel.server.ReqServerChannel.factory(master_config)
req_server_channel.pre_fork(process_manager)