Minion uses tcp transport for ipc

This commit is contained in:
Daniel A. Wozniak 2023-06-19 15:26:16 -07:00 committed by Gareth J. Greenaway
parent f3522141df
commit 7e3a5b10f1
5 changed files with 148 additions and 74 deletions

View file

@ -376,6 +376,7 @@ class AsyncPubChannel:
async_methods = [
"connect",
"_decode_messages",
# "close",
]
close_methods = [
"close",
@ -455,8 +456,11 @@ class AsyncPubChannel:
"""
Close the channel
"""
log.error("AsyncPubChannel.close called")
self.transport.close()
log.error("Transport closed")
if self.event is not None:
log.error("Event destroy called")
self.event.destroy()
self.event = None

View file

@ -1048,33 +1048,27 @@ class MinionManager(MinionBase):
# self.opts,
# io_loop=self.io_loop,
# )
def target():
import hashlib
self.opts["publish_port"] = 12321
hash_type = getattr(hashlib, self.opts["hash_type"])
ipc_publisher = salt.transport.publish_server(self.opts)
id_hash = hash_type(
salt.utils.stringutils.to_bytes(self.opts["id"])
).hexdigest()[:10]
epub_sock_path = "ipc://{}".format(
os.path.join(
self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
)
import hashlib
ipc_publisher = salt.transport.publish_server(self.opts)
hash_type = getattr(hashlib, self.opts["hash_type"])
id_hash = hash_type(
salt.utils.stringutils.to_bytes(self.opts["id"])
).hexdigest()[:10]
epub_sock_path = "ipc://{}".format(
os.path.join(
self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
)
if os.path.exists(epub_sock_path):
os.unlink(epub_sock_path)
epull_sock_path = "ipc://{}".format(
os.path.join(
self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
)
)
if os.path.exists(epub_sock_path):
os.unlink(epub_sock_path)
epull_sock_path = "ipc://{}".format(
os.path.join(
self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
)
ipc_publisher.pub_uri = epub_sock_path
ipc_publisher.pull_uri = epull_sock_path
ipc_publisher.publish_daemon(ipc_publisher.publish_payload)
thread = salt.utils.process.Process(target=target)
thread.start()
)
ipc_publisher.pub_uri = epub_sock_path
ipc_publisher.pull_uri = epull_sock_path
self.io_loop.add_callback(ipc_publisher.publisher, ipc_publisher.publish_payload, self.io_loop)
self.event = salt.utils.event.get_event(
"minion", opts=self.opts, io_loop=self.io_loop
)
@ -3265,17 +3259,29 @@ class Minion(MinionBase):
if self._running is False:
return
log.error("Loop status %r %r", self, self.io_loop.asyncio_loop.is_running())
self.io_loop.asyncio_loop.stop()
log.error("Loop status %r %r", self, self.io_loop.asyncio_loop.is_running())
self._running = False
if hasattr(self, "schedule"):
del self.schedule
if hasattr(self, "pub_channel") and self.pub_channel is not None:
self.pub_channel.on_recv(None)
if hasattr(self.pub_channel, "close"):
self.pub_channel.close()
del self.pub_channel
if hasattr(self, "periodic_callbacks"):
for cb in self.periodic_callbacks.values():
cb.stop()
log.error("create pub_channel.close task %r", self)
self.pub_channel.close()
#self.io_loop.asyncio_loop.run_until_complete(self.pub_channel.close())
#if hasattr(self.pub_channel, "close"):
# asyncio.create_task(
# self.pub_channel.close()
# )
# #self.pub_channel.close()
#del self.pub_channel
if hasattr(self, "event"):
log.error("HAS EVENT")
#if hasattr(self, "periodic_callbacks"):
# for cb in self.periodic_callbacks.values():
# cb.stop()
log.error("%r destroy method finished", self)
# pylint: disable=W1701
def __del__(self):

View file

@ -85,7 +85,6 @@ def publish_client(opts, io_loop):
# switch on available ttypes
if ttype == "zeromq":
import salt.transport.zeromq
return salt.transport.zeromq.PublishClient(opts, io_loop)
elif ttype == "tcp":
import salt.transport.tcp

View file

@ -113,7 +113,7 @@ class PublishClient(salt.transport.base.PublishClient):
"connect",
"connect_uri",
"recv",
"close",
#"close",
]
close_methods = [
"close",
@ -139,7 +139,9 @@ class PublishClient(salt.transport.base.PublishClient):
self.hexid = hashlib.sha1(salt.utils.stringutils.to_bytes(_id)).hexdigest()
self._closing = False
self.context = zmq.asyncio.Context()
log.error("ZMQ Context creat %r", self)
self._socket = self.context.socket(zmq.SUB)
self._socket.setsockopt(zmq.LINGER, -1)
if zmq_filtering:
# TODO: constants file for "broadcast"
self._socket.setsockopt(zmq.SUBSCRIBE, b"broadcast")
@ -221,7 +223,29 @@ class PublishClient(salt.transport.base.PublishClient):
elif hasattr(self, "_socket"):
self._socket.close(0)
if hasattr(self, "context") and self.context.closed is False:
log.error("ZMQ Context term %r", self)
self.context.term()
log.error("ZMQ Context after term %r", self)
if self.callbacks:
for cb in self.callbacks:
running, task = self.callbacks[cb]
task.cancel()
def close(self):
    """
    Shut this publish client down.

    The first call marks the client as closing, stops the socket monitor
    (when one is attached), closes the stream -- or, when there is no
    stream, the raw socket -- with a linger of 0, and terminates the ZeroMQ
    context unless it is already closed. Every later call returns
    immediately.
    """
    if self._closing is True:
        return
    self._closing = True

    monitor = getattr(self, "_monitor", None)
    if monitor is not None:
        monitor.stop()
        self._monitor = None

    # Only the first transport handle that exists is closed; the stream
    # takes precedence over the bare socket.
    for handle_name in ("_stream", "_socket"):
        if hasattr(self, handle_name):
            getattr(self, handle_name).close(0)
            break

    if not hasattr(self, "context"):
        return
    if self.context.closed is False:
        log.error("ZMQ Context term %r", self)
        self.context.term()
        log.error("ZMQ Context after term %r", self)
# pylint: enable=W1701
def __enter__(self):
@ -335,18 +359,24 @@ class PublishClient(salt.transport.base.PublishClient):
try:
while running.is_set():
try:
msg = await self._socket.recv()
log.error("Waiting for pyaload from %r", self.uri)
msg = await self.recv(timeout=None)
log.error("Got for pyaload from %r", self.uri)
except zmq.error.ZMQError as exc:
log.error("ZMQERROR, %s", exc)
# We've disconnected just die
break
except Exception: # pylint: disable=broad-except
log.error("WTF", exc_info=True)
break
try:
await callback(msg)
except Exception: # pylint: disable=broad-except
log.error("Exception while running callback", exc_info=True)
log.debug("Callback done %r", callback)
if msg:
try:
log.error("Running callback for pyaload from %r", self.uri)
await callback(msg)
log.error("Finished callback for pyaload from %r", self.uri)
except Exception: # pylint: disable=broad-except
log.error("Exception while running callback", exc_info=True)
#log.debug("Callback done %r", callback)
except Exception as exc: # pylint: disable=broad-except
log.error("CONSUME Exception %s %s", self.uri, exc, exc_info=True)
log.error("CONSUME ENDING %s", self.uri)
@ -370,6 +400,7 @@ class RequestServer(salt.transport.base.DaemonizedRequestServer):
"""
self.__setup_signals()
context = zmq.Context(self.opts["worker_threads"])
log.error("ZMQ Context create %r", self)
# Prepare the zeromq sockets
self.uri = "tcp://{interface}:{ret_port}".format(**self.opts)
self.clients = context.socket(zmq.ROUTER)
@ -417,7 +448,9 @@ class RequestServer(salt.transport.base.DaemonizedRequestServer):
raise
except (KeyboardInterrupt, SystemExit):
break
log.error("ZMQ Context term %r", self)
context.term()
log.error("ZMQ Context after term %r", self)
def close(self):
"""
@ -443,7 +476,9 @@ class RequestServer(salt.transport.base.DaemonizedRequestServer):
if hasattr(self, "_socket") and self._socket.closed is False:
self._socket.close()
if hasattr(self, "context") and self.context.closed is False:
log.error("ZMQ Context term %r", self)
self.context.term()
log.error("ZMQ Context after term %r", self)
def pre_fork(self, process_manager):
"""
@ -477,7 +512,7 @@ class RequestServer(salt.transport.base.DaemonizedRequestServer):
:param IOLoop io_loop: An instance of a Tornado IOLoop, to handle event scheduling
"""
# context = zmq.Context(1)
context = zmq.asyncio.Context()
context = zmq.asyncio.Context(1)
self._socket = context.socket(zmq.REP)
# Linger -1 means we'll never discard messages.
self._socket.setsockopt(zmq.LINGER, -1)
@ -561,6 +596,7 @@ def _set_tcp_keepalive(zmq_socket, opts):
if "tcp_keepalive_intvl" in opts:
zmq_socket.setsockopt(zmq.TCP_KEEPALIVE_INTVL, opts["tcp_keepalive_intvl"])
ctx = zmq.asyncio.Context()
# TODO: unit tests!
class AsyncReqMessageClient:
@ -591,6 +627,7 @@ class AsyncReqMessageClient:
self.io_loop = io_loop
self.context = zmq.asyncio.Context()
log.error("ZMQ Context create %r", self)
self.send_queue = []
# mapping of message -> future
@ -616,7 +653,9 @@ class AsyncReqMessageClient:
self.socket.close()
if self.context.closed is False:
# This hangs if closing the stream causes an import error
log.error("ZMQ Context term %r", self)
self.context.term()
log.error("ZMQ Context after term %r", self)
def _init_socket(self):
if hasattr(self, "socket"):
@ -624,6 +663,7 @@ class AsyncReqMessageClient:
del self.socket
self.socket = self.context.socket(zmq.REQ)
self.socket.setsockopt(zmq.LINGER, -1)
# socket options
if hasattr(zmq, "RECONNECT_IVL_MAX"):
@ -753,7 +793,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
# _sock_data = threading.local()
async_methods = [
"publish",
"close",
#"close",
]
close_methods = [
"close",
@ -772,11 +812,15 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
interface = self.opts.get("interface", "127.0.0.1")
publish_port = self.opts.get("publish_port", 4560)
self.pub_uri = f"tcp://{interface}:{publish_port}"
self.ctx = zmq.asyncio.Context()
self.ctx = None
self.sock = None
self.deamon_context = None
self.deamon_pub_sock = None
self.deamon_pull_sock = None
self.deamon_monitor = None
def __repr__(self):
return f"<PublishServer pub_uri={self.pub_uri} pull_uri={self.pull_uri} at {id(self)}"
return f"<PublishServer pub_uri={self.pub_uri} pull_uri={self.pull_uri} at {hex(id(self))}>"
def publish_daemon(
self,
@ -789,9 +833,14 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
run in a thread or process as it creates and runs its own ioloop.
"""
ioloop = tornado.ioloop.IOLoop()
ioloop.asyncio_loop.set_debug(True)
self.io_loop = ioloop
context = zmq.asyncio.Context()
ioloop.add_callback(self.publisher, publish_payload)
try:
ioloop.start()
finally:
self.close()
def _get_sockets(self, context, ioloop):
log.error("ZMQ Context create %r", self)
pub_sock = context.socket(zmq.PUB)
monitor = ZeroMQSocketMonitor(pub_sock)
monitor.start_io_loop(ioloop)
@ -837,22 +886,19 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
pull_path,
0o600,
)
return pull_sock, pub_sock, monitor
async def run_publisher():
await self.publisher(pull_sock, publish_payload)
ioloop.add_callback(self.publisher, pull_sock, publish_payload)
try:
ioloop.start()
finally:
pub_sock.close()
pull_sock.close()
async def publisher(self, pull_sock, publish_payload):
async def publisher(self, publish_payload, ioloop=None):
if ioloop is None:
ioloop = tornado.ioloop.IOLoop.current()
ioloop.asyncio_loop.set_debug(True)
self.daemon_context = zmq.asyncio.Context()
self.daemon_pull_sock, self.daemon_pub_sock, self.deamon_monitor = self._get_sockets(self.daemon_context, ioloop)
while True:
try:
package = await pull_sock.recv()
log.error("Publisher got package %r %s", package, self.pull_uri)
log.error("Publisher wait package %s", self.pull_uri)
package = await self.daemon_pull_sock.recv()
log.error("Publisher got package %s %r", self.pull_uri, package)
# payload = salt.payload.loads(package)
await publish_payload(package)
except Exception as exc: # pylint: disable=broad-except
@ -922,19 +968,36 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
new socket.
"""
log.debug("Connecting to pub server: %s", self.pull_uri)
self.ctx = zmq.asyncio.Context()
self.sock = self.ctx.socket(zmq.PUSH)
self.sock.setsockopt(zmq.LINGER, -1)
self.sock.setsockopt(zmq.LINGER, 300)
self.sock.connect(self.pull_uri)
return self.sock
def close(self):
    """
    Close every ZeroMQ resource this publish server holds.

    The client-side socket is closed and its context terminated when they
    exist; afterwards the daemon-side pub/pull sockets, the socket monitor
    and the daemon context are released. Handles that were never created
    are skipped and every handle is reset to ``None`` once released, so the
    method is safe to call before ``connect`` and safe to call repeatedly.
    """
    sock = getattr(self, "sock", None)
    self.sock = None
    if sock is not None:
        log.error("Socket close %r", self)
        sock.close()
        log.error("Socket closed %r", self)

    ctx = getattr(self, "ctx", None)
    self.ctx = None
    if ctx is not None and not ctx.closed:
        log.error("Context term %r", self)
        ctx.term()
        log.error("After context term %r", self)

    # The daemon-side handles are spelled ``deamon_*`` where they are
    # initialised and ``daemon_*`` where the publisher coroutine assigns
    # them, so look under both spellings instead of trusting either one.
    # The context is released last, after the sockets created from it.
    for suffix, preferred in (
        ("pub_sock", "close"),
        ("pull_sock", "close"),
        ("monitor", "stop"),
        ("context", "term"),
    ):
        for prefix in ("daemon_", "deamon_"):
            attr = prefix + suffix
            handle = getattr(self, attr, None)
            if handle is None:
                continue
            setattr(self, attr, None)
            # NOTE(review): monitors are shut down with stop() elsewhere in
            # this file; close() is kept as a fallback -- confirm which one
            # ZeroMQSocketMonitor actually provides.
            release = getattr(handle, preferred, None) or handle.close
            release()
async def publish(self, payload, **kwargs):
"""

View file

@ -410,7 +410,10 @@ class SaltEvent:
"""
if not self.cpub:
return
#if isinstance(self.subscriber, salt.utils.asynchronous.SyncWrapper):
# self.subscriber.close()
#else:
# asyncio.create_task(self.subscriber.close())
self.subscriber.close()
self.subscriber = None
self.pending_events = []
@ -862,18 +865,17 @@ class SaltEvent:
)
msg = salt.utils.stringutils.to_bytes(event, "utf-8")
if self._run_io_loop_sync:
log.error("FIRE EVENT A %r %r", msg, self.pusher)
with salt.utils.asynchronous.current_ioloop(self.io_loop):
try:
# self.pusher.send(msg)
self.pusher.publish(msg)
except Exception as exc: # pylint: disable=broad-except
log.debug(
"Publisher send failed with exception: %s",
exc,
exc_info_on_loglevel=logging.DEBUG,
)
raise
log.error("FIRE EVENT A %r %r", msg, self.pusher.obj)
try:
# self.pusher.send(msg)
self.pusher.publish(msg)
except Exception as exc: # pylint: disable=broad-except
log.debug(
"Publisher send failed with exception: %s",
exc,
exc_info_on_loglevel=logging.DEBUG,
)
raise
else:
log.error("FIRE EVENT B %r %r", msg, self.pusher)
asyncio.create_task(self.pusher.publish(msg))