Revised finalization of processes

This commit is contained in:
David Murphy 2024-05-22 10:59:21 -06:00 committed by Daniel Wozniak
parent 8198500f51
commit 9bb92ed23d
3 changed files with 212 additions and 52 deletions

View file

@ -33,6 +33,7 @@ import salt.utils.hashutils
import salt.utils.itertools
import salt.utils.path
import salt.utils.platform
import salt.utils.process
import salt.utils.stringutils
import salt.utils.url
import salt.utils.user
@ -43,7 +44,6 @@ from salt.exceptions import FileserverConfigError, GitLockError, get_error_messa
from salt.utils.event import tagify
from salt.utils.odict import OrderedDict
from salt.utils.platform import get_machine_identifier as _get_machine_identifier
from salt.utils.process import os_is_running as pid_exists
from salt.utils.versions import Version
VALID_REF_TYPES = _DEFAULT_MASTER_OPTS["gitfs_ref_types"]
@ -258,6 +258,8 @@ class GitProvider:
def _val_cb(x, y):
return str(y)
## DGM print(f"DGM class GitProvider dunder init, opts '{opts}'", flush=True)
# get machine_identifier
self.mach_id = _get_machine_identifier().get(
"machine_id", "no_machine_id_available"
@ -533,25 +535,29 @@ class GitProvider:
f"DGM class GitProvider dunder init, cur_pid '{cur_pid}', process '{process}'"
)
print(
f"DGM class GitProvider dunder init, cur_pid '{cur_pid}', process '{process}', process dir '{dgm_process_dir}'"
f"DGM class GitProvider dunder init, cur_pid '{cur_pid}', process '{process}', process dir '{dgm_process_dir}'",
flush=True,
)
if isinstance(process, salt.utils.process.Process):
cache_dir = self.opts.get("cachedir", None)
gitfs_active = self.opts.get("gitfs_remotes", None)
cache_dir = self.opts.get("cachedir", None)
gitfs_active = self.opts.get("gitfs_remotes", None)
log.warning(
f"DGM class GitProvider dunder init, cache_dir '{cache_dir}', gitfs_active '{gitfs_active}'"
)
print(
f"DGM class GitProvider dunder init, cache_dir '{cache_dir}', gitfs_active '{gitfs_active}'",
flush=True,
)
if cache_dir and gitfs_active:
log.warning(
f"DGM class GitProvider dunder init, cache_dir '{cache_dir}', gitfs_active '{gitfs_active}'"
f"DGM class GitProvider registering gitfs_zombie_cleanup with cache_dir '{cache_dir}'"
)
print(
f"DGM class GitProvider dunder init, cache_dir '{cache_dir}', gitfs_active '{gitfs_active}'"
f"DGM class GitProvider registering gitfs_zombie_cleanup with cache_dir '{cache_dir}'",
flush=True,
)
salt.utils.process.register_cleanup_zombie_function(
gitfs_zombie_cleanup, cache_dir
)
if cache_dir and gitfs_active:
log.warning(
f"DGM class GitProvider registering gitfs_zombie_cleanup with cache_dir '{cache_dir}'"
)
print(
f"DGM class GitProvider registering gitfs_zombie_cleanup with cache_dir '{cache_dir}'"
)
process.register_finalize_method(gitfs_zombie_cleanup, cache_dir)
def get_cache_basehash(self):
return self._cache_basehash
@ -1028,7 +1034,7 @@ class GitProvider:
if self.mach_id or mach_id:
msg += f" for machine_id {mach_id}, current machine_id {self.mach_id}"
if not pid_exists(pid):
if not salt.utils.process.os_is_running(pid):
if self.mach_id != mach_id:
msg += (
" but this process is not running. The "
@ -1058,7 +1064,7 @@ class GitProvider:
if failhard:
raise
return
elif pid and pid_exists(pid):
elif pid and salt.utils.process.os_is_running(pid):
log.warning(
"Process %d has a %s %s lock (%s) on machine_id %s",
pid,
@ -3680,11 +3686,17 @@ def gitfs_zombie_cleanup(cache_dir):
log.warning(
f"DGM class GitProvider gitfs_zombie_cleanup entry, cache_dir '{cache_dir}'"
)
print(f"DGM class GitProvider gitfs_zombie_cleanup entry, cache_dir '{cache_dir}'")
print(
f"DGM class GitProvider gitfs_zombie_cleanup entry, cache_dir '{cache_dir}'",
flush=True,
)
cur_pid = os.getpid()
mach_id = _get_machine_identifier().get("machine_id", "no_machine_id_available")
log.debug("exiting for process id %s and machine identifer %s", cur_pid, mach_id)
print(f"exiting for process id '{cur_pid}' and machine identifer '{mach_id}'")
print(
f"DGM exiting for process id '{cur_pid}' and machine identifer '{mach_id}'",
flush=True,
)
# need to clean up any resources left around like lock files if using gitfs
# example: lockfile

View file

@ -51,6 +51,9 @@ try:
except ImportError:
HAS_SETPROCTITLE = False
## DGM
_INTERNAL_PROCESS_ZOMBIE_LIST = []
def appendproctitle(name):
"""
@ -530,6 +533,9 @@ class ProcessManager:
target=tgt, args=args, kwargs=kwargs, name=name or tgt.__qualname__
)
## DGM try cleaning up call
process.register_finalize_method(cleanup_zombie_process, args, kwargs)
if isinstance(process, SignalHandlingProcess):
with default_signals(signal.SIGINT, signal.SIGTERM):
process.start()
@ -684,6 +690,7 @@ class ProcessManager:
else:
for pid, p_map in self._process_map.copy().items():
log.trace("Terminating pid %s: %s", pid, p_map["Process"])
print("Terminating pid %s: %s", pid, p_map["Process"])
if args:
# escalate the signal to the process
try:
@ -706,9 +713,11 @@ class ProcessManager:
end_time = time.time() + self.wait_for_kill # when to die
log.trace("Waiting to kill process manager children")
print("Waiting to kill process manager children")
while self._process_map and time.time() < end_time:
for pid, p_map in self._process_map.copy().items():
log.trace("Joining pid %s: %s", pid, p_map["Process"])
print("Joining pid %s: %s", pid, p_map["Process"])
p_map["Process"].join(0)
if not p_map["Process"].is_alive():
@ -757,7 +766,15 @@ class ProcessManager:
for (k, v) in self._process_map.items()
),
)
print(
"Some processes failed to respect the KILL signal: %s",
"; ".join(
"Process: {} (Pid: {})".format(v["Process"], k)
for (k, v) in self._process_map.items()
),
)
log.info("kill_children retries left: %s", available_retries)
print("kill_children retries left: %s", available_retries)
kwargs["retry"] = available_retries - 1
return self.kill_children(*args, **kwargs)
else:
@ -771,18 +788,34 @@ class ProcessManager:
) in self._process_map.items()
),
)
print(
"Failed to kill the following processes: %s",
"; ".join(
"Process: {} (Pid: {})".format(v["Process"], k)
for (
k,
v,
) in self._process_map.items()
),
)
log.warning(
"Salt will either fail to terminate now or leave some "
"zombie processes behind"
)
print(
"Salt will either fail to terminate now or leave some "
"zombie processes behind"
)
def terminate(self):
"""
Properly terminate this process manager instance
"""
print("DGM class ProcessManager terminate entry", flush=True)
self.stop_restarting()
self.send_signal_to_processes(signal.SIGTERM)
self.kill_children()
print("DGM class ProcessManager terminate exit", flush=True)
def _handle_signals(self, *args, **kwargs):
# first lets reset signal handlers to default one to prevent running this twice
@ -1023,7 +1056,8 @@ class Process(multiprocessing.Process):
f"DGM class Process wrapped_run_func, method '{method}', args '{args}', kwargs '{kwargs}'"
)
print(
f"DGM class Process wrapped_run_func, method '{method}', args '{args}', kwargs '{kwargs}'"
f"DGM class Process wrapped_run_func, method '{method}', args '{args}', kwargs '{kwargs}'",
flush=True,
)
try:
method(*args, **kwargs)
@ -1062,7 +1096,8 @@ class Process(multiprocessing.Process):
f"DGM class Process register_finalize_method entry, function '{function}', args '{args}', kwargs '{kwargs}'"
)
print(
f"DGM class Process register_finalize_method entry, function '{function}', args '{args}', kwargs '{kwargs}'"
f"DGM class Process register_finalize_method entry, function '{function}', args '{args}', kwargs '{kwargs}'",
flush=True,
)
finalize_method_tuple = (function, args, kwargs)
if finalize_method_tuple not in self._finalize_methods:
@ -1070,7 +1105,8 @@ class Process(multiprocessing.Process):
f"DGM register_finalize_method, appending tuple finalize_method_tuple '{finalize_method_tuple}'"
)
print(
f"DGM register_finalize_method, appending tuple finalize_method_tuple '{finalize_method_tuple}'"
f"DGM register_finalize_method, appending tuple finalize_method_tuple '{finalize_method_tuple}'",
flush=True,
)
self._finalize_methods.append(finalize_method_tuple)
@ -1098,6 +1134,8 @@ class SignalHandlingProcess(Process):
msg += ". Exiting"
log.debug(msg)
print(f"DGM class SignalHandlingProcess, _handle_signals {msg}", flush=True)
## DGM mach_id = _get_machine_identifier().get("machine_id", "no_machine_id_available")
## DGM log.debug(
## DGM "exiting for process id %s and machine identifer %s", os.getpid(), mach_id
@ -1105,6 +1143,36 @@ class SignalHandlingProcess(Process):
## DGM
## DGM cur_pid = os.getpid()
# Run any registered process finalization routines
print(
"DGM class SignalHandlingProcess, attempt to print out _finalize_methods",
flush=True,
)
for method, args, kwargs in self._finalize_methods:
# pylint: disable=logging-fstring-interpolation
log.warning(
f"DGM class SignalHandlingProcess, method '{method}', args '{args}', kwargs '{kwargs}'"
)
print(
f"DGM class SignalHandlingProcess, method '{method}', args '{args}', kwargs '{kwargs}', flush=True"
)
try:
method(*args, **kwargs)
except Exception: # pylint: disable=broad-except
log.exception(
"Failed to run finalize callback on %s; method=%r; args=%r; and kwargs=%r",
self,
method,
args,
kwargs,
)
continue
print(
"DGM class SignalHandlingProcess, done to print out _finalize_methods",
flush=True,
)
if HAS_PSUTIL:
try:
process = psutil.Process(os.getpid())
@ -1292,3 +1360,72 @@ class SubprocessList:
self.processes.remove(proc)
self.count -= 1
log.debug("Subprocess %s cleaned up", proc.name)
def cleanup_zombie_process(*args, **kwargs):
"""
Generic process to allow for any registered process cleanup routines to execute.
While class Process has a register_finalize_method, when a process is looked up by pid
using psutil.Process, there is no method available to register a cleanup process.
Hence, this function is added as part of the add_process to allow usage of other cleanup processes
which cannot be added by the register_finalize_method.
"""
print("\nDGM cleanup_zombie_process entry\n", flush=True)
# Run any register process cleanup routines
for method, args, kwargs in _INTERNAL_PROCESS_ZOMBIE_LIST:
# pylint: disable=logging-fstring-interpolation
log.warning(
f"DGM cleanup_zombie_process, method '{method}', args '{args}', kwargs '{kwargs}'"
)
print(
f"DGM cleanup_zombie_process, method '{method}', args '{args}', kwargs '{kwargs}'",
flush=True,
)
try:
method(*args, **kwargs)
except Exception: # pylint: disable=broad-except
log.exception(
"Failed to run registered function finalize callback; method=%r; args=%r; and kwargs=%r",
method,
args,
kwargs,
)
continue
print("\nDGM cleanup_zombie_process exit\n", flush=True)
def register_cleanup_zombie_function(function, *args, **kwargs):
"""
Register a function to run as process terminates
While class Process has a register_finalize_method, when a process is looked up by pid
using psutil.Process, there is no method available to register a cleanup process.
Hence, this function can be used to register a function to allow cleanup processes
which cannot be added by the register_finalize_method.
Note: there is no deletion, since it is assummed that if something is registered, it will continue to be used
"""
# pylint: disable=logging-fstring-interpolation
log.warning(
f"DGM register_cleanup_zombie_function entry, function '{function}', args '{args}', kwargs '{kwargs}'"
)
print(
f"DGM register_cleanup_zombie_function entry, function '{function}', args '{args}', kwargs '{kwargs}'",
flush=True,
)
finalize_function_tuple = (function, args, kwargs)
if finalize_function_tuple not in _INTERNAL_PROCESS_ZOMBIE_LIST:
log.warning(
f"DGM register_cleanup_zombie_function, appending tuple finalize_function_tuple '{finalize_function_tuple}'"
)
print(
f"DGM register_cleanup_zombie_function, appending tuple finalize_function_tuple '{finalize_function_tuple}'",
flush=True,
)
_INTERNAL_PROCESS_ZOMBIE_LIST.append(finalize_function_tuple)

View file

@ -9,7 +9,7 @@ import pathlib
import signal
import time
import psutil
## import psutil
import pytest
from saltfactories.utils import random_string
@ -552,6 +552,9 @@ class KillProcessTest(salt.utils.process.SignalHandlingProcess):
lockfile = self.provider._get_lock_file()
log.debug("KillProcessTest acquried lock file %s", lockfile)
killtest_pid = os.getpid()
print(f"KillProcessTest pid '{killtest_pid}', acquried lock file '{lockfile}'")
# check that lock has been released
assert self.provider._master_lock.acquire(timeout=5)
@ -571,7 +574,7 @@ def test_git_provider_sigterm_cleanup(
then kill the process via SIGTERM and ensure locked resources are cleaned up
"""
log.warning("DGM test_git_provider_sigterm_cleanup entry")
print("DGM test_git_provider_sigterm_cleanup entry")
print("DGM test_git_provider_sigterm_cleanup entry", flush=True)
provider = main_class.remotes[0]
@ -585,40 +588,45 @@ def test_git_provider_sigterm_cleanup(
log.warning(
f"DGM test_git_provider_sigterm_cleanup, post add_process, proc '{proc}'"
)
print(f"DGM test_git_provider_sigterm_cleanup, post add_process, proc '{proc}'")
print("DGM test area entry")
## DGM print("DGM test area entry\n\n\n\n", flush=True)
dgm_proc_dir = dir(proc)
print(
f"DGM test_git_provider_sigterm_cleanup, post add_process, proc '{proc}', proc dir '{dgm_proc_dir}'"
)
## DGM dgm_proc_dir = dir(proc)
## DGM print(f"DGM test_git_provider_sigterm_cleanup, post add_process, proc '{proc}', proc dir '{dgm_proc_dir}'", flush=True)
dgm_pid = proc.pid
print(f"DGM test_git_provider_sigterm_cleanup, proc pid '{proc.pid}'")
## DGM dgm_pid = proc.pid
## DGM print(f"DGM test_git_provider_sigterm_cleanup, proc pid '{proc.pid}'", flush=True)
dgm_process = psutil.Process(dgm_pid)
print(f"DGM test_git_provider_sigterm_cleanup, psutil process '{dgm_process}'")
## DGM dgm_process = psutil.Process(dgm_pid)
## DGM print(f"DGM test_git_provider_sigterm_cleanup, psutil process '{dgm_process}'", flush=True)
if isinstance(proc, salt.utils.process.Process):
print(
"DGM test_git_provider_sigterm_cleanup, proc isinstance salt utils process Process is TRUE"
)
else:
print(
"DGM test_git_provider_sigterm_cleanup, proc isinstance salt utils process Process is FALSE"
)
## DGM dgm_process_dir = dir(dgm_process)
## DGM print(f"DGM test_git_provider_sigterm_cleanup, psutil process '{dgm_process}', process dir '{dgm_process_dir}'", flush=True)
## DGM ## DGM print(f"DGM test_git_provider_sigterm_cleanup, checking values psutil process '{dgm_process}', pid '{dgm_process.pid}', name '{dgm_process.name()}', username '{dgm_process.username()}', as_dict '{dgm_process.as_dict()}', cmdline '{dgm_process.cmdline()}'", flush=True)
## DGM print(f"DGM test_git_provider_sigterm_cleanup, checking values psutil process '{dgm_process}', pid '{dgm_process.pid}', ppid '{dgm_process.ppid}', name '{dgm_process.name()}', username '{dgm_process.username()}', cmdline '{dgm_process.cmdline()}'", flush=True)
if isinstance(dgm_process, salt.utils.process.Process):
print(
"DGM test_git_provider_sigterm_cleanup, process isinstance salt utils process Process is TRUE"
)
else:
print(
"DGM test_git_provider_sigterm_cleanup, process isinstance salt utils process Process is FALSE"
)
## DGM dgm_dict = dgm_process.as_dict()
## DGM dgm_process_parent_pid = dgm_dict["ppid"]
## DGM dgm_process_parent = psutil.Process(dgm_process_parent_pid)
## DGM dgm_process_parent_dir = dir(dgm_process_parent)
## DGM print(f"DGM test_git_provider_sigterm_cleanup, parent pid '{dgm_process_parent_pid}' psutil process '{dgm_process_parent}', name '{dgm_process_parent.name()}', cmdline '{dgm_process_parent.cmdline()}', dir '{dgm_process_parent_dir}'", flush=True)
print("DGM test area exit")
## DGM if isinstance(proc, salt.utils.process.Process):
## DGM print("DGM test_git_provider_sigterm_cleanup, proc isinstance salt utils process Process is TRUE", flush=True)
## DGM else:
## DGM print("DGM test_git_provider_sigterm_cleanup, proc isinstance salt utils process Process is FALSE", flush=True)
## DGM if isinstance(dgm_process, salt.utils.process.Process):
## DGM print("DGM test_git_provider_sigterm_cleanup, process isinstance salt utils process Process is TRUE", flush=True)
## DGM else:
## DGM print("DGM test_git_provider_sigterm_cleanup, process isinstance salt utils process Process is FALSE", flush=True)
## DGM if isinstance(dgm_process_parent, salt.utils.process.ProcessManager):
## DGM print("DGM test_git_provider_sigterm_cleanup, process isinstance salt utils process ProcessManager is TRUE", flush=True)
## DGM else:
## DGM print("DGM test_git_provider_sigterm_cleanup, process isinstance salt utils process ProcessManager is FALSE", flush=True)
## DGM print("DGM test area exit\n\n\n\n", flush=True)
procmgr.run(asynchronous=True)
@ -628,14 +636,17 @@ def test_git_provider_sigterm_cleanup(
file_name = provider._get_lock_file("update")
log.warning(f"DGM test_git_provider_sigterm_cleanup, file_name '{file_name}'")
print(f"DGM test_git_provider_sigterm_cleanup, file_name '{file_name}'")
print(f"DGM test_git_provider_sigterm_cleanup, file_name '{file_name}'", flush=True)
assert pathlib.Path(file_name).exists()
assert pathlib.Path(file_name).is_file()
print("DGM test_git_provider_sigterm_cleanup, terminate procmgr start", flush=True)
procmgr.terminate() # sends a SIGTERM
time.sleep(2) # give some time for it to terminate
print("DGM test_git_provider_sigterm_cleanup, terminate procmgr exit", flush=True)
assert not proc.is_alive()
assert not pathlib.Path(file_name).exists()