WIP Test for kill SIGTERM

This commit is contained in:
David Murphy 2024-02-29 13:59:10 -07:00 committed by Daniel Wozniak
parent 443fdd3e34
commit 56a11ba9d1
3 changed files with 211 additions and 64 deletions

View file

@ -251,7 +251,7 @@ class GitProvider:
# DGM try getting machine_identifier
# get machine_identifier
self.mach_id = salt.utils.files.get_machine_identifier()
log.debug(f"machine_id for lock file, machine_id '{self.mach_id}'")
log.debug("DGM machine_id for lock file, machine_id %s", self.mach_id)
self.global_saltenv = salt.utils.data.repack_dictlist(
self.opts.get(f"{self.role}_saltenv", []),
@ -920,7 +920,7 @@ class GitProvider:
)
return False
except NotImplementedError as exc:
log.warning(f"fetch got NotImplementedError, exc '{exc}'")
log.warning("fetch got NotImplementedError exception %s", exc)
def _lock(self, lock_type="update", failhard=False):
"""
@ -1041,6 +1041,8 @@ class GitProvider:
contextmanager here because the lock is meant to stay and not be
automatically removed.
"""
dbg_msg = f"DGM GitProvider lock entry, pid '{os.getpid()}'"
log.warning(dbg_msg)
success = []
failed = []
try:

View file

