Working test for SIGTERM of held lock

David Murphy 2024-03-05 11:59:13 -07:00 committed by Daniel Wozniak
parent 56a11ba9d1
commit 2265376154
3 changed files with 72 additions and 235 deletions
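For orientation: the gitfs update lock that this commit tests and cleans up is a small text file whose first line is the PID of the lock holder and whose second line is that host's machine identifier (see the first hunk below). The following standalone sketch only illustrates that layout; the helper names and the bare open()/int() parsing are assumptions for illustration, not Salt API.

# Sketch of the lock-file layout assumed throughout this commit (not Salt code).
import os

def write_lock_sketch(path, mach_id):
    # O_EXCL mirrors GitProvider's "fail if the lock already exists" behaviour
    fh_ = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    try:
        os.write(fh_, f"{os.getpid()}\n{mach_id}\n".encode())
    finally:
        os.close(fh_)

def read_lock_sketch(path):
    with open(path) as fd_:
        try:
            pid = int(fd_.readline().rstrip())  # empty lock file -> ValueError
        except ValueError:
            pid = 0
        mach_id = fd_.readline().rstrip() or 0  # older locks carry no machine id
    return pid, mach_id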

View file

@@ -949,6 +949,7 @@ class GitProvider:
os.write(fh_, salt.utils.stringutils.to_bytes(str(os.getpid())))
os.write(fh_, salt.utils.stringutils.to_bytes("\n"))
os.write(fh_, salt.utils.stringutils.to_bytes(str(self.mach_id)))
os.write(fh_, salt.utils.stringutils.to_bytes("\n"))
except OSError as exc:
if exc.errno == errno.EEXIST:
@@ -957,40 +958,57 @@ class GitProvider:
pid = int(
salt.utils.stringutils.to_unicode(fd_.readline()).rstrip()
)
mach_id = int(
salt.utils.stringutils.to_unicode(fd_.readline()).rstrip()
)
except ValueError:
# Lock file is empty, set pid to 0 so it evaluates as
# False.
pid = 0
try:
mach_id = salt.utils.stringutils.to_unicode(
fd_.readline()
).rstrip()
except ValueError as exc:
# Lock file is empty, set machine id to 0 so it evaluates as
# False.
mach_id = 0
global_lock_key = self.role + "_global_lock"
lock_file = self._get_lock_file(lock_type=lock_type)
if self.opts[global_lock_key]:
msg = (
f"{global_lock_key} is enabled and {lock_type} lockfile {lock_file} is present for "
f"{self.role} remote '{self.id}' on machine_id {self.mach_id}."
f"{self.role} remote '{self.id}' on machine_id {self.mach_id} with pid '{pid}'."
)
if pid:
msg += f" Process {pid} obtained the lock"
if self.mach_id or mach_id:
msg += f" Process {pid} obtained the lock for machine_id {mach_id}, current machine_id {self.mach_id}"
else:
msg += f" Process {pid} obtained the lock"
msg += f" for machine_id {mach_id}, current machine_id {self.mach_id}"
if not pid_exists(pid):
msg += (
" but this process is not running. The "
"update may have been interrupted. If "
"using multi-master with shared gitfs "
"cache, the lock may have been obtained "
"by another master"
)
if self.mach_id != mach_id:
msg += f", with machine_id {mach_id}"
msg += (
" but this process is not running. The "
"update may have been interrupted. If "
"using multi-master with shared gitfs "
"cache, the lock may have been obtained "
"by another master, with machine_id {mach_id}"
)
else:
msg += "."
msg += (
" but this process is not running. The "
"update may have been interrupted. "
" Given this process is for the same machine "
" the lock will be reallocated to new process "
)
log.warning(msg)
success, fail = self._clear_lock()
if success:
return self.__lock(
lock_type="update", failhard=failhard
)
elif failhard:
raise
return
log.warning(msg)
if failhard:
raise
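Read without the logging changes, the stale-lock branch above reduces to a small decision: reclaim the lock only when the recorded owner is no longer running and, as far as the hunk shows, only when the lock was taken on this same machine; otherwise it is left alone because another master sharing the gitfs cache may hold it. A compressed sketch of that decision, with psutil.pid_exists standing in for the module's pid_exists helper:

# Compressed sketch of the stale-lock decision above (not Salt code).
import psutil

def should_reclaim(lock_pid, lock_mach_id, my_mach_id):
    if psutil.pid_exists(lock_pid):
        return False  # owner still running: respect the lock
    if lock_mach_id != my_mach_id:
        return False  # dead owner, but the lock may belong to another master sharing the cache
    return True       # dead owner on this machine: clear the lock and take it again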
@@ -1041,8 +1059,6 @@ class GitProvider:
contextmanager here because the lock is meant to stay and not be
automatically removed.
"""
dbg_msg = f"DGM GitProvider lock entry, pid '{os.getpid()}'"
log.warning(dbg_msg)
success = []
failed = []
try:

View file

@@ -514,8 +514,6 @@ class ProcessManager:
This will determine if it is a Process class, otherwise it assumes
it is a function
"""
dbg_msg = f"DGM process add_process entry, tgt '{tgt}', args '{args}', kwargs '{kwargs}', name '{name}'"
log.warning(dbg_msg)
if args is None:
args = []
if kwargs is None:
@@ -531,14 +529,8 @@ class ProcessManager:
if isinstance(process, SignalHandlingProcess):
with default_signals(signal.SIGINT, signal.SIGTERM):
log.warning(
"DGM process add_process with default_signals, process start"
)
process.start()
else:
log.warning(
"DGM process add_process without default_signals, process start"
)
process.start()
log.debug("Started '%s' with pid %s", process.name, process.pid)
@@ -587,9 +579,6 @@ class ProcessManager:
self._restart_processes = False
def send_signal_to_processes(self, signal_):
dbg_msg = f"DGM process send_signal_to_processes signal '{signal_}'"
log.warning(dbg_msg)
if salt.utils.platform.is_windows() and signal_ in (
signal.SIGTERM,
signal.SIGINT,
@@ -608,14 +597,8 @@ class ProcessManager:
for pid in self._process_map.copy():
try:
dbg_msg = f"DGM process send_signal_to_processes kill pid '{pid}', signal '{signal_}'"
log.warning(dbg_msg)
os.kill(pid, signal_)
dbg_msg = f"DGM process sent_signal_to_processes os.kill pid '{pid}', signal '{signal_}'"
log.warning(dbg_msg)
except OSError as exc:
dbg_msg = f"DGM process send_signal_to_processes OSError exc, '{exc}'"
log.warning(dbg_msg)
if exc.errno not in (errno.ESRCH, errno.EACCES):
# If it's not a "No such process" error, raise it
raise
@@ -677,7 +660,6 @@ class ProcessManager:
"""
Kill all of the children
"""
log.warning("DGM process kill_children entry")
if salt.utils.platform.is_windows():
if multiprocessing.current_process().name != "MainProcess":
# Since the main process will kill subprocesses by tree,
@@ -698,9 +680,6 @@ class ProcessManager:
p_map["Process"].terminate()
else:
for pid, p_map in self._process_map.copy().items():
dgm_p_map = p_map["Process"]
dgm_msg = f"DGM process kill_children Terminating pid '{pid}': '{dgm_p_map}', args '{args}'"
log.warning(dgm_msg)
log.trace("Terminating pid %s: %s", pid, p_map["Process"])
if args:
# escalate the signal to the process
@@ -711,98 +690,6 @@ class ProcessManager:
try:
p_map["Process"].terminate()
# need to go through and clean up any resources left around like lock files if using gitfs
# example lockfile /var/cache/salt/master/gitfs/work/NlJQs6Pss_07AugikCrmqfmqEFrfPbCDBqGLBiCd3oU=/_/update.lk
mach_id = salt.utils.files.get_machine_identifier()
## cache_dir = self.opts.get("cachedir", None)
## gitfs_active = self.opts.get("gitfs_remotes", None)
cache_dir = "/tmp"
gitfs_active = True
terminate_pid = pid
dbg_msg = f"DGM process kill_children, cache_dir '{cache_dir}', gitfs_active '{gitfs_active}'"
log.warning(dbg_msg)
if cache_dir and gitfs_active:
# check for gitfs file locks to ensure no resource leaks
# last chance to clean up any missed unlock droppings
## cache_dir = Path(cache_dir + "/gitfs/work")
cache_dir = Path(cache_dir)
dbg_msg = f"DGM process kill_children ,find for final cache_dir '{cache_dir}'"
log.warning(dbg_msg)
if cache_dir.exists and cache_dir.is_dir():
file_list = list(cache_dir.glob("**/*.lk"))
dbg_msg = f"DGM process kill_children ,find for final cache_dir '{cache_dir}', produced glob file_list '{file_list}'"
log.warning(dbg_msg)
file_del_list = []
try:
for file_name in file_list:
dbg_msg = f"DGM process kill_children , checking file name '{file_name}'"
log.warning(dbg_msg)
with salt.utils.files.fopen(file_name, "r") as fd_:
try:
file_pid = int(
salt.utils.stringutils.to_unicode(
fd_.readline()
).rstrip()
)
except ValueError:
# Lock file is empty, set pid to 0 so it evaluates as False.
file_pid = 0
try:
file_mach_id = int(
salt.utils.stringutils.to_unicode(
fd_.readline()
).rstrip()
)
except ValueError:
# Lock file is empty, set mach_id to 0 so it evaluates as False.
file_mach_id = 0
dbg_msg = f"DGM process kill_children , terminate_pid '{terminate_pid}', mach_id '{mach_id}', file_pid '{file_pid}', file_mach_id '{file_mach_id}'"
log.warning(dbg_msg)
if terminate_pid == file_pid:
if mach_id != file_mach_id:
if not file_mach_id:
msg = f"gitfs lock file for pid '{file_pid}' does not contain a machine id, deleting lock file which may affect if using multi-master with shared gitfs cache, the lock may have been obtained by another master recommend updating Salt version on other masters to a version which insert machine identification in lock a file."
log.debug(msg)
file_del_list.append(
(file_name, file_pid, file_mach_id)
)
else:
file_del_list.append(
(file_name, file_pid, file_mach_id)
)
except FileNotFoundError:
log.debug("gitfs lock file: %s not found", file_name)
for file_name, file_pid, file_mach_id in file_del_list:
try:
dbg_msg = f"DGM process kill_children file_pid '{file_pid}', file_mach_id '{file_mach_id}', removing file name '{file_name}'"
log.warning(dbg_msg)
os.remove(file_name)
except OSError as exc:
if exc.errno == errno.ENOENT:
# No lock file present
msg = f"SIGTERM clean up of resources attempted to remove lock file {file_name}, pid '{file_pid}', machine identifier '{mach_id}' but it did not exist, exception : {exc} "
log.debug(msg)
elif exc.errno == errno.EISDIR:
# Somehow this path is a directory. Should never happen
# unless some wiseguy manually creates a directory at this
# path, but just in case, handle it.
try:
shutil.rmtree(file_name)
except OSError as exc:
msg = f"SIGTERM clean up of resources, lock file '{file_name}', pid '{file_pid}', machine identifier '{file_mach_id}' was a directory, removed directory, exception : '{exc}'"
log.debug(msg)
else:
msg = f"SIGTERM clean up of resources, unable to remove lock file '{file_name}', pid '{file_pid}', machine identifier '{file_mach_id}', exception : '{exc}'"
log.debug(msg)
else:
msg = f"SIGTERM clean up of resources, removed lock file '{file_name}', pid '{file_pid}', machine identifier '{file_mach_id}'"
log.debug(msg)
except OSError as exc:
if exc.errno not in (errno.ESRCH, errno.EACCES):
raise
@@ -890,18 +777,11 @@ class ProcessManager:
"""
Properly terminate this process manager instance
"""
log.warning("DGM process terminate entry")
self.stop_restarting()
log.warning("DGM process terminate send signal SIGTERM")
self.send_signal_to_processes(signal.SIGTERM)
log.warning("DGM process terminate kill children")
self.kill_children()
log.warning("DGM process terminate exit")
def _handle_signals(self, *args, **kwargs):
dbg_msg = f"DGM process _handle_signals args '{args}', kwargs '{kwargs}'"
log.warning(dbg_msg)
# first lets reset signal handlers to default one to prevent running this twice
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -911,8 +791,6 @@ class ProcessManager:
# check that this is the correct process, children inherit this
# handler, if we are in a child lets just run the original handler
dbg_msg = f"DGM process _handle_signals os.getpid '{os.getpid()}', self pid '{self._pid}'"
log.warning(dbg_msg)
if os.getpid() != self._pid:
if callable(self._sigterm_handler):
return self._sigterm_handler(*args)
@@ -921,8 +799,6 @@ class ProcessManager:
else:
return
dbg_msg = f"DGM process _handle_signals call self.kill_children, args '{args}', kwargs '{kwargs}'"
log.warning(dbg_msg)
# Terminate child processes
self.kill_children(*args, **kwargs)
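Even with the DGM logging removed above, the shape of ProcessManager._handle_signals stays the same: reset SIGTERM/SIGINT first so the handler cannot run twice, then either defer to the original handler (when executing in a child that inherited it) or terminate the managed children. A compressed sketch of that dispatch, assuming the self._pid and self._sigterm_handler attributes referenced above:

# Compressed sketch of the dispatch in ProcessManager._handle_signals (not verbatim Salt code).
import os
import signal

def _handle_signals_sketch(self, *args, **kwargs):
    signal.signal(signal.SIGTERM, signal.SIG_IGN)  # prevent running this twice
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    if os.getpid() != self._pid:                   # a child inherited this handler
        if callable(self._sigterm_handler):
            return self._sigterm_handler(*args)
        return
    self.kill_children(*args, **kwargs)            # the manager itself: stop the children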
@@ -1189,9 +1065,6 @@ class SignalHandlingProcess(Process):
signal.signal(signal.SIGTERM, self._handle_signals)
def _handle_signals(self, signum, sigframe):
dbg_msg = f"DGM SignalHandlingProcess _handle_signals, signum '{signum}', sigframe '{sigframe}'"
log.warning(dbg_msg)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)
msg = f"{self.__class__.__name__} received a "
@@ -1208,12 +1081,9 @@ class SignalHandlingProcess(Process):
)
log.debug(dbg_msg)
dbg_msg = f"DGM _handle_signals about to check HAS_PSUTIL, for process id '{os.getpid()}' and machine identifer '{mach_id}'"
log.warning(dbg_msg)
cur_pid = os.getpid()
if HAS_PSUTIL:
try:
cur_pid = os.getpid()
process = psutil.Process(cur_pid)
if hasattr(process, "children"):
for child in process.children(recursive=True):
@@ -1232,24 +1102,17 @@ class SignalHandlingProcess(Process):
# example lockfile /var/cache/salt/master/gitfs/work/NlJQs6Pss_07AugikCrmqfmqEFrfPbCDBqGLBiCd3oU=/_/update.lk
cache_dir = self.opts.get("cachedir", None)
gitfs_active = self.opts.get("gitfs_remotes", None)
dbg_msg = f"DGM _handle_signals HAS_PSUTIL, cache_dir '{cache_dir}', gitfs_active '{gitfs_active}'"
log.warning(dbg_msg)
if cache_dir and gitfs_active:
# check for gitfs file locks to ensure no resource leaks
# last chance to clean up any missed unlock droppings
cache_dir = Path(cache_dir + "/gitfs/work")
dbg_msg = f"DGM _handle_signals HAS_PSUTIL,find for final cache_dir '{cache_dir}'"
log.warning(dbg_msg)
if cache_dir.exists and cache_dir.is_dir():
file_list = list(cache_dir.glob("**/*.lk"))
dbg_msg = f"DGM _handle_signals HAS_PSUTIL,find for final cache_dir '{cache_dir}', produced glob file_list '{file_list}'"
log.warning(dbg_msg)
file_del_list = []
file_pid = 0
file_mach_id = 0
try:
for file_name in file_list:
dbg_msg = f"DGM _handle_signals HAS_PSUTIL, checking file name '{file_name}'"
log.warning(dbg_msg)
with salt.utils.files.fopen(file_name, "r") as fd_:
try:
file_pid = int(
@@ -1261,7 +1124,7 @@ class SignalHandlingProcess(Process):
# Lock file is empty, set pid to 0 so it evaluates as False.
file_pid = 0
try:
file_mach_id = int(
file_mach_id = (
salt.utils.stringutils.to_unicode(
fd_.readline()
).rstrip()
@@ -1269,8 +1132,7 @@ class SignalHandlingProcess(Process):
except ValueError:
# Lock file is empty, set mach_id to 0 so it evaluates as False.
file_mach_id = 0
dbg_msg = f"DGM _handle_signals HAS_PSUTIL, cur_pid '{cur_pid}', mach_id '{mach_id}', file_pid '{file_pid}', file_mach_id '{file_mach_id}'"
log.warning(dbg_msg)
if cur_pid == file_pid:
if mach_id != file_mach_id:
if not file_mach_id:
@@ -1289,8 +1151,6 @@ class SignalHandlingProcess(Process):
for file_name, file_pid, file_mach_id in file_del_list:
try:
dbg_msg = f"DGM _handle_signals file_pid '{file_pid}', file_mach_id '{file_mach_id}', removing file name '{file_name}'"
log.warning(dbg_msg)
os.remove(file_name)
except OSError as exc:
if exc.errno == errno.ENOENT:
@@ -1334,8 +1194,6 @@ def default_signals(*signals):
"""
Temporarily restore signals to their default values.
"""
dbg_msg = f"DGM default_signals entry, signals '{signals}'"
log.warning(dbg_msg)
old_signals = {}
for signum in signals:
try:
@@ -1345,7 +1203,6 @@ def default_signals(*signals):
# This happens when a netapi module attempts to run a function
# using wheel_async, because the process trying to register signals
# will not be the main PID.
log.warning("DGM Failed to register signal for signum %d: %s", signum, exc)
log.trace("Failed to register signal for signum %d: %s", signum, exc)
else:
old_signals[signum] = saved_signal
@@ -1355,8 +1212,6 @@ def default_signals(*signals):
yield
finally:
# Restore signals
dbg_msg = f"DGM default_signals entry, restoring old signals '{old_signals}'"
log.warning(dbg_msg)
for signum in old_signals:
signal.signal(signum, old_signals[signum])
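Stripped of the debug logging removed above, the SIGTERM-time cleanup kept in SignalHandlingProcess._handle_signals amounts to: glob for *.lk files under the gitfs work directory and delete those written by the dying PID on this machine, or those carrying no machine id at all (locks from older Salt versions). A standalone sketch of that loop; the function name, signature, and the best-effort error handling are simplifications, not the Salt implementation:

# Sketch of the gitfs lock cleanup on SIGTERM (simplified; not verbatim Salt code).
import os
from pathlib import Path

def cleanup_gitfs_locks(cache_dir, my_pid, my_mach_id):
    work_dir = Path(cache_dir) / "gitfs" / "work"
    if not work_dir.is_dir():
        return
    for lock_file in work_dir.glob("**/*.lk"):
        try:
            with open(lock_file) as fd_:
                try:
                    file_pid = int(fd_.readline().rstrip())
                except ValueError:
                    file_pid = 0                        # empty lock file
                file_mach_id = fd_.readline().rstrip() or 0
        except FileNotFoundError:
            continue                                    # already removed
        # remove only locks taken by the dying process on this machine,
        # plus legacy locks that recorded no machine id
        if file_pid == my_pid and file_mach_id in (my_mach_id, 0):
            try:
                os.remove(lock_file)
            except OSError:
                pass                                    # best effort during shutdown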

View file

@@ -4,6 +4,7 @@ any remotes.
"""
import logging
import os
import pathlib
import signal
import tempfile
@@ -23,14 +24,7 @@ from salt.utils.immutabletypes import freeze
from salt.utils.verify import verify_env
from tests.support.runtests import RUNTIME_VARS
# import multiprocessing
log = logging.getLogger(__name__)
## logger = multiprocessing.log_to_stderr()
## logger.setLevel(logging.INFO)
## ## log = logger.getLogger(__name__)
## log = logger.getLogger()
@pytest.fixture(scope="session", autouse=True)
@@ -327,25 +321,16 @@ class TestGitBase(AdaptedConfigurationTestCaseMixin):
)
def init_remote(self):
dbg_msg = f"DGM MockedProvider init_remote tmp_name '{tmp_name}'"
log.debug(dbg_msg)
self.gitdir = salt.utils.path.join(tmp_name, ".git")
dbg_msg = f"DGM MockedProvider init_remote gitdir '{self.gitdir}'"
log.debug(dbg_msg)
self.repo = True
new = False
return new
def envs(self):
dgm_test_base = ["base"]
dbg_msg = f"DGM MockedProvider env base '{dgm_test_base}'"
log.debug(dbg_msg)
return ["base"]
def _fetch(self):
self.fetched = True
dbg_msg = f"DGM MockedProvider _fetch self.fetched '{self.fetched}'"
log.debug(dbg_msg)
# Clear the instance map so that we make sure to create a new instance
# for this test class.
@@ -482,25 +467,37 @@ def test_git_provider_mp_gen_lock(main_class, caplog):
assert test_msg3 in caplog.text
def process_kill_test(main_class):
class KillProcessTest(salt.utils.process.SignalHandlingProcess):
"""
Process to obtain a lock and hold it,
which will then be given a SIGTERM to ensure clean up of resources for the lock
Check that lock is obtained and then it should be released by SIGTERM checks
Test process for which to kill and check lock resources are cleaned up
"""
log.debug("DGM process_kill_test entry pid, '{os.getpid()}'")
provider = main_class.remotes[0]
provider.lock()
log.debug("DGM process_kill_test obtained lock")
def __init__(self, provider, **kwargs):
super().__init__(**kwargs)
self.provider = provider
self.opts = provider.opts
self.threads = {}
# check that lock has been released
assert provider._master_lock.acquire(timeout=5)
log.debug("DGM process_kill_test tested assert masterlock acquire")
def run(self):
"""
Start the test process to kill
"""
log.debug("DGM kill_test_process entry pid %s", os.getpid())
time.sleep(20) # give time for kill by sigterm
log.debug("DGM process_kill_test exit")
## provider = main_class.remotes[0]
self.provider.lock()
log.debug("DGM kill_test_process obtained lock")
# check that lock has been released
assert self.provider._master_lock.acquire(timeout=5)
log.debug("DGM kill_test_process tested assert masterlock acquire")
while True:
tsleep = 1
time.sleep(tsleep) # give time for kill by sigterm
log.debug("DGM kill_test_process exit")
@pytest.mark.slow_test
@@ -514,43 +511,18 @@ def test_git_provider_sigterm_cleanup(main_class, caplog):
provider = main_class.remotes[0]
## DGM find lock file location
## provider.lock()
## file_name = provider._get_lock_file("update")
## log.debug(f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}'")
## proc = multiprocessing.Process(target=process_kill_test)
## procmgr = salt.utils.process.ProcessManager(wait_for_kill=30)
## proc = procmgr.add_process(process_kill_test, args=(main_class,), name="test_kill")
## proc.start()
# Reset signals to default ones before adding processes to the process
# manager. We don't want the processes being started to inherit those
# signal handlers
log.debug("DGM test_git_provider_sigterm_cleanup, get procmgn and add process")
with salt.utils.process.default_signals(signal.SIGINT, signal.SIGTERM):
procmgr = salt.utils.process.ProcessManager(wait_for_kill=5)
proc = procmgr.add_process(
process_kill_test, args=(main_class,), name="test_kill"
)
# Install the SIGINT/SIGTERM handlers if not done so far
if signal.getsignal(signal.SIGINT) is signal.SIG_DFL:
# No custom signal handling was added, install our own
signal.signal(signal.SIGINT, procmgr._handle_signals)
if signal.getsignal(signal.SIGTERM) is signal.SIG_DFL:
# No custom signal handling was added, install our own
signal.signal(signal.SIGTERM, procmgr._handle_signals)
procmgr = salt.utils.process.ProcessManager(wait_for_kill=1)
proc = procmgr.add_process(KillProcessTest, args=(provider,), name="test_kill")
log.debug("DGM test_git_provider_sigterm_cleanup, check if process is alive")
while not proc.is_alive():
dbg_msg = "DGM test_git_provider_sigterm_cleanup sleeping waiting for child process to become alive"
log.debug(dbg_msg)
time.sleep(1) # give some time for it to be started
# child process should be alive
dbg_msg = f"DGM test_git_provider_sigterm_cleanup child process is alive with pid '{proc.pid}'"
log.debug(dbg_msg)
procmgr.run()
# child process should be alive
file_name = provider._get_lock_file("update")
dbg_msg = f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}'"
log.debug(dbg_msg)
@@ -572,13 +544,7 @@ def test_git_provider_sigterm_cleanup(main_class, caplog):
log.debug(dbg_msg)
test_file_exits = pathlib.Path(file_name).exists()
dbg_msg = f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}', does it exist anymore '{test_file_exits}'"
log.debug(dbg_msg)
assert not pathlib.Path(file_name).exists()
dbg_msg = f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}', does NOT exist anymore"
log.debug(dbg_msg)
log.debug("DGM test_git_provider_sigterm_cleanup exit")