From 800113083b0583a500721212744188e1322cb748 Mon Sep 17 00:00:00 2001 From: "C. R. Oldham" Date: Fri, 7 Jun 2019 13:23:03 -0600 Subject: [PATCH] Add event_listen_queue_max_seconds to fix #53411 --- conf/master | 8 ++- doc/topics/releases/2018.3.5.rst | 8 +++ salt/config/__init__.py | 5 ++ salt/utils/event.py | 92 ++++++++++++++++++++------------ 4 files changed, 79 insertions(+), 34 deletions(-) diff --git a/conf/master b/conf/master index a1ced68d8b0..546db5cb28d 100644 --- a/conf/master +++ b/conf/master @@ -151,7 +151,7 @@ # Store all returns in the given returner. # Setting this option requires that any returner-specific configuration also # be set. See various returners in salt/returners for details on required -# configuration values. (See also, event_return_queue below.) +# configuration values. (See also event_return_queue and event_return_queue_max_seconds below.) # #event_return: mysql @@ -161,6 +161,12 @@ # By default, events are not queued. #event_return_queue: 0 +# In some cases enabling event return queueing can be very helpful, but the bus +# may not be busy enough to flush the queue consistently. Setting this to a reasonable +# value (1-30 seconds) will cause the queue to be flushed when the oldest event is older +# than `event_return_queue_max_seconds` regardless of how many events are in the queue. +#event_return_queue_max_seconds: 0 + # Only return events matching tags in a whitelist, supports glob matches. #event_return_whitelist: # - salt/master/a_tag diff --git a/doc/topics/releases/2018.3.5.rst b/doc/topics/releases/2018.3.5.rst index 284440b5955..bf1cbf9b31d 100644 --- a/doc/topics/releases/2018.3.5.rst +++ b/doc/topics/releases/2018.3.5.rst @@ -4,3 +4,11 @@ In Progress: Salt 2018.3.5 Release Notes Version 2018.3.5 is an **unreleased** bugfix release for :ref:`2018.3.0 <release-2018-3-0>`. This release is still in progress and has not been released yet. 
+ +Master Configuration Changes +============================ + +To fix `#53411`_ a new configuration parameter `event_return_queue_max_seconds` is provided. +When this is set to a value greater than 0 and `event_return_queue` is not 0, if the oldest event +in the return queue is older than `event_return_queue_max_seconds`, the queue will be flushed to +returners regardless of how many events are in the queue. diff --git a/salt/config/__init__.py b/salt/config/__init__.py index a7f9af49442..b3dfc8e43d4 100644 --- a/salt/config/__init__.py +++ b/salt/config/__init__.py @@ -550,6 +550,11 @@ VALID_OPTS = { # returner specified by 'event_return' 'event_return_queue': int, + # The number of seconds that events can languish in the queue before we flush them. + # The goal here is to ensure that if the bus is not busy enough to reach a total of + # `event_return_queue` events, the queued events won't get stale. + 'event_return_queue_max_seconds': int, + # Only forward events to an event returner if it matches one of the tags in this list 'event_return_whitelist': list, diff --git a/salt/utils/event.py b/salt/utils/event.py index 296a296084d..ef31c3935af 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -320,12 +320,8 @@ class SaltEvent(object): sock_dir, 'minion_event_{0}_pull.ipc'.format(id_hash) ) - log.debug( - '{0} PUB socket URI: {1}'.format(self.__class__.__name__, puburi) - ) - log.debug( - '{0} PULL socket URI: {1}'.format(self.__class__.__name__, pulluri) - ) + log.debug('%s PUB socket URI: %s', self.__class__.__name__, puburi) + log.debug('%s PULL socket URI: %s', self.__class__.__name__, pulluri) return puburi, pulluri def subscribe(self, tag=None, match_type=None): @@ -370,9 +366,9 @@ class SaltEvent(object): with salt.utils.asynchronous.current_ioloop(self.io_loop): if self.subscriber is None: self.subscriber = salt.transport.ipc.IPCMessageSubscriber( - self.puburi, - io_loop=self.io_loop - ) + self.puburi, + io_loop=self.io_loop + ) try: self.io_loop.run_sync( lambda: 
self.subscriber.connect(timeout=timeout)) @@ -382,9 +378,9 @@ class SaltEvent(object): else: if self.subscriber is None: self.subscriber = salt.transport.ipc.IPCMessageSubscriber( - self.puburi, - io_loop=self.io_loop - ) + self.puburi, + io_loop=self.io_loop + ) # For the asynchronous case, the connect will be defered to when # set_event_handler() is invoked. @@ -982,16 +978,10 @@ class AsyncEventPublisher(object): epub_uri = epub_sock_path epull_uri = epull_sock_path - log.debug( - '{0} PUB socket URI: {1}'.format( - self.__class__.__name__, epub_uri - ) - ) - log.debug( - '{0} PULL socket URI: {1}'.format( - self.__class__.__name__, epull_uri - ) - ) + log.debug('%s PUB socket URI: %s', + self.__class__.__name__, epub_uri) + log.debug('%s PULL socket URI: %s', + self.__class__.__name__, epull_uri) minion_sock_dir = self.opts['sock_dir'] @@ -1001,7 +991,7 @@ class AsyncEventPublisher(object): try: os.makedirs(minion_sock_dir, 0o755) except OSError as exc: - log.error('Could not create SOCK_DIR: {0}'.format(exc)) + log.error('Could not create SOCK_DIR: %s', exc) # Let's not fail yet and try using the default path if minion_sock_dir == default_minion_sock_dir: # We're already trying the default system path, stop now! 
@@ -1011,7 +1001,7 @@ class AsyncEventPublisher(object): try: os.makedirs(default_minion_sock_dir, 0o755) except OSError as exc: - log.error('Could not create SOCK_DIR: {0}'.format(exc)) + log.error('Could not create SOCK_DIR: %s', exc) # Let's stop at this stage raise @@ -1027,7 +1017,7 @@ class AsyncEventPublisher(object): payload_handler=self.handle_publish ) - log.info('Starting pull socket on {0}'.format(epull_uri)) + log.info('Starting pull socket on %s', epull_uri) with salt.utils.files.set_umask(0o177): self.publisher.start() self.puller.start() @@ -1192,6 +1182,7 @@ class EventReturn(salt.utils.process.SignalHandlingMultiprocessingProcess): self.opts = opts self.event_return_queue = self.opts['event_return_queue'] + self.event_return_queue_max_seconds = self.opts.get('event_return_queue_max_seconds', 0) local_minion_opts = self.opts.copy() local_minion_opts['file_client'] = 'local' self.minion = salt.minion.MasterMinion(local_minion_opts) @@ -1227,13 +1218,13 @@ class EventReturn(salt.utils.process.SignalHandlingMultiprocessingProcess): if isinstance(self.opts['event_return'], list): # Multiple event returners for r in self.opts['event_return']: - log.debug('Calling event returner {0}, one of many.'.format(r)) + log.debug('Calling event returner %s, one of many.', r) event_return = '{0}.event_return'.format(r) self._flush_event_single(event_return) else: # Only a single event returner - log.debug('Calling event returner {0}, only one ' - 'configured.'.format(self.opts['event_return'])) + log.debug('Calling event returner %s, only one ' + 'configured.', self.opts['event_return']) event_return = '{0}.event_return'.format( self.opts['event_return'] ) @@ -1245,13 +1236,13 @@ class EventReturn(salt.utils.process.SignalHandlingMultiprocessingProcess): try: self.minion.returners[event_return](self.event_queue) except Exception as exc: - log.error('Could not store events - returner \'{0}\' raised ' - 'exception: {1}'.format(event_return, exc)) + log.error('Could 
not store events - returner \'%s\' raised ' + 'exception: %s', event_return, exc) # don't waste processing power unnecessarily on converting a # potentially huge dataset to a string if log.level <= logging.DEBUG: - log.debug('Event data that caused an exception: {0}'.format( - self.event_queue)) + log.debug('Event data that caused an exception: %s', + self.event_queue) else: log.error('Could not store return for event(s) - returner ' '\'%s\' not found.', event_return) @@ -1265,17 +1256,52 @@ class EventReturn(salt.utils.process.SignalHandlingMultiprocessingProcess): events = self.event.iter_events(full=True) self.event.fire_event({}, 'salt/event_listen/start') try: + # events below is a generator, we will iterate until we get the salt/event/exit tag + oldestevent = None for event in events: + if event['tag'] == 'salt/event/exit': + # We're done eventing self.stop = True if self._filter(event): + # This event passed the filter, add it to the queue self.event_queue.append(event) - if len(self.event_queue) >= self.event_return_queue: + too_long_in_queue = False + + # if max_seconds is >0, then we want to make sure we flush the queue + # every event_return_queue_max_seconds seconds, If it's 0, don't + # apply any of this logic + if self.event_return_queue_max_seconds > 0: + rightnow = datetime.datetime.now() + if not oldestevent: + oldestevent = rightnow + age_in_seconds = (rightnow - oldestevent).seconds + if age_in_seconds > 0: + log.debug('Oldest event in queue is %s seconds old.', age_in_seconds) + if age_in_seconds >= self.event_return_queue_max_seconds: + too_long_in_queue = True + oldestevent = None + else: + too_long_in_queue = False + + if too_long_in_queue: + log.debug('Oldest event has been in queue too long, will flush queue') + + # If we are over the max queue size or the oldest item in the queue has been there too long + # then flush the queue + if len(self.event_queue) >= self.event_return_queue or too_long_in_queue: + log.debug('Flushing %s events.', 
len(self.event_queue)) self.flush_events() + oldestevent = None if self.stop: + # We saw the salt/event/exit tag, we can stop eventing break finally: # flush all we have at this moment + # No matter what, make sure we flush the queue even when we are exiting + # and there will be no more events. if self.event_queue: + log.debug('Flushing %s events.', len(self.event_queue)) + self.flush_events() def _filter(self, event):