Merge pull request #40429 from DSRCorporation/features/39275_memcache

MemCache - a minion data cache booster.
This commit is contained in:
Mike Place 2017-03-31 14:20:59 -06:00 committed by GitHub
commit fdb0250c95
11 changed files with 186 additions and 86 deletions

162
salt/cache/__init__.py vendored
View file

@ -7,12 +7,30 @@ Loader mechanism for caching data, with data expiration, etc.
# Import Python libs
from __future__ import absolute_import
import logging
import time
# Import Salt lobs
from salt.ext import six
from salt.payload import Serial
from salt.utils.odict import OrderedDict
import salt.loader
import salt.syspaths
from salt.payload import Serial
log = logging.getLogger(__name__)
def factory(opts, **kwargs):
'''
Creates and returns the cache class.
If memory caching is enabled by opts MemCache class will be instantiated.
If not Cache class will be returned.
'''
if opts.get('memcache_expire_seconds', 0):
cls = MemCache
else:
cls = Cache
return cls(opts, **kwargs)
class Cache(object):
@ -48,20 +66,25 @@ class Cache(object):
Key name is a string identifier of a data container (like a file inside a
directory) which will hold the data.
'''
def __init__(self, opts, cachedir=None):
def __init__(self, opts, **kwargs):
self.opts = opts
if cachedir is None:
self.cachedir = opts.get('cachedir', salt.syspaths.CACHE_DIR)
else:
self.cachedir = cachedir
self.driver = opts['cache']
self.serial = Serial(opts)
self._modules = None
self._kwargs = kwargs
def __lazy_init(self):
self._modules = salt.loader.cache(self.opts, self.serial)
fun = '{0}.init_kwargs'.format(self.driver)
if fun in self.modules:
self._kwargs = self.modules[fun](self._kwargs)
else:
self._kwargs = {}
@property
def modules(self):
if self._modules is None:
self._modules = salt.loader.cache(self.opts, self.serial)
self.__lazy_init()
return self._modules
def cache(self, bank, key, fun, loop_fun=None, **kwargs):
@ -123,11 +146,8 @@ class Cache(object):
Raises an exception if cache driver detected an error accessing data
in the cache backend (auth, permissions, etc).
'''
fun = '{0}.{1}'.format(self.driver, 'store')
try:
return self.modules[fun](bank, key, data, self.cachedir)
except TypeError:
return self.modules[fun](bank, key, data)
fun = '{0}.store'.format(self.driver)
return self.modules[fun](bank, key, data, **self._kwargs)
def fetch(self, bank, key):
'''
@ -150,11 +170,8 @@ class Cache(object):
Raises an exception if cache driver detected an error accessing data
in the cache backend (auth, permissions, etc).
'''
fun = '{0}.{1}'.format(self.driver, 'fetch')
try:
return self.modules[fun](bank, key, self.cachedir)
except TypeError:
return self.modules[fun](bank, key)
fun = '{0}.fetch'.format(self.driver)
return self.modules[fun](bank, key, **self._kwargs)
def updated(self, bank, key):
'''
@ -177,11 +194,8 @@ class Cache(object):
Raises an exception if cache driver detected an error accessing data
in the cache backend (auth, permissions, etc).
'''
fun = '{0}.{1}'.format(self.driver, 'updated')
try:
return self.modules[fun](bank, key, self.cachedir)
except TypeError:
return self.modules[fun](bank, key)
fun = '{0}.updated'.format(self.driver)
return self.modules[fun](bank, key, **self._kwargs)
def flush(self, bank, key=None):
'''
@ -201,13 +215,10 @@ class Cache(object):
Raises an exception if cache driver detected an error accessing data
in the cache backend (auth, permissions, etc).
'''
fun = '{0}.{1}'.format(self.driver, 'flush')
try:
return self.modules[fun](bank, key=key, cachedir=self.cachedir)
except TypeError:
return self.modules[fun](bank, key=key)
fun = '{0}.flush'.format(self.driver)
return self.modules[fun](bank, key=key, **self._kwargs)
def list(self, bank):
def ls(self, bank):
'''
Lists entries stored in the specified bank.
@ -223,11 +234,8 @@ class Cache(object):
Raises an exception if cache driver detected an error accessing data
in the cache backend (auth, permissions, etc).
'''
fun = '{0}.{1}'.format(self.driver, 'list')
try:
return self.modules[fun](bank, self.cachedir)
except TypeError:
return self.modules[fun](bank)
fun = '{0}.ls'.format(self.driver)
return self.modules[fun](bank, **self._kwargs)
def contains(self, bank, key=None):
'''
@ -251,8 +259,86 @@ class Cache(object):
Raises an exception if cache driver detected an error accessing data
in the cache backend (auth, permissions, etc).
'''
fun = '{0}.{1}'.format(self.driver, 'contains')
try:
return self.modules[fun](bank, key, self.cachedir)
except TypeError:
return self.modules[fun](bank, key)
fun = '{0}.contains'.format(self.driver)
return self.modules[fun](bank, key, **self._kwargs)
class MemCache(Cache):
'''
Short-lived in-memory cache store keeping values on time and/or size (count)
basis.
'''
# {<storage_id>: odict({<key>: [atime, data], ...}), ...}
data = {}
def __init__(self, opts, **kwargs):
super(MemCache, self).__init__(opts, **kwargs)
self.expire = opts.get('memcache_expire_seconds', 10)
self.max = opts.get('memcache_max_items', 1024)
self.cleanup = opts.get('memcache_full_cleanup', False)
self.debug = opts.get('memcache_debug', False)
if self.debug:
self.call = 0
self.hit = 0
self._storage = None
@classmethod
def __cleanup(cls, expire):
now = time.time()
for storage in six.itervalues(cls.data):
for key, data in list(storage.items()):
if data[0] + expire < now:
del storage[key]
def _get_storage_id(self):
fun = '{0}.storage_id'.format(self.driver)
if fun in self.modules:
return self.modules[fun](self.kwargs)
else:
return self.driver
@property
def storage(self):
if self._storage is None:
storage_id = self._get_storage_id()
if storage_id not in MemCache.data:
MemCache.data[storage_id] = OrderedDict()
self._storage = MemCache.data[storage_id]
return self._storage
def fetch(self, bank, key):
if self.debug:
self.call += 1
now = time.time()
record = self.storage.pop((bank, key), None)
# Have a cached value for the key
if record is not None and record[0] + self.expire >= now:
if self.debug:
self.hit += 1
log.trace('MemCache stats (call/hit/rate): '
'{0}/{1}/{2}'.format(self.call,
self.hit,
float(self.hit) / self.call))
# update atime and return
record[0] = now
self.storage[(bank, key)] = record
return record[1]
# Have no value for the key or value is expired
data = super(MemCache, self).fetch(bank, key)
self.storage[(bank, key)] = [now, data]
return data
def store(self, bank, key, data):
self.storage.pop((bank, key), None)
super(MemCache, self).store(bank, key, data)
if len(self.storage) >= self.max:
if self.cleanup:
MemCache.__cleanup(self.expire)
else:
self.storage.popitem(last=False)
self.storage[(bank, key)] = [time.time(), data]
def flush(self, bank, key=None):
self.storage.pop((bank, key), None)
super(MemCache, self).flush(bank, key)

