Be sure to always terminate salt's event classes

This commit is contained in:
Pedro Algarvio 2020-12-23 07:31:21 +00:00
parent d9c163f9b0
commit 4291e76bf1
21 changed files with 724 additions and 727 deletions

View file

@ -496,13 +496,13 @@ class Beacon:
Reset the beacons to defaults
"""
self.opts["beacons"] = {}
evt = salt.utils.event.get_event("minion", opts=self.opts)
evt.fire_event(
{
"complete": True,
"comment": "Beacons have been reset",
"beacons": self.opts["beacons"],
},
tag="/salt/minion/minion_beacon_reset_complete",
)
with salt.utils.event.get_event("minion", opts=self.opts) as evt:
evt.fire_event(
{
"complete": True,
"comment": "Beacons have been reset",
"beacons": self.opts["beacons"],
},
tag="/salt/minion/minion_beacon_reset_complete",
)
return True

View file

@ -43,8 +43,6 @@ import salt.utils.stringutils
import salt.utils.thin
import salt.utils.url
import salt.utils.verify
from salt.ext import six
from salt.ext.six.moves import input # pylint: disable=import-error,redefined-builtin
from salt.template import compile_template
from salt.utils.platform import is_junos, is_windows
from salt.utils.process import Process
@ -831,6 +829,8 @@ class SSH:
self.event.fire_event(
data, salt.utils.event.tagify([jid, "ret", host], "job")
)
if self.event is not None:
self.event.destroy()
if self.opts.get("static"):
salt.output.display_output(sret, outputter, self.opts)
if final_exit:

View file

@ -85,7 +85,8 @@ def start(host="localhost", port=24224, app="engine"):
)
log.info("Fluent engine started")
while True:
salt_event = event_bus.get_event_block()
if salt_event:
event.Event(app, salt_event)
with event_bus:
while True:
salt_event = event_bus.get_event_block()
if salt_event:
event.Event(app, salt_event)

View file

@ -1193,10 +1193,12 @@ class MinionManager(MinionBase):
# kill any remaining processes
minion.process_manager.kill_children()
minion.destroy()
self.event.destroy()
def destroy(self):
for minion in self.minions:
minion.destroy()
self.event.destroy()
class Minion(MinionBase):
@ -2879,11 +2881,10 @@ class Minion(MinionBase):
except Exception: # pylint: disable=broad-except
log.critical("The beacon errored: ", exc_info=True)
if beacons:
event = salt.utils.event.get_event(
with salt.utils.event.get_event(
"minion", opts=self.opts, listen=False
)
event.fire_event({"beacons": beacons}, "__beacons_return")
event.destroy()
) as event:
event.fire_event({"beacons": beacons}, "__beacons_return")
if before_connect:
# Make sure there is a chance for one iteration to occur before connect

View file

@ -2354,25 +2354,25 @@ class Events:
"""
An iterator to yield Salt events
"""
event = salt.utils.event.get_event(
with salt.utils.event.get_event(
"master",
sock_dir=self.opts["sock_dir"],
transport=self.opts["transport"],
opts=self.opts,
listen=True,
)
stream = event.iter_events(full=True, auto_reconnect=True)
) as event:
stream = event.iter_events(full=True, auto_reconnect=True)
yield "retry: 400\n" # future lint: disable=blacklisted-function
yield "retry: 400\n" # future lint: disable=blacklisted-function
while True:
data = next(stream)
yield "tag: {}\n".format(
data.get("tag", "")
) # future lint: disable=blacklisted-function
yield "data: {}\n\n".format(
salt.utils.json.dumps(data)
) # future lint: disable=blacklisted-function
while True:
data = next(stream)
yield "tag: {}\n".format(
data.get("tag", "")
) # future lint: disable=blacklisted-function
yield "data: {}\n\n".format(
salt.utils.json.dumps(data)
) # future lint: disable=blacklisted-function
return listen()
@ -2534,38 +2534,38 @@ class WebsocketEndpoint:
# blocks until send is called on the parent end of this pipe.
pipe.recv()
event = salt.utils.event.get_event(
with salt.utils.event.get_event(
"master",
sock_dir=self.opts["sock_dir"],
transport=self.opts["transport"],
opts=self.opts,
listen=True,
)
stream = event.iter_events(full=True, auto_reconnect=True)
SaltInfo = event_processor.SaltInfo(handler)
) as event:
stream = event.iter_events(full=True, auto_reconnect=True)
SaltInfo = event_processor.SaltInfo(handler)
def signal_handler(signal, frame):
os._exit(0)
def signal_handler(signal, frame):
os._exit(0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
while True:
data = next(stream)
if data:
try: # work around try to decode catch unicode errors
if "format_events" in kwargs:
SaltInfo.process(data, salt_token, self.opts)
else:
handler.send(
"data: {}\n\n".format(
salt.utils.json.dumps(data)
), # future lint: disable=blacklisted-function
False,
while True:
data = next(stream)
if data:
try: # work around try to decode catch unicode errors
if "format_events" in kwargs:
SaltInfo.process(data, salt_token, self.opts)
else:
handler.send(
"data: {}\n\n".format(
salt.utils.json.dumps(data)
), # future lint: disable=blacklisted-function
False,
)
except UnicodeDecodeError:
logger.error(
"Error: Salt event has non UTF-8 data:\n{}".format(data)
)
except UnicodeDecodeError:
logger.error(
"Error: Salt event has non UTF-8 data:\n{}".format(data)
)
parent_pipe, child_pipe = Pipe()
handler.pipe = parent_pipe

View file

@ -55,21 +55,21 @@ def list_(saltenv="base", test=None):
if not _reactor_system_available():
raise CommandExecutionError("Reactor system is not running.")
sevent = salt.utils.event.get_event(
with salt.utils.event.get_event(
"master",
__opts__["sock_dir"],
__opts__["transport"],
opts=__opts__,
listen=True,
)
) as sevent:
master_key = salt.utils.master.get_master_key("root", __opts__)
master_key = salt.utils.master.get_master_key("root", __opts__)
__jid_event__.fire_event({"key": master_key}, "salt/reactors/manage/list")
__jid_event__.fire_event({"key": master_key}, "salt/reactors/manage/list")
results = sevent.get_event(wait=30, tag="salt/reactors/manage/list-results")
reactors = results["reactors"]
return reactors
results = sevent.get_event(wait=30, tag="salt/reactors/manage/list-results")
reactors = results["reactors"]
return reactors
def add(event, reactors, saltenv="base", test=None):
@ -88,23 +88,23 @@ def add(event, reactors, saltenv="base", test=None):
if isinstance(reactors, str):
reactors = [reactors]
sevent = salt.utils.event.get_event(
with salt.utils.event.get_event(
"master",
__opts__["sock_dir"],
__opts__["transport"],
opts=__opts__,
listen=True,
)
) as sevent:
master_key = salt.utils.master.get_master_key("root", __opts__)
master_key = salt.utils.master.get_master_key("root", __opts__)
__jid_event__.fire_event(
{"event": event, "reactors": reactors, "key": master_key},
"salt/reactors/manage/add",
)
__jid_event__.fire_event(
{"event": event, "reactors": reactors, "key": master_key},
"salt/reactors/manage/add",
)
res = sevent.get_event(wait=30, tag="salt/reactors/manage/add-complete")
return res["result"]
res = sevent.get_event(wait=30, tag="salt/reactors/manage/add-complete")
return res["result"]
def delete(event, saltenv="base", test=None):
@ -120,22 +120,22 @@ def delete(event, saltenv="base", test=None):
if not _reactor_system_available():
raise CommandExecutionError("Reactor system is not running.")
sevent = salt.utils.event.get_event(
with salt.utils.event.get_event(
"master",
__opts__["sock_dir"],
__opts__["transport"],
opts=__opts__,
listen=True,
)
) as sevent:
master_key = salt.utils.master.get_master_key("root", __opts__)
master_key = salt.utils.master.get_master_key("root", __opts__)
__jid_event__.fire_event(
{"event": event, "key": master_key}, "salt/reactors/manage/delete"
)
__jid_event__.fire_event(
{"event": event, "key": master_key}, "salt/reactors/manage/delete"
)
res = sevent.get_event(wait=30, tag="salt/reactors/manage/delete-complete")
return res["result"]
res = sevent.get_event(wait=30, tag="salt/reactors/manage/delete-complete")
return res["result"]
def is_leader():
@ -151,20 +151,20 @@ def is_leader():
if not _reactor_system_available():
raise CommandExecutionError("Reactor system is not running.")
sevent = salt.utils.event.get_event(
with salt.utils.event.get_event(
"master",
__opts__["sock_dir"],
__opts__["transport"],
opts=__opts__,
listen=True,
)
) as sevent:
master_key = salt.utils.master.get_master_key("root", __opts__)
master_key = salt.utils.master.get_master_key("root", __opts__)
__jid_event__.fire_event({"key": master_key}, "salt/reactors/manage/is_leader")
__jid_event__.fire_event({"key": master_key}, "salt/reactors/manage/is_leader")
res = sevent.get_event(wait=30, tag="salt/reactors/manage/leader/value")
return res["result"]
res = sevent.get_event(wait=30, tag="salt/reactors/manage/leader/value")
return res["result"]
def set_leader(value=True):
@ -180,20 +180,20 @@ def set_leader(value=True):
if not _reactor_system_available():
raise CommandExecutionError("Reactor system is not running.")
sevent = salt.utils.event.get_event(
with salt.utils.event.get_event(
"master",
__opts__["sock_dir"],
__opts__["transport"],
opts=__opts__,
listen=True,
)
) as sevent:
master_key = salt.utils.master.get_master_key("root", __opts__)
master_key = salt.utils.master.get_master_key("root", __opts__)
__jid_event__.fire_event(
{"id": __opts__["id"], "value": value, "key": master_key},
"salt/reactors/manage/set_leader",
)
__jid_event__.fire_event(
{"id": __opts__["id"], "value": value, "key": master_key},
"salt/reactors/manage/set_leader",
)
res = sevent.get_event(wait=30, tag="salt/reactors/manage/leader/value")
return res["result"]
res = sevent.get_event(wait=30, tag="salt/reactors/manage/leader/value")
return res["result"]

View file

@ -640,70 +640,70 @@ def wait_for_event(name, id_list, event_id="id", timeout=300, node="master"):
ret["result"] = None
return ret
sevent = salt.utils.event.get_event(
with salt.utils.event.get_event(
node, __opts__["sock_dir"], __opts__["transport"], opts=__opts__, listen=True
)
) as sevent:
del_counter = 0
starttime = time.time()
timelimit = starttime + timeout
while True:
event = sevent.get_event(full=True)
is_timedout = time.time() > timelimit
del_counter = 0
starttime = time.time()
timelimit = starttime + timeout
while True:
event = sevent.get_event(full=True)
is_timedout = time.time() > timelimit
if event is None and not is_timedout:
log.trace("wait_for_event: No event data; waiting.")
continue
elif event is None and is_timedout:
ret["comment"] = "Timeout value reached."
return ret
if event is None and not is_timedout:
log.trace("wait_for_event: No event data; waiting.")
continue
elif event is None and is_timedout:
ret["comment"] = "Timeout value reached."
return ret
if fnmatch.fnmatch(event["tag"], name):
val = event["data"].get(event_id)
if val is None and "data" in event["data"]:
val = event["data"]["data"].get(event_id)
if fnmatch.fnmatch(event["tag"], name):
val = event["data"].get(event_id)
if val is None and "data" in event["data"]:
val = event["data"]["data"].get(event_id)
if val is not None:
try:
val_idx = id_list.index(val)
except ValueError:
log.trace(
"wait_for_event: Event identifier '%s' not in "
"id_list; skipping.",
event_id,
)
if val is not None:
try:
val_idx = id_list.index(val)
except ValueError:
log.trace(
"wait_for_event: Event identifier '%s' not in "
"id_list; skipping.",
event_id,
)
else:
del id_list[val_idx]
del_counter += 1
minions_seen = ret["changes"].setdefault("minions_seen", [])
minions_seen.append(val)
log.debug(
"wait_for_event: Event identifier '%s' removed "
"from id_list; %s items remaining.",
val,
len(id_list),
)
else:
del id_list[val_idx]
del_counter += 1
minions_seen = ret["changes"].setdefault("minions_seen", [])
minions_seen.append(val)
log.debug(
"wait_for_event: Event identifier '%s' removed "
"from id_list; %s items remaining.",
val,
len(id_list),
log.trace(
"wait_for_event: Event identifier '%s' not in event "
"'%s'; skipping.",
event_id,
event["tag"],
)
else:
log.trace(
"wait_for_event: Event identifier '%s' not in event "
"'%s'; skipping.",
event_id,
event["tag"],
log.debug("wait_for_event: Skipping unmatched event '%s'", event["tag"])
if len(id_list) == 0:
ret["result"] = True
ret["comment"] = "All events seen in {} seconds.".format(
time.time() - starttime
)
else:
log.debug("wait_for_event: Skipping unmatched event '%s'", event["tag"])
return ret
if len(id_list) == 0:
ret["result"] = True
ret["comment"] = "All events seen in {} seconds.".format(
time.time() - starttime
)
return ret
if is_timedout:
ret["comment"] = "Timeout value reached."
return ret
if is_timedout:
ret["comment"] = "Timeout value reached."
return ret
def runner(name, **kwargs):

View file

@ -463,6 +463,9 @@ class AsyncTCPPubChannel(
if self._closing:
return
self._closing = True
if self.event is not None:
self.event.destroy()
self.event = None
if hasattr(self, "message_client"):
self.message_client.close()
@ -1418,11 +1421,16 @@ class PubServer(salt.ext.tornado.tcpserver.TCPServer):
self.event = salt.utils.event.get_event(
"master", opts=self.opts, listen=False
)
else:
self.event = None
def close(self):
if self._closing:
return
self._closing = True
if self.event is not None:
self.event.destroy()
self.event = None
# pylint: disable=W1701
def __del__(self):

View file

@ -49,7 +49,8 @@ Namespaced tag
"""
import atexit
import contextlib
import datetime
import fnmatch
import hashlib
@ -57,7 +58,6 @@ import logging
import os
import time
from collections.abc import MutableMapping
from multiprocessing.util import Finalize
import salt.config
import salt.defaults.exitcodes
@ -75,8 +75,6 @@ import salt.utils.platform
import salt.utils.process
import salt.utils.stringutils
import salt.utils.zeromq
from salt.ext import six
from salt.ext.six.moves import range
log = logging.getLogger(__name__)
@ -747,7 +745,7 @@ class SaltEvent:
dump_data,
self.opts["max_event_size"],
is_msgpacked=True,
use_bin_type=six.PY3,
use_bin_type=True,
)
log.debug("Sending event: tag = %s; data = %s", tag, data)
event = b"".join(
@ -1091,6 +1089,9 @@ class EventPublisher(salt.utils.process.SignalHandlingProcess):
self.opts = salt.config.DEFAULT_MASTER_OPTS.copy()
self.opts.update(opts)
self._closing = False
self.io_loop = None
self.puller = None
self.publisher = None
# __setstate__ and __getstate__ are only used on Windows.
# We do this so that __init__ will be invoked on Windows in the child
@ -1154,11 +1155,13 @@ class EventPublisher(salt.utils.process.SignalHandlingProcess):
0o666,
)
# Make sure the IO loop and respective sockets are closed and
# destroyed
Finalize(self, self.close, exitpriority=15)
self.io_loop.start()
atexit.register(self.close)
with contextlib.suppress(KeyboardInterrupt):
try:
self.io_loop.start()
finally:
# Make sure the IO loop and respective sockets are closed and destroyed
self.close()
def handle_publish(self, package, _):
"""
@ -1176,12 +1179,16 @@ class EventPublisher(salt.utils.process.SignalHandlingProcess):
if self._closing:
return
self._closing = True
if hasattr(self, "publisher"):
atexit.unregister(self.close)
if self.publisher is not None:
self.publisher.close()
if hasattr(self, "puller"):
self.publisher = None
if self.puller is not None:
self.puller.close()
if hasattr(self, "io_loop"):
self.puller = None
if self.io_loop is not None:
self.io_loop.close()
self.io_loop = None
def _handle_signals(self, signum, sigframe):
self.close()

View file

@ -2604,14 +2604,14 @@ class GitBase:
# if there is a change, fire an event
if self.opts.get("fileserver_events", False):
event = salt.utils.event.get_event(
with salt.utils.event.get_event(
"master",
self.opts["sock_dir"],
self.opts["transport"],
opts=self.opts,
listen=False,
)
event.fire_event(data, tagify(["gitfs", "update"], prefix="fileserver"))
) as event:
event.fire_event(data, tagify(["gitfs", "update"], prefix="fileserver"))
try:
salt.fileserver.reap_fileserver_cache_dir(
self.hash_cachedir, self.find_file

View file

@ -1,6 +1,3 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import logging
import os
import shutil
@ -101,26 +98,26 @@ class ClearFuncsPubTestCase(ConfigMixin, TestCase):
"tgt_type": "glob",
"user": "root",
}
eventbus = salt.utils.event.get_event(
with salt.utils.event.get_event(
"master",
sock_dir=self.master_config["sock_dir"],
transport=self.master_config["transport"],
opts=self.master_config,
)
ret = clear_channel.send(msg, timeout=15)
if salt.utils.platform.is_windows():
time.sleep(30)
timeout = 30
else:
timeout = 5
ret_evt = None
start = time.time()
while time.time() - start <= timeout:
raw = eventbus.get_event(timeout, auto_reconnect=True)
if raw and "jid" in raw and raw["jid"] == jid:
ret_evt = raw
break
assert not os.path.exists(self.tmpfile), "Evil file created"
) as eventbus:
ret = clear_channel.send(msg, timeout=15)
if salt.utils.platform.is_windows():
time.sleep(30)
timeout = 30
else:
timeout = 5
ret_evt = None
start = time.time()
while time.time() - start <= timeout:
raw = eventbus.get_event(timeout, auto_reconnect=True)
if raw and "jid" in raw and raw["jid"] == jid:
ret_evt = raw
break
assert not os.path.exists(self.tmpfile), "Evil file created"
class ClearFuncsConfigTest(ConfigMixin, TestCase):

View file

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""
:codeauthor: Pedro Algarvio (pedro@algarvio.me)
@ -7,14 +6,12 @@
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
"""
from __future__ import absolute_import
import queue
import threading
import time
import pytest
import salt.utils.event as event
from salt.ext.six.moves.queue import Empty, Queue
from tests.support.case import ModuleCase
@ -22,11 +19,11 @@ from tests.support.case import ModuleCase
@pytest.mark.usefixtures("salt_sub_minion")
class EventModuleTest(ModuleCase):
def __test_event_fire_master(self):
events = Queue()
events = queue.Queue()
def get_event(events):
me = event.MasterEvent(self.master_opts["sock_dir"], listen=True)
events.put_nowait(me.get_event(wait=10, tag="salttest", full=False))
with event.MasterEvent(self.master_opts["sock_dir"], listen=True) as me:
events.put_nowait(me.get_event(wait=10, tag="salttest", full=False))
threading.Thread(target=get_event, args=(events,)).start()
time.sleep(1) # Allow multiprocessing.Process to start
@ -46,15 +43,15 @@ class EventModuleTest(ModuleCase):
)
self.assertTrue(ret)
with self.assertRaises(Empty):
with self.assertRaises(queue.Empty):
eventfired = events.get(block=True, timeout=10)
def __test_event_fire(self):
events = Queue()
events = queue.Queue()
def get_event(events):
me = event.MinionEvent(self.minion_opts, listen=True)
events.put_nowait(me.get_event(wait=10, tag="salttest", full=False))
with event.MinionEvent(self.minion_opts, listen=True) as me:
events.put_nowait(me.get_event(wait=10, tag="salttest", full=False))
threading.Thread(target=get_event, args=(events,)).start()
time.sleep(1) # Allow multiprocessing.Process to start
@ -73,15 +70,15 @@ class EventModuleTest(ModuleCase):
)
self.assertTrue(ret)
with self.assertRaises(Empty):
with self.assertRaises(queue.Empty):
eventfired = events.get(block=True, timeout=10)
def __test_event_fire_ipc_mode_tcp(self):
events = Queue()
events = queue.Queue()
def get_event(events):
me = event.MinionEvent(self.sub_minion_opts, listen=True)
events.put_nowait(me.get_event(wait=10, tag="salttest", full=False))
with event.MinionEvent(self.sub_minion_opts, listen=True) as me:
events.put_nowait(me.get_event(wait=10, tag="salttest", full=False))
threading.Thread(target=get_event, args=(events,)).start()
time.sleep(1) # Allow multiprocessing.Process to start
@ -104,5 +101,5 @@ class EventModuleTest(ModuleCase):
)
self.assertTrue(ret)
with self.assertRaises(Empty):
with self.assertRaises(queue.Empty):
eventfired = events.get(block=True, timeout=10)

