Mirror of https://github.com/saltstack/salt.git (synced 2025-04-17 10:10:20 +00:00)

Merge pull request #54259 from dwoz/less_forking

Call os.fork less to avoid race conditions

Commit 583885149c: 7 changed files with 180 additions and 137 deletions

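The shape of the change, for orientation: previously, every job process spawned by the minion or the scheduler was immediately daemonized (an extra os.fork()) and join()ed, and each fork is an opportunity for locks, loggers, and transports to be cloned in an inconsistent state. Now jobs are started once, tagged with a descriptive name, and recorded in a shared SubprocessList that periodic cleanup callbacks reap later. Condensed from the hunks below (plumbing elided):

    # old: fork the freshly spawned job process again, then block on it
    #     salt.utils.process.daemonize_if(opts)
    #     proc.join()
    # new: start once, tag, track; a periodic callback reaps it later
    proc.start()
    proc.name = '{}-Job-{}'.format(proc.name, data['jid'])
    self.subprocess_list.add(proc)
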
@@ -331,6 +331,7 @@ class Maintenance(salt.utils.process.SignalHandlingMultiprocessingProcess):
                 self.loop_interval = self.schedule.loop_interval
         except Exception as exc:
             log.error('Exception %s occurred in scheduled job', exc)
+        self.schedule.cleanup_subprocesses()
 
     def handle_presence(self, old_present):
         '''
@@ -69,7 +69,6 @@ log = logging.getLogger(__name__)
 
 
 def post_master_init(self, master):
-
     log.debug("subclassed LazyLoaded _post_master_init")
     if self.connected:
         self.opts['master'] = master
@@ -336,15 +335,6 @@ def thread_return(cls, minion_instance, opts, data):
     '''
     fn_ = os.path.join(minion_instance.proc_dir, data['jid'])
 
-    if opts['multiprocessing'] and not salt.utils.platform.is_windows():
-        # Shutdown the multiprocessing before daemonizing
-        salt.log.setup.shutdown_multiprocessing_logging()
-
-        salt.utils.process.daemonize_if(opts)
-
-        # Reconfigure multiprocessing logging after daemonizing
-        salt.log.setup.setup_multiprocessing_logging()
-
     salt.utils.process.appendproctitle('{0}._thread_return {1}'.format(cls.__name__, data['jid']))
 
     sdata = {'pid': os.getpid()}
@@ -559,15 +549,6 @@ def thread_multi_return(cls, minion_instance, opts, data):
     '''
     fn_ = os.path.join(minion_instance.proc_dir, data['jid'])
 
-    if opts['multiprocessing'] and not salt.utils.platform.is_windows():
-        # Shutdown the multiprocessing before daemonizing
-        salt.log.setup.shutdown_multiprocessing_logging()
-
-        salt.utils.process.daemonize_if(opts)
-
-        # Reconfigure multiprocessing logging after daemonizing
-        salt.log.setup.setup_multiprocessing_logging()
-
    salt.utils.process.appendproctitle('{0}._thread_multi_return {1}'.format(cls.__name__, data['jid']))
 
    sdata = {'pid': os.getpid()}
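The block deleted from thread_return() and thread_multi_return() (here, and again in salt/minion.py below) is where the extra fork happened: daemonize_if() re-forks the already-spawned job process to detach it, and the shutdown/setup pair around it existed only so multiprocessing logging could survive that fork. For readers unfamiliar with the idiom, daemonization is built on double-forking; a generic sketch (standard Unix pattern, not Salt's exact implementation):

    import os
    import sys

    def daemonize_sketch():
        if os.fork() > 0:
            sys.exit(0)  # first parent exits; child is orphaned to init
        os.setsid()      # new session, detach from the controlling terminal
        if os.fork() > 0:
            sys.exit(0)  # second fork: the daemon can never reacquire a terminal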
|
@@ -761,13 +742,8 @@ def handle_decoded_payload(self, data):
             process.start()
     else:
         process.start()
-
-    # TODO: remove the windows specific check?
-    if multiprocessing_enabled and not salt.utils.platform.is_windows():
-        # we only want to join() immediately if we are daemonizing a process
-        process.join()
-    else:
-        self.win_proc.append(process)
+    process.name = '{}-Job-{}'.format(process.name, data['jid'])
+    self.subprocess_list.add(process)
 
 
 def target_load(self, load):

salt/minion.py (171 changes)

@@ -423,6 +423,41 @@ class MinionBase(object):
         self.opts = opts
         self.beacons_leader = opts.get('beacons_leader', True)
+
+    def gen_modules(self, initial_load=False):
+        '''
+        Tell the minion to reload the execution modules
+
+        CLI Example:
+
+        .. code-block:: bash
+
+            salt '*' sys.reload_modules
+        '''
+        self.opts['pillar'] = salt.pillar.get_pillar(
+            self.opts,
+            self.opts['grains'],
+            self.opts['id'],
+            self.opts['saltenv'],
+            pillarenv=self.opts.get('pillarenv'),
+        ).compile_pillar()
+
+        self.utils = salt.loader.utils(self.opts)
+        self.functions = salt.loader.minion_mods(self.opts, utils=self.utils)
+        self.serializers = salt.loader.serializers(self.opts)
+        self.returners = salt.loader.returners(self.opts, self.functions)
+        self.proxy = salt.loader.proxy(self.opts, self.functions, self.returners, None)
+        # TODO: remove
+        self.function_errors = {}  # Keep the funcs clean
+        self.states = salt.loader.states(self.opts,
+                self.functions,
+                self.utils,
+                self.serializers)
+        self.rend = salt.loader.render(self.opts, self.functions)
+        # self.matcher = Matcher(self.opts, self.functions)
+        self.matchers = salt.loader.matchers(self.opts)
+        self.functions['sys.reload_modules'] = self.gen_modules
+        self.executors = salt.loader.executors(self.opts)
 
     @staticmethod
     def process_schedule(minion, loop_interval):
         try:
@@ -829,41 +864,6 @@ class SMinion(MinionBase):
             salt.utils.yaml.safe_dump(self.opts['pillar'], fp_)
             os.chmod(cache_sls, 0o600)
-
-    def gen_modules(self, initial_load=False):
-        '''
-        Tell the minion to reload the execution modules
-
-        CLI Example:
-
-        .. code-block:: bash
-
-            salt '*' sys.reload_modules
-        '''
-        self.opts['pillar'] = salt.pillar.get_pillar(
-            self.opts,
-            self.opts['grains'],
-            self.opts['id'],
-            self.opts['saltenv'],
-            pillarenv=self.opts.get('pillarenv'),
-        ).compile_pillar()
-
-        self.utils = salt.loader.utils(self.opts)
-        self.functions = salt.loader.minion_mods(self.opts, utils=self.utils)
-        self.serializers = salt.loader.serializers(self.opts)
-        self.returners = salt.loader.returners(self.opts, self.functions)
-        self.proxy = salt.loader.proxy(self.opts, self.functions, self.returners, None)
-        # TODO: remove
-        self.function_errors = {}  # Keep the funcs clean
-        self.states = salt.loader.states(self.opts,
-                self.functions,
-                self.utils,
-                self.serializers)
-        self.rend = salt.loader.render(self.opts, self.functions)
-        # self.matcher = Matcher(self.opts, self.functions)
-        self.matchers = salt.loader.matchers(self.opts)
-        self.functions['sys.reload_modules'] = self.gen_modules
-        self.executors = salt.loader.executors(self.opts)
 
 
 class MasterMinion(object):
     '''
@@ -1102,6 +1102,7 @@ class Minion(MinionBase):
 
         self._running = None
         self.win_proc = []
+        self.subprocess_list = salt.utils.process.SubprocessList()
         self.loaded_base_name = loaded_base_name
         self.connected = False
         self.restart = False
@@ -1539,13 +1540,8 @@ class Minion(MinionBase):
                 process.start()
         else:
             process.start()
-
-        # TODO: remove the windows specific check?
-        if multiprocessing_enabled and not salt.utils.platform.is_windows():
-            # we only want to join() immediately if we are daemonizing a process
-            process.join()
-        else:
-            self.win_proc.append(process)
+        process.name = '{}-Job-{}'.format(process.name, data['jid'])
+        self.subprocess_list.add(process)
 
     def ctx(self):
         '''
@@ -1602,17 +1598,9 @@ class Minion(MinionBase):
         This method should be used as a threading target, start the actual
         minion side execution.
         '''
+        minion_instance.gen_modules()
         fn_ = os.path.join(minion_instance.proc_dir, data['jid'])
 
-        if opts['multiprocessing'] and not salt.utils.platform.is_windows():
-            # Shutdown the multiprocessing before daemonizing
-            salt.log.setup.shutdown_multiprocessing_logging()
-
-            salt.utils.process.daemonize_if(opts)
-
-            # Reconfigure multiprocessing logging after daemonizing
-            salt.log.setup.setup_multiprocessing_logging()
-
         salt.utils.process.appendproctitle('{0}._thread_return {1}'.format(cls.__name__, data['jid']))
 
         sdata = {'pid': os.getpid()}
@@ -1825,17 +1813,9 @@ class Minion(MinionBase):
         This method should be used as a threading target, start the actual
         minion side execution.
         '''
+        minion_instance.gen_modules()
         fn_ = os.path.join(minion_instance.proc_dir, data['jid'])
 
-        if opts['multiprocessing'] and not salt.utils.platform.is_windows():
-            # Shutdown the multiprocessing before daemonizing
-            salt.log.setup.shutdown_multiprocessing_logging()
-
-            salt.utils.process.daemonize_if(opts)
-
-            # Reconfigure multiprocessing logging after daemonizing
-            salt.log.setup.setup_multiprocessing_logging()
-
         salt.utils.process.appendproctitle('{0}._thread_multi_return {1}'.format(cls.__name__, data['jid']))
 
         sdata = {'pid': os.getpid()}
@@ -2561,24 +2541,15 @@ class Minion(MinionBase):
             log.debug('Firing beacons to master')
             self._fire_master(events=data['beacons'])
 
-    def _fallback_cleanups(self):
+    def cleanup_subprocesses(self):
         '''
-        Fallback cleanup routines, attempting to fix leaked processes, threads, etc.
+        Clean up subprocesses and spawned threads.
         '''
         # Add an extra fallback in case a forked process leaks through
         multiprocessing.active_children()
-
-        # Cleanup Windows threads
-        if not salt.utils.platform.is_windows():
-            return
-        for thread in self.win_proc:
-            if not thread.is_alive():
-                thread.join()
-                try:
-                    self.win_proc.remove(thread)
-                    del thread
-                except (ValueError, NameError):
-                    pass
+        self.subprocess_list.cleanup()
+        if self.schedule:
+            self.schedule.cleanup_subprocesses()
 
     def _setup_core(self):
         '''
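A subtlety in the new cleanup_subprocesses(): the multiprocessing.active_children() call is kept purely for its side effect. Per the Python documentation, calling it joins any child process that has already finished, so even a process that leaked past the tracked list cannot linger as a zombie. A standalone illustration (not Salt code):

    import multiprocessing
    import time

    def job():
        pass

    if __name__ == '__main__':
        multiprocessing.Process(target=job).start()
        time.sleep(1)                      # give the child time to exit
        multiprocessing.active_children()  # side effect: reaps finished children
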
@@ -2609,10 +2580,7 @@ class Minion(MinionBase):
             return
 
         self._setup_core()
-
         loop_interval = self.opts['loop_interval']
-        new_periodic_callbacks = {}
-
         if 'beacons' not in self.periodic_callbacks:
             self.beacons = salt.beacons.Beacon(self.opts, self.functions)
 
@@ -2630,21 +2598,11 @@ class Minion(MinionBase):
                     event.fire_event({'beacons': beacons}, '__beacons_return')
                     event.destroy()
 
-            new_periodic_callbacks['beacons'] = tornado.ioloop.PeriodicCallback(
-                handle_beacons, loop_interval * 1000)
             if before_connect:
                 # Make sure there is a chance for one iteration to occur before connect
                 handle_beacons()
-
-        if 'cleanup' not in self.periodic_callbacks:
-            new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(
-                self._fallback_cleanups, loop_interval * 1000)
-
-        # start all the other callbacks
-        for periodic_cb in six.itervalues(new_periodic_callbacks):
-            periodic_cb.start()
 
-        self.periodic_callbacks.update(new_periodic_callbacks)
+        self.add_periodic_callback('beacons', handle_beacons)
 
     def setup_scheduler(self, before_connect=False):
         '''
@@ -2654,7 +2612,6 @@ class Minion(MinionBase):
         self._setup_core()
 
         loop_interval = self.opts['loop_interval']
-        new_periodic_callbacks = {}
 
         if 'schedule' not in self.periodic_callbacks:
             if 'schedule' not in self.opts:
@@ -2683,21 +2640,36 @@ class Minion(MinionBase):
             # TODO: actually listen to the return and change period
             def handle_schedule():
                 self.process_schedule(self, loop_interval)
-            new_periodic_callbacks['schedule'] = tornado.ioloop.PeriodicCallback(handle_schedule, 1000)
 
             if before_connect:
                 # Make sure there is a chance for one iteration to occur before connect
                 handle_schedule()
 
-        if 'cleanup' not in self.periodic_callbacks:
-            new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(
-                self._fallback_cleanups, loop_interval * 1000)
+        self.add_periodic_callback('schedule', handle_schedule)
 
-        # start all the other callbacks
-        for periodic_cb in six.itervalues(new_periodic_callbacks):
-            periodic_cb.start()
+    def add_periodic_callback(self, name, method, interval=1000):
+        '''
+        Add a periodic callback to the event loop and call its start method.
+        If a callback with the given name already exists, this method returns False.
+        '''
+        if name in self.periodic_callbacks:
+            return False
+        self.periodic_callbacks[name] = tornado.ioloop.PeriodicCallback(
+            method, interval * 1000,
+        )
+        self.periodic_callbacks[name].start()
+        return True
 
-        self.periodic_callbacks.update(new_periodic_callbacks)
+    def remove_periodic_callback(self, name):
+        '''
+        Remove a periodic callback.
+        If a callback with the given name does not exist, this method returns False.
+        '''
+        callback = self.periodic_callbacks.pop(name, None)
+        if callback is None:
+            return False
+        callback.stop()
+        return True
 
     # Main Minion Tune In
     def tune_in(self, start=True):
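add_periodic_callback() and remove_periodic_callback() centralize what setup_beacons() and setup_scheduler() previously did by hand with new_periodic_callbacks dicts. Registration is idempotent by name, and interval is taken in seconds (the helper multiplies by 1000 for tornado's millisecond API), so the default of 1000 amounts to roughly 17 minutes between runs. A standalone sketch of the same pattern, assuming tornado is installed:

    import tornado.ioloop

    callbacks = {}

    def add_periodic_callback(name, method, interval=1000):
        # Idempotent by name; interval is in seconds, tornado wants ms.
        if name in callbacks:
            return False
        callbacks[name] = tornado.ioloop.PeriodicCallback(method, interval * 1000)
        callbacks[name].start()
        return True

    def remove_periodic_callback(name):
        callback = callbacks.pop(name, None)
        if callback is None:
            return False
        callback.stop()
        return True

    add_periodic_callback('tick', lambda: print('tick'), interval=5)  # every 5s
    tornado.ioloop.IOLoop.current().start()
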
@@ -2731,6 +2703,7 @@ class Minion(MinionBase):
 
         self.setup_beacons()
         self.setup_scheduler()
+        self.add_periodic_callback('cleanup', self.cleanup_subprocesses)
 
         # schedule the stuff that runs every interval
         ping_interval = self.opts.get('ping_interval', 0) * 60
@@ -2757,8 +2730,8 @@ class Minion(MinionBase):
                     self._fire_master('ping', 'minion_ping', sync=False, timeout_handler=ping_timeout_handler)
                 except Exception:
                     log.warning('Attempt to ping master failed.', exc_on_loglevel=logging.DEBUG)
-            self.periodic_callbacks['ping'] = tornado.ioloop.PeriodicCallback(ping_master, ping_interval * 1000)
-            self.periodic_callbacks['ping'].start()
+            self.remove_periodic_callback('ping')
+            self.add_periodic_callback('ping', ping_master, ping_interval)
 
         # add handler to subscriber
         if hasattr(self, 'pub_channel') and self.pub_channel is not None:
@@ -112,6 +112,11 @@ def daemonize(redirect_out=True):
 
 
 def dup2(file1, file2):
+    '''
+    Duplicate file descriptor fd to fd2, closing the latter first if necessary.
+    This method is similar to os.dup2 but ignores streams that do not have a
+    supported fileno method.
+    '''
     if isinstance(file1, int):
         fno1 = file1
     else:
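The hunk above shows only dup2()'s new docstring and its first branch. For context, a plausible completion of the helper, inferred from the docstring and from the TestDup2 assertions at the bottom of this diff (io.UnsupportedOperation is swallowed, and os.dup2 is then not called); treat this as a sketch, not the verbatim source:

    def dup2(file1, file2):
        if isinstance(file1, int):
            fno1 = file1
        else:
            try:
                fno1 = file1.fileno()
            except io.UnsupportedOperation:
                log.warning('Unsupported operation on file: %r', file1)
                return
        if isinstance(file2, int):
            fno2 = file2
        else:
            try:
                fno2 = file2.fileno()
            except io.UnsupportedOperation:
                log.warning('Unsupported operation on file: %r', file2)
                return
        os.dup2(fno1, fno2)
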
@@ -835,3 +840,30 @@ def default_signals(*signals):
             signal.signal(signum, old_signals[signum])
 
     del old_signals
+
+
+class SubprocessList(object):
+
+    def __init__(self, processes=None, lock=None):
+        if processes is None:
+            self.processes = []
+        else:
+            self.processes = processes
+        if lock is None:
+            self.lock = multiprocessing.Lock()
+        else:
+            self.lock = lock
+
+    def add(self, proc):
+        with self.lock:
+            self.processes.append(proc)
+            log.debug('Subprocess %s added', proc.name)
+
+    def cleanup(self):
+        with self.lock:
+            for proc in self.processes:
+                if proc.is_alive():
+                    continue
+                proc.join()
+                self.processes.remove(proc)
+                log.debug('Subprocess %s cleaned up', proc.name)
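SubprocessList relies only on is_alive(), join(), and name, so it can track threading.Thread objects as well as multiprocessing.Process objects (the new unit tests at the end of this diff exercise both). Note that cleanup() removes entries from the list it is iterating over, so an entry finishing mid-scan may only be reaped on a later pass, which is harmless for a recurring callback. A minimal usage sketch:

    import multiprocessing

    import salt.utils.process

    def job():
        pass

    if __name__ == '__main__':
        plist = salt.utils.process.SubprocessList()
        proc = multiprocessing.Process(target=job)
        proc.start()
        plist.add(proc)  # track it instead of join()ing inline
        proc.join()      # demo only; real callers stay non-blocking
        plist.cleanup()  # join()s and drops anything that has exited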
|
@@ -44,7 +44,6 @@ import salt.minion
 import salt.payload
 import salt.syspaths
 import salt.exceptions
-import salt.log.setup as log_setup
 import salt.defaults.exitcodes
 from salt.utils.odict import OrderedDict
 
@@ -130,7 +129,8 @@ class Schedule(object):
                  cleanup=None,
                  proxy=None,
                  standalone=False,
-                 utils=None):
+                 utils=None,
+                 _subprocess_list=None):
         self.opts = opts
         self.proxy = proxy
         self.functions = functions
@@ -158,6 +158,10 @@ class Schedule(object):
         if cleanup:
             for prefix in cleanup:
                 self.delete_job_prefix(prefix)
+        if _subprocess_list is None:
+            self._subprocess_list = salt.utils.process.SubprocessList()
+        else:
+            self._subprocess_list = _subprocess_list
 
     def __getnewargs__(self):
         return self.opts, self.functions, self.returners, self.intervals, None
@@ -655,14 +659,6 @@ class Schedule(object):
                 ret['jid']
             )
 
-        if multiprocessing_enabled and not salt.utils.platform.is_windows():
-            # Reconfigure multiprocessing logging after daemonizing
-            log_setup.setup_multiprocessing_logging()
-
-        if multiprocessing_enabled:
-            # Daemonize *before* entering the try block, or the finally clause runs three times.
-            salt.utils.process.daemonize_if(self.opts)
-
         # TODO: Make it readable! Split to funcs, remove nested try-except-finally sections.
         try:
 
@@ -1710,10 +1706,13 @@ class Schedule(object):
                     # Reset current signals before starting the process in
                     # order not to inherit the current signal handlers
                     proc.start()
-                proc.join()
+                proc.name = '{}-Schedule-{}'.format(proc.name, data['name'])
+                self._subprocess_list.add(proc)
             else:
                 proc = thread_cls(target=self.handle_func, args=(multiprocessing_enabled, func, data))
                 proc.start()
+                proc.name = '{}-Schedule-{}'.format(proc.name, data['name'])
+                self._subprocess_list.add(proc)
         finally:
             if multiprocessing_enabled and salt.utils.platform.is_windows():
                 # Restore our function references.
@@ -1721,6 +1720,9 @@ class Schedule(object):
                 self.returners = returners
                 self.utils = utils
 
+    def cleanup_subprocesses(self):
+        self._subprocess_list.cleanup()
+
 
 def clean_proc_dir(opts):
 
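Taken together, the last three hunks are the scheduler half of the change: handle_func() no longer daemonizes, the multiprocessing branch no longer blocks on proc.join() right after start, and both branches tag the process and hand it to the shared _subprocess_list. The master's Maintenance loop (first hunk of this diff) then calls Schedule.cleanup_subprocesses() on every pass to reap finished jobs. Condensed:

    # Condensed view of the new scheduler flow (from the hunks above):
    proc = thread_cls(target=self.handle_func,
                      args=(multiprocessing_enabled, func, data))
    proc.start()  # no join(), no second fork
    proc.name = '{}-Schedule-{}'.format(proc.name, data['name'])
    self._subprocess_list.add(proc)
    # ...later, driven by the Maintenance loop:
    self._subprocess_list.cleanup()  # reap whatever has finished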
|
@@ -174,7 +174,7 @@ class CallTest(ShellCase, testprogram.TestProgramCase, ShellCaseCommonTestsMixin):
             '--config-dir {0} cmd.run "echo foo"'.format(
                 config_dir
             ),
-            timeout=60,
+            timeout=120,
             catch_stderr=True,
             with_retcode=True
         )
@@ -5,6 +5,7 @@ from __future__ import absolute_import, print_function, unicode_literals
 import io
 import os
 import sys
+import threading
 import time
 import signal
 import multiprocessing
@@ -416,3 +417,61 @@ class TestDup2(TestCase):
         except io.UnsupportedOperation:
             assert False, 'io.UnsupportedOperation was raised'
         assert not dup_mock.called
+
+
+class TestProcessList(TestCase):
+
+    @staticmethod
+    def null_target():
+        pass
+
+    @staticmethod
+    def event_target(event):
+        while True:
+            if event.wait(5):
+                break
+
+    @staticmethod
+    def wait_for_proc(proc, timeout=10):
+        start = time.time()
+        while proc.is_alive():
+            if time.time() - start > timeout:
+                raise Exception("Process did not finish before timeout")
+            time.sleep(.3)
+
+    def test_process_list_process(self):
+        plist = salt.utils.process.SubprocessList()
+        proc = multiprocessing.Process(target=self.null_target)
+        proc.start()
+        plist.add(proc)
+        assert proc in plist.processes
+        self.wait_for_proc(proc)
+        assert not proc.is_alive()
+        plist.cleanup()
+        assert proc not in plist.processes
+
+    def test_process_list_thread(self):
+        plist = salt.utils.process.SubprocessList()
+        thread = threading.Thread(target=self.null_target)
+        thread.start()
+        plist.add(thread)
+        assert thread in plist.processes
+        self.wait_for_proc(thread)
+        assert not thread.is_alive()
+        plist.cleanup()
+        assert thread not in plist.processes
+
+    def test_process_list_cleanup(self):
+        plist = salt.utils.process.SubprocessList()
+        event = multiprocessing.Event()
+        proc = multiprocessing.Process(target=self.event_target, args=[event])
+        proc.start()
+        plist.add(proc)
+        assert proc in plist.processes
+        plist.cleanup()
+        event.set()
+        assert proc in plist.processes
+        self.wait_for_proc(proc)
+        assert not proc.is_alive()
+        plist.cleanup()
+        assert proc not in plist.processes