Add master pub channels skeleton

This commit is contained in:
Daniel A. Wozniak 2023-08-01 17:55:52 -07:00 committed by Gareth J. Greenaway
parent 8764aa9eea
commit b473ed193a
4 changed files with 203 additions and 20 deletions

View file

@ -670,3 +670,64 @@ class AsyncPullChannel:
import salt.transport.ipc
return salt.transport.ipc.IPCMessageServer(opts, **kwargs)
class AsyncMasterPubChannel:
""" """
async_methods = [
"connect",
]
close_methods = [
"close",
]
@classmethod
def factory(cls, opts, **kwargs):
io_loop = kwargs.get("io_loop")
if io_loop is None:
io_loop = tornado.ioloop.IOLoop.current()
transport = salt.transport.ipc_publish_client(opts, "master")
return cls(opts, transport, None, io_loop)
def __init__(self, opts, transport, auth, io_loop=None):
self.opts = opts
self.io_loop = io_loop
self.auth = auth
self.transport = transport
self._closing = False
self._reconnected = False
async def connect(self):
"""
Return a future which completes when connected to the remote publisher
"""
await self.transport.connect()
async def recv(self, timeout=None):
return await self.transport.recv(timeout)
def close(self):
"""
Close the channel
"""
self.transport.close()
def on_recv(self, callback=None):
"""
When jobs are received pass them (decoded) to callback
"""
return self.transport.on_recv(callback)
def __enter__(self):
return self
def __exit__(self, *args):
self.io_loop.spawn_callback(self.close)
async def __aenter__(self):
return self
async def __aexit__(self, *_):
await self.close()

View file

@ -16,6 +16,7 @@ import salt.crypt
import salt.master
import salt.payload
import salt.transport.frame
import salt.transport.ipc
import salt.utils.channel
import salt.utils.event
import salt.utils.files
@ -870,6 +871,10 @@ class PubServerChannel:
)
payload["load"] = crypticle.dumps(load)
if self.opts["sign_pub_messages"]:
if self.opts["cluster_id"]:
master_pem_path = os.path.join(self.opts["cluster_pki_dir"], "cluster.pem")
else:
master_pem_path = os.path.join(self.opts["pki_dir"], "master.pem")
log.debug("Signing data packet")
payload["sig"] = salt.crypt.sign_message(
self.master_key.rsa_path, payload["load"]
@ -907,3 +912,106 @@ class PubServerChannel:
)
payload = salt.payload.dumps(load)
await self.transport.publish(payload)
class MasterPubServerChannel:
""" """
@classmethod
def factory(cls, opts, **kwargs):
transport = salt.transport.ipc_publish_server("master", opts)
return cls(opts, transport)
def __init__(self, opts, transport, presence_events=False):
self.opts = opts
self.transport = transport
self.io_loop = tornado.ioloop.IOLoop.current()
def __getstate__(self):
return {
"opts": self.opts,
"transport": self.transport,
}
def __setstate__(self, state):
self.opts = state["opts"]
self.transport = state["transport"]
def close(self):
self.transport.close()
def pre_fork(self, process_manager, kwargs=None):
"""
Do anything necessary pre-fork. Since this is on the master side this will
primarily be used to create IPC channels and create our daemon process to
do the actual publishing
:param func process_manager: A ProcessManager, from salt.utils.process.ProcessManager
"""
if hasattr(self.transport, "publish_daemon"):
process_manager.add_process(
self._publish_daemon, kwargs=kwargs, name="EventPublisher"
)
def _publish_daemon(self, **kwargs):
if (
self.opts["event_publisher_niceness"]
and not salt.utils.platform.is_windows()
):
log.info(
"setting EventPublisher niceness to %i",
self.opts["event_publisher_niceness"],
)
os.nice(self.opts["event_publisher_niceness"])
self.io_loop = tornado.ioloop.IOLoop.current()
tcp_master_pool_port = 4520
self.pushers = []
for master in self.opts.get("master_pool", []):
pusher = salt.transport.tcp.TCPPublishServer(
self.opts,
pull_host=master,
pull_port=tcp_master_pool_port,
)
self.pushers.append(pusher)
self.pool_puller = salt.transport.tcp.TCPPuller(
host=self.opts["interface"],
port=tcp_master_pool_port,
io_loop=self.io_loop,
payload_handler=self.handle_pool_publish,
)
self.pool_puller.start()
self.io_loop.add_callback(
self.transport.publisher,
self.publish_payload,
io_loop=self.io_loop,
)
# run forever
try:
self.io_loop.start()
except (KeyboardInterrupt, SystemExit):
pass
finally:
self.close()
async def publish(self, load):
"""
Publish "load" to minions
"""
await self.transport.publish(load)
async def handle_pool_publish(self, load, _):
log.error("Got event from other master")
try:
await self.transport.publish(load)
# Add an extra fallback in case a forked process leeks through
except Exception: # pylint: disable=broad-except
log.critical("Unexpected error while polling master events", exc_info=True)
return None
async def publish_payload(self, load, *args):
await self.transport.publish_payload(load)
for pusher in self.pushers:
log.error("Send event to master %s:%s", pusher.pull_host, pusher.pull_port)
await pusher.publish(load)

