mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Make LazyLoader thread safe
Even when `multiprocessing` is set to `True`, there is a case where multiple threads in the same process attempt to use the same LazyLoader object. When using the reactor and reacting to an event that will call a runner, `salt.utils.reactor.ReactWrap.runner` will invoke `self.pool.fire_async(self.client_cache['runner'].low, args=(fun, kwargs))` potentially multiple times, each time using a thread from `salt.utils.process.ThreadPool`. Each thread will invoke `salt.client.mixins.SyncClientMixin.low` which in turn will invoke its `_low` and call `salt.utils.job.store_job`. `salt.utils.job.store_job` will invoke the LazyLoader object for the returner. Since the LazyLoader object is not thread safe, occasional failures will occur which will reduce the reliability of the overall system. Let's examine why a function such as `LazyLoader._load` is not thread safe. Any time the GIL is released, it allows another thread to run. There are various types of operations that could release the GIL, but in this particular case they are file operations that happen in both `refresh_file_mapping` and `_load_module`. Note that if you add `print` statements, those also release the GIL (and make the problem more frequent). In the failure case, `refresh_file_mapping` releases the GIL, another thread loads the module, and then when the original thread runs again it will fail when `_inner_load` runs the second time (after `refresh_file_mapping`). The failure is because the module is already in `self.loaded_files`, so it is skipped over and `_inner_load` returns `False` even though the required `key` is already in `self._dict`. Since adding in stuff like `print` statements, or other logic also adds points in the code that allow thread switches, the most robust solution to such a problem is to use a mutex (as opposed to rechecking if `key` now appears in `self._dict` at certain checkpoints). This solution adds such a mutex and uses it in key places to ensure integrity. Signed-off-by: Sergey Kizunov <sergey.kizunov@ni.com>
This commit is contained in:
parent
4e7466a21c
commit
c624aa4827
1 changed files with 59 additions and 51 deletions
110
salt/loader.py
110
salt/loader.py
|
@ -14,6 +14,7 @@ import logging
|
|||
import inspect
|
||||
import tempfile
|
||||
import functools
|
||||
import threading
|
||||
import types
|
||||
from collections import MutableMapping
|
||||
from zipimport import zipimporter
|
||||
|
@ -1100,7 +1101,8 @@ class LazyLoader(salt.utils.lazy.LazyDict):
|
|||
self.disabled = set(self.opts.get('disable_{0}{1}'.format(
|
||||
self.tag, '' if self.tag[-1] == 's' else 's'), []))
|
||||
|
||||
self.refresh_file_mapping()
|
||||
self._lock = threading.RLock()
|
||||
self._refresh_file_mapping()
|
||||
|
||||
super(LazyLoader, self).__init__() # late init the lazy loader
|
||||
# create all of the import namespaces
|
||||
|
@ -1167,7 +1169,7 @@ class LazyLoader(salt.utils.lazy.LazyDict):
|
|||
else:
|
||||
return '\'{0}\' __virtual__ returned False'.format(mod_name)
|
||||
|
||||
def refresh_file_mapping(self):
|
||||
def _refresh_file_mapping(self):
|
||||
'''
|
||||
refresh the mapping of the FS on disk
|
||||
'''
|
||||
|
@ -1285,15 +1287,16 @@ class LazyLoader(salt.utils.lazy.LazyDict):
|
|||
'''
|
||||
Clear the dict
|
||||
'''
|
||||
super(LazyLoader, self).clear() # clear the lazy loader
|
||||
self.loaded_files = set()
|
||||
self.missing_modules = {}
|
||||
self.loaded_modules = {}
|
||||
# if we have been loaded before, lets clear the file mapping since
|
||||
# we obviously want a re-do
|
||||
if hasattr(self, 'opts'):
|
||||
self.refresh_file_mapping()
|
||||
self.initial_load = False
|
||||
with self._lock:
|
||||
super(LazyLoader, self).clear() # clear the lazy loader
|
||||
self.loaded_files = set()
|
||||
self.missing_modules = {}
|
||||
self.loaded_modules = {}
|
||||
# if we have been loaded before, lets clear the file mapping since
|
||||
# we obviously want a re-do
|
||||
if hasattr(self, 'opts'):
|
||||
self._refresh_file_mapping()
|
||||
self.initial_load = False
|
||||
|
||||
def __prep_mod_opts(self, opts):
|
||||
'''
|
||||
|
@ -1504,14 +1507,14 @@ class LazyLoader(salt.utils.lazy.LazyDict):
|
|||
virtual_funcs_to_process = ['__virtual__'] + self.virtual_funcs
|
||||
for virtual_func in virtual_funcs_to_process:
|
||||
virtual_ret, module_name, virtual_err, virtual_aliases = \
|
||||
self.process_virtual(mod, module_name)
|
||||
self._process_virtual(mod, module_name)
|
||||
if virtual_err is not None:
|
||||
log.trace(
|
||||
'Error loading %s.%s: %s',
|
||||
self.tag, module_name, virtual_err
|
||||
)
|
||||
|
||||
# if process_virtual returned a non-True value then we are
|
||||
# if _process_virtual returned a non-True value then we are
|
||||
# supposed to not process this module
|
||||
if virtual_ret is not True and module_name not in self.missing_modules:
|
||||
# If a module has information about why it could not be loaded, record it
|
||||
|
@ -1601,39 +1604,42 @@ class LazyLoader(salt.utils.lazy.LazyDict):
|
|||
if not isinstance(key, six.string_types) or '.' not in key:
|
||||
raise KeyError
|
||||
mod_name, _ = key.split('.', 1)
|
||||
if mod_name in self.missing_modules:
|
||||
return True
|
||||
# if the modulename isn't in the whitelist, don't bother
|
||||
if self.whitelist and mod_name not in self.whitelist:
|
||||
raise KeyError
|
||||
with self._lock:
|
||||
# It is possible that the key is in the dictionary after
|
||||
# acquiring the lock due to another thread loading it.
|
||||
if mod_name in self.missing_modules or key in self._dict:
|
||||
return True
|
||||
# if the modulename isn't in the whitelist, don't bother
|
||||
if self.whitelist and mod_name not in self.whitelist:
|
||||
raise KeyError
|
||||
|
||||
def _inner_load(mod_name):
|
||||
for name in self._iter_files(mod_name):
|
||||
if name in self.loaded_files:
|
||||
continue
|
||||
# if we got what we wanted, we are done
|
||||
if self._load_module(name) and key in self._dict:
|
||||
return True
|
||||
return False
|
||||
def _inner_load(mod_name):
|
||||
for name in self._iter_files(mod_name):
|
||||
if name in self.loaded_files:
|
||||
continue
|
||||
# if we got what we wanted, we are done
|
||||
if self._load_module(name) and key in self._dict:
|
||||
return True
|
||||
return False
|
||||
|
||||
# try to load the module
|
||||
ret = None
|
||||
reloaded = False
|
||||
# re-scan up to once, IOErrors or a failed load cause re-scans of the
|
||||
# filesystem
|
||||
while True:
|
||||
try:
|
||||
ret = _inner_load(mod_name)
|
||||
if not reloaded and ret is not True:
|
||||
self.refresh_file_mapping()
|
||||
reloaded = True
|
||||
# try to load the module
|
||||
ret = None
|
||||
reloaded = False
|
||||
# re-scan up to once, IOErrors or a failed load cause re-scans of the
|
||||
# filesystem
|
||||
while True:
|
||||
try:
|
||||
ret = _inner_load(mod_name)
|
||||
if not reloaded and ret is not True:
|
||||
self._refresh_file_mapping()
|
||||
reloaded = True
|
||||
continue
|
||||
break
|
||||
except IOError:
|
||||
if not reloaded:
|
||||
self._refresh_file_mapping()
|
||||
reloaded = True
|
||||
continue
|
||||
break
|
||||
except IOError:
|
||||
if not reloaded:
|
||||
self.refresh_file_mapping()
|
||||
reloaded = True
|
||||
continue
|
||||
|
||||
return ret
|
||||
|
||||
|
@ -1641,16 +1647,18 @@ class LazyLoader(salt.utils.lazy.LazyDict):
|
|||
'''
|
||||
Load all of them
|
||||
'''
|
||||
for name in self.file_mapping:
|
||||
if name in self.loaded_files or name in self.missing_modules:
|
||||
continue
|
||||
self._load_module(name)
|
||||
with self._lock:
|
||||
for name in self.file_mapping:
|
||||
if name in self.loaded_files or name in self.missing_modules:
|
||||
continue
|
||||
self._load_module(name)
|
||||
|
||||
self.loaded = True
|
||||
self.loaded = True
|
||||
|
||||
def reload_modules(self):
|
||||
self.loaded_files = set()
|
||||
self._load_all()
|
||||
with self._lock:
|
||||
self.loaded_files = set()
|
||||
self._load_all()
|
||||
|
||||
def _apply_outputter(self, func, mod):
|
||||
'''
|
||||
|
@ -1661,7 +1669,7 @@ class LazyLoader(salt.utils.lazy.LazyDict):
|
|||
if func.__name__ in outp:
|
||||
func.__outputter__ = outp[func.__name__]
|
||||
|
||||
def process_virtual(self, mod, module_name, virtual_func='__virtual__'):
|
||||
def _process_virtual(self, mod, module_name, virtual_func='__virtual__'):
|
||||
'''
|
||||
Given a loaded module and its default name determine its virtual name
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue