Update mysql_database state to work in test mode.

Fix str comparison for the charset and collate.
This commit is contained in:
Cesar Augusto Sanchez 2020-08-05 17:50:11 -04:00 committed by Daniel Wozniak
parent 6a2b243029
commit 8c3260270d
2 changed files with 235 additions and 25 deletions

View file

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""
Management of MySQL databases (schemas)
=======================================
@ -14,13 +13,14 @@ Databases can be set as either absent or present.
frank:
mysql_database.present
"""
from __future__ import absolute_import, print_function, unicode_literals
import logging
import sys
log = logging.getLogger(__name__)
# pylint: disable=undefined-variable
def __virtual__():
"""
@ -52,28 +52,57 @@ def present(name, character_set=None, collate=None, **connection_args):
"name": name,
"changes": {},
"result": True,
"comment": "Database {0} is already present".format(name),
"comment": "Database {} is already present".format(name),
}
# check if database exists
existing = __salt__["mysql.db_get"](name, **connection_args)
if existing:
alter = False
if character_set and character_set != existing.get("character_set"):
alter_charset = False
alter_collate = False
existing_charset = bytes(str(existing.get("character_set")).encode()).decode()
if character_set and character_set != existing_charset:
alter_charset = True
log.debug(
"character set differes from %s : %s",
character_set,
existing.get("character_set"),
"character set differes from %s : %s", character_set, existing_charset,
)
alter = True
if collate and collate != existing.get("collate"):
comment = ("Database character set {} != {} needs to be updated").format(
character_set, existing_charset
)
if __opts__.get("test", False):
ret["result"] = None
ret["comment"] = comment
else:
ret["comment"] = comment
existing_collate = bytes(str(existing.get("collate")).encode()).decode()
if collate and collate != existing_collate:
alter_collate = True
log.debug(
"collate set differs from %s : %s", collate, existing.get("collate")
"collate set differs from %s : %s", collate, existing_collate,
)
alter = True
if alter:
__salt__["mysql.alter_db"](
name, character_set=character_set, collate=collate, **connection_args
comment = ("Database collate {} != {} needs to be updated").format(
collate, existing_collate
)
if __opts__.get("test", False):
ret["result"] = None
ret["comment"] += "\n{}".format(comment)
return ret
else:
ret["comment"] += "\n{}".format(comment)
if alter_charset or alter_collate:
if __opts__.get("test", False):
ret["comment"] += "\nDatabase {} is going to be updated".format(name)
else:
__salt__["mysql.alter_db"](
name,
character_set=character_set,
collate=collate,
**connection_args
)
current = __salt__["mysql.db_get"](name, **connection_args)
if existing.get("collate", None) != current.get("collate", None):
ret["changes"].update(
@ -101,23 +130,24 @@ def present(name, character_set=None, collate=None, **connection_args):
ret["result"] = False
return ret
if __opts__["test"]:
if __opts__.get("test", False):
ret["result"] = None
ret["comment"] = ("Database {0} is not present and needs to be created").format(
ret["comment"] = ("Database {} is not present and needs to be created").format(
name
)
return ret
# The database is not present, make it!
if __salt__["mysql.db_create"](
name, character_set=character_set, collate=collate, **connection_args
):
ret["comment"] = "The database {0} has been created".format(name)
ret["comment"] = "The database {} has been created".format(name)
ret["changes"][name] = "Present"
else:
ret["comment"] = "Failed to create database {0}".format(name)
ret["comment"] = "Failed to create database {}".format(name)
err = _get_mysql_error()
if err is not None:
ret["comment"] += " ({0})".format(err)
ret["comment"] += " ({})".format(err)
ret["result"] = False
return ret
@ -134,20 +164,20 @@ def absent(name, **connection_args):
# check if db exists and remove it
if __salt__["mysql.db_exists"](name, **connection_args):
if __opts__["test"]:
if __opts__.get("test", False):
ret["result"] = None
ret["comment"] = "Database {0} is present and needs to be removed".format(
ret["comment"] = "Database {} is present and needs to be removed".format(
name
)
return ret
if __salt__["mysql.db_remove"](name, **connection_args):
ret["comment"] = "Database {0} has been removed".format(name)
ret["comment"] = "Database {} has been removed".format(name)
ret["changes"][name] = "Absent"
return ret
else:
err = _get_mysql_error()
if err is not None:
ret["comment"] = "Unable to remove database {0} " "({1})".format(
ret["comment"] = "Unable to remove database {} " "({})".format(
name, err
)
ret["result"] = False
@ -160,7 +190,7 @@ def absent(name, **connection_args):
return ret
# fallback
ret["comment"] = ("Database {0} is not present, so it cannot be removed").format(
ret["comment"] = ("Database {} is not present, so it cannot be removed").format(
name
)
return ret

View file

@ -0,0 +1,180 @@
"""
This test checks mysql_database salt state
"""
# Import Salt Libs
import salt.states.mysql_database as mysql_database
# Import Salt Testing Libs
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.mock import MagicMock, patch
from tests.support.unit import TestCase
class MysqlDatabaseTestCase(TestCase, LoaderModuleMockMixin):
"""
Test cases for salt.states.mysql_database
"""
def setup_loader_modules(self):
return {mysql_database: {}}
# 'present' function tests: 1
def test_present(self):
"""
Test to ensure that the named user is present with
the specified properties.
"""
dbname = "my_test"
charset = "utf8"
collate = "utf8_unicode_ci"
ret = {"name": dbname, "result": False, "comment": "", "changes": {}}
mock_result = {
"character_set": charset,
"collate": collate,
"name": dbname,
}
mock_result_charset_updated = {
"character_set": "ascii",
"collate": "utf8_unicode_ci",
"name": dbname,
}
mock_result_alter_db = {True}
mock = MagicMock(return_value=mock_result)
mock_a = MagicMock(return_value=mock_result_alter_db)
mock_b = MagicMock(return_value=mock_result_charset_updated)
mock_failed = MagicMock(return_value=False)
mock_err = MagicMock(return_value="salt")
mock_no_err = MagicMock(return_value=None)
mock_create = MagicMock(return_value=True)
mock_create_failed = MagicMock(return_value=False)
with patch.dict(
mysql_database.__salt__, {"mysql.db_get": mock, "mysql.alter_db": mock_a}
):
mod_charset = "ascii"
mod_collate = "ascii_general_ci"
with patch.dict(mysql_database.__opts__, {"test": True}):
comt = [
"Database character set {} != {} needs to be updated".format(
mod_charset, charset
),
"Database {} is going to be updated".format(dbname),
]
ret.update({"comment": "\n".join(comt)})
ret.update({"result": None})
self.assertDictEqual(
mysql_database.present(dbname, character_set=mod_charset), ret
)
with patch.dict(mysql_database.__opts__, {"test": True}):
comt = [
"Database {} is already present".format(dbname),
"Database collate {} != {} needs to be updated".format(
mod_collate, collate
),
]
ret.update({"comment": "\n".join(comt)})
ret.update({"result": None})
self.assertDictEqual(
mysql_database.present(
dbname, character_set=charset, collate=mod_collate
),
ret,
)
with patch.dict(mysql_database.__opts__, {}):
comt = [
"Database character set {} != {} needs to be updated".format(
mod_charset, charset
),
"Database collate {} != {} needs to be updated".format(
mod_collate, collate
),
]
ret.update({"comment": "\n".join(comt)})
ret.update({"result": True})
self.assertDictEqual(
mysql_database.present(
dbname, character_set=mod_charset, collate=mod_collate
),
ret,
)
with patch.dict(mysql_database.__opts__, {"test": False}):
comt = "Database {} is already present".format(dbname)
ret.update({"comment": comt})
ret.update({"result": True})
self.assertDictEqual(
mysql_database.present(
dbname, character_set=charset, collate=collate
),
ret,
)
with patch.dict(mysql_database.__salt__, {"mysql.db_get": mock_failed}):
with patch.dict(mysql_database.__salt__, {"mysql.db_create": mock_create}):
with patch.object(mysql_database, "_get_mysql_error", mock_err):
ret.update({"comment": "salt", "result": False})
self.assertDictEqual(mysql_database.present(dbname), ret)
with patch.object(mysql_database, "_get_mysql_error", mock_no_err):
comt = "The database {} has been created".format(dbname)
ret.update({"comment": comt, "result": True})
ret.update({"changes": {dbname: "Present"}})
self.assertDictEqual(mysql_database.present(dbname), ret)
with patch.dict(
mysql_database.__salt__, {"mysql.db_create": mock_create_failed}
):
ret["comment"] = ""
with patch.object(mysql_database, "_get_mysql_error", mock_no_err):
ret.update({"changes": {}})
comt = "Failed to create database {}".format(dbname)
ret.update({"comment": comt, "result": False})
self.assertDictEqual(mysql_database.present(dbname), ret)
# 'absent' function tests: 1
def test_absent(self):
"""
Test to ensure that the named user is absent.
"""
dbname = "my_test"
ret = {"name": dbname, "result": True, "comment": "", "changes": {}}
mock_db_exists = MagicMock(return_value=True)
mock_db_no_exists = MagicMock(return_value=False)
mock_remove = MagicMock(return_value=True)
mock_remove_fail = MagicMock(return_value=False)
mock_err = MagicMock(return_value="salt")
mock_no_err = MagicMock(return_value=None)
with patch.dict(
mysql_database.__salt__,
{"mysql.db_exists": mock_db_exists, "mysql.db_remove": mock_remove},
):
with patch.dict(mysql_database.__opts__, {"test": True}):
comt = "Database {} is present and needs to be removed".format(dbname)
ret.update({"comment": comt, "result": None})
self.assertDictEqual(mysql_database.absent(dbname), ret)
with patch.dict(mysql_database.__opts__, {}):
comt = "Database {} has been removed".format(dbname)
ret.update({"comment": comt, "result": True})
ret.update({"changes": {dbname: "Absent"}})
self.assertDictEqual(mysql_database.absent(dbname), ret)
with patch.dict(
mysql_database.__salt__,
{"mysql.db_exists": mock_db_exists, "mysql.db_remove": mock_remove_fail},
):
with patch.dict(mysql_database.__opts__, {}):
with patch.object(mysql_database, "_get_mysql_error", mock_err):
ret["changes"] = {}
comt = "Unable to remove database {} ({})".format(dbname, "salt")
ret.update({"comment": comt, "result": False})
self.assertDictEqual(mysql_database.absent(dbname), ret)