From 2265376154d63ea8683af415b371b6cda4d12d5c Mon Sep 17 00:00:00 2001
From: David Murphy <dmurphy@saltstack.com>
Date: Tue, 5 Mar 2024 11:59:13 -0700
Subject: [PATCH] Working test for SIGTERM of held lock

---
 salt/utils/gitfs.py                          |  52 ++++---
 salt/utils/process.py                        | 155 +------------------
 tests/pytests/unit/utils/test_gitfs_locks.py | 100 ++++--------
 3 files changed, 72 insertions(+), 235 deletions(-)

diff --git a/salt/utils/gitfs.py b/salt/utils/gitfs.py
index 84546c24b46..3f970fe7fc3 100644
--- a/salt/utils/gitfs.py
+++ b/salt/utils/gitfs.py
@@ -949,6 +949,7 @@ class GitProvider:
                 os.write(fh_, salt.utils.stringutils.to_bytes(str(os.getpid())))
                 os.write(fh_, salt.utils.stringutils.to_bytes("\n"))
                 os.write(fh_, salt.utils.stringutils.to_bytes(str(self.mach_id)))
+                os.write(fh_, salt.utils.stringutils.to_bytes("\n"))
 
         except OSError as exc:
             if exc.errno == errno.EEXIST:
@@ -957,40 +958,57 @@ class GitProvider:
                         pid = int(
                             salt.utils.stringutils.to_unicode(fd_.readline()).rstrip()
                         )
-                        mach_id = int(
-                            salt.utils.stringutils.to_unicode(fd_.readline()).rstrip()
-                        )
                     except ValueError:
                         # Lock file is empty, set pid to 0 so it evaluates as
                         # False.
                         pid = 0
+                    try:
+                        mach_id = salt.utils.stringutils.to_unicode(
+                            fd_.readline()
+                        ).rstrip()
+                    except ValueError as exc:
+                        # Lock file is empty, set machine id to 0 so it evaluates as
+                        # False.
                         mach_id = 0
+
                 global_lock_key = self.role + "_global_lock"
                 lock_file = self._get_lock_file(lock_type=lock_type)
                 if self.opts[global_lock_key]:
                     msg = (
                         f"{global_lock_key} is enabled and {lock_type} lockfile {lock_file} is present for "
-                        f"{self.role} remote '{self.id}' on machine_id {self.mach_id}."
+                        f"{self.role} remote '{self.id}' on machine_id {self.mach_id} with pid '{pid}'."
                     )
                     if pid:
                         msg += f" Process {pid} obtained the lock"
                         if self.mach_id or mach_id:
-                            msg += f" Process {pid} obtained the lock for machine_id {mach_id}, current machine_id {self.mach_id}"
-                        else:
-                            msg += f" Process {pid} obtained the lock"
+                            msg += f" for machine_id {mach_id}, current machine_id {self.mach_id}"
 
                         if not pid_exists(pid):
-                            msg += (
-                                " but this process is not running. The "
-                                "update may have been interrupted. If "
-                                "using multi-master with shared gitfs "
-                                "cache, the lock may have been obtained "
-                                "by another master"
-                            )
                             if self.mach_id != mach_id:
-                                msg += f", with machine_id {mach_id}"
+                                msg += (
+                                    " but this process is not running. The "
+                                    "update may have been interrupted. If "
+                                    "using multi-master with shared gitfs "
+                                    "cache, the lock may have been obtained "
+                                    f"by another master, with machine_id {mach_id}"
+                                )
                             else:
-                                msg += "."
+                                msg += (
+                                    " but this process is not running. The "
+                                    "update may have been interrupted. "
+                                    "Since the stale lock was held by a process on "
+                                    "this same machine, it will be cleared and reallocated "
+                                    "to the new process."
+                                )
+                                log.warning(msg)
+                                success, fail = self._clear_lock()
+                                if success:
+                                    return self.__lock(
+                                        lock_type="update", failhard=failhard
+                                    )
+                                elif failhard:
+                                    raise
+                                return
+
                 log.warning(msg)
                 if failhard:
                     raise
@@ -1041,8 +1059,6 @@ class GitProvider:
         contextmanager here because the lock is meant to stay and not be
         automatically removed.
""" - dbg_msg = f"DGM GitProvider lock entry, pid '{os.getpid()}'" - log.warning(dbg_msg) success = [] failed = [] try: diff --git a/salt/utils/process.py b/salt/utils/process.py index aed2633d454..b49a907ee93 100644 --- a/salt/utils/process.py +++ b/salt/utils/process.py @@ -514,8 +514,6 @@ class ProcessManager: This will deterimine if it is a Process class, otherwise it assumes it is a function """ - dbg_msg = f"DGM process add_process entry, tgt '{tgt}', args '{args}', kwargs '{kwargs}', name '{name}'" - log.warning(dbg_msg) if args is None: args = [] if kwargs is None: @@ -531,14 +529,8 @@ class ProcessManager: if isinstance(process, SignalHandlingProcess): with default_signals(signal.SIGINT, signal.SIGTERM): - log.warning( - "DGM process add_process with default_signals, process start" - ) process.start() else: - log.warning( - "DGM process add_process without default_signals, process start" - ) process.start() log.debug("Started '%s' with pid %s", process.name, process.pid) @@ -587,9 +579,6 @@ class ProcessManager: self._restart_processes = False def send_signal_to_processes(self, signal_): - dbg_msg = f"DGM process send_signal_to_processes signal '{signal_}'" - log.warning(dbg_msg) - if salt.utils.platform.is_windows() and signal_ in ( signal.SIGTERM, signal.SIGINT, @@ -608,14 +597,8 @@ class ProcessManager: for pid in self._process_map.copy(): try: - dbg_msg = f"DGM process send_signal_to_processes kill pid '{pid}', signal '{signal_}'" - log.warning(dbg_msg) os.kill(pid, signal_) - dbg_msg = f"DGM process sent_signal_to_processes os.kill pid '{pid}', signal '{signal_}'" - log.warning(dbg_msg) except OSError as exc: - dbg_msg = f"DGM process send_signal_to_processes OSError exc, '{exc}'" - log.warning(dbg_msg) if exc.errno not in (errno.ESRCH, errno.EACCES): # If it's not a "No such process" error, raise it raise @@ -677,7 +660,6 @@ class ProcessManager: """ Kill all of the children """ - log.warning("DGM process kill_children entry") if salt.utils.platform.is_windows(): if multiprocessing.current_process().name != "MainProcess": # Since the main process will kill subprocesses by tree, @@ -698,9 +680,6 @@ class ProcessManager: p_map["Process"].terminate() else: for pid, p_map in self._process_map.copy().items(): - dgm_p_map = p_map["Process"] - dgm_msg = f"DGM process kill_children Terminating pid '{pid}': '{dgm_p_map}', args '{args}'" - log.warning(dgm_msg) log.trace("Terminating pid %s: %s", pid, p_map["Process"]) if args: # escalate the signal to the process @@ -711,98 +690,6 @@ class ProcessManager: try: p_map["Process"].terminate() - # need to go through and clean up any resources left around like lock files if using gitfs - # example lockfile /var/cache/salt/master/gitfs/work/NlJQs6Pss_07AugikCrmqfmqEFrfPbCDBqGLBiCd3oU=/_/update.lk - mach_id = salt.utils.files.get_machine_identifier() - ## cache_dir = self.opts.get("cachedir", None) - ## gitfs_active = self.opts.get("gitfs_remotes", None) - cache_dir = "/tmp" - gitfs_active = True - terminate_pid = pid - dbg_msg = f"DGM process kill_children, cache_dir '{cache_dir}', gitfs_active '{gitfs_active}'" - log.warning(dbg_msg) - if cache_dir and gitfs_active: - # check for gitfs file locks to ensure no resource leaks - # last chance to clean up any missed unlock droppings - ## cache_dir = Path(cache_dir + "/gitfs/work") - cache_dir = Path(cache_dir) - dbg_msg = f"DGM process kill_children ,find for final cache_dir '{cache_dir}'" - log.warning(dbg_msg) - if cache_dir.exists and cache_dir.is_dir(): - file_list = 
list(cache_dir.glob("**/*.lk")) - dbg_msg = f"DGM process kill_children ,find for final cache_dir '{cache_dir}', produced glob file_list '{file_list}'" - log.warning(dbg_msg) - file_del_list = [] - - try: - for file_name in file_list: - dbg_msg = f"DGM process kill_children , checking file name '{file_name}'" - log.warning(dbg_msg) - with salt.utils.files.fopen(file_name, "r") as fd_: - try: - file_pid = int( - salt.utils.stringutils.to_unicode( - fd_.readline() - ).rstrip() - ) - except ValueError: - # Lock file is empty, set pid to 0 so it evaluates as False. - file_pid = 0 - try: - file_mach_id = int( - salt.utils.stringutils.to_unicode( - fd_.readline() - ).rstrip() - ) - except ValueError: - # Lock file is empty, set mach_id to 0 so it evaluates as False. - file_mach_id = 0 - - dbg_msg = f"DGM process kill_children , terminate_pid '{terminate_pid}', mach_id '{mach_id}', file_pid '{file_pid}', file_mach_id '{file_mach_id}'" - log.warning(dbg_msg) - if terminate_pid == file_pid: - if mach_id != file_mach_id: - if not file_mach_id: - msg = f"gitfs lock file for pid '{file_pid}' does not contain a machine id, deleting lock file which may affect if using multi-master with shared gitfs cache, the lock may have been obtained by another master recommend updating Salt version on other masters to a version which insert machine identification in lock a file." - log.debug(msg) - file_del_list.append( - (file_name, file_pid, file_mach_id) - ) - else: - file_del_list.append( - (file_name, file_pid, file_mach_id) - ) - - except FileNotFoundError: - log.debug("gitfs lock file: %s not found", file_name) - - for file_name, file_pid, file_mach_id in file_del_list: - try: - dbg_msg = f"DGM process kill_children file_pid '{file_pid}', file_mach_id '{file_mach_id}', removing file name '{file_name}'" - log.warning(dbg_msg) - os.remove(file_name) - except OSError as exc: - if exc.errno == errno.ENOENT: - # No lock file present - msg = f"SIGTERM clean up of resources attempted to remove lock file {file_name}, pid '{file_pid}', machine identifier '{mach_id}' but it did not exist, exception : {exc} " - log.debug(msg) - - elif exc.errno == errno.EISDIR: - # Somehow this path is a directory. Should never happen - # unless some wiseguy manually creates a directory at this - # path, but just in case, handle it. 
-                                        try:
-                                            shutil.rmtree(file_name)
-                                        except OSError as exc:
-                                            msg = f"SIGTERM clean up of resources, lock file '{file_name}', pid '{file_pid}', machine identifier '{file_mach_id}' was a directory, removed directory, exception : '{exc}'"
-                                            log.debug(msg)
-                                    else:
-                                        msg = f"SIGTERM clean up of resources, unable to remove lock file '{file_name}', pid '{file_pid}', machine identifier '{file_mach_id}', exception : '{exc}'"
-                                        log.debug(msg)
-                                else:
-                                    msg = f"SIGTERM clean up of resources, removed lock file '{file_name}', pid '{file_pid}', machine identifier '{file_mach_id}'"
-                                    log.debug(msg)
-
                 except OSError as exc:
                     if exc.errno not in (errno.ESRCH, errno.EACCES):
                         raise
@@ -890,18 +777,11 @@ class ProcessManager:
         """
         Properly terminate this process manager instance
         """
-        log.warning("DGM process terminate entry")
         self.stop_restarting()
-        log.warning("DGM process terminate send signal SIGTERM")
         self.send_signal_to_processes(signal.SIGTERM)
-        log.warning("DGM process terminate kill children")
         self.kill_children()
-        log.warning("DGM process terminate exit")
 
     def _handle_signals(self, *args, **kwargs):
-        dbg_msg = f"DGM process _handle_signals args '{args}', kwargs '{kwargs}'"
-        log.warning(dbg_msg)
-
         # first lets reset signal handlers to default one to prevent running this twice
         signal.signal(signal.SIGTERM, signal.SIG_IGN)
         signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -911,8 +791,6 @@ class ProcessManager:
 
         # check that this is the correct process, children inherit this
         # handler, if we are in a child lets just run the original handler
-        dbg_msg = f"DGM process _handle_signals os.getpid '{os.getpid()}', self pid '{self._pid}'"
-        log.warning(dbg_msg)
         if os.getpid() != self._pid:
             if callable(self._sigterm_handler):
                 return self._sigterm_handler(*args)
@@ -921,8 +799,6 @@ class ProcessManager:
         else:
             return
 
-        dbg_msg = f"DGM process _handle_signals call self.kill_children, args '{args}', kwargs '{kwargs}'"
-        log.warning(dbg_msg)
         # Terminate child processes
         self.kill_children(*args, **kwargs)
 
@@ -1189,9 +1065,6 @@ class SignalHandlingProcess(Process):
             signal.signal(signal.SIGTERM, self._handle_signals)
 
     def _handle_signals(self, signum, sigframe):
-        dbg_msg = f"DGM SignalHandlingProcess _handle_signals, signum '{signum}', sigframe '{sigframe}'"
-        log.warning(dbg_msg)
-
         signal.signal(signal.SIGTERM, signal.SIG_IGN)
         signal.signal(signal.SIGINT, signal.SIG_IGN)
         msg = f"{self.__class__.__name__} received a "
@@ -1208,12 +1081,9 @@ class SignalHandlingProcess(Process):
         )
         log.debug(dbg_msg)
 
-        dbg_msg = f"DGM _handle_signals about to check HAS_PSUTIL, for process id '{os.getpid()}' and machine identifer '{mach_id}'"
-        log.warning(dbg_msg)
-
+        cur_pid = os.getpid()
         if HAS_PSUTIL:
             try:
-                cur_pid = os.getpid()
                 process = psutil.Process(cur_pid)
                 if hasattr(process, "children"):
                     for child in process.children(recursive=True):
@@ -1232,24 +1102,17 @@ class SignalHandlingProcess(Process):
             # example lockfile /var/cache/salt/master/gitfs/work/NlJQs6Pss_07AugikCrmqfmqEFrfPbCDBqGLBiCd3oU=/_/update.lk
             cache_dir = self.opts.get("cachedir", None)
             gitfs_active = self.opts.get("gitfs_remotes", None)
-            dbg_msg = f"DGM _handle_signals HAS_PSUTIL, cache_dir '{cache_dir}', gitfs_active '{gitfs_active}'"
-            log.warning(dbg_msg)
             if cache_dir and gitfs_active:
                 # check for gitfs file locks to ensure no resource leaks
                 # last chance to clean up any missed unlock droppings
                 cache_dir = Path(cache_dir + "/gitfs/work")
-                dbg_msg = f"DGM _handle_signals HAS_PSUTIL,find for final cache_dir '{cache_dir}'"
-                log.warning(dbg_msg)
                 if cache_dir.exists and cache_dir.is_dir():
                     file_list = list(cache_dir.glob("**/*.lk"))
-                    dbg_msg = f"DGM _handle_signals HAS_PSUTIL,find for final cache_dir '{cache_dir}', produced glob file_list '{file_list}'"
-                    log.warning(dbg_msg)
                     file_del_list = []
-
+                    file_pid = 0
+                    file_mach_id = 0
                     try:
                         for file_name in file_list:
-                            dbg_msg = f"DGM _handle_signals HAS_PSUTIL, checking file name '{file_name}'"
-                            log.warning(dbg_msg)
                             with salt.utils.files.fopen(file_name, "r") as fd_:
                                 try:
                                     file_pid = int(
@@ -1261,7 +1124,7 @@ class SignalHandlingProcess(Process):
                                     # Lock file is empty, set pid to 0 so it evaluates as False.
                                     file_pid = 0
                                 try:
-                                    file_mach_id = int(
+                                    file_mach_id = (
                                         salt.utils.stringutils.to_unicode(
                                             fd_.readline()
                                         ).rstrip()
@@ -1269,8 +1132,7 @@ class SignalHandlingProcess(Process):
                                 except ValueError:
                                     # Lock file is empty, set mach_id to 0 so it evaluates as False.
                                     file_mach_id = 0
-                                dbg_msg = f"DGM _handle_signals HAS_PSUTIL, cur_pid '{cur_pid}', mach_id '{mach_id}', file_pid '{file_pid}', file_mach_id '{file_mach_id}'"
-                                log.warning(dbg_msg)
+
                             if cur_pid == file_pid:
                                 if mach_id != file_mach_id:
                                     if not file_mach_id:
@@ -1289,8 +1151,6 @@ class SignalHandlingProcess(Process):
 
                         for file_name, file_pid, file_mach_id in file_del_list:
                             try:
-                                dbg_msg = f"DGM _handle_signals file_pid '{file_pid}', file_mach_id '{file_mach_id}', removing file name '{file_name}'"
-                                log.warning(dbg_msg)
                                 os.remove(file_name)
                             except OSError as exc:
                                 if exc.errno == errno.ENOENT:
@@ -1334,8 +1194,6 @@ def default_signals(*signals):
     """
     Temporarily restore signals to their default values.
    """
-    dbg_msg = f"DGM default_signals entry, signals '{signals}'"
-    log.warning(dbg_msg)
     old_signals = {}
     for signum in signals:
         try:
@@ -1345,7 +1203,6 @@ def default_signals(*signals):
             # This happens when a netapi module attempts to run a function
             # using wheel_async, because the process trying to register signals
             # will not be the main PID.
-            log.warning("DGM Failed to register signal for signum %d: %s", signum, exc)
             log.trace("Failed to register signal for signum %d: %s", signum, exc)
         else:
             old_signals[signum] = saved_signal
@@ -1355,8 +1212,6 @@ def default_signals(*signals):
         yield
     finally:
         # Restore signals
-        dbg_msg = f"DGM default_signals entry, restoring old signals '{old_signals}'"
-        log.warning(dbg_msg)
         for signum in old_signals:
             signal.signal(signum, old_signals[signum])
diff --git a/tests/pytests/unit/utils/test_gitfs_locks.py b/tests/pytests/unit/utils/test_gitfs_locks.py
index 03508908e0e..1cf23d84ef7 100644
--- a/tests/pytests/unit/utils/test_gitfs_locks.py
+++ b/tests/pytests/unit/utils/test_gitfs_locks.py
@@ -4,6 +4,7 @@ any remotes.
""" import logging +import os import pathlib import signal import tempfile @@ -23,14 +24,7 @@ from salt.utils.immutabletypes import freeze from salt.utils.verify import verify_env from tests.support.runtests import RUNTIME_VARS -# import multiprocessing - - log = logging.getLogger(__name__) -## logger = multiprocessing.log_to_stderr() -## logger.setLevel(logging.INFO) -## ## log = logger.getLogger(__name__) -## log = logger.getLogger() @pytest.fixture(scope="session", autouse=True) @@ -327,25 +321,16 @@ class TestGitBase(AdaptedConfigurationTestCaseMixin): ) def init_remote(self): - dbg_msg = f"DGM MockedProvider init_remote tmp_name '{tmp_name}'" - log.debug(dbg_msg) self.gitdir = salt.utils.path.join(tmp_name, ".git") - dbg_msg = f"DGM MockedProvider init_remote gitdir '{self.gitdir}'" - log.debug(dbg_msg) self.repo = True new = False return new def envs(self): - dgm_test_base = ["base"] - dbg_msg = f"DGM MockedProvider env base '{dgm_test_base}'" - log.debug(dbg_msg) return ["base"] def _fetch(self): self.fetched = True - dbg_msg = f"DGM MockedProvider _fetch self.fetched '{self.fetched}'" - log.debug(dbg_msg) # Clear the instance map so that we make sure to create a new instance # for this test class. @@ -482,25 +467,37 @@ def test_git_provider_mp_gen_lock(main_class, caplog): assert test_msg3 in caplog.text -def process_kill_test(main_class): +class KillProcessTest(salt.utils.process.SignalHandlingProcess): """ - Process to obtain a lock and hold it, - which will then be given a SIGTERM to ensure clean up of resources for the lock - - Check that lock is obtained and then it should be released by SIGTERM checks + Test process for which to kill and check lock resources are cleaned up """ - log.debug("DGM process_kill_test entry pid, '{os.getpid()}'") - provider = main_class.remotes[0] - provider.lock() - log.debug("DGM process_kill_test obtained lock") + def __init__(self, provider, **kwargs): + super().__init__(**kwargs) + self.provider = provider + self.opts = provider.opts + self.threads = {} - # check that lock has been released - assert provider._master_lock.acquire(timeout=5) - log.debug("DGM process_kill_test tested assert masterlock acquire") + def run(self): + """ + Start the test process to kill + """ + log.debug("DGM kill_test_process entry pid %s", os.getpid()) - time.sleep(20) # give time for kill by sigterm - log.debug("DGM process_kill_test exit") + ## provider = main_class.remotes[0] + self.provider.lock() + + log.debug("DGM kill_test_process obtained lock") + + # check that lock has been released + assert self.provider._master_lock.acquire(timeout=5) + log.debug("DGM kill_test_process tested assert masterlock acquire") + + while True: + tsleep = 1 + time.sleep(tsleep) # give time for kill by sigterm + + log.debug("DGM kill_test_process exit") @pytest.mark.slow_test @@ -514,43 +511,18 @@ def test_git_provider_sigterm_cleanup(main_class, caplog): provider = main_class.remotes[0] - ## DGM find lock file location - ## provider.lock() - ## file_name = provider._get_lock_file("update") - ## log.debug(f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}'") - - ## proc = multiprocessing.Process(target=process_kill_test) - ## procmgr = salt.utils.process.ProcessManager(wait_for_kill=30) - ## proc = procmgr.add_process(process_kill_test, args=(main_class,), name="test_kill") - ## proc.start() - - # Reset signals to default ones before adding processes to the process - # manager. 
We don't want the processes being started to inherit those - # signal handlers + log.debug("DGM test_git_provider_sigterm_cleanup, get procmgn and add process") with salt.utils.process.default_signals(signal.SIGINT, signal.SIGTERM): - procmgr = salt.utils.process.ProcessManager(wait_for_kill=5) - proc = procmgr.add_process( - process_kill_test, args=(main_class,), name="test_kill" - ) - - # Install the SIGINT/SIGTERM handlers if not done so far - if signal.getsignal(signal.SIGINT) is signal.SIG_DFL: - # No custom signal handling was added, install our own - signal.signal(signal.SIGINT, procmgr._handle_signals) - - if signal.getsignal(signal.SIGTERM) is signal.SIG_DFL: - # No custom signal handling was added, install our own - signal.signal(signal.SIGTERM, procmgr._handle_signals) + procmgr = salt.utils.process.ProcessManager(wait_for_kill=1) + proc = procmgr.add_process(KillProcessTest, args=(provider,), name="test_kill") + log.debug("DGM test_git_provider_sigterm_cleanup, check if process is alive") while not proc.is_alive(): - dbg_msg = "DGM test_git_provider_sigterm_cleanup sleeping waiting for child process to become alive" - log.debug(dbg_msg) time.sleep(1) # give some time for it to be started - # child process should be alive - dbg_msg = f"DGM test_git_provider_sigterm_cleanup child process is alive with pid '{proc.pid}'" - log.debug(dbg_msg) + procmgr.run() + # child process should be alive file_name = provider._get_lock_file("update") dbg_msg = f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}'" log.debug(dbg_msg) @@ -572,13 +544,7 @@ def test_git_provider_sigterm_cleanup(main_class, caplog): log.debug(dbg_msg) test_file_exits = pathlib.Path(file_name).exists() - dbg_msg = f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}', does it exist anymore '{test_file_exits}'" log.debug(dbg_msg) assert not pathlib.Path(file_name).exists() - - dbg_msg = f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}', does NOT exist anymore" - log.debug(dbg_msg) - - log.debug("DGM test_git_provider_sigterm_cleanup exit")
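For reference, a minimal sketch of the lock-file layout this patch settles on: the first line of an update.lk file is the owning PID and the second line is the machine identifier, which is what the SIGTERM cleanup compares against the dying process and the local machine id. The helper names read_lock_file and lock_is_stale_for below are illustrative only and are not Salt APIs; only the two-line layout is taken from the patch.

# Illustrative sketch, assuming only the two-line lock file layout written above.
import pathlib


def read_lock_file(path):
    """Return (pid, mach_id) parsed from a gitfs update.lk style lock file."""
    lines = pathlib.Path(path).read_text(encoding="utf-8").splitlines()
    try:
        pid = int(lines[0].strip()) if lines else 0
    except ValueError:
        pid = 0  # empty or corrupt first line evaluates as False
    mach_id = lines[1].strip() if len(lines) > 1 else ""
    return pid, mach_id


def lock_is_stale_for(path, terminating_pid, this_mach_id):
    """True when the lock belongs to terminating_pid on this machine (or carries no machine id)."""
    pid, mach_id = read_lock_file(path)
    return pid == terminating_pid and (not mach_id or mach_id == this_mach_id)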