Now fires events to specified returner

This commit is contained in:
Mike Place 2014-11-26 16:31:44 -07:00
parent 36017c0911
commit 5594be2708
5 changed files with 41 additions and 3 deletions

View file

@ -124,7 +124,7 @@
# be set. See various returners in salt/returners for details on required
# configuration values.
#
#return_events: mysql
#event_return: mysql
# Passing very large events can cause the minion to consume large amounts of
# memory. This value tunes the maximum size of a message allowed onto the

View file

@ -137,7 +137,7 @@ VALID_OPTS = {
'recon_max': float,
'recon_default': float,
'recon_randomize': float,
'return_events': str,
'event_return': str,
'win_repo_cachefile': str,
'pidfile': str,
'range_server': str,
@ -544,7 +544,7 @@ DEFAULT_MASTER_OPTS = {
'reactor_refresh_interval': 60,
'reactor_worker_threads': 10,
'reactor_worker_hwm': 10000,
'return_events': '',
'event_return': '',
'serial': 'msgpack',
'state_verbose': True,
'state_output': 'full',

View file

@ -401,6 +401,10 @@ class Master(SMaster):
log.info('Creating master reactor process')
process_manager.add_process(salt.utils.event.Reactor, args=(self.opts,))
if self.opts.get('event_return'):
log.info('Creating master event return process')
process_manager.add_process(salt.utils.event.EventReturn, args=(self.opts,))
ext_procs = self.opts.get('ext_processes', [])
for proc in ext_procs:
log.info('Creating ext_processes process: {0}'.format(proc))

View file

@ -17,3 +17,11 @@ def returner(ret):
Print the return data to the terminal to verify functionality
'''
print(ret)
def event_return(event):
'''
Print event return data to the terminal to verify functionality
'''
print(event)

View file

@ -594,6 +594,32 @@ class EventPublisher(multiprocessing.Process):
self.context.term()
class EventReturn(multiprocessing.Process):
'''
A dedicated process which listens to the master event bus and queues
and forwards events to the specified returner.
'''
def __init__(self, opts):
multiprocessing.Process.__init__(self)
self.opts = opts
local_minion_opts = self.opts.copy()
local_minion_opts['file_client'] = 'local'
self.minion = salt.minion.MasterMinion(local_minion_opts)
def run(self):
salt.utils.appendproctitle(self.__class__.__name__)
self.event = get_event('master', opts=self.opts)
events = self.event.iter_events(full=True)
self.event.fire_event({}, 'salt/event_listen/start')
for data in events:
try:
self.minion.returners['{0}.event_return'.format(self.opts['event_return'])](data)
except KeyError:
log.error('Could not store return for event {0}. Returner {1} '
'not found.'.format(data, self.opts.get('event_return', None)))
class Reactor(multiprocessing.Process, salt.state.Compiler):
'''
Read in the reactor configuration variable and compare it to events