View file

@ -54,9 +54,6 @@ except ImportError:
from salt.exceptions import SaltCacheError
# Don't shadow built-ins
__func_alias__ = {'list_': 'list'}
log = logging.getLogger(__name__)
api = None
@ -140,7 +137,7 @@ def flush(bank, key=None):
)
def list_(bank):
def ls(bank):
'''
Return an iterable object containing all entries stored in the specified bank.
'''
@ -164,9 +161,6 @@ def list_(bank):
return keys
getlist = list_
def contains(bank, key):
'''
Checks if the specified bank contains the specified key.

24
salt/cache/localfs.py vendored
View file

@ -21,12 +21,23 @@ from salt.exceptions import SaltCacheError
import salt.utils
import salt.utils.atomicfile
# Don't shadow built-ins
__func_alias__ = {'list_': 'list'}
log = logging.getLogger(__name__)
def __cachedir(kwargs=None):
if kwargs and 'cachedir' in kwargs:
return kwargs['cachedir']
return __opts__.get('cachedir', salt.syspaths.CACHE_DIR)
def init_kwargs(kwargs):
return {'cachedir': __cachedir(kwargs)}
def get_storage_id(kwargs):
return ('localfs', __cachedir(kwargs))
def store(bank, key, data, cachedir):
'''
Store information in a file.
@ -99,7 +110,7 @@ def flush(bank, key=None, cachedir=None):
Remove the key from the cache bank with all the key content.
'''
if cachedir is None:
cachedir = __opts__['cachedir']
cachedir = __cachedir()
try:
if key is None:
@ -121,7 +132,7 @@ def flush(bank, key=None, cachedir=None):
return True
def list_(bank, cachedir):
def ls(bank, cachedir):
'''
Return an iterable object containing all entries stored in the specified bank.
'''
@ -145,9 +156,6 @@ def list_(bank, cachedir):
return ret
getlist = list_
def contains(bank, key, cachedir):
'''
Checks if the specified bank contains the specified key.

View file

