Merge pull request #44494 from skizunov/develop2

Fix broken `beacons_before_connect` feature
This commit is contained in:
Nicole Thomas 2017-12-07 13:24:49 -05:00 committed by GitHub
commit febb913743
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 200 additions and 63 deletions

View file

@ -862,6 +862,10 @@ class MinionManager(MinionBase):
failed = False
while True:
try:
if minion.opts.get('beacons_before_connect', False):
minion.setup_beacons(before_connect=True)
if minion.opts.get('scheduler_before_connect', False):
minion.setup_scheduler(before_connect=True)
yield minion.connect_master(failed=failed)
minion.tune_in(start=False)
break
@ -936,6 +940,7 @@ class Minion(MinionBase):
# True means the Minion is fully functional and ready to handle events.
self.ready = False
self.jid_queue = jid_queue or []
self.periodic_callbacks = {}
if io_loop is None:
if HAS_ZMQ:
@ -967,6 +972,19 @@ class Minion(MinionBase):
# post_master_init
if not salt.utils.is_proxy():
self.opts['grains'] = salt.loader.grains(opts)
else:
if self.opts.get('beacons_before_connect', False):
log.warning(
'\'beacons_before_connect\' is not supported '
'for proxy minions. Setting to False'
)
self.opts['beacons_before_connect'] = False
if self.opts.get('scheduler_before_connect', False):
log.warning(
'\'scheduler_before_connect\' is not supported '
'for proxy minions. Setting to False'
)
self.opts['scheduler_before_connect'] = False
log.info('Creating minion process manager')
@ -1070,19 +1088,22 @@ class Minion(MinionBase):
pillarenv=self.opts.get('pillarenv')
).compile_pillar()
self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
self.serial = salt.payload.Serial(self.opts)
self.mod_opts = self._prep_mod_opts()
self.matcher = Matcher(self.opts, self.functions)
self.beacons = salt.beacons.Beacon(self.opts, self.functions)
uid = salt.utils.get_uid(user=self.opts.get('user', None))
self.proc_dir = get_proc_dir(self.opts['cachedir'], uid=uid)
if not self.ready:
self._setup_core()
elif self.connected and self.opts['pillar']:
# The pillar has changed due to the connection to the master.
# Reload the functions so that they can use the new pillar data.
self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
if hasattr(self, 'schedule'):
self.schedule.functions = self.functions
self.schedule.returners = self.returners
self.schedule = salt.utils.schedule.Schedule(
self.opts,
self.functions,
self.returners,
cleanup=[master_event(type='alive')])
if not hasattr(self, 'schedule'):
self.schedule = salt.utils.schedule.Schedule(
self.opts,
self.functions,
self.returners,
cleanup=[master_event(type='alive')])
# add default scheduling jobs to the minions scheduler
if self.opts['mine_enabled'] and 'mine.update' in self.functions:
@ -1136,9 +1157,6 @@ class Minion(MinionBase):
self.schedule.delete_job(master_event(type='alive', master=self.opts['master']), persist=True)
self.schedule.delete_job(master_event(type='failback'), persist=True)
self.grains_cache = self.opts['grains']
self.ready = True
def _return_retry_timer(self):
'''
Based on the minion configuration, either return a randomized timer or
@ -2180,6 +2198,118 @@ class Minion(MinionBase):
except (ValueError, NameError):
pass
def _setup_core(self):
'''
Set up the core minion attributes.
This is safe to call multiple times.
'''
if not self.ready:
# First call. Initialize.
self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
self.serial = salt.payload.Serial(self.opts)
self.mod_opts = self._prep_mod_opts()
self.matcher = Matcher(self.opts, self.functions)
uid = salt.utils.get_uid(user=self.opts.get('user', None))
self.proc_dir = get_proc_dir(self.opts['cachedir'], uid=uid)
self.grains_cache = self.opts['grains']
self.ready = True
def setup_beacons(self, before_connect=False):
'''
Set up the beacons.
This is safe to call multiple times.
'''
self._setup_core()
loop_interval = self.opts['loop_interval']
new_periodic_callbacks = {}
if 'beacons' not in self.periodic_callbacks:
self.beacons = salt.beacons.Beacon(self.opts, self.functions)
def handle_beacons():
# Process Beacons
beacons = None
try:
beacons = self.process_beacons(self.functions)
except Exception:
log.critical('The beacon errored: ', exc_info=True)
if beacons and self.connected:
self._fire_master(events=beacons)
new_periodic_callbacks['beacons'] = tornado.ioloop.PeriodicCallback(handle_beacons, loop_interval * 1000, io_loop=self.io_loop)
if before_connect:
# Make sure there is a chance for one iteration to occur before connect
handle_beacons()
if 'cleanup' not in self.periodic_callbacks:
new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(self._fallback_cleanups, loop_interval * 1000, io_loop=self.io_loop)
# start all the other callbacks
for periodic_cb in six.itervalues(new_periodic_callbacks):
periodic_cb.start()
self.periodic_callbacks.update(new_periodic_callbacks)
def setup_scheduler(self, before_connect=False):
'''
Set up the scheduler.
This is safe to call multiple times.
'''
self._setup_core()
loop_interval = self.opts['loop_interval']
new_periodic_callbacks = {}
if 'schedule' not in self.periodic_callbacks:
if 'schedule' not in self.opts:
self.opts['schedule'] = {}
if not hasattr(self, 'schedule'):
self.schedule = salt.utils.schedule.Schedule(
self.opts,
self.functions,
self.returners,
cleanup=[master_event(type='alive')])
try:
if self.opts['grains_refresh_every']: # If exists and is not zero. In minutes, not seconds!
if self.opts['grains_refresh_every'] > 1:
log.debug(
'Enabling the grains refresher. Will run every {0} minutes.'.format(
self.opts['grains_refresh_every'])
)
else: # Clean up minute vs. minutes in log message
log.debug(
'Enabling the grains refresher. Will run every {0} minute.'.format(
self.opts['grains_refresh_every'])
)
self._refresh_grains_watcher(
abs(self.opts['grains_refresh_every'])
)
except Exception as exc:
log.error(
'Exception occurred in attempt to initialize grain refresh routine during minion tune-in: {0}'.format(
exc)
)
# TODO: actually listen to the return and change period
def handle_schedule():
self.process_schedule(self, loop_interval)
new_periodic_callbacks['schedule'] = tornado.ioloop.PeriodicCallback(handle_schedule, 1000, io_loop=self.io_loop)
if before_connect:
# Make sure there is a chance for one iteration to occur before connect
handle_schedule()
if 'cleanup' not in self.periodic_callbacks:
new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(self._fallback_cleanups, loop_interval * 1000, io_loop=self.io_loop)
# start all the other callbacks
for periodic_cb in six.itervalues(new_periodic_callbacks):
periodic_cb.start()
self.periodic_callbacks.update(new_periodic_callbacks)
# Main Minion Tune In
def tune_in(self, start=True):
'''
@ -2191,6 +2321,10 @@ class Minion(MinionBase):
log.debug('Minion \'{0}\' trying to tune in'.format(self.opts['id']))
if start:
if self.opts.get('beacons_before_connect', False):
self.setup_beacons(before_connect=True)
if self.opts.get('scheduler_before_connect', False):
self.setup_scheduler(before_connect=True)
self.sync_connect_master()
if self.connected:
self._fire_master_minion_start()
@ -2205,31 +2339,9 @@ class Minion(MinionBase):
# On first startup execute a state run if configured to do so
self._state_run()
loop_interval = self.opts['loop_interval']
self.setup_beacons()
self.setup_scheduler()
try:
if self.opts['grains_refresh_every']: # If exists and is not zero. In minutes, not seconds!
if self.opts['grains_refresh_every'] > 1:
log.debug(
'Enabling the grains refresher. Will run every {0} minutes.'.format(
self.opts['grains_refresh_every'])
)
else: # Clean up minute vs. minutes in log message
log.debug(
'Enabling the grains refresher. Will run every {0} minute.'.format(
self.opts['grains_refresh_every'])
)
self._refresh_grains_watcher(
abs(self.opts['grains_refresh_every'])
)
except Exception as exc:
log.error(
'Exception occurred in attempt to initialize grain refresh routine during minion tune-in: {0}'.format(
exc)
)
self.periodic_callbacks = {}
# schedule the stuff that runs every interval
ping_interval = self.opts.get('ping_interval', 0) * 60
if ping_interval > 0 and self.connected:
@ -2247,30 +2359,7 @@ class Minion(MinionBase):
except Exception:
log.warning('Attempt to ping master failed.', exc_on_loglevel=logging.DEBUG)
self.periodic_callbacks['ping'] = tornado.ioloop.PeriodicCallback(ping_master, ping_interval * 1000, io_loop=self.io_loop)
self.periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(self._fallback_cleanups, loop_interval * 1000, io_loop=self.io_loop)
def handle_beacons():
# Process Beacons
beacons = None
try:
beacons = self.process_beacons(self.functions)
except Exception:
log.critical('The beacon errored: ', exc_info=True)
if beacons and self.connected:
self._fire_master(events=beacons, sync=False)
self.periodic_callbacks['beacons'] = tornado.ioloop.PeriodicCallback(handle_beacons, loop_interval * 1000, io_loop=self.io_loop)
# TODO: actually listen to the return and change period
def handle_schedule():
self.process_schedule(self, loop_interval)
if hasattr(self, 'schedule'):
self.periodic_callbacks['schedule'] = tornado.ioloop.PeriodicCallback(handle_schedule, 1000, io_loop=self.io_loop)
# start all the other callbacks
for periodic_cb in six.itervalues(self.periodic_callbacks):
periodic_cb.start()
self.periodic_callbacks['ping'].start()
# add handler to subscriber
if hasattr(self, 'pub_channel') and self.pub_channel is not None:

