mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge pull request #44494 from skizunov/develop2
Fix broken `beacons_before_connect` feature
This commit is contained in:
commit
febb913743
2 changed files with 200 additions and 63 deletions
215
salt/minion.py
215
salt/minion.py
|
@ -862,6 +862,10 @@ class MinionManager(MinionBase):
|
|||
failed = False
|
||||
while True:
|
||||
try:
|
||||
if minion.opts.get('beacons_before_connect', False):
|
||||
minion.setup_beacons(before_connect=True)
|
||||
if minion.opts.get('scheduler_before_connect', False):
|
||||
minion.setup_scheduler(before_connect=True)
|
||||
yield minion.connect_master(failed=failed)
|
||||
minion.tune_in(start=False)
|
||||
break
|
||||
|
@ -936,6 +940,7 @@ class Minion(MinionBase):
|
|||
# True means the Minion is fully functional and ready to handle events.
|
||||
self.ready = False
|
||||
self.jid_queue = jid_queue or []
|
||||
self.periodic_callbacks = {}
|
||||
|
||||
if io_loop is None:
|
||||
if HAS_ZMQ:
|
||||
|
@ -967,6 +972,19 @@ class Minion(MinionBase):
|
|||
# post_master_init
|
||||
if not salt.utils.is_proxy():
|
||||
self.opts['grains'] = salt.loader.grains(opts)
|
||||
else:
|
||||
if self.opts.get('beacons_before_connect', False):
|
||||
log.warning(
|
||||
'\'beacons_before_connect\' is not supported '
|
||||
'for proxy minions. Setting to False'
|
||||
)
|
||||
self.opts['beacons_before_connect'] = False
|
||||
if self.opts.get('scheduler_before_connect', False):
|
||||
log.warning(
|
||||
'\'scheduler_before_connect\' is not supported '
|
||||
'for proxy minions. Setting to False'
|
||||
)
|
||||
self.opts['scheduler_before_connect'] = False
|
||||
|
||||
log.info('Creating minion process manager')
|
||||
|
||||
|
@ -1070,19 +1088,22 @@ class Minion(MinionBase):
|
|||
pillarenv=self.opts.get('pillarenv')
|
||||
).compile_pillar()
|
||||
|
||||
self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
|
||||
self.serial = salt.payload.Serial(self.opts)
|
||||
self.mod_opts = self._prep_mod_opts()
|
||||
self.matcher = Matcher(self.opts, self.functions)
|
||||
self.beacons = salt.beacons.Beacon(self.opts, self.functions)
|
||||
uid = salt.utils.get_uid(user=self.opts.get('user', None))
|
||||
self.proc_dir = get_proc_dir(self.opts['cachedir'], uid=uid)
|
||||
if not self.ready:
|
||||
self._setup_core()
|
||||
elif self.connected and self.opts['pillar']:
|
||||
# The pillar has changed due to the connection to the master.
|
||||
# Reload the functions so that they can use the new pillar data.
|
||||
self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
|
||||
if hasattr(self, 'schedule'):
|
||||
self.schedule.functions = self.functions
|
||||
self.schedule.returners = self.returners
|
||||
|
||||
self.schedule = salt.utils.schedule.Schedule(
|
||||
self.opts,
|
||||
self.functions,
|
||||
self.returners,
|
||||
cleanup=[master_event(type='alive')])
|
||||
if not hasattr(self, 'schedule'):
|
||||
self.schedule = salt.utils.schedule.Schedule(
|
||||
self.opts,
|
||||
self.functions,
|
||||
self.returners,
|
||||
cleanup=[master_event(type='alive')])
|
||||
|
||||
# add default scheduling jobs to the minions scheduler
|
||||
if self.opts['mine_enabled'] and 'mine.update' in self.functions:
|
||||
|
@ -1136,9 +1157,6 @@ class Minion(MinionBase):
|
|||
self.schedule.delete_job(master_event(type='alive', master=self.opts['master']), persist=True)
|
||||
self.schedule.delete_job(master_event(type='failback'), persist=True)
|
||||
|
||||
self.grains_cache = self.opts['grains']
|
||||
self.ready = True
|
||||
|
||||
def _return_retry_timer(self):
|
||||
'''
|
||||
Based on the minion configuration, either return a randomized timer or
|
||||
|
@ -2180,6 +2198,118 @@ class Minion(MinionBase):
|
|||
except (ValueError, NameError):
|
||||
pass
|
||||
|
||||
def _setup_core(self):
|
||||
'''
|
||||
Set up the core minion attributes.
|
||||
This is safe to call multiple times.
|
||||
'''
|
||||
if not self.ready:
|
||||
# First call. Initialize.
|
||||
self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
|
||||
self.serial = salt.payload.Serial(self.opts)
|
||||
self.mod_opts = self._prep_mod_opts()
|
||||
self.matcher = Matcher(self.opts, self.functions)
|
||||
uid = salt.utils.get_uid(user=self.opts.get('user', None))
|
||||
self.proc_dir = get_proc_dir(self.opts['cachedir'], uid=uid)
|
||||
self.grains_cache = self.opts['grains']
|
||||
self.ready = True
|
||||
|
||||
def setup_beacons(self, before_connect=False):
|
||||
'''
|
||||
Set up the beacons.
|
||||
This is safe to call multiple times.
|
||||
'''
|
||||
self._setup_core()
|
||||
|
||||
loop_interval = self.opts['loop_interval']
|
||||
new_periodic_callbacks = {}
|
||||
|
||||
if 'beacons' not in self.periodic_callbacks:
|
||||
self.beacons = salt.beacons.Beacon(self.opts, self.functions)
|
||||
|
||||
def handle_beacons():
|
||||
# Process Beacons
|
||||
beacons = None
|
||||
try:
|
||||
beacons = self.process_beacons(self.functions)
|
||||
except Exception:
|
||||
log.critical('The beacon errored: ', exc_info=True)
|
||||
if beacons and self.connected:
|
||||
self._fire_master(events=beacons)
|
||||
|
||||
new_periodic_callbacks['beacons'] = tornado.ioloop.PeriodicCallback(handle_beacons, loop_interval * 1000, io_loop=self.io_loop)
|
||||
if before_connect:
|
||||
# Make sure there is a chance for one iteration to occur before connect
|
||||
handle_beacons()
|
||||
|
||||
if 'cleanup' not in self.periodic_callbacks:
|
||||
new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(self._fallback_cleanups, loop_interval * 1000, io_loop=self.io_loop)
|
||||
|
||||
# start all the other callbacks
|
||||
for periodic_cb in six.itervalues(new_periodic_callbacks):
|
||||
periodic_cb.start()
|
||||
|
||||
self.periodic_callbacks.update(new_periodic_callbacks)
|
||||
|
||||
def setup_scheduler(self, before_connect=False):
|
||||
'''
|
||||
Set up the scheduler.
|
||||
This is safe to call multiple times.
|
||||
'''
|
||||
self._setup_core()
|
||||
|
||||
loop_interval = self.opts['loop_interval']
|
||||
new_periodic_callbacks = {}
|
||||
|
||||
if 'schedule' not in self.periodic_callbacks:
|
||||
if 'schedule' not in self.opts:
|
||||
self.opts['schedule'] = {}
|
||||
if not hasattr(self, 'schedule'):
|
||||
self.schedule = salt.utils.schedule.Schedule(
|
||||
self.opts,
|
||||
self.functions,
|
||||
self.returners,
|
||||
cleanup=[master_event(type='alive')])
|
||||
|
||||
try:
|
||||
if self.opts['grains_refresh_every']: # If exists and is not zero. In minutes, not seconds!
|
||||
if self.opts['grains_refresh_every'] > 1:
|
||||
log.debug(
|
||||
'Enabling the grains refresher. Will run every {0} minutes.'.format(
|
||||
self.opts['grains_refresh_every'])
|
||||
)
|
||||
else: # Clean up minute vs. minutes in log message
|
||||
log.debug(
|
||||
'Enabling the grains refresher. Will run every {0} minute.'.format(
|
||||
self.opts['grains_refresh_every'])
|
||||
)
|
||||
self._refresh_grains_watcher(
|
||||
abs(self.opts['grains_refresh_every'])
|
||||
)
|
||||
except Exception as exc:
|
||||
log.error(
|
||||
'Exception occurred in attempt to initialize grain refresh routine during minion tune-in: {0}'.format(
|
||||
exc)
|
||||
)
|
||||
|
||||
# TODO: actually listen to the return and change period
|
||||
def handle_schedule():
|
||||
self.process_schedule(self, loop_interval)
|
||||
new_periodic_callbacks['schedule'] = tornado.ioloop.PeriodicCallback(handle_schedule, 1000, io_loop=self.io_loop)
|
||||
|
||||
if before_connect:
|
||||
# Make sure there is a chance for one iteration to occur before connect
|
||||
handle_schedule()
|
||||
|
||||
if 'cleanup' not in self.periodic_callbacks:
|
||||
new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(self._fallback_cleanups, loop_interval * 1000, io_loop=self.io_loop)
|
||||
|
||||
# start all the other callbacks
|
||||
for periodic_cb in six.itervalues(new_periodic_callbacks):
|
||||
periodic_cb.start()
|
||||
|
||||
self.periodic_callbacks.update(new_periodic_callbacks)
|
||||
|
||||
# Main Minion Tune In
|
||||
def tune_in(self, start=True):
|
||||
'''
|
||||
|
@ -2191,6 +2321,10 @@ class Minion(MinionBase):
|
|||
log.debug('Minion \'{0}\' trying to tune in'.format(self.opts['id']))
|
||||
|
||||
if start:
|
||||
if self.opts.get('beacons_before_connect', False):
|
||||
self.setup_beacons(before_connect=True)
|
||||
if self.opts.get('scheduler_before_connect', False):
|
||||
self.setup_scheduler(before_connect=True)
|
||||
self.sync_connect_master()
|
||||
if self.connected:
|
||||
self._fire_master_minion_start()
|
||||
|
@ -2205,31 +2339,9 @@ class Minion(MinionBase):
|
|||
# On first startup execute a state run if configured to do so
|
||||
self._state_run()
|
||||
|
||||
loop_interval = self.opts['loop_interval']
|
||||
self.setup_beacons()
|
||||
self.setup_scheduler()
|
||||
|
||||
try:
|
||||
if self.opts['grains_refresh_every']: # If exists and is not zero. In minutes, not seconds!
|
||||
if self.opts['grains_refresh_every'] > 1:
|
||||
log.debug(
|
||||
'Enabling the grains refresher. Will run every {0} minutes.'.format(
|
||||
self.opts['grains_refresh_every'])
|
||||
)
|
||||
else: # Clean up minute vs. minutes in log message
|
||||
log.debug(
|
||||
'Enabling the grains refresher. Will run every {0} minute.'.format(
|
||||
self.opts['grains_refresh_every'])
|
||||
|
||||
)
|
||||
self._refresh_grains_watcher(
|
||||
abs(self.opts['grains_refresh_every'])
|
||||
)
|
||||
except Exception as exc:
|
||||
log.error(
|
||||
'Exception occurred in attempt to initialize grain refresh routine during minion tune-in: {0}'.format(
|
||||
exc)
|
||||
)
|
||||
|
||||
self.periodic_callbacks = {}
|
||||
# schedule the stuff that runs every interval
|
||||
ping_interval = self.opts.get('ping_interval', 0) * 60
|
||||
if ping_interval > 0 and self.connected:
|
||||
|
@ -2247,30 +2359,7 @@ class Minion(MinionBase):
|
|||
except Exception:
|
||||
log.warning('Attempt to ping master failed.', exc_on_loglevel=logging.DEBUG)
|
||||
self.periodic_callbacks['ping'] = tornado.ioloop.PeriodicCallback(ping_master, ping_interval * 1000, io_loop=self.io_loop)
|
||||
|
||||
self.periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(self._fallback_cleanups, loop_interval * 1000, io_loop=self.io_loop)
|
||||
|
||||
def handle_beacons():
|
||||
# Process Beacons
|
||||
beacons = None
|
||||
try:
|
||||
beacons = self.process_beacons(self.functions)
|
||||
except Exception:
|
||||
log.critical('The beacon errored: ', exc_info=True)
|
||||
if beacons and self.connected:
|
||||
self._fire_master(events=beacons, sync=False)
|
||||
|
||||
self.periodic_callbacks['beacons'] = tornado.ioloop.PeriodicCallback(handle_beacons, loop_interval * 1000, io_loop=self.io_loop)
|
||||
|
||||
# TODO: actually listen to the return and change period
|
||||
def handle_schedule():
|
||||
self.process_schedule(self, loop_interval)
|
||||
if hasattr(self, 'schedule'):
|
||||
self.periodic_callbacks['schedule'] = tornado.ioloop.PeriodicCallback(handle_schedule, 1000, io_loop=self.io_loop)
|
||||
|
||||
# start all the other callbacks
|
||||
for periodic_cb in six.itervalues(self.periodic_callbacks):
|
||||
periodic_cb.start()
|
||||
self.periodic_callbacks['ping'].start()
|
||||
|
||||
# add handler to subscriber
|
||||
if hasattr(self, 'pub_channel') and self.pub_channel is not None:
|
||||
|
|
|
@ -131,3 +131,51 @@ class MinionTestCase(TestCase):
|
|||
self.assertEqual(minion.jid_queue, [456, 789])
|
||||
finally:
|
||||
minion.destroy()
|
||||
|
||||
def test_beacons_before_connect(self):
|
||||
'''
|
||||
Tests that the 'beacons_before_connect' option causes the beacons to be initialized before connect.
|
||||
'''
|
||||
with patch('salt.minion.Minion.ctx', MagicMock(return_value={})), \
|
||||
patch('salt.minion.Minion.sync_connect_master', MagicMock(side_effect=RuntimeError('stop execution'))), \
|
||||
patch('salt.utils.process.SignalHandlingMultiprocessingProcess.start', MagicMock(return_value=True)), \
|
||||
patch('salt.utils.process.SignalHandlingMultiprocessingProcess.join', MagicMock(return_value=True)):
|
||||
mock_opts = copy.copy(salt.config.DEFAULT_MINION_OPTS)
|
||||
mock_opts['beacons_before_connect'] = True
|
||||
try:
|
||||
minion = salt.minion.Minion(mock_opts, io_loop=tornado.ioloop.IOLoop())
|
||||
|
||||
try:
|
||||
minion.tune_in(start=True)
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
# Make sure beacons are initialized but the sheduler is not
|
||||
self.assertTrue('beacons' in minion.periodic_callbacks)
|
||||
self.assertTrue('schedule' not in minion.periodic_callbacks)
|
||||
finally:
|
||||
minion.destroy()
|
||||
|
||||
def test_scheduler_before_connect(self):
|
||||
'''
|
||||
Tests that the 'scheduler_before_connect' option causes the scheduler to be initialized before connect.
|
||||
'''
|
||||
with patch('salt.minion.Minion.ctx', MagicMock(return_value={})), \
|
||||
patch('salt.minion.Minion.sync_connect_master', MagicMock(side_effect=RuntimeError('stop execution'))), \
|
||||
patch('salt.utils.process.SignalHandlingMultiprocessingProcess.start', MagicMock(return_value=True)), \
|
||||
patch('salt.utils.process.SignalHandlingMultiprocessingProcess.join', MagicMock(return_value=True)):
|
||||
mock_opts = copy.copy(salt.config.DEFAULT_MINION_OPTS)
|
||||
mock_opts['scheduler_before_connect'] = True
|
||||
try:
|
||||
minion = salt.minion.Minion(mock_opts, io_loop=tornado.ioloop.IOLoop())
|
||||
|
||||
try:
|
||||
minion.tune_in(start=True)
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
# Make sure the scheduler is initialized but the beacons are not
|
||||
self.assertTrue('schedule' in minion.periodic_callbacks)
|
||||
self.assertTrue('beacons' not in minion.periodic_callbacks)
|
||||
finally:
|
||||
minion.destroy()
|
||||
|
|
Loading…
Add table
Reference in a new issue