@ -1442,7 +1442,7 @@ class LocalClient(object):
if connected_minions is None:
connected_minions = salt.utils.minions.CkMinions(self.opts).connected_ids()
if self.opts['minion_data_cache'] \
and salt.cache.Cache(self.opts).contains('minions/{0}'.format(id_), 'data') \
and salt.cache.factory(self.opts).contains('minions/{0}'.format(id_), 'data') \
and connected_minions \
and id_ not in connected_minions:

View file

@ -919,6 +919,14 @@ VALID_OPTS = {
# Minion data cache driver (one of satl.cache.* modules)
'cache': str,
# Enables a fast in-memory cache booster and sets the expiration time.
'memcache_expire_seconds': int,
# Set a memcache limit in items per cache storage (driver+driver opts).
'memcache_max_items': int,
# Each time a cache storage got full cleanup all the expired items not just the oldest one.
'memcache_full_cleanup': bool,
# Enable collecting theh memcache stats and log it on `trace` log level.
'memcache_debug': bool,
# Extra modules for Salt Thin
'thin_extra_mods': str,
@ -1446,6 +1454,10 @@ DEFAULT_MASTER_OPTS = {
'python2_bin': 'python2',
'python3_bin': 'python3',
'cache': 'localfs',
'memcache_expire_seconds': 0,
'memcache_max_items': 1024,
'memcache_full_cleanup': False,
'memcache_debug': False,
'thin_extra_mods': '',
'ssl': None,
'django_auth_path': '',

View file

@ -454,7 +454,7 @@ class RemoteFuncs(object):
states=False,
rend=False)
self.__setup_fileserver()
self.cache = salt.cache.Cache(opts)
self.cache = salt.cache.factory(opts)
def __setup_fileserver(self):
'''

View file

@ -495,10 +495,10 @@ class Key(object):
for minion in os.listdir(m_cache):
if minion not in minions and minion not in preserve_minions:
shutil.rmtree(os.path.join(m_cache, minion))
cache = salt.cache.Cache(self.opts)
clist = cache.list(self.ACC)
cache = salt.cache.factory(self.opts)
clist = cache.ls(self.ACC)
if clist:
for minion in cache.list(self.ACC):
for minion in clist:
if minion not in minions and minion not in preserve_minions:
cache.flush('{0}/{1}'.format(self.ACC, minion))
@ -973,10 +973,10 @@ class RaetKey(Key):
for minion in os.listdir(m_cache):
if minion not in minions:
shutil.rmtree(os.path.join(m_cache, minion))
cache = salt.cache.Cache(self.opts)
clist = cache.list(self.ACC)
cache = salt.cache.factory(self.opts)
clist = cache.ls(self.ACC)
if clist:
for minion in cache.list(self.ACC):
for minion in clist:
if minion not in minions and minion not in preserve_minions:
cache.flush('{0}/{1}'.format(self.ACC, minion))

View file

@ -45,7 +45,7 @@ class ThorState(salt.state.HighState):
opts['file_client'] = 'local'
self.opts = opts
if opts.get('minion_data_cache'):
self.cache = salt.cache.Cache(opts)
self.cache = salt.cache.factory(opts)
salt.state.HighState.__init__(self, self.opts, loader='thorium')
self.returners = salt.loader.returners(self.opts, {})
@ -69,11 +69,11 @@ class ThorState(salt.state.HighState):
cache = {'grains': {}, 'pillar': {}}
if self.grains or self.pillar:
if self.opts.get('minion_data_cache'):
minions = self.cache.list('minions')
minions = self.cache.ls('minions')
if not minions:
return cache
for minion in minions:
total = salt.cache.fetch('minions/{0}'.format(minion), 'data')
total = self.cache.fetch('minions/{0}'.format(minion), 'data')
if 'pillar' in total:
if self.pillar_keys:

View file

@ -91,7 +91,7 @@ class MasterPillarUtil(object):
self.use_cached_pillar = use_cached_pillar
self.grains_fallback = grains_fallback
self.pillar_fallback = pillar_fallback
self.cache = salt.cache.Cache(opts)
self.cache = salt.cache.factory(opts)
log.debug(
'Init settings: tgt: \'{0}\', expr_form: \'{1}\', saltenv: \'{2}\', '
'use_cached_grains: {3}, use_cached_pillar: {4}, '
@ -110,7 +110,7 @@ class MasterPillarUtil(object):
'and enfore_mine_cache are both disabled.')
return mine_data
if not minion_ids:
minion_ids = self.cache.list('minions')
minion_ids = self.cache.ls('minions')
for minion_id in minion_ids:
if not salt.utils.verify.valid_id(self.opts, minion_id):
continue
@ -129,7 +129,7 @@ class MasterPillarUtil(object):
'enabled.')
return grains, pillars
if not minion_ids:
minion_ids = self.cache.list('minions')
minion_ids = self.cache.ls('minions')
for minion_id in minion_ids:
if not salt.utils.verify.valid_id(self.opts, minion_id):
continue
@ -352,7 +352,7 @@ class MasterPillarUtil(object):
# in the same file, 'data.p'
grains, pillars = self._get_cached_minion_data(*minion_ids)
try:
c_minions = self.cache.list('minions')
c_minions = self.cache.ls('minions')
for minion_id in minion_ids:
if not salt.utils.verify.valid_id(self.opts, minion_id):
continue

View file

@ -73,9 +73,9 @@ def get_minion_data(minion, opts):
grains = None
pillar = None
if opts.get('minion_data_cache', False):
cache = salt.cache.Cache(opts)
cache = salt.cache.factory(opts)
if minion is None:
for id_ in cache.list('minions'):
for id_ in cache.ls('minions'):
data = cache.fetch('minions/{0}'.format(id_), 'data')
if data is None:
continue
@ -180,7 +180,7 @@ class CkMinions(object):
def __init__(self, opts):
self.opts = opts
self.serial = salt.payload.Serial(opts)
self.cache = salt.cache.Cache(opts)
self.cache = salt.cache.factory(opts)
# TODO: this is actually an *auth* check
if self.opts.get('transport', 'zeromq') in ('zeromq', 'tcp'):
self.acc = 'minions'
@ -344,13 +344,13 @@ class CkMinions(object):
if greedy:
minions = self._pki_minions()
elif cache_enabled:
minions = self.cache.list('minions')
minions = self.cache.ls('minions')
else:
return []
if cache_enabled:
if greedy:
cminions = self.cache.list('minions')
cminions = self.cache.ls('minions')
else:
cminions = minions
if cminions is None:
@ -414,7 +414,7 @@ class CkMinions(object):
mlist.append(fn_)
return mlist
elif cache_enabled:
return self.cache.list('minions')
return self.cache.ls('minions')
else:
return list()
@ -576,7 +576,7 @@ class CkMinions(object):
'''
minions = set()
if self.opts.get('minion_data_cache', False):
search = self.cache.list('minions')
search = self.cache.ls('minions')
if search is None:
return minions
addrs = salt.utils.network.local_port_tcp(int(self.opts['publish_port']))
@ -1097,7 +1097,7 @@ def mine_get(tgt, fun, tgt_type='glob', opts=None):
minions = checker.check_minions(
tgt,
tgt_type)
cache = salt.cache.Cache(opts)
cache = salt.cache.factory(opts)
for minion in minions:
mdata = cache.fetch('minions/{0}'.format(minion), 'mine')
if mdata is None:

View file

@ -209,29 +209,29 @@ class LocalFSTest(TestCase):
'''
self.assertRaises(SaltCacheError, localfs.flush, bank='', key='key', cachedir='/var/cache/salt')
# 'list' function tests: 3
# 'ls' function tests: 3
@patch('os.path.isdir', MagicMock(return_value=False))
def test_list_no_base_dir(self):
def test_ls_no_base_dir(self):
'''
Tests that the list function returns an empty list if the bank directory
Tests that the ls function returns an empty list if the bank directory
doesn't exist.
'''
self.assertEqual(localfs.list_(bank='', cachedir=''), [])
self.assertEqual(localfs.ls(bank='', cachedir=''), [])
@patch('os.path.isdir', MagicMock(return_value=True))
@patch('os.listdir', MagicMock(side_effect=OSError))
def test_list_error_raised_no_bank_directory_access(self):
def test_ls_error_raised_no_bank_directory_access(self):
'''
Tests that a SaltCacheError is raised when there is a problem accessing the
cache bank directory.
'''
self.assertRaises(SaltCacheError, localfs.list_, bank='', cachedir='')
self.assertRaises(SaltCacheError, localfs.ls, bank='', cachedir='')
@destructiveTest
def test_list_success(self):
def test_ls_success(self):
'''
Tests the return of the list function containing bank entries.
Tests the return of the ls function containing bank entries.
'''
# Create a temporary cache dir
tmp_dir = tempfile.mkdtemp(dir=integration.SYS_TMP_DIR)
@ -239,9 +239,9 @@ class LocalFSTest(TestCase):
# Use the helper function to create the cache file using localfs.store()
self._create_tmp_cache_file(tmp_dir, salt.payload.Serial(self))
# Now test the return of the list function
# Now test the return of the ls function
with patch.dict(localfs.__opts__, {'cachedir': tmp_dir}):
self.assertEqual(localfs.list_(bank='bank', cachedir=tmp_dir), ['key'])
self.assertEqual(localfs.ls(bank='bank', cachedir=tmp_dir), ['key'])
# 'contains' function tests: 1