Make LazyLoader thread safe

Even when `multiprocessing` is set to `True`, there is a case where
multiple threads in the same process attempt to use the same LazyLoader
object. When using the reactor and reacting to an event that will call a
runner, `salt.utils.reactor.ReactWrap.runner` will invoke
`self.pool.fire_async(self.client_cache['runner'].low, args=(fun, kwargs))`
potentially multiple times, each time using a thread from
`salt.utils.process.ThreadPool`. Each thread will invoke
`salt.client.mixins.SyncClientMixin.low` which in turn will invoke its
`_low` and call `salt.utils.job.store_job`. `salt.utils.job.store_job`
will invoke the LazyLoader object for the returner.

Since the LazyLoader object is not thread safe, occasional failures will
occur which will reduce the reliability of the overall system.

Let's examine why a function such as `LazyLoader._load` is not thread safe.
Any time the GIL is released, it allows another thread to run. There are
various types of operations that could release the GIL, but in this
particular case they are file operations that happen in both
`refresh_file_mapping` and `_load_module`. Note that if you add `print`
statements, those also release the GIL (and make the problem more
frequent). In the failure case, `refresh_file_mapping` releases the
GIL, another thread loads the module, and then when the original thread
runs again it will fail when `_inner_load` runs the second time (after
`refresh_file_mapping`). The failure is because the module is already in
`self.loaded_files`, so it is skipped over and `_inner_load` returns
`False` even though the required `key` is already in `self._dict`. Since
adding in stuff like `print` statements, or other logic also adds points in
the code that allow thread switches, the most robust solution to such a
problem is to use a mutex (as opposed to rechecking if `key` now appears
in `self._dict` at certain checkpoints).

This solution adds such a mutex and uses it in key places to ensure
integrity.

Signed-off-by: Sergey Kizunov <sergey.kizunov@ni.com>
This commit is contained in:
Sergey Kizunov 2018-03-20 13:54:13 -05:00
parent 4e7466a21c
commit c624aa4827
No known key found for this signature in database
GPG key ID: 9CEE55A1DE138AAB

View file

@ -14,6 +14,7 @@ import logging
import inspect
import tempfile
import functools
import threading
import types
from collections import MutableMapping
from zipimport import zipimporter
@ -1100,7 +1101,8 @@ class LazyLoader(salt.utils.lazy.LazyDict):
self.disabled = set(self.opts.get('disable_{0}{1}'.format(
self.tag, '' if self.tag[-1] == 's' else 's'), []))
self.refresh_file_mapping()
self._lock = threading.RLock()
self._refresh_file_mapping()
super(LazyLoader, self).__init__() # late init the lazy loader
# create all of the import namespaces
@ -1167,7 +1169,7 @@ class LazyLoader(salt.utils.lazy.LazyDict):
else:
return '\'{0}\' __virtual__ returned False'.format(mod_name)
def refresh_file_mapping(self):
def _refresh_file_mapping(self):
'''
refresh the mapping of the FS on disk
'''
@ -1285,15 +1287,16 @@ class LazyLoader(salt.utils.lazy.LazyDict):
'''
Clear the dict
'''
super(LazyLoader, self).clear() # clear the lazy loader
self.loaded_files = set()
self.missing_modules = {}
self.loaded_modules = {}
# if we have been loaded before, lets clear the file mapping since
# we obviously want a re-do
if hasattr(self, 'opts'):
self.refresh_file_mapping()
self.initial_load = False
with self._lock:
super(LazyLoader, self).clear() # clear the lazy loader
self.loaded_files = set()
self.missing_modules = {}
self.loaded_modules = {}
# if we have been loaded before, lets clear the file mapping since
# we obviously want a re-do
if hasattr(self, 'opts'):
self._refresh_file_mapping()
self.initial_load = False
def __prep_mod_opts(self, opts):
'''
@ -1504,14 +1507,14 @@ class LazyLoader(salt.utils.lazy.LazyDict):
virtual_funcs_to_process = ['__virtual__'] + self.virtual_funcs
for virtual_func in virtual_funcs_to_process:
virtual_ret, module_name, virtual_err, virtual_aliases = \
self.process_virtual(mod, module_name)
self._process_virtual(mod, module_name)
if virtual_err is not None:
log.trace(
'Error loading %s.%s: %s',
self.tag, module_name, virtual_err
)
# if process_virtual returned a non-True value then we are
# if _process_virtual returned a non-True value then we are
# supposed to not process this module
if virtual_ret is not True and module_name not in self.missing_modules:
# If a module has information about why it could not be loaded, record it
@ -1601,39 +1604,42 @@ class LazyLoader(salt.utils.lazy.LazyDict):
if not isinstance(key, six.string_types) or '.' not in key:
raise KeyError
mod_name, _ = key.split('.', 1)
if mod_name in self.missing_modules:
return True
# if the modulename isn't in the whitelist, don't bother
if self.whitelist and mod_name not in self.whitelist:
raise KeyError
with self._lock:
# It is possible that the key is in the dictionary after
# acquiring the lock due to another thread loading it.
if mod_name in self.missing_modules or key in self._dict:
return True
# if the modulename isn't in the whitelist, don't bother
if self.whitelist and mod_name not in self.whitelist:
raise KeyError
def _inner_load(mod_name):
for name in self._iter_files(mod_name):
if name in self.loaded_files:
continue
# if we got what we wanted, we are done
if self._load_module(name) and key in self._dict:
return True
return False
def _inner_load(mod_name):
for name in self._iter_files(mod_name):
if name in self.loaded_files:
continue
# if we got what we wanted, we are done
if self._load_module(name) and key in self._dict:
return True
return False
# try to load the module
ret = None
reloaded = False
# re-scan up to once, IOErrors or a failed load cause re-scans of the
# filesystem
while True:
try:
ret = _inner_load(mod_name)
if not reloaded and ret is not True:
self.refresh_file_mapping()
reloaded = True
# try to load the module
ret = None
reloaded = False
# re-scan up to once, IOErrors or a failed load cause re-scans of the
# filesystem
while True:
try:
ret = _inner_load(mod_name)
if not reloaded and ret is not True:
self._refresh_file_mapping()
reloaded = True
continue
break
except IOError:
if not reloaded:
self._refresh_file_mapping()
reloaded = True
continue
break
except IOError:
if not reloaded:
self.refresh_file_mapping()
reloaded = True
continue
return ret
@ -1641,16 +1647,18 @@ class LazyLoader(salt.utils.lazy.LazyDict):
'''
Load all of them
'''
for name in self.file_mapping:
if name in self.loaded_files or name in self.missing_modules:
continue
self._load_module(name)
with self._lock:
for name in self.file_mapping:
if name in self.loaded_files or name in self.missing_modules:
continue
self._load_module(name)
self.loaded = True
self.loaded = True
def reload_modules(self):
self.loaded_files = set()
self._load_all()
with self._lock:
self.loaded_files = set()
self._load_all()
def _apply_outputter(self, func, mod):
'''
@ -1661,7 +1669,7 @@ class LazyLoader(salt.utils.lazy.LazyDict):
if func.__name__ in outp:
func.__outputter__ = outp[func.__name__]
def process_virtual(self, mod, module_name, virtual_func='__virtual__'):
def _process_virtual(self, mod, module_name, virtual_func='__virtual__'):
'''
Given a loaded module and its default name determine its virtual name