Merge pull request #54259 from dwoz/less_forking

Call os.fork less to avoid race conditions
This commit is contained in:
Daniel Wozniak 2019-08-23 19:12:41 -07:00 committed by GitHub
commit 583885149c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 180 additions and 137 deletions

View file

@ -331,6 +331,7 @@ class Maintenance(salt.utils.process.SignalHandlingMultiprocessingProcess):
self.loop_interval = self.schedule.loop_interval
except Exception as exc:
log.error('Exception %s occurred in scheduled job', exc)
self.schedule.cleanup_subprocesses()
def handle_presence(self, old_present):
'''

View file

@ -69,7 +69,6 @@ log = logging.getLogger(__name__)
def post_master_init(self, master):
log.debug("subclassed LazyLoaded _post_master_init")
if self.connected:
self.opts['master'] = master
@ -336,15 +335,6 @@ def thread_return(cls, minion_instance, opts, data):
'''
fn_ = os.path.join(minion_instance.proc_dir, data['jid'])
if opts['multiprocessing'] and not salt.utils.platform.is_windows():
# Shutdown the multiprocessing before daemonizing
salt.log.setup.shutdown_multiprocessing_logging()
salt.utils.process.daemonize_if(opts)
# Reconfigure multiprocessing logging after daemonizing
salt.log.setup.setup_multiprocessing_logging()
salt.utils.process.appendproctitle('{0}._thread_return {1}'.format(cls.__name__, data['jid']))
sdata = {'pid': os.getpid()}
@ -559,15 +549,6 @@ def thread_multi_return(cls, minion_instance, opts, data):
'''
fn_ = os.path.join(minion_instance.proc_dir, data['jid'])
if opts['multiprocessing'] and not salt.utils.platform.is_windows():
# Shutdown the multiprocessing before daemonizing
salt.log.setup.shutdown_multiprocessing_logging()
salt.utils.process.daemonize_if(opts)
# Reconfigure multiprocessing logging after daemonizing
salt.log.setup.setup_multiprocessing_logging()
salt.utils.process.appendproctitle('{0}._thread_multi_return {1}'.format(cls.__name__, data['jid']))
sdata = {'pid': os.getpid()}
@ -761,13 +742,8 @@ def handle_decoded_payload(self, data):
process.start()
else:
process.start()
# TODO: remove the windows specific check?
if multiprocessing_enabled and not salt.utils.platform.is_windows():
# we only want to join() immediately if we are daemonizing a process
process.join()
else:
self.win_proc.append(process)
process.name = '{}-Job-{}'.format(process.name, data['jid'])
self.subprocess_list.add(process)
def target_load(self, load):

View file

@ -423,6 +423,41 @@ class MinionBase(object):
self.opts = opts
self.beacons_leader = opts.get('beacons_leader', True)
def gen_modules(self, initial_load=False):
'''
Tell the minion to reload the execution modules
CLI Example:
.. code-block:: bash
salt '*' sys.reload_modules
'''
self.opts['pillar'] = salt.pillar.get_pillar(
self.opts,
self.opts['grains'],
self.opts['id'],
self.opts['saltenv'],
pillarenv=self.opts.get('pillarenv'),
).compile_pillar()
self.utils = salt.loader.utils(self.opts)
self.functions = salt.loader.minion_mods(self.opts, utils=self.utils)
self.serializers = salt.loader.serializers(self.opts)
self.returners = salt.loader.returners(self.opts, self.functions)
self.proxy = salt.loader.proxy(self.opts, self.functions, self.returners, None)
# TODO: remove
self.function_errors = {} # Keep the funcs clean
self.states = salt.loader.states(self.opts,
self.functions,
self.utils,
self.serializers)
self.rend = salt.loader.render(self.opts, self.functions)
# self.matcher = Matcher(self.opts, self.functions)
self.matchers = salt.loader.matchers(self.opts)
self.functions['sys.reload_modules'] = self.gen_modules
self.executors = salt.loader.executors(self.opts)
@staticmethod
def process_schedule(minion, loop_interval):
try:
@ -829,41 +864,6 @@ class SMinion(MinionBase):
salt.utils.yaml.safe_dump(self.opts['pillar'], fp_)
os.chmod(cache_sls, 0o600)
def gen_modules(self, initial_load=False):
'''
Tell the minion to reload the execution modules
CLI Example:
.. code-block:: bash
salt '*' sys.reload_modules
'''
self.opts['pillar'] = salt.pillar.get_pillar(
self.opts,
self.opts['grains'],
self.opts['id'],
self.opts['saltenv'],
pillarenv=self.opts.get('pillarenv'),
).compile_pillar()
self.utils = salt.loader.utils(self.opts)
self.functions = salt.loader.minion_mods(self.opts, utils=self.utils)
self.serializers = salt.loader.serializers(self.opts)
self.returners = salt.loader.returners(self.opts, self.functions)
self.proxy = salt.loader.proxy(self.opts, self.functions, self.returners, None)
# TODO: remove
self.function_errors = {} # Keep the funcs clean
self.states = salt.loader.states(self.opts,
self.functions,
self.utils,
self.serializers)
self.rend = salt.loader.render(self.opts, self.functions)
# self.matcher = Matcher(self.opts, self.functions)
self.matchers = salt.loader.matchers(self.opts)
self.functions['sys.reload_modules'] = self.gen_modules
self.executors = salt.loader.executors(self.opts)
class MasterMinion(object):
'''
@ -1102,6 +1102,7 @@ class Minion(MinionBase):
self._running = None
self.win_proc = []
self.subprocess_list = salt.utils.process.SubprocessList()
self.loaded_base_name = loaded_base_name
self.connected = False
self.restart = False
@ -1539,13 +1540,8 @@ class Minion(MinionBase):
process.start()
else:
process.start()
# TODO: remove the windows specific check?
if multiprocessing_enabled and not salt.utils.platform.is_windows():
# we only want to join() immediately if we are daemonizing a process
process.join()
else:
self.win_proc.append(process)
process.name = '{}-Job-{}'.format(process.name, data['jid'])
self.subprocess_list.add(process)
def ctx(self):
'''
@ -1602,17 +1598,9 @@ class Minion(MinionBase):
This method should be used as a threading target, start the actual
minion side execution.
'''
minion_instance.gen_modules()
fn_ = os.path.join(minion_instance.proc_dir, data['jid'])
if opts['multiprocessing'] and not salt.utils.platform.is_windows():
# Shutdown the multiprocessing before daemonizing
salt.log.setup.shutdown_multiprocessing_logging()
salt.utils.process.daemonize_if(opts)
# Reconfigure multiprocessing logging after daemonizing
salt.log.setup.setup_multiprocessing_logging()
salt.utils.process.appendproctitle('{0}._thread_return {1}'.format(cls.__name__, data['jid']))
sdata = {'pid': os.getpid()}
@ -1825,17 +1813,9 @@ class Minion(MinionBase):
This method should be used as a threading target, start the actual
minion side execution.
'''
minion_instance.gen_modules()
fn_ = os.path.join(minion_instance.proc_dir, data['jid'])
if opts['multiprocessing'] and not salt.utils.platform.is_windows():
# Shutdown the multiprocessing before daemonizing
salt.log.setup.shutdown_multiprocessing_logging()
salt.utils.process.daemonize_if(opts)
# Reconfigure multiprocessing logging after daemonizing
salt.log.setup.setup_multiprocessing_logging()
salt.utils.process.appendproctitle('{0}._thread_multi_return {1}'.format(cls.__name__, data['jid']))
sdata = {'pid': os.getpid()}
@ -2561,24 +2541,15 @@ class Minion(MinionBase):
log.debug('Firing beacons to master')
self._fire_master(events=data['beacons'])
def _fallback_cleanups(self):
def cleanup_subprocesses(self):
'''
Fallback cleanup routines, attempting to fix leaked processes, threads, etc.
Clean up subprocesses and spawned threads.
'''
# Add an extra fallback in case a forked process leaks through
multiprocessing.active_children()
# Cleanup Windows threads
if not salt.utils.platform.is_windows():
return
for thread in self.win_proc:
if not thread.is_alive():
thread.join()
try:
self.win_proc.remove(thread)
del thread
except (ValueError, NameError):
pass
self.subprocess_list.cleanup()
if self.schedule:
self.schedule.cleanup_subprocesses()
def _setup_core(self):
'''
@ -2609,10 +2580,7 @@ class Minion(MinionBase):
return
self._setup_core()
loop_interval = self.opts['loop_interval']
new_periodic_callbacks = {}
if 'beacons' not in self.periodic_callbacks:
self.beacons = salt.beacons.Beacon(self.opts, self.functions)
@ -2630,21 +2598,11 @@ class Minion(MinionBase):
event.fire_event({'beacons': beacons}, '__beacons_return')
event.destroy()
new_periodic_callbacks['beacons'] = tornado.ioloop.PeriodicCallback(
handle_beacons, loop_interval * 1000)
if before_connect:
# Make sure there is a chance for one iteration to occur before connect
handle_beacons()
if 'cleanup' not in self.periodic_callbacks:
new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(
self._fallback_cleanups, loop_interval * 1000)
# start all the other callbacks
for periodic_cb in six.itervalues(new_periodic_callbacks):
periodic_cb.start()
self.periodic_callbacks.update(new_periodic_callbacks)
self.add_periodic_callback('beacons', handle_beacons)
def setup_scheduler(self, before_connect=False):
'''
@ -2654,7 +2612,6 @@ class Minion(MinionBase):
self._setup_core()
loop_interval = self.opts['loop_interval']
new_periodic_callbacks = {}
if 'schedule' not in self.periodic_callbacks:
if 'schedule' not in self.opts:
@ -2683,21 +2640,36 @@ class Minion(MinionBase):
# TODO: actually listen to the return and change period
def handle_schedule():
self.process_schedule(self, loop_interval)
new_periodic_callbacks['schedule'] = tornado.ioloop.PeriodicCallback(handle_schedule, 1000)
if before_connect:
# Make sure there is a chance for one iteration to occur before connect
handle_schedule()
if 'cleanup' not in self.periodic_callbacks:
new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(
self._fallback_cleanups, loop_interval * 1000)
self.add_periodic_callback('schedule', handle_schedule)
# start all the other callbacks
for periodic_cb in six.itervalues(new_periodic_callbacks):
periodic_cb.start()
def add_periodic_callback(self, name, method, interval=1000):
'''
Add a periodic callback to the event loop and call it's start method.
If a callback by the given name exists this method returns False
'''
if name in self.periodic_callbacks:
return False
self.periodic_callbacks[name] = tornado.ioloop.PeriodicCallback(
method, interval * 1000,
)
self.periodic_callbacks[name].start()
return True
self.periodic_callbacks.update(new_periodic_callbacks)
def remove_periodic_callback(self, name):
'''
Remove a periodic callback.
If a callback by the given name does not exist this method returns False
'''
callback = self.periodic_callbacks.pop(name, None)
if callback is None:
return False
callback.stop()
return True
# Main Minion Tune In
def tune_in(self, start=True):
@ -2731,6 +2703,7 @@ class Minion(MinionBase):
self.setup_beacons()
self.setup_scheduler()
self.add_periodic_callback('cleanup', self.cleanup_subprocesses)
# schedule the stuff that runs every interval
ping_interval = self.opts.get('ping_interval', 0) * 60
@ -2757,8 +2730,8 @@ class Minion(MinionBase):
self._fire_master('ping', 'minion_ping', sync=False, timeout_handler=ping_timeout_handler)
except Exception:
log.warning('Attempt to ping master failed.', exc_on_loglevel=logging.DEBUG)
self.periodic_callbacks['ping'] = tornado.ioloop.PeriodicCallback(ping_master, ping_interval * 1000)
self.periodic_callbacks['ping'].start()
self.remove_periodic_callbback('ping', ping_master)
self.add_periodic_callback('ping', ping_master, ping_interval)
# add handler to subscriber
if hasattr(self, 'pub_channel') and self.pub_channel is not None:

View file

@ -112,6 +112,11 @@ def daemonize(redirect_out=True):
def dup2(file1, file2):
'''
Duplicate file descriptor fd to fd2, closing the latter first if necessary.
This method is similar to os.dup2 but ignores streams that do not have a
supported fileno method.
'''
if isinstance(file1, int):
fno1 = file1
else:
@ -835,3 +840,30 @@ def default_signals(*signals):
signal.signal(signum, old_signals[signum])
del old_signals
class SubprocessList(object):
def __init__(self, processes=None, lock=None):
if processes is None:
self.processes = []
else:
self.processes = processes
if lock is None:
self.lock = multiprocessing.Lock()
else:
self.lock = lock
def add(self, proc):
with self.lock:
self.processes.append(proc)
log.debug('Subprocess %s added', proc.name)
def cleanup(self):
with self.lock:
for proc in self.processes:
if proc.is_alive():
continue
proc.join()
self.processes.remove(proc)
log.debug('Subprocess %s cleaned up', proc.name)

View file

@ -44,7 +44,6 @@ import salt.minion
import salt.payload
import salt.syspaths
import salt.exceptions
import salt.log.setup as log_setup
import salt.defaults.exitcodes
from salt.utils.odict import OrderedDict
@ -130,7 +129,8 @@ class Schedule(object):
cleanup=None,
proxy=None,
standalone=False,
utils=None):
utils=None,
_subprocess_list=None):
self.opts = opts
self.proxy = proxy
self.functions = functions
@ -158,6 +158,10 @@ class Schedule(object):
if cleanup:
for prefix in cleanup:
self.delete_job_prefix(prefix)
if _subprocess_list is None:
self._subprocess_list = salt.utils.process.SubprocessList()
else:
self._subprocess_list = _subprocess_list
def __getnewargs__(self):
return self.opts, self.functions, self.returners, self.intervals, None
@ -655,14 +659,6 @@ class Schedule(object):
ret['jid']
)
if multiprocessing_enabled and not salt.utils.platform.is_windows():
# Reconfigure multiprocessing logging after daemonizing
log_setup.setup_multiprocessing_logging()
if multiprocessing_enabled:
# Don't *BEFORE* to go into try to don't let it triple execute the finally section.
salt.utils.process.daemonize_if(self.opts)
# TODO: Make it readable! Splt to funcs, remove nested try-except-finally sections.
try:
@ -1710,10 +1706,13 @@ class Schedule(object):
# Reset current signals before starting the process in
# order not to inherit the current signal handlers
proc.start()
proc.join()
proc.name = '{}-Schedule-{}'.format(proc.name, data['name'])
self._subprocess_list.add(proc)
else:
proc = thread_cls(target=self.handle_func, args=(multiprocessing_enabled, func, data))
proc.start()
proc.name = '{}-Schedule-{}'.format(proc.name, data['name'])
self._subprocess_list.add(proc)
finally:
if multiprocessing_enabled and salt.utils.platform.is_windows():
# Restore our function references.
@ -1721,6 +1720,9 @@ class Schedule(object):
self.returners = returners
self.utils = utils
def cleanup_subprocesses(self):
self._subprocess_list.cleanup()
def clean_proc_dir(opts):

View file

@ -174,7 +174,7 @@ class CallTest(ShellCase, testprogram.TestProgramCase, ShellCaseCommonTestsMixin
'--config-dir {0} cmd.run "echo foo"'.format(
config_dir
),
timeout=60,
timeout=120,
catch_stderr=True,
with_retcode=True
)

View file

@ -5,6 +5,7 @@ from __future__ import absolute_import, print_function, unicode_literals
import io
import os
import sys
import threading
import time
import signal
import multiprocessing
@ -416,3 +417,61 @@ class TestDup2(TestCase):
except io.UnsupportedOperation:
assert False, 'io.UnsupportedOperation was raised'
assert not dup_mock.called
class TestProcessList(TestCase):
@staticmethod
def null_target():
pass
@staticmethod
def event_target(event):
while True:
if event.wait(5):
break
@staticmethod
def wait_for_proc(proc, timeout=10):
start = time.time()
while proc.is_alive():
if time.time() - start > timeout:
raise Exception("Process did not finishe before timeout")
time.sleep(.3)
def test_process_list_process(self):
plist = salt.utils.process.SubprocessList()
proc = multiprocessing.Process(target=self.null_target)
proc.start()
plist.add(proc)
assert proc in plist.processes
self.wait_for_proc(proc)
assert not proc.is_alive()
plist.cleanup()
assert proc not in plist.processes
def test_process_list_thread(self):
plist = salt.utils.process.SubprocessList()
thread = threading.Thread(target=self.null_target)
thread.start()
plist.add(thread)
assert thread in plist.processes
self.wait_for_proc(thread)
assert not thread.is_alive()
plist.cleanup()
assert thread not in plist.processes
def test_process_list_cleanup(self):
plist = salt.utils.process.SubprocessList()
event = multiprocessing.Event()
proc = multiprocessing.Process(target=self.event_target, args=[event])
proc.start()
plist.add(proc)
assert proc in plist.processes
plist.cleanup()
event.set()
assert proc in plist.processes
self.wait_for_proc(proc)
assert not proc.is_alive()
plist.cleanup()
assert proc not in plist.processes