Merge async work with ipc work

Daniel A. Wozniak 2023-06-18 01:46:56 -07:00 committed by Gareth J. Greenaway
parent cec5aa517d
commit eb8328717d
6 changed files with 293 additions and 355 deletions

View file

@@ -699,6 +699,9 @@ class PubServerChannel:
Factory class to create subscription channels to the master's Publisher
"""
def __repr__(self):
return f"<PubServerChannel pub_uri={self.transport.pub_uri} pull_uri={self.transport.pull_uri} at {id(self)}>"
@classmethod
def factory(cls, opts, **kwargs):
if "master_uri" not in opts and "master_uri" in kwargs:
@@ -837,8 +840,7 @@ class PubServerChannel:
data, salt.utils.event.tagify("present", "presence")
)
@tornado.gen.coroutine
def publish_payload(self, load, *args):
async def publish_payload(self, load, *args):
load = salt.payload.loads(load)
unpacked_package = self.wrap_payload(load)
try:
@@ -849,10 +851,10 @@ class PubServerChannel:
payload = salt.payload.dumps(payload)
if "topic_lst" in unpacked_package:
topic_list = unpacked_package["topic_lst"]
ret = yield self.transport.publish_payload(payload, topic_list)
ret = await self.transport.publish_payload(payload, topic_list)
else:
ret = yield self.transport.publish_payload(payload)
raise tornado.gen.Return(ret)
ret = await self.transport.publish_payload(payload)
return ret
def wrap_payload(self, load):
payload = {"enc": "aes"}
@@ -887,7 +889,7 @@ class PubServerChannel:
return int_payload
def publish(self, load):
async def publish(self, load):
"""
Publish "load" to minions
"""
@@ -896,5 +898,5 @@ class PubServerChannel:
load.get("jid", None),
repr(load)[:40],
)
load = salt.payload.dumps(load)
self.transport.publish(load)
payload = salt.payload.dumps(load)
await self.transport.publish(payload)

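The channel hunks above convert Tornado-style coroutines (tornado.gen.coroutine, yield, raise tornado.gen.Return(ret)) into native coroutines that simply await and return. A minimal sketch of the same conversion against a stub transport; StubTransport and ChannelSketch are illustrative stand-ins, not Salt's real transport API:

import asyncio

class StubTransport:
    # Stand-in for self.transport in PubServerChannel; simulates async I/O only.
    async def publish_payload(self, payload, topic_list=None):
        await asyncio.sleep(0)
        return {"payload": payload, "topics": topic_list}

class ChannelSketch:
    def __init__(self, transport):
        self.transport = transport

    # Before: @tornado.gen.coroutine + yield + raise tornado.gen.Return(ret)
    # After: a native coroutine that awaits the transport and returns directly.
    async def publish_payload(self, payload, topic_list=None):
        if topic_list:
            ret = await self.transport.publish_payload(payload, topic_list)
        else:
            ret = await self.transport.publish_payload(payload)
        return ret

print(asyncio.run(ChannelSketch(StubTransport()).publish_payload(b"load")))

Callers that previously scheduled publish_payload on the IOLoop can now await it directly.
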
View file

@@ -32,7 +32,6 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser):
import salt.client
self.parse_args()
print("A")
try:
# We don't need to bail on config file permission errors
@@ -48,7 +47,6 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser):
self.exit(2, "{}\n".format(exc))
return
print("B")
if self.options.batch or self.options.static:
# _run_batch() will handle all output and
# exit with the appropriate error condition
@@ -65,7 +63,6 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser):
if self.options.timeout <= 0:
self.options.timeout = self.local_client.opts["timeout"]
print("C")
kwargs = {
"tgt": self.config["tgt"],
"fun": self.config["fun"],
@@ -93,7 +90,6 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser):
else:
kwargs["tgt_type"] = "glob"
print("D")
# If batch_safe_limit is set, check minions matching target and
# potentially switch to batch execution
if self.options.batch_safe_limit > 1:
@@ -155,7 +151,6 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser):
)
return
print("E")
# local will be None when there was an error
if not self.local_client:
return
@@ -227,7 +222,7 @@ class SaltCMD(salt.utils.parsers.SaltCMDOptionParser):
AuthorizationError,
SaltInvocationError,
EauthAuthenticationError,
#SaltClientError,
SaltClientError,
) as exc:
print(repr(exc))
ret = str(exc)

View file

@@ -2,6 +2,7 @@
This module contains all of the routines needed to set up a master server, this
involves preparing the three listeners and the workers needed by the master.
"""
import asyncio
import collections
import copy
import ctypes
@@ -722,11 +723,11 @@ class Master(SMaster):
pub_channels.append(chan)
log.info("Creating master event publisher process")
#self.process_manager.add_process(
# self.process_manager.add_process(
# salt.utils.event.EventPublisher,
# args=(self.opts,),
# name="EventPublisher",
#)
# )
ipc_publisher = salt.transport.publish_server(self.opts)
ipc_publisher.pub_uri = "ipc://{}".format(
@@ -737,7 +738,9 @@ class Master(SMaster):
)
self.process_manager.add_process(
ipc_publisher.publish_daemon,
args=[ipc_publisher.publish_payload,],
args=[
ipc_publisher.publish_payload,
],
)
if self.opts.get("reactor"):
@@ -1020,8 +1023,7 @@ class MWorker(salt.utils.process.SignalHandlingProcess):
# Tornado knows what to do
pass
@tornado.gen.coroutine
def _handle_payload(self, payload):
async def _handle_payload(self, payload):
"""
The _handle_payload method is the key method used to figure out what
needs to be done with communication to the server
@@ -1045,7 +1047,10 @@ class MWorker(salt.utils.process.SignalHandlingProcess):
key = payload["enc"]
load = payload["load"]
ret = {"aes": self._handle_aes, "clear": self._handle_clear}[key](load)
raise tornado.gen.Return(ret)
while self.clear_funcs.tasks:
# dequeue
await self.clear_funcs.tasks.pop(0)
return ret
def _post_stats(self, start, cmd):
"""
@@ -1997,6 +2002,8 @@ class ClearFuncs(TransportMethods):
# Make a masterapi object
self.masterapi = salt.daemons.masterapi.LocalFuncs(opts, key)
self.channels = []
self.tasks = []
# self.task_group = asyncio.TaskGroup()
def runner(self, clear_load):
"""
@@ -2371,8 +2378,8 @@ class ClearFuncs(TransportMethods):
chan = salt.channel.server.PubServerChannel.factory(opts)
self.channels.append(chan)
for chan in self.channels:
log.error("SEND PUB %r", load)
chan.publish(load)
task = asyncio.create_task(chan.publish(load))
self.tasks.append(task)
@property
def ssh_client(self):

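In the master hunks above, ClearFuncs now schedules each channel publish with asyncio.create_task and keeps the task handles in self.tasks, and MWorker._handle_payload drains that list before returning. A rough sketch of that schedule-then-drain pattern; ChannelStub and ClearFuncsSketch are illustrative stand-ins, not Salt's classes:

import asyncio

class ChannelStub:
    # Stand-in for PubServerChannel; publish just simulates async I/O.
    async def publish(self, load):
        await asyncio.sleep(0)
        print("published", load)

class ClearFuncsSketch:
    def __init__(self, channels):
        self.channels = channels
        self.tasks = []

    def send_pub(self, load):
        # Schedule each publish on the running loop and keep the handle.
        for chan in self.channels:
            self.tasks.append(asyncio.create_task(chan.publish(load)))

    async def drain(self):
        # Mirrors the loop added to _handle_payload: await queued publishes.
        while self.tasks:
            await self.tasks.pop(0)

async def main():
    clear_funcs = ClearFuncsSketch([ChannelStub()])
    clear_funcs.send_pub({"jid": "20230618", "fun": "test.ping"})
    await clear_funcs.drain()

asyncio.run(main())
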
View file

@@ -285,7 +285,7 @@ def get_proc_dir(cachedir, **kwargs):
mode = kwargs.pop("mode", None)
if mode is None:
eode = {}
mode = {}
else:
mode = {"mode": mode}
@@ -1044,26 +1044,31 @@ class MinionManager(MinionBase):
def _bind(self):
# start up the event publisher, so we can see events during startup
#self.event_publisher = salt.utils.event.AsyncEventPublisher(
# self.event_publisher = salt.utils.event.AsyncEventPublisher(
# self.opts,
# io_loop=self.io_loop,
#)
# )
def target():
import hashlib
self.opts['publish_port'] = 12321
self.opts["publish_port"] = 12321
hash_type = getattr(hashlib, self.opts["hash_type"])
ipc_publisher = salt.transport.publish_server(self.opts)
id_hash = hash_type(
salt.utils.stringutils.to_bytes(self.opts["id"])
).hexdigest()[:10]
epub_sock_path = "ipc://{}".format(os.path.join(
self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
))
epub_sock_path = "ipc://{}".format(
os.path.join(
self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
)
)
if os.path.exists(epub_sock_path):
os.unlink(epub_sock_path)
epull_sock_path = "ipc://{}".format(os.path.join(
self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
))
epull_sock_path = "ipc://{}".format(
os.path.join(
self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
)
)
ipc_publisher.pub_uri = epub_sock_path
ipc_publisher.pull_uri = epull_sock_path
ipc_publisher.publish_daemon(ipc_publisher.publish_payload)

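The minion hunk above derives per-minion IPC socket URIs from a hash of the minion id before binding the event publisher. A small helper showing just that derivation; the opts keys mirror those used above, and the values passed in are illustrative:

import hashlib
import os

def minion_event_uris(opts):
    # Assumes opts provides "id", "hash_type" and "sock_dir", as in the diff above.
    hash_type = getattr(hashlib, opts["hash_type"])
    id_hash = hash_type(opts["id"].encode()).hexdigest()[:10]
    pub_uri = "ipc://" + os.path.join(
        opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
    )
    pull_uri = "ipc://" + os.path.join(
        opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
    )
    return pub_uri, pull_uri

print(minion_event_uris({"id": "minion1", "hash_type": "sha256", "sock_dir": "/var/run/salt/minion"}))
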
View file

@@ -2,6 +2,7 @@
Zeromq transport classes
"""
import asyncio
import asyncio.exceptions
import errno
import hashlib
import logging
@@ -9,16 +10,14 @@ import os
import signal
import sys
import threading
import asyncio
import asyncio.exceptions
from random import randint
import tornado
import tornado.concurrent
import tornado.gen
import tornado.ioloop
import zmq.error
import zmq.asyncio
import zmq.error
import zmq.eventloop.zmqstream
import salt.payload
@@ -114,40 +113,31 @@ class PublishClient(salt.transport.base.PublishClient):
"connect",
"connect_uri",
"recv",
"close",
]
close_methods = [
"close",
]
#<<<<<<< HEAD
# def __init__(self, opts, io_loop, **kwargs):
# super().__init__(opts, io_loop, **kwargs)
# self.callbacks = {}
# self.opts = opts
# self.io_loop = io_loop
#=======
def _legacy_setup(self,
_id,
role,
zmq_filtering=False,
tcp_keepalive=True,
tcp_keepalive_idle=300,
tcp_keepalive_cnt=-1,
tcp_keepalive_intvl=-1,
recon_default=1000,
recon_max=10000,
recon_randomize=True,
ipv6=None,
master_ip="127.0.0.1",
zmq_monitor=False,
**extras
):
self.hexid = hashlib.sha1(
salt.utils.stringutils.to_bytes(_id)
).hexdigest()
def _legacy_setup(
self,
_id,
role,
zmq_filtering=False,
tcp_keepalive=True,
tcp_keepalive_idle=300,
tcp_keepalive_cnt=-1,
tcp_keepalive_intvl=-1,
recon_default=1000,
recon_max=10000,
recon_randomize=True,
ipv6=None,
master_ip="127.0.0.1",
zmq_monitor=False,
**extras,
):
self.hexid = hashlib.sha1(salt.utils.stringutils.to_bytes(_id)).hexdigest()
self._closing = False
import zmq.asyncio
self.context = zmq.asyncio.Context()
self._socket = self.context.socket(zmq.SUB)
if zmq_filtering:
@@ -163,22 +153,14 @@ class PublishClient(salt.transport.base.PublishClient):
self._socket.setsockopt(zmq.SUBSCRIBE, b"")
if _id:
self._socket.setsockopt(
zmq.IDENTITY, salt.utils.stringutils.to_bytes(_id)
)
self._socket.setsockopt(zmq.IDENTITY, salt.utils.stringutils.to_bytes(_id))
# TODO: cleanup all the socket opts stuff
if hasattr(zmq, "TCP_KEEPALIVE"):
self._socket.setsockopt(zmq.TCP_KEEPALIVE, tcp_keepalive)
self._socket.setsockopt(
zmq.TCP_KEEPALIVE_IDLE, tcp_keepalive_idle
)
self._socket.setsockopt(
zmq.TCP_KEEPALIVE_CNT, tcp_keepalive_cnt
)
self._socket.setsockopt(
zmq.TCP_KEEPALIVE_INTVL, tcp_keepalive_intvl
)
self._socket.setsockopt(zmq.TCP_KEEPALIVE_IDLE, tcp_keepalive_idle)
self._socket.setsockopt(zmq.TCP_KEEPALIVE_CNT, tcp_keepalive_cnt)
self._socket.setsockopt(zmq.TCP_KEEPALIVE_INTVL, tcp_keepalive_intvl)
if recon_randomize:
recon_delay = randint(
@@ -193,36 +175,27 @@ class PublishClient(salt.transport.base.PublishClient):
recon_delay,
)
log.debug("Setting zmq_reconnect_ivl to '%sms'", recon_delay)
self._socket.setsockopt(zmq.RECONNECT_IVL, recon_delay)
log.debug("Setting zmq_reconnect_ivl to '%sms'", recon_delay)
self._socket.setsockopt(zmq.RECONNECT_IVL, recon_delay)
if hasattr(zmq, "RECONNECT_IVL_MAX"):
log.debug(
"Setting zmq_reconnect_ivl_max to '%sms'",
recon_delay + recon_max,
)
if hasattr(zmq, "RECONNECT_IVL_MAX"):
log.debug(
"Setting zmq_reconnect_ivl_max to '%sms'",
recon_delay + recon_max,
)
self._socket.setsockopt(zmq.RECONNECT_IVL_MAX, recon_max)
self._socket.setsockopt(zmq.RECONNECT_IVL_MAX, recon_max)
if (ipv6 is True or ":" in master_ip) and hasattr(
zmq, "IPV4ONLY"
):
if (ipv6 is True or ":" in master_ip) and hasattr(zmq, "IPV4ONLY"):
# IPv6 sockets work for both IPv6 and IPv4 addresses
self._socket.setsockopt(zmq.IPV4ONLY, 0)
#<<<<<<< HEAD
# # if HAS_ZMQ_MONITOR and self.opts["zmq_monitor"]:
# # self._monitor = ZeroMQSocketMonitor(self._socket)
# # self._monitor.start_io_loop(self.io_loop)
# self._monitor = None
# self.task = None
#=======
if HAS_ZMQ_MONITOR and zmq_monitor:
self._monitor = ZeroMQSocketMonitor(self._socket)
self._monitor.start_io_loop(self.io_loop)
self.poller = zmq.Poller()
self.poller.register(self._socket, zmq.POLLIN)
if HAS_ZMQ_MONITOR and zmq_monitor:
self._monitor = ZeroMQSocketMonitor(self._socket)
self._monitor.start_io_loop(self.io_loop)
def __init__(self, opts, io_loop, **kwargs):
super().__init__(opts, io_loop, **kwargs)
@@ -236,7 +209,7 @@ class PublishClient(salt.transport.base.PublishClient):
self.connect_called = False
self.callbacks = {}
def close(self):
async def close(self):
if self._closing is True:
return
self._closing = True
@@ -263,7 +236,8 @@ class PublishClient(salt.transport.base.PublishClient):
):
self.connect_called = True
self.publish_port = publish_port
log.error(
self.uri = self.master_pub
log.debug(
"Connecting the Minion to the Master publish port, using the URI: %s",
self.master_pub,
)
@@ -272,11 +246,9 @@ class PublishClient(salt.transport.base.PublishClient):
async def connect_uri(self, uri, connect_callback=None, disconnect_callback=None):
self.connect_called = True
log.error(
"Connecting the Minion to the Master publish port, using the URI: %s",
uri
)
#log.debug("%r connecting to %s", self, self.master_pub)
log.debug("Connecting the publisher client to: %s", uri)
# log.debug("%r connecting to %s", self, self.master_pub)
self.uri = uri
self._socket.connect(uri)
if connect_callback:
connect_callback(True)
@@ -299,67 +271,55 @@ class PublishClient(salt.transport.base.PublishClient):
:param list messages: A list of messages to be decoded
"""
messages_len = len(messages)
# if it was one message, then its old style
if messages_len == 1:
payload = salt.payload.loads(messages[0])
# 2 includes a header which says who should do it
elif messages_len == 2:
message_target = salt.utils.stringutils.to_str(messages[0])
if (
self.opts.get("__role") != "syndic"
and message_target not in ("broadcast", self.hexid)
) or (
self.opts.get("__role") == "syndic"
and message_target not in ("broadcast", "syndic")
):
log.debug("Publish received for not this minion: %s", message_target)
return None
payload = salt.payload.loads(messages[1])
else:
raise Exception(
"Invalid number of messages ({}) in zeromq pubmessage from master".format(
len(messages_len)
if isinstance(messages, list):
messages_len = len(messages)
# if it was one message, then its old style
if messages_len == 1:
payload = salt.payload.loads(messages[0])
# 2 includes a header which says who should do it
elif messages_len == 2:
message_target = salt.utils.stringutils.to_str(messages[0])
if (
self.opts.get("__role") != "syndic"
and message_target not in ("broadcast", self.hexid)
) or (
self.opts.get("__role") == "syndic"
and message_target not in ("broadcast", "syndic")
):
log.debug(
"Publish received for not this minion: %s", message_target
)
return None
payload = salt.payload.loads(messages[1])
else:
raise Exception(
"Invalid number of messages ({}) in zeromq pubmessage from master".format(
len(messages_len)
)
)
)
else:
payload = salt.payload.loads(messages)
# Yield control back to the caller. When the payload has been decoded, assign
# the decoded payload to 'ret' and resume operation
return payload
#@property
#def stream(self):
# """
# Return the current zmqstream, creating one if necessary
# """
# if not hasattr(self, "_stream"):
# self._stream = zmq.eventloop.zmqstream.ZMQStream(
# self._socket, io_loop=self.io_loop
# )
# return self._stream
#def on_recv(self, callback):
# """
# Register a callback for received messages (that we didn't initiate)
async def recv(self, timeout=None):
log.error("SOCK %r %s", self._socket, self.connect_called)
if timeout == 0:
events = self.poller.poll(timeout=timeout)
log.error("NOBLOCK %r", events)
if self._socket in events and events[self._socket] == zmq.POLLIN:
if events:
return await self._socket.recv()
elif timeout:
try:
return await asyncio.wait_for(self._socket.recv(), timeout=timeout)
except asyncio.exceptions.TimeoutError:
log.error("TIMEOUT")
log.trace("PublishClient recieve timedout: %d", timeout)
else:
return await self._socket.recv()
async def send(self, msg):
return
await self._socket.send(msg)
# raise Exception("Send not supported")
# await self._socket.send(msg)
def on_recv(self, callback):
@@ -372,24 +332,29 @@ class PublishClient(salt.transport.base.PublishClient):
running.set()
async def consume(running):
while running.is_set():
try:
msg = await self._socket.recv_multipart()
except zmq.error.ZMQError:
# We've disconnected just die
break
except Exception: # pylint: disable=broad-except
log.error("Exception while reading", exc_info=True)
break
try:
await callback(msg)
except Exception: # pylint: disable=broad-except
log.error("Exception while running callback", exc_info=True)
log.debug("Callback done %r", callback)
try:
while running.is_set():
try:
msg = await self._socket.recv()
except zmq.error.ZMQError as exc:
log.error("ZMQERROR, %s", exc)
# We've disconnected just die
break
except Exception: # pylint: disable=broad-except
break
try:
await callback(msg)
except Exception: # pylint: disable=broad-except
log.error("Exception while running callback", exc_info=True)
log.debug("Callback done %r", callback)
except Exception as exc: # pylint: disable=broad-except
log.error("CONSUME Exception %s %s", self.uri, exc, exc_info=True)
log.error("CONSUME ENDING %s", self.uri)
task = self.io_loop.spawn_callback(consume, running)
self.callbacks[callback] = running, task
class RequestServer(salt.transport.base.DaemonizedRequestServer):
def __init__(self, opts): # pylint: disable=W0231
self.opts = opts
@@ -511,7 +476,7 @@ class RequestServer(salt.transport.base.DaemonizedRequestServer):
they are picked up off the wire
:param IOLoop io_loop: An instance of a Tornado IOLoop, to handle event scheduling
"""
#context = zmq.Context(1)
# context = zmq.Context(1)
context = zmq.asyncio.Context()
self._socket = context.socket(zmq.REP)
# Linger -1 means we'll never discard messages.
@@ -533,9 +498,11 @@ class RequestServer(salt.transport.base.DaemonizedRequestServer):
):
os.chmod(os.path.join(self.opts["sock_dir"], "workers.ipc"), 0o600)
self.message_handler = message_handler
async def callback():
self.task = asyncio.create_task(self.request_handler())
await self.task
io_loop.add_callback(callback)
async def request_handler(self):
@@ -544,12 +511,10 @@ class RequestServer(salt.transport.base.DaemonizedRequestServer):
reply = await self.handle_message(None, request)
await self._socket.send(self.encode_payload(reply))
async def handle_message(self, stream, payload):
payload = self.decode_payload(payload)
return await self.message_handler(payload)
def encode_payload(self, payload):
return salt.payload.dumps(payload)
@@ -717,15 +682,29 @@ class ZeroMQSocketMonitor:
"""
self._socket = socket
self._monitor_socket = self._socket.get_monitor_socket()
self._monitor_stream = None
self._monitor_task = None
self._running = asyncio.Event()
def start_io_loop(self, io_loop):
log.trace("Event monitor start!")
return
self._monitor_stream = zmq.eventloop.zmqstream.ZMQStream(
self._monitor_socket, io_loop=io_loop
)
self._monitor_stream.on_recv(self.monitor_callback)
self._running.set()
io_loop.spawn_callback(self.consume)
async def consume(self):
while self._running.is_set():
try:
if self._monitor_socket.poll():
msg = await self._monitor_socket.recv_multipart()
self.monitor_callback(msg)
else:
await asyncio.sleep(0.3)
except zmq.error.ZMQError as exc:
log.error("ZmqMonitor, %s", exc)
# We've disconnected just die
break
except Exception as exc: # pylint: disable=broad-except
log.error("ZmqMonitor, %s", exc)
break
def start_poll(self):
log.trace("Event monitor start!")
@@ -761,10 +740,8 @@ class ZeroMQSocketMonitor:
return
self._socket.disable_monitor()
self._socket = None
self._running.clear()
self._monitor_socket = None
if self._monitor_stream is not None:
self._monitor_stream.close()
self._monitor_stream = None
log.trace("Event monitor done!")
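The ZeroMQSocketMonitor hunks above drop the ZMQStream callback in favour of a poll-then-sleep coroutine gated by an asyncio.Event. A stripped-down analogue of that loop, with an asyncio.Queue standing in for the zmq monitor socket (the class and names here are illustrative):

import asyncio

class MonitorSketch:
    def __init__(self, source):
        self.source = source               # stand-in for the monitor socket
        self._running = asyncio.Event()

    async def consume(self, handle):
        self._running.set()
        while self._running.is_set():
            if not self.source.empty():
                handle(self.source.get_nowait())
            else:
                await asyncio.sleep(0.3)   # nothing pending; yield to the loop

    def stop(self):
        self._running.clear()

async def demo():
    queue = asyncio.Queue()
    monitor = MonitorSketch(queue)
    task = asyncio.create_task(monitor.consume(print))
    await queue.put({"event": "connected"})
    await asyncio.sleep(0.5)
    monitor.stop()
    await task

asyncio.run(demo())
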
@@ -773,7 +750,14 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
Encapsulate synchronous operations for a publisher channel
"""
_sock_data = threading.local()
# _sock_data = threading.local()
async_methods = [
"publish",
"close",
]
close_methods = [
"close",
]
def __init__(self, opts):
self.opts = opts
@@ -788,9 +772,11 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
interface = self.opts.get("interface", "127.0.0.1")
publish_port = self.opts.get("publish_port", 4560)
self.pub_uri = f"tcp://{interface}:{publish_port}"
self.ctx = zmq.asyncio.Context()
self.sock = None
def connect(self):
return tornado.gen.sleep(5)
def __repr__(self):
return f"<PublishServer pub_uri={self.pub_uri} pull_uri={self.pull_uri} at {id(self)}"
def publish_daemon(
self,
@@ -810,7 +796,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
monitor = ZeroMQSocketMonitor(pub_sock)
monitor.start_io_loop(ioloop)
_set_tcp_keepalive(pub_sock, self.opts)
self.dpub_sock = pub_sock #= zmq.eventloop.zmqstream.ZMQStream(pub_sock)
self.dpub_sock = pub_sock # = zmq.eventloop.zmqstream.ZMQStream(pub_sock)
# if 2.1 >= zmq < 3.0, we only have one HWM setting
try:
pub_sock.setsockopt(zmq.HWM, self.opts.get("pub_hwm", 1000))
@@ -829,123 +815,92 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
# Prepare minion pull socket
pull_sock = context.socket(zmq.PULL)
pull_sock.setsockopt(zmq.LINGER, -1)
#pull_sock = zmq.eventloop.zmqstream.ZMQStream(pull_sock)
# pull_sock = zmq.eventloop.zmqstream.ZMQStream(pull_sock)
pull_sock.setsockopt(zmq.LINGER, -1)
salt.utils.zeromq.check_ipc_path_max_len(self.pull_uri)
# Start the minion command publisher
# Securely create socket
with salt.utils.files.set_umask(0o177):
import pathlib
log.info("Starting the Salt Publisher on %s", self.pub_uri)
pub_sock.bind(self.pub_uri)
if 'ipc://' in self.pub_uri:
if "ipc://" in self.pub_uri:
pub_path = self.pub_uri.replace("ipc:", "")
#pathlib.Path(pub_path).touch()
os.chmod( # nosec
pub_path,
0o600,
)
log.info("Starting the Salt Puller on %s", self.pull_uri)
pull_sock.bind(self.pull_uri)
if 'ipc://' in self.pull_uri:
if "ipc://" in self.pull_uri:
pull_path = self.pull_uri.replace("ipc:", "")
#pathlib.Path(pull_path).touch()
os.chmod( # nosec
pull_path,
0o600,
)
async def on_recv(packages):
for package in packages:
log.error("PACAKGE %s %s %r", self.pull_uri, self.pub_uri, package)
# payload = salt.payload.loads(package)
await publish_payload(payload)
async def run_publisher():
await self.publisher(pull_sock, publish_payload)
self.task = None
async def callback():
self.task = asyncio.create_task(self.publisher(pull_sock, publish_payload))
ioloop.add_callback(callback)
ioloop.add_callback(self.publisher, pull_sock, publish_payload)
try:
ioloop.start()
finally:
pub_sock.close()
pull_sock.close()
if self.task:
self.task.cancel()
# @property
# def pull_uri(self):
# if self.opts.get("ipc_mode", "") == "tcp":
# pull_uri = "tcp://127.0.0.1:{}".format(
# self.opts.get("tcp_master_publish_pull", 4514)
# )
# else:
# pull_uri = "ipc://{}".format(
# os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
# )
# return pull_uri
#
# @property
# def pub_uri(self):
# return "tcp://{interface}:{publish_port}".format(**self.opts)
async def publisher(self, pull_sock, publish_payload):
while True:
try:
package = await pull_sock.recv()
payload = salt.payload.loads(package)
await publish_payload(payload)
except Exception as exc:
log.error("Exception in publisher %s %s", self.pull_uri, exc, exc_info=True)
@property
def pull_uri(self):
if self.opts.get("ipc_mode", "") == "tcp":
pull_uri = "tcp://127.0.0.1:{}".format(
self.opts.get("tcp_master_publish_pull", 4514)
)
else:
pull_uri = "ipc://{}".format(
os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
)
return pull_uri
@property
def pub_uri(self):
return "tcp://{interface}:{publish_port}".format(**self.opts)
log.error("Publisher got package %r %s", package, self.pull_uri)
# payload = salt.payload.loads(package)
await publish_payload(package)
except Exception as exc: # pylint: disable=broad-except
log.error(
"Exception in publisher %s %s", self.pull_uri, exc, exc_info=True
)
async def publish_payload(self, payload, topic_list=None):
#payload = salt.payload.dumps(payload)
if self.opts["zmq_filtering"]:
if topic_list:
for topic in topic_list:
log.trace("Sending filtered data over publisher %s", self.pub_uri)
# zmq filters are substring match, hash the topic
# to avoid collisions
htopic = salt.utils.stringutils.to_bytes(
hashlib.sha1(salt.utils.stringutils.to_bytes(topic)).hexdigest()
log.error(f"Publish payload %s %r", self.pub_uri, payload)
try:
# payload = salt.payload.dumps(payload)
if self.opts["zmq_filtering"]:
if topic_list:
for topic in topic_list:
log.trace(
"Sending filtered data over publisher %s", self.pub_uri
)
# zmq filters are substring match, hash the topic
# to avoid collisions
htopic = salt.utils.stringutils.to_bytes(
hashlib.sha1(
salt.utils.stringutils.to_bytes(topic)
).hexdigest()
)
await self.dpub_sock.send(htopic, flags=zmq.SNDMORE)
await self.dpub_sock.send(payload)
log.trace("Filtered data has been sent")
# Syndic broadcast
if self.opts.get("order_masters"):
log.trace("Sending filtered data to syndic")
await self.dpub_sock.send(b"syndic", flags=zmq.SNDMORE)
await self.dpub_sock.send(payload)
log.trace("Filtered data has been sent to syndic")
# otherwise its a broadcast
else:
# TODO: constants file for "broadcast"
log.trace(
"Sending broadcasted data over publisher %s", self.pub_uri
)
await self.dpub_sock.send(htopic, flags=zmq.SNDMORE)
await self.dpub_sock.send(b"broadcast", flags=zmq.SNDMORE)
await self.dpub_sock.send(payload)
log.trace("Filtered data has been sent")
# Syndic broadcast
if self.opts.get("order_masters"):
log.trace("Sending filtered data to syndic")
await self.dpub_sock.send(b"syndic", flags=zmq.SNDMORE)
await self.dpub_sock.send(payload)
log.trace("Filtered data has been sent to syndic")
# otherwise its a broadcast
log.trace("Broadcasted data has been sent")
else:
# TODO: constants file for "broadcast"
log.trace("Sending broadcasted data over publisher %s", self.pub_uri)
await self.dpub_sock.send(b"broadcast", flags=zmq.SNDMORE)
log.trace("Sending ZMQ-unfiltered data over publisher %s", self.pub_uri)
await self.dpub_sock.send(payload)
log.trace("Broadcasted data has been sent")
else:
log.trace("Sending ZMQ-unfiltered data over publisher %s", self.pub_uri)
await self.dpub_sock.send(payload)
log.trace("Unfiltered data has been sent")
log.trace("Unfiltered data has been sent")
except Exception as exc: # pylint: disable=broad-except
log.error("pub payload %s", exc, exc_info=True)
def pre_fork(self, process_manager):
"""
@@ -960,70 +915,43 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
args=(self.publish_payload,),
)
@property
def pub_sock(self):
"""
This thread's zmq publisher socket. This socket is stored on the class
so that multiple instantiations in the same thread will re-use a single
zmq socket.
"""
try:
return self._sock_data.sock
except AttributeError:
pass
def pub_connect(self):
def connect(self):
"""
Create and connect this thread's zmq socket. If a publisher socket
already exists "pub_close" is called before creating and connecting a
new socket.
"""
if self.pub_sock:
self.pub_close()
ctx = zmq.Context()
self._sock_data.sock = ctx.socket(zmq.PUSH)
self.pub_sock.setsockopt(zmq.LINGER, -1)
#if self.opts.get("ipc_mode", "") == "tcp":
# pull_uri = "tcp://127.0.0.1:{}".format(
# self.opts.get("tcp_master_publish_pull", 4514)
# )
#else:
# pull_uri = "ipc://{}".format(
# os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
# )
log.debug("Connecting to pub server: %s", self.pull_uri)
self.pub_sock.connect(self.pull_uri)
return self._sock_data.sock
self.sock = self.ctx.socket(zmq.PUSH)
self.sock.setsockopt(zmq.LINGER, -1)
self.sock.connect(self.pull_uri)
return self.sock
def pub_close(self):
async def close(self):
"""
Disconnect an existing publisher socket and remove it from the local
thread's cache.
"""
if hasattr(self._sock_data, "sock"):
self._sock_data.sock.close()
delattr(self._sock_data, "sock")
sock = self.sock
self.sock = None
sock.close()
def publish(self, payload, **kwargs):
async def publish(self, payload, **kwargs):
"""
Publish "load" to minions. This send the load to the publisher daemon
process with does the actual sending to minions.
:param dict load: A load to be sent across the wire to minions
"""
if not self.pub_sock:
self.pub_connect()
log.error("Payload %r", payload)
self.pub_sock.send(payload)
log.debug("Sent payload to publish daemon.")
if not self.sock:
self.connect()
log.error("%r send %r", self, payload)
await self.sock.send(payload)
@property
def topic_support(self):
return self.opts.get("zmq_filtering", False)
def close(self):
self.pub_close()
def __enter__(self):
return self

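The PublishServer hunks above run the pull-to-pub relay as an asyncio task on zmq.asyncio sockets: local processes push payloads into a bound PULL socket and the daemon fans them out on the bound PUB socket. A minimal sketch of that relay, assuming pyzmq with asyncio support; the URIs below are placeholders rather than Salt's configured ones:

import zmq
import zmq.asyncio

async def pull_to_pub_relay(pull_uri="ipc:///tmp/demo_publish_pull.ipc",
                            pub_uri="tcp://127.0.0.1:5556"):
    ctx = zmq.asyncio.Context()
    pub_sock = ctx.socket(zmq.PUB)
    pub_sock.bind(pub_uri)
    pull_sock = ctx.socket(zmq.PULL)
    pull_sock.bind(pull_uri)
    try:
        while True:
            package = await pull_sock.recv()   # payload pushed by publish()
            await pub_sock.send(package)       # unfiltered broadcast to subscribers
    finally:
        pub_sock.close()
        pull_sock.close()
        ctx.term()

# Run with: asyncio.run(pull_to_pub_relay())  -- serves until cancelled.

On the subscribing side, the new PublishClient.recv wraps the socket recv in asyncio.wait_for when a timeout is requested and simply returns after logging if the timeout fires.
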
View file

@@ -49,6 +49,7 @@ Namespaced tag
"""
import asyncio
import atexit
import contextlib
import datetime
@@ -352,18 +353,16 @@ class SaltEvent:
if self.cpub:
return True
log.error("WTF A")
if self._run_io_loop_sync:
log.error("WTF B")
with salt.utils.asynchronous.current_ioloop(self.io_loop):
if self.subscriber is None:
#self.subscriber = salt.utils.asynchronous.SyncWrapper(
# self.subscriber = salt.utils.asynchronous.SyncWrapper(
# salt.transport.ipc.IPCMessageSubscriber,
# args=(self.puburi,),
# kwargs={"io_loop": self.io_loop},
# loop_kwarg="io_loop",
#)
#self.subscriber = salt.transport.publish_client(self.opts)
# )
# self.subscriber = salt.transport.publish_client(self.opts)
self.subscriber = salt.utils.asynchronous.SyncWrapper(
salt.transport.publish_client,
args=(self.opts,),
@@ -371,7 +370,7 @@ class SaltEvent:
loop_kwarg="io_loop",
)
try:
#self.subscriber.connect(timeout=timeout)
# self.subscriber.connect(timeout=timeout)
puburi = "ipc://{}".format(self.puburi)
self.subscriber.connect_uri(puburi)
self.cpub = True
@@ -393,12 +392,12 @@ class SaltEvent:
self.opts["master_ip"] = ""
self.subscriber = salt.transport.publish_client(self.opts, self.io_loop)
puburi = "ipc://{}".format(self.puburi)
#self.io_loop.run_sync(self.subscriber.connect_uri, puburi)
# self.io_loop.run_sync(self.subscriber.connect_uri, puburi)
self.io_loop.spawn_callback(self.subscriber.connect_uri, puburi)
log.error("WTF")
#self.subscriber = salt.transport.ipc.IPCMessageSubscriber(
# self.subscriber = salt.transport.ipc.IPCMessageSubscriber(
# self.puburi, io_loop=self.io_loop
#)
# )
# For the asynchronous case, the connect will be defered to when
# set_event_handler() is invoked.
@@ -426,45 +425,39 @@ class SaltEvent:
return True
if self._run_io_loop_sync:
with salt.utils.asynchronous.current_ioloop(self.io_loop):
if self.pusher is None:
self.pusher = salt.transport.publish_server(self.opts)
self.pusher.pub_uri = "ipc://{}".format(
os.path.join(self.opts["sock_dir"], "master_event_pub.ipc")
)
self.pusher.pull_uri = "ipc://{}".format(
os.path.join(self.opts["sock_dir"], "master_event_pull.ipc")
)
#self.pusher = salt.utils.asynchronous.SyncWrapper(
# salt.transport.ipc.IPCMessageClient,
# args=(self.pulluri,),
# kwargs={"io_loop": self.io_loop},
# loop_kwarg="io_loop",
#)
try:
#self.pusher.connect(timeout=timeout)
self.pusher.connect()
self.cpush = True
except tornado.iostream.StreamClosedError as exc:
log.debug("Unable to connect pusher: %s", exc)
except Exception as exc: # pylint: disable=broad-except
log.error(
"Unable to connect pusher: %s",
exc,
exc_info_on_loglevel=logging.DEBUG,
)
if self.pusher is None:
self.pusher = salt.utils.asynchronous.SyncWrapper(
salt.transport.publish_server,
args=(self.opts,),
)
self.pusher.obj.pub_uri = "ipc://{}".format(self.puburi)
self.pusher.obj.pull_uri = "ipc://{}".format(self.pulluri)
# self.pusher = salt.utils.asynchronous.SyncWrapper(
# salt.transport.ipc.IPCMessageClient,
# args=(self.pulluri,),
# kwargs={"io_loop": self.io_loop},
# loop_kwarg="io_loop",
# )
try:
# self.pusher.connect(timeout=timeout)
self.pusher.connect()
self.cpush = True
except tornado.iostream.StreamClosedError as exc:
log.debug("Unable to connect pusher: %s", exc)
except Exception as exc: # pylint: disable=broad-except
log.error(
"Unable to connect pusher: %s",
exc,
exc_info_on_loglevel=logging.DEBUG,
)
else:
if self.pusher is None:
#self.pusher = salt.transport.ipc.IPCMessageClient(
# self.pusher = salt.transport.ipc.IPCMessageClient(
# self.pulluri, io_loop=self.io_loop
#)
# )
self.pusher = salt.transport.publish_server(self.opts)
self.pusher.pub_uri = "ipc://{}".format(
os.path.join(self.opts["sock_dir"], "master_event_pub.ipc")
)
self.pusher.pull_uri = "ipc://{}".format(
os.path.join(self.opts["sock_dir"], "master_event_pull.ipc")
)
self.pusher.pub_uri = "ipc://{}".format(self.puburi)
self.pusher.pull_uri = "ipc://{}".format(self.pulluri)
# For the asynchronous case, the connect will be deferred to when
# fire_event() is invoked.
self.cpush = True
@@ -596,12 +589,10 @@ class SaltEvent:
try:
if not self.cpub and not self.connect_pub(timeout=wait):
break
#riraw = self.subscriber.read(timeout=wait)
log.warning("Subscriber %r", self.subscriber)
# riraw = self.subscriber.read(timeout=wait)
raw = self.subscriber.recv(timeout=wait)
if raw is None:
break
print(raw)
mtag, data = self.unpack(raw)
ret = {"data": data, "tag": mtag}
except KeyboardInterrupt:
@@ -798,7 +789,12 @@ class SaltEvent:
is_msgpacked=True,
use_bin_type=True,
)
log.error("Sending event(fire_event_async): tag = %s; data = %s %r", tag, data, self.pusher)
log.error(
"Sending event(fire_event_async): tag = %s; data = %s %r",
tag,
data,
self.pusher,
)
event = b"".join(
[
salt.utils.stringutils.to_bytes(tag),
@@ -807,9 +803,9 @@ class SaltEvent:
]
)
msg = salt.utils.stringutils.to_bytes(event, "utf-8")
self.pusher.publish(msg, noserial=True)
#ret = yield self.pusher.send(msg)
#if cb is not None:
self.pusher.publish(msg)
# ret = yield self.pusher.send(msg)
# if cb is not None:
# cb(ret)
def fire_event(self, data, tag, timeout=1000):
@@ -851,7 +847,12 @@ class SaltEvent:
is_msgpacked=True,
use_bin_type=True,
)
log.error("Sending event(fire_event): tag = %s; data = %s %s", tag, data, self.pusher.pull_uri)
log.error(
"Sending event(fire_event): tag = %s; data = %s %s",
tag,
data,
self.pusher.pull_uri,
)
event = b"".join(
[
salt.utils.stringutils.to_bytes(tag),
@@ -864,7 +865,7 @@ class SaltEvent:
log.error("FIRE EVENT A %r %r", msg, self.pusher)
with salt.utils.asynchronous.current_ioloop(self.io_loop):
try:
#self.pusher.send(msg)
# self.pusher.send(msg)
self.pusher.publish(msg)
except Exception as exc: # pylint: disable=broad-except
log.debug(
@@ -875,8 +876,8 @@ class SaltEvent:
raise
else:
log.error("FIRE EVENT B %r %r", msg, self.pusher)
self.pusher.publish(msg)
#self.io_loop.spawn_callback(self.pusher.send, msg)
asyncio.create_task(self.pusher.publish(msg))
# self.io_loop.spawn_callback(self.pusher.send, msg)
return True
def fire_master(self, data, tag, timeout=1000):
@@ -990,7 +991,7 @@ class SaltEvent:
if not self.cpub:
self.connect_pub()
# This will handle reconnects
#return self.subscriber.read_async(event_handler)
# return self.subscriber.read_async(event_handler)
self.subscriber.on_recv(event_handler)
# pylint: disable=W1701
@@ -1090,7 +1091,7 @@ class MinionEvent(SaltEvent):
)
#class AsyncEventPublisher:
# class AsyncEventPublisher:
# """
# An event publisher class intended to run in an ioloop (within a single process)
#
@@ -1191,7 +1192,7 @@ class MinionEvent(SaltEvent):
# self.puller.close()
#
#
#class EventPublisher(salt.utils.process.SignalHandlingProcess):
# class EventPublisher(salt.utils.process.SignalHandlingProcess):
# """
# The interface that takes master events and republishes them out to anyone
# who wants to listen
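
The event hunks above drive the pusher through salt.utils.asynchronous.SyncWrapper on the synchronous path, while the asynchronous path of SaltEvent.fire_event now schedules self.pusher.publish(msg) as a task on the already-running loop instead of blocking on it. A toy version of that fire-and-forget scheduling; PusherStub is a stand-in, not Salt's publish server:

import asyncio

class PusherStub:
    # Stand-in for the publish-server pusher; simulates async I/O only.
    async def publish(self, msg):
        await asyncio.sleep(0)
        print("pushed", msg)

def fire_event_sketch(pusher, msg):
    # Async path: schedule the publish coroutine on the running loop, report success.
    asyncio.create_task(pusher.publish(msg))
    return True

async def main():
    fire_event_sketch(PusherStub(), b"salt/test\n\n{...}")
    await asyncio.sleep(0.01)  # let the scheduled publish run before exiting

asyncio.run(main())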