@ -514,6 +514,8 @@ class ProcessManager:
This will deterimine if it is a Process class, otherwise it assumes
it is a function
"""
dbg_msg = f"DGM process add_process entry, tgt '{tgt}', args '{args}', kwargs '{kwargs}', name '{name}'"
log.warning(dbg_msg)
if args is None:
args = []
if kwargs is None:
@ -529,9 +531,16 @@ class ProcessManager:
if isinstance(process, SignalHandlingProcess):
with default_signals(signal.SIGINT, signal.SIGTERM):
log.warning(
"DGM process add_process with default_signals, process start"
)
process.start()
else:
log.warning(
"DGM process add_process without default_signals, process start"
)
process.start()
log.debug("Started '%s' with pid %s", process.name, process.pid)
self._process_map[process.pid] = {
"tgt": tgt,
@ -539,6 +548,7 @@ class ProcessManager:
"kwargs": kwargs,
"Process": process,
}
return process
def restart_process(self, pid):
@ -577,7 +587,9 @@ class ProcessManager:
self._restart_processes = False
def send_signal_to_processes(self, signal_):
log.warning(f"DGM process send_signal_to_processes signal '{signal_}'")
dbg_msg = f"DGM process send_signal_to_processes signal '{signal_}'"
log.warning(dbg_msg)
if salt.utils.platform.is_windows() and signal_ in (
signal.SIGTERM,
signal.SIGINT,
@ -596,14 +608,14 @@ class ProcessManager:
for pid in self._process_map.copy():
try:
log.warning(
f"DGM process send_signal_to_processes kill pid '{pid}', signal '{signal_}'"
)
dbg_msg = f"DGM process send_signal_to_processes kill pid '{pid}', signal '{signal_}'"
log.warning(dbg_msg)
os.kill(pid, signal_)
dbg_msg = f"DGM process sent_signal_to_processes os.kill pid '{pid}', signal '{signal_}'"
log.warning(dbg_msg)
except OSError as exc:
log.warning(
f"DGM process send_signal_to_processes OSError exc, '{exc}'"
)
dbg_msg = f"DGM process send_signal_to_processes OSError exc, '{exc}'"
log.warning(dbg_msg)
if exc.errno not in (errno.ESRCH, errno.EACCES):
# If it's not a "No such process" error, raise it
raise
@ -686,7 +698,9 @@ class ProcessManager:
p_map["Process"].terminate()
else:
for pid, p_map in self._process_map.copy().items():
log.warning("DGM Terminating pid %s: %s", pid, p_map["Process"])
dgm_p_map = p_map["Process"]
dgm_msg = f"DGM process kill_children Terminating pid '{pid}': '{dgm_p_map}', args '{args}'"
log.warning(dgm_msg)
log.trace("Terminating pid %s: %s", pid, p_map["Process"])
if args:
# escalate the signal to the process
@ -696,6 +710,99 @@ class ProcessManager:
pass
try:
p_map["Process"].terminate()
# need to go through and clean up any resources left around like lock files if using gitfs
# example lockfile /var/cache/salt/master/gitfs/work/NlJQs6Pss_07AugikCrmqfmqEFrfPbCDBqGLBiCd3oU=/_/update.lk
mach_id = salt.utils.files.get_machine_identifier()
## cache_dir = self.opts.get("cachedir", None)
## gitfs_active = self.opts.get("gitfs_remotes", None)
cache_dir = "/tmp"
gitfs_active = True
terminate_pid = pid
dbg_msg = f"DGM process kill_children, cache_dir '{cache_dir}', gitfs_active '{gitfs_active}'"
log.warning(dbg_msg)
if cache_dir and gitfs_active:
# check for gitfs file locks to ensure no resource leaks
# last chance to clean up any missed unlock droppings
## cache_dir = Path(cache_dir + "/gitfs/work")
cache_dir = Path(cache_dir)
dbg_msg = f"DGM process kill_children ,find for final cache_dir '{cache_dir}'"
log.warning(dbg_msg)
if cache_dir.exists and cache_dir.is_dir():
file_list = list(cache_dir.glob("**/*.lk"))
dbg_msg = f"DGM process kill_children ,find for final cache_dir '{cache_dir}', produced glob file_list '{file_list}'"
log.warning(dbg_msg)
file_del_list = []
try:
for file_name in file_list:
dbg_msg = f"DGM process kill_children , checking file name '{file_name}'"
log.warning(dbg_msg)
with salt.utils.files.fopen(file_name, "r") as fd_:
try:
file_pid = int(
salt.utils.stringutils.to_unicode(
fd_.readline()
).rstrip()
)
except ValueError:
# Lock file is empty, set pid to 0 so it evaluates as False.
file_pid = 0
try:
file_mach_id = int(
salt.utils.stringutils.to_unicode(
fd_.readline()
).rstrip()
)
except ValueError:
# Lock file is empty, set mach_id to 0 so it evaluates as False.
file_mach_id = 0
dbg_msg = f"DGM process kill_children , terminate_pid '{terminate_pid}', mach_id '{mach_id}', file_pid '{file_pid}', file_mach_id '{file_mach_id}'"
log.warning(dbg_msg)
if terminate_pid == file_pid:
if mach_id != file_mach_id:
if not file_mach_id:
msg = f"gitfs lock file for pid '{file_pid}' does not contain a machine id, deleting lock file which may affect if using multi-master with shared gitfs cache, the lock may have been obtained by another master recommend updating Salt version on other masters to a version which insert machine identification in lock a file."
log.debug(msg)
file_del_list.append(
(file_name, file_pid, file_mach_id)
)
else:
file_del_list.append(
(file_name, file_pid, file_mach_id)
)
except FileNotFoundError:
log.debug("gitfs lock file: %s not found", file_name)
for file_name, file_pid, file_mach_id in file_del_list:
try:
dbg_msg = f"DGM process kill_children file_pid '{file_pid}', file_mach_id '{file_mach_id}', removing file name '{file_name}'"
log.warning(dbg_msg)
os.remove(file_name)
except OSError as exc:
if exc.errno == errno.ENOENT:
# No lock file present
msg = f"SIGTERM clean up of resources attempted to remove lock file {file_name}, pid '{file_pid}', machine identifier '{mach_id}' but it did not exist, exception : {exc} "
log.debug(msg)
elif exc.errno == errno.EISDIR:
# Somehow this path is a directory. Should never happen
# unless some wiseguy manually creates a directory at this
# path, but just in case, handle it.
try:
shutil.rmtree(file_name)
except OSError as exc:
msg = f"SIGTERM clean up of resources, lock file '{file_name}', pid '{file_pid}', machine identifier '{file_mach_id}' was a directory, removed directory, exception : '{exc}'"
log.debug(msg)
else:
msg = f"SIGTERM clean up of resources, unable to remove lock file '{file_name}', pid '{file_pid}', machine identifier '{file_mach_id}', exception : '{exc}'"
log.debug(msg)
else:
msg = f"SIGTERM clean up of resources, removed lock file '{file_name}', pid '{file_pid}', machine identifier '{file_mach_id}'"
log.debug(msg)
except OSError as exc:
if exc.errno not in (errno.ESRCH, errno.EACCES):
raise
@ -792,7 +899,8 @@ class ProcessManager:
log.warning("DGM process terminate exit")
def _handle_signals(self, *args, **kwargs):
log.warning(f"DGM process _handle_signals args '{args}', kwargs '{kwargs}'")
dbg_msg = f"DGM process _handle_signals args '{args}', kwargs '{kwargs}'"
log.warning(dbg_msg)
# first lets reset signal handlers to default one to prevent running this twice
signal.signal(signal.SIGTERM, signal.SIG_IGN)
@ -803,6 +911,8 @@ class ProcessManager:
# check that this is the correct process, children inherit this
# handler, if we are in a child lets just run the original handler
dbg_msg = f"DGM process _handle_signals os.getpid '{os.getpid()}', self pid '{self._pid}'"
log.warning(dbg_msg)
if os.getpid() != self._pid:
if callable(self._sigterm_handler):
return self._sigterm_handler(*args)
@ -811,6 +921,8 @@ class ProcessManager:
else:
return
dbg_msg = f"DGM process _handle_signals call self.kill_children, args '{args}', kwargs '{kwargs}'"
log.warning(dbg_msg)
# Terminate child processes
self.kill_children(*args, **kwargs)
@ -1077,9 +1189,8 @@ class SignalHandlingProcess(Process):
signal.signal(signal.SIGTERM, self._handle_signals)
def _handle_signals(self, signum, sigframe):
log.warning(
f"DGM SignalHandlingProcess _handle_signals, signum '{signum}', sigframe '{sigframe}'"
)
dbg_msg = f"DGM SignalHandlingProcess _handle_signals, signum '{signum}', sigframe '{sigframe}'"
log.warning(dbg_msg)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)
@ -1092,13 +1203,13 @@ class SignalHandlingProcess(Process):
log.debug(msg)
mach_id = salt.utils.files.get_machine_identifier()
log.debug(
dbg_msg = (
f"exiting for process id '{os.getpid()}' and machine identifer '{mach_id}'"
)
log.debug(dbg_msg)
log.warning(
f"DGM _handle_signals about to check HAS_PSUTIL, for process id '{os.getpid()}' and machine identifer '{mach_id}'"
)
dbg_msg = f"DGM _handle_signals about to check HAS_PSUTIL, for process id '{os.getpid()}' and machine identifer '{mach_id}'"
log.warning(dbg_msg)
if HAS_PSUTIL:
try:
@ -1121,28 +1232,24 @@ class SignalHandlingProcess(Process):
# example lockfile /var/cache/salt/master/gitfs/work/NlJQs6Pss_07AugikCrmqfmqEFrfPbCDBqGLBiCd3oU=/_/update.lk
cache_dir = self.opts.get("cachedir", None)
gitfs_active = self.opts.get("gitfs_remotes", None)
log.warning(
f"DGM _handle_signals HAS_PSUTIL, cache_dir '{cache_dir}', gitfs_active '{gitfs_active}'"
)
dbg_msg = f"DGM _handle_signals HAS_PSUTIL, cache_dir '{cache_dir}', gitfs_active '{gitfs_active}'"
log.warning(dbg_msg)
if cache_dir and gitfs_active:
# check for gitfs file locks to ensure no resource leaks
# last chance to clean up any missed unlock droppings
cache_dir = Path(cache_dir + "/gitfs/work")
log.warning(
f"DGM _handle_signals HAS_PSUTIL,find for final cache_dir '{cache_dir}'"
)
dbg_msg = f"DGM _handle_signals HAS_PSUTIL,find for final cache_dir '{cache_dir}'"
log.warning(dbg_msg)
if cache_dir.exists and cache_dir.is_dir():
file_list = list(cache_dir.glob("**/*.lk"))
log.warning(
f"DGM _handle_signals HAS_PSUTIL,find for final cache_dir '{cache_dir}', produced glob file_list '{file_list}'"
)
dbg_msg = f"DGM _handle_signals HAS_PSUTIL,find for final cache_dir '{cache_dir}', produced glob file_list '{file_list}'"
log.warning(dbg_msg)
file_del_list = []
try:
for file_name in file_list:
log.warning(
f"DGM _handle_signals HAS_PSUTIL, checking file name '{file_name}'"
)
dbg_msg = f"DGM _handle_signals HAS_PSUTIL, checking file name '{file_name}'"
log.warning(dbg_msg)
with salt.utils.files.fopen(file_name, "r") as fd_:
try:
file_pid = int(
@ -1162,9 +1269,8 @@ class SignalHandlingProcess(Process):
except ValueError:
# Lock file is empty, set mach_id to 0 so it evaluates as False.
file_mach_id = 0
log.warning(
f"DGM _handle_signals HAS_PSUTIL, cur_pid '{cur_pid}', mach_id '{mach_id}', file_pid '{file_pid}', file_mach_id '{file_mach_id}'"
)
dbg_msg = f"DGM _handle_signals HAS_PSUTIL, cur_pid '{cur_pid}', mach_id '{mach_id}', file_pid '{file_pid}', file_mach_id '{file_mach_id}'"
log.warning(dbg_msg)
if cur_pid == file_pid:
if mach_id != file_mach_id:
if not file_mach_id:
@ -1181,11 +1287,10 @@ class SignalHandlingProcess(Process):
except FileNotFoundError:
log.debug("gitfs lock file: %s not found", file_name)
for (file_name, file_pid, file_mach_id) in file_del_list:
for file_name, file_pid, file_mach_id in file_del_list:
try:
log.warning(
f"DGM _handle_signals file_pid '{file_pid}', file_mach_id '{file_mach_id}', removing file name '{file_name}'"
)
dbg_msg = f"DGM _handle_signals file_pid '{file_pid}', file_mach_id '{file_mach_id}', removing file name '{file_name}'"
log.warning(dbg_msg)
os.remove(file_name)
except OSError as exc:
if exc.errno == errno.ENOENT:
@ -1229,6 +1334,8 @@ def default_signals(*signals):
"""
Temporarily restore signals to their default values.
"""
dbg_msg = f"DGM default_signals entry, signals '{signals}'"
log.warning(dbg_msg)
old_signals = {}
for signum in signals:
try:
@ -1238,6 +1345,7 @@ def default_signals(*signals):
# This happens when a netapi module attempts to run a function
# using wheel_async, because the process trying to register signals
# will not be the main PID.
log.warning("DGM Failed to register signal for signum %d: %s", signum, exc)
log.trace("Failed to register signal for signum %d: %s", signum, exc)
else:
old_signals[signum] = saved_signal
@ -1247,6 +1355,8 @@ def default_signals(*signals):
yield
finally:
# Restore signals
dbg_msg = f"DGM default_signals entry, restoring old signals '{old_signals}'"
log.warning(dbg_msg)
for signum in old_signals:
signal.signal(signum, old_signals[signum])

View file

@ -5,6 +5,7 @@ any remotes.
import logging
import pathlib
import signal
import tempfile
import time
@ -22,7 +23,14 @@ from salt.utils.immutabletypes import freeze
from salt.utils.verify import verify_env
from tests.support.runtests import RUNTIME_VARS
# import multiprocessing
log = logging.getLogger(__name__)
## logger = multiprocessing.log_to_stderr()
## logger.setLevel(logging.INFO)
## ## log = logger.getLogger(__name__)
## log = logger.getLogger()
@pytest.fixture(scope="session", autouse=True)
@ -319,21 +327,25 @@ class TestGitBase(AdaptedConfigurationTestCaseMixin):
)
def init_remote(self):
log.debug(f"DGM MockedProvider init_remote tmp_name '{tmp_name}'")
dbg_msg = f"DGM MockedProvider init_remote tmp_name '{tmp_name}'"
log.debug(dbg_msg)
self.gitdir = salt.utils.path.join(tmp_name, ".git")
log.debug(f"DGM MockedProvider init_remote gitdir '{self.gitdir}'")
dbg_msg = f"DGM MockedProvider init_remote gitdir '{self.gitdir}'"
log.debug(dbg_msg)
self.repo = True
new = False
return new
def envs(self):
dgm_test_base = ["base"]
log.debug(f"DGM MockedProvider env base '{dgm_test_base}'")
dbg_msg = f"DGM MockedProvider env base '{dgm_test_base}'"
log.debug(dbg_msg)
return ["base"]
def _fetch(self):
self.fetched = True
log.debug(f"DGM MockedProvider _fetch self.fetched '{self.fetched}'")
dbg_msg = f"DGM MockedProvider _fetch self.fetched '{self.fetched}'"
log.debug(dbg_msg)
# Clear the instance map so that we make sure to create a new instance
# for this test class.
@ -477,7 +489,7 @@ def process_kill_test(main_class):
Check that lock is obtained and then it should be released by SIGTERM checks
"""
log.debug("DGM process_kill_test entry")
log.debug("DGM process_kill_test entry pid, '{os.getpid()}'")
provider = main_class.remotes[0]
provider.lock()
@ -502,48 +514,71 @@ def test_git_provider_sigterm_cleanup(main_class, caplog):
provider = main_class.remotes[0]
procmgr = salt.utils.process.ProcessManager(wait_for_kill=30)
proc = procmgr.add_process(process_kill_test, args=(main_class,), name="test_kill")
## DGM find lock file location
## provider.lock()
## file_name = provider._get_lock_file("update")
## log.debug(f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}'")
## proc = multiprocessing.Process(target=process_kill_test)
## procmgr = salt.utils.process.ProcessManager(wait_for_kill=30)
## proc = procmgr.add_process(process_kill_test, args=(main_class,), name="test_kill")
## proc.start()
while not proc.is_alive():
log.debug(
"DGM test_git_provider_sigterm_cleanup sleeping waiting for child process to become alive"
# Reset signals to default ones before adding processes to the process
# manager. We don't want the processes being started to inherit those
# signal handlers
with salt.utils.process.default_signals(signal.SIGINT, signal.SIGTERM):
procmgr = salt.utils.process.ProcessManager(wait_for_kill=5)
proc = procmgr.add_process(
process_kill_test, args=(main_class,), name="test_kill"
)
# Install the SIGINT/SIGTERM handlers if not done so far
if signal.getsignal(signal.SIGINT) is signal.SIG_DFL:
# No custom signal handling was added, install our own
signal.signal(signal.SIGINT, procmgr._handle_signals)
if signal.getsignal(signal.SIGTERM) is signal.SIG_DFL:
# No custom signal handling was added, install our own
signal.signal(signal.SIGTERM, procmgr._handle_signals)
while not proc.is_alive():
dbg_msg = "DGM test_git_provider_sigterm_cleanup sleeping waiting for child process to become alive"
log.debug(dbg_msg)
time.sleep(1) # give some time for it to be started
# child process should be alive
log.debug(
f"DGM test_git_provider_sigterm_cleanup child process is alive with pid '{proc.pid}'"
)
dbg_msg = f"DGM test_git_provider_sigterm_cleanup child process is alive with pid '{proc.pid}'"
log.debug(dbg_msg)
file_name = provider._get_lock_file("update")
log.debug(
f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}'"
)
dbg_msg = f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}'"
log.debug(dbg_msg)
assert pathlib.Path(file_name).exists()
assert pathlib.Path(file_name).is_file()
log.debug(
f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}', exists and is a file, send SIGTERM signal"
)
dbg_msg = f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}', exists and is a file, send SIGTERM signal"
log.debug(dbg_msg)
proc.terminate() # sends a SIGTERM
procmgr.terminate() # sends a SIGTERM
time.sleep(1) # give some time for it to terminate
log.debug("DGM test_git_provider_sigterm_cleanup lock , post terminate")
assert not proc.is_alive()
log.debug("DGM test_git_provider_sigterm_cleanup lock , child is not alive")
dbg_msg = "DGM test_git_provider_sigterm_cleanup lock , child is not alive"
log.debug(dbg_msg)
test_file_exits = pathlib.Path(file_name).exists()
log.debug(
f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}', does it exist anymore '{test_file_exits}'"
)
dbg_msg = f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}', does it exist anymore '{test_file_exits}'"
log.debug(dbg_msg)
assert not pathlib.Path(file_name).exists()
log.debug(
f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}', does NOT exist anymore"
)
dbg_msg = f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}', does NOT exist anymore"
log.debug(dbg_msg)
log.debug("DGM test_git_provider_sigterm_cleanup exit")