Merge pull request #26000 from driskell/fix_discarded_events

Implement full event caching for subscribed tags
This commit is contained in:
Mike Place 2015-08-10 12:57:17 -06:00
commit f39780f8ce
4 changed files with 141 additions and 155 deletions

View file

@ -202,6 +202,9 @@ class LocalClient(object):
timeout=timeout,
)
if 'jid' in pub_data:
self.event.subscribe(pub_data['jid'])
return pub_data
def _check_pub_data(self, pub_data):
@ -233,6 +236,10 @@ class LocalClient(object):
print('No minions matched the target. '
'No command was sent, no jid was assigned.')
return {}
else:
self.event.subscribe_regex('^syndic/.*/{0}'.format(pub_data['jid']))
self.event.subscribe('salt/job/{0}'.format(pub_data['jid']))
return pub_data
@ -263,9 +270,6 @@ class LocalClient(object):
'''
arg = salt.utils.args.condition_input(arg, kwarg)
# Subscribe to all events and subscribe as early as possible
self.event.subscribe(jid)
try:
pub_data = self.pub(
tgt,
@ -797,8 +801,6 @@ class LocalClient(object):
def get_returns_no_block(
self,
jid,
event=None,
gather_errors=False,
tags_regex=None
):
'''
@ -806,48 +808,16 @@ class LocalClient(object):
Yield either the raw event data or None
Pass a list of additional regular expressions as `tags_regex` to search
the event bus for non-return data, such as minion lists returned from
syndics.
Pass a list of additional regular expressions as `tags_regex` to search
the event bus for non-return data, such as minion lists returned from
syndics.
'''
if event is None:
event = self.event
jid_tag = 'salt/job/{0}'.format(jid)
jid_tag_regex = '^salt/job/{0}'.format(jid)
tag_search = []
tag_search.append(re.compile(jid_tag_regex))
if isinstance(tags_regex, str):
tag_search.append(re.compile(tags_regex))
elif isinstance(tags_regex, list):
for tag in tags_regex:
tag_search.append(re.compile(tag))
while True:
if self.opts.get('transport') == 'zeromq':
try:
raw = event.get_event_noblock()
if gather_errors:
if (raw and
(raw.get('tag', '').startswith('_salt_error') or
any([tag.search(raw.get('tag', '')) for tag in tag_search]))):
yield raw
else:
if raw and raw.get('tag', '').startswith(jid_tag):
yield raw
else:
yield None
except zmq.ZMQError as ex:
if ex.errno == errno.EAGAIN or ex.errno == errno.EINTR:
yield None
else:
raise
else:
raw = event.get_event_noblock()
if raw and raw.get('tag', '').startswith(jid_tag):
yield raw
else:
yield None
# TODO(driskell): This was previously completely nonblocking.
# Should get_event have a nonblock option?
raw = self.event.get_event(wait=0.01, tag='salt/job/{0}'.format(jid), tags_regex=tags_regex, full=True)
yield raw
def get_iter_returns(
self,
@ -857,7 +827,6 @@ class LocalClient(object):
tgt='*',
tgt_type='glob',
expect_minions=False,
gather_errors=True,
block=True,
**kwargs):
'''
@ -894,9 +863,9 @@ class LocalClient(object):
# iterator for this job's return
if self.opts['order_masters']:
# If we are a MoM, we need to gather expected minions from downstream masters.
ret_iter = self.get_returns_no_block(jid, gather_errors=gather_errors, tags_regex='^syndic/.*/{0}'.format(jid))
ret_iter = self.get_returns_no_block(jid, tags_regex=['^syndic/.*/{0}'.format(jid)])
else:
ret_iter = self.get_returns_no_block(jid, gather_errors=gather_errors)
ret_iter = self.get_returns_no_block(jid)
# iterator for the info of this job
jinfo_iter = []
timeout_at = time.time() + timeout
@ -915,10 +884,6 @@ class LocalClient(object):
# if we got None, then there were no events
if raw is None:
break
if gather_errors:
if raw['tag'] == '_salt_error':
ret = {raw['data']['id']: raw['data']['data']}
yield ret
if 'minions' in raw.get('data', {}):
minions.update(raw['data']['minions'])
continue
@ -965,15 +930,6 @@ class LocalClient(object):
# if the jinfo has timed out and some minions are still running the job
# re-do the ping
if time.time() > timeout_at and minions_running:
# need our own event listener, so we don't clobber the class one
event = salt.utils.event.get_event(
'master',
self.opts['sock_dir'],
self.opts['transport'],
opts=self.opts,
listen=not self.opts.get('__worker', False))
# start listening for new events, before firing off the pings
event.connect_pub()
# since this is a new ping, no one has responded yet
jinfo = self.gather_job_info(jid, tgt, tgt_type)
minions_running = False
@ -982,7 +938,7 @@ class LocalClient(object):
if 'jid' not in jinfo:
jinfo_iter = []
else:
jinfo_iter = self.get_returns_no_block(jinfo['jid'], event=event)
jinfo_iter = self.get_returns_no_block(jinfo['jid'])
timeout_at = time.time() + self.opts['gather_job_timeout']
# if you are a syndic, wait a little longer
if self.opts['order_masters']:
@ -1044,8 +1000,7 @@ class LocalClient(object):
self,
jid,
minions,
timeout=None,
pending_tags=None):
timeout=None):
'''
Get the returns for the command line interface via the event system
'''

