WIP - Test kill cleanup of resources

This commit is contained in:
David Murphy 2024-02-27 16:58:55 -07:00 committed by Daniel Wozniak
parent ee55b9abe3
commit 443fdd3e34
2 changed files with 81 additions and 16 deletions

View file

@ -577,6 +577,7 @@ class ProcessManager:
self._restart_processes = False
def send_signal_to_processes(self, signal_):
log.warning(f"DGM process send_signal_to_processes signal '{signal_}'")
if salt.utils.platform.is_windows() and signal_ in (
signal.SIGTERM,
signal.SIGINT,
@ -595,8 +596,14 @@ class ProcessManager:
for pid in self._process_map.copy():
try:
log.warning(
f"DGM process send_signal_to_processes kill pid '{pid}', signal '{signal_}'"
)
os.kill(pid, signal_)
except OSError as exc:
log.warning(
f"DGM process send_signal_to_processes OSError exc, '{exc}'"
)
if exc.errno not in (errno.ESRCH, errno.EACCES):
# If it's not a "No such process" error, raise it
raise
@ -658,6 +665,7 @@ class ProcessManager:
"""
Kill all of the children
"""
log.warning("DGM process kill_children entry")
if salt.utils.platform.is_windows():
if multiprocessing.current_process().name != "MainProcess":
# Since the main process will kill subprocesses by tree,
@ -678,6 +686,7 @@ class ProcessManager:
p_map["Process"].terminate()
else:
for pid, p_map in self._process_map.copy().items():
log.warning("DGM Terminating pid %s: %s", pid, p_map["Process"])
log.trace("Terminating pid %s: %s", pid, p_map["Process"])
if args:
# escalate the signal to the process
@ -774,11 +783,17 @@ class ProcessManager:
"""
Properly terminate this process manager instance
"""
log.warning("DGM process terminate entry")
self.stop_restarting()
log.warning("DGM process terminate send signal SIGTERM")
self.send_signal_to_processes(signal.SIGTERM)
log.warning("DGM process terminate kill children")
self.kill_children()
log.warning("DGM process terminate exit")
def _handle_signals(self, *args, **kwargs):
log.warning(f"DGM process _handle_signals args '{args}', kwargs '{kwargs}'")
# first lets reset signal handlers to default one to prevent running this twice
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)
@ -1062,6 +1077,10 @@ class SignalHandlingProcess(Process):
signal.signal(signal.SIGTERM, self._handle_signals)
def _handle_signals(self, signum, sigframe):
log.warning(
f"DGM SignalHandlingProcess _handle_signals, signum '{signum}', sigframe '{sigframe}'"
)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)
msg = f"{self.__class__.__name__} received a "
@ -1077,6 +1096,10 @@ class SignalHandlingProcess(Process):
f"exiting for process id '{os.getpid()}' and machine identifer '{mach_id}'"
)
log.warning(
f"DGM _handle_signals about to check HAS_PSUTIL, for process id '{os.getpid()}' and machine identifer '{mach_id}'"
)
if HAS_PSUTIL:
try:
cur_pid = os.getpid()
@ -1098,16 +1121,28 @@ class SignalHandlingProcess(Process):
# example lockfile /var/cache/salt/master/gitfs/work/NlJQs6Pss_07AugikCrmqfmqEFrfPbCDBqGLBiCd3oU=/_/update.lk
cache_dir = self.opts.get("cachedir", None)
gitfs_active = self.opts.get("gitfs_remotes", None)
log.warning(
f"DGM _handle_signals HAS_PSUTIL, cache_dir '{cache_dir}', gitfs_active '{gitfs_active}'"
)
if cache_dir and gitfs_active:
# check for gitfs file locks to ensure no resource leaks
# last chance to clean up any missed unlock droppings
cache_dir = Path(cache_dir + "/gitfs/work")
log.warning(
f"DGM _handle_signals HAS_PSUTIL,find for final cache_dir '{cache_dir}'"
)
if cache_dir.exists and cache_dir.is_dir():
file_list = list(cache_dir.glob("**/*.lk"))
log.warning(
f"DGM _handle_signals HAS_PSUTIL,find for final cache_dir '{cache_dir}', produced glob file_list '{file_list}'"
)
file_del_list = []
try:
for file_name in file_list:
log.warning(
f"DGM _handle_signals HAS_PSUTIL, checking file name '{file_name}'"
)
with salt.utils.files.fopen(file_name, "r") as fd_:
try:
file_pid = int(
@ -1127,6 +1162,9 @@ class SignalHandlingProcess(Process):
except ValueError:
# Lock file is empty, set mach_id to 0 so it evaluates as False.
file_mach_id = 0
log.warning(
f"DGM _handle_signals HAS_PSUTIL, cur_pid '{cur_pid}', mach_id '{mach_id}', file_pid '{file_pid}', file_mach_id '{file_mach_id}'"
)
if cur_pid == file_pid:
if mach_id != file_mach_id:
if not file_mach_id:
@ -1145,6 +1183,9 @@ class SignalHandlingProcess(Process):
for (file_name, file_pid, file_mach_id) in file_del_list:
try:
log.warning(
f"DGM _handle_signals file_pid '{file_pid}', file_mach_id '{file_mach_id}', removing file name '{file_name}'"
)
os.remove(file_name)
except OSError as exc:
if exc.errno == errno.ENOENT:

View file

@ -7,7 +7,6 @@ import logging
import pathlib
import tempfile
import time
from multiprocessing import Process
import pytest
from saltfactories.utils import random_string
@ -18,6 +17,7 @@ import salt.utils.files
import salt.utils.gitfs
import salt.utils.path
import salt.utils.platform
import salt.utils.process
from salt.utils.immutabletypes import freeze
from salt.utils.verify import verify_env
from tests.support.runtests import RUNTIME_VARS
@ -477,33 +477,34 @@ def process_kill_test(main_class):
Check that lock is obtained and then it should be released by SIGTERM checks
"""
log.debug("DGM process_kill_test entry")
provider = main_class.remotes[0]
provider.lock()
log.debug("DGM process_kill_test obtained lock")
# check that lock has been released
assert provider._master_lock.acquire(timeout=5)
log.debug("DGM process_kill_test tested assert masterlock acquire")
time.sleep(20) # give time for kill by sigterm
log.debug("DGM process_kill_test exit")
@pytest.mark.slow_test
@pytest.mark.skip_unless_on_linux
def test_git_provider_sigterm_cleanup(caplog):
def test_git_provider_sigterm_cleanup(main_class, caplog):
"""
Start process which will obtain lock, and leave it locked
then kill the process via SIGTERM and ensure locked resources are cleaned up
"""
log.debug("DGM test_git_provider_sigterm_cleanup entry")
## start process_kill_test and obtain it's PID
## proc = subprocess.Popen("what go's here to start process_kill_test, etc")
##
## child_pid = proc.pid
## log.debug(f"DGM test_git_provider_sigterm_cleanup child process pid '{child_pid}'")
##
## with caplog.at_level(logging.DEBUG):
## proc.send_signal(signal.SIGTERM)
proc = Process(target=process_kill_test)
proc.start()
provider = main_class.remotes[0]
procmgr = salt.utils.process.ProcessManager(wait_for_kill=30)
proc = procmgr.add_process(process_kill_test, args=(main_class,), name="test_kill")
## proc.start()
while not proc.is_alive():
log.debug(
@ -516,10 +517,33 @@ def test_git_provider_sigterm_cleanup(caplog):
f"DGM test_git_provider_sigterm_cleanup child process is alive with pid '{proc.pid}'"
)
with caplog.at_level(logging.DEBUG):
proc.terminate() # sends a SIGTERM
file_name = provider._get_lock_file("update")
log.debug(
f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}'"
)
test_msg1 = "SIGTERM clean up of resources, removed lock file"
assert test_msg1 in caplog.text
assert pathlib.Path(file_name).exists()
assert pathlib.Path(file_name).is_file()
log.debug(
f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}', exists and is a file, send SIGTERM signal"
)
proc.terminate() # sends a SIGTERM
time.sleep(1) # give some time for it to terminate
log.debug("DGM test_git_provider_sigterm_cleanup lock , post terminate")
assert not proc.is_alive()
log.debug("DGM test_git_provider_sigterm_cleanup lock , child is not alive")
test_file_exits = pathlib.Path(file_name).exists()
log.debug(
f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}', does it exist anymore '{test_file_exits}'"
)
assert not pathlib.Path(file_name).exists()
log.debug(
f"DGM test_git_provider_sigterm_cleanup lock file location, '{file_name}', does NOT exist anymore"
)
log.debug("DGM test_git_provider_sigterm_cleanup exit")