Address leaks in fileserver caused by git backends

At this time we do not have the ability to fix the upstream memory leaks
in the gitfs backend providers. Work around their limitations by
periodically restarting the file server update proccess. This will at
least partially address #50313
This commit is contained in:
Daniel A. Wozniak 2021-06-16 14:25:08 -07:00 committed by Megan Wilhite
parent 5ae3bc9821
commit d0141cc316
4 changed files with 71 additions and 40 deletions

1
changelog/50313.fixed Normal file
View file

@ -0,0 +1 @@
Periodically restart the fileserver update process to avoid leaks

View file

@ -438,52 +438,54 @@ class FileserverUpdate(salt.utils.process.SignalHandlingProcess):
(backend, update_func)
] = None
def update_fileserver(self, interval, backends):
@staticmethod
def _do_update(backends):
"""
Perform fileserver updates
"""
for backend, update_args in backends.items():
backend_name, update_func = backend
try:
if update_args:
log.debug(
"Updating %s fileserver cache for the following " "targets: %s",
backend_name,
update_args,
)
args = (update_args,)
else:
log.debug("Updating %s fileserver cache", backend_name)
args = ()
update_func(*args)
except Exception as exc: # pylint: disable=broad-except
log.exception(
"Uncaught exception while updating %s fileserver " "cache",
backend_name,
)
@classmethod
def update(cls, interval, backends, timeout=300):
"""
Threading target which handles all updates for a given wait interval
"""
def _do_update():
start = time.time()
condition = threading.Condition()
while time.time() - start < timeout:
log.debug(
"Performing fileserver updates for items with an update "
"interval of %d",
interval,
)
for backend, update_args in backends.items():
backend_name, update_func = backend
try:
if update_args:
log.debug(
"Updating %s fileserver cache for the following "
"targets: %s",
backend_name,
update_args,
)
args = (update_args,)
else:
log.debug("Updating %s fileserver cache", backend_name)
args = ()
update_func(*args)
except Exception as exc: # pylint: disable=broad-except
log.exception(
"Uncaught exception while updating %s fileserver " "cache",
backend_name,
)
cls._do_update(backends)
log.debug(
"Completed fileserver updates for items with an update "
"interval of %d, waiting %d seconds",
interval,
interval,
)
condition = threading.Condition()
_do_update()
while True:
with condition:
condition.wait(interval)
_do_update()
def run(self):
"""
@ -506,13 +508,15 @@ class FileserverUpdate(salt.utils.process.SignalHandlingProcess):
for interval in self.buckets:
self.update_threads[interval] = threading.Thread(
target=self.update_fileserver, args=(interval, self.buckets[interval]),
target=self.update, args=(interval, self.buckets[interval]),
)
self.update_threads[interval].start()
# Keep the process alive
while True:
time.sleep(60)
while self.update_threads:
for name, thread in list(self.update_threads.items()):
thread.join(1)
if not thread.is_alive():
self.update_threads.pop(name)
class Master(SMaster):

View file

@ -585,12 +585,21 @@ class ProcessManager:
"""
if self._restart_processes is False:
return
log.info(
"Process %s (%s) died with exit status %s, restarting...",
self._process_map[pid]["tgt"],
pid,
self._process_map[pid]["Process"].exitcode,
)
exit = self._process_map[pid]["Process"].exitcode
if exit > 0:
log.info(
"Process %s (%s) died with exit status %s, restarting...",
self._process_map[pid]["tgt"],
pid,
self._process_map[pid]["Process"].exitcode,
)
else:
log.debug(
"Process %s (%s) died with exit status %s, restarting...",
self._process_map[pid]["tgt"],
pid,
self._process_map[pid]["Process"].exitcode,
)
# don't block, the process is already dead
self._process_map[pid]["Process"].join(1)

View file

@ -0,0 +1,17 @@
import time
import pytest
import salt.master
from tests.support.mock import patch
def test_fileserver_duration():
with patch("salt.master.FileserverUpdate._do_update") as update:
start = time.time()
salt.master.FileserverUpdate.update(1, {}, 1)
end = time.time()
# Interval is equal to timeout so the _do_update method will be called
# one time.
update.called_once()
# Timeout is 1 second
assert 2 > end - start > 1