View file

@ -261,8 +261,6 @@ class EventListener(object):
opts=opts,
)
self.event.subscribe() # start listening for events immediately
# tag -> list of futures
self.tag_map = defaultdict(list)

View file

@ -59,6 +59,7 @@ import logging
import time
import datetime
import multiprocessing
import re
from collections import MutableMapping
# Import third party libs
@ -169,10 +170,12 @@ class SaltEvent(object):
if sock_dir is None:
sock_dir = opts.get('sock_dir', None)
self.puburi, self.pulluri = self.__load_uri(sock_dir, node)
self.subscribe()
self.pending_tags = []
self.pending_rtags = []
self.pending_events = []
# since ZMQ connect() has no guarantees about the socket actually being
# connected this is a hack to attempt to do so.
self.connect_pub()
self.fire_event({}, tagify('event/new_client'), 0)
self.get_event(wait=1)
@ -223,17 +226,54 @@ class SaltEvent(object):
)
return puburi, pulluri
def subscribe(self, tag):
    '''
    Subscribe to events matching the passed tag.

    If you do not subscribe to a tag, events will be discarded by calls to
    get_event that request a different tag. In contexts where many different
    jobs are outstanding it is important to subscribe to prevent one call
    to get_event from discarding a response required by a subsequent call
    to get_event.

    :param tag: event tag prefix to keep cached (startswith match)
    :type tag: str
    '''
    # Registration only; matching against incoming events happens in
    # _check_pending()/_get_event() when get_event() is called.
    self.pending_tags.append(tag)
    return
def subscribe_regex(self, tag_regex):
    '''
    Subscribe to events whose tag matches the given regular expression.

    If you do not subscribe to a tag, events will be discarded by calls to
    get_event that request a different tag. In contexts where many different
    jobs are outstanding it is important to subscribe to prevent one call
    to get_event from discarding a response required by a subsequent call
    to get_event.

    :param tag_regex: regular expression source string to match tags with
    :type tag_regex: str
    '''
    # Compile once at subscription time; matching code applies .search()
    # to every incoming tag.
    compiled = re.compile(tag_regex)
    self.pending_rtags.append(compiled)
def unsubscribe(self, tag):
    '''
    Drop a previously registered tag subscription.

    :param tag: the exact tag string passed to subscribe()
    :type tag: str
    :raises ValueError: if the tag was never subscribed
    '''
    # index() raises ValueError for an unknown tag, matching the
    # behaviour of list.remove().
    position = self.pending_tags.index(tag)
    del self.pending_tags[position]
