salt/tests/support/events.py
2024-02-29 10:16:58 +00:00

76 lines
2 KiB
Python

"""
tests.support.events
~~~~~~~~~~~~~~~~~~~~
"""
import multiprocessing
import os
import time
from contextlib import contextmanager
import salt.utils.event
from salt.utils.process import Process, clean_proc
@contextmanager
def eventpublisher_process(sock_dir):
opts = {
"sock_dir": sock_dir,
"interface": "127.0.0.1",
"publish_port": 4506,
"ipv6": None,
"zmq_filtering": None,
}
ipc_publisher = salt.transport.publish_server(
opts,
pub_path=os.path.join(opts["sock_dir"], "master_event_pub.ipc"),
pull_path=os.path.join(opts["sock_dir"], "master_event_pull.ipc"),
transport="tcp",
)
proc = Process(
target=ipc_publisher.publish_daemon,
args=[
ipc_publisher.publish_payload,
],
)
# proc = salt.utils.event.EventPublisher({"sock_dir": sock_dir})
proc.start()
try:
if os.environ.get("TRAVIS_PYTHON_VERSION", None) is not None:
# Travis is slow
time.sleep(10)
else:
time.sleep(8)
yield
finally:
clean_proc(proc)
class EventSender(multiprocessing.Process):
def __init__(self, data, tag, wait, sock_dir):
super().__init__()
self.data = data
self.tag = tag
self.wait = wait
self.sock_dir = sock_dir
def run(self):
with salt.utils.event.MasterEvent(self.sock_dir, listen=False) as me:
time.sleep(self.wait)
me.fire_event(self.data, self.tag)
# Wait a few seconds before tearing down the zmq context
if os.environ.get("TRAVIS_PYTHON_VERSION", None) is not None:
# Travis is slow
time.sleep(10)
else:
time.sleep(2)
@contextmanager
def eventsender_process(data, tag, sock_dir, wait=0):
proc = EventSender(data, tag, wait, sock_dir=sock_dir)
proc.start()
try:
yield
finally:
clean_proc(proc)