Ability to queue event returns

This commit is contained in:
Mike Place 2014-12-01 11:46:27 -07:00
parent 328ff6a0f2
commit 8877ba7d78
3 changed files with 17 additions and 2 deletions

View file

@ -122,10 +122,16 @@
# Store all returns in the given returner.
# Setting this option requires that any returner-specific configuration also
# be set. See various returners in salt/returners for details on required
# configuration values.
# configuration values. (See also, event_return_queue below.)
#
#event_return: mysql
# On busy systems, enabling event_returns can cause a considerable load on
# the storage system for returners. Events can be queued on the master and
# stored in a batched fashion using a single transaction for multiple events.
# By default, events are not queued.
#event_return_queue: 0
# Passing very large events can cause the minion to consume large amounts of
# memory. This value tunes the maximum size of a message allowed onto the
# master event bus. The value is expressed in bytes.

View file

@ -138,6 +138,7 @@ VALID_OPTS = {
'recon_default': float,
'recon_randomize': float,
'event_return': str,
'event_return_queue': int,
'win_repo_cachefile': str,
'pidfile': str,
'range_server': str,
@ -545,6 +546,7 @@ DEFAULT_MASTER_OPTS = {
'reactor_worker_threads': 10,
'reactor_worker_hwm': 10000,
'event_return': '',
'event_return_queue': 0,
'serial': 'msgpack',
'state_verbose': True,
'state_output': 'full',

View file

@ -603,6 +603,7 @@ class EventReturn(multiprocessing.Process):
multiprocessing.Process.__init__(self)
self.opts = opts
self.event_return_queue = self.opts['event_return_queue']
local_minion_opts = self.opts.copy()
local_minion_opts['file_client'] = 'local'
self.minion = salt.minion.MasterMinion(local_minion_opts)
@ -612,8 +613,14 @@ class EventReturn(multiprocessing.Process):
self.event = get_event('master', opts=self.opts)
events = self.event.iter_events(full=True)
self.event.fire_event({}, 'salt/event_listen/start')
event_queue = []
try:
self.minion.returners['{0}.event_return'.format(self.opts['event_return'])](events)
for event in events:
event_queue.append(event)
if len(event_queue) >= self.event_return_queue:
log.trace('Storing events')
self.minion.returners['{0}.event_return'.format(self.opts['event_return'])](event_queue)
event_queue = []
except KeyError:
log.error('Could not store return for events {0}. Returner {1} '
'not found.'.format(events, self.opts.get('event_return', None)))