View file

@ -6,17 +6,13 @@ import pytest
import salt.utils.json
import salt.utils.stringutils
from salt.netapi.rest_tornado import saltnado
from salt.utils.versions import StrictVersion
from salt.utils.zeromq import ZMQDefaultLoop as ZMQIOLoop
from salt.utils.zeromq import zmq
from tests.support.helpers import TstSuiteLoggingHandler, flaky, slowTest
from tests.support.unit import skipIf
from tests.unit.netapi.test_rest_tornado import SaltnadoTestCase
HAS_ZMQ_IOLOOP = bool(zmq)
from tests.unit.netapi.test_rest_tornado import SaltnadoTestsBase
class _SaltnadoIntegrationTestCase(SaltnadoTestCase): # pylint: disable=abstract-method
class SaltnadoIntegrationTestsBase(SaltnadoTestsBase):
@property
def opts(self):
return self.get_config("client_config", from_scratch=True)
@ -25,14 +21,12 @@ class _SaltnadoIntegrationTestCase(SaltnadoTestCase): # pylint: disable=abstrac
def mod_opts(self):
return self.get_config("minion", from_scratch=True)
def get_app(self):
raise NotImplementedError
@skipIf(HAS_ZMQ_IOLOOP is False, "PyZMQ version must be >= 14.0.1 to run these tests.")
@skipIf(
StrictVersion(zmq.__version__) < StrictVersion("14.0.1"),
"PyZMQ must be >= 14.0.1 to run these tests.",
)
@pytest.mark.usefixtures("salt_sub_minion")
class TestSaltAPIHandler(_SaltnadoIntegrationTestCase):
class TestSaltAPIHandler(SaltnadoIntegrationTestsBase):
def setUp(self):
super().setUp()
os.environ["ASYNC_TEST_TIMEOUT"] = "300"
@ -432,8 +426,7 @@ class TestSaltAPIHandler(_SaltnadoIntegrationTestCase):
@flaky
@skipIf(HAS_ZMQ_IOLOOP is False, "PyZMQ version must be >= 14.0.1 to run these tests.")
class TestMinionSaltAPIHandler(_SaltnadoIntegrationTestCase):
class TestMinionSaltAPIHandler(SaltnadoIntegrationTestsBase):
def get_app(self):
urls = [
(r"/minions/(.*)", saltnado.MinionSaltAPIHandler),
@ -536,8 +529,7 @@ class TestMinionSaltAPIHandler(_SaltnadoIntegrationTestCase):
self.assertEqual(response.code, 400)
@skipIf(HAS_ZMQ_IOLOOP is False, "PyZMQ version must be >= 14.0.1 to run these tests.")
class TestJobsSaltAPIHandler(_SaltnadoIntegrationTestCase):
class TestJobsSaltAPIHandler(SaltnadoIntegrationTestsBase):
def get_app(self):
urls = [
(r"/jobs/(.*)", saltnado.JobsSaltAPIHandler),
@ -593,8 +585,7 @@ class TestJobsSaltAPIHandler(_SaltnadoIntegrationTestCase):
# TODO: run all the same tests from the root handler, but for now since they are
# the same code, we'll just sanity check
@skipIf(HAS_ZMQ_IOLOOP is False, "PyZMQ version must be >= 14.0.1 to run these tests.")
class TestRunSaltAPIHandler(_SaltnadoIntegrationTestCase):
class TestRunSaltAPIHandler(SaltnadoIntegrationTestsBase):
def get_app(self):
urls = [
("/run", saltnado.RunSaltAPIHandler),
@ -619,8 +610,7 @@ class TestRunSaltAPIHandler(_SaltnadoIntegrationTestCase):
self.assertEqual(response_obj["return"], [{"minion": True, "sub_minion": True}])
@skipIf(HAS_ZMQ_IOLOOP is False, "PyZMQ version must be >= 14.0.1 to run these tests.")
class TestEventsSaltAPIHandler(_SaltnadoIntegrationTestCase):
class TestEventsSaltAPIHandler(SaltnadoIntegrationTestsBase):
def get_app(self):
urls = [
(r"/events", saltnado.EventsSaltAPIHandler),
@ -666,8 +656,7 @@ class TestEventsSaltAPIHandler(_SaltnadoIntegrationTestCase):
self.assertTrue(data.startswith("data: "))
@skipIf(HAS_ZMQ_IOLOOP is False, "PyZMQ version must be >= 14.0.1 to run these tests.")
class TestWebhookSaltAPIHandler(_SaltnadoIntegrationTestCase):
class TestWebhookSaltAPIHandler(SaltnadoIntegrationTestsBase):
def get_app(self):
urls = [

View file

@ -43,8 +43,8 @@ class ReactorTest(SaltMinionEventAssertsMixin, ShellCase):
)
def fire_event(self, tag, data):
event = self.get_event()
event.fire_event(tag, data)
with self.get_event() as event:
event.fire_event(tag, data)
def alarm_handler(self, signal, frame):
raise TimeoutException("Timeout of {} seconds reached".format(self.timeout))
@ -55,12 +55,10 @@ class ReactorTest(SaltMinionEventAssertsMixin, ShellCase):
that it pings the minion
"""
# Create event bus connection
e = salt.utils.event.get_event(
with salt.utils.event.get_event(
"minion", sock_dir=self.minion_opts["sock_dir"], opts=self.minion_opts
)
e.fire_event({"a": "b"}, "/test_event")
) as event:
event.fire_event({"a": "b"}, "/test_event")
self.assertMinionEventReceived({"a": "b"}, timeout=30)
@skipIf(salt.utils.platform.is_windows(), "no sigalarm on windows")
@ -72,8 +70,8 @@ class ReactorTest(SaltMinionEventAssertsMixin, ShellCase):
signal.signal(signal.SIGALRM, self.alarm_handler)
signal.alarm(self.timeout)
master_event = self.get_event()
master_event.fire_event({"id": "minion"}, "salt/test/reactor")
with self.get_event() as master_event:
master_event.fire_event({"id": "minion"}, "salt/test/reactor")
try:
while True:
@ -126,19 +124,18 @@ class ReactorTest(SaltMinionEventAssertsMixin, ShellCase):
self.assertFalse(ret["return"])
try:
master_event = self.get_event()
self.fire_event({"id": "minion"}, "salt/test/reactor")
with self.get_event() as master_event:
self.fire_event({"id": "minion"}, "salt/test/reactor")
while True:
event = master_event.get_event(full=True)
while True:
event = master_event.get_event(full=True)
if event is None:
continue
if event is None:
continue
if event.get("tag") == "test_reaction":
# if we reach this point, the test is a failure
self.assertTrue(True) # pylint: disable=redundant-unittest-assert
break
if event.get("tag") == "test_reaction":
# if we reach this point, the test is a failure
break
except TimeoutException as exc:
self.assertTrue("Timeout" in str(exc))
finally:
@ -165,17 +162,17 @@ class ReactorTest(SaltMinionEventAssertsMixin, ShellCase):
signal.alarm(self.timeout)
try:
master_event = self.get_event()
self.fire_event({"id": "minion"}, "salt/test/reactor")
with self.get_event() as master_event:
self.fire_event({"id": "minion"}, "salt/test/reactor")
while True:
event = master_event.get_event(full=True)
while True:
event = master_event.get_event(full=True)
if event is None:
continue
if event is None:
continue
if event.get("tag") == "test_reaction":
self.assertTrue(event["data"]["test_reaction"])
break
if event.get("tag") == "test_reaction":
self.assertTrue(event["data"]["test_reaction"])
break
finally:
signal.alarm(0)

View file

@ -431,36 +431,35 @@ class OrchEventTest(ShellCase):
)
)
listener = salt.utils.event.get_event(
with salt.utils.event.get_event(
"master",
sock_dir=self.master_opts["sock_dir"],
transport=self.master_opts["transport"],
opts=self.master_opts,
)
) as listener:
jid = self.run_run_plus("state.orchestrate", "test_orch").get("jid")
jid = self.run_run_plus("state.orchestrate", "test_orch").get("jid")
if jid is None:
raise Exception("jid missing from run_run_plus output")
if jid is None:
raise Exception("jid missing from run_run_plus output")
signal.signal(signal.SIGALRM, self.alarm_handler)
signal.alarm(self.timeout)
try:
while True:
event = listener.get_event(full=True)
if event is None:
continue
signal.signal(signal.SIGALRM, self.alarm_handler)
signal.alarm(self.timeout)
try:
while True:
event = listener.get_event(full=True)
if event is None:
continue
if event["tag"] == "salt/run/{}/ret".format(jid):
# Don't wrap this in a try/except. We want to know if the
# data structure is different from what we expect!
ret = event["data"]["return"]["data"]["master"]
for job in ret:
self.assertTrue("__jid__" in ret[job])
break
finally:
del listener
signal.alarm(0)
if event["tag"] == "salt/run/{}/ret".format(jid):
# Don't wrap this in a try/except. We want to know if the
# data structure is different from what we expect!
ret = event["data"]["return"]["data"]["master"]
for job in ret:
self.assertTrue("__jid__" in ret[job])
break
finally:
signal.alarm(0)
@expensiveTest
def test_parallel_orchestrations(self):
@ -501,48 +500,47 @@ class OrchEventTest(ShellCase):
orch_sls = os.path.join(self.base_env, "test_par_orch.sls")
listener = salt.utils.event.get_event(
with salt.utils.event.get_event(
"master",
sock_dir=self.master_opts["sock_dir"],
transport=self.master_opts["transport"],
opts=self.master_opts,
)
) as listener:
start_time = time.time()
jid = self.run_run_plus("state.orchestrate", "test_par_orch").get("jid")
start_time = time.time()
jid = self.run_run_plus("state.orchestrate", "test_par_orch").get("jid")
if jid is None:
raise Exception("jid missing from run_run_plus output")
if jid is None:
raise Exception("jid missing from run_run_plus output")
signal.signal(signal.SIGALRM, self.alarm_handler)
signal.alarm(self.timeout)
received = False
try:
while True:
event = listener.get_event(full=True)
if event is None:
continue
signal.signal(signal.SIGALRM, self.alarm_handler)
signal.alarm(self.timeout)
received = False
try:
while True:
event = listener.get_event(full=True)
if event is None:
continue
# if we receive the ret for this job before self.timeout (60),
# the test is implicitly successful; if it were happening in serial it would be
# atleast 110 seconds.
if event["tag"] == "salt/run/{}/ret".format(jid):
received = True
# Don't wrap this in a try/except. We want to know if the
# data structure is different from what we expect!
ret = event["data"]["return"]["data"]["master"]
for state in ret:
data = ret[state]
# we expect each duration to be greater than 10s
self.assertTrue(data["duration"] > 10000)
break
# if we receive the ret for this job before self.timeout (60),
# the test is implicitly successful; if it were happening in serial it would be
# atleast 110 seconds.
if event["tag"] == "salt/run/{}/ret".format(jid):
received = True
# Don't wrap this in a try/except. We want to know if the
# data structure is different from what we expect!
ret = event["data"]["return"]["data"]["master"]
for state in ret:
data = ret[state]
# we expect each duration to be greater than 10s
self.assertTrue(data["duration"] > 10000)
break
# self confirm that the total runtime is roughly 30s (left 10s for buffer)
self.assertTrue((time.time() - start_time) < 40)
finally:
self.assertTrue(received)
del listener
signal.alarm(0)
# self confirm that the total runtime is roughly 30s (left 10s for buffer)
self.assertTrue((time.time() - start_time) < 40)
finally:
self.assertTrue(received)
signal.alarm(0)
@expensiveTest
def test_orchestration_soft_kill(self):
@ -569,47 +567,46 @@ class OrchEventTest(ShellCase):
)
)
listener = salt.utils.event.get_event(
with salt.utils.event.get_event(
"master",
sock_dir=self.master_opts["sock_dir"],
transport=self.master_opts["transport"],
opts=self.master_opts,
)
) as listener:
mock_jid = "20131219120000000000"
self.run_run("state.soft_kill {} stage_two".format(mock_jid))
with patch("salt.utils.jid.gen_jid", MagicMock(return_value=mock_jid)):
jid = self.run_run_plus("state.orchestrate", "two_stage_orch_kill").get(
"jid"
)
mock_jid = "20131219120000000000"
self.run_run("state.soft_kill {} stage_two".format(mock_jid))
with patch("salt.utils.jid.gen_jid", MagicMock(return_value=mock_jid)):
jid = self.run_run_plus("state.orchestrate", "two_stage_orch_kill").get(
"jid"
)
if jid is None:
raise Exception("jid missing from run_run_plus output")
if jid is None:
raise Exception("jid missing from run_run_plus output")
signal.signal(signal.SIGALRM, self.alarm_handler)
signal.alarm(self.timeout)
received = False
try:
while True:
event = listener.get_event(full=True)
if event is None:
continue
signal.signal(signal.SIGALRM, self.alarm_handler)
signal.alarm(self.timeout)
received = False
try:
while True:
event = listener.get_event(full=True)
if event is None:
continue
# Ensure that stage_two of the state does not run
if event["tag"] == "salt/run/{}/ret".format(jid):
received = True
# Don't wrap this in a try/except. We want to know if the
# data structure is different from what we expect!
ret = event["data"]["return"]["data"]["master"]
self.assertNotIn(
"test_|-stage_two_|-stage_two_|-fail_without_changes", ret
)
break
# Ensure that stage_two of the state does not run
if event["tag"] == "salt/run/{}/ret".format(jid):
received = True
# Don't wrap this in a try/except. We want to know if the
# data structure is different from what we expect!
ret = event["data"]["return"]["data"]["master"]
self.assertNotIn(
"test_|-stage_two_|-stage_two_|-fail_without_changes", ret
)
break
finally:
self.assertTrue(received)
del listener
signal.alarm(0)
finally:
self.assertTrue(received)
signal.alarm(0)
@slowTest
def test_orchestration_with_pillar_dot_items(self):
@ -672,42 +669,41 @@ class OrchEventTest(ShellCase):
orch_sls = os.path.join(self.base_env, "main.sls")
listener = salt.utils.event.get_event(
with salt.utils.event.get_event(
"master",
sock_dir=self.master_opts["sock_dir"],
transport=self.master_opts["transport"],
opts=self.master_opts,
)
) as listener:
jid = self.run_run_plus("state.orchestrate", "main").get("jid")
jid = self.run_run_plus("state.orchestrate", "main").get("jid")
if jid is None:
raise salt.exceptions.SaltInvocationError(
"jid missing from run_run_plus output"
)
if jid is None:
raise salt.exceptions.SaltInvocationError(
"jid missing from run_run_plus output"
)
signal.signal(signal.SIGALRM, self.alarm_handler)
signal.alarm(self.timeout)
received = False
try:
while True:
event = listener.get_event(full=True)
if event is None:
continue
if event.get("tag", "") == "salt/run/{}/ret".format(jid):
received = True
# Don't wrap this in a try/except. We want to know if the
# data structure is different from what we expect!
ret = event["data"]["return"]["data"]["master"]
for state in ret:
data = ret[state]
# Each state should be successful
self.assertEqual(data["comment"], "Success!")
break
finally:
self.assertTrue(received)
del listener
signal.alarm(0)
signal.signal(signal.SIGALRM, self.alarm_handler)
signal.alarm(self.timeout)
received = False
try:
while True:
event = listener.get_event(full=True)
if event is None:
continue
if event.get("tag", "") == "salt/run/{}/ret".format(jid):
received = True
# Don't wrap this in a try/except. We want to know if the
# data structure is different from what we expect!
ret = event["data"]["return"]["data"]["master"]
for state in ret:
data = ret[state]
# Each state should be successful
self.assertEqual(data["comment"], "Success!")
break
finally:
self.assertTrue(received)
signal.alarm(0)
@slowTest
def test_orchestration_onchanges_and_prereq(self):
@ -747,53 +743,57 @@ class OrchEventTest(ShellCase):
)
)
listener = salt.utils.event.get_event(
with salt.utils.event.get_event(
"master",
sock_dir=self.master_opts["sock_dir"],
transport=self.master_opts["transport"],
opts=self.master_opts,
)
) as listener:
try:
jid1 = self.run_run_plus("state.orchestrate", "orch", test=True).get("jid")
# Run for real to create the file
self.run_run_plus("state.orchestrate", "orch").get("jid")
# Run again in test mode. Since there were no changes, the
# requisites should not fire.
jid2 = self.run_run_plus("state.orchestrate", "orch", test=True).get("jid")
finally:
try:
os.remove(os.path.join(RUNTIME_VARS.TMP, "orch.req_test"))
except OSError:
pass
jid1 = self.run_run_plus("state.orchestrate", "orch", test=True).get(
"jid"
)
assert jid1 is not None
assert jid2 is not None
# Run for real to create the file
self.run_run_plus("state.orchestrate", "orch").get("jid")
tags = {"salt/run/{}/ret".format(x): x for x in (jid1, jid2)}
ret = {}
# Run again in test mode. Since there were no changes, the
# requisites should not fire.
jid2 = self.run_run_plus("state.orchestrate", "orch", test=True).get(
"jid"
)
finally:
try:
os.remove(os.path.join(RUNTIME_VARS.TMP, "orch.req_test"))
except OSError:
pass
signal.signal(signal.SIGALRM, self.alarm_handler)
signal.alarm(self.timeout)
try:
while True:
event = listener.get_event(full=True)
if event is None:
continue
assert jid1 is not None
assert jid2 is not None
if event["tag"] in tags:
ret[tags.pop(event["tag"])] = self.repack_state_returns(
event["data"]["return"]["data"]["master"]
)
if not tags:
# If tags is empty, we've grabbed all the returns we
# wanted, so let's stop listening to the event bus.
break
finally:
del listener
signal.alarm(0)
tags = {"salt/run/{}/ret".format(x): x for x in (jid1, jid2)}
ret = {}
signal.signal(signal.SIGALRM, self.alarm_handler)
signal.alarm(self.timeout)
try:
while True:
event = listener.get_event(full=True)
if event is None:
continue
if event["tag"] in tags:
ret[tags.pop(event["tag"])] = self.repack_state_returns(
event["data"]["return"]["data"]["master"]
)
if not tags:
# If tags is empty, we've grabbed all the returns we
# wanted, so let's stop listening to the event bus.
break
finally:
del listener
signal.alarm(0)
for sls_id in ("manage_a_file", "do_onchanges", "do_prereq"):
# The first time through, all three states should have a None

View file

@ -37,15 +37,15 @@ class EventSender(multiprocessing.Process):
self.sock_dir = sock_dir
def run(self):
me = salt.utils.event.MasterEvent(self.sock_dir, listen=False)
time.sleep(self.wait)
me.fire_event(self.data, self.tag)
# Wait a few seconds before tearing down the zmq context
if os.environ.get("TRAVIS_PYTHON_VERSION", None) is not None:
# Travis is slow
time.sleep(10)
else:
time.sleep(2)
with salt.utils.event.MasterEvent(self.sock_dir, listen=False) as me:
time.sleep(self.wait)
me.fire_event(self.data, self.tag)
# Wait a few seconds before tearing down the zmq context
if os.environ.get("TRAVIS_PYTHON_VERSION", None) is not None:
# Travis is slow
time.sleep(10)
else:
time.sleep(2)
@contextmanager

View file

@ -598,23 +598,25 @@ def _fetch_events(q, opts):
queue_item.task_done()
atexit.register(_clean_queue)
event = salt.utils.event.get_event("minion", sock_dir=opts["sock_dir"], opts=opts)
with salt.utils.event.get_event(
"minion", sock_dir=opts["sock_dir"], opts=opts
) as event:
# Wait for event bus to be connected
while not event.connect_pull(30):
time.sleep(1)
# Wait for event bus to be connected
while not event.connect_pull(30):
time.sleep(1)
# Notify parent process that the event bus is connected
q.put("CONNECTED")
# Notify parent process that the event bus is connected
q.put("CONNECTED")
while True:
try:
events = event.get_event(full=False)
except Exception as exc: # pylint: disable=broad-except
# This is broad but we'll see all kinds of issues right now
# if we drop the proc out from under the socket while we're reading
log.exception("Exception caught while getting events %r", exc)
q.put(events)
while True:
try:
events = event.get_event(full=False)
except Exception as exc: # pylint: disable=broad-except
# This is broad but we'll see all kinds of issues right now
# if we drop the proc out from under the socket while we're reading
log.exception("Exception caught while getting events %r", exc)
q.put(events)
class SaltMinionEventAssertsMixin:

View file

@ -1,17 +1,11 @@
# -*- coding: utf-8 -*-
"""
:codeauthor: Rupesh Tare <rupesht@saltstack.com>
"""
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
# Import Salt Libs
import salt.modules.event as event
import salt.utils.event
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.mock import MagicMock, patch
# Import Salt Testing Libs
from tests.support.runtests import RUNTIME_VARS
from tests.support.unit import TestCase

View file

@ -2,54 +2,28 @@ import copy
import hashlib
import os
import shutil
from urllib.parse import urlencode, urlparse
import urllib.parse
import salt.auth
import salt.ext.tornado.concurrent
import salt.ext.tornado.escape
import salt.ext.tornado.testing
import salt.netapi.rest_tornado as rest_tornado
import salt.utils.event
import salt.utils.json
import salt.utils.yaml
from salt.ext.tornado.httpclient import HTTPError, HTTPRequest
from salt.ext.tornado.testing import AsyncHTTPTestCase, AsyncTestCase, gen_test
from salt.ext.tornado.websocket import websocket_connect
from salt.netapi.rest_tornado import saltnado
from tests.support.events import eventpublisher_process
from tests.support.helpers import patched_environ, slowTest
from tests.support.mixins import AdaptedConfigurationTestCaseMixin
from tests.support.mock import MagicMock, patch
from tests.support.runtests import RUNTIME_VARS
from tests.support.unit import TestCase, skipIf
try:
HAS_TORNADO = True
except ImportError:
HAS_TORNADO = False
# pylint: disable=import-error
try:
import salt.ext.tornado.escape
import salt.ext.tornado.testing
import salt.ext.tornado.concurrent
from salt.ext.tornado.testing import AsyncTestCase, AsyncHTTPTestCase, gen_test
from salt.ext.tornado.httpclient import HTTPRequest, HTTPError
from salt.ext.tornado.websocket import websocket_connect
import salt.netapi.rest_tornado as rest_tornado
from salt.netapi.rest_tornado import saltnado
HAS_TORNADO = True
except ImportError:
HAS_TORNADO = False
# Create fake test case classes so we can properly skip the test case
class AsyncTestCase:
pass
class AsyncHTTPTestCase:
pass
# pylint: enable=import-error
@skipIf(
not HAS_TORNADO, "The tornado package needs to be installed"
) # pylint: disable=W0223
class SaltnadoTestCase(TestCase, AdaptedConfigurationTestCaseMixin, AsyncHTTPTestCase):
class SaltnadoTestsBase(AsyncHTTPTestCase, AdaptedConfigurationTestCaseMixin):
"""
Mixin to hold some shared things
"""
@ -115,8 +89,8 @@ class SaltnadoTestCase(TestCase, AdaptedConfigurationTestCaseMixin, AsyncHTTPTes
del self._AsyncHTTPTestCase__port
if hasattr(self, "__auth"):
del self.__auth
if hasattr(self, "_SaltnadoTestCase__auth"):
del self._SaltnadoTestCase__auth
if hasattr(self, "_SaltnadoTestsBase__auth"):
del self._SaltnadoTestsBase__auth
if hasattr(self, "_test_generator"):
del self._test_generator
if hasattr(self, "application"):
@ -147,8 +121,11 @@ class SaltnadoTestCase(TestCase, AdaptedConfigurationTestCaseMixin, AsyncHTTPTes
def fetch(self, path, **kwargs):
return self.decode_body(super().fetch(path, **kwargs))
def get_app(self):
raise NotImplementedError
class TestBaseSaltAPIHandler(SaltnadoTestCase):
class TestBaseSaltAPIHandler(SaltnadoTestsBase):
def get_app(self):
class StubHandler(saltnado.BaseSaltAPIHandler): # pylint: disable=W0223
def get(self, *args, **kwargs):
@ -328,7 +305,7 @@ class TestBaseSaltAPIHandler(SaltnadoTestCase):
response = self.fetch(
"/",
method="POST",
body=urlencode(form_lowstate),
body=urllib.parse.urlencode(form_lowstate),
headers={"Content-Type": self.content_type_map["form"]},
)
returned_lowstate = salt.utils.json.loads(response.body)["lowstate"]
@ -450,7 +427,7 @@ class TestBaseSaltAPIHandler(SaltnadoTestCase):
response = self.fetch(
"/",
method="POST",
body=urlencode(request_form_lowstate),
body=urllib.parse.urlencode(request_form_lowstate),
headers={"Content-Type": self.content_type_map["form"]},
)
self.assertEqual(
@ -539,7 +516,7 @@ class TestBaseSaltAPIHandler(SaltnadoTestCase):
self.assertEqual(headers["Access-Control-Allow-Origin"], "*")
class TestWebhookSaltHandler(SaltnadoTestCase):
class TestWebhookSaltHandler(SaltnadoTestsBase):
def get_app(self):
urls = [
(r"/hook(/.*)?", saltnado.WebhookSaltAPIHandler),
@ -559,7 +536,7 @@ class TestWebhookSaltHandler(SaltnadoTestCase):
headers={"Content-Type": self.content_type_map["json"]},
)
self.assertEqual(response.code, 200, response.body)
host = urlparse(response.effective_url).netloc
host = urllib.parse.urlparse(response.effective_url).netloc
event.fire_event.assert_called_once_with(
{
"headers": {
@ -576,7 +553,7 @@ class TestWebhookSaltHandler(SaltnadoTestCase):
)
class TestSaltAuthHandler(SaltnadoTestCase):
class TestSaltAuthHandler(SaltnadoTestsBase):
def get_app(self):
urls = [("/login", saltnado.SaltAuthHandler)]
return self.build_tornado_app(urls)
@ -596,7 +573,7 @@ class TestSaltAuthHandler(SaltnadoTestCase):
response = self.fetch(
"/login",
method="POST",
body=urlencode(self.auth_creds),
body=urllib.parse.urlencode(self.auth_creds),
headers={"Content-Type": self.content_type_map["form"]},
)
@ -673,7 +650,7 @@ class TestSaltAuthHandler(SaltnadoTestCase):
response = self.fetch(
"/login",
method="POST",
body=urlencode(bad_creds),
body=urllib.parse.urlencode(bad_creds),
headers={"Content-Type": self.content_type_map["form"]},
)
@ -694,7 +671,7 @@ class TestSaltAuthHandler(SaltnadoTestCase):
response = self.fetch(
"/login",
method="POST",
body=urlencode(bad_creds),
body=urllib.parse.urlencode(bad_creds),
headers={"Content-Type": self.content_type_map["form"]},
)
@ -732,7 +709,7 @@ class TestSaltAuthHandler(SaltnadoTestCase):
self.assertEqual(response.code, 400)
class TestSaltRunHandler(SaltnadoTestCase):
class TestSaltRunHandler(SaltnadoTestsBase):
def get_app(self):
urls = [("/run", saltnado.RunSaltAPIHandler)]
return self.build_tornado_app(urls)
@ -765,10 +742,7 @@ class TestSaltRunHandler(SaltnadoTestCase):
self.assertEqual(valid_response, salt.utils.json.loads(response.body))
@skipIf(
not HAS_TORNADO, "The tornado package needs to be installed"
) # pylint: disable=W0223
class TestWebsocketSaltAPIHandler(SaltnadoTestCase):
class TestWebsocketSaltAPIHandler(SaltnadoTestsBase):
def get_app(self):
opts = copy.deepcopy(self.opts)
opts.setdefault("rest_tornado", {})["websockets"] = True
@ -779,7 +753,7 @@ class TestWebsocketSaltAPIHandler(SaltnadoTestCase):
response = yield self.http_client.fetch(
self.get_url("/login"),
method="POST",
body=urlencode(self.auth_creds),
body=urllib.parse.urlencode(self.auth_creds),
headers={"Content-Type": self.content_type_map["form"]},
)
token = salt.utils.json.loads(self.decode_body(response).body)["return"][0][
@ -819,7 +793,7 @@ class TestWebsocketSaltAPIHandler(SaltnadoTestCase):
response = yield self.http_client.fetch(
self.get_url("/login"),
method="POST",
body=urlencode(self.auth_creds),
body=urllib.parse.urlencode(self.auth_creds),
headers={"Content-Type": self.content_type_map["form"]},
)
token = salt.utils.json.loads(self.decode_body(response).body)["return"][0][
@ -841,7 +815,7 @@ class TestWebsocketSaltAPIHandler(SaltnadoTestCase):
response = yield self.http_client.fetch(
self.get_url("/login"),
method="POST",
body=urlencode(self.auth_creds),
body=urllib.parse.urlencode(self.auth_creds),
headers={"Content-Type": self.content_type_map["form"]},
)
token = salt.utils.json.loads(self.decode_body(response).body)["return"][0][
@ -873,7 +847,7 @@ class TestWebsocketSaltAPIHandler(SaltnadoTestCase):
response = yield self.http_client.fetch(
self.get_url("/login"),
method="POST",
body=urlencode(self.auth_creds),
body=urllib.parse.urlencode(self.auth_creds),
headers={"Content-Type": self.content_type_map["form"]},
)
token = salt.utils.json.loads(self.decode_body(response).body)["return"][0][
@ -898,7 +872,6 @@ class TestWebsocketSaltAPIHandler(SaltnadoTestCase):
ws.close()
@skipIf(not HAS_TORNADO, "The tornado package needs to be installed")
class TestSaltnadoUtils(AsyncTestCase):
def test_any_future(self):
"""
@ -937,7 +910,6 @@ class TestSaltnadoUtils(AsyncTestCase):
self.assertIs(futures[1].done(), False)
@skipIf(not HAS_TORNADO, "The tornado package needs to be installed")
class TestEventListener(AsyncTestCase):
def setUp(self):
self.sock_dir = os.path.join(RUNTIME_VARS.TMP, "test-socks")
@ -952,23 +924,23 @@ class TestEventListener(AsyncTestCase):
Test getting a few events
"""
with eventpublisher_process(self.sock_dir):
me = salt.utils.event.MasterEvent(self.sock_dir)
event_listener = saltnado.EventListener(
{}, # we don't use mod_opts, don't save?
{"sock_dir": self.sock_dir, "transport": "zeromq"},
)
self._finished = False # fit to event_listener's behavior
event_future = event_listener.get_event(
self, "evt1", callback=self.stop
) # get an event future
me.fire_event({"data": "foo2"}, "evt2") # fire an event we don't want
me.fire_event({"data": "foo1"}, "evt1") # fire an event we do want
self.wait() # wait for the future
with salt.utils.event.MasterEvent(self.sock_dir) as me:
event_listener = saltnado.EventListener(
{}, # we don't use mod_opts, don't save?
{"sock_dir": self.sock_dir, "transport": "zeromq"},
)
self._finished = False # fit to event_listener's behavior
event_future = event_listener.get_event(
self, "evt1", callback=self.stop
) # get an event future
me.fire_event({"data": "foo2"}, "evt2") # fire an event we don't want
me.fire_event({"data": "foo1"}, "evt1") # fire an event we do want
self.wait() # wait for the future
# check that we got the event we wanted
self.assertTrue(event_future.done())
self.assertEqual(event_future.result()["tag"], "evt1")
self.assertEqual(event_future.result()["data"]["data"], "foo1")
# check that we got the event we wanted
self.assertTrue(event_future.done())
self.assertEqual(event_future.result()["tag"], "evt1")
self.assertEqual(event_future.result()["data"]["data"], "foo1")
@slowTest
def test_set_event_handler(self):
@ -976,20 +948,20 @@ class TestEventListener(AsyncTestCase):
Test subscribing events using set_event_handler
"""
with eventpublisher_process(self.sock_dir):
me = salt.utils.event.MasterEvent(self.sock_dir)
event_listener = saltnado.EventListener(
{}, # we don't use mod_opts, don't save?
{"sock_dir": self.sock_dir, "transport": "zeromq"},
)
self._finished = False # fit to event_listener's behavior
event_future = event_listener.get_event(
self, tag="evt", callback=self.stop, timeout=1,
) # get an event future
me.fire_event({"data": "foo"}, "evt") # fire an event we do want
self.wait()
with salt.utils.event.MasterEvent(self.sock_dir) as me:
event_listener = saltnado.EventListener(
{}, # we don't use mod_opts, don't save?
{"sock_dir": self.sock_dir, "transport": "zeromq"},
)
self._finished = False # fit to event_listener's behavior
event_future = event_listener.get_event(
self, tag="evt", callback=self.stop, timeout=1,
) # get an event future
me.fire_event({"data": "foo"}, "evt") # fire an event we do want
self.wait()
# check that we subscribed the event we wanted
self.assertEqual(len(event_listener.timeout_map), 0)
# check that we subscribed the event we wanted
self.assertEqual(len(event_listener.timeout_map), 0)
@slowTest
def test_timeout(self):
@ -1045,58 +1017,60 @@ class TestEventListener(AsyncTestCase):
self.stop()
with eventpublisher_process(self.sock_dir):
me = salt.utils.event.MasterEvent(self.sock_dir)
event_listener = saltnado.EventListener(
{}, # we don't use mod_opts, don't save?
{"sock_dir": self.sock_dir, "transport": "zeromq"},
)
with salt.utils.event.MasterEvent(self.sock_dir) as me:
event_listener = saltnado.EventListener(
{}, # we don't use mod_opts, don't save?
{"sock_dir": self.sock_dir, "transport": "zeromq"},
)
self.assertEqual(0, len(event_listener.tag_map))
self.assertEqual(0, len(event_listener.request_map))
self.assertEqual(0, len(event_listener.tag_map))
self.assertEqual(0, len(event_listener.request_map))
self._finished = False # fit to event_listener's behavior
dummy_request = DummyRequest()
request_future_1 = event_listener.get_event(self, tag="evt1")
request_future_2 = event_listener.get_event(
self, tag="evt2", callback=lambda f: stop()
)
dummy_request_future_1 = event_listener.get_event(
dummy_request, tag="evt3", callback=lambda f: stop()
)
dummy_request_future_2 = event_listener.get_event(
dummy_request, timeout=10, tag="evt4"
)
self._finished = False # fit to event_listener's behavior
dummy_request = DummyRequest()
request_future_1 = event_listener.get_event(self, tag="evt1")
request_future_2 = event_listener.get_event(
self, tag="evt2", callback=lambda f: stop()
)
dummy_request_future_1 = event_listener.get_event(
dummy_request, tag="evt3", callback=lambda f: stop()
)
dummy_request_future_2 = event_listener.get_event(
dummy_request, timeout=10, tag="evt4"
)
self.assertEqual(4, len(event_listener.tag_map))
self.assertEqual(2, len(event_listener.request_map))
self.assertEqual(4, len(event_listener.tag_map))
self.assertEqual(2, len(event_listener.request_map))
me.fire_event({"data": "foo2"}, "evt2")
me.fire_event({"data": "foo3"}, "evt3")
self.wait()
event_listener.clean_by_request(self)
me.fire_event({"data": "foo1"}, "evt1")
me.fire_event({"data": "foo2"}, "evt2")
me.fire_event({"data": "foo3"}, "evt3")
self.wait()
event_listener.clean_by_request(self)
me.fire_event({"data": "foo1"}, "evt1")
self.assertTrue(request_future_1.done())
with self.assertRaises(saltnado.TimeoutException):
request_future_1.result()
self.assertTrue(request_future_1.done())
with self.assertRaises(saltnado.TimeoutException):
request_future_1.result()
self.assertTrue(request_future_2.done())
self.assertEqual(request_future_2.result()["tag"], "evt2")
self.assertEqual(request_future_2.result()["data"]["data"], "foo2")
self.assertTrue(request_future_2.done())
self.assertEqual(request_future_2.result()["tag"], "evt2")
self.assertEqual(request_future_2.result()["data"]["data"], "foo2")
self.assertTrue(dummy_request_future_1.done())
self.assertEqual(dummy_request_future_1.result()["tag"], "evt3")
self.assertEqual(dummy_request_future_1.result()["data"]["data"], "foo3")
self.assertTrue(dummy_request_future_1.done())
self.assertEqual(dummy_request_future_1.result()["tag"], "evt3")
self.assertEqual(
dummy_request_future_1.result()["data"]["data"], "foo3"
)
self.assertFalse(dummy_request_future_2.done())
self.assertFalse(dummy_request_future_2.done())
self.assertEqual(2, len(event_listener.tag_map))
self.assertEqual(1, len(event_listener.request_map))
self.assertEqual(2, len(event_listener.tag_map))
self.assertEqual(1, len(event_listener.request_map))
event_listener.clean_by_request(dummy_request)
event_listener.clean_by_request(dummy_request)
with self.assertRaises(saltnado.TimeoutException):
dummy_request_future_2.result()
with self.assertRaises(saltnado.TimeoutException):
dummy_request_future_2.result()
self.assertEqual(0, len(event_listener.tag_map))
self.assertEqual(0, len(event_listener.request_map))
self.assertEqual(0, len(event_listener.tag_map))
self.assertEqual(0, len(event_listener.request_map))

View file

@ -4,7 +4,6 @@
import os
import tempfile
import time
import salt.config
import salt.loader
@ -278,11 +277,19 @@ class SaltmodTestCase(TestCase, LoaderModuleMockMixin):
return {"tag": name, "data": {}}
return None
def __enter__(self):
return self
def __exit__(self, *args):
pass
with patch.object(
salt.utils.event, "get_event", MagicMock(return_value=Mockevent())
):
with patch.dict(saltmod.__opts__, {"sock_dir": True, "transport": True}):
with patch.object(time, "time", MagicMock(return_value=1.0)):
with patch(
"salt.states.saltmod.time.time", MagicMock(return_value=1.0)
):
self.assertDictEqual(
saltmod.wait_for_event(name, "salt", timeout=-1.0), ret
)

View file

@ -17,7 +17,6 @@ import salt.utils.event
import salt.utils.stringutils
import zmq
import zmq.eventloop.ioloop
from salt.ext.six.moves import range
from salt.ext.tornado.testing import AsyncTestCase
from saltfactories.utils.processes import terminate_process
from tests.support.events import eventpublisher_process, eventsender_process
@ -25,10 +24,6 @@ from tests.support.helpers import slowTest
from tests.support.runtests import RUNTIME_VARS
from tests.support.unit import TestCase, expectedFailure, skipIf
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
if not hasattr(zmq.eventloop.ioloop, "ZMQIOLoop"):
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
NO_LONG_IPC = False
if getattr(zmq, "IPC_PATH_MAX_LEN", 103) <= 103:
NO_LONG_IPC = True
@ -53,236 +48,254 @@ class TestSaltEvent(TestCase):
self.assertEqual(data[key], evt[key], assertMsg)
def test_master_event(self):
me = salt.utils.event.MasterEvent(self.sock_dir, listen=False)
self.assertEqual(
me.puburi, "{}".format(os.path.join(self.sock_dir, "master_event_pub.ipc"))
)
self.assertEqual(
me.pulluri,
"{}".format(os.path.join(self.sock_dir, "master_event_pull.ipc")),
)
with salt.utils.event.MasterEvent(self.sock_dir, listen=False) as me:
self.assertEqual(
me.puburi,
"{}".format(os.path.join(self.sock_dir, "master_event_pub.ipc")),
)
self.assertEqual(
me.pulluri,
"{}".format(os.path.join(self.sock_dir, "master_event_pull.ipc")),
)
def test_minion_event(self):
opts = dict(id="foo", sock_dir=self.sock_dir)
id_hash = hashlib.sha256(
salt.utils.stringutils.to_bytes(opts["id"])
).hexdigest()[:10]
me = salt.utils.event.MinionEvent(opts, listen=False)
self.assertEqual(
me.puburi,
"{}".format(
os.path.join(self.sock_dir, "minion_event_{}_pub.ipc".format(id_hash))
),
)
self.assertEqual(
me.pulluri,
"{}".format(
os.path.join(self.sock_dir, "minion_event_{}_pull.ipc".format(id_hash))
),
)
with salt.utils.event.MinionEvent(opts, listen=False) as me:
self.assertEqual(
me.puburi,
"{}".format(
os.path.join(
self.sock_dir, "minion_event_{}_pub.ipc".format(id_hash)
)
),
)
self.assertEqual(
me.pulluri,
"{}".format(
os.path.join(
self.sock_dir, "minion_event_{}_pull.ipc".format(id_hash)
)
),
)
def test_minion_event_tcp_ipc_mode(self):
opts = dict(id="foo", ipc_mode="tcp")
me = salt.utils.event.MinionEvent(opts, listen=False)
self.assertEqual(me.puburi, 4510)
self.assertEqual(me.pulluri, 4511)
with salt.utils.event.MinionEvent(opts, listen=False) as me:
self.assertEqual(me.puburi, 4510)
self.assertEqual(me.pulluri, 4511)
def test_minion_event_no_id(self):
me = salt.utils.event.MinionEvent(dict(sock_dir=self.sock_dir), listen=False)
id_hash = hashlib.sha256(salt.utils.stringutils.to_bytes("")).hexdigest()[:10]
self.assertEqual(
me.puburi,
"{}".format(
os.path.join(self.sock_dir, "minion_event_{}_pub.ipc".format(id_hash))
),
)
self.assertEqual(
me.pulluri,
"{}".format(
os.path.join(self.sock_dir, "minion_event_{}_pull.ipc".format(id_hash))
),
)
with salt.utils.event.MinionEvent(
dict(sock_dir=self.sock_dir), listen=False
) as me:
id_hash = hashlib.sha256(salt.utils.stringutils.to_bytes("")).hexdigest()[
:10
]
self.assertEqual(
me.puburi,
"{}".format(
os.path.join(
self.sock_dir, "minion_event_{}_pub.ipc".format(id_hash)
)
),
)
self.assertEqual(
me.pulluri,
"{}".format(
os.path.join(
self.sock_dir, "minion_event_{}_pull.ipc".format(id_hash)
)
),
)
@slowTest
def test_event_single(self):
"""Test a single event is received"""
with eventpublisher_process(self.sock_dir):
me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
me.fire_event({"data": "foo1"}, "evt1")
evt1 = me.get_event(tag="evt1")
self.assertGotEvent(evt1, {"data": "foo1"})
with salt.utils.event.MasterEvent(self.sock_dir, listen=True) as me:
me.fire_event({"data": "foo1"}, "evt1")
evt1 = me.get_event(tag="evt1")
self.assertGotEvent(evt1, {"data": "foo1"})
@slowTest
def test_event_single_no_block(self):
"""Test a single event is received, no block"""
with eventpublisher_process(self.sock_dir):
me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
start = time.time()
finish = start + 5
evt1 = me.get_event(wait=0, tag="evt1", no_block=True)
# We should get None and way before the 5 seconds wait since it's
# non-blocking, otherwise it would wait for an event which we
# didn't even send
self.assertIsNone(evt1, None)
self.assertLess(start, finish)
me.fire_event({"data": "foo1"}, "evt1")
evt1 = me.get_event(wait=0, tag="evt1")
self.assertGotEvent(evt1, {"data": "foo1"})
with salt.utils.event.MasterEvent(self.sock_dir, listen=True) as me:
start = time.time()
finish = start + 5
evt1 = me.get_event(wait=0, tag="evt1", no_block=True)
# We should get None and way before the 5 seconds wait since it's
# non-blocking, otherwise it would wait for an event which we
# didn't even send
self.assertIsNone(evt1, None)
self.assertLess(start, finish)
me.fire_event({"data": "foo1"}, "evt1")
evt1 = me.get_event(wait=0, tag="evt1")
self.assertGotEvent(evt1, {"data": "foo1"})
@slowTest
def test_event_single_wait_0_no_block_False(self):
"""Test a single event is received with wait=0 and no_block=False and doesn't spin the while loop"""
with eventpublisher_process(self.sock_dir):
me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
me.fire_event({"data": "foo1"}, "evt1")
# This is too fast and will be None but assures we're not blocking
evt1 = me.get_event(wait=0, tag="evt1", no_block=False)
self.assertGotEvent(evt1, {"data": "foo1"})
with salt.utils.event.MasterEvent(self.sock_dir, listen=True) as me:
me.fire_event({"data": "foo1"}, "evt1")
# This is too fast and will be None but assures we're not blocking
evt1 = me.get_event(wait=0, tag="evt1", no_block=False)
self.assertGotEvent(evt1, {"data": "foo1"})
@slowTest
def test_event_timeout(self):
"""Test no event is received if the timeout is reached"""
with eventpublisher_process(self.sock_dir):
me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
me.fire_event({"data": "foo1"}, "evt1")
evt1 = me.get_event(tag="evt1")
self.assertGotEvent(evt1, {"data": "foo1"})
evt2 = me.get_event(tag="evt1")
self.assertIsNone(evt2)
with salt.utils.event.MasterEvent(self.sock_dir, listen=True) as me:
me.fire_event({"data": "foo1"}, "evt1")
evt1 = me.get_event(tag="evt1")
self.assertGotEvent(evt1, {"data": "foo1"})
evt2 = me.get_event(tag="evt1")
self.assertIsNone(evt2)
@slowTest
def test_event_no_timeout(self):
"""Test no wait timeout, we should block forever, until we get one """
with eventpublisher_process(self.sock_dir):
me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
with eventsender_process({"data": "foo2"}, "evt2", self.sock_dir, 5):
evt = me.get_event(tag="evt2", wait=0, no_block=False)
self.assertGotEvent(evt, {"data": "foo2"})
with salt.utils.event.MasterEvent(self.sock_dir, listen=True) as me:
with eventsender_process({"data": "foo2"}, "evt2", self.sock_dir, 5):
evt = me.get_event(tag="evt2", wait=0, no_block=False)
self.assertGotEvent(evt, {"data": "foo2"})
@slowTest
def test_event_matching(self):
"""Test a startswith match"""
with eventpublisher_process(self.sock_dir):
me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
me.fire_event({"data": "foo1"}, "evt1")
evt1 = me.get_event(tag="ev")
self.assertGotEvent(evt1, {"data": "foo1"})
with salt.utils.event.MasterEvent(self.sock_dir, listen=True) as me:
me.fire_event({"data": "foo1"}, "evt1")
evt1 = me.get_event(tag="ev")
self.assertGotEvent(evt1, {"data": "foo1"})
@slowTest
def test_event_matching_regex(self):
"""Test a regex match"""
with eventpublisher_process(self.sock_dir):
me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
me.fire_event({"data": "foo1"}, "evt1")
evt1 = me.get_event(tag="^ev", match_type="regex")
self.assertGotEvent(evt1, {"data": "foo1"})
with salt.utils.event.MasterEvent(self.sock_dir, listen=True) as me:
me.fire_event({"data": "foo1"}, "evt1")
evt1 = me.get_event(tag="^ev", match_type="regex")
self.assertGotEvent(evt1, {"data": "foo1"})
@slowTest
def test_event_matching_all(self):
"""Test an all match"""
with eventpublisher_process(self.sock_dir):
me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
me.fire_event({"data": "foo1"}, "evt1")
evt1 = me.get_event(tag="")
self.assertGotEvent(evt1, {"data": "foo1"})
with salt.utils.event.MasterEvent(self.sock_dir, listen=True) as me:
me.fire_event({"data": "foo1"}, "evt1")
evt1 = me.get_event(tag="")
self.assertGotEvent(evt1, {"data": "foo1"})
@slowTest
def test_event_matching_all_when_tag_is_None(self):
"""Test event matching all when not passing a tag"""
with eventpublisher_process(self.sock_dir):
me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
me.fire_event({"data": "foo1"}, "evt1")
evt1 = me.get_event()
self.assertGotEvent(evt1, {"data": "foo1"})
with salt.utils.event.MasterEvent(self.sock_dir, listen=True) as me:
me.fire_event({"data": "foo1"}, "evt1")
evt1 = me.get_event()
self.assertGotEvent(evt1, {"data": "foo1"})
@slowTest
def test_event_not_subscribed(self):
"""Test get_event drops non-subscribed events"""
with eventpublisher_process(self.sock_dir):
me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
me.fire_event({"data": "foo1"}, "evt1")
me.fire_event({"data": "foo2"}, "evt2")
evt2 = me.get_event(tag="evt2")
evt1 = me.get_event(tag="evt1")
self.assertGotEvent(evt2, {"data": "foo2"})
self.assertIsNone(evt1)
with salt.utils.event.MasterEvent(self.sock_dir, listen=True) as me:
me.fire_event({"data": "foo1"}, "evt1")
me.fire_event({"data": "foo2"}, "evt2")
evt2 = me.get_event(tag="evt2")
evt1 = me.get_event(tag="evt1")
self.assertGotEvent(evt2, {"data": "foo2"})
self.assertIsNone(evt1)
@slowTest
def test_event_subscription_cache(self):
"""Test subscriptions cache a message until requested"""
with eventpublisher_process(self.sock_dir):
me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
me.subscribe("evt1")
me.fire_event({"data": "foo1"}, "evt1")
me.fire_event({"data": "foo2"}, "evt2")
evt2 = me.get_event(tag="evt2")
evt1 = me.get_event(tag="evt1")
self.assertGotEvent(evt2, {"data": "foo2"})
self.assertGotEvent(evt1, {"data": "foo1"})
with salt.utils.event.MasterEvent(self.sock_dir, listen=True) as me:
me.subscribe("evt1")
me.fire_event({"data": "foo1"}, "evt1")
me.fire_event({"data": "foo2"}, "evt2")
evt2 = me.get_event(tag="evt2")
evt1 = me.get_event(tag="evt1")
self.assertGotEvent(evt2, {"data": "foo2"})
self.assertGotEvent(evt1, {"data": "foo1"})
@slowTest
def test_event_subscriptions_cache_regex(self):
"""Test regex subscriptions cache a message until requested"""
with eventpublisher_process(self.sock_dir):
me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
me.subscribe("e..1$", "regex")
me.fire_event({"data": "foo1"}, "evt1")
me.fire_event({"data": "foo2"}, "evt2")
evt2 = me.get_event(tag="evt2")
evt1 = me.get_event(tag="evt1")
self.assertGotEvent(evt2, {"data": "foo2"})
self.assertGotEvent(evt1, {"data": "foo1"})
with salt.utils.event.MasterEvent(self.sock_dir, listen=True) as me:
me.subscribe("e..1$", "regex")
me.fire_event({"data": "foo1"}, "evt1")
me.fire_event({"data": "foo2"}, "evt2")
evt2 = me.get_event(tag="evt2")
evt1 = me.get_event(tag="evt1")
self.assertGotEvent(evt2, {"data": "foo2"})
self.assertGotEvent(evt1, {"data": "foo1"})
@slowTest
def test_event_multiple_clients(self):
"""Test event is received by multiple clients"""
with eventpublisher_process(self.sock_dir):
me1 = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
me2 = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
# We need to sleep here to avoid a race condition wherein
# the second socket may not be connected by the time the first socket
# sends the event.
time.sleep(0.5)
me1.fire_event({"data": "foo1"}, "evt1")
evt1 = me1.get_event(tag="evt1")
self.assertGotEvent(evt1, {"data": "foo1"})
evt2 = me2.get_event(tag="evt1")
self.assertGotEvent(evt2, {"data": "foo1"})
with salt.utils.event.MasterEvent(
self.sock_dir, listen=True
) as me1, salt.utils.event.MasterEvent(self.sock_dir, listen=True) as me2:
# We need to sleep here to avoid a race condition wherein
# the second socket may not be connected by the time the first socket
# sends the event.
time.sleep(0.5)
me1.fire_event({"data": "foo1"}, "evt1")
evt1 = me1.get_event(tag="evt1")
self.assertGotEvent(evt1, {"data": "foo1"})
evt2 = me2.get_event(tag="evt1")
self.assertGotEvent(evt2, {"data": "foo1"})
@expectedFailure
def test_event_nested_sub_all(self):
"""Test nested event subscriptions do not drop events, get event for all tags"""
# Show why not to call get_event(tag='')
with eventpublisher_process(self.sock_dir):
me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
me.fire_event({"data": "foo1"}, "evt1")
me.fire_event({"data": "foo2"}, "evt2")
evt2 = me.get_event(tag="")
evt1 = me.get_event(tag="")
self.assertGotEvent(evt2, {"data": "foo2"})
self.assertGotEvent(evt1, {"data": "foo1"})
with salt.utils.event.MasterEvent(self.sock_dir, listen=True) as me:
me.fire_event({"data": "foo1"}, "evt1")
me.fire_event({"data": "foo2"}, "evt2")
evt2 = me.get_event(tag="")
evt1 = me.get_event(tag="")
self.assertGotEvent(evt2, {"data": "foo2"})
self.assertGotEvent(evt1, {"data": "foo1"})
@slowTest
def test_event_many(self):
"""Test a large number of events, one at a time"""
with eventpublisher_process(self.sock_dir):
me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
for i in range(500):
me.fire_event({"data": "{}".format(i)}, "testevents")
evt = me.get_event(tag="testevents")
self.assertGotEvent(evt, {"data": "{}".format(i)}, "Event {}".format(i))
with salt.utils.event.MasterEvent(self.sock_dir, listen=True) as me:
for i in range(500):
me.fire_event({"data": "{}".format(i)}, "testevents")
evt = me.get_event(tag="testevents")
self.assertGotEvent(
evt, {"data": "{}".format(i)}, "Event {}".format(i)
)
@slowTest
def test_event_many_backlog(self):
"""Test a large number of events, send all then recv all"""
with eventpublisher_process(self.sock_dir):
me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
# Must not exceed zmq HWM
for i in range(500):
me.fire_event({"data": "{}".format(i)}, "testevents")
for i in range(500):
evt = me.get_event(tag="testevents")
self.assertGotEvent(evt, {"data": "{}".format(i)}, "Event {}".format(i))
with salt.utils.event.MasterEvent(self.sock_dir, listen=True) as me:
# Must not exceed zmq HWM
for i in range(500):
me.fire_event({"data": "{}".format(i)}, "testevents")
for i in range(500):
evt = me.get_event(tag="testevents")
self.assertGotEvent(
evt, {"data": "{}".format(i)}, "Event {}".format(i)
)
# Test the fire_master function. As it wraps the underlying fire_event,
# we don't need to perform extensive testing.
@ -290,15 +303,20 @@ class TestSaltEvent(TestCase):
def test_send_master_event(self):
"""Tests that sending an event through fire_master generates expected event"""
with eventpublisher_process(self.sock_dir):
me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
data = {"data": "foo1"}
me.fire_master(data, "test_master")
with salt.utils.event.MasterEvent(self.sock_dir, listen=True) as me:
data = {"data": "foo1"}
me.fire_master(data, "test_master")
evt = me.get_event(tag="fire_master")
self.assertGotEvent(
evt,
{"data": data, "tag": "test_master", "events": None, "pretag": None},
)
evt = me.get_event(tag="fire_master")
self.assertGotEvent(
evt,
{
"data": data,
"tag": "test_master",
"events": None,
"pretag": None,
},
)
class TestAsyncEventPublisher(AsyncTestCase):
@ -319,30 +337,35 @@ class TestAsyncEventPublisher(AsyncTestCase):
self.event.subscribe("")
self.event.set_event_handler(self._handle_publish)
def stop(self, _arg=None, **kwargs):
self.publisher.close()
self.event.destroy()
super().stop(_arg=_arg, **kwargs)
def _handle_publish(self, raw):
self.tag, self.data = salt.utils.event.SaltEvent.unpack(raw)
self.stop()
def test_event_subscription(self):
"""Test a single event is received"""
me = salt.utils.event.MinionEvent(self.opts, listen=True)
me.fire_event({"data": "foo1"}, "evt1")
self.wait()
evt1 = me.get_event(tag="evt1")
self.assertEqual(self.tag, "evt1")
self.data.pop("_stamp") # drop the stamp
self.assertEqual(self.data, {"data": "foo1"})
with salt.utils.event.MinionEvent(self.opts, listen=True) as me:
me.fire_event({"data": "foo1"}, "evt1")
self.wait()
evt1 = me.get_event(tag="evt1")
self.assertEqual(self.tag, "evt1")
self.data.pop("_stamp") # drop the stamp
self.assertEqual(self.data, {"data": "foo1"})
def test_event_unsubscribe_remove_error(self):
me = salt.utils.event.MinionEvent(self.opts, listen=True)
tag = "evt1"
me.fire_event({"data": "foo1"}, tag)
with salt.utils.event.MinionEvent(self.opts, listen=True) as me:
tag = "evt1"
me.fire_event({"data": "foo1"}, tag)
# Make sure no remove error is raised when tag is not found
for _ in range(2):
me.unsubscribe(tag)
# Make sure no remove error is raised when tag is not found
for _ in range(2):
me.unsubscribe(tag)
me.unsubscribe("tag_does_not_exist")
me.unsubscribe("tag_does_not_exist")
class TestEventReturn(TestCase):