Cache API tests & unifying behavior (#61229)

* Add tests for cache backends

* Tests run against everything!

* Get consul passing cache tests

* Now mysql tests also pass

* Comments

* Add mysql cache tests

* Ensure consul cache is flushing timestamp keys

* Ensure etcd cache is flushing timestamp keys

* Update redis cache api to conform to localfs

There was a bug in some of the lookups: they were looking at sub-banks
instead of the banks containing the leaf/key nodes. That has been fixed.
The original behaviors were also restored, with only the updated
functionality added, namely storing timestamp key/values in redis.
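
A rough sketch of the new behavior (the `$TSTAMP` prefix and the
bank/key names here are the documented defaults, not pulled from a live
config): every `store()` now also writes a timestamp entry next to the
data key.

    import time

    import salt.payload

    # The timestamp key mirrors the data key under its own prefix.
    timestamp_key = "$TSTAMP_root-bank/sub-bank/my-key"
    # localfs truncates the timestamp to int; redis now stores the same.
    timestamp_value = salt.payload.dumps(int(time.time()))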

* Fix etcd path inconsistency

Previously on X-Files. Wait, nope. Previously on etcd cache files...
😂 If a path was 1/2/3, etcd would break with an IndexError, but when it
was 1/2/3/4/5, instead of returning the "file" (key?) 5 it would return
`4/5` as the name. That is probably not correct, and definitely
inconsistent with localfs (and every other cache module).

Now it always returns only the final segment of the cache path (e.g. 7
for 1/2/3/4/5/6/7, and 5 for 1/2/5).
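
A minimal sketch of the naming change (the path is illustrative, not
taken from the module):

    key = "/1/2/3/4/5"
    # Old: take the fourth field from the left, which returns a nested
    # remainder and raises IndexError on shallower paths.
    old_name = key.split("/", 3)[3]  # -> "3/4/5"
    # New: always just the final path segment.
    new_name = key.rsplit("/", 1)[-1]  # -> "5"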

* Remove comments and cleanup test setup

Comments were outdated; now they're gone or fixed. Also shifted around
some of the test setup to make it a bit cleaner.

* Add localfs backed memcache to the tests

Fixed an inconsistency when flushing banks with a None key, and added
the localfs-backed memcache to the test run. We may want to add other
backing caches via alternate fixtures?
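
A minimal sketch of the unified flush semantics, assuming an in-memory
store keyed by (bank, key) tuples the way MemCache's is:

    storage = {("bank1", "a"): 1, ("bank1", "b"): 2, ("bank2", "a"): 3}

    def flush(bank, key=None):
        if key is None:
            # key=None now flushes every key in the bank, matching localfs
            for bank_, key_ in tuple(storage):
                if bank_ == bank:
                    storage.pop((bank_, key_))
        else:
            storage.pop((bank, key), None)

    flush("bank1")
    assert storage == {("bank2", "a"): 3}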

* Bad serial

* No this is wrong

* Probably the right lazy loader fix

* Restore tested mysql cache functionality

Added a force_reconnect function because I couldn't figure out how to
remove the client from the context any other way. There may be a better
way to do that, but there will probably be some loader changes anyway.

* Remove silly files

The Saltfile never should have been added in the first place, and the
integration test wasn't really necessary; the functional test was
better/lighter.

* Fix redis_cache cluster mode import

Fixes #60272 - the import statement was totally incorrect. Now it will
actually attempt to connect to a redis cluster.
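
For reference, the corrected import (redis-py-cluster renamed the class
in its 2.x releases, which is why the old name could never import):

    # Before: from rediscluster import StrictRedisCluster
    from rediscluster import RedisCluster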

* Payload should string-ify all the data

* Fix unit tests for parametrization

From what I can tell, the query was previously being built in Python
instead of letting MySQL bind the parameters, and this probably horked
up the handling of binary strings.

Unless some other version of the DB turns out to be problematic...
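
A minimal sketch of the difference, using pymysql (connection details
are illustrative):

    import pymysql

    conn = pymysql.connect(
        host="127.0.0.1", user="root", password="fnord", database="salt_cache"
    )
    cur = conn.cursor()
    # Before: values were interpolated via str.format() in Python, so
    # binary data had to survive manual quoting.
    # After: the driver binds the parameters itself.
    cur.execute(
        "SELECT data FROM cache WHERE bank=%s AND etcd_key=%s",
        ("bank1", "key1"),
    )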

* Fix missing docstring complaints

* Incorporate PR feedback

* Skip docker if not found and add the rest of mysql

I really, really hate the way this is configured, but I can't make
pytest use fixtures as parameters in fixtures. I may have to struggle to
come up with another way.
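
The workaround used in the test file below is to parametrize over
fixture *names* and resolve them at runtime:

    import pytest

    @pytest.fixture(params=["localfs_cache", "redis_cache"])
    def cache(request):
        # pytest can't parametrize a fixture with other fixtures
        # directly (pytest-dev/pytest#349), so resolve by name instead.
        yield request.getfixturevalue(request.param)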

* Tests all passing, woo!

* Run black

* Disable pylint here

* Skip when mysql fails during CI

This has been tested locally, and it runs all of them. With the CI
environment, it's entirely possible that it fails for no particularly
good reason. And it's definitely not worth blocking a PR for (though
arguably it could be if *all* of the images fail, but...)
Wayne Werner, 2022-04-08 08:18:38 -05:00, committed by GitHub
commit deeeaa3cd3 (parent 5765b31dd7)
14 changed files with 1234 additions and 78 deletions

changelog/60272.fixed

@ -0,0 +1 @@
Corrected import statement for redis_cache in cluster mode.

changelog/61081.changed

@ -0,0 +1 @@
Updated mysql cache module to also store an updated timestamp, making it consistent with the default cache module. Users of the mysql cache should check their database size before updating, as an ALTER TABLE statement will add the timestamp column.


@ -39,7 +39,7 @@ class Cache:
The name of the cache driver to use. This is the name of the python
module of the `salt.cache` package. Default is `localfs`.
Terminology.
Terminology:
Salt cache subsystem is organized as a tree with nodes and leaves like a
filesystem. Cache consists of banks. Each bank can contain a number of
@ -345,5 +345,10 @@ class MemCache(Cache):
self.storage[(bank, key)] = [time.time(), data]
def flush(self, bank, key=None):
self.storage.pop((bank, key), None)
if key is None:
for bank_, key_ in tuple(self.storage):
if bank == bank_:
self.storage.pop((bank_, key_))
else:
self.storage.pop((bank, key), None)
super().flush(bank, key)

salt/cache/consul.py

@ -3,6 +3,10 @@ Minion data cache plugin for Consul key/value data store.
.. versionadded:: 2016.11.2
.. versionchanged:: 3005.0
Timestamp/cache updated support added.
:depends: python-consul >= 0.2.0
It is up to the system administrator to set up and configure the Consul
@ -30,6 +34,12 @@ could be set in the master config. These are the defaults:
consul.consistency: default
consul.dc: dc1
consul.verify: True
consul.timestamp_suffix: .tstamp # Added in 3005.0
In order to bring the cache APIs into conformity, in 3005.0 timestamp
information gets stored as a separate ``{key}.tstamp`` key/value. If your
existing functionality depends on being able to store normal keys with the
``.tstamp`` suffix, override the ``consul.timestamp_suffix`` default config.
Related docs can be found in the `python-consul documentation`_.
@ -47,6 +57,7 @@ value to ``consul``:
"""
import logging
import time
import salt.payload
from salt.exceptions import SaltCacheError
@ -61,6 +72,7 @@ except ImportError:
log = logging.getLogger(__name__)
api = None
_tstamp_suffix = ".tstamp"
# Define the module's virtual name
@ -90,7 +102,8 @@ def __virtual__():
}
try:
global api
global api, _tstamp_suffix
_tstamp_suffix = __opts__.get("consul.timestamp_suffix", _tstamp_suffix)
api = consul.Consul(**consul_kwargs)
except AttributeError:
return (
@ -107,9 +120,12 @@ def store(bank, key, data):
Store a key value.
"""
c_key = "{}/{}".format(bank, key)
tstamp_key = "{}/{}{}".format(bank, key, _tstamp_suffix)
try:
c_data = salt.payload.dumps(data)
api.kv.put(c_key, c_data)
api.kv.put(tstamp_key, salt.payload.dumps(int(time.time())))
except Exception as exc: # pylint: disable=broad-except
raise SaltCacheError(
"There was an error writing the key, {}: {}".format(c_key, exc)
@ -138,9 +154,13 @@ def flush(bank, key=None):
"""
if key is None:
c_key = bank
tstamp_key = None
else:
c_key = "{}/{}".format(bank, key)
tstamp_key = "{}/{}{}".format(bank, key, _tstamp_suffix)
try:
if tstamp_key:
api.kv.delete(tstamp_key)
return api.kv.delete(c_key, recurse=key is None)
except Exception as exc: # pylint: disable=broad-except
raise SaltCacheError(
@ -166,7 +186,7 @@ def list_(bank):
out = set()
for key in keys:
out.add(key[len(bank) + 1 :].rstrip("/"))
keys = list(out)
keys = [o for o in out if not o.endswith(_tstamp_suffix)]
return keys
@ -174,14 +194,28 @@ def contains(bank, key):
"""
Checks if the specified bank contains the specified key.
"""
if key is None:
return True # any key could be a branch and a leaf at the same time in Consul
else:
try:
c_key = "{}/{}".format(bank, key)
_, value = api.kv.get(c_key)
except Exception as exc: # pylint: disable=broad-except
raise SaltCacheError(
"There was an error getting the key, {}: {}".format(c_key, exc)
)
return value is not None
try:
c_key = "{}/{}".format(bank, key or "")
_, value = api.kv.get(c_key, keys=True)
except Exception as exc: # pylint: disable=broad-except
raise SaltCacheError(
"There was an error getting the key, {}: {}".format(c_key, exc)
)
return value is not None
def updated(bank, key):
"""
Return the Unix Epoch timestamp of when the key was last updated. Return
None if key is not found.
"""
c_key = "{}/{}{}".format(bank, key, _tstamp_suffix)
try:
_, value = api.kv.get(c_key)
if value is None:
return None
return salt.payload.loads(value["Value"])
except Exception as exc: # pylint: disable=broad-except
raise SaltCacheError(
"There was an error reading the key, {}: {}".format(c_key, exc)
)


@ -2,6 +2,7 @@
Minion data cache plugin for Etcd key/value data store.
.. versionadded:: 2018.3.0
.. versionchanged:: 3005
It is up to the system administrator to set up and configure the Etcd
infrastructure. All is needed for this plugin is a working Etcd agent
@ -42,6 +43,9 @@ value to ``etcd``:
cache: etcd
In Phosphorus, ls/list was changed to always return the final name in the path.
This should only make a difference if you were directly using ``ls`` on paths
that were nested more or less deeply than, for example, ``1/2/3/4``.
.. _`Etcd documentation`: https://github.com/coreos/etcd
.. _`python-etcd documentation`: http://python-etcd.readthedocs.io/en/latest/
@ -50,6 +54,7 @@ value to ``etcd``:
import base64
import logging
import time
import salt.payload
from salt.exceptions import SaltCacheError
@ -72,6 +77,7 @@ if HAS_ETCD:
log = logging.getLogger(__name__)
client = None
path_prefix = None
_tstamp_suffix = ".tstamp"
# Module properties
@ -94,7 +100,7 @@ def __virtual__():
def _init_client():
"""Setup client and init datastore."""
global client, path_prefix
global client, path_prefix, _tstamp_suffix
if client is not None:
return
@ -111,6 +117,7 @@ def _init_client():
"cert": __opts__.get("etcd.cert", None),
"ca_cert": __opts__.get("etcd.ca_cert", None),
}
_tstamp_suffix = __opts__.get("etcd.timestamp_suffix", _tstamp_suffix)
path_prefix = __opts__.get("etcd.path_prefix", _DEFAULT_PATH_PREFIX)
if path_prefix != "":
path_prefix = "/{}".format(path_prefix.strip("/"))
@ -129,9 +136,11 @@ def store(bank, key, data):
"""
_init_client()
etcd_key = "{}/{}/{}".format(path_prefix, bank, key)
etcd_tstamp_key = "{}/{}/{}".format(path_prefix, bank, key + _tstamp_suffix)
try:
value = salt.payload.dumps(data)
client.write(etcd_key, base64.b64encode(value))
client.write(etcd_tstamp_key, int(time.time()))
except Exception as exc: # pylint: disable=broad-except
raise SaltCacheError(
"There was an error writing the key, {}: {}".format(etcd_key, exc)
@ -162,13 +171,17 @@ def flush(bank, key=None):
_init_client()
if key is None:
etcd_key = "{}/{}".format(path_prefix, bank)
tstamp_key = None
else:
etcd_key = "{}/{}/{}".format(path_prefix, bank, key)
tstamp_key = "{}/{}/{}".format(path_prefix, bank, key + _tstamp_suffix)
try:
client.read(etcd_key)
except etcd.EtcdKeyNotFound:
return # nothing to flush
try:
if tstamp_key:
client.delete(tstamp_key)
client.delete(etcd_key, recursive=True)
except Exception as exc: # pylint: disable=broad-except
raise SaltCacheError(
@ -182,7 +195,10 @@ def _walk(r):
r: etcd.EtcdResult
"""
if not r.dir:
return [r.key.split("/", 3)[3]]
if r.key.endswith(_tstamp_suffix):
return []
else:
return [r.key.rsplit("/", 1)[-1]]
keys = []
for c in client.read(r.key).children:
@ -199,10 +215,12 @@ def ls(bank):
path = "{}/{}".format(path_prefix, bank)
try:
return _walk(client.read(path))
except etcd.EtcdKeyNotFound:
return []
except Exception as exc: # pylint: disable=broad-except
raise SaltCacheError(
'There was an error getting the key "{}": {}'.format(bank, exc)
)
) from exc
def contains(bank, key):
@ -210,14 +228,31 @@ def contains(bank, key):
Checks if the specified bank contains the specified key.
"""
_init_client()
etcd_key = "{}/{}/{}".format(path_prefix, bank, key)
etcd_key = "{}/{}/{}".format(path_prefix, bank, key or "")
try:
r = client.read(etcd_key)
# return True for keys, not dirs
return r.dir is False
# return True for keys, not dirs, unless key is None
return r.dir if key is None else r.dir is False
except etcd.EtcdKeyNotFound:
return False
except Exception as exc: # pylint: disable=broad-except
raise SaltCacheError(
"There was an error getting the key, {}: {}".format(etcd_key, exc)
)
def updated(bank, key):
"""
Return Unix Epoch based timestamp of when the bank/key was updated.
"""
_init_client()
tstamp_key = "{}/{}/{}".format(path_prefix, bank, key + _tstamp_suffix)
try:
value = client.read(tstamp_key).value
return int(value)
except etcd.EtcdKeyNotFound:
return None
except Exception as exc: # pylint: disable=broad-except
raise SaltCacheError(
"There was an error reading the key, {}: {}".format(tstamp_key, exc)
)


@ -6,15 +6,21 @@ Minion data cache plugin for MySQL database.
It is up to the system administrator to set up and configure the MySQL
infrastructure. All is needed for this plugin is a working MySQL server.
The module requires the `salt_cache` database to exists but creates its own
table if needed. The keys are indexed using the `bank` and `etcd_key` columns.
.. warning::
The mysql.database and mysql.table_name will be directly added into certain
queries. Salt treats these as trusted input.
The module requires the database (default ``salt_cache``) to exist but creates
its own table if needed. The keys are indexed using the ``bank`` and
``etcd_key`` columns.
To enable this cache plugin, the master will need the python client for
MySQL installed. This can be easily installed with pip:
.. code-block:: bash
pip install python-mysql
pip install pymysql
Optionally, depending on the MySQL agent configuration, the following values
could be set in the master config. These are the defaults:
@ -28,7 +34,7 @@ could be set in the master config. These are the defaults:
mysql.database: salt_cache
mysql.table_name: cache
Related docs could be found in the `python-mysql documentation`_.
Related docs can be found in the `python-mysql documentation`_.
To use the mysql as a minion data cache backend, set the master ``cache`` config
value to ``mysql``:
@ -90,17 +96,27 @@ def __virtual__():
return bool(MySQLdb), "No python mysql client installed." if MySQLdb is None else ""
def force_reconnect():
"""
Force a reconnection to the MySQL database, by removing the client from
Salt's __context__.
"""
__context__.pop("mysql_client", None)
def run_query(conn, query, args=None, retries=3):
"""
Get a cursor and run a query. Reconnect up to `retries` times if
Get a cursor and run a query. Reconnect up to ``retries`` times if
needed.
Returns: cursor, affected rows counter
Raises: SaltCacheError, AttributeError, OperationalError
"""
if conn is None:
conn = __context__.get("mysql_client")
try:
cur = conn.cursor()
if args is None or args == {}:
if not args:
log.debug("Doing query: %s", query)
out = cur.execute(query)
else:
@ -119,12 +135,19 @@ def run_query(conn, query, args=None, retries=3):
log.info("mysql_cache: recreating db connection due to: %r", e)
__context__["mysql_client"] = MySQLdb.connect(**__context__["mysql_kwargs"])
return run_query(
__context__.get("mysql_client"), query, args=args, retries=(retries - 1)
conn=__context__.get("mysql_client"),
query=query,
args=args,
retries=(retries - 1),
)
except Exception as e: # pylint: disable=broad-except
if len(query) > 150:
query = query[:150] + "<...>"
raise SaltCacheError("Error running {}: {}".format(query, e))
raise SaltCacheError(
"Error running {}{}: {}".format(
query, "- args: {}".format(args) if args else "", e
)
)
def _create_table():
@ -134,20 +157,53 @@ def _create_table():
# Explicitly check if the table already exists as the library logs a
# warning on CREATE TABLE
query = """SELECT COUNT(TABLE_NAME) FROM information_schema.tables
WHERE table_schema = '{}' AND table_name = '{}'""".format(
__context__["mysql_kwargs"]["db"],
__context__["mysql_table_name"],
WHERE table_schema = %s AND table_name = %s"""
cur, _ = run_query(
__context__.get("mysql_client"),
query,
args=(__context__["mysql_kwargs"]["db"], __context__["mysql_table_name"]),
)
cur, _ = run_query(__context__.get("mysql_client"), query)
r = cur.fetchone()
cur.close()
if r[0] == 1:
return
query = """
SELECT COUNT(TABLE_NAME)
FROM
information_schema.columns
WHERE
table_schema = %s
AND table_name = %s
AND column_name = 'last_update'
"""
cur, _ = run_query(
__context__["mysql_client"],
query,
args=(__context__["mysql_kwargs"]["db"], __context__["mysql_table_name"]),
)
r = cur.fetchone()
cur.close()
if r[0] == 1:
return
else:
query = """
ALTER TABLE {}.{}
ADD COLUMN last_update TIMESTAMP NOT NULL
DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP
""".format(
__context__["mysql_kwargs"]["db"], __context__["mysql_table_name"]
)
cur, _ = run_query(__context__["mysql_client"], query)
cur.close()
return
query = """CREATE TABLE IF NOT EXISTS {} (
bank CHAR(255),
etcd_key CHAR(255),
data MEDIUMBLOB,
last_update TIMESTAMP NOT NULL
DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY(bank, etcd_key)
);""".format(
__context__["mysql_table_name"]
@ -218,10 +274,10 @@ def fetch(bank, key):
Fetch a key value.
"""
_init_client()
query = "SELECT data FROM {} WHERE bank='{}' AND etcd_key='{}'".format(
__context__["mysql_table_name"], bank, key
query = "SELECT data FROM {} WHERE bank=%s AND etcd_key=%s".format(
__context__["mysql_table_name"]
)
cur, _ = run_query(__context__.get("mysql_client"), query)
cur, _ = run_query(__context__.get("mysql_client"), query, args=(bank, key))
r = cur.fetchone()
cur.close()
if r is None:
@ -234,13 +290,14 @@ def flush(bank, key=None):
Remove the key from the cache bank with all the key content.
"""
_init_client()
query = "DELETE FROM {} WHERE bank='{}'".format(
__context__["mysql_table_name"], bank
)
if key is not None:
query += " AND etcd_key='{}'".format(key)
query = "DELETE FROM {} WHERE bank=%s".format(__context__["mysql_table_name"])
if key is None:
data = (bank,)
else:
data = (bank, key)
query += " AND etcd_key=%s"
cur, _ = run_query(__context__.get("mysql_client"), query)
cur, _ = run_query(__context__["mysql_client"], query, args=data)
cur.close()
@ -250,10 +307,10 @@ def ls(bank):
bank.
"""
_init_client()
query = "SELECT etcd_key FROM {} WHERE bank='{}'".format(
__context__["mysql_table_name"], bank
query = "SELECT etcd_key FROM {} WHERE bank=%s".format(
__context__["mysql_table_name"]
)
cur, _ = run_query(__context__.get("mysql_client"), query)
cur, _ = run_query(__context__.get("mysql_client"), query, args=(bank,))
out = [row[0] for row in cur.fetchall()]
cur.close()
return out
@ -264,10 +321,34 @@ def contains(bank, key):
Checks if the specified bank contains the specified key.
"""
_init_client()
query = "SELECT COUNT(data) FROM {} WHERE bank='{}' AND etcd_key='{}'".format(
__context__["mysql_table_name"], bank, key
)
cur, _ = run_query(__context__.get("mysql_client"), query)
if key is None:
data = (bank,)
query = "SELECT COUNT(data) FROM {} WHERE bank=%s".format(
__context__["mysql_table_name"]
)
else:
data = (bank, key)
query = "SELECT COUNT(data) FROM {} WHERE bank=%s AND etcd_key=%s".format(
__context__["mysql_table_name"]
)
cur, _ = run_query(__context__.get("mysql_client"), query, args=data)
r = cur.fetchone()
cur.close()
return r[0] == 1
def updated(bank, key):
"""
Return the integer Unix epoch update timestamp of the specified bank and
key.
"""
_init_client()
query = (
"SELECT UNIX_TIMESTAMP(last_update) FROM {} WHERE bank=%s "
"AND etcd_key=%s".format(__context__["mysql_table_name"])
)
data = (bank, key)
cur, _ = run_query(__context__["mysql_client"], query=query, args=data)
r = cur.fetchone()
cur.close()
return int(r[0]) if r else r


@ -5,6 +5,7 @@ Redis
Redis plugin for the Salt caching subsystem.
.. versionadded:: 2017.7.0
.. versionchanged:: 3005
As Redis provides a simple mechanism for very fast key-value store, in order to
provide the necessary features for the Salt caching subsystem, the following
@ -36,11 +37,13 @@ the following hierarchy will be built:
127.0.0.1:6379> GET $KEY_root-bank/sub-bank/leaf-bank/my-key
"my-value"
There are three types of keys stored:
- ``$BANK_*`` is a Redis SET containing the list of banks under the current bank
- ``$BANKEYS_*`` is a Redis SET containing the list of keys under the current bank
- ``$KEY_*`` keeps the value of the key
There are four types of keys stored:
- ``$BANK_*`` is a Redis SET containing the list of banks under the current bank.
- ``$BANKEYS_*`` is a Redis SET containing the list of keys under the current bank.
- ``$KEY_*`` keeps the value of the key.
- ``$TSTAMP_*`` stores the last updated timestamp of the key.
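
For example, after caching ``my-key`` as above, the timestamp can be read
back alongside the value (a sketch assuming the default prefixes; the
stored value is a serialized integer):

    import redis

    import salt.payload

    conn = redis.StrictRedis("127.0.0.1", 6379)
    raw = conn.get("$TSTAMP_root-bank/sub-bank/leaf-bank/my-key")
    updated = salt.payload.loads(raw)  # integer Unix timestamp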
These prefixes and the separator can be adjusted using the configuration options:
@ -48,12 +51,17 @@ bank_prefix: ``$BANK``
The prefix used for the name of the Redis key storing the list of sub-banks.
bank_keys_prefix: ``$BANKEYS``
The prefix used for the name of the Redis keyt storing the list of keys under a certain bank.
The prefix used for the name of the Redis key storing the list of keys under a certain bank.
key_prefix: ``$KEY``
The prefix of the Redis keys having the value of the keys to be cached under
a certain bank.
timestamp_prefix: ``$TSTAMP``
The prefix for the last modified timestamp for keys.
.. versionadded:: 3005
separator: ``_``
The separator between the prefix and the key body.
@ -114,6 +122,7 @@ Configuration Example:
cache.redis.bank_prefix: #BANK
cache.redis.bank_keys_prefix: #BANKEYS
cache.redis.key_prefix: #KEY
cache.redis.timestamp_prefix: #TICKS
cache.redis.separator: '@'
Cluster Configuration Example:
@ -136,9 +145,12 @@ Cluster Configuration Example:
"""
import itertools
import logging
import time
import salt.payload
import salt.utils.stringutils
from salt.exceptions import SaltCacheError
# Import salt
@ -153,10 +165,7 @@ except ImportError:
HAS_REDIS = False
try:
# pylint: disable=no-name-in-module
from rediscluster import StrictRedisCluster
# pylint: enable=no-name-in-module
from rediscluster import RedisCluster # pylint: disable=no-name-in-module
HAS_REDIS_CLUSTER = True
except ImportError:
@ -174,6 +183,7 @@ log = logging.getLogger(__file__)
_BANK_PREFIX = "$BANK"
_KEY_PREFIX = "$KEY"
_TIMESTAMP_PREFIX = "$TSTAMP"
_BANK_KEYS_PREFIX = "$BANKEYS"
_SEPARATOR = "_"
@ -203,6 +213,9 @@ def __virtual__():
def init_kwargs(kwargs):
"""
Effectively a noop. Return an empty dictionary.
"""
return {}
@ -236,7 +249,7 @@ def _get_redis_server(opts=None):
opts = _get_redis_cache_opts()
if opts["cluster_mode"]:
REDIS_SERVER = StrictRedisCluster(
REDIS_SERVER = RedisCluster(
startup_nodes=opts["startup_nodes"],
skip_full_coverage_check=opts["skip_full_coverage_check"],
)
@ -262,6 +275,9 @@ def _get_redis_keys_opts():
),
"key_prefix": __opts__.get("cache.redis.key_prefix", _KEY_PREFIX),
"separator": __opts__.get("cache.redis.separator", _SEPARATOR),
"timestamp_prefix": __opts__.get(
"cache.redis.timestamp_prefix", _TIMESTAMP_PREFIX
),
}
@ -275,13 +291,25 @@ def _get_bank_redis_key(bank):
)
def _get_timestamp_key(bank, key):
    """
    Return the Redis key for the timestamp of the given bank and key.
    """
    opts = _get_redis_keys_opts()
    return "{}{}{}/{}".format(
        opts["timestamp_prefix"], opts["separator"], bank, key
    )
    # Use this line when we can use modern python
    # return f"{opts['timestamp_prefix']}{opts['separator']}{bank}/{key}"
def _get_key_redis_key(bank, key):
"""
Return the Redis key given the bank name and the key name.
"""
opts = _get_redis_keys_opts()
return "{prefix}{separator}{bank}/{key}".format(
prefix=opts["key_prefix"], separator=opts["separator"], bank=bank, key=key
prefix=opts["key_prefix"],
separator=opts["separator"],
bank=bank,
key=salt.utils.stringutils.to_str(key),
)
@ -302,16 +330,14 @@ def _build_bank_hier(bank, redis_pipe):
It's using the Redis pipeline,
so there will be only one interaction with the remote server.
"""
bank_list = bank.split("/")
parent_bank_path = bank_list[0]
for bank_name in bank_list[1:]:
prev_bank_redis_key = _get_bank_redis_key(parent_bank_path)
redis_pipe.sadd(prev_bank_redis_key, bank_name)
log.debug("Adding %s to %s", bank_name, prev_bank_redis_key)
parent_bank_path = "{curr_path}/{bank_name}".format(
curr_path=parent_bank_path, bank_name=bank_name
) # this becomes the parent of the next
return True
def joinbanks(*banks):
return "/".join(banks)
for bank_path in itertools.accumulate(bank.split("/"), joinbanks):
bank_set = _get_bank_redis_key(bank_path)
log.debug("Adding %s to %s", bank, bank_set)
redis_pipe.sadd(bank_set, ".")
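
    # Illustration (values made up): accumulate emits each progressively
    # longer bank path, so every level of the hierarchy gets registered:
    #   list(itertools.accumulate("root/sub/leaf".split("/"), joinbanks))
    #   -> ["root", "root/sub", "root/sub/leaf"]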
def _get_banks_to_remove(redis_server, bank, path=""):
@ -355,6 +381,11 @@ def store(bank, key, data):
redis_pipe.set(redis_key, value)
log.debug("Setting the value for %s under %s (%s)", key, bank, redis_key)
redis_pipe.sadd(redis_bank_keys, key)
# localfs cache truncates the timestamp to int only. We'll do the same.
redis_pipe.set(
_get_timestamp_key(bank=bank, key=key),
salt.payload.dumps(int(time.time())),
)
log.debug("Adding %s to %s", key, redis_bank_keys)
redis_pipe.execute()
except (RedisConnectionError, RedisResponseError) as rerr:
@ -442,6 +473,8 @@ def flush(bank, key=None):
for key in bank_keys:
redis_key = _get_key_redis_key(bank_path, key)
redis_pipe.delete(redis_key) # kill 'em all!
timestamp_key = _get_timestamp_key(bank=bank_path, key=key.decode())
redis_pipe.delete(timestamp_key)
log.debug(
"Removing the key %s under the %s bank (%s)",
key,
@ -464,6 +497,8 @@ def flush(bank, key=None):
else:
redis_key = _get_key_redis_key(bank, key)
redis_pipe.delete(redis_key) # delete the key cached
timestamp_key = _get_timestamp_key(bank=bank, key=key)
redis_pipe.delete(timestamp_key)
log.debug("Removing the key %s under the %s bank (%s)", key, bank, redis_key)
bank_keys_redis_key = _get_bank_keys_redis_key(bank)
redis_pipe.srem(bank_keys_redis_key, key)
@ -490,7 +525,7 @@ def list_(bank):
Lists entries stored in the specified bank.
"""
redis_server = _get_redis_server()
bank_redis_key = _get_bank_redis_key(bank)
bank_redis_key = _get_bank_keys_redis_key(bank)
try:
banks = redis_server.smembers(bank_redis_key)
except (RedisConnectionError, RedisResponseError) as rerr:
@ -501,7 +536,7 @@ def list_(bank):
raise SaltCacheError(mesg)
if not banks:
return []
return list(banks)
return [bank.decode() for bank in banks if bank != b"."]
def contains(bank, key):
@ -509,12 +544,31 @@ def contains(bank, key):
Checks if the specified bank contains the specified key.
"""
redis_server = _get_redis_server()
bank_redis_key = _get_bank_redis_key(bank)
bank_redis_key = _get_bank_keys_redis_key(bank)
try:
return redis_server.sismember(bank_redis_key, key)
if key is None:
return (
salt.utils.stringutils.to_str(redis_server.type(bank_redis_key))
!= "none"
)
else:
return redis_server.sismember(bank_redis_key, key)
except (RedisConnectionError, RedisResponseError) as rerr:
mesg = "Cannot retrieve the Redis cache key {rkey}: {rerr}".format(
rkey=bank_redis_key, rerr=rerr
)
log.error(mesg)
raise SaltCacheError(mesg)
def updated(bank, key):
"""
Return the Unix Epoch timestamp of when the key was last updated. Return
None if key is not found.
"""
redis_server = _get_redis_server()
timestamp_key = _get_timestamp_key(bank=bank, key=key)
value = redis_server.get(timestamp_key)
if value is not None:
value = salt.payload.loads(value)
return value


@ -28,7 +28,6 @@ MISSING_DOCSTRINGS = {
"salt/beacons/salt_monitor.py": ["validate", "beacon"],
"salt/beacons/watchdog.py": ["close", "to_salt_event"],
"salt/cache/localfs.py": ["get_storage_id", "init_kwargs"],
"salt/cache/redis_cache.py": ["init_kwargs"],
"salt/cloud/clouds/clc.py": [
"get_creds",
"get_configured_provider",



@ -0,0 +1,11 @@
import pytest
@pytest.fixture(scope="module")
def states(loaders):
return loaders.states
@pytest.fixture(scope="module")
def modules(loaders):
return loaders.modules


@ -0,0 +1,785 @@
import logging
import os
import shutil
import socket
import time
import pytest
import salt.cache
import salt.loader
from salt.exceptions import SaltCacheError
from saltfactories.utils import random_string
from saltfactories.utils.ports import get_unused_localhost_port
from tests.support.mock import MagicMock, patch
docker = pytest.importorskip("docker")
log = logging.getLogger(__name__)
pytestmark = [
pytest.mark.slow_test,
pytest.mark.skip_if_binaries_missing("dockerd"),
]
# TODO: add out-of-band (i.e. not via the API) additions to the cache -W. Werner, 2021-09-28
# TODO: in PR request opinion: is it better to double serialize the data, e.g.
# store -> __context__['serial'].dumps({"timestamp": tstamp, "value": __context__['serial'].dumps(value)})
# or is the existing approach of storing timestamp as a secondary key a good one???
# ??? Is one slower than the other?
# TODO: Is there a better approach for waiting until the container is fully running? -W. Werner, 2021-07-27
class Timer:
def __init__(self, timeout=20):
self.start = time.time()
self.timeout = timeout
@property
def expired(self):
return time.time() - self.start > self.timeout
@pytest.fixture(scope="module")
def etcd_port():
return get_unused_localhost_port()
@pytest.fixture(scope="module")
def redis_port():
return get_unused_localhost_port()
@pytest.fixture(scope="module")
def consul_port():
return get_unused_localhost_port()
# GIVE ME FIXTURES ON FIXTURES NOW
@pytest.fixture(scope="module")
def mysql_5_6_port():
return get_unused_localhost_port()
@pytest.fixture(scope="module")
def mysql_5_7_port():
return get_unused_localhost_port()
@pytest.fixture(scope="module")
def mysql_8_0_port():
return get_unused_localhost_port()
@pytest.fixture(scope="module")
def mariadb_10_1_port():
return get_unused_localhost_port()
@pytest.fixture(scope="module")
def mariadb_10_2_port():
return get_unused_localhost_port()
@pytest.fixture(scope="module")
def mariadb_10_3_port():
return get_unused_localhost_port()
@pytest.fixture(scope="module")
def mariadb_10_4_port():
return get_unused_localhost_port()
@pytest.fixture(scope="module")
def mariadb_10_5_port():
return get_unused_localhost_port()
@pytest.fixture(scope="module")
def percona_5_5_port():
return get_unused_localhost_port()
@pytest.fixture(scope="module")
def percona_5_6_port():
return get_unused_localhost_port()
@pytest.fixture(scope="module")
def percona_5_7_port():
return get_unused_localhost_port()
@pytest.fixture(scope="module")
def percona_8_0_port():
return get_unused_localhost_port()
# TODO: We should probably be building our own etcd docker image - fine to base it off of this one (or... others) -W. Werner, 2021-07-27
@pytest.fixture(scope="module")
def etcd_apiv2_container(salt_factories, docker_client, etcd_port):
container = salt_factories.get_container(
random_string("etcd-server-"),
image_name="elcolio/etcd",
docker_client=docker_client,
check_ports=[etcd_port],
container_run_kwargs={
"environment": {"ALLOW_NONE_AUTHENTICATION": "yes"},
"ports": {"2379/tcp": etcd_port},
},
)
with container.started() as factory:
yield factory
@pytest.fixture(scope="module")
def redis_container(salt_factories, docker_client, redis_port, docker_redis_image):
container = salt_factories.get_container(
random_string("redis-server-"),
image_name=docker_redis_image,
docker_client=docker_client,
check_ports=[redis_port],
container_run_kwargs={"ports": {"6379/tcp": redis_port}},
)
with container.started() as factory:
yield factory
# Pytest does not have the ability to parametrize fixtures with parametrized
# fixtures, which is super annoying. In other words, in order to have a `cache`
# test fixture that takes different versions of the cache that depend on
# different docker images, I've gotta make up fixtures for each
# image+container. When https://github.com/pytest-dev/pytest/issues/349 is
# actually fixed then we can go ahead and refactor all of these mysql
# containers, caches, and their images into a single parametrized fixture.
def start_mysql_container(
salt_factories, docker_client, mysql_port, docker_mysql_image
):
container = salt_factories.get_container(
random_string("mysql-server-"),
image_name=docker_mysql_image,
docker_client=docker_client,
check_ports=[mysql_port],
container_run_kwargs={
"environment": {
"MYSQL_ROOT_PASSWORD": "fnord",
"MYSQL_ROOT_HOST": "%",
},
"ports": {"3306/tcp": mysql_port},
},
)
return container.started()
@pytest.fixture(scope="module")
def mysql_5_6_container(
salt_factories, docker_client, mysql_5_6_port, docker_mysql_5_6_image
):
with start_mysql_container(
salt_factories, docker_client, mysql_5_6_port, docker_mysql_5_6_image
) as factory:
yield factory
@pytest.fixture(scope="module")
def mysql_5_7_container(
salt_factories, docker_client, mysql_5_7_port, docker_mysql_5_7_image
):
with start_mysql_container(
salt_factories, docker_client, mysql_5_7_port, docker_mysql_5_7_image
) as factory:
yield factory
@pytest.fixture(scope="module")
def mysql_8_0_container(
salt_factories, docker_client, mysql_8_0_port, docker_mysql_8_0_image
):
with start_mysql_container(
salt_factories, docker_client, mysql_8_0_port, docker_mysql_8_0_image
) as factory:
yield factory
@pytest.fixture(scope="module")
def mariadb_10_1_container(
salt_factories, docker_client, mariadb_10_1_port, docker_mariadb_10_1_image
):
with start_mysql_container(
salt_factories, docker_client, mariadb_10_1_port, docker_mariadb_10_1_image
) as factory:
yield factory
@pytest.fixture(scope="module")
def mariadb_10_2_container(
salt_factories, docker_client, mariadb_10_2_port, docker_mariadb_10_2_image
):
with start_mysql_container(
salt_factories, docker_client, mariadb_10_2_port, docker_mariadb_10_2_image
) as factory:
yield factory
@pytest.fixture(scope="module")
def mariadb_10_3_container(
salt_factories, docker_client, mariadb_10_3_port, docker_mariadb_10_3_image
):
with start_mysql_container(
salt_factories, docker_client, mariadb_10_3_port, docker_mariadb_10_3_image
) as factory:
yield factory
@pytest.fixture(scope="module")
def mariadb_10_4_container(
salt_factories, docker_client, mariadb_10_4_port, docker_mariadb_10_4_image
):
with start_mysql_container(
salt_factories, docker_client, mariadb_10_4_port, docker_mariadb_10_4_image
) as factory:
yield factory
@pytest.fixture(scope="module")
def mariadb_10_5_container(
salt_factories, docker_client, mariadb_10_5_port, docker_mariadb_10_5_image
):
with start_mysql_container(
salt_factories, docker_client, mariadb_10_5_port, docker_mariadb_10_5_image
) as factory:
yield factory
@pytest.fixture(scope="module")
def percona_5_5_container(
salt_factories, docker_client, percona_5_5_port, docker_percona_5_5_image
):
with start_mysql_container(
salt_factories, docker_client, percona_5_5_port, docker_percona_5_5_image
) as factory:
yield factory
@pytest.fixture(scope="module")
def percona_5_6_container(
salt_factories, docker_client, percona_5_6_port, docker_percona_5_6_image
):
with start_mysql_container(
salt_factories, docker_client, percona_5_6_port, docker_percona_5_6_image
) as factory:
yield factory
@pytest.fixture(scope="module")
def percona_5_7_container(
salt_factories, docker_client, percona_5_7_port, docker_percona_5_7_image
):
with start_mysql_container(
salt_factories, docker_client, percona_5_7_port, docker_percona_5_7_image
) as factory:
yield factory
@pytest.fixture(scope="module")
def percona_8_0_container(
salt_factories, docker_client, percona_8_0_port, docker_percona_8_0_image
):
with start_mysql_container(
salt_factories, docker_client, percona_8_0_port, docker_percona_8_0_image
) as factory:
yield factory
@pytest.fixture(scope="module")
def consul_container(salt_factories, docker_client, consul_port, docker_consul_image):
container = salt_factories.get_container(
random_string("consul-server-"),
image_name=docker_consul_image,
docker_client=docker_client,
check_ports=[consul_port],
container_run_kwargs={"ports": {"8500/tcp": consul_port}},
)
with container.started() as factory:
# TODO: May want to do the same thing for redis to ensure that service is up & running
# TODO: THIS IS HORRIBLE. THERE ARE BETTER WAYS TO DETECT SERVICE IS UP -W. Werner, 2021-10-12
timer = Timer(timeout=10)
sleeptime = 0.1
while not timer.expired:
try:
with socket.create_connection(
("localhost", consul_port), timeout=1
) as cli:
cli.send(b"GET /v1/kv/fnord HTTP/1.1\n\n")
cli.recv(2048)
break
except ConnectionResetError as e:
if e.errno == 104:
time.sleep(sleeptime)
sleeptime += sleeptime
else:
assert False, "Timer expired before connecting to consul"
yield factory
@pytest.fixture
def redis_cache(minion_opts, redis_port, redis_container):
opts = minion_opts.copy()
opts["cache"] = "redis"
opts["cache.redis.host"] = "127.0.0.1"
opts["cache.redis.port"] = redis_port
# NOTE: If you would like to ensure that alternate prefixes are properly
# tested, simply change these values and re-run the tests.
opts["cache.redis.bank_prefix"] = "#BANKY_BANK"
opts["cache.redis.bank_keys_prefix"] = "#WHO_HAS_MY_KEYS"
opts["cache.redis.key_prefix"] = "#LPL"
opts["cache.redis.timestamp_prefix"] = "%TICK_TOCK"
opts["cache.redis.separator"] = "\N{SNAKE}"
cache = salt.cache.factory(opts)
yield cache
@pytest.fixture(scope="module", autouse=True)
def ensure_deps(states):
installation_result = states.pip.installed(
name="fnord",
pkgs=["python-etcd", "redis", "redis-py-cluster", "python-consul", "pymysql"],
)
assert (
installation_result.result is True
), "unable to pip install requirements {}".format(installation_result.comment)
@pytest.fixture
def etcd_cache(minion_opts, etcd_port, etcd_apiv2_container):
opts = minion_opts.copy()
opts["cache"] = "etcd"
opts["etcd.host"] = "127.0.0.1"
opts["etcd.port"] = etcd_port
opts["etcd.protocol"] = "http"
# NOTE: If you would like to ensure that alternate suffixes are properly
# tested, simply change this value and re-run the tests.
opts["etcd.timestamp_suffix"] = ".frobnosticate"
cache = salt.cache.factory(opts)
yield cache
@pytest.fixture
def localfs_cache(minion_opts):
opts = minion_opts.copy()
opts["cache"] = "localfs"
cache = salt.cache.factory(opts)
yield cache
shutil.rmtree(opts["cachedir"], ignore_errors=True)
@pytest.fixture
def consul_cache(minion_opts, consul_port, consul_container):
opts = minion_opts.copy()
opts["cache"] = "consul"
opts["consul.host"] = "127.0.0.1"
opts["consul.port"] = consul_port
# NOTE: If you would like to ensure that alternate suffixes are properly
# tested, simply change this value and re-run the tests.
opts["consul.timestamp_suffix"] = ".frobnosticate"
cache = salt.cache.factory(opts)
yield cache
def fixy(minion_opts, mysql_port, mysql_container):
# We're doing a late import because we need access to the exception
import salt.cache.mysql_cache
# The container can be available before mysql actually is
mysql_container.container.exec_run(
[
"/bin/sh",
"-c",
'while ! mysql -u root -pfnord -e "SELECT 1;" >/dev/null; do sleep 1; done',
],
)
# Gotta make the db we're going to use
res = mysql_container.container.exec_run(
[
"/bin/sh",
"-c",
'echo "create database salt_cache;" | mysql -u root -pfnord ',
],
)
opts = minion_opts.copy()
opts["cache"] = "mysql"
opts["mysql.host"] = "127.0.0.1"
opts["mysql.port"] = mysql_port
opts["mysql.user"] = "root"
opts["mysql.password"] = "fnord"
opts["mysql.database"] = "salt_cache"
opts["mysql.table_name"] = "cache"
cache = salt.cache.factory(opts)
# For some reason even though mysql is available in the container, we
# can't reliably connect outside the container. Wait for access - but we
# may need a new cache...
timer = Timer(timeout=15)
while not timer.expired:
try:
# Doesn't matter what. We just have to execute so that we spin
# here until we can actually connect to the db instance.
cache.modules["mysql.list"]("salt_cache")
except salt.cache.mysql_cache.MySQLdb.DatabaseError:
# We don't really care what MySQL error is happening -
pass
else:
break
else:
if os.environ.get("CI_RUN"):
pytest.skip('Timer expired before "select 1;" worked')
else:
assert False, 'Timer expired before "select 1;" worked'
# This ensures that we will correctly alter any existing mysql tables for
# current mysql cache users. Without completely altering the mysql_cache
# implementation there's no real other reasonable way to reset the client
# and force the alter_table to be called. Resetting the client to `None` is
# what triggers the implementation to allow the ALTER TABLE to add the
# last_update column
run_query = cache.modules["mysql.run_query"]
run_query(
conn=None,
query="ALTER TABLE salt_cache.cache DROP COLUMN last_update",
)[0].fetchone()
cache.modules["mysql.force_reconnect"]()
return cache
# See container comment above >:(
@pytest.fixture(scope="module")
def mysql_5_6_cache(minion_opts, mysql_5_6_port, mysql_5_6_container):
yield fixy(minion_opts, mysql_5_6_port, mysql_5_6_container)
@pytest.fixture(scope="module")
def mysql_5_7_cache(minion_opts, mysql_5_7_port, mysql_5_7_container):
yield fixy(minion_opts, mysql_5_7_port, mysql_5_7_container)
@pytest.fixture(scope="module")
def mysql_8_0_cache(minion_opts, mysql_8_0_port, mysql_8_0_container):
yield fixy(minion_opts, mysql_8_0_port, mysql_8_0_container)
@pytest.fixture(scope="module")
def mariadb_10_1_cache(minion_opts, mariadb_10_1_port, mariadb_10_1_container):
yield fixy(minion_opts, mariadb_10_1_port, mariadb_10_1_container)
@pytest.fixture(scope="module")
def mariadb_10_2_cache(minion_opts, mariadb_10_2_port, mariadb_10_2_container):
yield fixy(minion_opts, mariadb_10_2_port, mariadb_10_2_container)
@pytest.fixture(scope="module")
def mariadb_10_3_cache(minion_opts, mariadb_10_3_port, mariadb_10_3_container):
yield fixy(minion_opts, mariadb_10_3_port, mariadb_10_3_container)
@pytest.fixture(scope="module")
def mariadb_10_4_cache(minion_opts, mariadb_10_4_port, mariadb_10_4_container):
yield fixy(minion_opts, mariadb_10_4_port, mariadb_10_4_container)
@pytest.fixture(scope="module")
def mariadb_10_5_cache(minion_opts, mariadb_10_5_port, mariadb_10_5_container):
yield fixy(minion_opts, mariadb_10_5_port, mariadb_10_5_container)
@pytest.fixture(scope="module")
def percona_5_5_cache(minion_opts, percona_5_5_port, percona_5_5_container):
yield fixy(minion_opts, percona_5_5_port, percona_5_5_container)
@pytest.fixture(scope="module")
def percona_5_6_cache(minion_opts, percona_5_6_port, percona_5_6_container):
yield fixy(minion_opts, percona_5_6_port, percona_5_6_container)
@pytest.fixture(scope="module")
def percona_5_7_cache(minion_opts, percona_5_7_port, percona_5_7_container):
yield fixy(minion_opts, percona_5_7_port, percona_5_7_container)
@pytest.fixture(scope="module")
def percona_8_0_cache(minion_opts, percona_8_0_port, percona_8_0_container):
yield fixy(minion_opts, percona_8_0_port, percona_8_0_container)
# TODO: Figure out how to parametrize this in combo with the getfixturevalue process -W. Werner, 2021-10-28
@pytest.fixture
def memcache_cache(minion_opts):
opts = minion_opts.copy()
opts["memcache_expire_seconds"] = 42
cache = salt.cache.factory(opts)
yield cache
@pytest.fixture(
params=[
"localfs_cache",
"redis_cache",
"etcd_cache",
"consul_cache",
"mysql_5_6_cache",
"mysql_5_7_cache",
"mysql_8_0_cache",
"mariadb_10_1_cache",
"mariadb_10_2_cache",
"mariadb_10_3_cache",
"mariadb_10_4_cache",
"mariadb_10_5_cache",
"percona_5_5_cache",
"percona_5_6_cache",
"percona_5_7_cache",
"percona_8_0_cache",
"memcache_cache", # Memcache actually delegates some behavior to the backing cache which alters the API somewhat.
]
)
def cache(request):
# This is not an ideal way to get the particular cache type but
# it's currently what we have available. It behaves *very* badly when
# attempting to parametrize these fixtures. Don't ask me how I know.
yield request.getfixturevalue(request.param)
def test_caching(subtests, cache):
bank = "fnord/kevin/stuart"
# ^^^^ This bank can be just fnord, or fnord/foo, or any mildly reasonable
# or possibly unreasonably nested names.
#
# No. Seriously. Try import string; bank = '/'.join(string.ascii_letters)
# - it works!
# import string; bank = "/".join(string.ascii_letters)
good_key = "roscivs"
bad_key = "monkey"
with subtests.test("non-existent bank should be empty on cache start"):
assert not cache.contains(bank=bank)
assert cache.list(bank=bank) == []
with subtests.test("after storing key in bank it should be in cache list"):
cache.store(bank=bank, key=good_key, data=b"\x01\x04\x05fnordy data")
assert cache.list(bank) == [good_key]
with subtests.test("after storing value, it should be fetchable"):
expected_data = "trombone pleasantry"
cache.store(bank=bank, key=good_key, data=expected_data)
assert cache.fetch(bank=bank, key=good_key) == expected_data
with subtests.test("bad key should still be absent from cache"):
assert cache.fetch(bank=bank, key=bad_key) == {}
with subtests.test("storing new value should update it"):
# Double check that the data was still the old stuff
old_data = expected_data
assert cache.fetch(bank=bank, key=good_key) == old_data
new_data = "stromboli"
cache.store(bank=bank, key=good_key, data=new_data)
assert cache.fetch(bank=bank, key=good_key) == new_data
with subtests.test("storing complex object works"):
new_thing = {
"some": "data",
42: "wheee",
"some other": {"sub": {"objects": "here"}},
}
cache.store(bank=bank, key=good_key, data=new_thing)
actual_thing = cache.fetch(bank=bank, key=good_key)
if isinstance(cache, salt.cache.MemCache):
# MemCache should actually store the object - everything else
# should create a copy of it.
assert actual_thing is new_thing
else:
assert actual_thing is not new_thing
assert actual_thing == new_thing
with subtests.test("contains returns true if key in bank"):
assert cache.contains(bank=bank, key=good_key)
with subtests.test("contains returns true if bank exists and key is None"):
assert cache.contains(bank=bank, key=None)
with subtests.test(
"contains returns False when bank not in cache and/or key not in bank"
):
assert not cache.contains(bank=bank, key=bad_key)
assert not cache.contains(bank="nonexistent", key=good_key)
assert not cache.contains(bank="nonexistent", key=bad_key)
assert not cache.contains(bank="nonexistent", key=None)
with subtests.test("flushing nonexistent key should not remove other keys"):
cache.flush(bank=bank, key=bad_key)
assert cache.contains(bank=bank, key=good_key)
with subtests.test(
"flushing existing key should not remove bank if no more keys exist"
):
pytest.skip(
"This is impossible with redis. Should we make localfs behave the same way?"
)
cache.flush(bank=bank, key=good_key)
assert cache.contains(bank=bank)
assert cache.list(bank=bank) == []
with subtests.test(
"after existing key is flushed updated should not return a timestamp for that key"
):
cache.store(bank=bank, key=good_key, data="fnord")
cache.flush(bank=bank, key=good_key)
timestamp = cache.updated(bank=bank, key=good_key)
assert timestamp is None
with subtests.test(
"after flushing bank containing a good key, updated should not return a timestamp for that key"
):
cache.store(bank=bank, key=good_key, data="fnord")
cache.flush(bank=bank, key=None)
timestamp = cache.updated(bank=bank, key=good_key)
assert timestamp is None
with subtests.test("flushing bank with None as key should remove bank"):
cache.flush(bank=bank, key=None)
assert not cache.contains(bank=bank)
with subtests.test("Exception should happen when flushing None bank"):
# This bit is maybe an accidental API, but currently there is no
# protection at least with the localfs cache when bank is None. If
# bank is None we try to `os.path.normpath` the bank, which explodes
# and is at least the current behavior. If we want to change that
# this test should change. Or be removed altogether.
# TODO: this should actually not raise. Not sure if there's a test that we can do here... or just call the code which will fail if there's actually an exception. -W. Werner, 2021-09-28
pytest.skip(
"Skipping for now - etcd, redis, and mysql do not raise. Should ensure all backends behave consistently"
)
with pytest.raises(Exception):
cache.flush(bank=None, key=None)
with subtests.test("Updated for non-existent key should return None"):
timestamp = cache.updated(bank="nonexistent", key="whatever")
assert timestamp is None
with subtests.test("Updated for key should return a reasonable time"):
before_storage = int(time.time())
cache.store(bank="fnord", key="updated test part 2", data="fnord")
after_storage = int(time.time())
timestamp = cache.updated(bank="fnord", key="updated test part 2")
assert before_storage <= timestamp <= after_storage
with subtests.test(
"If the module raises SaltCacheError then it should make it out of updated"
):
with patch.dict(
cache.modules._dict,
{"{}.updated".format(cache.driver): MagicMock(side_effect=SaltCacheError)},
), pytest.raises(SaltCacheError):
cache.updated(bank="kaboom", key="oops")
with subtests.test(
"cache.cache right after a value is cached should not update the cache"
):
expected_value = "some cool value yo"
cache.store(bank=bank, key=good_key, data=expected_value)
result = cache.cache(
bank=bank,
key=good_key,
fun=lambda **kwargs: "bad bad value no good",
value="some other value?",
loop_fun=lambda x: "super very no good bad",
)
fetch_result = cache.fetch(bank=bank, key=good_key)
assert result == fetch_result == expected_value
with subtests.test(
"cache.cache should update the value with the result of fun when value was updated longer than expiration",
), patch(
"salt.cache.Cache.updated",
return_value=42, # Dec 31, 1969... time to update the cache!
autospec=True,
):
expected_value = "this is the return value woo woo woo"
cache.store(bank=bank, key=good_key, data="not this value")
cache_result = cache.cache(
bank=bank, key=good_key, fun=lambda *args, **kwargs: expected_value
)
fetch_result = cache.fetch(bank=bank, key=good_key)
assert cache_result == fetch_result == expected_value
with subtests.test(
"cache.cache should update the value with all of the outputs from loop_fun if loop_fun was provided",
), patch(
"salt.cache.Cache.updated",
return_value=42,
autospec=True,
):
expected_value = "SOME HUGE STRING OKAY?"
cache.store(bank=bank, key=good_key, data="nope, not me")
cache_result = cache.cache(
bank=bank,
key=good_key,
fun=lambda **kwargs: "some huge string okay?",
loop_fun=str.upper,
)
fetch_result = cache.fetch(bank=bank, key=good_key)
assert cache_result == fetch_result
assert "".join(fetch_result) == expected_value
with subtests.test(
"cache.cache should update the value if the stored value is empty but present and expiry is way in the future"
), patch(
"salt.cache.Cache.updated",
return_value=time.time() * 2,
autospec=True,
):
# Unclear if this was intended behavior: currently any falsey data will
# be updated by cache.cache. If this is incorrect, this test should
# be updated or removed.
expected_data = "some random string whatever"
for empty in ("", (), [], {}, 0, 0.0, False, None):
with subtests.test(empty=empty):
cache.store(
bank=bank, key=good_key, data=empty
) # empty chairs and empty data
cache_result = cache.cache(
bank=bank, key=good_key, fun=lambda **kwargs: expected_data
)
fetch_result = cache.fetch(bank=bank, key=good_key)
assert cache_result == fetch_result == expected_data
with subtests.test("cache.cache should store a value if it does not exist"):
expected_result = "some result plz"
cache.flush(bank=bank, key=None)
assert cache.fetch(bank=bank, key=good_key) == {}
cache_result = cache.cache(
bank=bank, key=good_key, fun=lambda **kwargs: expected_result
)
fetch_result = cache.fetch(bank=bank, key=good_key)
assert cache_result == fetch_result
assert fetch_result == expected_result
assert cache_result == fetch_result == expected_result


@ -0,0 +1,32 @@
import pytest
import salt.cache
@pytest.fixture
def redis_cluster_cache(minion_opts):
opts = minion_opts.copy()
opts["cache"] = "redis"
opts["cache.redis.cluster_mode"] = True
cache = salt.cache.factory(opts)
yield cache
@pytest.fixture(scope="module", autouse=True)
def ensure_deps(states):
installation_result = states.pip.installed(
name="fnord", pkgs=["redis", "redis-py-cluster"]
)
assert (
installation_result.result is True
), "unable to pip install requirements {}".format(installation_result.comment)
def test_redis_cluster_cache_should_import_correctly(redis_cluster_cache):
import rediscluster.exceptions
with pytest.raises(rediscluster.exceptions.RedisClusterException):
# Currently the opts aren't actually correct for a redis cluster
# so this will fail. If, in the future, the redis_cluster_cache fixture
# needs to point to an actual redis cluster, then this test will
# probably become obsolete
redis_cluster_cache.store(bank="foo", key="whatever", data="lol")


@ -3,8 +3,17 @@ import pathlib
import shutil
import pytest
from saltfactories.daemons.container import Container
from saltfactories.utils.functional import Loaders
try:
import docker
except ImportError:
# Test suites depending on docker should be using
# docker = pytest.importorskip("docker")
# so any fixtures using docker shouldn't ever be called or used.
pass
log = logging.getLogger(__name__)
@ -90,3 +99,104 @@ def reset_loaders_state(loaders):
finally:
# Reset the loaders state
loaders.reset_state()
@pytest.fixture(scope="module")
def docker_client():
try:
client = docker.from_env()
except docker.errors.DockerException:
pytest.skip("Failed to get a connection to docker running on the system")
connectable = Container.client_connectable(client)
if connectable is not True: # pragma: nocover
pytest.skip(connectable)
return client
def pull_or_skip(image_name, docker_client):
try:
docker_client.images.pull(image_name)
except docker.errors.APIError as exc:
pytest.skip("Failed to pull docker image {!r}: {}".format(image_name, exc))
except ImportError:
pytest.skip("docker module was not installed")
return image_name
@pytest.fixture(scope="module")
def docker_redis_image(docker_client):
return pull_or_skip("redis:alpine", docker_client)
@pytest.fixture(scope="module")
def docker_consul_image(docker_client):
return pull_or_skip("consul:latest", docker_client)
# Pytest does not have the ability to parametrize fixtures with parametrized
# fixtures, which is super annoying. In other words, in order to have a `cache`
# test fixture that takes different versions of the cache that depend on
# different docker images, I've gotta make up fixtures for each
# image+container. When https://github.com/pytest-dev/pytest/issues/349 is
# actually fixed then we can go ahead and refactor all of these mysql images
# (and container fixtures depending on it) into a single fixture.
@pytest.fixture(scope="module")
def docker_mysql_5_6_image(docker_client):
return pull_or_skip("mysql/mysql-server:5.6", docker_client)
@pytest.fixture(scope="module")
def docker_mysql_5_7_image(docker_client):
return pull_or_skip("mysql/mysql-server:5.7", docker_client)
@pytest.fixture(scope="module")
def docker_mysql_8_0_image(docker_client):
return pull_or_skip("mysql/mysql-server:8.0", docker_client)
@pytest.fixture(scope="module")
def docker_mariadb_10_1_image(docker_client):
return pull_or_skip("mariadb:10.1", docker_client)
@pytest.fixture(scope="module")
def docker_mariadb_10_2_image(docker_client):
return pull_or_skip("mariadb:10.2", docker_client)
@pytest.fixture(scope="module")
def docker_mariadb_10_3_image(docker_client):
return pull_or_skip("mariadb:10.3", docker_client)
@pytest.fixture(scope="module")
def docker_mariadb_10_4_image(docker_client):
return pull_or_skip("mariadb:10.4", docker_client)
@pytest.fixture(scope="module")
def docker_mariadb_10_5_image(docker_client):
return pull_or_skip("mariadb:10.5", docker_client)
@pytest.fixture(scope="module")
def docker_percona_5_5_image(docker_client):
return pull_or_skip("percona:5.5", docker_client)
@pytest.fixture(scope="module")
def docker_percona_5_6_image(docker_client):
return pull_or_skip("percona:5.6", docker_client)
@pytest.fixture(scope="module")
def docker_percona_5_7_image(docker_client):
return pull_or_skip("percona:5.7", docker_client)
@pytest.fixture(scope="module")
def docker_percona_8_0_image(docker_client):
return pull_or_skip("percona:8.0", docker_client)


@ -135,7 +135,11 @@ def test_flush():
with patch.object(mysql_cache, "run_query") as mock_run_query:
expected_calls = [
call(mock_connect_client, "DELETE FROM salt WHERE bank='bank'")
call(
mock_connect_client,
"DELETE FROM salt WHERE bank=%s",
args=("bank",),
),
]
mock_run_query.return_value = (MagicMock(), "")
mysql_cache.flush(bank="bank")
@ -144,7 +148,8 @@ def test_flush():
expected_calls = [
call(
mock_connect_client,
"DELETE FROM salt WHERE bank='bank' AND etcd_key='key'",
"DELETE FROM salt WHERE bank=%s AND etcd_key=%s",
args=("bank", "key"),
)
]
mysql_cache.flush(bank="bank", key="key")
@ -219,6 +224,9 @@ def test_create_table(master_config):
bank CHAR(255),
etcd_key CHAR(255),
data MEDIUMBLOB,
last_update TIMESTAMP NOT NULL
DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY(bank, etcd_key)
);"""
expected_calls = [call(mock_connect_client, sql_call)]