salt/tests/unit/utils/test_process.py
2020-02-26 00:57:58 +03:00

662 lines
23 KiB
Python

# -*- coding: utf-8 -*-
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import io
import os
import sys
import threading
import time
import signal
import multiprocessing
import functools
import datetime
import warnings
# Import Salt Testing libs
from tests.support.unit import TestCase, skipIf
from tests.support.mock import (
patch,
)
# Import salt libs
import salt.utils.platform
import salt.utils.process
from salt.utils.versions import warn_until_date
# Import 3rd-party libs
from salt.ext import six
from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
import psutil
def die(func):
'''
Add proc title
'''
@functools.wraps(func)
def wrapper(self):
# Strip off the "test_" from the function name
name = func.__name__[5:]
def _die():
salt.utils.process.appendproctitle('test_{0}'.format(name))
attrname = 'die_' + name
setattr(self, attrname, _die)
self.addCleanup(delattr, self, attrname)
return wrapper
def incr(func):
'''
Increment counter
'''
@functools.wraps(func)
def wrapper(self):
# Strip off the "test_" from the function name
name = func.__name__[5:]
def _incr(counter, num):
salt.utils.process.appendproctitle('test_{0}'.format(name))
for _ in range(0, num):
counter.value += 1
attrname = 'incr_' + name
setattr(self, attrname, _incr)
self.addCleanup(delattr, self, attrname)
return wrapper
def spin(func):
'''
Spin indefinitely
'''
@functools.wraps(func)
def wrapper(self):
# Strip off the "test_" from the function name
name = func.__name__[5:]
def _spin():
salt.utils.process.appendproctitle('test_{0}'.format(name))
while True:
time.sleep(1)
attrname = 'spin_' + name
setattr(self, attrname, _spin)
self.addCleanup(delattr, self, attrname)
return wrapper
class TestProcessManager(TestCase):
@spin
def test_basic(self):
'''
Make sure that the process is alive 2s later
'''
process_manager = salt.utils.process.ProcessManager()
process_manager.add_process(self.spin_basic)
initial_pid = next(six.iterkeys(process_manager._process_map))
time.sleep(2)
process_manager.check_children()
try:
assert initial_pid == next(six.iterkeys(process_manager._process_map))
finally:
process_manager.stop_restarting()
process_manager.kill_children()
time.sleep(0.5)
# Are there child processes still running?
if process_manager._process_map.keys():
process_manager.send_signal_to_processes(signal.SIGKILL)
process_manager.stop_restarting()
process_manager.kill_children()
@spin
def test_kill(self):
process_manager = salt.utils.process.ProcessManager()
process_manager.add_process(self.spin_kill)
initial_pid = next(six.iterkeys(process_manager._process_map))
# kill the child
if salt.utils.platform.is_windows():
os.kill(initial_pid, signal.SIGTERM)
else:
os.kill(initial_pid, signal.SIGKILL)
# give the OS time to give the signal...
time.sleep(0.1)
process_manager.check_children()
try:
assert initial_pid != next(six.iterkeys(process_manager._process_map))
finally:
process_manager.stop_restarting()
process_manager.kill_children()
time.sleep(0.5)
# Are there child processes still running?
if process_manager._process_map.keys():
process_manager.send_signal_to_processes(signal.SIGKILL)
process_manager.stop_restarting()
process_manager.kill_children()
@die
def test_restarting(self):
'''
Make sure that the process is alive 2s later
'''
process_manager = salt.utils.process.ProcessManager()
process_manager.add_process(self.die_restarting)
initial_pid = next(six.iterkeys(process_manager._process_map))
time.sleep(2)
process_manager.check_children()
try:
assert initial_pid != next(six.iterkeys(process_manager._process_map))
finally:
process_manager.stop_restarting()
process_manager.kill_children()
time.sleep(0.5)
# Are there child processes still running?
if process_manager._process_map.keys():
process_manager.send_signal_to_processes(signal.SIGKILL)
process_manager.stop_restarting()
process_manager.kill_children()
@skipIf(sys.version_info < (2, 7), 'Needs > Py 2.7 due to bug in stdlib')
@incr
def test_counter(self):
counter = multiprocessing.Value('i', 0)
process_manager = salt.utils.process.ProcessManager()
process_manager.add_process(self.incr_counter, args=(counter, 2))
time.sleep(1)
process_manager.check_children()
time.sleep(1)
# we should have had 2 processes go at it
try:
assert counter.value == 4
finally:
process_manager.stop_restarting()
process_manager.kill_children()
time.sleep(0.5)
# Are there child processes still running?
if process_manager._process_map.keys():
process_manager.send_signal_to_processes(signal.SIGKILL)
process_manager.stop_restarting()
process_manager.kill_children()
class TestThreadPool(TestCase):
def test_basic(self):
'''
Make sure the threadpool can do things
'''
def incr_counter(counter):
counter.value += 1
counter = multiprocessing.Value('i', 0)
pool = salt.utils.process.ThreadPool()
sent = pool.fire_async(incr_counter, args=(counter,))
self.assertTrue(sent)
time.sleep(1) # Sleep to let the threads do things
self.assertEqual(counter.value, 1)
self.assertEqual(pool._job_queue.qsize(), 0)
def test_full_queue(self):
'''
Make sure that a full threadpool acts as we expect
'''
def incr_counter(counter):
counter.value += 1
counter = multiprocessing.Value('i', 0)
# Create a pool with no workers and 1 queue size
pool = salt.utils.process.ThreadPool(0, 1)
# make sure we can put the one item in
sent = pool.fire_async(incr_counter, args=(counter,))
self.assertTrue(sent)
# make sure we can't put more in
sent = pool.fire_async(incr_counter, args=(counter,))
self.assertFalse(sent)
time.sleep(1) # Sleep to let the threads do things
# make sure no one updated the counter
self.assertEqual(counter.value, 0)
# make sure the queue is still full
self.assertEqual(pool._job_queue.qsize(), 1)
class TestProcess(TestCase):
def test_daemonize_if(self):
# pylint: disable=assignment-from-none
with patch('sys.argv', ['salt-call']):
ret = salt.utils.process.daemonize_if({})
self.assertEqual(None, ret)
ret = salt.utils.process.daemonize_if({'multiprocessing': False})
self.assertEqual(None, ret)
with patch('sys.platform', 'win'):
ret = salt.utils.process.daemonize_if({})
self.assertEqual(None, ret)
with patch('salt.utils.process.daemonize'), \
patch('sys.platform', 'linux2'):
salt.utils.process.daemonize_if({})
self.assertTrue(salt.utils.process.daemonize.called)
# pylint: enable=assignment-from-none
class TestProcessCallbacks(TestCase):
@staticmethod
def process_target(evt):
evt.set()
def test_callbacks(self):
'Validate Process call after fork and finalize methods'
teardown_to_mock = 'salt.log.setup.shutdown_multiprocessing_logging'
log_to_mock = 'salt.utils.process.Process._setup_process_logging'
with patch(teardown_to_mock) as ma, patch(log_to_mock) as mb:
evt = multiprocessing.Event()
proc = salt.utils.process.Process(target=self.process_target, args=(evt,))
proc.run()
assert evt.is_set()
mb.assert_called()
ma.assert_called()
def test_callbacks_called_when_run_overriden(self):
'Validate Process sub classes call after fork and finalize methods when run is overridden'
class MyProcess(salt.utils.process.Process):
def __init__(self):
super(MyProcess, self).__init__()
self.evt = multiprocessing.Event()
def run(self):
self.evt.set()
teardown_to_mock = 'salt.log.setup.shutdown_multiprocessing_logging'
log_to_mock = 'salt.utils.process.Process._setup_process_logging'
with patch(teardown_to_mock) as ma, patch(log_to_mock) as mb:
proc = MyProcess()
proc.run()
assert proc.evt.is_set()
ma.assert_called()
mb.assert_called()
class TestSignalHandlingProcess(TestCase):
@classmethod
def Process(cls, pid):
raise psutil.NoSuchProcess(pid)
@classmethod
def target(cls):
os.kill(os.getpid(), signal.SIGTERM)
@classmethod
def children(cls, *args, **kwargs):
raise psutil.NoSuchProcess(1)
def test_process_does_not_exist(self):
try:
with patch('psutil.Process', self.Process):
proc = salt.utils.process.SignalHandlingProcess(target=self.target)
proc.start()
except psutil.NoSuchProcess:
assert False, "psutil.NoSuchProcess raised"
def test_process_children_do_not_exist(self):
try:
with patch('psutil.Process.children', self.children):
proc = salt.utils.process.SignalHandlingProcess(target=self.target)
proc.start()
except psutil.NoSuchProcess:
assert False, "psutil.NoSuchProcess raised"
@staticmethod
def run_forever_sub_target(evt):
'Used by run_forever_target to create a sub-process'
while not evt.is_set():
time.sleep(1)
@staticmethod
def run_forever_target(sub_target, evt):
'A target that will run forever or until an event is set'
p = multiprocessing.Process(target=sub_target, args=(evt,))
p.start()
p.join()
@staticmethod
def kill_target_sub_proc():
pid = os.fork()
if pid == 0:
return
pid = os.fork()
if pid == 0:
return
time.sleep(.1)
try:
os.kill(os.getpid(), signal.SIGINT)
except KeyboardInterrupt:
pass
@skipIf(sys.platform.startswith('win'), 'No os.fork on Windows')
def test_signal_processing_regression_test(self):
evt = multiprocessing.Event()
sh_proc = salt.utils.process.SignalHandlingProcess(
target=self.run_forever_target,
args=(self.run_forever_sub_target, evt)
)
sh_proc.start()
proc = multiprocessing.Process(target=self.kill_target_sub_proc)
proc.start()
proc.join()
# When the bug exists, the kill_target_sub_proc signal will kill both
# processes. sh_proc will be alive if the bug is fixed
try:
assert sh_proc.is_alive()
finally:
evt.set()
sh_proc.join()
@staticmethod
def no_op_target():
pass
@staticmethod
def pid_setting_target(sub_target, val, evt):
val.value = os.getpid()
p = multiprocessing.Process(target=sub_target, args=(evt,))
p.start()
p.join()
@skipIf(sys.platform.startswith('win'), 'Required signals not supported on windows')
def test_signal_processing_handle_signals_called(self):
'Validate SignalHandlingProcess handles signals'
# Gloobal event to stop all processes we're creating
evt = multiprocessing.Event()
# Create a process to test signal handler
val = multiprocessing.Value('i', 0)
proc = salt.utils.process.SignalHandlingProcess(
target=self.pid_setting_target,
args=(self.run_forever_sub_target, val, evt),
)
proc.start()
# Create a second process that should not respond to SIGINT or SIGTERM
proc2 = multiprocessing.Process(
target=self.run_forever_target,
args=(self.run_forever_sub_target, evt),
)
proc2.start()
# Wait for the sub process to set its pid
while not val.value:
time.sleep(.3)
assert not proc.signal_handled()
# Send a signal that should get handled by the subprocess
os.kill(val.value, signal.SIGTERM)
# wait up to 10 seconds for signal handler:
start = time.time()
while time.time() - start < 10:
if proc.signal_handled():
break
time.sleep(.3)
try:
# Allow some time for the signal handler to do its thing
assert proc.signal_handled()
# Reap the signaled process
proc.join(1)
assert proc2.is_alive()
finally:
evt.set()
proc2.join(30)
proc.join(30)
class TestSignalHandlingProcessCallbacks(TestCase):
@staticmethod
def process_target(evt):
evt.set()
def test_callbacks(self):
'Validate SignalHandlingProcess call after fork and finalize methods'
teardown_to_mock = 'salt.log.setup.shutdown_multiprocessing_logging'
log_to_mock = 'salt.utils.process.Process._setup_process_logging'
sig_to_mock = 'salt.utils.process.SignalHandlingProcess._setup_signals'
# Mock _setup_signals so we do not register one for this process.
evt = multiprocessing.Event()
with patch(sig_to_mock):
with patch(teardown_to_mock) as ma, patch(log_to_mock) as mb:
sh_proc = salt.utils.process.SignalHandlingProcess(
target=self.process_target,
args=(evt,)
)
sh_proc.run()
assert evt.is_set()
ma.assert_called()
mb.assert_called()
def test_callbacks_called_when_run_overriden(self):
'Validate SignalHandlingProcess sub classes call after fork and finalize methods when run is overridden'
class MyProcess(salt.utils.process.SignalHandlingProcess):
def __init__(self):
super(MyProcess, self).__init__()
self.evt = multiprocessing.Event()
def run(self):
self.evt.set()
teardown_to_mock = 'salt.log.setup.shutdown_multiprocessing_logging'
log_to_mock = 'salt.utils.process.Process._setup_process_logging'
sig_to_mock = 'salt.utils.process.SignalHandlingProcess._setup_signals'
# Mock _setup_signals so we do not register one for this process.
with patch(sig_to_mock):
with patch(teardown_to_mock) as ma, patch(log_to_mock) as mb:
sh_proc = MyProcess()
sh_proc.run()
assert sh_proc.evt.is_set()
ma.assert_called()
mb.assert_called()
class TestDup2(TestCase):
def test_dup2_no_fileno(self):
'The dup2 method does not fail on streams without fileno support'
f1 = io.StringIO("some initial text data")
f2 = io.StringIO("some initial other text data")
with self.assertRaises(io.UnsupportedOperation):
f1.fileno()
with patch('os.dup2') as dup_mock:
try:
salt.utils.process.dup2(f1, f2)
except io.UnsupportedOperation:
assert False, 'io.UnsupportedOperation was raised'
assert not dup_mock.called
def null_target():
pass
def event_target(event):
while True:
if event.wait(5):
break
class TestProcessList(TestCase):
@staticmethod
def wait_for_proc(proc, timeout=10):
start = time.time()
while proc.is_alive():
if time.time() - start > timeout:
raise Exception("Process did not finishe before timeout")
time.sleep(.3)
def test_process_list_process(self):
plist = salt.utils.process.SubprocessList()
proc = multiprocessing.Process(target=null_target)
proc.start()
plist.add(proc)
assert proc in plist.processes
self.wait_for_proc(proc)
assert not proc.is_alive()
plist.cleanup()
assert proc not in plist.processes
def test_process_list_thread(self):
plist = salt.utils.process.SubprocessList()
thread = threading.Thread(target=null_target)
thread.start()
plist.add(thread)
assert thread in plist.processes
self.wait_for_proc(thread)
assert not thread.is_alive()
plist.cleanup()
assert thread not in plist.processes
def test_process_list_cleanup(self):
plist = salt.utils.process.SubprocessList()
event = multiprocessing.Event()
proc = multiprocessing.Process(target=event_target, args=[event])
proc.start()
plist.add(proc)
assert proc in plist.processes
plist.cleanup()
event.set()
assert proc in plist.processes
self.wait_for_proc(proc)
assert not proc.is_alive()
plist.cleanup()
assert proc not in plist.processes
class TestDeprecatedClassNames(TestCase):
@staticmethod
def process_target():
pass
@staticmethod
def patched_warn_until_date(current_date):
def _patched_warn_until_date(date,
message,
category=DeprecationWarning,
stacklevel=None,
_current_date=current_date,
_dont_call_warnings=False):
# Because we add another function in between, the stacklevel
# set in salt.utils.process, 3, needs to now be 4
stacklevel = 4
return warn_until_date(date,
message,
category=category,
stacklevel=stacklevel,
_current_date=_current_date,
_dont_call_warnings=_dont_call_warnings)
return _patched_warn_until_date
def test_multiprocessing_process_warning(self):
# We *always* want *all* warnings thrown on this module
warnings.filterwarnings('always', '', DeprecationWarning, __name__)
fake_utcnow = datetime.date(2021, 1, 1)
proc = None
try:
with patch('salt.utils.versions.warn_until_date', self.patched_warn_until_date(fake_utcnow)):
# Test warning
with warnings.catch_warnings(record=True) as recorded_warnings:
proc = salt.utils.process.MultiprocessingProcess(target=self.process_target)
self.assertEqual(
'Please stop using \'salt.utils.process.MultiprocessingProcess\' '
'and instead use \'salt.utils.process.Process\'. '
'\'salt.utils.process.MultiprocessingProcess\' will go away '
'after 2022-01-01.',
six.text_type(recorded_warnings[0].message)
)
finally:
if proc is not None:
del proc
def test_multiprocessing_process_runtime_error(self):
fake_utcnow = datetime.date(2022, 1, 1)
proc = None
try:
with patch('salt.utils.versions.warn_until_date', self.patched_warn_until_date(fake_utcnow)):
with self.assertRaisesRegex(
RuntimeError,
r"Please stop using 'salt.utils.process.MultiprocessingProcess' "
r"and instead use 'salt.utils.process.Process'. "
r"'salt.utils.process.MultiprocessingProcess' will go away "
r'after 2022-01-01. '
r'This warning\(now exception\) triggered on '
r"filename '(.*)test_process.py', line number ([\d]+), is "
r'supposed to be shown until ([\d-]+). Today is ([\d-]+). '
r'Please remove the warning.'):
proc = salt.utils.process.MultiprocessingProcess(target=self.process_target)
finally:
if proc is not None:
del proc
def test_signal_handling_multiprocessing_process_warning(self):
# We *always* want *all* warnings thrown on this module
warnings.filterwarnings('always', '', DeprecationWarning, __name__)
fake_utcnow = datetime.date(2021, 1, 1)
proc = None
try:
with patch('salt.utils.versions.warn_until_date', self.patched_warn_until_date(fake_utcnow)):
# Test warning
with warnings.catch_warnings(record=True) as recorded_warnings:
proc = salt.utils.process.SignalHandlingMultiprocessingProcess(target=self.process_target)
self.assertEqual(
'Please stop using \'salt.utils.process.SignalHandlingMultiprocessingProcess\' '
'and instead use \'salt.utils.process.SignalHandlingProcess\'. '
'\'salt.utils.process.SignalHandlingMultiprocessingProcess\' will go away '
'after 2022-01-01.',
six.text_type(recorded_warnings[0].message)
)
finally:
if proc is not None:
del proc
def test_signal_handling_multiprocessing_process_runtime_error(self):
fake_utcnow = datetime.date(2022, 1, 1)
proc = None
try:
with patch('salt.utils.versions.warn_until_date', self.patched_warn_until_date(fake_utcnow)):
with self.assertRaisesRegex(
RuntimeError,
r"Please stop using 'salt.utils.process.SignalHandlingMultiprocessingProcess' "
r"and instead use 'salt.utils.process.SignalHandlingProcess'. "
r"'salt.utils.process.SignalHandlingMultiprocessingProcess' will go away "
r'after 2022-01-01. '
r'This warning\(now exception\) triggered on '
r"filename '(.*)test_process.py', line number ([\d]+), is "
r'supposed to be shown until ([\d-]+). Today is ([\d-]+). '
r'Please remove the warning.'):
proc = salt.utils.process.SignalHandlingMultiprocessingProcess(target=self.process_target)
finally:
if proc is not None:
del proc