mirror of
https://github.com/saltstack/salt.git
synced 2025-04-16 09:40:20 +00:00
Add option to use a fresh connection for mysql cache
This commit is contained in:
parent
9539c31d48
commit
7da18aa93a
3 changed files with 24 additions and 8 deletions
1
changelog/63991.fixed.md
Normal file
1
changelog/63991.fixed.md
Normal file
|
@ -0,0 +1 @@
|
|||
Added option to use a fresh connection for mysql cache
|
28
salt/cache/mysql_cache.py
vendored
28
salt/cache/mysql_cache.py
vendored
|
@ -33,6 +33,8 @@ could be set in the master config. These are the defaults:
|
|||
mysql.password: None
|
||||
mysql.database: salt_cache
|
||||
mysql.table_name: cache
|
||||
# This may be enabled to create a fresh connection on every call
|
||||
mysql.fresh_connection: false
|
||||
|
||||
Related docs can be found in the `python-mysql documentation`_.
|
||||
|
||||
|
@ -63,10 +65,17 @@ try:
|
|||
import MySQLdb.converters
|
||||
import MySQLdb.cursors
|
||||
from MySQLdb.connections import OperationalError
|
||||
|
||||
# Define the interface error as a subclass of exception
|
||||
# It will never be thrown/used, it is defined to support the pymysql error below
|
||||
class InterfaceError(Exception):
|
||||
pass
|
||||
|
||||
except ImportError:
|
||||
try:
|
||||
# MySQLdb import failed, try to import PyMySQL
|
||||
import pymysql
|
||||
from pymysql.err import InterfaceError
|
||||
|
||||
pymysql.install_as_MySQLdb()
|
||||
import MySQLdb
|
||||
|
@ -109,8 +118,12 @@ def run_query(conn, query, args=None, retries=3):
|
|||
Get a cursor and run a query. Reconnect up to ``retries`` times if
|
||||
needed.
|
||||
Returns: cursor, affected rows counter
|
||||
Raises: SaltCacheError, AttributeError, OperationalError
|
||||
Raises: SaltCacheError, AttributeError, OperationalError, InterfaceError
|
||||
"""
|
||||
if __context__.get("mysql_fresh_connection"):
|
||||
# Create a new connection if configured
|
||||
conn = MySQLdb.connect(**__context__["mysql_kwargs"])
|
||||
__context__["mysql_client"] = conn
|
||||
if conn is None:
|
||||
conn = __context__.get("mysql_client")
|
||||
try:
|
||||
|
@ -124,7 +137,7 @@ def run_query(conn, query, args=None, retries=3):
|
|||
out = cur.execute(query, args)
|
||||
|
||||
return cur, out
|
||||
except (AttributeError, OperationalError) as e:
|
||||
except (AttributeError, OperationalError, InterfaceError) as e:
|
||||
if retries == 0:
|
||||
raise
|
||||
# reconnect creating new client
|
||||
|
@ -144,9 +157,7 @@ def run_query(conn, query, args=None, retries=3):
|
|||
if len(query) > 150:
|
||||
query = query[:150] + "<...>"
|
||||
raise SaltCacheError(
|
||||
"Error running {}{}: {}".format(
|
||||
query, "- args: {}".format(args) if args else "", e
|
||||
)
|
||||
"Error running {}{}: {}".format(query, f"- args: {args}" if args else "", e)
|
||||
)
|
||||
|
||||
|
||||
|
@ -232,6 +243,7 @@ def _init_client():
|
|||
mysql_kwargs["autocommit"] = True
|
||||
|
||||
__context__["mysql_table_name"] = opts.pop("mysql.table_name", "salt")
|
||||
__context__["mysql_fresh_connection"] = opts.pop("mysql.fresh_connection", False)
|
||||
|
||||
# Gather up any additional MySQL configuration options
|
||||
for k in opts:
|
||||
|
@ -266,7 +278,7 @@ def store(bank, key, data):
|
|||
cur, cnt = run_query(__context__.get("mysql_client"), query, args=args)
|
||||
cur.close()
|
||||
if cnt not in (1, 2):
|
||||
raise SaltCacheError("Error storing {} {} returned {}".format(bank, key, cnt))
|
||||
raise SaltCacheError(f"Error storing {bank} {key} returned {cnt}")
|
||||
|
||||
|
||||
def fetch(bank, key):
|
||||
|
@ -297,7 +309,7 @@ def flush(bank, key=None):
|
|||
data = (bank, key)
|
||||
query += " AND etcd_key=%s"
|
||||
|
||||
cur, _ = run_query(__context__["mysql_client"], query, args=data)
|
||||
cur, _ = run_query(__context__.get("mysql_client"), query, args=data)
|
||||
cur.close()
|
||||
|
||||
|
||||
|
@ -348,7 +360,7 @@ def updated(bank, key):
|
|||
"AND etcd_key=%s".format(__context__["mysql_table_name"])
|
||||
)
|
||||
data = (bank, key)
|
||||
cur, _ = run_query(__context__["mysql_client"], query=query, args=data)
|
||||
cur, _ = run_query(__context__.get("mysql_client"), query=query, args=data)
|
||||
r = cur.fetchone()
|
||||
cur.close()
|
||||
return int(r[0]) if r else r
|
||||
|
|
3
tests/pytests/unit/cache/test_mysql_cache.py
vendored
3
tests/pytests/unit/cache/test_mysql_cache.py
vendored
|
@ -170,6 +170,7 @@ def test_init_client():
|
|||
assert (
|
||||
mysql_cache.__context__["mysql_kwargs"]["max_allowed_packet"] == 100000
|
||||
)
|
||||
assert not mysql_cache.__context__["mysql_fresh_connection"]
|
||||
|
||||
with patch.dict(
|
||||
mysql_cache.__opts__,
|
||||
|
@ -177,6 +178,7 @@ def test_init_client():
|
|||
"mysql.max_allowed_packet": 100000,
|
||||
"mysql.db": "salt_mysql_db",
|
||||
"mysql.host": "mysql-host",
|
||||
"mysql.fresh_connection": True,
|
||||
},
|
||||
):
|
||||
with patch.object(mysql_cache, "_create_table") as mock_create_table:
|
||||
|
@ -193,6 +195,7 @@ def test_init_client():
|
|||
assert (
|
||||
mysql_cache.__context__["mysql_kwargs"]["max_allowed_packet"] == 100000
|
||||
)
|
||||
assert mysql_cache.__context__["mysql_fresh_connection"]
|
||||
|
||||
|
||||
def test_create_table():
|
||||
|
|
Loading…
Add table
Reference in a new issue