View file

@ -746,13 +746,18 @@ class Master(SMaster):
log.info("Creating master event publisher process")
ipc_publisher = salt.transport.ipc_publish_server("master", self.opts)
self.process_manager.add_process(
ipc_publisher.publish_daemon,
args=[
ipc_publisher.publish_payload,
],
name="EventPublisher",
ipc_publisher = salt.channel.server.MasterPubServerChannel.factory(
self.opts
)
ipc_publisher.pre_fork(self.process_manager)
# self.process_manager.add_process(
# ipc_publisher.publish_daemon,
# args=[
# ipc_publisher.publish_payload,
# ],
# name="EventPublisher",
# )
self.process_manager.add_process(
EventMonitor,

View file

@ -435,7 +435,7 @@ class TCPPubClient(salt.transport.base.PublishClient):
while True:
msg = await self.recv()
if msg:
callback(msg)
await callback(msg)
def on_recv(self, callback):
"""
@ -1120,7 +1120,7 @@ class TCPPuller:
but using either UNIX domain sockets or TCP sockets
"""
def __init__(self, socket_path, io_loop=None, payload_handler=None):
def __init__(self, host=None, port=None, path=None, io_loop=None, payload_handler=None):
"""
Create a new Tornado IPC server
@ -1136,7 +1136,9 @@ class TCPPuller:
:param func payload_handler: A function to customize handling of
incoming data.
"""
self.socket_path = socket_path
self.host = host
self.port = port
self.path = path
self._started = False
self.payload_handler = payload_handler
@ -1152,16 +1154,17 @@ class TCPPuller:
Blocks until socket is established
"""
# Start up the ioloop
log.trace("IPCServer: binding to socket: %s", self.socket_path)
if isinstance(self.socket_path, int):
if self.path:
log.trace("IPCServer: binding to socket: %s", self.path)
self.sock = tornado.netutil.bind_unix_socket(self.path)
else:
log.trace("IPCServer: binding to socket: %s:%s", self.host, self.port)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.setblocking(0)
self.sock.bind(("127.0.0.1", self.socket_path))
self.sock.bind((self.host, self.port))
# Based on default used in tornado.netutil.bind_sockets()
self.sock.listen(128)
else:
self.sock = tornado.netutil.bind_unix_socket(self.socket_path)
tornado.netutil.add_accept_handler(
self.sock,
@ -1210,7 +1213,10 @@ class TCPPuller:
write_callback(stream, framed_msg["head"]),
)
except tornado.iostream.StreamClosedError:
log.trace("Client disconnected from IPC %s", self.socket_path)
if self.path:
log.trace("Client disconnected from IPC %s", self.path)
else:
log.trace("Client disconnected from IPC %s:%s", self.host, self.port)
break
except OSError as exc:
# On occasion an exception will occur with
@ -1362,7 +1368,7 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
log.debug("Publish server binding pub to %s", self.pub_path)
sock = tornado.netutil.bind_unix_socket(self.pub_path)
else:
log.debug(
log.info(
"Publish server binding pub to %s:%s", self.pub_host, self.pub_port
)
sock = _get_socket(self.opts)
@ -1378,13 +1384,16 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
self.pub_server = pub_server
if self.pull_path:
log.debug("Publish server binding pull to %s", self.pull_path)
pull_uri = self.pull_path
pull_path = self.pull_path
else:
log.debug("Publish server binding pull to 127.0.0.1:%s", self.pull_port)
pull_uri = self.pull_port
log.info("Publish server binding pull to 127.0.0.1:%s", self.pull_port)
pull_host = self.pull_host
pull_port = self.pull_port
self.pull_sock = TCPPuller(
pull_uri,
host=self.pull_host,
port=self.pull_port,
path=self.pull_path,
io_loop=io_loop,
payload_handler=publish_payload,
)