mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Implement full event caching for subscribed tags
Require all multitasking contexts to subscribe to their events so one call to get_event for one tag does not discard events that should be saved for a subsequent call to get_event with another tag. Use blocking get_event in batching with very small timeout. Fixes #25998
This commit is contained in:
parent
2dca8d959b
commit
65acf975dd
4 changed files with 141 additions and 155 deletions
|
@ -202,6 +202,9 @@ class LocalClient(object):
|
|||
timeout=timeout,
|
||||
)
|
||||
|
||||
if 'jid' in pub_data:
|
||||
self.event.subscribe(pub_data['jid'])
|
||||
|
||||
return pub_data
|
||||
|
||||
def _check_pub_data(self, pub_data):
|
||||
|
@ -233,6 +236,10 @@ class LocalClient(object):
|
|||
print('No minions matched the target. '
|
||||
'No command was sent, no jid was assigned.')
|
||||
return {}
|
||||
else:
|
||||
self.event.subscribe_regex('^syndic/.*/{0}'.format(pub_data['jid']))
|
||||
|
||||
self.event.subscribe('salt/job/{0}'.format(pub_data['jid']))
|
||||
|
||||
return pub_data
|
||||
|
||||
|
@ -263,9 +270,6 @@ class LocalClient(object):
|
|||
'''
|
||||
arg = salt.utils.args.condition_input(arg, kwarg)
|
||||
|
||||
# Subscribe to all events and subscribe as early as possible
|
||||
self.event.subscribe(jid)
|
||||
|
||||
try:
|
||||
pub_data = self.pub(
|
||||
tgt,
|
||||
|
@ -797,8 +801,6 @@ class LocalClient(object):
|
|||
def get_returns_no_block(
|
||||
self,
|
||||
jid,
|
||||
event=None,
|
||||
gather_errors=False,
|
||||
tags_regex=None
|
||||
):
|
||||
'''
|
||||
|
@ -806,48 +808,16 @@ class LocalClient(object):
|
|||
|
||||
Yield either the raw event data or None
|
||||
|
||||
Pass a list of additional regular expressions as `tags_regex` to search
|
||||
the event bus for non-return data, such as minion lists returned from
|
||||
syndics.
|
||||
Pass a list of additional regular expressions as `tags_regex` to search
|
||||
the event bus for non-return data, such as minion lists returned from
|
||||
syndics.
|
||||
'''
|
||||
if event is None:
|
||||
event = self.event
|
||||
|
||||
jid_tag = 'salt/job/{0}'.format(jid)
|
||||
jid_tag_regex = '^salt/job/{0}'.format(jid)
|
||||
|
||||
tag_search = []
|
||||
tag_search.append(re.compile(jid_tag_regex))
|
||||
if isinstance(tags_regex, str):
|
||||
tag_search.append(re.compile(tags_regex))
|
||||
elif isinstance(tags_regex, list):
|
||||
for tag in tags_regex:
|
||||
tag_search.append(re.compile(tag))
|
||||
while True:
|
||||
if self.opts.get('transport') == 'zeromq':
|
||||
try:
|
||||
raw = event.get_event_noblock()
|
||||
if gather_errors:
|
||||
if (raw and
|
||||
(raw.get('tag', '').startswith('_salt_error') or
|
||||
any([tag.search(raw.get('tag', '')) for tag in tag_search]))):
|
||||
yield raw
|
||||
else:
|
||||
if raw and raw.get('tag', '').startswith(jid_tag):
|
||||
yield raw
|
||||
else:
|
||||
yield None
|
||||
except zmq.ZMQError as ex:
|
||||
if ex.errno == errno.EAGAIN or ex.errno == errno.EINTR:
|
||||
yield None
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
raw = event.get_event_noblock()
|
||||
if raw and raw.get('tag', '').startswith(jid_tag):
|
||||
yield raw
|
||||
else:
|
||||
yield None
|
||||
# TODO(driskell): This was previously completely nonblocking.
|
||||
# Should get_event have a nonblock option?
|
||||
raw = self.event.get_event(wait=0.01, tag='salt/job/{0}'.format(jid), tags_regex=tags_regex, full=True)
|
||||
yield raw
|
||||
|
||||
def get_iter_returns(
|
||||
self,
|
||||
|
@ -857,7 +827,6 @@ class LocalClient(object):
|
|||
tgt='*',
|
||||
tgt_type='glob',
|
||||
expect_minions=False,
|
||||
gather_errors=True,
|
||||
block=True,
|
||||
**kwargs):
|
||||
'''
|
||||
|
@ -894,9 +863,9 @@ class LocalClient(object):
|
|||
# iterator for this job's return
|
||||
if self.opts['order_masters']:
|
||||
# If we are a MoM, we need to gather expected minions from downstreams masters.
|
||||
ret_iter = self.get_returns_no_block(jid, gather_errors=gather_errors, tags_regex='^syndic/.*/{0}'.format(jid))
|
||||
ret_iter = self.get_returns_no_block(jid, tags_regex=['^syndic/.*/{0}'.format(jid)])
|
||||
else:
|
||||
ret_iter = self.get_returns_no_block(jid, gather_errors=gather_errors)
|
||||
ret_iter = self.get_returns_no_block(jid)
|
||||
# iterator for the info of this job
|
||||
jinfo_iter = []
|
||||
timeout_at = time.time() + timeout
|
||||
|
@ -915,10 +884,6 @@ class LocalClient(object):
|
|||
# if we got None, then there were no events
|
||||
if raw is None:
|
||||
break
|
||||
if gather_errors:
|
||||
if raw['tag'] == '_salt_error':
|
||||
ret = {raw['data']['id']: raw['data']['data']}
|
||||
yield ret
|
||||
if 'minions' in raw.get('data', {}):
|
||||
minions.update(raw['data']['minions'])
|
||||
continue
|
||||
|
@ -965,15 +930,6 @@ class LocalClient(object):
|
|||
# if the jinfo has timed out and some minions are still running the job
|
||||
# re-do the ping
|
||||
if time.time() > timeout_at and minions_running:
|
||||
# need our own event listener, so we don't clobber the class one
|
||||
event = salt.utils.event.get_event(
|
||||
'master',
|
||||
self.opts['sock_dir'],
|
||||
self.opts['transport'],
|
||||
opts=self.opts,
|
||||
listen=not self.opts.get('__worker', False))
|
||||
# start listening for new events, before firing off the pings
|
||||
event.connect_pub()
|
||||
# since this is a new ping, no one has responded yet
|
||||
jinfo = self.gather_job_info(jid, tgt, tgt_type)
|
||||
minions_running = False
|
||||
|
@ -982,7 +938,7 @@ class LocalClient(object):
|
|||
if 'jid' not in jinfo:
|
||||
jinfo_iter = []
|
||||
else:
|
||||
jinfo_iter = self.get_returns_no_block(jinfo['jid'], event=event)
|
||||
jinfo_iter = self.get_returns_no_block(jinfo['jid'])
|
||||
timeout_at = time.time() + self.opts['gather_job_timeout']
|
||||
# if you are a syndic, wait a little longer
|
||||
if self.opts['order_masters']:
|
||||
|
@ -1044,8 +1000,7 @@ class LocalClient(object):
|
|||
self,
|
||||
jid,
|
||||
minions,
|
||||
timeout=None,
|
||||
pending_tags=None):
|
||||
timeout=None):
|
||||
'''
|
||||
Get the returns for the command line interface via the event system
|
||||
'''
|
||||
|
|
|
@ -261,8 +261,6 @@ class EventListener(object):
|
|||
opts=opts,
|
||||
)
|
||||
|
||||
self.event.subscribe() # start listening for events immediately
|
||||
|
||||
# tag -> list of futures
|
||||
self.tag_map = defaultdict(list)
|
||||
|
||||
|
|
|
@ -59,6 +59,7 @@ import logging
|
|||
import time
|
||||
import datetime
|
||||
import multiprocessing
|
||||
import re
|
||||
from collections import MutableMapping
|
||||
|
||||
# Import third party libs
|
||||
|
@ -169,10 +170,12 @@ class SaltEvent(object):
|
|||
if sock_dir is None:
|
||||
sock_dir = opts.get('sock_dir', None)
|
||||
self.puburi, self.pulluri = self.__load_uri(sock_dir, node)
|
||||
self.subscribe()
|
||||
self.pending_tags = []
|
||||
self.pending_rtags = []
|
||||
self.pending_events = []
|
||||
# since ZMQ connect() has no guarantees about the socket actually being
|
||||
# connected this is a hack to attempt to do so.
|
||||
self.connect_pub()
|
||||
self.fire_event({}, tagify('event/new_client'), 0)
|
||||
self.get_event(wait=1)
|
||||
|
||||
|
@ -223,17 +226,54 @@ class SaltEvent(object):
|
|||
)
|
||||
return puburi, pulluri
|
||||
|
||||
def subscribe(self, tag=None):
|
||||
def subscribe(self, tag):
|
||||
'''
|
||||
Subscribe to events matching the passed tag.
|
||||
'''
|
||||
if not self.cpub:
|
||||
self.connect_pub()
|
||||
|
||||
def unsubscribe(self, tag=None):
|
||||
If you do not subscribe to a tag, events will be discarded by calls to
|
||||
get_event that request a different tag. In contexts where many different
|
||||
jobs are outstanding it is important to subscribe to prevent one call
|
||||
to get_event from discarding a response required by a subsequent call
|
||||
to get_event.
|
||||
'''
|
||||
self.pending_tags.append(tag)
|
||||
|
||||
return
|
||||
|
||||
def subscribe_regex(self, tag_regex):
|
||||
'''
|
||||
Subscribe to events matching the passed tag expression.
|
||||
|
||||
If you do not subscribe to a tag, events will be discarded by calls to
|
||||
get_event that request a different tag. In contexts where many different
|
||||
jobs are outstanding it is important to subscribe to prevent one call
|
||||
to get_event from discarding a response required by a subsequent call
|
||||
to get_event.
|
||||
'''
|
||||
self.pending_rtags.append(re.compile(tag_regex))
|
||||
|
||||
return
|
||||
|
||||
def unsubscribe(self, tag):
|
||||
'''
|
||||
Un-subscribe to events matching the passed tag.
|
||||
'''
|
||||
self.pending_tags.remove(tag)
|
||||
|
||||
return
|
||||
|
||||
def unsubscribe_regex(self, tag_regex):
|
||||
'''
|
||||
Un-subscribe to events matching the passed tag.
|
||||
'''
|
||||
self.pending_rtags.remove(tag_regex)
|
||||
|
||||
old_events = self.pending_events
|
||||
self.pending_events = []
|
||||
for evt in old_events:
|
||||
if any(evt['tag'].startswith(ptag) for ptag in self.pending_tags) or any(rtag.search(evt['tag']) for rtag in self.pending_rtags):
|
||||
self.pending_events.append(evt)
|
||||
|
||||
return
|
||||
|
||||
def connect_pub(self):
|
||||
|
@ -276,29 +316,32 @@ class SaltEvent(object):
|
|||
data = serial.loads(mdata)
|
||||
return mtag, data
|
||||
|
||||
def _check_pending(self, tag, pending_tags):
|
||||
def _check_pending(self, tag, tags_regex):
|
||||
"""Check the pending_events list for events that match the tag
|
||||
|
||||
:param tag: The tag to search for
|
||||
:type tag: str
|
||||
:param pending_tags: List of tags to preserve
|
||||
:type pending_tags: list[str]
|
||||
:param tags_regex: List of re expressions to search for also
|
||||
:type tags_regex: list[re.compile()]
|
||||
:return:
|
||||
"""
|
||||
old_events = self.pending_events
|
||||
self.pending_events = []
|
||||
ret = None
|
||||
for evt in old_events:
|
||||
if evt['tag'].startswith(tag):
|
||||
if evt['tag'].startswith(tag) or any(rtag.search(evt['tag']) for rtag in tags_regex):
|
||||
if ret is None:
|
||||
ret = evt
|
||||
log.trace('get_event() returning cached event = {0}'.format(ret))
|
||||
else:
|
||||
self.pending_events.append(evt)
|
||||
elif any(evt['tag'].startswith(ptag) for ptag in pending_tags):
|
||||
elif any(evt['tag'].startswith(ptag) for ptag in self.pending_tags) or any(rtag.search(evt['tag']) for rtag in self.pending_rtags):
|
||||
self.pending_events.append(evt)
|
||||
else:
|
||||
log.trace('get_event() discarding cached event that no longer has any subscriptions = {0}'.format(evt))
|
||||
return ret
|
||||
|
||||
def _get_event(self, wait, tag, pending_tags):
|
||||
def _get_event(self, wait, tag, tags_regex):
|
||||
start = time.time()
|
||||
timeout_at = start + wait
|
||||
while not wait or time.time() <= timeout_at:
|
||||
|
@ -316,8 +359,10 @@ class SaltEvent(object):
|
|||
else:
|
||||
raise
|
||||
|
||||
if not ret['tag'].startswith(tag): # tag not match
|
||||
if any(ret['tag'].startswith(ptag) for ptag in pending_tags):
|
||||
if not ret['tag'].startswith(tag) and not any(rtag.search(ret['tag']) for rtag in tags_regex):
|
||||
# tag not match
|
||||
if any(ret['tag'].startswith(ptag) for ptag in self.pending_tags) or any(rtag.search(ret['tag']) for rtag in self.pending_rtags):
|
||||
log.trace('get_event() caching unwanted event = {0}'.format(ret))
|
||||
self.pending_events.append(ret)
|
||||
if wait: # only update the wait timeout if we had one
|
||||
wait = timeout_at - time.time()
|
||||
|
@ -328,7 +373,7 @@ class SaltEvent(object):
|
|||
|
||||
return None
|
||||
|
||||
def get_event(self, wait=5, tag='', full=False, use_pending=False, pending_tags=None):
|
||||
def get_event(self, wait=5, tag='', tags_regex=None, full=False):
|
||||
'''
|
||||
Get a single publication.
|
||||
IF no publication available THEN block for up to wait seconds
|
||||
|
@ -336,29 +381,32 @@ class SaltEvent(object):
|
|||
|
||||
IF wait is 0 then block forever.
|
||||
|
||||
New in Boron always checks the list of pending events
|
||||
A tag specification can be given to only return publications with a tag
|
||||
STARTING WITH a given string (tag) OR MATCHING one or more string
|
||||
regular expressions (tags_regex list). If tag is not specified or given
|
||||
as an empty string, all events are considered.
|
||||
|
||||
use_pending
|
||||
Defines whether to keep all unconsumed events in a pending_events
|
||||
list, or to discard events that don't match the requested tag. If
|
||||
set to True, MAY CAUSE MEMORY LEAKS.
|
||||
Searches cached publications first. If no cached publications are found
|
||||
that match the given tag specification, new publications are received
|
||||
and checked.
|
||||
|
||||
pending_tags
|
||||
Add any events matching the listed tags to the pending queue.
|
||||
Still MAY CAUSE MEMORY LEAKS but less likely than use_pending
|
||||
assuming you later get_event for the tags you've listed here
|
||||
If a publication is received that does not match the tag specification,
|
||||
it is DISCARDED unless it is subscribed to via subscribe() and
|
||||
subscribe_regex() which will cause it to be cached.
|
||||
|
||||
New in Boron
|
||||
If a caller is not going to call get_event immediately after sending a
|
||||
request, it MUST subscribe the result to ensure the response is not lost
|
||||
should other regions of code call get_event for other purposes.
|
||||
'''
|
||||
|
||||
if pending_tags is None:
|
||||
pending_tags = []
|
||||
if use_pending:
|
||||
pending_tags = ['']
|
||||
if tags_regex is None:
|
||||
tags_regex = []
|
||||
else:
|
||||
tags_regex = [re.compile(rtag) for rtag in tags_regex]
|
||||
|
||||
ret = self._check_pending(tag, pending_tags)
|
||||
ret = self._check_pending(tag, tags_regex)
|
||||
if ret is None:
|
||||
ret = self._get_event(wait, tag, pending_tags)
|
||||
ret = self._get_event(wait, tag, tags_regex)
|
||||
|
||||
if ret is None or full:
|
||||
return ret
|
||||
|
|
|
@ -150,11 +150,10 @@ class TestSaltEvent(TestCase):
|
|||
)
|
||||
)
|
||||
|
||||
def test_event_subscription(self):
|
||||
def test_event_single(self):
|
||||
'''Test a single event is received'''
|
||||
with eventpublisher_process():
|
||||
me = event.MasterEvent(SOCK_DIR)
|
||||
me.subscribe()
|
||||
me.fire_event({'data': 'foo1'}, 'evt1')
|
||||
evt1 = me.get_event(tag='evt1')
|
||||
self.assertGotEvent(evt1, {'data': 'foo1'})
|
||||
|
@ -163,7 +162,6 @@ class TestSaltEvent(TestCase):
|
|||
'''Test no event is received if the timeout is reached'''
|
||||
with eventpublisher_process():
|
||||
me = event.MasterEvent(SOCK_DIR)
|
||||
me.subscribe()
|
||||
me.fire_event({'data': 'foo1'}, 'evt1')
|
||||
evt1 = me.get_event(tag='evt1')
|
||||
self.assertGotEvent(evt1, {'data': 'foo1'})
|
||||
|
@ -174,89 +172,79 @@ class TestSaltEvent(TestCase):
|
|||
'''Test no wait timeout, we should block forever, until we get one '''
|
||||
with eventpublisher_process():
|
||||
me = event.MasterEvent(SOCK_DIR)
|
||||
me.subscribe()
|
||||
me.fire_event({'data': 'foo1'}, 'evt1')
|
||||
me.fire_event({'data': 'foo2'}, 'evt2')
|
||||
evt = me.get_event(tag='evt2', wait=0)
|
||||
with eventsender_process({'data': 'foo2'}, 'evt2', 5):
|
||||
evt = me.get_event(tag='evt2', wait=0)
|
||||
self.assertGotEvent(evt, {'data': 'foo2'})
|
||||
|
||||
def test_event_subscription_matching(self):
|
||||
'''Test a subscription startswith matching'''
|
||||
def test_event_matching(self):
|
||||
'''Test a startswith match'''
|
||||
with eventpublisher_process():
|
||||
me = event.MasterEvent(SOCK_DIR)
|
||||
me.subscribe()
|
||||
me.fire_event({'data': 'foo1'}, 'evt1')
|
||||
evt1 = me.get_event(tag='evt1')
|
||||
evt1 = me.get_event(tag='ev')
|
||||
self.assertGotEvent(evt1, {'data': 'foo1'})
|
||||
|
||||
def test_event_subscription_matching_all(self):
|
||||
'''Test a subscription matching'''
|
||||
def test_event_matching_regex(self):
|
||||
'''Test a regex match'''
|
||||
with eventpublisher_process():
|
||||
me = event.MasterEvent(SOCK_DIR)
|
||||
me.fire_event({'data': 'foo1'}, 'evt1')
|
||||
evt1 = me.get_event(tag='not', tags_regex=['^ev'])
|
||||
self.assertGotEvent(evt1, {'data': 'foo1'})
|
||||
|
||||
def test_event_matching_all(self):
|
||||
'''Test an all match'''
|
||||
with eventpublisher_process():
|
||||
me = event.MasterEvent(SOCK_DIR)
|
||||
me.subscribe()
|
||||
me.fire_event({'data': 'foo1'}, 'evt1')
|
||||
evt1 = me.get_event(tag='')
|
||||
self.assertGotEvent(evt1, {'data': 'foo1'})
|
||||
|
||||
def test_event_not_subscribed(self):
|
||||
'''Test get event ignores non-subscribed events'''
|
||||
'''Test get_event drops non-subscribed events'''
|
||||
with eventpublisher_process():
|
||||
me = event.MasterEvent(SOCK_DIR)
|
||||
me.subscribe()
|
||||
with eventsender_process({'data': 'foo1'}, 'evt1', 5):
|
||||
me.fire_event({'data': 'foo1'}, 'evt2')
|
||||
evt1 = me.get_event(tag='evt1', wait=10)
|
||||
me.fire_event({'data': 'foo1'}, 'evt1')
|
||||
me.fire_event({'data': 'foo2'}, 'evt2')
|
||||
evt2 = me.get_event(tag='evt2')
|
||||
evt1 = me.get_event(tag='evt1')
|
||||
self.assertGotEvent(evt2, {'data': 'foo2'})
|
||||
self.assertIsNone(evt1)
|
||||
|
||||
def test_event_subscription_cache(self):
|
||||
'''Test subscriptions cache a message until requested'''
|
||||
with eventpublisher_process():
|
||||
me = event.MasterEvent(SOCK_DIR)
|
||||
me.subscribe('evt1')
|
||||
me.fire_event({'data': 'foo1'}, 'evt1')
|
||||
me.fire_event({'data': 'foo2'}, 'evt2')
|
||||
evt2 = me.get_event(tag='evt2')
|
||||
evt1 = me.get_event(tag='evt1')
|
||||
self.assertGotEvent(evt2, {'data': 'foo2'})
|
||||
self.assertGotEvent(evt1, {'data': 'foo1'})
|
||||
|
||||
def test_event_multiple_subscriptions(self):
|
||||
'''Test multiple subscriptions do not interfere'''
|
||||
def test_event_subscriptions_cache_regex(self):
|
||||
'''Test regex subscriptions cache a message until requested'''
|
||||
with eventpublisher_process():
|
||||
me = event.MasterEvent(SOCK_DIR)
|
||||
me.subscribe()
|
||||
with eventsender_process({'data': 'foo1'}, 'evt1', 5):
|
||||
me.fire_event({'data': 'foo1'}, 'evt2')
|
||||
evt1 = me.get_event(tag='evt1', wait=10)
|
||||
me.subscribe_regex('1$')
|
||||
me.fire_event({'data': 'foo1'}, 'evt1')
|
||||
me.fire_event({'data': 'foo2'}, 'evt2')
|
||||
evt2 = me.get_event(tag='evt2')
|
||||
evt1 = me.get_event(tag='evt1')
|
||||
self.assertGotEvent(evt2, {'data': 'foo2'})
|
||||
self.assertGotEvent(evt1, {'data': 'foo1'})
|
||||
|
||||
def test_event_multiple_clients(self):
|
||||
'''Test event is received by multiple clients'''
|
||||
with eventpublisher_process():
|
||||
me1 = event.MasterEvent(SOCK_DIR)
|
||||
me1.subscribe()
|
||||
me2 = event.MasterEvent(SOCK_DIR)
|
||||
me2.subscribe()
|
||||
me1.fire_event({'data': 'foo1'}, 'evt1')
|
||||
evt1 = me1.get_event(tag='evt1')
|
||||
self.assertGotEvent(evt1, {'data': 'foo1'})
|
||||
# Can't replicate this failure in the wild, need to fix the
|
||||
# test system bug here
|
||||
#evt2 = me2.get_event(tag='evt1')
|
||||
#self.assertGotEvent(evt2, {'data': 'foo1'})
|
||||
|
||||
def test_event_nested_subs(self):
|
||||
'''Test nested event subscriptions do not drop events, issue #8580'''
|
||||
with eventpublisher_process():
|
||||
me = event.MasterEvent(SOCK_DIR)
|
||||
me.subscribe()
|
||||
me.fire_event({'data': 'foo1'}, 'evt1')
|
||||
me.fire_event({'data': 'foo2'}, 'evt2')
|
||||
# Since we now drop unrelated events to avoid memory leaks, see http://goo.gl/2n3L09 commit bcbc5340ef, the
|
||||
# calls below will return None and will drop the unrelated events
|
||||
evt2 = me.get_event(tag='evt2')
|
||||
evt1 = me.get_event(tag='evt1')
|
||||
self.assertGotEvent(evt2, {'data': 'foo2'})
|
||||
# This one will be None because we're dripping unrelated events
|
||||
self.assertIsNone(evt1)
|
||||
|
||||
# Fire events again
|
||||
me.fire_event({'data': 'foo3'}, 'evt3')
|
||||
me.fire_event({'data': 'foo4'}, 'evt4')
|
||||
# We not force unrelated pending events not to be dropped, so both of the event bellow work and are not
|
||||
# None
|
||||
evt2 = me.get_event(tag='evt4', use_pending=True)
|
||||
evt1 = me.get_event(tag='evt3', use_pending=True)
|
||||
self.assertGotEvent(evt2, {'data': 'foo4'})
|
||||
self.assertGotEvent(evt1, {'data': 'foo3'})
|
||||
evt2 = me2.get_event(tag='evt1')
|
||||
self.assertGotEvent(evt2, {'data': 'foo1'})
|
||||
|
||||
@expectedFailure
|
||||
def test_event_nested_sub_all(self):
|
||||
|
@ -264,7 +252,6 @@ class TestSaltEvent(TestCase):
|
|||
# Show why not to call get_event(tag='')
|
||||
with eventpublisher_process():
|
||||
me = event.MasterEvent(SOCK_DIR)
|
||||
me.subscribe()
|
||||
me.fire_event({'data': 'foo1'}, 'evt1')
|
||||
me.fire_event({'data': 'foo2'}, 'evt2')
|
||||
evt2 = me.get_event(tag='')
|
||||
|
@ -276,7 +263,6 @@ class TestSaltEvent(TestCase):
|
|||
'''Test a large number of events, one at a time'''
|
||||
with eventpublisher_process():
|
||||
me = event.MasterEvent(SOCK_DIR)
|
||||
me.subscribe()
|
||||
for i in xrange(500):
|
||||
me.fire_event({'data': '{0}'.format(i)}, 'testevents')
|
||||
evt = me.get_event(tag='testevents')
|
||||
|
@ -286,7 +272,6 @@ class TestSaltEvent(TestCase):
|
|||
'''Test a large number of events, send all then recv all'''
|
||||
with eventpublisher_process():
|
||||
me = event.MasterEvent(SOCK_DIR)
|
||||
me.subscribe()
|
||||
# Must not exceed zmq HWM
|
||||
for i in xrange(500):
|
||||
me.fire_event({'data': '{0}'.format(i)}, 'testevents')
|
||||
|
|
Loading…
Add table
Reference in a new issue