Fixes to mysql modules and state modules (#60087)

* Update alter_db to return True or False depending on the success of failure of the alter.  Update grant_exists to only use the full list of available privileges when the grant is on the global level, eg. datbase is "*.*".  Adding some pytest integration tests, testing across all MySQL variants.

* Adding tests and changelog.

* Removing unneeded log messages.

* Skip new mysql tests if dockerd binary is unavailable.

* Add `pymysql` to linux CI requirements

* Make use of pytest-salt-factories container support.

* Increasing the number of login attempts before failing.

* Moving tests/unit/modules/test_mysql.py to tests/pytests/unit/modules/test_mysql.py

* removing unneeded code.  swapping out the skipIfs for the pytest version.

* Need to pass args in this call to _execute.

* try_except pymysql import, skip tests if not available.

* bump version of pymysql for Python 3.5 down to 1.0.0 as 1.0.2 is unavailable.

* Bumping pymysql version down to 0.9.3 for python 3.5.  version 1.0 is broken and later versions do not work.

* adding pymysql with correct versions to static/ci/linux.in.

* Fixing the password creating and altering issues raised in #60264

* Update linux.in

* Fixing failing unit test tests/pytests/unit/modules/test_mysql.py::test_user_chpass

* Lint fixes.

* Fixing failign tests.

* fixing failing test on MariaDB.

* Removing debugging line.

Co-authored-by: Pedro Algarvio <pedro@algarvio.me>
This commit is contained in:
Gareth J. Greenaway 2021-07-06 10:31:04 -07:00 committed by GitHub
parent 93f88571a8
commit a1f48432af
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 1750 additions and 902 deletions

1
changelog/60031.fixed Normal file
View file

@ -0,0 +1 @@
Update alter_db to return True or False depending on the success of failure of the alter. Update grant_exists to only use the full list of available privileges when the grant is on the global level, eg. datbase is "*.*".

View file

@ -2,4 +2,6 @@
pyiface
pygit2<1.1.0; python_version <= '3.8'
pygit2>=1.4.0; python_version > '3.8'
ansible
pymysql==0.9.3; python_version <= '3.5'
pymysql>=1.0.2; python_version > '3.5'
ansible

View file

@ -729,6 +729,8 @@ pyinotify==0.9.6 ; sys_platform != "win32" and sys_platform != "darwin" and plat
# via -r requirements/static/ci/common.in
pyjwt==1.7.1
# via adal
pymysql==1.0.2 ; python_version > "3.5"
# via -r requirements/static/ci/linux.in
pynacl==1.3.0
# via paramiko
pyopenssl==19.1.0

View file

@ -719,6 +719,8 @@ pyinotify==0.9.6 ; sys_platform != "win32" and sys_platform != "darwin" and plat
# via -r requirements/static/ci/common.in
pyjwt==1.7.1
# via adal
pymysql==0.9.3 ; python_version <= "3.5"
# via -r requirements/static/ci/linux.in
pynacl==1.3.0
# via paramiko
pyopenssl==19.1.0

View file

@ -717,6 +717,8 @@ pyinotify==0.9.6 ; sys_platform != "win32" and sys_platform != "darwin" and plat
# via -r requirements/static/ci/common.in
pyjwt==1.7.1
# via adal
pymysql==1.0.2 ; python_version > "3.5"
# via -r requirements/static/ci/linux.in
pynacl==1.3.0
# via paramiko
pyopenssl==19.1.0

View file

@ -735,6 +735,8 @@ pyinotify==0.9.6 ; sys_platform != "win32" and sys_platform != "darwin" and plat
# via -r requirements/static/ci/common.in
pyjwt==1.7.1
# via adal
pymysql==1.0.2 ; python_version > "3.5"
# via -r requirements/static/ci/linux.in
pynacl==1.3.0
# via paramiko
pyopenssl==19.1.0

View file

@ -727,6 +727,8 @@ pyinotify==0.9.6 ; sys_platform != "win32" and sys_platform != "darwin" and plat
# via -r requirements/static/ci/common.in
pyjwt==1.7.1
# via adal
pymysql==1.0.2 ; python_version > "3.5"
# via -r requirements/static/ci/linux.in
pynacl==1.3.0
# via paramiko
pyopenssl==19.1.0

View file

@ -729,6 +729,8 @@ pyinotify==0.9.6 ; sys_platform != "win32" and sys_platform != "darwin" and plat
# via -r requirements/static/ci/common.in
pyjwt==1.7.1
# via adal
pymysql==1.0.2 ; python_version > "3.5"
# via -r requirements/static/ci/linux.in
pynacl==1.3.0
# via paramiko
pyopenssl==19.1.0

View file

@ -1110,7 +1110,15 @@ def alter_db(name, character_set=None, collate=None, **connection_args):
collate or existing.get("collate"),
)
args = {}
_execute(cur, qry, args)
try:
if _execute(cur, qry, args):
log.info("DB '%s' altered", name)
return True
except MySQLdb.OperationalError as exc:
err = "MySQL Error {}: {}".format(*exc.args)
__context__["mysql.error"] = err
log.error(err)
return False
def db_get(name, **connection_args):
@ -1134,7 +1142,14 @@ def db_get(name, **connection_args):
"INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=%(dbname)s;"
)
args = {"dbname": name}
_execute(cur, qry, args)
try:
_execute(cur, qry, args)
except MySQLdb.OperationalError as exc:
err = "MySQL Error {}: {}".format(*exc.args)
__context__["mysql.error"] = err
log.error(err)
return []
if cur.rowcount:
rows = cur.fetchall()
return {"character_set": rows[0][0], "collate": rows[0][1]}
@ -1579,7 +1594,8 @@ def _mysql_user_create(
args["password"] = str(password)
elif password_hash is not None:
if salt.utils.versions.version_cmp(server_version, compare_version) >= 0:
qry += " IDENTIFIED BY %(password)s"
args["auth_plugin"] = auth_plugin
qry += " IDENTIFIED WITH %(auth_plugin)s AS %(password)s"
else:
qry += " IDENTIFIED BY PASSWORD %(password)s"
args["password"] = password_hash
@ -1829,7 +1845,12 @@ def _mysql_user_chpass(
args["host"] = host
if salt.utils.versions.version_cmp(server_version, compare_version) >= 0:
qry = "ALTER USER %(user)s@%(host)s IDENTIFIED BY %(password)s;"
args["auth_plugin"] = auth_plugin
qry = "ALTER USER %(user)s@%(host)s IDENTIFIED WITH %(auth_plugin)s "
if password is not None:
qry += "BY %(password)s;"
elif password_hash is not None:
qry += "AS %(password)s;"
else:
qry = (
"UPDATE mysql.user SET "
@ -1882,7 +1903,7 @@ def _mariadb_user_chpass(
):
server_version = salt.utils.data.decode(version(**connection_args))
compare_version = "10.4.0"
compare_version = "10.4"
args = {}
@ -1905,7 +1926,9 @@ def _mariadb_user_chpass(
args["host"] = host
if salt.utils.versions.version_cmp(server_version, compare_version) >= 0:
qry = "ALTER USER %(user)s@%(host)s IDENTIFIED BY %(password)s;"
args["auth_plugin"] = auth_plugin
qry = "ALTER USER %(user)s@%(host)s IDENTIFIED VIA %(auth_plugin)s USING "
qry += password_sql
else:
qry = (
"UPDATE mysql.user SET "
@ -2374,6 +2397,7 @@ def grant_exists(
if (
salt.utils.versions.version_cmp(server_version, "8.0") >= 0
and "MariaDB" not in server_version
and database == "*.*"
):
grant = ",".join([i for i in __all_privileges__])
else:

View file

@ -0,0 +1,470 @@
"""
Test Salt MySQL module across various MySQL variants
"""
import logging
import pytest
import salt.modules.mysql as mysql
from tests.support.pytest.mysql import mysql_container # pylint: disable=unused-import
log = logging.getLogger(__name__)
pytestmark = [
pytest.mark.slow_test,
pytest.mark.skip_if_binaries_missing("dockerd"),
pytest.mark.skipif(
mysql.MySQLdb is None, reason="No python mysql client installed."
),
]
@pytest.fixture(scope="module")
def salt_call_cli_wrapper(salt_call_cli, mysql_container):
def run_command(*command, **kwargs):
connection_user = kwargs.pop("connection_user", mysql_container.mysql_user)
connection_pass = kwargs.pop("connection_pass", mysql_container.mysql_passwd)
connection_db = kwargs.pop("connection_db", "mysql")
connection_port = kwargs.pop("connection_port", mysql_container.mysql_port)
return salt_call_cli.run(
*command,
connection_user=connection_user,
connection_pass=connection_pass,
connection_db=connection_db,
connection_port=connection_port,
**kwargs
)
return run_command
def test_query(salt_call_cli_wrapper):
ret = salt_call_cli_wrapper("mysql.query", "mysql", "SELECT 1")
assert ret.json
assert ret.json["results"] == [["1"]]
def test_version(salt_call_cli_wrapper, mysql_container):
ret = salt_call_cli_wrapper("mysql.version")
assert ret.json
assert mysql_container.mysql_version in ret.json
def test_status(salt_call_cli_wrapper):
ret = salt_call_cli_wrapper("mysql.status")
assert ret.json
def test_db_list(salt_call_cli_wrapper):
ret = salt_call_cli_wrapper("mysql.db_list")
assert ret.json
assert "mysql" in ret.json
def test_db_create_alter_remove(salt_call_cli_wrapper):
ret = salt_call_cli_wrapper("mysql.db_create", "salt")
assert ret.json
ret = salt_call_cli_wrapper(
"mysql.alter_db",
name="salt",
character_set="latin1",
collate="latin1_general_ci",
)
assert ret.json
ret = salt_call_cli_wrapper("mysql.db_remove", name="salt")
assert ret.json
def test_user_list(salt_call_cli_wrapper):
ret = salt_call_cli_wrapper("mysql.user_list")
assert ret.json
assert {"User": "root", "Host": "%"} in ret.json
def test_user_exists(salt_call_cli_wrapper):
ret = salt_call_cli_wrapper("mysql.user_exists", "root", "%", "password")
assert ret.json
ret = salt_call_cli_wrapper(
"mysql.user_exists", "george", "hostname", "badpassword",
)
assert not ret.json
def test_user_info(salt_call_cli_wrapper):
ret = salt_call_cli_wrapper("mysql.user_info", "root", "%")
assert ret.json
# Check that a subset of the information
# is available in the returned user information.
expected = {
"Host": "%",
"User": "root",
"Select_priv": "Y",
"Insert_priv": "Y",
"Update_priv": "Y",
"Delete_priv": "Y",
"Create_priv": "Y",
"Drop_priv": "Y",
"Reload_priv": "Y",
"Shutdown_priv": "Y",
"Process_priv": "Y",
"File_priv": "Y",
"Grant_priv": "Y",
"References_priv": "Y",
"Index_priv": "Y",
"Alter_priv": "Y",
"Show_db_priv": "Y",
"Super_priv": "Y",
"Create_tmp_table_priv": "Y",
"Lock_tables_priv": "Y",
"Execute_priv": "Y",
"Repl_slave_priv": "Y",
"Repl_client_priv": "Y",
"Create_view_priv": "Y",
"Show_view_priv": "Y",
"Create_routine_priv": "Y",
"Alter_routine_priv": "Y",
"Create_user_priv": "Y",
"Event_priv": "Y",
"Trigger_priv": "Y",
"Create_tablespace_priv": "Y",
}
assert all(ret.json.get(key, None) == val for key, val in expected.items())
def test_user_create_chpass_delete(salt_call_cli_wrapper):
ret = salt_call_cli_wrapper(
"mysql.user_create", "george", host="localhost", password="badpassword",
)
assert ret.json
ret = salt_call_cli_wrapper(
"mysql.user_chpass", "george", host="localhost", password="different_password",
)
assert ret.json
ret = salt_call_cli_wrapper("mysql.user_remove", "george", host="localhost")
assert ret.json
def test_user_grants(salt_call_cli_wrapper):
ret = salt_call_cli_wrapper("mysql.user_grants", "root", host="%")
assert ret.json
def test_grant_add_revoke(salt_call_cli_wrapper):
# Create the database
ret = salt_call_cli_wrapper("mysql.db_create", "salt")
assert ret.json
# Create a user
ret = salt_call_cli_wrapper(
"mysql.user_create", "george", host="localhost", password="badpassword",
)
assert ret.json
# Grant privileges to user to specific table
ret = salt_call_cli_wrapper(
"mysql.grant_add",
grant="ALL PRIVILEGES",
database="salt.*",
user="george",
host="localhost",
)
assert ret.json
# Check the grant exists
ret = salt_call_cli_wrapper(
"mysql.grant_exists",
grant="ALL PRIVILEGES",
database="salt.*",
user="george",
host="localhost",
)
assert ret.json
# Revoke the grant
ret = salt_call_cli_wrapper(
"mysql.grant_revoke",
grant="ALL PRIVILEGES",
database="salt.*",
user="george",
host="localhost",
)
assert ret.json
# Check the grant does not exist
ret = salt_call_cli_wrapper(
"mysql.grant_exists",
grant="ALL PRIVILEGES",
database="salt.*",
user="george",
host="localhost",
)
assert not ret.json
# Grant privileges to user globally
ret = salt_call_cli_wrapper(
"mysql.grant_add",
grant="ALL PRIVILEGES",
database="*.*",
user="george",
host="localhost",
)
assert ret.json
# Check the global exists
ret = salt_call_cli_wrapper(
"mysql.grant_exists",
grant="ALL PRIVILEGES",
database="*.*",
user="george",
host="localhost",
)
assert ret.json
# Revoke the global grant
ret = salt_call_cli_wrapper(
"mysql.grant_revoke",
grant="ALL PRIVILEGES",
database="*.*",
user="george",
host="localhost",
)
assert ret.json
# Check the grant does not exist
ret = salt_call_cli_wrapper(
"mysql.grant_exists",
grant="ALL PRIVILEGES",
database="salt.*",
user="george",
host="localhost",
)
assert not ret.json
# Remove the user
ret = salt_call_cli_wrapper("mysql.user_remove", "george", host="localhost")
assert ret.json
# Remove the database
ret = salt_call_cli_wrapper("mysql.db_remove", "salt")
assert ret.json
def test_plugin_add_status_remove(salt_call_cli_wrapper, mysql_container):
if "mariadb" in mysql_container.mysql_name:
plugin = "simple_password_check"
else:
plugin = "auth_socket"
ret = salt_call_cli_wrapper("mysql.plugin_status", plugin, host="%")
assert not ret.json
ret = salt_call_cli_wrapper("mysql.plugin_add", plugin)
assert ret.json
ret = salt_call_cli_wrapper("mysql.plugin_status", plugin, host="%")
assert ret.json
assert ret.json == "ACTIVE"
ret = salt_call_cli_wrapper("mysql.plugin_remove", plugin)
assert ret.json
ret = salt_call_cli_wrapper("mysql.plugin_status", plugin, host="%")
assert not ret.json
def test_plugin_list(salt_call_cli_wrapper, mysql_container):
if "mariadb" in mysql_container.mysql_name:
plugin = "simple_password_check"
else:
plugin = "auth_socket"
ret = salt_call_cli_wrapper("mysql.plugins_list")
assert {"name": plugin, "status": "ACTIVE"} not in ret.json
assert ret.json
ret = salt_call_cli_wrapper("mysql.plugin_add", plugin)
assert ret.json
ret = salt_call_cli_wrapper("mysql.plugins_list")
assert ret.json
assert {"name": plugin, "status": "ACTIVE"} in ret.json
ret = salt_call_cli_wrapper("mysql.plugin_remove", plugin)
assert ret.json
def test_grant_add_revoke_password_hash(salt_call_cli_wrapper):
# Create the database
ret = salt_call_cli_wrapper("mysql.db_create", "salt")
assert ret.json
# Create a user
ret = salt_call_cli_wrapper(
"mysql.user_create",
"george",
host="%",
password_hash="*2470C0C06DEE42FD1618BB99005ADCA2EC9D1E19",
)
assert ret.json
# Grant privileges to user to specific table
ret = salt_call_cli_wrapper(
"mysql.grant_add",
grant="ALL PRIVILEGES",
database="salt.*",
user="george",
host="%",
)
assert ret.json
# Check the grant exists
ret = salt_call_cli_wrapper(
"mysql.grant_exists",
grant="ALL PRIVILEGES",
database="salt.*",
user="george",
host="%",
)
assert ret.json
# Check the grant exists via a query
ret = salt_call_cli_wrapper(
"mysql.query",
database="salt",
query="SELECT 1",
connection_user="george",
connection_pass="password",
connection_db="salt",
)
assert ret.json
# Revoke the grant
ret = salt_call_cli_wrapper(
"mysql.grant_revoke",
grant="ALL PRIVILEGES",
database="salt.*",
user="george",
host="%",
)
assert ret.json
# Check the grant does not exist
ret = salt_call_cli_wrapper(
"mysql.grant_exists",
grant="ALL PRIVILEGES",
database="salt.*",
user="george",
host="%",
)
assert not ret.json
# Remove the user
ret = salt_call_cli_wrapper("mysql.user_remove", "george", host="%")
assert ret.json
# Remove the database
ret = salt_call_cli_wrapper("mysql.db_remove", "salt")
assert ret.json
def test_create_alter_password_hash(salt_call_cli_wrapper):
# Create the database
ret = salt_call_cli_wrapper("mysql.db_create", "salt")
assert ret.json
# Create a user
ret = salt_call_cli_wrapper(
"mysql.user_create",
"george",
host="%",
password_hash="*2470C0C06DEE42FD1618BB99005ADCA2EC9D1E19",
)
assert ret.json
# Grant privileges to user to specific table
ret = salt_call_cli_wrapper(
"mysql.grant_add",
grant="ALL PRIVILEGES",
database="salt.*",
user="george",
host="%",
)
assert ret.json
# Check the grant exists
ret = salt_call_cli_wrapper(
"mysql.grant_exists",
grant="ALL PRIVILEGES",
database="salt.*",
user="george",
host="%",
)
assert ret.json
# Check we can query as the new user
ret = salt_call_cli_wrapper(
"mysql.query",
database="salt",
query="SELECT 1",
connection_user="george",
connection_pass="password",
connection_db="salt",
)
assert ret.json
# Change the user password
ret = salt_call_cli_wrapper(
"mysql.user_chpass",
"george",
host="%",
password_hash="*F4A5147613F01DEC0C5226BF24CD1D5762E6AAF2",
)
assert ret.json
# Check we can query with the new password
ret = salt_call_cli_wrapper(
"mysql.query",
database="salt",
query="SELECT 1",
connection_user="george",
connection_pass="badpassword",
connection_db="salt",
)
assert ret.json
# Revoke the grant
ret = salt_call_cli_wrapper(
"mysql.grant_revoke",
grant="ALL PRIVILEGES",
database="salt.*",
user="george",
host="%",
)
assert ret.json
# Check the grant does not exist
ret = salt_call_cli_wrapper(
"mysql.grant_exists",
grant="ALL PRIVILEGES",
database="salt.*",
user="george",
host="%",
)
assert not ret.json
# Remove the user
ret = salt_call_cli_wrapper("mysql.user_remove", "george", host="%")
assert ret.json
# Remove the database
ret = salt_call_cli_wrapper("mysql.db_remove", "salt")
assert ret.json

View file

@ -0,0 +1,170 @@
"""
Test Salt MySQL state module across various MySQL variants
"""
import logging
import pytest
import salt.modules.mysql as mysql
from tests.support.pytest.mysql import mysql_container # pylint: disable=unused-import
log = logging.getLogger(__name__)
pytestmark = [
pytest.mark.slow_test,
pytest.mark.skip_if_binaries_missing("dockerd"),
pytest.mark.skipif(
mysql.MySQLdb is None, reason="No python mysql client installed."
),
]
@pytest.fixture(scope="module")
def salt_cli_wrapper(salt_minion, salt_cli, mysql_container):
def run_command(*command, **kwargs):
return salt_cli.run(
*command,
minion_tgt=salt_minion.id,
connection_user=mysql_container.mysql_user,
connection_pass=mysql_container.mysql_passwd,
connection_db="mysql",
connection_port=mysql_container.mysql_port,
**kwargs
)
return run_command
@pytest.fixture(scope="module")
def salt_call_cli_wrapper(salt_call_cli, mysql_container):
def run_command(*command, **kwargs):
return salt_call_cli.run(
*command,
connection_user=mysql_container.mysql_user,
connection_pass=mysql_container.mysql_passwd,
connection_db="mysql",
connection_port=mysql_container.mysql_port,
**kwargs
)
return run_command
def test_database_present_absent(salt_cli_wrapper):
ret = salt_cli_wrapper(
"state.single", "mysql_database.present", name="test_database",
)
state = ret.json["mysql_database_|-test_database_|-test_database_|-present"]
assert ret.exitcode == 0, ret
assert "changes" in state
assert state["changes"] == {"test_database": "Present"}
assert "comment" in state
assert state["comment"] == "The database test_database has been created"
ret = salt_cli_wrapper(
"state.single", "mysql_database.absent", name="test_database",
)
state = ret.json["mysql_database_|-test_database_|-test_database_|-absent"]
assert ret.exitcode == 0, ret
assert "changes" in state
assert state["changes"] == {"test_database": "Absent"}
assert "comment" in state
assert state["comment"] == "Database test_database has been removed"
def test_grants_present_absent(salt_cli_wrapper, salt_call_cli_wrapper):
# Create the database
ret = salt_call_cli_wrapper("mysql.db_create", "salt")
assert ret.json
# Create a user
ret = salt_call_cli_wrapper(
"mysql.user_create", "george", host="localhost", password="badpassword",
)
assert ret.json
ret = salt_cli_wrapper(
"state.single",
"mysql_grants.present",
name="add_salt_grants",
grant="select,insert,update",
database="salt.*",
user="george",
host="localhost",
)
state = ret.json["mysql_grants_|-add_salt_grants_|-add_salt_grants_|-present"]
assert ret.exitcode == 0, ret
assert "changes" in state
assert state["changes"] == {"add_salt_grants": "Present"}
assert "comment" in state
assert (
state["comment"]
== "Grant select,insert,update on salt.* to george@localhost has been added"
)
ret = salt_cli_wrapper(
"state.single",
"mysql_grants.absent",
name="delete_salt_grants",
grant="select,insert,update",
database="salt.*",
user="george",
host="localhost",
)
state = ret.json["mysql_grants_|-delete_salt_grants_|-delete_salt_grants_|-absent"]
assert ret.exitcode == 0, ret
assert "changes" in state
assert state["changes"] == {"delete_salt_grants": "Absent"}
assert "comment" in state
assert (
state["comment"]
== "Grant select,insert,update on salt.* for george@localhost has been revoked"
)
# Remove the user
ret = salt_call_cli_wrapper("mysql.user_remove", "george", host="localhost")
assert ret.json
# Remove the database
ret = salt_call_cli_wrapper("mysql.db_remove", "salt")
assert ret.json
def test_user_present_absent(salt_cli_wrapper):
ret = salt_cli_wrapper(
"state.single",
"mysql_user.present",
name="george",
host="localhost",
password="password",
)
state = ret.json["mysql_user_|-george_|-george_|-present"]
assert ret.exitcode == 0, ret
assert "changes" in state
assert state["changes"] == {"george": "Present"}
assert "comment" in state
assert state["comment"] == "The user george@localhost has been added"
ret = salt_cli_wrapper(
"state.single", "mysql_user.absent", name="george", host="localhost",
)
state = ret.json["mysql_user_|-george_|-george_|-absent"]
assert ret.exitcode == 0, ret
assert "changes" in state
assert state["changes"] == {"george": "Absent"}
assert "comment" in state
assert state["comment"] == "User george@localhost has been removed"

View file

@ -0,0 +1,933 @@
"""
:codeauthor: Mike Place (mp@saltstack.com)
tests.unit.modules.mysql
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
"""
import logging
import pytest
import salt.modules.mysql as mysql
from tests.support.mock import MagicMock, call, mock_open, patch
try:
import pymysql
HAS_PYMYSQL = True
except ImportError:
HAS_PYMYSQL = False
log = logging.getLogger(__name__)
__all_privileges__ = [
"ALTER",
"ALTER ROUTINE",
"BACKUP_ADMIN",
"BINLOG_ADMIN",
"CONNECTION_ADMIN",
"CREATE",
"CREATE ROLE",
"CREATE ROUTINE",
"CREATE TABLESPACE",
"CREATE TEMPORARY TABLES",
"CREATE USER",
"CREATE VIEW",
"DELETE",
"DROP",
"DROP ROLE",
"ENCRYPTION_KEY_ADMIN",
"EVENT",
"EXECUTE",
"FILE",
"GROUP_REPLICATION_ADMIN",
"INDEX",
"INSERT",
"LOCK TABLES",
"PERSIST_RO_VARIABLES_ADMIN",
"PROCESS",
"REFERENCES",
"RELOAD",
"REPLICATION CLIENT",
"REPLICATION SLAVE",
"REPLICATION_SLAVE_ADMIN",
"RESOURCE_GROUP_ADMIN",
"RESOURCE_GROUP_USER",
"ROLE_ADMIN",
"SELECT",
"SET_USER_ID",
"SHOW DATABASES",
"SHOW VIEW",
"SHUTDOWN",
"SUPER",
"SYSTEM_VARIABLES_ADMIN",
"TRIGGER",
"UPDATE",
"XA_RECOVER_ADMIN",
]
pytestmark = [
pytest.mark.slow_test,
pytest.mark.skipif(
mysql.MySQLdb is None, reason="No python mysql client installed."
),
]
class MockMySQLConnect:
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def autocommit(self, *args, **kwards):
return True
@pytest.fixture
def configure_loader_modules():
return {mysql: {}}
def test_user_exists():
"""
Test to see if mysql module properly forms the MySQL query to see if a user exists
Do it before test_user_create_when_user_exists mocks the user_exists call
"""
with patch.object(mysql, "version", return_value="8.0.10"):
_test_call(
mysql.user_exists,
{
"sql": (
"SELECT User,Host FROM mysql.user WHERE "
"User = %(user)s AND Host = %(host)s AND "
"Password = PASSWORD(%(password)s)"
),
"sql_args": {
"host": "localhost",
"password": "BLUECOW",
"user": "mytestuser",
},
},
user="mytestuser",
host="localhost",
password="BLUECOW",
)
with patch.object(mysql, "version", return_value="10.1.38-MariaDB"):
_test_call(
mysql.user_exists,
{
"sql": (
"SELECT User,Host FROM mysql.user WHERE "
"User = %(user)s AND Host = %(host)s AND "
"Password = PASSWORD(%(password)s)"
),
"sql_args": {
"host": "localhost",
"password": "BLUECOW",
"user": "mytestuser",
},
},
user="mytestuser",
host="localhost",
password="BLUECOW",
)
with patch.object(mysql, "version", return_value="8.0.11"):
_test_call(
mysql.user_exists,
{
"sql": (
"SELECT User,Host FROM mysql.user WHERE "
"User = %(user)s AND Host = %(host)s"
),
"sql_args": {"host": "localhost", "user": "mytestuser"},
},
user="mytestuser",
host="localhost",
password="BLUECOW",
)
with patch.object(mysql, "version", return_value="8.0.11"):
with patch.object(
mysql, "__get_auth_plugin", MagicMock(return_value="mysql_native_password"),
):
_test_call(
mysql.user_exists,
{
"sql": (
"SELECT User,Host FROM mysql.user WHERE "
"User = %(user)s AND Host = %(host)s AND "
"Password = %(password)s"
),
"sql_args": {
"host": "%",
"password": "*1A01CF8FBE6425398935FB90359AD8B817399102",
"user": "mytestuser",
},
},
user="mytestuser",
host="%",
password="BLUECOW",
)
with patch.object(mysql, "version", return_value="10.2.21-MariaDB"):
_test_call(
mysql.user_exists,
{
"sql": (
"SELECT User,Host FROM mysql.user WHERE "
"User = %(user)s AND Host = %(host)s AND "
"Password = PASSWORD(%(password)s)"
),
"sql_args": {
"host": "localhost",
"password": "BLUECOW",
"user": "mytestuser",
},
},
user="mytestuser",
host="localhost",
password="BLUECOW",
)
with patch.object(
mysql, "version", side_effect=["", "10.2.21-MariaDB", "10.2.21-MariaDB"]
):
_test_call(
mysql.user_exists,
{
"sql": (
"SELECT User,Host FROM mysql.user WHERE "
"User = %(user)s AND Host = %(host)s AND "
"Password = PASSWORD(%(password)s)"
),
"sql_args": {
"host": "localhost",
"password": "new_pass",
"user": "root",
},
},
user="root",
host="localhost",
password="new_pass",
connection_user="root",
connection_pass="old_pass",
)
# test_user_create_when_user_exists():
# ensure we don't try to create a user when one already exists
# mock the version of MySQL
with patch.object(mysql, "version", return_value="8.0.10"):
with patch.object(mysql, "user_exists", MagicMock(return_value=True)):
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
ret = mysql.user_create("testuser")
assert not ret
# test_user_create_when_user_exists():
# ensure we don't try to create a user when one already exists
# mock the version of MySQL
with patch.object(mysql, "version", return_value="8.0.11"):
with patch.object(mysql, "user_exists", MagicMock(return_value=True)):
with patch.object(mysql, "verify_login", MagicMock(return_value=True)):
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
ret = mysql.user_create("testuser")
assert not False
def test_user_create():
"""
Test the creation of a MySQL user in mysql exec module
"""
with patch.object(mysql, "version", return_value="8.0.10"):
with patch.object(
mysql, "__get_auth_plugin", MagicMock(return_value="mysql_native_password"),
):
_test_call(
mysql.user_create,
{
"sql": "CREATE USER %(user)s@%(host)s IDENTIFIED BY %(password)s",
"sql_args": {
"password": "BLUECOW",
"user": "testuser",
"host": "localhost",
},
},
"testuser",
password="BLUECOW",
)
with patch.object(mysql, "version", return_value="8.0.11"):
with patch.object(
mysql, "__get_auth_plugin", MagicMock(return_value="mysql_native_password"),
):
_test_call(
mysql.user_create,
{
"sql": "CREATE USER %(user)s@%(host)s IDENTIFIED WITH %(auth_plugin)s BY %(password)s",
"sql_args": {
"password": "BLUECOW",
"auth_plugin": "mysql_native_password",
"user": "testuser",
"host": "localhost",
},
},
"testuser",
password="BLUECOW",
)
# Test creating a user with passwordless=True and unix_socket=True
with patch.object(mysql, "version", return_value="8.0.10"):
with patch.object(mysql, "plugin_status", MagicMock(return_value="ACTIVE")):
_test_call(
mysql.user_create,
{
"sql": "CREATE USER %(user)s@%(host)s IDENTIFIED WITH auth_socket",
"sql_args": {"user": "testuser", "host": "localhost"},
},
"testuser",
allow_passwordless=True,
unix_socket=True,
)
with patch.object(mysql, "version", return_value="10.2.21-MariaDB"):
with patch.object(mysql, "plugin_status", MagicMock(return_value="ACTIVE")):
_test_call(
mysql.user_create,
{
"sql": "CREATE USER %(user)s@%(host)s IDENTIFIED VIA unix_socket",
"sql_args": {"user": "testuser", "host": "localhost"},
},
"testuser",
allow_passwordless=True,
unix_socket=True,
)
with patch.object(mysql, "version", side_effect=["", "8.0.10", "8.0.10"]):
with patch.object(
mysql, "user_exists", MagicMock(return_value=False)
), patch.object(
mysql, "__get_auth_plugin", MagicMock(return_value="mysql_native_password"),
):
_test_call(
mysql.user_create,
{
"sql": "CREATE USER %(user)s@%(host)s IDENTIFIED BY %(password)s",
"sql_args": {
"password": "new_pass",
"user": "root",
"host": "localhost",
},
},
"root",
password="new_pass",
connection_user="root",
connection_pass="old_pass",
)
def test_user_chpass():
"""
Test changing a MySQL user password in mysql exec module
"""
connect_mock = MagicMock()
with patch.object(mysql, "_connect", connect_mock):
with patch.object(mysql, "version", return_value="8.0.10"):
with patch.object(mysql, "user_exists", MagicMock(return_value=True)):
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
mysql.user_chpass("testuser", password="BLUECOW")
calls = (
call()
.cursor()
.execute(
"UPDATE mysql.user SET Password=PASSWORD(%(password)s) WHERE User=%(user)s AND Host = %(host)s;",
{
"password": "BLUECOW",
"user": "testuser",
"host": "localhost",
},
),
call().cursor().execute("FLUSH PRIVILEGES;"),
)
connect_mock.assert_has_calls(calls, any_order=True)
connect_mock = MagicMock()
with patch.object(mysql, "_connect", connect_mock):
with patch.object(mysql, "version", return_value="8.0.11"):
with patch.object(mysql, "user_exists", MagicMock(return_value=True)):
with patch.object(
mysql,
"__get_auth_plugin",
MagicMock(return_value="mysql_native_password"),
):
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
mysql.user_chpass("testuser", password="BLUECOW")
calls = (
call()
.cursor()
.execute(
"ALTER USER %(user)s@%(host)s IDENTIFIED WITH %(auth_plugin)s BY %(password)s;",
{
"password": "BLUECOW",
"user": "testuser",
"host": "localhost",
"auth_plugin": "mysql_native_password",
},
),
call().cursor().execute("FLUSH PRIVILEGES;"),
)
connect_mock.assert_has_calls(calls, any_order=True)
connect_mock = MagicMock()
with patch.object(mysql, "_connect", connect_mock):
with patch.object(mysql, "version", side_effect=["", "8.0.11", "8.0.11"]):
with patch.object(mysql, "user_exists", MagicMock(return_value=True)):
with patch.object(
mysql,
"__get_auth_plugin",
MagicMock(return_value="mysql_native_password"),
):
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
mysql.user_chpass(
"root",
password="new_pass",
connection_user="root",
connection_pass="old_pass",
)
calls = (
call()
.cursor()
.execute(
"ALTER USER %(user)s@%(host)s IDENTIFIED WITH %(auth_plugin)s BY %(password)s;",
{
"password": "new_pass",
"user": "root",
"host": "localhost",
"auth_plugin": "mysql_native_password",
},
),
call().cursor().execute("FLUSH PRIVILEGES;"),
)
connect_mock.assert_has_calls(calls, any_order=True)
def test_user_remove():
"""
Test the removal of a MySQL user in mysql exec module
"""
with patch.object(mysql, "user_exists", MagicMock(return_value=True)):
_test_call(
mysql.user_remove,
{
"sql": "DROP USER %(user)s@%(host)s",
"sql_args": {"user": "testuser", "host": "localhost"},
},
"testuser",
)
def test_db_check():
"""
Test MySQL db check function in mysql exec module
"""
_test_call(
mysql.db_check,
"CHECK TABLE `test``'\" db`.`my``'\" table`",
"test`'\" db",
"my`'\" table",
)
def test_db_repair():
"""
Test MySQL db repair function in mysql exec module
"""
_test_call(
mysql.db_repair,
"REPAIR TABLE `test``'\" db`.`my``'\" table`",
"test`'\" db",
"my`'\" table",
)
def test_db_optimize():
"""
Test MySQL db optimize function in mysql exec module
"""
_test_call(
mysql.db_optimize,
"OPTIMIZE TABLE `test``'\" db`.`my``'\" table`",
"test`'\" db",
"my`'\" table",
)
def test_db_remove():
"""
Test MySQL db remove function in mysql exec module
"""
with patch.object(mysql, "db_exists", MagicMock(return_value=True)):
_test_call(mysql.db_remove, "DROP DATABASE `test``'\" db`;", "test`'\" db")
def test_db_tables():
"""
Test MySQL db_tables function in mysql exec module
"""
with patch.object(mysql, "db_exists", MagicMock(return_value=True)):
_test_call(mysql.db_tables, "SHOW TABLES IN `test``'\" db`", "test`'\" db")
def test_db_exists():
"""
Test MySQL db_exists function in mysql exec module
"""
_test_call(
mysql.db_exists,
{
"sql": "SHOW DATABASES LIKE %(dbname)s;",
"sql_args": {"dbname": r"""test%_`" db"""},
},
'test%_`" db',
)
def test_db_create():
"""
Test MySQL db_create function in mysql exec module
"""
_test_call(
mysql.db_create, "CREATE DATABASE IF NOT EXISTS `test``'\" db`;", "test`'\" db",
)
def test_user_list():
"""
Test MySQL user_list function in mysql exec module
"""
_test_call(mysql.user_list, "SELECT User,Host FROM mysql.user")
def test_user_info():
"""
Test to see if the mysql execution module correctly forms the SQL for information on a MySQL user.
"""
_test_call(
mysql.user_info,
{
"sql": "SELECT * FROM mysql.user WHERE User = %(user)s AND Host = %(host)s",
"sql_args": {"host": "localhost", "user": "mytestuser"},
},
"mytestuser",
)
def test_user_grants():
"""
Test to ensure the mysql user_grants function returns properly formed SQL for a basic query
"""
with patch.object(mysql, "user_exists", MagicMock(return_value=True)):
_test_call(
mysql.user_grants,
{
"sql": "SHOW GRANTS FOR %(user)s@%(host)s",
"sql_args": {"host": "localhost", "user": "testuser"},
},
"testuser",
)
def test_grant_exists_true():
"""
Test to ensure that we can find a grant that exists
"""
mock_grants = [
"GRANT USAGE ON *.* TO 'testuser'@'%'",
"GRANT SELECT, INSERT, UPDATE ON `testdb`.`testtableone` TO 'testuser'@'%'",
"GRANT SELECT(column1,column2) ON `testdb`.`testtableone` TO 'testuser'@'%'",
"GRANT SELECT(column1,column2), INSERT(column1,column2) ON `testdb`.`testtableone` TO 'testuser'@'%'",
"GRANT SELECT(column1,column2), UPDATE ON `testdb`.`testtableone` TO 'testuser'@'%'",
"GRANT SELECT ON `testdb`.`testtabletwo` TO 'testuser'@'%'",
"GRANT SELECT ON `testdb`.`testtablethree` TO 'testuser'@'%'",
]
with patch.object(mysql, "version", return_value="5.6.41"):
mock = MagicMock(return_value=mock_grants)
with patch.object(
mysql, "user_grants", return_value=mock_grants
) as mock_user_grants:
ret = mysql.grant_exists(
"SELECT, INSERT, UPDATE", "testdb.testtableone", "testuser", "%"
)
assert ret
def test_grant_exists_false():
"""
Test to ensure that we don't find a grant that doesn't exist
"""
mock_grants = [
"GRANT USAGE ON *.* TO 'testuser'@'%'",
"GRANT SELECT, INSERT, UPDATE ON `testdb`.`testtableone` TO 'testuser'@'%'",
"GRANT SELECT(column1,column2) ON `testdb`.`testtableone` TO 'testuser'@'%'",
"GRANT SELECT(column1,column2), UPDATE ON `testdb`.`testtableone` TO 'testuser'@'%'",
"GRANT SELECT ON `testdb`.`testtablethree` TO 'testuser'@'%'",
]
with patch.object(mysql, "version", return_value="5.6.41"):
mock = MagicMock(return_value=mock_grants)
with patch.object(
mysql, "user_grants", return_value=mock_grants
) as mock_user_grants:
ret = mysql.grant_exists("SELECT", "testdb.testtabletwo", "testuser", "%")
assert not ret
def test_grant_exists_all():
"""
Test to ensure that we can find a grant that exists
"""
mock_grants = ["GRANT ALL PRIVILEGES ON testdb.testtableone TO `testuser`@`%`"]
with patch.object(mysql, "version", return_value="8.0.10"):
mock = MagicMock(return_value=mock_grants)
with patch.object(
mysql, "user_grants", return_value=mock_grants
) as mock_user_grants:
ret = mysql.grant_exists("ALL", "testdb.testtableone", "testuser", "%")
assert ret
with patch.object(mysql, "version", return_value="8.0.10"):
mock = MagicMock(return_value=mock_grants)
with patch.object(
mysql, "user_grants", return_value=mock_grants
) as mock_user_grants:
ret = mysql.grant_exists(
"all privileges", "testdb.testtableone", "testuser", "%"
)
assert ret
mock_grants = ["GRANT ALL PRIVILEGES ON testdb.testtableone TO `testuser`@`%`"]
with patch.object(mysql, "version", return_value="5.6.41"):
mock = MagicMock(return_value=mock_grants)
with patch.object(
mysql, "user_grants", return_value=mock_grants
) as mock_user_grants:
ret = mysql.grant_exists(
"ALL PRIVILEGES", "testdb.testtableone", "testuser", "%"
)
assert ret
mock_grants = [
"GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, SHUTDOWN, PROCESS, FILE, REFERENCES, INDEX, ALTER, SHOW DATABASES, SUPER, CREATE TEMPORARY TABLES, LOCK TABLES, EXECUTE, REPLICATION SLAVE, REPLICATION CLIENT, CREATE VIEW, SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER, CREATE TABLESPACE, CREATE ROLE, DROP ROLE ON *.* TO `testuser`@`%`",
"GRANT BACKUP_ADMIN,BINLOG_ADMIN,CONNECTION_ADMIN,ENCRYPTION_KEY_ADMIN,GROUP_REPLICATION_ADMIN,PERSIST_RO_VARIABLES_ADMIN,REPLICATION_SLAVE_ADMIN,RESOURCE_GROUP_ADMIN,RESOURCE_GROUP_USER,ROLE_ADMIN,SET_USER_ID,SYSTEM_VARIABLES_ADMIN,XA_RECOVER_ADMIN ON *.* TO `testuser`@`%`",
]
with patch.object(mysql, "version", return_value="8.0.10"):
mock = MagicMock(return_value=mock_grants)
with patch.object(
mysql, "user_grants", return_value=mock_grants
) as mock_user_grants:
ret = mysql.grant_exists("ALL", "*.*", "testuser", "%")
assert ret
with patch.object(mysql, "version", return_value="8.0.10"):
mock = MagicMock(return_value=mock_grants)
with patch.object(
mysql, "user_grants", return_value=mock_grants
) as mock_user_grants:
ret = mysql.grant_exists("all privileges", "*.*", "testuser", "%")
assert ret
@pytest.mark.skipif(True, reason="TODO: Mock up user_grants()")
def test_grant_add():
"""
Test grant_add function in mysql exec module
"""
_test_call(
mysql.grant_add, "", "SELECT,INSERT,UPDATE", "database.*", "frank", "localhost",
)
@pytest.mark.skipif(True, reason="TODO: Mock up user_grants()")
def test_grant_revoke():
"""
Test grant revoke in mysql exec module
"""
_test_call(
mysql.grant_revoke,
"",
"SELECT,INSERT,UPDATE",
"database.*",
"frank",
"localhost",
)
def test_processlist():
"""
Test processlist function in mysql exec module
"""
_test_call(mysql.processlist, "SHOW FULL PROCESSLIST")
def test_get_master_status():
"""
Test get_master_status in the mysql execution module
"""
_test_call(mysql.get_master_status, "SHOW MASTER STATUS")
def test_get_slave_status():
"""
Test get_slave_status in the mysql execution module
"""
_test_call(mysql.get_slave_status, "SHOW SLAVE STATUS")
def test_get_slave_status_bad_server():
"""
Test get_slave_status in the mysql execution module, simulating a broken server
"""
connect_mock = MagicMock(return_value=None)
with patch.object(mysql, "_connect", connect_mock):
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
rslt = mysql.get_slave_status()
connect_mock.assert_has_calls([call()])
assert rslt == []
@pytest.mark.skipif(
True, reason="MySQL module claims this function is not ready for production"
)
def test_free_slave():
pass
def test_query():
_test_call(mysql.query, "SELECT * FROM testdb", "testdb", "SELECT * FROM testdb")
@pytest.mark.skipif(not HAS_PYMYSQL, reason="Could not import pymysql")
def test_query_error():
connect_mock = MagicMock()
with patch.object(mysql, "_connect", connect_mock):
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
# Use the OperationalError from the salt mysql module because that
# exception can come from either MySQLdb or pymysql
side_effect = mysql.OperationalError(9999, "Something Went Wrong")
with patch.object(mysql, "_execute", MagicMock(side_effect=side_effect)):
mysql.query("testdb", "SELECT * FROM testdb")
assert "mysql.error" in mysql.__context__
expected = "MySQL Error 9999: Something Went Wrong"
assert mysql.__context__["mysql.error"] == expected
def test_plugin_add():
"""
Test the adding/installing a MySQL / MariaDB plugin
"""
with patch.object(mysql, "plugin_status", MagicMock(return_value="")):
_test_call(
mysql.plugin_add,
'INSTALL PLUGIN auth_socket SONAME "auth_socket.so"',
"auth_socket",
)
def test_plugin_remove():
"""
Test the removing/uninstalling a MySQL / MariaDB plugin
"""
with patch.object(mysql, "plugin_status", MagicMock(return_value="ACTIVE")):
_test_call(
mysql.plugin_remove, "UNINSTALL PLUGIN auth_socket", "auth_socket",
)
def test_plugin_status():
"""
Test checking the status of a MySQL / MariaDB plugin
"""
_test_call(
mysql.plugin_status,
{
"sql": "SELECT PLUGIN_STATUS FROM INFORMATION_SCHEMA.PLUGINS WHERE PLUGIN_NAME = %(name)s",
"sql_args": {"name": "auth_socket"},
},
"auth_socket",
)
def test_sanitize_comment():
"""
Test comment sanitization
"""
input_data = """/*
multiline
comment
*/
CREATE TABLE test_update (a VARCHAR(25)); # end of line comment
# example comment
insert into test_update values ("some #hash value"); -- ending comment
insert into test_update values ("crazy -- not comment"); -- another ending comment
-- another comment type
"""
expected_response = """CREATE TABLE test_update (a VARCHAR(25));
insert into test_update values ("some #hash value");
insert into test_update values ("crazy -- not comment");
"""
output = mysql._sanitize_comments(input_data)
assert output == expected_response
input_data = """-- --------------------------------------------------------
-- SQL Commands to set up the pmadb as described in the documentation.
--
-- This file is meant for use with MySQL 5 and above!
--
-- This script expects the user pma to already be existing. If we would put a
-- line here to create them too many users might just use this script and end
-- up with having the same password for the controluser.
--
-- This user "pma" must be defined in config.inc.php (controluser/controlpass)
--
-- Please don't forget to set up the tablenames in config.inc.php
--
-- --------------------------------------------------------
--
CREATE DATABASE IF NOT EXISTS `phpmyadmin`
DEFAULT CHARACTER SET utf8 COLLATE utf8_bin;
USE phpmyadmin;
"""
expected_response = """CREATE DATABASE IF NOT EXISTS `phpmyadmin`
DEFAULT CHARACTER SET utf8 COLLATE utf8_bin;
USE phpmyadmin;"""
output = mysql._sanitize_comments(input_data)
assert output == expected_response
def _test_call(function, expected_sql, *args, **kwargs):
connect_mock = MagicMock()
with patch.object(mysql, "_connect", connect_mock):
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
function(*args, **kwargs)
if isinstance(expected_sql, dict):
calls = (
call()
.cursor()
.execute("{}".format(expected_sql["sql"]), expected_sql["sql_args"])
)
else:
calls = call().cursor().execute("{}".format(expected_sql))
connect_mock.assert_has_calls((calls,), True)
def test_file_query():
"""
Test file_query
"""
with patch.object(mysql, "HAS_SQLPARSE", False):
ret = mysql.file_query("database", "filename")
assert not ret
file_data = """-- --------------------------------------------------------
-- SQL Commands to set up the pmadb as described in the documentation.
--
-- This file is meant for use with MySQL 5 and above!
--
-- This script expects the user pma to already be existing. If we would put a
-- line here to create them too many users might just use this script and end
-- up with having the same password for the controluser.
--
-- This user "pma" must be defined in config.inc.php (controluser/controlpass)
--
-- Please don't forget to set up the tablenames in config.inc.php
--
-- --------------------------------------------------------
--
USE phpmyadmin;
--
-- Table structure for table `pma__bookmark`
--
CREATE TABLE IF NOT EXISTS `pma__bookmark` (
`id` int(10) unsigned NOT NULL auto_increment,
`dbase` varchar(255) NOT NULL default '',
`user` varchar(255) NOT NULL default '',
`label` varchar(255) COLLATE utf8_general_ci NOT NULL default '',
`query` text NOT NULL,
PRIMARY KEY (`id`)
)
COMMENT='Bookmarks'
DEFAULT CHARACTER SET utf8 COLLATE utf8_bin;
"""
side_effect = [
{"query time": {"human": "0.4ms", "raw": "0.00038"}, "rows affected": 0},
{"query time": {"human": "8.9ms", "raw": "0.00893"}, "rows affected": 0},
]
expected = {
"query time": {"human": "8.9ms", "raw": "0.00893"},
"rows affected": 0,
}
with patch("os.path.exists", MagicMock(return_value=True)):
with patch("salt.utils.files.fopen", mock_open(read_data=file_data)):
with patch.object(mysql, "query", side_effect=side_effect):
ret = mysql.file_query("database", "filename")
assert ret, expected
@pytest.mark.skipif(not HAS_PYMYSQL, reason="Could not import pymysql")
def test__connect_pymysql_exception():
"""
Test the _connect function in the MySQL module
"""
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
with patch(
"MySQLdb.connect",
side_effect=pymysql.err.InternalError(
1698, "Access denied for user 'root'@'localhost'"
),
):
ret = mysql._connect()
assert "mysql.error" in mysql.__context__
assert (
mysql.__context__["mysql.error"]
== "MySQL Error 1698: Access denied for user 'root'@'localhost'"
)
def test__connect_mysqldb_exception():
"""
Test the _connect function in the MySQL module
"""
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
with patch(
"MySQLdb.connect",
side_effect=mysql.OperationalError(
1698, "Access denied for user 'root'@'localhost'"
),
):
ret = mysql._connect()
assert "mysql.error" in mysql.__context__
assert (
mysql.__context__["mysql.error"]
== "MySQL Error 1698: Access denied for user 'root'@'localhost'"
)
def test__connect_mysqldb():
"""
Test the _connect function in the MySQL module
"""
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
with patch("MySQLdb.connect", return_value=MockMySQLConnect()):
ret = mysql._connect()
assert "mysql.error" not in mysql.__context__

View file

@ -0,0 +1,131 @@
import time
import attr
import pytest
from saltfactories.daemons.container import Container
from saltfactories.utils import random_string
from saltfactories.utils.ports import get_unused_localhost_port
docker = pytest.importorskip("docker")
docker_errors = pytest.importorskip("docker.errors")
@attr.s(kw_only=True, slots=True)
class MySQLImage:
name = attr.ib()
tag = attr.ib()
container_id = attr.ib()
def __str__(self):
return "{}:{}".format(self.name, self.tag)
@attr.s(kw_only=True, slots=True)
class MySQLCombo:
mysql_name = attr.ib()
mysql_version = attr.ib()
mysql_port = attr.ib()
mysql_user = attr.ib()
mysql_passwd = attr.ib()
@mysql_port.default
def _mysql_port(self):
return get_unused_localhost_port()
def get_test_versions():
test_versions = []
name = "mysql/mysql-server"
for version in ("5.5", "5.6", "5.7", "8.0"):
test_versions.append(
MySQLImage(
name=name,
tag=version,
container_id=random_string("mysql-{}-".format(version)),
)
)
name = "mariadb"
for version in ("10.1", "10.2", "10.3", "10.4", "10.5"):
test_versions.append(
MySQLImage(
name=name,
tag=version,
container_id=random_string("mariadb-{}-".format(version)),
)
)
name = "percona"
for version in ("5.5", "5.6", "5.7", "8.0"):
test_versions.append(
MySQLImage(
name=name,
tag=version,
container_id=random_string("percona-{}-".format(version)),
)
)
return test_versions
def get_test_version_id(value):
return "container={}".format(value)
@pytest.fixture(scope="module", params=get_test_versions(), ids=get_test_version_id)
def mysql_container(request, salt_factories, salt_call_cli):
try:
docker_client = docker.from_env()
except docker_errors.DockerException:
pytest.skip("Failed to get a connection to docker running on the system")
connectable = Container.client_connectable(docker_client)
if connectable is not True: # pragma: no cover
pytest.skip(connectable)
mysql_image = request.param
mysql_user = "root"
mysql_passwd = "password"
combo = MySQLCombo(
mysql_name=mysql_image.name,
mysql_version=mysql_image.tag,
mysql_user=mysql_user,
mysql_passwd=mysql_passwd,
)
container = salt_factories.get_container(
mysql_image.container_id,
"{}:{}".format(combo.mysql_name, combo.mysql_version),
docker_client=docker_client,
check_ports=[combo.mysql_port],
container_run_kwargs={
"ports": {"3306/tcp": combo.mysql_port},
"environment": {
"MYSQL_ROOT_PASSWORD": mysql_passwd,
"MYSQL_ROOT_HOST": "%",
},
},
)
with container.started():
authenticated = False
login_attempts = 6
while login_attempts:
login_attempts -= 1
# Make sure "MYSQL" is ready
ret = salt_call_cli.run(
"docker.run",
name=mysql_image.container_id,
cmd="mysql --user=root --password=password -e 'SELECT 1'",
)
authenticated = ret.exitcode == 0
if authenticated:
break
time.sleep(2)
if authenticated:
yield combo
else:
pytest.fail(
"Failed to login into mysql server running in container(id: {})".format(
mysql_image.container_id
)
)

View file

@ -1,895 +0,0 @@
"""
:codeauthor: Mike Place (mp@saltstack.com)
tests.unit.modules.mysql
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
"""
import logging
import salt.modules.mysql as mysql
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.mock import MagicMock, call, mock_open, patch
from tests.support.unit import TestCase, skipIf
log = logging.getLogger(__name__)
NO_MYSQL = False
NO_PyMYSQL = False
try:
import MySQLdb # pylint: disable=W0611
except ImportError:
NO_MYSQL = True
try:
# MySQLdb import failed, try to import PyMySQL
import pymysql
except ImportError:
NO_PyMYSQL = True
__all_privileges__ = [
"ALTER",
"ALTER ROUTINE",
"BACKUP_ADMIN",
"BINLOG_ADMIN",
"CONNECTION_ADMIN",
"CREATE",
"CREATE ROLE",
"CREATE ROUTINE",
"CREATE TABLESPACE",
"CREATE TEMPORARY TABLES",
"CREATE USER",
"CREATE VIEW",
"DELETE",
"DROP",
"DROP ROLE",
"ENCRYPTION_KEY_ADMIN",
"EVENT",
"EXECUTE",
"FILE",
"GROUP_REPLICATION_ADMIN",
"INDEX",
"INSERT",
"LOCK TABLES",
"PERSIST_RO_VARIABLES_ADMIN",
"PROCESS",
"REFERENCES",
"RELOAD",
"REPLICATION CLIENT",
"REPLICATION SLAVE",
"REPLICATION_SLAVE_ADMIN",
"RESOURCE_GROUP_ADMIN",
"RESOURCE_GROUP_USER",
"ROLE_ADMIN",
"SELECT",
"SET_USER_ID",
"SHOW DATABASES",
"SHOW VIEW",
"SHUTDOWN",
"SUPER",
"SYSTEM_VARIABLES_ADMIN",
"TRIGGER",
"UPDATE",
"XA_RECOVER_ADMIN",
]
class MockMySQLConnect:
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def autocommit(self, *args, **kwards):
return True
@skipIf(NO_MYSQL, "Install MySQL bindings before running MySQL unit tests.")
class MySQLTestCase(TestCase, LoaderModuleMockMixin):
def setup_loader_modules(self):
return {mysql: {}}
def test_user_exists(self):
"""
Test to see if mysql module properly forms the MySQL query to see if a user exists
Do it before test_user_create_when_user_exists mocks the user_exists call
"""
with patch.object(mysql, "version", return_value="8.0.10"):
self._test_call(
mysql.user_exists,
{
"sql": (
"SELECT User,Host FROM mysql.user WHERE "
"User = %(user)s AND Host = %(host)s AND "
"Password = PASSWORD(%(password)s)"
),
"sql_args": {
"host": "localhost",
"password": "BLUECOW",
"user": "mytestuser",
},
},
user="mytestuser",
host="localhost",
password="BLUECOW",
)
with patch.object(mysql, "version", return_value="10.1.38-MariaDB"):
self._test_call(
mysql.user_exists,
{
"sql": (
"SELECT User,Host FROM mysql.user WHERE "
"User = %(user)s AND Host = %(host)s AND "
"Password = PASSWORD(%(password)s)"
),
"sql_args": {
"host": "localhost",
"password": "BLUECOW",
"user": "mytestuser",
},
},
user="mytestuser",
host="localhost",
password="BLUECOW",
)
with patch.object(mysql, "version", return_value="8.0.11"):
self._test_call(
mysql.user_exists,
{
"sql": (
"SELECT User,Host FROM mysql.user WHERE "
"User = %(user)s AND Host = %(host)s"
),
"sql_args": {"host": "localhost", "user": "mytestuser"},
},
user="mytestuser",
host="localhost",
password="BLUECOW",
)
with patch.object(mysql, "version", return_value="8.0.11"):
with patch.object(
mysql,
"__get_auth_plugin",
MagicMock(return_value="mysql_native_password"),
):
self._test_call(
mysql.user_exists,
{
"sql": (
"SELECT User,Host FROM mysql.user WHERE "
"User = %(user)s AND Host = %(host)s AND "
"Password = %(password)s"
),
"sql_args": {
"host": "%",
"password": "*1A01CF8FBE6425398935FB90359AD8B817399102",
"user": "mytestuser",
},
},
user="mytestuser",
host="%",
password="BLUECOW",
)
with patch.object(mysql, "version", return_value="10.2.21-MariaDB"):
self._test_call(
mysql.user_exists,
{
"sql": (
"SELECT User,Host FROM mysql.user WHERE "
"User = %(user)s AND Host = %(host)s AND "
"Password = PASSWORD(%(password)s)"
),
"sql_args": {
"host": "localhost",
"password": "BLUECOW",
"user": "mytestuser",
},
},
user="mytestuser",
host="localhost",
password="BLUECOW",
)
with patch.object(
mysql, "version", side_effect=["", "10.2.21-MariaDB", "10.2.21-MariaDB"]
):
self._test_call(
mysql.user_exists,
{
"sql": (
"SELECT User,Host FROM mysql.user WHERE "
"User = %(user)s AND Host = %(host)s AND "
"Password = PASSWORD(%(password)s)"
),
"sql_args": {
"host": "localhost",
"password": "new_pass",
"user": "root",
},
},
user="root",
host="localhost",
password="new_pass",
connection_user="root",
connection_pass="old_pass",
)
# test_user_create_when_user_exists(self):
# ensure we don't try to create a user when one already exists
# mock the version of MySQL
with patch.object(mysql, "version", return_value="8.0.10"):
with patch.object(mysql, "user_exists", MagicMock(return_value=True)):
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
ret = mysql.user_create("testuser")
self.assertEqual(False, ret)
# test_user_create_when_user_exists(self):
# ensure we don't try to create a user when one already exists
# mock the version of MySQL
with patch.object(mysql, "version", return_value="8.0.11"):
with patch.object(mysql, "user_exists", MagicMock(return_value=True)):
with patch.object(mysql, "verify_login", MagicMock(return_value=True)):
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
ret = mysql.user_create("testuser")
self.assertEqual(False, ret)
def test_user_create(self):
"""
Test the creation of a MySQL user in mysql exec module
"""
with patch.object(mysql, "version", return_value="8.0.10"):
with patch.object(
mysql,
"__get_auth_plugin",
MagicMock(return_value="mysql_native_password"),
):
self._test_call(
mysql.user_create,
{
"sql": "CREATE USER %(user)s@%(host)s IDENTIFIED BY %(password)s",
"sql_args": {
"password": "BLUECOW",
"user": "testuser",
"host": "localhost",
},
},
"testuser",
password="BLUECOW",
)
with patch.object(mysql, "version", return_value="8.0.11"):
with patch.object(
mysql,
"__get_auth_plugin",
MagicMock(return_value="mysql_native_password"),
):
self._test_call(
mysql.user_create,
{
"sql": "CREATE USER %(user)s@%(host)s IDENTIFIED WITH %(auth_plugin)s BY %(password)s",
"sql_args": {
"password": "BLUECOW",
"auth_plugin": "mysql_native_password",
"user": "testuser",
"host": "localhost",
},
},
"testuser",
password="BLUECOW",
)
# Test creating a user with passwordless=True and unix_socket=True
with patch.object(mysql, "version", return_value="8.0.10"):
with patch.object(mysql, "plugin_status", MagicMock(return_value="ACTIVE")):
self._test_call(
mysql.user_create,
{
"sql": "CREATE USER %(user)s@%(host)s IDENTIFIED WITH auth_socket",
"sql_args": {"user": "testuser", "host": "localhost"},
},
"testuser",
allow_passwordless=True,
unix_socket=True,
)
with patch.object(mysql, "version", return_value="10.2.21-MariaDB"):
with patch.object(mysql, "plugin_status", MagicMock(return_value="ACTIVE")):
self._test_call(
mysql.user_create,
{
"sql": "CREATE USER %(user)s@%(host)s IDENTIFIED VIA unix_socket",
"sql_args": {"user": "testuser", "host": "localhost"},
},
"testuser",
allow_passwordless=True,
unix_socket=True,
)
with patch.object(mysql, "version", side_effect=["", "8.0.10", "8.0.10"]):
with patch.object(
mysql, "user_exists", MagicMock(return_value=False)
), patch.object(
mysql,
"__get_auth_plugin",
MagicMock(return_value="mysql_native_password"),
):
self._test_call(
mysql.user_create,
{
"sql": "CREATE USER %(user)s@%(host)s IDENTIFIED BY %(password)s",
"sql_args": {
"password": "new_pass",
"user": "root",
"host": "localhost",
},
},
"root",
password="new_pass",
connection_user="root",
connection_pass="old_pass",
)
def test_user_chpass(self):
"""
Test changing a MySQL user password in mysql exec module
"""
connect_mock = MagicMock()
with patch.object(mysql, "_connect", connect_mock):
with patch.object(mysql, "version", return_value="8.0.10"):
with patch.object(mysql, "user_exists", MagicMock(return_value=True)):
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
mysql.user_chpass("testuser", password="BLUECOW")
calls = (
call()
.cursor()
.execute(
"UPDATE mysql.user SET Password=PASSWORD(%(password)s) WHERE User=%(user)s AND Host = %(host)s;",
{
"password": "BLUECOW",
"user": "testuser",
"host": "localhost",
},
),
call().cursor().execute("FLUSH PRIVILEGES;"),
)
connect_mock.assert_has_calls(calls, any_order=True)
connect_mock = MagicMock()
with patch.object(mysql, "_connect", connect_mock):
with patch.object(mysql, "version", return_value="8.0.11"):
with patch.object(mysql, "user_exists", MagicMock(return_value=True)):
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
mysql.user_chpass("testuser", password="BLUECOW")
calls = (
call()
.cursor()
.execute(
"ALTER USER %(user)s@%(host)s IDENTIFIED BY %(password)s;",
{
"password": "BLUECOW",
"user": "testuser",
"host": "localhost",
},
),
call().cursor().execute("FLUSH PRIVILEGES;"),
)
connect_mock.assert_has_calls(calls, any_order=True)
connect_mock = MagicMock()
with patch.object(mysql, "_connect", connect_mock):
with patch.object(mysql, "version", side_effect=["", "8.0.11", "8.0.11"]):
with patch.object(mysql, "user_exists", MagicMock(return_value=True)):
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
mysql.user_chpass(
"root",
password="new_pass",
connection_user="root",
connection_pass="old_pass",
)
calls = (
call()
.cursor()
.execute(
"ALTER USER %(user)s@%(host)s IDENTIFIED BY %(password)s;",
{
"password": "new_pass",
"user": "root",
"host": "localhost",
},
),
call().cursor().execute("FLUSH PRIVILEGES;"),
)
connect_mock.assert_has_calls(calls, any_order=True)
def test_user_remove(self):
"""
Test the removal of a MySQL user in mysql exec module
"""
with patch.object(mysql, "user_exists", MagicMock(return_value=True)):
self._test_call(
mysql.user_remove,
{
"sql": "DROP USER %(user)s@%(host)s",
"sql_args": {"user": "testuser", "host": "localhost"},
},
"testuser",
)
def test_db_check(self):
"""
Test MySQL db check function in mysql exec module
"""
self._test_call(
mysql.db_check,
"CHECK TABLE `test``'\" db`.`my``'\" table`",
"test`'\" db",
"my`'\" table",
)
def test_db_repair(self):
"""
Test MySQL db repair function in mysql exec module
"""
self._test_call(
mysql.db_repair,
"REPAIR TABLE `test``'\" db`.`my``'\" table`",
"test`'\" db",
"my`'\" table",
)
def test_db_optimize(self):
"""
Test MySQL db optimize function in mysql exec module
"""
self._test_call(
mysql.db_optimize,
"OPTIMIZE TABLE `test``'\" db`.`my``'\" table`",
"test`'\" db",
"my`'\" table",
)
def test_db_remove(self):
"""
Test MySQL db remove function in mysql exec module
"""
with patch.object(mysql, "db_exists", MagicMock(return_value=True)):
self._test_call(
mysql.db_remove, "DROP DATABASE `test``'\" db`;", "test`'\" db"
)
def test_db_tables(self):
"""
Test MySQL db_tables function in mysql exec module
"""
with patch.object(mysql, "db_exists", MagicMock(return_value=True)):
self._test_call(
mysql.db_tables, "SHOW TABLES IN `test``'\" db`", "test`'\" db"
)
def test_db_exists(self):
"""
Test MySQL db_exists function in mysql exec module
"""
self._test_call(
mysql.db_exists,
{
"sql": "SHOW DATABASES LIKE %(dbname)s;",
"sql_args": {"dbname": r"""test%_`" db"""},
},
'test%_`" db',
)
def test_db_create(self):
"""
Test MySQL db_create function in mysql exec module
"""
self._test_call(
mysql.db_create,
"CREATE DATABASE IF NOT EXISTS `test``'\" db`;",
"test`'\" db",
)
def test_user_list(self):
"""
Test MySQL user_list function in mysql exec module
"""
self._test_call(mysql.user_list, "SELECT User,Host FROM mysql.user")
def test_user_info(self):
"""
Test to see if the mysql execution module correctly forms the SQL for information on a MySQL user.
"""
self._test_call(
mysql.user_info,
{
"sql": "SELECT * FROM mysql.user WHERE User = %(user)s AND Host = %(host)s",
"sql_args": {"host": "localhost", "user": "mytestuser"},
},
"mytestuser",
)
def test_user_grants(self):
"""
Test to ensure the mysql user_grants function returns properly formed SQL for a basic query
"""
with patch.object(mysql, "user_exists", MagicMock(return_value=True)):
self._test_call(
mysql.user_grants,
{
"sql": "SHOW GRANTS FOR %(user)s@%(host)s",
"sql_args": {"host": "localhost", "user": "testuser"},
},
"testuser",
)
def test_grant_exists_true(self):
"""
Test to ensure that we can find a grant that exists
"""
mock_grants = [
"GRANT USAGE ON *.* TO 'testuser'@'%'",
"GRANT SELECT, INSERT, UPDATE ON `testdb`.`testtableone` TO 'testuser'@'%'",
"GRANT SELECT(column1,column2) ON `testdb`.`testtableone` TO 'testuser'@'%'",
"GRANT SELECT(column1,column2), INSERT(column1,column2) ON `testdb`.`testtableone` TO 'testuser'@'%'",
"GRANT SELECT(column1,column2), UPDATE ON `testdb`.`testtableone` TO 'testuser'@'%'",
"GRANT SELECT ON `testdb`.`testtabletwo` TO 'testuser'@'%'",
"GRANT SELECT ON `testdb`.`testtablethree` TO 'testuser'@'%'",
]
with patch.object(mysql, "version", return_value="5.6.41"):
mock = MagicMock(return_value=mock_grants)
with patch.object(
mysql, "user_grants", return_value=mock_grants
) as mock_user_grants:
ret = mysql.grant_exists(
"SELECT, INSERT, UPDATE", "testdb.testtableone", "testuser", "%"
)
self.assertEqual(ret, True)
def test_grant_exists_false(self):
"""
Test to ensure that we don't find a grant that doesn't exist
"""
mock_grants = [
"GRANT USAGE ON *.* TO 'testuser'@'%'",
"GRANT SELECT, INSERT, UPDATE ON `testdb`.`testtableone` TO 'testuser'@'%'",
"GRANT SELECT(column1,column2) ON `testdb`.`testtableone` TO 'testuser'@'%'",
"GRANT SELECT(column1,column2), UPDATE ON `testdb`.`testtableone` TO 'testuser'@'%'",
"GRANT SELECT ON `testdb`.`testtablethree` TO 'testuser'@'%'",
]
with patch.object(mysql, "version", return_value="5.6.41"):
mock = MagicMock(return_value=mock_grants)
with patch.object(
mysql, "user_grants", return_value=mock_grants
) as mock_user_grants:
ret = mysql.grant_exists(
"SELECT", "testdb.testtabletwo", "testuser", "%"
)
self.assertEqual(ret, False)
def test_grant_exists_all(self):
"""
Test to ensure that we can find a grant that exists
"""
mock_grants = [
"GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, SHUTDOWN, PROCESS, FILE, REFERENCES, INDEX, ALTER, SHOW DATABASES, SUPER, CREATE TEMPORARY TABLES, LOCK TABLES, EXECUTE, REPLICATION SLAVE, REPLICATION CLIENT, CREATE VIEW, SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER, CREATE TABLESPACE, CREATE ROLE, DROP ROLE ON testdb.testtableone TO `testuser`@`%`",
"GRANT BACKUP_ADMIN,BINLOG_ADMIN,CONNECTION_ADMIN,ENCRYPTION_KEY_ADMIN,GROUP_REPLICATION_ADMIN,PERSIST_RO_VARIABLES_ADMIN,REPLICATION_SLAVE_ADMIN,RESOURCE_GROUP_ADMIN,RESOURCE_GROUP_USER,ROLE_ADMIN,SET_USER_ID,SYSTEM_VARIABLES_ADMIN,XA_RECOVER_ADMIN ON testdb.testtableone TO `testuser`@`%`",
]
with patch.object(mysql, "version", return_value="8.0.10"):
mock = MagicMock(return_value=mock_grants)
with patch.object(
mysql, "user_grants", return_value=mock_grants
) as mock_user_grants:
ret = mysql.grant_exists("ALL", "testdb.testtableone", "testuser", "%")
self.assertEqual(ret, True)
with patch.object(mysql, "version", return_value="8.0.10"):
mock = MagicMock(return_value=mock_grants)
with patch.object(
mysql, "user_grants", return_value=mock_grants
) as mock_user_grants:
ret = mysql.grant_exists(
"all privileges", "testdb.testtableone", "testuser", "%"
)
self.assertEqual(ret, True)
mock_grants = ["GRANT ALL PRIVILEGES ON testdb.testtableone TO `testuser`@`%`"]
with patch.object(mysql, "version", return_value="5.6.41"):
mock = MagicMock(return_value=mock_grants)
with patch.object(
mysql, "user_grants", return_value=mock_grants
) as mock_user_grants:
ret = mysql.grant_exists(
"ALL PRIVILEGES", "testdb.testtableone", "testuser", "%"
)
self.assertEqual(ret, True)
@skipIf(True, "TODO: Mock up user_grants()")
def test_grant_add(self):
"""
Test grant_add function in mysql exec module
"""
self._test_call(
mysql.grant_add,
"",
"SELECT,INSERT,UPDATE",
"database.*",
"frank",
"localhost",
)
@skipIf(True, "TODO: Mock up user_grants()")
def test_grant_revoke(self):
"""
Test grant revoke in mysql exec module
"""
self._test_call(
mysql.grant_revoke,
"",
"SELECT,INSERT,UPDATE",
"database.*",
"frank",
"localhost",
)
def test_processlist(self):
"""
Test processlist function in mysql exec module
"""
self._test_call(mysql.processlist, "SHOW FULL PROCESSLIST")
def test_get_master_status(self):
"""
Test get_master_status in the mysql execution module
"""
self._test_call(mysql.get_master_status, "SHOW MASTER STATUS")
def test_get_slave_status(self):
"""
Test get_slave_status in the mysql execution module
"""
self._test_call(mysql.get_slave_status, "SHOW SLAVE STATUS")
def test_get_slave_status_bad_server(self):
"""
Test get_slave_status in the mysql execution module, simulating a broken server
"""
connect_mock = MagicMock(return_value=None)
with patch.object(mysql, "_connect", connect_mock):
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
rslt = mysql.get_slave_status()
connect_mock.assert_has_calls([call()])
self.assertEqual(rslt, [])
@skipIf(True, "MySQL module claims this function is not ready for production")
def test_free_slave(self):
pass
def test_query(self):
self._test_call(
mysql.query, "SELECT * FROM testdb", "testdb", "SELECT * FROM testdb"
)
def test_query_error(self):
connect_mock = MagicMock()
with patch.object(mysql, "_connect", connect_mock):
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
# Use the OperationalError from the salt mysql module because that
# exception can come from either MySQLdb or pymysql
side_effect = mysql.OperationalError(9999, "Something Went Wrong")
with patch.object(
mysql, "_execute", MagicMock(side_effect=side_effect)
):
mysql.query("testdb", "SELECT * FROM testdb")
self.assertIn("mysql.error", mysql.__context__)
expected = "MySQL Error 9999: Something Went Wrong"
self.assertEqual(mysql.__context__["mysql.error"], expected)
def test_plugin_add(self):
"""
Test the adding/installing a MySQL / MariaDB plugin
"""
with patch.object(mysql, "plugin_status", MagicMock(return_value="")):
self._test_call(
mysql.plugin_add,
'INSTALL PLUGIN auth_socket SONAME "auth_socket.so"',
"auth_socket",
)
def test_plugin_remove(self):
"""
Test the removing/uninstalling a MySQL / MariaDB plugin
"""
with patch.object(mysql, "plugin_status", MagicMock(return_value="ACTIVE")):
self._test_call(
mysql.plugin_remove, "UNINSTALL PLUGIN auth_socket", "auth_socket",
)
def test_plugin_status(self):
"""
Test checking the status of a MySQL / MariaDB plugin
"""
self._test_call(
mysql.plugin_status,
{
"sql": "SELECT PLUGIN_STATUS FROM INFORMATION_SCHEMA.PLUGINS WHERE PLUGIN_NAME = %(name)s",
"sql_args": {"name": "auth_socket"},
},
"auth_socket",
)
def test_sanitize_comment(self):
"""
Test comment sanitization
"""
input_data = """/*
multiline
comment
*/
CREATE TABLE test_update (a VARCHAR(25)); # end of line comment
# example comment
insert into test_update values ("some #hash value"); -- ending comment
insert into test_update values ("crazy -- not comment"); -- another ending comment
-- another comment type
"""
expected_response = """CREATE TABLE test_update (a VARCHAR(25));
insert into test_update values ("some #hash value");
insert into test_update values ("crazy -- not comment");
"""
output = mysql._sanitize_comments(input_data)
self.assertEqual(output, expected_response)
input_data = """-- --------------------------------------------------------
-- SQL Commands to set up the pmadb as described in the documentation.
--
-- This file is meant for use with MySQL 5 and above!
--
-- This script expects the user pma to already be existing. If we would put a
-- line here to create them too many users might just use this script and end
-- up with having the same password for the controluser.
--
-- This user "pma" must be defined in config.inc.php (controluser/controlpass)
--
-- Please don't forget to set up the tablenames in config.inc.php
--
-- --------------------------------------------------------
--
CREATE DATABASE IF NOT EXISTS `phpmyadmin`
DEFAULT CHARACTER SET utf8 COLLATE utf8_bin;
USE phpmyadmin;
"""
expected_response = """CREATE DATABASE IF NOT EXISTS `phpmyadmin`
DEFAULT CHARACTER SET utf8 COLLATE utf8_bin;
USE phpmyadmin;"""
output = mysql._sanitize_comments(input_data)
self.assertEqual(output, expected_response)
def _test_call(self, function, expected_sql, *args, **kwargs):
connect_mock = MagicMock()
with patch.object(mysql, "_connect", connect_mock):
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
function(*args, **kwargs)
if isinstance(expected_sql, dict):
calls = (
call()
.cursor()
.execute(
"{}".format(expected_sql["sql"]), expected_sql["sql_args"]
)
)
else:
calls = call().cursor().execute("{}".format(expected_sql))
connect_mock.assert_has_calls((calls,), True)
def test_file_query(self):
"""
Test file_query
"""
with patch.object(mysql, "HAS_SQLPARSE", False):
ret = mysql.file_query("database", "filename")
self.assertFalse(ret)
file_data = """-- --------------------------------------------------------
-- SQL Commands to set up the pmadb as described in the documentation.
--
-- This file is meant for use with MySQL 5 and above!
--
-- This script expects the user pma to already be existing. If we would put a
-- line here to create them too many users might just use this script and end
-- up with having the same password for the controluser.
--
-- This user "pma" must be defined in config.inc.php (controluser/controlpass)
--
-- Please don't forget to set up the tablenames in config.inc.php
--
-- --------------------------------------------------------
--
USE phpmyadmin;
--
-- Table structure for table `pma__bookmark`
--
CREATE TABLE IF NOT EXISTS `pma__bookmark` (
`id` int(10) unsigned NOT NULL auto_increment,
`dbase` varchar(255) NOT NULL default '',
`user` varchar(255) NOT NULL default '',
`label` varchar(255) COLLATE utf8_general_ci NOT NULL default '',
`query` text NOT NULL,
PRIMARY KEY (`id`)
)
COMMENT='Bookmarks'
DEFAULT CHARACTER SET utf8 COLLATE utf8_bin;
"""
side_effect = [
{"query time": {"human": "0.4ms", "raw": "0.00038"}, "rows affected": 0},
{"query time": {"human": "8.9ms", "raw": "0.00893"}, "rows affected": 0},
]
expected = {
"query time": {"human": "8.9ms", "raw": "0.00893"},
"rows affected": 0,
}
with patch("os.path.exists", MagicMock(return_value=True)):
with patch("salt.utils.files.fopen", mock_open(read_data=file_data)):
with patch.object(mysql, "query", side_effect=side_effect):
ret = mysql.file_query("database", "filename")
self.assertTrue(ret, expected)
@skipIf(
NO_PyMYSQL, "Install pymysql bindings before running test__connect_pymysql."
)
def test__connect_pymysql_exception(self):
"""
Test the _connect function in the MySQL module
"""
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
with patch(
"MySQLdb.connect",
side_effect=pymysql.err.InternalError(
1698, "Access denied for user 'root'@'localhost'"
),
):
ret = mysql._connect()
self.assertIn("mysql.error", mysql.__context__)
self.assertEqual(
mysql.__context__["mysql.error"],
"MySQL Error 1698: Access denied for user 'root'@'localhost'",
)
@skipIf(not NO_PyMYSQL, "With pymysql installed use test__connect_pymysql.")
def test__connect_mysqldb_exception(self):
"""
Test the _connect function in the MySQL module
"""
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
with patch(
"MySQLdb.connect",
side_effect=mysql.OperationalError(
1698, "Access denied for user 'root'@'localhost'"
),
):
ret = mysql._connect()
self.assertIn("mysql.error", mysql.__context__)
self.assertEqual(
mysql.__context__["mysql.error"],
"MySQL Error 1698: Access denied for user 'root'@'localhost'",
)
def test__connect_mysqldb(self):
"""
Test the _connect function in the MySQL module
"""
with patch.dict(mysql.__salt__, {"config.option": MagicMock()}):
with patch("MySQLdb.connect", return_value=MockMySQLConnect()):
ret = mysql._connect()
self.assertNotIn("mysql.error", mysql.__context__)