mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge branch '2019.2.1' into cherry-pick-pr-53370
This commit is contained in:
commit
450d23d427
4 changed files with 81 additions and 34 deletions
|
@ -151,7 +151,7 @@
|
|||
# Store all returns in the given returner.
|
||||
# Setting this option requires that any returner-specific configuration also
|
||||
# be set. See various returners in salt/returners for details on required
|
||||
# configuration values. (See also, event_return_queue below.)
|
||||
# configuration values. (See also, event_return_queue, and event_return_queue_max_seconds below.)
|
||||
#
|
||||
#event_return: mysql
|
||||
|
||||
|
@ -161,6 +161,12 @@
|
|||
# By default, events are not queued.
|
||||
#event_return_queue: 0
|
||||
|
||||
# In some cases enabling event return queueing can be very helpful, but the bus
|
||||
# may not busy enough to flush the queue consistently. Setting this to a reasonable
|
||||
# value (1-30 seconds) will cause the queue to be flushed when the oldest event is older
|
||||
# than `event_return_queue_max_seconds` regardless of how many events are in the queue.
|
||||
#event_return_queue_max_seconds: 0
|
||||
|
||||
# Only return events matching tags in a whitelist, supports glob matches.
|
||||
#event_return_whitelist:
|
||||
# - salt/master/a_tag
|
||||
|
|
|
@ -4,3 +4,13 @@ In Progress: Salt 2018.3.5 Release Notes
|
|||
|
||||
Version 2018.3.5 is an **unreleased** bugfix release for :ref:`2018.3.0 <release-2018-3-0>`.
|
||||
This release is still in progress and has not been released yet.
|
||||
|
||||
Master Configuration Changes
|
||||
============================
|
||||
|
||||
To fix `#53411`_ a new configuration parameter `event_listen_queue_max_seconds` is provided.
|
||||
When this is set to a value greater than 0 and `event_listen_queue` is not 0, if the oldest event
|
||||
in the listen queue is older than `event_listen_queue_max_seconds`, the queue will be flushed to
|
||||
returners regardless of how many events are in the queue.
|
||||
|
||||
.. _`#53411`: https://github.com/saltstack/salt/issues/53411
|
||||
|
|
|
@ -550,6 +550,11 @@ VALID_OPTS = {
|
|||
# returner specified by 'event_return'
|
||||
'event_return_queue': int,
|
||||
|
||||
# The number of seconds that events can languish in the queue before we flush them.
|
||||
# The goal here is to ensure that if the bus is not busy enough to reach a total
|
||||
# `event_return_queue` events won't get stale.
|
||||
'event_return_queue_max_seconds': int,
|
||||
|
||||
# Only forward events to an event returner if it matches one of the tags in this list
|
||||
'event_return_whitelist': list,
|
||||
|
||||
|
|
|
@ -320,12 +320,8 @@ class SaltEvent(object):
|
|||
sock_dir,
|
||||
'minion_event_{0}_pull.ipc'.format(id_hash)
|
||||
)
|
||||
log.debug(
|
||||
'{0} PUB socket URI: {1}'.format(self.__class__.__name__, puburi)
|
||||
)
|
||||
log.debug(
|
||||
'{0} PULL socket URI: {1}'.format(self.__class__.__name__, pulluri)
|
||||
)
|
||||
log.debug('%s PUB socket URI: %s', self.__class__.__name__, puburi)
|
||||
log.debug('%s PULL socket URI: %s', self.__class__.__name__, pulluri)
|
||||
return puburi, pulluri
|
||||
|
||||
def subscribe(self, tag=None, match_type=None):
|
||||
|
@ -370,9 +366,9 @@ class SaltEvent(object):
|
|||
with salt.utils.asynchronous.current_ioloop(self.io_loop):
|
||||
if self.subscriber is None:
|
||||
self.subscriber = salt.transport.ipc.IPCMessageSubscriber(
|
||||
self.puburi,
|
||||
io_loop=self.io_loop
|
||||
)
|
||||
self.puburi,
|
||||
io_loop=self.io_loop
|
||||
)
|
||||
try:
|
||||
self.io_loop.run_sync(
|
||||
lambda: self.subscriber.connect(timeout=timeout))
|
||||
|
@ -382,9 +378,9 @@ class SaltEvent(object):
|
|||
else:
|
||||
if self.subscriber is None:
|
||||
self.subscriber = salt.transport.ipc.IPCMessageSubscriber(
|
||||
self.puburi,
|
||||
io_loop=self.io_loop
|
||||
)
|
||||
self.puburi,
|
||||
io_loop=self.io_loop
|
||||
)
|
||||
|
||||
# For the asynchronous case, the connect will be defered to when
|
||||
# set_event_handler() is invoked.
|
||||
|
@ -982,16 +978,10 @@ class AsyncEventPublisher(object):
|
|||
epub_uri = epub_sock_path
|
||||
epull_uri = epull_sock_path
|
||||
|
||||
log.debug(
|
||||
'{0} PUB socket URI: {1}'.format(
|
||||
self.__class__.__name__, epub_uri
|
||||
)
|
||||
)
|
||||
log.debug(
|
||||
'{0} PULL socket URI: {1}'.format(
|
||||
self.__class__.__name__, epull_uri
|
||||
)
|
||||
)
|
||||
log.debug('%s PUB socket URI: %s',
|
||||
self.__class__.__name__, epub_uri)
|
||||
log.debug('%s PULL socket URI: %s',
|
||||
self.__class__.__name__, epull_uri)
|
||||
|
||||
minion_sock_dir = self.opts['sock_dir']
|
||||
|
||||
|
@ -1001,7 +991,7 @@ class AsyncEventPublisher(object):
|
|||
try:
|
||||
os.makedirs(minion_sock_dir, 0o755)
|
||||
except OSError as exc:
|
||||
log.error('Could not create SOCK_DIR: {0}'.format(exc))
|
||||
log.error('Could not create SOCK_DIR: %s', exc)
|
||||
# Let's not fail yet and try using the default path
|
||||
if minion_sock_dir == default_minion_sock_dir:
|
||||
# We're already trying the default system path, stop now!
|
||||
|
@ -1011,7 +1001,7 @@ class AsyncEventPublisher(object):
|
|||
try:
|
||||
os.makedirs(default_minion_sock_dir, 0o755)
|
||||
except OSError as exc:
|
||||
log.error('Could not create SOCK_DIR: {0}'.format(exc))
|
||||
log.error('Could not create SOCK_DIR: %s', exc)
|
||||
# Let's stop at this stage
|
||||
raise
|
||||
|
||||
|
@ -1027,7 +1017,7 @@ class AsyncEventPublisher(object):
|
|||
payload_handler=self.handle_publish
|
||||
)
|
||||
|
||||
log.info('Starting pull socket on {0}'.format(epull_uri))
|
||||
log.info('Starting pull socket on %s', epull_uri)
|
||||
with salt.utils.files.set_umask(0o177):
|
||||
self.publisher.start()
|
||||
self.puller.start()
|
||||
|
@ -1192,6 +1182,7 @@ class EventReturn(salt.utils.process.SignalHandlingMultiprocessingProcess):
|
|||
|
||||
self.opts = opts
|
||||
self.event_return_queue = self.opts['event_return_queue']
|
||||
self.event_return_queue_max_seconds = self.opts.get('event_return_queue_max_seconds', 0)
|
||||
local_minion_opts = self.opts.copy()
|
||||
local_minion_opts['file_client'] = 'local'
|
||||
self.minion = salt.minion.MasterMinion(local_minion_opts)
|
||||
|
@ -1227,13 +1218,13 @@ class EventReturn(salt.utils.process.SignalHandlingMultiprocessingProcess):
|
|||
if isinstance(self.opts['event_return'], list):
|
||||
# Multiple event returners
|
||||
for r in self.opts['event_return']:
|
||||
log.debug('Calling event returner {0}, one of many.'.format(r))
|
||||
log.debug('Calling event returner %s, one of many.', r)
|
||||
event_return = '{0}.event_return'.format(r)
|
||||
self._flush_event_single(event_return)
|
||||
else:
|
||||
# Only a single event returner
|
||||
log.debug('Calling event returner {0}, only one '
|
||||
'configured.'.format(self.opts['event_return']))
|
||||
log.debug('Calling event returner %s, only one '
|
||||
'configured.', self.opts['event_return'])
|
||||
event_return = '{0}.event_return'.format(
|
||||
self.opts['event_return']
|
||||
)
|
||||
|
@ -1245,13 +1236,13 @@ class EventReturn(salt.utils.process.SignalHandlingMultiprocessingProcess):
|
|||
try:
|
||||
self.minion.returners[event_return](self.event_queue)
|
||||
except Exception as exc:
|
||||
log.error('Could not store events - returner \'{0}\' raised '
|
||||
'exception: {1}'.format(event_return, exc))
|
||||
log.error('Could not store events - returner \'%s\' raised '
|
||||
'exception: %s', event_return, exc)
|
||||
# don't waste processing power unnecessarily on converting a
|
||||
# potentially huge dataset to a string
|
||||
if log.level <= logging.DEBUG:
|
||||
log.debug('Event data that caused an exception: {0}'.format(
|
||||
self.event_queue))
|
||||
log.debug('Event data that caused an exception: %s',
|
||||
self.event_queue)
|
||||
else:
|
||||
log.error('Could not store return for event(s) - returner '
|
||||
'\'%s\' not found.', event_return)
|
||||
|
@ -1265,17 +1256,52 @@ class EventReturn(salt.utils.process.SignalHandlingMultiprocessingProcess):
|
|||
events = self.event.iter_events(full=True)
|
||||
self.event.fire_event({}, 'salt/event_listen/start')
|
||||
try:
|
||||
# events below is a generator, we will iterate until we get the salt/event/exit tag
|
||||
oldestevent = None
|
||||
for event in events:
|
||||
|
||||
if event['tag'] == 'salt/event/exit':
|
||||
# We're done eventing
|
||||
self.stop = True
|
||||
if self._filter(event):
|
||||
# This event passed the filter, add it to the queue
|
||||
self.event_queue.append(event)
|
||||
if len(self.event_queue) >= self.event_return_queue:
|
||||
too_long_in_queue = False
|
||||
|
||||
# if max_seconds is >0, then we want to make sure we flush the queue
|
||||
# every event_return_queue_max_seconds seconds, If it's 0, don't
|
||||
# apply any of this logic
|
||||
if self.event_return_queue_max_seconds > 0:
|
||||
rightnow = datetime.datetime.now()
|
||||
if not oldestevent:
|
||||
oldestevent = rightnow
|
||||
age_in_seconds = (rightnow - oldestevent).seconds
|
||||
if age_in_seconds > 0:
|
||||
log.debug('Oldest event in queue is %s seconds old.', age_in_seconds)
|
||||
if age_in_seconds >= self.event_return_queue_max_seconds:
|
||||
too_long_in_queue = True
|
||||
oldestevent = None
|
||||
else:
|
||||
too_long_in_queue = False
|
||||
|
||||
if too_long_in_queue:
|
||||
log.debug('Oldest event has been in queue too long, will flush queue')
|
||||
|
||||
# If we are over the max queue size or the oldest item in the queue has been there too long
|
||||
# then flush the queue
|
||||
if len(self.event_queue) >= self.event_return_queue or too_long_in_queue:
|
||||
log.debug('Flushing %s events.', len(self.event_queue))
|
||||
self.flush_events()
|
||||
oldestevent = None
|
||||
if self.stop:
|
||||
# We saw the salt/event/exit tag, we can stop eventing
|
||||
break
|
||||
finally: # flush all we have at this moment
|
||||
# No matter what, make sure we flush the queue even when we are exiting
|
||||
# and there will be no more events.
|
||||
if self.event_queue:
|
||||
log.debug('Flushing %s events.', len(self.event_queue))
|
||||
|
||||
self.flush_events()
|
||||
|
||||
def _filter(self, event):
|
||||
|
|
Loading…
Add table
Reference in a new issue