def unsubscribe_regex(self, tag_regex):
    '''
    Un-subscribe from events matching the passed tag expression.

    Accepts the same source string that was passed to subscribe_regex()
    (an already-compiled pattern object is also tolerated). Any cached
    events that are no longer covered by a remaining subscription are
    discarded.

    :param tag_regex: regular expression source string (or compiled pattern)
    :raises ValueError: if the expression was never subscribed
    '''
    # subscribe_regex() stores *compiled* patterns, and Pattern objects
    # compare by identity, so list.remove(tag_regex) with the original
    # source string only worked when re.compile()'s internal cache
    # happened to hand back the identical object. Compare by pattern
    # text instead.
    pattern = getattr(tag_regex, 'pattern', tag_regex)
    for index, rtag in enumerate(self.pending_rtags):
        if rtag.pattern == pattern:
            del self.pending_rtags[index]
            break
    else:
        raise ValueError('{0!r} is not a subscribed tag expression'.format(pattern))
    # Prune cached events that no remaining subscription covers, so the
    # pending_events cache cannot grow without bound.
    old_events = self.pending_events
    self.pending_events = []
    for evt in old_events:
        if any(evt['tag'].startswith(ptag) for ptag in self.pending_tags) or any(rtag.search(evt['tag']) for rtag in self.pending_rtags):
            self.pending_events.append(evt)
    return
def connect_pub(self):
@ -276,29 +316,32 @@ class SaltEvent(object):
data = serial.loads(mdata)
return mtag, data
def _check_pending(self, tag, tags_regex):
    """Check the pending_events list for events that match the tag

    Returns the first cached event matching the request; any later
    matches stay cached for subsequent calls. Cached events that match
    neither the request nor any current subscription are discarded.

    :param tag: The tag to search for (startswith match)
    :type tag: str
    :param tags_regex: List of re expressions to search for also
    :type tags_regex: list[re.compile()]
    :return: the first matching cached event dict, or None
    """
    old_events = self.pending_events
    self.pending_events = []
    ret = None
    for evt in old_events:
        if evt['tag'].startswith(tag) or any(rtag.search(evt['tag']) for rtag in tags_regex):
            if ret is None:
                # First match is the return value; later matches are
                # re-cached so subsequent calls receive them in order.
                ret = evt
                log.trace('get_event() returning cached event = {0}'.format(ret))
            else:
                self.pending_events.append(evt)
        elif any(evt['tag'].startswith(ptag) for ptag in self.pending_tags) or any(rtag.search(evt['tag']) for rtag in self.pending_rtags):
            # Still covered by a subscription: keep it cached.
            self.pending_events.append(evt)
        else:
            log.trace('get_event() discarding cached event that no longer has any subscriptions = {0}'.format(evt))
    return ret
def _get_event(self, wait, tag, pending_tags):
def _get_event(self, wait, tag, tags_regex):
start = time.time()
timeout_at = start + wait
while not wait or time.time() <= timeout_at:
@ -316,8 +359,10 @@ class SaltEvent(object):
else:
raise
if not ret['tag'].startswith(tag): # tag not match
if any(ret['tag'].startswith(ptag) for ptag in pending_tags):
if not ret['tag'].startswith(tag) and not any(rtag.search(ret['tag']) for rtag in tags_regex):
# tag not match
if any(ret['tag'].startswith(ptag) for ptag in self.pending_tags) or any(rtag.search(ret['tag']) for rtag in self.pending_rtags):
log.trace('get_event() caching unwanted event = {0}'.format(ret))
self.pending_events.append(ret)
if wait: # only update the wait timeout if we had one
wait = timeout_at - time.time()
@ -328,7 +373,7 @@ class SaltEvent(object):
return None
def get_event(self, wait=5, tag='', full=False, use_pending=False, pending_tags=None):
def get_event(self, wait=5, tag='', tags_regex=None, full=False):
'''
Get a single publication.
IF no publication available THEN block for up to wait seconds
@ -336,29 +381,32 @@ class SaltEvent(object):
IF wait is 0 then block forever.
New in Boron always checks the list of pending events
A tag specification can be given to only return publications with a tag
STARTING WITH a given string (tag) OR MATCHING one or more string
regular expressions (tags_regex list). If tag is not specified or given
as an empty string, all events are considered.
use_pending
Defines whether to keep all unconsumed events in a pending_events
list, or to discard events that don't match the requested tag. If
set to True, MAY CAUSE MEMORY LEAKS.
Searches cached publications first. If no cached publications are found
that match the given tag specification, new publications are received
and checked.
pending_tags
Add any events matching the listed tags to the pending queue.
Still MAY CAUSE MEMORY LEAKS but less likely than use_pending
assuming you later get_event for the tags you've listed here
If a publication is received that does not match the tag specification,
it is DISCARDED unless it is subscribed to via subscribe() and
subscribe_regex() which will cause it to be cached.
New in Boron
If a caller is not going to call get_event immediately after sending a
request, it MUST subscribe the result to ensure the response is not lost
should other regions of code call get_event for other purposes.
'''
if pending_tags is None:
pending_tags = []
if use_pending:
pending_tags = ['']
if tags_regex is None:
tags_regex = []
else:
tags_regex = [re.compile(rtag) for rtag in tags_regex]
ret = self._check_pending(tag, pending_tags)
ret = self._check_pending(tag, tags_regex)
if ret is None:
ret = self._get_event(wait, tag, pending_tags)
ret = self._get_event(wait, tag, tags_regex)
if ret is None or full:
return ret

View file

@ -150,11 +150,10 @@ class TestSaltEvent(TestCase):
)
)
def test_event_subscription(self):
def test_event_single(self):
'''Test a single event is received'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me.subscribe()
me.fire_event({'data': 'foo1'}, 'evt1')
evt1 = me.get_event(tag='evt1')
self.assertGotEvent(evt1, {'data': 'foo1'})
@ -163,7 +162,6 @@ class TestSaltEvent(TestCase):
'''Test no event is received if the timeout is reached'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me.subscribe()
me.fire_event({'data': 'foo1'}, 'evt1')
evt1 = me.get_event(tag='evt1')
self.assertGotEvent(evt1, {'data': 'foo1'})
@ -174,89 +172,79 @@ class TestSaltEvent(TestCase):
'''Test no wait timeout, we should block forever, until we get one '''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me.subscribe()
me.fire_event({'data': 'foo1'}, 'evt1')
me.fire_event({'data': 'foo2'}, 'evt2')
evt = me.get_event(tag='evt2', wait=0)
with eventsender_process({'data': 'foo2'}, 'evt2', 5):
evt = me.get_event(tag='evt2', wait=0)
self.assertGotEvent(evt, {'data': 'foo2'})
def test_event_subscription_matching(self):
'''Test a subscription startswith matching'''
def test_event_matching(self):
'''Test a startswith match'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me.subscribe()
me.fire_event({'data': 'foo1'}, 'evt1')
evt1 = me.get_event(tag='evt1')
evt1 = me.get_event(tag='ev')
self.assertGotEvent(evt1, {'data': 'foo1'})
def test_event_subscription_matching_all(self):
'''Test a subscription matching'''
def test_event_matching_regex(self):
    '''Test a regex match'''
    with eventpublisher_process():
        master_event = event.MasterEvent(SOCK_DIR)
        master_event.fire_event({'data': 'foo1'}, 'evt1')
        # The startswith tag ('not') does not match, but the regex
        # alternative ('^ev') does, so the event is still returned.
        received = master_event.get_event(tag='not', tags_regex=['^ev'])
        self.assertGotEvent(received, {'data': 'foo1'})
def test_event_matching_all(self):
'''Test an all match'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me.subscribe()
me.fire_event({'data': 'foo1'}, 'evt1')
evt1 = me.get_event(tag='')
self.assertGotEvent(evt1, {'data': 'foo1'})
def test_event_not_subscribed(self):
'''Test get event ignores non-subscribed events'''
'''Test get_event drops non-subscribed events'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me.subscribe()
with eventsender_process({'data': 'foo1'}, 'evt1', 5):
me.fire_event({'data': 'foo1'}, 'evt2')
evt1 = me.get_event(tag='evt1', wait=10)
me.fire_event({'data': 'foo1'}, 'evt1')
me.fire_event({'data': 'foo2'}, 'evt2')
evt2 = me.get_event(tag='evt2')
evt1 = me.get_event(tag='evt1')
self.assertGotEvent(evt2, {'data': 'foo2'})
self.assertIsNone(evt1)
def test_event_subscription_cache(self):
    '''Test subscriptions cache a message until requested'''
    with eventpublisher_process():
        master_event = event.MasterEvent(SOCK_DIR)
        # Subscribing to 'evt1' must keep its event cached even while a
        # later get_event call drains the bus looking for 'evt2'.
        master_event.subscribe('evt1')
        master_event.fire_event({'data': 'foo1'}, 'evt1')
        master_event.fire_event({'data': 'foo2'}, 'evt2')
        second = master_event.get_event(tag='evt2')
        first = master_event.get_event(tag='evt1')
        self.assertGotEvent(second, {'data': 'foo2'})
        self.assertGotEvent(first, {'data': 'foo1'})
def test_event_multiple_subscriptions(self):
'''Test multiple subscriptions do not interfere'''
def test_event_subscriptions_cache_regex(self):
'''Test regex subscriptions cache a message until requested'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me.subscribe()
with eventsender_process({'data': 'foo1'}, 'evt1', 5):
me.fire_event({'data': 'foo1'}, 'evt2')
evt1 = me.get_event(tag='evt1', wait=10)
me.subscribe_regex('1$')
me.fire_event({'data': 'foo1'}, 'evt1')
me.fire_event({'data': 'foo2'}, 'evt2')
evt2 = me.get_event(tag='evt2')
evt1 = me.get_event(tag='evt1')
self.assertGotEvent(evt2, {'data': 'foo2'})
self.assertGotEvent(evt1, {'data': 'foo1'})
def test_event_multiple_clients(self):
'''Test event is received by multiple clients'''
with eventpublisher_process():
me1 = event.MasterEvent(SOCK_DIR)
me1.subscribe()
me2 = event.MasterEvent(SOCK_DIR)
me2.subscribe()
me1.fire_event({'data': 'foo1'}, 'evt1')
evt1 = me1.get_event(tag='evt1')
self.assertGotEvent(evt1, {'data': 'foo1'})
# Can't replicate this failure in the wild, need to fix the
# test system bug here
#evt2 = me2.get_event(tag='evt1')
#self.assertGotEvent(evt2, {'data': 'foo1'})
def test_event_nested_subs(self):
'''Test nested event subscriptions do not drop events, issue #8580'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me.subscribe()
me.fire_event({'data': 'foo1'}, 'evt1')
me.fire_event({'data': 'foo2'}, 'evt2')
# Since we now drop unrelated events to avoid memory leaks, see http://goo.gl/2n3L09 commit bcbc5340ef, the
# calls below will return None and will drop the unrelated events
evt2 = me.get_event(tag='evt2')
evt1 = me.get_event(tag='evt1')
self.assertGotEvent(evt2, {'data': 'foo2'})
# This one will be None because we're dropping unrelated events
self.assertIsNone(evt1)
# Fire events again
me.fire_event({'data': 'foo3'}, 'evt3')
me.fire_event({'data': 'foo4'}, 'evt4')
# We now force unrelated pending events not to be dropped, so both of the events below work and are not
# None
evt2 = me.get_event(tag='evt4', use_pending=True)
evt1 = me.get_event(tag='evt3', use_pending=True)
self.assertGotEvent(evt2, {'data': 'foo4'})
self.assertGotEvent(evt1, {'data': 'foo3'})
evt2 = me2.get_event(tag='evt1')
self.assertGotEvent(evt2, {'data': 'foo1'})
@expectedFailure
def test_event_nested_sub_all(self):
@ -264,7 +252,6 @@ class TestSaltEvent(TestCase):
# Show why not to call get_event(tag='')
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me.subscribe()
me.fire_event({'data': 'foo1'}, 'evt1')
me.fire_event({'data': 'foo2'}, 'evt2')
evt2 = me.get_event(tag='')
@ -276,7 +263,6 @@ class TestSaltEvent(TestCase):
'''Test a large number of events, one at a time'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me.subscribe()
for i in xrange(500):
me.fire_event({'data': '{0}'.format(i)}, 'testevents')
evt = me.get_event(tag='testevents')
@ -286,7 +272,6 @@ class TestSaltEvent(TestCase):
'''Test a large number of events, send all then recv all'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR)
me.subscribe()
# Must not exceed zmq HWM
for i in xrange(500):
me.fire_event({'data': '{0}'.format(i)}, 'testevents')