View file

@ -131,3 +131,51 @@ class MinionTestCase(TestCase):
self.assertEqual(minion.jid_queue, [456, 789])
finally:
minion.destroy()
def test_beacons_before_connect(self):
'''
Tests that the 'beacons_before_connect' option causes the beacons to be initialized before connect.
'''
with patch('salt.minion.Minion.ctx', MagicMock(return_value={})), \
patch('salt.minion.Minion.sync_connect_master', MagicMock(side_effect=RuntimeError('stop execution'))), \
patch('salt.utils.process.SignalHandlingMultiprocessingProcess.start', MagicMock(return_value=True)), \
patch('salt.utils.process.SignalHandlingMultiprocessingProcess.join', MagicMock(return_value=True)):
mock_opts = copy.copy(salt.config.DEFAULT_MINION_OPTS)
mock_opts['beacons_before_connect'] = True
try:
minion = salt.minion.Minion(mock_opts, io_loop=tornado.ioloop.IOLoop())
try:
minion.tune_in(start=True)
except RuntimeError:
pass
# Make sure beacons are initialized but the sheduler is not
self.assertTrue('beacons' in minion.periodic_callbacks)
self.assertTrue('schedule' not in minion.periodic_callbacks)
finally:
minion.destroy()
def test_scheduler_before_connect(self):
'''
Tests that the 'scheduler_before_connect' option causes the scheduler to be initialized before connect.
'''
with patch('salt.minion.Minion.ctx', MagicMock(return_value={})), \
patch('salt.minion.Minion.sync_connect_master', MagicMock(side_effect=RuntimeError('stop execution'))), \
patch('salt.utils.process.SignalHandlingMultiprocessingProcess.start', MagicMock(return_value=True)), \
patch('salt.utils.process.SignalHandlingMultiprocessingProcess.join', MagicMock(return_value=True)):
mock_opts = copy.copy(salt.config.DEFAULT_MINION_OPTS)
mock_opts['scheduler_before_connect'] = True
try:
minion = salt.minion.Minion(mock_opts, io_loop=tornado.ioloop.IOLoop())
try:
minion.tune_in(start=True)
except RuntimeError:
pass
# Make sure the scheduler is initialized but the beacons are not
self.assertTrue('schedule' in minion.periodic_callbacks)
self.assertTrue('beacons' not in minion.periodic_callbacks)
finally:
minion.destroy()