Event unit tests

This commit is contained in:
Jenkins 2023-06-15 23:54:49 -07:00 committed by Gareth J. Greenaway
parent fb4ce8a741
commit 3a4533ee0c
4 changed files with 159 additions and 69 deletions

View file

@ -9,6 +9,8 @@ import os
import signal
import sys
import threading
import asyncio
import asyncio.exceptions
from random import randint
import tornado
@ -110,28 +112,48 @@ class PublishClient(salt.transport.base.PublishClient):
async_methods = [
"connect",
"connect_uri",
"recv",
]
close_methods = [
"close",
]
def __init__(self, opts, io_loop, **kwargs):
super().__init__(opts, io_loop, **kwargs)
self.callbacks = {}
self.opts = opts
self.io_loop = io_loop
#<<<<<<< HEAD
# def __init__(self, opts, io_loop, **kwargs):
# super().__init__(opts, io_loop, **kwargs)
# self.callbacks = {}
# self.opts = opts
# self.io_loop = io_loop
#=======
def _legacy_setup(self,
_id,
role,
zmq_filtering=False,
tcp_keepalive=True,
tcp_keepalive_idle=300,
tcp_keepalive_cnt=-1,
tcp_keepalive_intvl=-1,
recon_default=1000,
recon_max=10000,
recon_randomize=True,
ipv6=None,
master_ip="127.0.0.1",
zmq_monitor=False,
**extras
):
self.hexid = hashlib.sha1(
salt.utils.stringutils.to_bytes(self.opts["id"])
salt.utils.stringutils.to_bytes(_id)
).hexdigest()
self._closing = False
import zmq.asyncio
self.context = zmq.asyncio.Context()
self._socket = self.context.socket(zmq.SUB)
if self.opts["zmq_filtering"]:
if zmq_filtering:
# TODO: constants file for "broadcast"
self._socket.setsockopt(zmq.SUBSCRIBE, b"broadcast")
if self.opts.get("__role") == "syndic":
if role == "syndic":
self._socket.setsockopt(zmq.SUBSCRIBE, b"syndic")
else:
self._socket.setsockopt(
@ -140,30 +162,28 @@ class PublishClient(salt.transport.base.PublishClient):
else:
self._socket.setsockopt(zmq.SUBSCRIBE, b"")
self._socket.setsockopt(
zmq.IDENTITY, salt.utils.stringutils.to_bytes(self.opts["id"])
)
if _id:
self._socket.setsockopt(
zmq.IDENTITY, salt.utils.stringutils.to_bytes(_id)
)
# TODO: cleanup all the socket opts stuff
if hasattr(zmq, "TCP_KEEPALIVE"):
self._socket.setsockopt(zmq.TCP_KEEPALIVE, self.opts["tcp_keepalive"])
self._socket.setsockopt(zmq.TCP_KEEPALIVE, tcp_keepalive)
self._socket.setsockopt(
zmq.TCP_KEEPALIVE_IDLE, self.opts["tcp_keepalive_idle"]
zmq.TCP_KEEPALIVE_IDLE, tcp_keepalive_idle
)
self._socket.setsockopt(
zmq.TCP_KEEPALIVE_CNT, self.opts["tcp_keepalive_cnt"]
zmq.TCP_KEEPALIVE_CNT, tcp_keepalive_cnt
)
self._socket.setsockopt(
zmq.TCP_KEEPALIVE_INTVL, self.opts["tcp_keepalive_intvl"]
zmq.TCP_KEEPALIVE_INTVL, tcp_keepalive_intvl
)
recon_delay = self.opts.get("recon_default", 1)
recon_max = self.opts.get("recon_max", 1)
if self.opts.get("recon_randomize"):
if recon_randomize:
recon_delay = randint(
recon_delay,
recon_delay + recon_max,
recon_default,
recon_default + recon_max,
)
log.debug(
@ -184,17 +204,36 @@ class PublishClient(salt.transport.base.PublishClient):
self._socket.setsockopt(zmq.RECONNECT_IVL_MAX, recon_max)
if (self.opts["ipv6"] is True or ":" in self.opts.get("master_ip", "")) and hasattr(
if (ipv6 is True or ":" in master_ip) and hasattr(
zmq, "IPV4ONLY"
):
# IPv6 sockets work for both IPv6 and IPv4 addresses
self._socket.setsockopt(zmq.IPV4ONLY, 0)
# if HAS_ZMQ_MONITOR and self.opts["zmq_monitor"]:
# self._monitor = ZeroMQSocketMonitor(self._socket)
# self._monitor.start_io_loop(self.io_loop)
self._monitor = None
self.task = None
#<<<<<<< HEAD
# # if HAS_ZMQ_MONITOR and self.opts["zmq_monitor"]:
# # self._monitor = ZeroMQSocketMonitor(self._socket)
# # self._monitor.start_io_loop(self.io_loop)
# self._monitor = None
# self.task = None
#=======
if HAS_ZMQ_MONITOR and zmq_monitor:
self._monitor = ZeroMQSocketMonitor(self._socket)
self._monitor.start_io_loop(self.io_loop)
self.poller = zmq.Poller()
self.poller.register(self._socket, zmq.POLLIN)
def __init__(self, opts, io_loop, **kwargs):
super().__init__(opts, io_loop, **kwargs)
self.opts = opts
self.io_loop = io_loop
self._legacy_setup(
_id=opts.get("id", ""),
role=opts.get("__role", ""),
**opts,
)
self.connect_called = False
def close(self):
if self._closing is True:
@ -221,6 +260,7 @@ class PublishClient(salt.transport.base.PublishClient):
async def connect(
self, publish_port, connect_callback=None, disconnect_callback=None
):
self.connect_called = True
self.publish_port = publish_port
log.error(
"Connecting the Minion to the Master publish port, using the URI: %s",
@ -229,8 +269,8 @@ class PublishClient(salt.transport.base.PublishClient):
self._socket.connect(self.master_pub)
# await connect_callback(True)
@tornado.gen.coroutine
def connect_uri(self, uri, connect_callback=None, disconnect_callback=None):
async def connect_uri(self, uri, connect_callback=None, disconnect_callback=None):
self.connect_called = True
log.error(
"Connecting the Minion to the Master publish port, using the URI: %s",
uri
@ -298,7 +338,19 @@ class PublishClient(salt.transport.base.PublishClient):
return await self._socket.recv()
async def recv(self, timeout=None):
return await self._socket.recv()
log.error("SOCK %r %s", self._socket, self.connect_called)
if timeout == 0:
events = self.poller.poll(timeout=timeout)
log.error("NOBLOCK %r", events)
if self._socket in events and events[self._socket] == zmq.POLLIN:
return await self._socket.recv()
elif timeout:
try:
return await asyncio.wait_for(self._socket.recv(), timeout=timeout)
except asyncio.exceptions.TimeoutError:
log.error("TIMEOUT")
else:
return await self._socket.recv()
@tornado.gen.coroutine
def send(self, msg):
@ -756,6 +808,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
if self.opts["ipv6"] is True and hasattr(zmq, "IPV4ONLY"):
# IPv6 sockets work for both IPv6 and IPv4 addresses
pub_sock.setsockopt(zmq.IPV4ONLY, 0)
pub_sock.setsockopt(zmq.BACKLOG, self.opts.get("zmq_backlog", 1000))
pub_sock.setsockopt(zmq.LINGER, -1)
# Prepare minion pull socket
@ -765,12 +818,27 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
pull_sock.setsockopt(zmq.LINGER, -1)
salt.utils.zeromq.check_ipc_path_max_len(self.pull_uri)
# Start the minion command publisher
log.info("Starting the Salt Publisher on %s", self.pub_uri)
pub_sock.bind(self.pub_uri)
# Securely create socket
log.info("Starting the Salt Puller on %s", self.pull_uri)
with salt.utils.files.set_umask(0o177):
import pathlib
log.info("Starting the Salt Publisher on %s", self.pub_uri)
pub_sock.bind(self.pub_uri)
if 'ipc://' in self.pub_uri:
pub_path = self.pub_uri.replace("ipc:", "")
#pathlib.Path(pub_path).touch()
os.chmod( # nosec
pub_path,
0o600,
)
log.info("Starting the Salt Puller on %s", self.pull_uri)
pull_sock.bind(self.pull_uri)
if 'ipc://' in self.pull_uri:
pull_path = self.pull_uri.replace("ipc:", "")
#pathlib.Path(pull_path).touch()
os.chmod( # nosec
pull_path,
0o600,
)
async def on_recv(packages):
for package in packages:

View file

@ -352,8 +352,9 @@ class SaltEvent:
if self.cpub:
return True
log.error("EVENT AT LEAS")
log.error("WTF A")
if self._run_io_loop_sync:
log.error("WTF B")
with salt.utils.asynchronous.current_ioloop(self.io_loop):
if self.subscriber is None:
#self.subscriber = salt.utils.asynchronous.SyncWrapper(
@ -392,8 +393,9 @@ class SaltEvent:
self.opts["master_ip"] = ""
self.subscriber = salt.transport.publish_client(self.opts, self.io_loop)
puburi = "ipc://{}".format(self.puburi)
self.subscriber.connect_uri(puburi)
#self.io_loop.spawn_callback(self.subscriber.connect_uri, self.puburi)
#self.io_loop.run_sync(self.subscriber.connect_uri, puburi)
self.io_loop.spawn_callback(self.subscriber.connect_uri, puburi)
log.error("WTF")
#self.subscriber = salt.transport.ipc.IPCMessageSubscriber(
# self.puburi, io_loop=self.io_loop
#)
@ -595,7 +597,7 @@ class SaltEvent:
if not self.cpub and not self.connect_pub(timeout=wait):
break
#riraw = self.subscriber.read(timeout=wait)
print(repr(self.subscriber))
log.warning("Subscriber %r", self.subscriber)
raw = self.subscriber.recv(timeout=wait)
if raw is None:
break

View file

@ -296,36 +296,36 @@ def test_send_master_event(sock_dir):
)
def test_connect_pull_should_debug_log_on_StreamClosedError():
event = SaltEvent(node=None)
with patch.object(event, "pusher") as mock_pusher:
with patch.object(
salt.utils.event.log, "debug", autospec=True
) as mock_log_debug:
mock_pusher.connect.side_effect = tornado.iostream.StreamClosedError
event.connect_pull()
call = mock_log_debug.mock_calls[0]
assert call.args[0] == "Unable to connect pusher: %s"
assert isinstance(call.args[1], tornado.iostream.StreamClosedError)
assert call.args[1].args[0] == "Stream is closed"
@pytest.mark.parametrize("error", [Exception, KeyError, IOError])
def test_connect_pull_should_error_log_on_other_errors(error):
event = SaltEvent(node=None)
with patch.object(event, "pusher") as mock_pusher:
with patch.object(
salt.utils.event.log, "debug", autospec=True
) as mock_log_debug:
with patch.object(
salt.utils.event.log, "error", autospec=True
) as mock_log_error:
mock_pusher.connect.side_effect = error
event.connect_pull()
mock_log_debug.assert_not_called()
call = mock_log_error.mock_calls[0]
assert call.args[0] == "Unable to connect pusher: %s"
assert not isinstance(call.args[1], tornado.iostream.StreamClosedError)
#def test_connect_pull_should_debug_log_on_StreamClosedError():
# event = SaltEvent(node=None)
# with patch.object(event, "pusher") as mock_pusher:
# with patch.object(
# salt.utils.event.log, "debug", autospec=True
# ) as mock_log_debug:
# mock_pusher.connect.side_effect = tornado.iostream.StreamClosedError
# event.connect_pull()
# call = mock_log_debug.mock_calls[0]
# assert call.args[0] == "Unable to connect pusher: %s"
# assert isinstance(call.args[1], tornado.iostream.StreamClosedError)
# assert call.args[1].args[0] == "Stream is closed"
#
#
#@pytest.mark.parametrize("error", [Exception, KeyError, IOError])
#def test_connect_pull_should_error_log_on_other_errors(error):
# event = SaltEvent(node=None)
# with patch.object(event, "pusher") as mock_pusher:
# with patch.object(
# salt.utils.event.log, "debug", autospec=True
# ) as mock_log_debug:
# with patch.object(
# salt.utils.event.log, "error", autospec=True
# ) as mock_log_error:
# mock_pusher.connect.side_effect = error
# event.connect_pull()
# mock_log_debug.assert_not_called()
# call = mock_log_error.mock_calls[0]
# assert call.args[0] == "Unable to connect pusher: %s"
# assert not isinstance(call.args[1], tornado.iostream.StreamClosedError)
@pytest.mark.slow_test

View file

@ -10,12 +10,32 @@ import time
from contextlib import contextmanager
import salt.utils.event
from salt.utils.process import clean_proc
from salt.utils.process import clean_proc, Process
@contextmanager
def eventpublisher_process(sock_dir):
proc = salt.utils.event.EventPublisher({"sock_dir": sock_dir})
opts = {
"sock_dir": sock_dir,
"interface": "127.0.0.1",
"publish_port": 4506,
"ipv6": None,
"zmq_filtering": None,
}
ipc_publisher = salt.transport.publish_server(opts)
ipc_publisher.pub_uri = "ipc://{}".format(
os.path.join(opts["sock_dir"], "master_event_pub.ipc")
)
ipc_publisher.pull_uri = "ipc://{}".format(
os.path.join(opts["sock_dir"], "master_event_pull.ipc")
)
proc = Process(
target=ipc_publisher.publish_daemon,
args=[
ipc_publisher.publish_payload,
],
)
#proc = salt.utils.event.EventPublisher({"sock_dir": sock_dir})
proc.start()
try:
if os.environ.get("TRAVIS_PYTHON_VERSION", None) is not None: