Mirror of https://github.com/saltstack/salt.git
Fix broken beacons_before_connect feature
The `beacons_before_connect` and `scheduler_before_connect` features introduced in PR #38289 had code that was lost, so the features no longer work. This commit adds that code back and also adds unit tests to help prevent this kind of regression in the future.

Signed-off-by: Sergey Kizunov <sergey.kizunov@ni.com>
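For context, a minimal sketch of the startup order these options are meant to produce (not part of this commit): the start_minion helper below is a hypothetical stand-in, and only the two option names and the setup_beacons / setup_scheduler / connect_master / tune_in calls come from the changed code. Both options default to False and would be enabled in the minion configuration.

# Hypothetical illustration only -- the real wiring lives in salt/minion.py below.
def start_minion(minion):
    # Bring up beacons and the scheduler first, so each gets at least one
    # iteration even before (or without) a connection to the master ...
    if minion.opts.get('beacons_before_connect', False):
        minion.setup_beacons(before_connect=True)
    if minion.opts.get('scheduler_before_connect', False):
        minion.setup_scheduler(before_connect=True)
    # ... then connect to the master and enter the main event loop.
    minion.connect_master()
    minion.tune_in(start=False)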
parent 5b95495e75
commit 4d0d023115
2 changed files with 200 additions and 63 deletions

salt/minion.py: 215 lines changed
@@ -862,6 +862,10 @@ class MinionManager(MinionBase):
        failed = False
        while True:
            try:
                if minion.opts.get('beacons_before_connect', False):
                    minion.setup_beacons(before_connect=True)
                if minion.opts.get('scheduler_before_connect', False):
                    minion.setup_scheduler(before_connect=True)
                yield minion.connect_master(failed=failed)
                minion.tune_in(start=False)
                break
@@ -936,6 +940,7 @@ class Minion(MinionBase):
        # True means the Minion is fully functional and ready to handle events.
        self.ready = False
        self.jid_queue = jid_queue or []
        self.periodic_callbacks = {}

        if io_loop is None:
            if HAS_ZMQ:
@@ -967,6 +972,19 @@ class Minion(MinionBase):
        # post_master_init
        if not salt.utils.is_proxy():
            self.opts['grains'] = salt.loader.grains(opts)
        else:
            if self.opts.get('beacons_before_connect', False):
                log.warning(
                    '\'beacons_before_connect\' is not supported '
                    'for proxy minions. Setting to False'
                )
                self.opts['beacons_before_connect'] = False
            if self.opts.get('scheduler_before_connect', False):
                log.warning(
                    '\'scheduler_before_connect\' is not supported '
                    'for proxy minions. Setting to False'
                )
                self.opts['scheduler_before_connect'] = False

        log.info('Creating minion process manager')
@@ -1070,19 +1088,22 @@ class Minion(MinionBase):
            pillarenv=self.opts.get('pillarenv')
        ).compile_pillar()

        self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
        self.serial = salt.payload.Serial(self.opts)
        self.mod_opts = self._prep_mod_opts()
        self.matcher = Matcher(self.opts, self.functions)
        self.beacons = salt.beacons.Beacon(self.opts, self.functions)
        uid = salt.utils.get_uid(user=self.opts.get('user', None))
        self.proc_dir = get_proc_dir(self.opts['cachedir'], uid=uid)
        if not self.ready:
            self._setup_core()
        elif self.connected and self.opts['pillar']:
            # The pillar has changed due to the connection to the master.
            # Reload the functions so that they can use the new pillar data.
            self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
            if hasattr(self, 'schedule'):
                self.schedule.functions = self.functions
                self.schedule.returners = self.returners

        self.schedule = salt.utils.schedule.Schedule(
            self.opts,
            self.functions,
            self.returners,
            cleanup=[master_event(type='alive')])
        if not hasattr(self, 'schedule'):
            self.schedule = salt.utils.schedule.Schedule(
                self.opts,
                self.functions,
                self.returners,
                cleanup=[master_event(type='alive')])

        # add default scheduling jobs to the minions scheduler
        if self.opts['mine_enabled'] and 'mine.update' in self.functions:
@@ -1136,9 +1157,6 @@ class Minion(MinionBase):
            self.schedule.delete_job(master_event(type='alive', master=self.opts['master']), persist=True)
            self.schedule.delete_job(master_event(type='failback'), persist=True)

        self.grains_cache = self.opts['grains']
        self.ready = True

    def _return_retry_timer(self):
        '''
        Based on the minion configuration, either return a randomized timer or
@@ -2176,6 +2194,118 @@ class Minion(MinionBase):
            except (ValueError, NameError):
                pass

    def _setup_core(self):
        '''
        Set up the core minion attributes.
        This is safe to call multiple times.
        '''
        if not self.ready:
            # First call. Initialize.
            self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
            self.serial = salt.payload.Serial(self.opts)
            self.mod_opts = self._prep_mod_opts()
            self.matcher = Matcher(self.opts, self.functions)
            uid = salt.utils.get_uid(user=self.opts.get('user', None))
            self.proc_dir = get_proc_dir(self.opts['cachedir'], uid=uid)
            self.grains_cache = self.opts['grains']
            self.ready = True

    def setup_beacons(self, before_connect=False):
        '''
        Set up the beacons.
        This is safe to call multiple times.
        '''
        self._setup_core()

        loop_interval = self.opts['loop_interval']
        new_periodic_callbacks = {}

        if 'beacons' not in self.periodic_callbacks:
            self.beacons = salt.beacons.Beacon(self.opts, self.functions)

            def handle_beacons():
                # Process Beacons
                beacons = None
                try:
                    beacons = self.process_beacons(self.functions)
                except Exception:
                    log.critical('The beacon errored: ', exc_info=True)
                if beacons and self.connected:
                    self._fire_master(events=beacons)

            new_periodic_callbacks['beacons'] = tornado.ioloop.PeriodicCallback(handle_beacons, loop_interval * 1000, io_loop=self.io_loop)
            if before_connect:
                # Make sure there is a chance for one iteration to occur before connect
                handle_beacons()

        if 'cleanup' not in self.periodic_callbacks:
            new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(self._fallback_cleanups, loop_interval * 1000, io_loop=self.io_loop)

        # start all the other callbacks
        for periodic_cb in six.itervalues(new_periodic_callbacks):
            periodic_cb.start()

        self.periodic_callbacks.update(new_periodic_callbacks)

    def setup_scheduler(self, before_connect=False):
        '''
        Set up the scheduler.
        This is safe to call multiple times.
        '''
        self._setup_core()

        loop_interval = self.opts['loop_interval']
        new_periodic_callbacks = {}

        if 'schedule' not in self.periodic_callbacks:
            if 'schedule' not in self.opts:
                self.opts['schedule'] = {}
            if not hasattr(self, 'schedule'):
                self.schedule = salt.utils.schedule.Schedule(
                    self.opts,
                    self.functions,
                    self.returners,
                    cleanup=[master_event(type='alive')])

            try:
                if self.opts['grains_refresh_every']:  # If exists and is not zero. In minutes, not seconds!
                    if self.opts['grains_refresh_every'] > 1:
                        log.debug(
                            'Enabling the grains refresher. Will run every {0} minutes.'.format(
                                self.opts['grains_refresh_every'])
                        )
                    else:  # Clean up minute vs. minutes in log message
                        log.debug(
                            'Enabling the grains refresher. Will run every {0} minute.'.format(
                                self.opts['grains_refresh_every'])
                        )
                    self._refresh_grains_watcher(
                        abs(self.opts['grains_refresh_every'])
                    )
            except Exception as exc:
                log.error(
                    'Exception occurred in attempt to initialize grain refresh routine during minion tune-in: {0}'.format(
                        exc)
                )

            # TODO: actually listen to the return and change period
            def handle_schedule():
                self.process_schedule(self, loop_interval)
            new_periodic_callbacks['schedule'] = tornado.ioloop.PeriodicCallback(handle_schedule, 1000, io_loop=self.io_loop)

            if before_connect:
                # Make sure there is a chance for one iteration to occur before connect
                handle_schedule()

        if 'cleanup' not in self.periodic_callbacks:
            new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(self._fallback_cleanups, loop_interval * 1000, io_loop=self.io_loop)

        # start all the other callbacks
        for periodic_cb in six.itervalues(new_periodic_callbacks):
            periodic_cb.start()

        self.periodic_callbacks.update(new_periodic_callbacks)

    # Main Minion Tune In
    def tune_in(self, start=True):
        '''
@@ -2187,6 +2317,10 @@ class Minion(MinionBase):
        log.debug('Minion \'{0}\' trying to tune in'.format(self.opts['id']))

        if start:
            if self.opts.get('beacons_before_connect', False):
                self.setup_beacons(before_connect=True)
            if self.opts.get('scheduler_before_connect', False):
                self.setup_scheduler(before_connect=True)
            self.sync_connect_master()
        if self.connected:
            self._fire_master_minion_start()
@@ -2201,31 +2335,9 @@ class Minion(MinionBase):
        # On first startup execute a state run if configured to do so
        self._state_run()

        loop_interval = self.opts['loop_interval']
        self.setup_beacons()
        self.setup_scheduler()

        try:
            if self.opts['grains_refresh_every']:  # If exists and is not zero. In minutes, not seconds!
                if self.opts['grains_refresh_every'] > 1:
                    log.debug(
                        'Enabling the grains refresher. Will run every {0} minutes.'.format(
                            self.opts['grains_refresh_every'])
                    )
                else:  # Clean up minute vs. minutes in log message
                    log.debug(
                        'Enabling the grains refresher. Will run every {0} minute.'.format(
                            self.opts['grains_refresh_every'])

                    )
                self._refresh_grains_watcher(
                    abs(self.opts['grains_refresh_every'])
                )
        except Exception as exc:
            log.error(
                'Exception occurred in attempt to initialize grain refresh routine during minion tune-in: {0}'.format(
                    exc)
            )

        self.periodic_callbacks = {}
        # schedule the stuff that runs every interval
        ping_interval = self.opts.get('ping_interval', 0) * 60
        if ping_interval > 0 and self.connected:
@@ -2243,30 +2355,7 @@ class Minion(MinionBase):
                except Exception:
                    log.warning('Attempt to ping master failed.', exc_on_loglevel=logging.DEBUG)
            self.periodic_callbacks['ping'] = tornado.ioloop.PeriodicCallback(ping_master, ping_interval * 1000, io_loop=self.io_loop)

        self.periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(self._fallback_cleanups, loop_interval * 1000, io_loop=self.io_loop)

        def handle_beacons():
            # Process Beacons
            beacons = None
            try:
                beacons = self.process_beacons(self.functions)
            except Exception:
                log.critical('The beacon errored: ', exc_info=True)
            if beacons and self.connected:
                self._fire_master(events=beacons, sync=False)

        self.periodic_callbacks['beacons'] = tornado.ioloop.PeriodicCallback(handle_beacons, loop_interval * 1000, io_loop=self.io_loop)

        # TODO: actually listen to the return and change period
        def handle_schedule():
            self.process_schedule(self, loop_interval)
        if hasattr(self, 'schedule'):
            self.periodic_callbacks['schedule'] = tornado.ioloop.PeriodicCallback(handle_schedule, 1000, io_loop=self.io_loop)

        # start all the other callbacks
        for periodic_cb in six.itervalues(self.periodic_callbacks):
            periodic_cb.start()
        self.periodic_callbacks['ping'].start()

        # add handler to subscriber
        if hasattr(self, 'pub_channel') and self.pub_channel is not None:
@@ -131,3 +131,51 @@ class MinionTestCase(TestCase):
            self.assertEqual(minion.jid_queue, [456, 789])
        finally:
            minion.destroy()

    def test_beacons_before_connect(self):
        '''
        Tests that the 'beacons_before_connect' option causes the beacons to be initialized before connect.
        '''
        with patch('salt.minion.Minion.ctx', MagicMock(return_value={})), \
                patch('salt.minion.Minion.sync_connect_master', MagicMock(side_effect=RuntimeError('stop execution'))), \
                patch('salt.utils.process.SignalHandlingMultiprocessingProcess.start', MagicMock(return_value=True)), \
                patch('salt.utils.process.SignalHandlingMultiprocessingProcess.join', MagicMock(return_value=True)):
            mock_opts = copy.copy(salt.config.DEFAULT_MINION_OPTS)
            mock_opts['beacons_before_connect'] = True
            try:
                minion = salt.minion.Minion(mock_opts, io_loop=tornado.ioloop.IOLoop())

                try:
                    minion.tune_in(start=True)
                except RuntimeError:
                    pass

                # Make sure beacons are initialized but the scheduler is not
                self.assertTrue('beacons' in minion.periodic_callbacks)
                self.assertTrue('schedule' not in minion.periodic_callbacks)
            finally:
                minion.destroy()

    def test_scheduler_before_connect(self):
        '''
        Tests that the 'scheduler_before_connect' option causes the scheduler to be initialized before connect.
        '''
        with patch('salt.minion.Minion.ctx', MagicMock(return_value={})), \
                patch('salt.minion.Minion.sync_connect_master', MagicMock(side_effect=RuntimeError('stop execution'))), \
                patch('salt.utils.process.SignalHandlingMultiprocessingProcess.start', MagicMock(return_value=True)), \
                patch('salt.utils.process.SignalHandlingMultiprocessingProcess.join', MagicMock(return_value=True)):
            mock_opts = copy.copy(salt.config.DEFAULT_MINION_OPTS)
            mock_opts['scheduler_before_connect'] = True
            try:
                minion = salt.minion.Minion(mock_opts, io_loop=tornado.ioloop.IOLoop())

                try:
                    minion.tune_in(start=True)
                except RuntimeError:
                    pass

                # Make sure the scheduler is initialized but the beacons are not
                self.assertTrue('schedule' in minion.periodic_callbacks)
                self.assertTrue('beacons' not in minion.periodic_callbacks)
            finally:
                minion.destroy()