Add SCRAM-SHA-256 support to postgres states (#59034)

* Tests for existing postgres_user states. Basic usage and password-related.

* Fix minor failing tests in postgres_user.present

* Make postgres_group consistent with postgres_user

Related: #59028

* Add support for SCRAM-SHA-256 for PostgreSQL.

Fixes: #51217

* Some additional old postgres tests that are now redundant.

* Add postgres changelog entries.

* Update documentation for 3003 release

* Convert DB_ARGS constant to fixture

* Simplify six auto-removal

Co-authored-by: Pedro Algarvio <pedro@algarvio.me>

* filename_map.yml should still list pytests

* Another six removal simplification

Co-authored-by: Pedro Algarvio <pedro@algarvio.me>

Co-authored-by: Sage the Rage <36676171+sagetherage@users.noreply.github.com>
Co-authored-by: Gareth J. Greenaway <gareth@wiked.org>
Co-authored-by: Pedro Algarvio <pedro@algarvio.me>
This commit is contained in:
James Howe 2021-03-01 20:39:48 +00:00 committed by GitHub
parent 71d274c2d0
commit 1e219c9074
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 1586 additions and 648 deletions

2
changelog/51271.added Normal file
View file

@ -0,0 +1,2 @@
SCRAM-SHA-256 support for PostgreSQL passwords.
Pass encrypted=scram-sha-256 to the postgres_user.present (or postgres_group.present) state.

3
changelog/59034.fixed Normal file
View file

@ -0,0 +1,3 @@
Correct comment when updating postrges users and groups.
Errors reported when removing postgres groups.
Partial group membership changes in postgres groups.

View file

@ -6,6 +6,18 @@ Salt 3003 Release Notes - Codename Aluminium
Salt 3003 is an *unreleased* upcoming feature release.
New Features
============
SCRAM-SHA-256 support for PostgreSQL passwords
----------------------------------------------
Support for SCRAM-SHA-256 password hashes has been added to the
:py:func:`postgres_user.present <salt.states.postgres_user.present>`
and :py:func:`postgres_group.present <salt.states.postgres_group.present>`
states. This allows migration away from the insecure and deprecated
previous storage methods.
Execution Module Changes
========================

109
salt/ext/saslprep.py Normal file
View file

@ -0,0 +1,109 @@
# Copyright 2016-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# https://github.com/mongodb/mongo-python-driver/blob/3.11.1/pymongo/saslprep.py
"""An implementation of RFC4013 SASLprep."""
try:
import stringprep
except ImportError:
HAVE_STRINGPREP = False
def saslprep(data):
"""SASLprep dummy"""
if isinstance(data, str):
raise TypeError(
"The stringprep module is not available. Usernames and "
"passwords must be ASCII strings.")
return data
else:
HAVE_STRINGPREP = True
import unicodedata
# RFC4013 section 2.3 prohibited output.
_PROHIBITED = (
# A strict reading of RFC 4013 requires table c12 here, but
# characters from it are mapped to SPACE in the Map step. Can
# normalization reintroduce them somehow?
stringprep.in_table_c12,
stringprep.in_table_c21_c22,
stringprep.in_table_c3,
stringprep.in_table_c4,
stringprep.in_table_c5,
stringprep.in_table_c6,
stringprep.in_table_c7,
stringprep.in_table_c8,
stringprep.in_table_c9)
def saslprep(data, prohibit_unassigned_code_points=True):
"""An implementation of RFC4013 SASLprep.
:Parameters:
- `data`: The string to SASLprep. Unicode strings
(python 2.x unicode, 3.x str) are supported. Byte strings
(python 2.x str, 3.x bytes) are ignored.
- `prohibit_unassigned_code_points`: True / False. RFC 3454
and RFCs for various SASL mechanisms distinguish between
`queries` (unassigned code points allowed) and
`stored strings` (unassigned code points prohibited). Defaults
to ``True`` (unassigned code points are prohibited).
:Returns:
The SASLprep'ed version of `data`.
"""
if not isinstance(data, str):
return data
if prohibit_unassigned_code_points:
prohibited = _PROHIBITED + (stringprep.in_table_a1,)
else:
prohibited = _PROHIBITED
# RFC3454 section 2, step 1 - Map
# RFC4013 section 2.1 mappings
# Map Non-ASCII space characters to SPACE (U+0020). Map
# commonly mapped to nothing characters to, well, nothing.
in_table_c12 = stringprep.in_table_c12
in_table_b1 = stringprep.in_table_b1
data = "".join(
["\u0020" if in_table_c12(elt) else elt
for elt in data if not in_table_b1(elt)])
# RFC3454 section 2, step 2 - Normalize
# RFC4013 section 2.2 normalization
data = unicodedata.ucd_3_2_0.normalize('NFKC', data)
in_table_d1 = stringprep.in_table_d1
if in_table_d1(data[0]):
if not in_table_d1(data[-1]):
# RFC3454, Section 6, #3. If a string contains any
# RandALCat character, the first and last characters
# MUST be RandALCat characters.
raise ValueError("SASLprep: failed bidirectional check")
# RFC3454, Section 6, #2. If a string contains any RandALCat
# character, it MUST NOT contain any LCat character.
prohibited = prohibited + (stringprep.in_table_d2,)
else:
# RFC3454, Section 6, #3. Following the logic of #3, if
# the first character is not a RandALCat, no other character
# can be either.
prohibited = prohibited + (in_table_d1,)
# RFC3454 section 2, step 3 and 4 - Prohibit and check bidi
for char in data:
if any(in_table(char) for in_table in prohibited):
raise ValueError(
"SASLprep: failed prohibited character check")
return data

View file

@ -31,8 +31,11 @@ Module to provide Postgres compatibility to salt.
# pylint: disable=E8203
import base64
import datetime
import hashlib
import hmac
import io
import logging
import os
import pipes
@ -45,8 +48,7 @@ import salt.utils.odict
import salt.utils.path
import salt.utils.stringutils
from salt.exceptions import CommandExecutionError, SaltInvocationError
from salt.ext.six.moves import zip # pylint: disable=import-error,redefined-builtin
from salt.ext.six.moves import StringIO
from salt.ext.saslprep import saslprep
from salt.utils.versions import LooseVersion as _LooseVersion
try:
@ -56,11 +58,16 @@ try:
except ImportError:
HAS_CSV = False
try:
from secrets import token_bytes
except ImportError:
# python <3.6
from os import urandom as token_bytes
log = logging.getLogger(__name__)
_DEFAULT_PASSWORDS_ENCRYPTION = True
_DEFAULT_PASSWORDS_ENCRYPTION = "md5"
_EXTENSION_NOT_INSTALLED = "EXTENSION NOT INSTALLED"
_EXTENSION_INSTALLED = "EXTENSION INSTALLED"
_EXTENSION_TO_UPGRADE = "EXTENSION TO UPGRADE"
@ -272,9 +279,7 @@ def version(
salt '*' postgres.version
"""
query = (
"SELECT setting FROM pg_catalog.pg_settings " "WHERE name = 'server_version'"
)
query = "SELECT setting FROM pg_catalog.pg_settings WHERE name = 'server_version'"
cmd = _psql_cmd(
"-c",
query,
@ -459,7 +464,7 @@ def psql_query(
if cmdret["retcode"] > 0:
return ret
csv_file = StringIO(cmdret["stdout"])
csv_file = io.StringIO(cmdret["stdout"])
header = {}
for row in csv.reader(
csv_file,
@ -1151,20 +1156,89 @@ def _add_role_flag(string, test, flag, cond=None, prefix="NO", addtxt="", skip=F
def _maybe_encrypt_password(role, password, encrypted=_DEFAULT_PASSWORDS_ENCRYPTION):
"""
pgsql passwords are md5 hashes of the string: 'md5{password}{rolename}'
"""
if password is not None:
password = str(password)
if encrypted and password and not password.startswith("md5"):
password = "md5{}".format(
hashlib.md5(
salt.utils.stringutils.to_bytes("{}{}".format(password, role))
).hexdigest()
)
else:
return None
if encrypted is True:
encrypted = "md5"
if encrypted not in (False, "md5", "scram-sha-256"):
raise ValueError("Unknown password algorithm: " + str(encrypted))
if encrypted == "scram-sha-256" and not password.startswith("SCRAM-SHA-256"):
password = _scram_sha_256(password)
elif encrypted == "md5" and not password.startswith("md5"):
log.warning("The md5 password algorithm was deprecated in PostgreSQL 10")
password = _md5_password(role, password)
elif encrypted is False:
log.warning("Unencrypted passwords were removed in PostgreSQL 10")
return password
def _verify_password(role, password, verifier, method):
"""
Test the given password against the verifier.
The given password may already be a verifier, in which case test for
simple equality.
"""
if method == "md5" or method is True:
if password.startswith("md5"):
expected = password
else:
expected = _md5_password(role, password)
elif method == "scram-sha-256":
if password.startswith("SCRAM-SHA-256"):
expected = password
else:
match = re.match(r"^SCRAM-SHA-256\$(\d+):([^\$]+?)\$", verifier)
if match:
iterations = int(match.group(1))
salt_bytes = base64.b64decode(match.group(2))
expected = _scram_sha_256(
password, salt_bytes=salt_bytes, iterations=iterations
)
else:
expected = object()
elif method is False:
expected = password
else:
expected = object()
return verifier == expected
def _md5_password(role, password):
return "md5{}".format(
hashlib.md5(
salt.utils.stringutils.to_bytes("{}{}".format(password, role))
).hexdigest()
)
def _scram_sha_256(password, salt_bytes=None, iterations=4096):
"""
Build a SCRAM-SHA-256 password verifier.
Ported from https://doxygen.postgresql.org/scram-common_8c.html
"""
if salt_bytes is None:
salt_bytes = token_bytes(16)
password = salt.utils.stringutils.to_bytes(saslprep(password))
salted_password = hashlib.pbkdf2_hmac("sha256", password, salt_bytes, iterations)
stored_key = hmac.new(salted_password, b"Client Key", "sha256").digest()
stored_key = hashlib.sha256(stored_key).digest()
server_key = hmac.new(salted_password, b"Server Key", "sha256").digest()
return "SCRAM-SHA-256${}:{}${}:{}".format(
iterations,
base64.b64encode(salt_bytes).decode("ascii"),
base64.b64encode(stored_key).decode("ascii"),
base64.b64encode(server_key).decode("ascii"),
)
def _role_cmd_args(
name,
sub_cmd="",
@ -1190,7 +1264,7 @@ def _role_cmd_args(
login = True
if typ_ == "group":
login = False
# defaults to encrypted passwords (md5{password}{rolename})
# defaults to encrypted passwords
if encrypted is None:
encrypted = _DEFAULT_PASSWORDS_ENCRYPTION
skip_passwd = False
@ -1232,7 +1306,7 @@ def _role_cmd_args(
"flag": "ENCRYPTED",
"test": (encrypted is not None and bool(rolepassword)),
"skip": skip_passwd or isinstance(rolepassword, bool),
"cond": encrypted,
"cond": bool(encrypted),
"prefix": "UN",
},
{

View file

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""
Management of PostgreSQL groups (roles)
=======================================
@ -10,17 +9,12 @@ The postgres_group module is used to create and manage Postgres groups.
frank:
postgres_group.present
"""
from __future__ import absolute_import, print_function, unicode_literals
# Import salt libs
import logging
# Salt imports
from salt.modules import postgres
# Import Python libs
log = logging.getLogger(__name__)
@ -71,7 +65,19 @@ def present(
Is the group allowed to create other roles/users
encrypted
Should the password be encrypted in the system catalog?
How the password should be stored.
If encrypted is ``None``, ``True``, or ``md5``, it will use
PostgreSQL's MD5 algorithm.
If encrypted is ``False``, it will be stored in plaintext.
If encrypted is ``scram-sha-256``, it will use the algorithm described
in RFC 7677.
.. versionchanged:: 3003
Prior versions only supported ``True`` and ``False``
login
Should the group have login perm
@ -86,13 +92,15 @@ def present(
Should the new group be allowed to initiate streaming replication
password
The group's password
It can be either a plain string or a md5 postgresql hashed password::
The group's password.
It can be either a plain string or a pre-hashed password::
'md5{MD5OF({password}{role}}'
'SCRAM-SHA-256${iterations}:{salt}${stored_key}:{server_key}'
If encrypted is ``None`` or ``True``, the password will be automatically
encrypted to the previous format if it is not already done.
If encrypted is not ``False``, then the password will be converted
to the appropriate format above, if not already. As a consequence,
passwords that start with "md5" or "SCRAM-SHA-256" cannot be used.
refresh_password
Password refresh flag
@ -130,14 +138,13 @@ def present(
"name": name,
"changes": {},
"result": True,
"comment": "Group {0} is already present".format(name),
"comment": "Group {} is already present".format(name),
}
# default to encrypted passwords
if encrypted is not False:
if encrypted is None:
encrypted = postgres._DEFAULT_PASSWORDS_ENCRYPTION
# maybe encrypt if it's not already and necessary
password = postgres._maybe_encrypt_password(name, password, encrypted=encrypted)
db_args = {
"maintenance_db": maintenance_db,
"runas": user,
@ -155,10 +162,26 @@ def present(
if group_attr is not None:
mode = "update"
if password is not None:
if (
mode == "update"
and not refresh_password
and postgres._verify_password(
name, password, group_attr["password"], encrypted
)
):
# if password already matches then don't touch it
password = None
else:
# encrypt password if necessary
password = postgres._maybe_encrypt_password(
name, password, encrypted=encrypted
)
# The user is not present, make it!
cret = None
update = {}
if mode == "update":
role_groups = group_attr.get("groups", [])
if createdb is not None and group_attr["can create databases"] != createdb:
update["createdb"] = createdb
if inherit is not None and group_attr["inherits privileges"] != inherit:
@ -171,18 +194,25 @@ def present(
update["replication"] = replication
if superuser is not None and group_attr["superuser"] != superuser:
update["superuser"] = superuser
if password is not None and (
refresh_password or group_attr["password"] != password
):
if password is not None:
update["password"] = True
if groups is not None:
lgroups = groups
if isinstance(groups, str):
lgroups = lgroups.split(",")
if isinstance(lgroups, list):
missing_groups = [a for a in lgroups if a not in role_groups]
if missing_groups:
update["groups"] = missing_groups
if mode == "create" or (mode == "update" and update):
if __opts__["test"]:
if update:
ret["changes"][name] = update
ret["result"] = None
ret["comment"] = "Group {0} is set to be {1}d".format(name, mode)
ret["comment"] = "Group {} is set to be {}d".format(name, mode)
return ret
cret = __salt__["postgres.group_{0}".format(mode)](
cret = __salt__["postgres.group_{}".format(mode)](
groupname=name,
createdb=createdb,
createroles=createroles,
@ -197,12 +227,15 @@ def present(
)
else:
cret = None
if cret:
ret["comment"] = "The group {0} has been {1}d".format(name, mode)
ret["comment"] = "The group {} has been {}d".format(name, mode)
if update:
ret["changes"][name] = update
else:
ret["changes"][name] = "Present"
elif cret is not None:
ret["comment"] = "Failed to create group {0}".format(name)
ret["comment"] = "Failed to {} group {}".format(mode, name)
ret["result"] = False
else:
ret["result"] = True
@ -256,15 +289,17 @@ def absent(
if __salt__["postgres.user_exists"](name, **db_args):
if __opts__["test"]:
ret["result"] = None
ret["comment"] = "Group {0} is set to be removed".format(name)
ret["comment"] = "Group {} is set to be removed".format(name)
return ret
if __salt__["postgres.group_remove"](name, **db_args):
ret["comment"] = "Group {0} has been removed".format(name)
ret["comment"] = "Group {} has been removed".format(name)
ret["changes"][name] = "Absent"
return ret
else:
ret["result"] = False
ret["comment"] = "Group {} failed to be removed".format(name)
return ret
else:
ret["comment"] = "Group {0} is not present, so it cannot " "be removed".format(
name
)
ret["comment"] = "Group {} is not present, so it cannot be removed".format(name)
return ret

View file

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""
Management of PostgreSQL users (roles)
======================================
@ -10,20 +9,13 @@ The postgres_users module is used to create and manage Postgres users.
frank:
postgres_user.present
"""
from __future__ import absolute_import, print_function, unicode_literals
# Import Python libs
import datetime
import logging
from salt.ext import six
# Salt imports
from salt.modules import postgres
# Import salt libs
log = logging.getLogger(__name__)
@ -63,7 +55,7 @@ def present(
"""
Ensure that the named user is present with the specified privileges
Please note that the user/group notion in postgresql is just abstract, we
have roles, where users can be seens as roles with the LOGIN privilege
have roles, where users can be seen as roles with the LOGIN privilege
and groups the others.
name
@ -76,7 +68,19 @@ def present(
Is the user allowed to create other users?
encrypted
Should the password be encrypted in the system catalog?
How the password should be stored.
If encrypted is ``None``, ``True``, or ``md5``, it will use
PostgreSQL's MD5 algorithm.
If encrypted is ``False``, it will be stored in plaintext.
If encrypted is ``scram-sha-256``, it will use the algorithm described
in RFC 7677.
.. versionchanged:: 3003
Prior versions only supported ``True`` and ``False``
login
Should the group have login perm
@ -91,14 +95,15 @@ def present(
Should the new user be allowed to initiate streaming replication
password
The system user's password. It can be either a plain string or a
md5 postgresql hashed password::
The user's password.
It can be either a plain string or a pre-hashed password::
'md5{MD5OF({password}{role}}'
'SCRAM-SHA-256${iterations}:{salt}${stored_key}:{server_key}'
If encrypted is None or True, the password will be automatically
encrypted to the previous
format if it is not already done.
If encrypted is not ``False``, then the password will be converted
to the appropriate format above, if not already. As a consequence,
passwords that start with "md5" or "SCRAM-SHA-256" cannot be used.
default_password
The password used only when creating the user, unless password is set.
@ -144,20 +149,9 @@ def present(
"name": name,
"changes": {},
"result": True,
"comment": "User {0} is already present".format(name),
"comment": "User {} is already present".format(name),
}
# default to encrypted passwords
if encrypted is not False:
encrypted = postgres._DEFAULT_PASSWORDS_ENCRYPTION
# maybe encrypt if it's not already and necessary
password = postgres._maybe_encrypt_password(name, password, encrypted=encrypted)
if default_password is not None:
default_password = postgres._maybe_encrypt_password(
name, default_password, encrypted=encrypted
)
db_args = {
"maintenance_db": maintenance_db,
"runas": user,
@ -167,6 +161,10 @@ def present(
"password": db_password,
}
# default to encrypted passwords
if encrypted is None:
encrypted = postgres._DEFAULT_PASSWORDS_ENCRYPTION
# check if user exists
mode = "create"
user_attr = __salt__["postgres.role_get"](
@ -175,7 +173,25 @@ def present(
if user_attr is not None:
mode = "update"
cret = None
if mode == "create" and password is None:
password = default_password
if password is not None:
if (
mode == "update"
and not refresh_password
and postgres._verify_password(
name, password, user_attr["password"], encrypted
)
):
# if password already matches then don't touch it
password = None
else:
# encrypt password if necessary
password = postgres._maybe_encrypt_password(
name, password, encrypted=encrypted
)
update = {}
if mode == "update":
user_groups = user_attr.get("groups", [])
@ -191,13 +207,11 @@ def present(
update["replication"] = replication
if superuser is not None and user_attr["superuser"] != superuser:
update["superuser"] = superuser
if password is not None and (
refresh_password or user_attr["password"] != password
):
if password is not None:
update["password"] = True
if valid_until is not None:
valid_until_dt = __salt__["postgres.psql_query"](
"SELECT '{0}'::timestamp(0) as dt;".format(
"SELECT '{}'::timestamp(0) as dt;".format(
valid_until.replace("'", "''")
),
**db_args
@ -212,24 +226,21 @@ def present(
update["valid_until"] = valid_until
if groups is not None:
lgroups = groups
if isinstance(groups, (six.string_types, six.text_type)):
if isinstance(groups, str):
lgroups = lgroups.split(",")
if isinstance(lgroups, list):
missing_groups = [a for a in lgroups if a not in user_groups]
if missing_groups:
update["groups"] = missing_groups
if mode == "create" and password is None:
password = default_password
if mode == "create" or (mode == "update" and update):
if __opts__["test"]:
if update:
ret["changes"][name] = update
ret["result"] = None
ret["comment"] = "User {0} is set to be {1}d".format(name, mode)
ret["comment"] = "User {} is set to be {}d".format(name, mode)
return ret
cret = __salt__["postgres.user_{0}".format(mode)](
cret = __salt__["postgres.user_{}".format(mode)](
username=name,
createdb=createdb,
createroles=createroles,
@ -247,13 +258,13 @@ def present(
cret = None
if cret:
ret["comment"] = "The user {0} has been {1}d".format(name, mode)
ret["comment"] = "The user {} has been {}d".format(name, mode)
if update:
ret["changes"][name] = update
else:
ret["changes"][name] = "Present"
elif cret is not None:
ret["comment"] = "Failed to create user {0}".format(name)
ret["comment"] = "Failed to {} user {}".format(mode, name)
ret["result"] = False
else:
ret["result"] = True
@ -307,19 +318,17 @@ def absent(
if __salt__["postgres.user_exists"](name, **db_args):
if __opts__["test"]:
ret["result"] = None
ret["comment"] = "User {0} is set to be removed".format(name)
ret["comment"] = "User {} is set to be removed".format(name)
return ret
if __salt__["postgres.user_remove"](name, **db_args):
ret["comment"] = "User {0} has been removed".format(name)
ret["comment"] = "User {} has been removed".format(name)
ret["changes"][name] = "Absent"
return ret
else:
ret["result"] = False
ret["comment"] = "User {0} failed to be removed".format(name)
ret["comment"] = "User {} failed to be removed".format(name)
return ret
else:
ret["comment"] = "User {0} is not present, so it cannot " "be removed".format(
name
)
ret["comment"] = "User {} is not present, so it cannot be removed".format(name)
return ret

View file

@ -69,12 +69,12 @@ salt/(states|modules)/.*postgres.py:
- unit.states.test_postgres_cluster
- unit.states.test_postgres_database
- unit.states.test_postgres_extension
- unit.states.test_postgres_group
- pytests.unit.states.test_postgres_group
- unit.states.test_postgres_initdb
- unit.states.test_postgres_language
- unit.states.test_postgres_privileges
- unit.states.test_postgres_schema
- unit.states.test_postgres_user
- pytests.unit.states.test_postgres_user
salt/modules/rabbitmq.py:
- pytests.unit.states.test_rabbitmq_cluster

View file

@ -0,0 +1,50 @@
import pytest
import salt.modules.postgres as postgres
# 'md5' + md5('password' + 'username')
md5_pw = "md55a231fcdb710d73268c4f44283487ba2"
scram_pw = (
"SCRAM-SHA-256$4096:wLr5nqC+3F+r7FdQPnB+nA==$"
"0hn08ZdX8kirGaL4TM0j13digH9Wl365OOzCtAuF2pE=:"
"LzAh/MGUdjYkdbDzcOKpfGwa3WwPUsyGcY+TEnSpcto="
)
def idfn(val):
if val == md5_pw:
return "md5_pw"
if val == scram_pw:
return "scram_pw"
@pytest.mark.parametrize(
"role,password,verifier,method,result",
[
("username", "password", md5_pw, "md5", True),
("another", "password", md5_pw, "md5", False),
("username", "another", md5_pw, "md5", False),
("username", md5_pw, md5_pw, "md5", True),
("username", "md5another", md5_pw, "md5", False),
("username", "password", md5_pw, True, True),
("another", "password", md5_pw, True, False),
("username", "another", md5_pw, True, False),
("username", md5_pw, md5_pw, True, True),
("username", "md5another", md5_pw, True, False),
(None, "password", scram_pw, "scram-sha-256", True),
(None, "another", scram_pw, "scram-sha-256", False),
(None, scram_pw, scram_pw, "scram-sha-256", True),
(None, "SCRAM-SHA-256$4096:AAAA$AAAA:AAAA", scram_pw, "scram-sha-256", False),
(None, "SCRAM-SHA-256$foo", scram_pw, "scram-sha-256", False),
(None, "password", "password", False, True),
(None, "another", "password", False, False),
(None, "password", "password", "foo", False),
("username", "password", md5_pw, "scram-sha-256", False),
("username", "password", scram_pw, "md5", False),
# Code does not currently check role of pre-hashed md5 passwords
pytest.param("another", md5_pw, md5_pw, "md5", False, marks=pytest.mark.xfail),
],
ids=idfn,
)
def test_verify_password(role, password, verifier, method, result):
assert postgres._verify_password(role, password, verifier, method) == result

View file

@ -0,0 +1,449 @@
import pytest
import salt.modules.postgres as postgres
import salt.states.postgres_group as postgres_group
from tests.support.mock import create_autospec, patch
@pytest.fixture(name="db_args")
def fixture_db_args():
return {
"runas": None,
"host": None,
"port": None,
"maintenance_db": None,
"user": None,
"password": None,
}
@pytest.fixture(name="md5_pw")
def fixture_md5_pw():
# 'md5' + md5('password' + 'groupname')
return "md58b14c378fab8ef0dc227f4e6d6787a87"
@pytest.fixture(name="existing_group")
def fixture_existing_group():
return {
"superuser": False,
"inherits privileges": True,
"can create roles": False,
"can create databases": False,
"can update system catalogs": None,
"can login": False,
"replication": False,
"connections": None,
"expiry time": None,
"defaults variables": "",
"password": "",
"groups": [],
}
@pytest.fixture(name="test_mode")
def fixture_test_mode():
with patch.dict(postgres_group.__opts__, {"test": True}):
yield
@pytest.fixture(name="mocks")
def fixture_mocks():
return {
"postgres.role_get": create_autospec(postgres.role_get, return_value=None),
"postgres.user_exists": create_autospec(
postgres.user_exists, return_value=False
),
"postgres.group_create": create_autospec(
postgres.group_create, return_value=True
),
"postgres.group_update": create_autospec(
postgres.group_update, return_value=True
),
"postgres.group_remove": create_autospec(
postgres.group_remove, return_value=True
),
}
@pytest.fixture(autouse=True)
def setup_loader(mocks):
setup_loader_modules = {
postgres_group: {"__opts__": {"test": False}, "__salt__": mocks},
postgres: {"__opts__": {"test": False}},
}
with pytest.helpers.loader_mock(setup_loader_modules) as loader_mock:
yield loader_mock
# ==========
# postgres_group.present
# ==========
def test_present_create_basic(mocks, db_args):
assert postgres_group.present("groupname") == {
"name": "groupname",
"result": True,
"changes": {"groupname": "Present"},
"comment": "The group groupname has been created",
}
mocks["postgres.role_get"].assert_called_once_with(
"groupname", return_password=True, **db_args
)
mocks["postgres.group_create"].assert_called_once_with(
groupname="groupname",
createdb=None,
createroles=None,
encrypted="md5",
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=None,
groups=None,
**db_args
)
mocks["postgres.group_update"].assert_not_called()
@pytest.mark.usefixtures("test_mode")
def test_present_create_basic_test(mocks, db_args):
assert postgres_group.present("groupname") == {
"name": "groupname",
"result": None,
"changes": {},
"comment": "Group groupname is set to be created",
}
mocks["postgres.role_get"].assert_called_once_with(
"groupname", return_password=True, **db_args
)
mocks["postgres.group_create"].assert_not_called()
mocks["postgres.group_update"].assert_not_called()
def test_present_exists_basic(mocks, existing_group, db_args):
mocks["postgres.role_get"].return_value = existing_group
assert postgres_group.present("groupname") == {
"name": "groupname",
"result": True,
"changes": {},
"comment": "Group groupname is already present",
}
mocks["postgres.role_get"].assert_called_once_with(
"groupname", return_password=True, **db_args
)
mocks["postgres.group_create"].assert_not_called()
mocks["postgres.group_update"].assert_not_called()
def test_present_create_basic_error(mocks, db_args):
mocks["postgres.group_create"].return_value = False
assert postgres_group.present("groupname") == {
"name": "groupname",
"result": False,
"changes": {},
"comment": "Failed to create group groupname",
}
mocks["postgres.role_get"].assert_called_once_with(
"groupname", return_password=True, **db_args
)
mocks["postgres.group_create"].assert_called_once()
mocks["postgres.group_update"].assert_not_called()
def test_present_change_option(mocks, existing_group, db_args):
mocks["postgres.role_get"].return_value = existing_group
assert postgres_group.present("groupname", replication=True) == {
"name": "groupname",
"result": True,
"changes": {"groupname": {"replication": True}},
"comment": "The group groupname has been updated",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.group_create"].assert_not_called()
mocks["postgres.group_update"].assert_called_once_with(
groupname="groupname",
createdb=None,
createroles=None,
encrypted="md5",
superuser=None,
login=None,
inherit=None,
replication=True,
rolepassword=None,
groups=None,
**db_args
)
def test_present_create_md5_password(mocks, md5_pw, db_args):
assert postgres_group.present("groupname", password="password", encrypted=True) == {
"name": "groupname",
"result": True,
"changes": {"groupname": "Present"},
"comment": "The group groupname has been created",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.group_create"].assert_called_once_with(
groupname="groupname",
createdb=None,
createroles=None,
encrypted=True,
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=md5_pw,
groups=None,
**db_args
)
mocks["postgres.group_update"].assert_not_called()
def test_present_create_plain_password(mocks, db_args):
assert postgres_group.present(
"groupname", password="password", encrypted=False
) == {
"name": "groupname",
"result": True,
"changes": {"groupname": "Present"},
"comment": "The group groupname has been created",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.group_create"].assert_called_once_with(
groupname="groupname",
createdb=None,
createroles=None,
encrypted=False,
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword="password",
groups=None,
**db_args
)
mocks["postgres.group_update"].assert_not_called()
def test_present_create_md5_password_default_plain(mocks, monkeypatch, md5_pw, db_args):
monkeypatch.setattr(postgres, "_DEFAULT_PASSWORDS_ENCRYPTION", False)
test_present_create_md5_password(mocks, md5_pw, db_args)
def test_present_create_md5_password_default_encrypted(
mocks, monkeypatch, md5_pw, db_args
):
monkeypatch.setattr(postgres, "_DEFAULT_PASSWORDS_ENCRYPTION", True)
assert postgres_group.present("groupname", password="password") == {
"name": "groupname",
"result": True,
"changes": {"groupname": "Present"},
"comment": "The group groupname has been created",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.group_create"].assert_called_once_with(
groupname="groupname",
createdb=None,
createroles=None,
encrypted=True,
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=md5_pw,
groups=None,
**db_args
)
mocks["postgres.group_update"].assert_not_called()
def test_present_create_md5_prehashed(mocks, md5_pw, db_args):
assert postgres_group.present("groupname", password=md5_pw, encrypted=True) == {
"name": "groupname",
"result": True,
"changes": {"groupname": "Present"},
"comment": "The group groupname has been created",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.group_create"].assert_called_once_with(
groupname="groupname",
createdb=None,
createroles=None,
encrypted=True,
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=md5_pw,
groups=None,
**db_args
)
mocks["postgres.group_update"].assert_not_called()
def test_present_md5_matches(mocks, existing_group, md5_pw):
existing_group["password"] = md5_pw
mocks["postgres.role_get"].return_value = existing_group
assert postgres_group.present("groupname", password="password", encrypted=True) == {
"name": "groupname",
"result": True,
"changes": {},
"comment": "Group groupname is already present",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.group_create"].assert_not_called()
mocks["postgres.group_update"].assert_not_called()
def test_present_md5_matches_prehashed(mocks, existing_group, md5_pw):
existing_group["password"] = md5_pw
mocks["postgres.role_get"].return_value = existing_group
assert postgres_group.present("groupname", password=md5_pw, encrypted=True) == {
"name": "groupname",
"result": True,
"changes": {},
"comment": "Group groupname is already present",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.group_create"].assert_not_called()
mocks["postgres.group_update"].assert_not_called()
def test_present_update_md5_password(mocks, existing_group, md5_pw, db_args):
existing_group["password"] = "md500000000000000000000000000000000"
mocks["postgres.role_get"].return_value = existing_group
assert postgres_group.present("groupname", password="password", encrypted=True) == {
"name": "groupname",
"result": True,
"changes": {"groupname": {"password": True}},
"comment": "The group groupname has been updated",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.group_create"].assert_not_called()
mocks["postgres.group_update"].assert_called_once_with(
groupname="groupname",
createdb=None,
createroles=None,
encrypted=True,
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=md5_pw,
groups=None,
**db_args
)
def test_present_update_error(mocks, existing_group):
existing_group["password"] = "md500000000000000000000000000000000"
mocks["postgres.role_get"].return_value = existing_group
mocks["postgres.group_update"].return_value = False
assert postgres_group.present("groupname", password="password", encrypted=True) == {
"name": "groupname",
"result": False,
"changes": {},
"comment": "Failed to update group groupname",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.group_create"].assert_not_called()
mocks["postgres.group_update"].assert_called_once()
def test_present_update_password_no_check(mocks, existing_group, md5_pw, db_args):
mocks["postgres.role_get"].return_value = existing_group
assert postgres_group.present(
"groupname", password="password", encrypted=True, refresh_password=True
) == {
"name": "groupname",
"result": True,
"changes": {"groupname": {"password": True}},
"comment": "The group groupname has been updated",
}
mocks["postgres.role_get"].assert_called_once_with(
"groupname", return_password=False, **db_args
)
mocks["postgres.group_create"].assert_not_called()
mocks["postgres.group_update"].assert_called_once_with(
groupname="groupname",
createdb=None,
createroles=None,
encrypted=True,
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=md5_pw,
groups=None,
**db_args
)
# ==========
# postgres_group.absent
# ==========
def test_absent_delete(mocks, db_args):
mocks["postgres.user_exists"].return_value = True
assert postgres_group.absent("groupname") == {
"name": "groupname",
"result": True,
"changes": {"groupname": "Absent"},
"comment": "Group groupname has been removed",
}
mocks["postgres.user_exists"].assert_called_once_with("groupname", **db_args)
mocks["postgres.group_remove"].assert_called_once_with("groupname", **db_args)
@pytest.mark.usefixtures("test_mode")
def test_absent_test(mocks, db_args):
mocks["postgres.user_exists"].return_value = True
assert postgres_group.absent("groupname") == {
"name": "groupname",
"result": None,
"changes": {},
"comment": "Group groupname is set to be removed",
}
mocks["postgres.user_exists"].assert_called_once_with("groupname", **db_args)
mocks["postgres.group_remove"].assert_not_called()
def test_absent_already(mocks, db_args):
mocks["postgres.user_exists"].return_value = False
assert postgres_group.absent("groupname") == {
"name": "groupname",
"result": True,
"changes": {},
"comment": "Group groupname is not present, so it cannot be removed",
}
mocks["postgres.user_exists"].assert_called_once_with("groupname", **db_args)
mocks["postgres.group_remove"].assert_not_called()
def test_absent_error(mocks):
mocks["postgres.user_exists"].return_value = True
mocks["postgres.group_remove"].return_value = False
assert postgres_group.absent("groupname") == {
"name": "groupname",
"result": False,
"changes": {},
"comment": "Group groupname failed to be removed",
}
mocks["postgres.user_exists"].assert_called_once()
mocks["postgres.group_remove"].assert_called_once()

View file

@ -0,0 +1,749 @@
import pytest
import salt.modules.postgres as postgres
import salt.states.postgres_user as postgres_user
from tests.support.mock import create_autospec, patch
class ScramHash:
def __eq__(self, other):
return other.startswith("SCRAM-SHA-256$4096:")
@pytest.fixture(name="db_args")
def fixture_db_args():
return {
"runas": None,
"host": None,
"port": None,
"maintenance_db": None,
"user": None,
"password": None,
}
@pytest.fixture(name="md5_pw")
def fixture_md5_pw():
# 'md5' + md5('password' + 'username')
return "md55a231fcdb710d73268c4f44283487ba2"
@pytest.fixture(name="scram_pw")
def fixture_scram_pw():
# scram_sha_256('password')
return (
"SCRAM-SHA-256$4096:wLr5nqC+3F+r7FdQPnB+nA==$"
"0hn08ZdX8kirGaL4TM0j13digH9Wl365OOzCtAuF2pE=:"
"LzAh/MGUdjYkdbDzcOKpfGwa3WwPUsyGcY+TEnSpcto="
)
@pytest.fixture(name="existing_user")
def fixture_existing_user(md5_pw):
return {
"superuser": False,
"inherits privileges": True,
"can create roles": False,
"can create databases": False,
"can update system catalogs": None,
"can login": True,
"replication": False,
"connections": None,
"expiry time": None,
"defaults variables": "",
"password": md5_pw,
"groups": [],
}
@pytest.fixture(name="test_mode")
def fixture_test_mode():
with patch.dict(postgres_user.__opts__, {"test": True}):
yield
@pytest.fixture(name="mocks")
def fixture_mocks():
return {
"postgres.role_get": create_autospec(postgres.role_get, return_value=None),
"postgres.user_exists": create_autospec(
postgres.user_exists, return_value=False
),
"postgres.user_create": create_autospec(
postgres.user_create, return_value=True
),
"postgres.user_update": create_autospec(
postgres.user_update, return_value=True
),
"postgres.user_remove": create_autospec(
postgres.user_remove, return_value=True
),
}
@pytest.fixture(autouse=True)
def setup_loader(mocks):
setup_loader_modules = {
postgres_user: {"__opts__": {"test": False}, "__salt__": mocks},
postgres: {"__opts__": {"test": False}},
}
with pytest.helpers.loader_mock(setup_loader_modules) as loader_mock:
yield loader_mock
# ==========
# postgres_user.present
# ==========
def test_present_create_basic(mocks, db_args):
assert postgres_user.present("username") == {
"name": "username",
"result": True,
"changes": {"username": "Present"},
"comment": "The user username has been created",
}
mocks["postgres.role_get"].assert_called_once_with(
"username", return_password=True, **db_args
)
mocks["postgres.user_create"].assert_called_once_with(
username="username",
createdb=None,
createroles=None,
encrypted="md5",
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=None,
valid_until=None,
groups=None,
**db_args
)
mocks["postgres.user_update"].assert_not_called()
@pytest.mark.usefixtures("test_mode")
def test_present_create_basic_test(mocks, db_args):
assert postgres_user.present("username") == {
"name": "username",
"result": None,
"changes": {},
"comment": "User username is set to be created",
}
mocks["postgres.role_get"].assert_called_once_with(
"username", return_password=True, **db_args
)
mocks["postgres.user_create"].assert_not_called()
mocks["postgres.user_update"].assert_not_called()
def test_present_exists_basic(mocks, existing_user, db_args):
mocks["postgres.role_get"].return_value = existing_user
assert postgres_user.present("username") == {
"name": "username",
"result": True,
"changes": {},
"comment": "User username is already present",
}
mocks["postgres.role_get"].assert_called_once_with(
"username", return_password=True, **db_args
)
mocks["postgres.user_create"].assert_not_called()
mocks["postgres.user_update"].assert_not_called()
def test_present_create_basic_error(mocks, db_args):
mocks["postgres.user_create"].return_value = False
assert postgres_user.present("username") == {
"name": "username",
"result": False,
"changes": {},
"comment": "Failed to create user username",
}
mocks["postgres.role_get"].assert_called_once_with(
"username", return_password=True, **db_args
)
mocks["postgres.user_create"].assert_called_once()
mocks["postgres.user_update"].assert_not_called()
def test_present_change_option(mocks, existing_user, db_args):
mocks["postgres.role_get"].return_value = existing_user
assert postgres_user.present("username", replication=True) == {
"name": "username",
"result": True,
"changes": {"username": {"replication": True}},
"comment": "The user username has been updated",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.user_create"].assert_not_called()
mocks["postgres.user_update"].assert_called_once_with(
username="username",
createdb=None,
createroles=None,
encrypted="md5",
superuser=None,
login=None,
inherit=None,
replication=True,
rolepassword=None,
valid_until=None,
groups=None,
**db_args
)
def test_present_create_md5_password(mocks, md5_pw, db_args):
assert postgres_user.present("username", password="password", encrypted=True) == {
"name": "username",
"result": True,
"changes": {"username": "Present"},
"comment": "The user username has been created",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.user_create"].assert_called_once_with(
username="username",
createdb=None,
createroles=None,
encrypted=True,
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=md5_pw,
valid_until=None,
groups=None,
**db_args
)
mocks["postgres.user_update"].assert_not_called()
def test_present_create_scram_password(mocks, db_args):
assert postgres_user.present(
"username", password="password", encrypted="scram-sha-256"
) == {
"name": "username",
"result": True,
"changes": {"username": "Present"},
"comment": "The user username has been created",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.user_create"].assert_called_once_with(
username="username",
createdb=None,
createroles=None,
encrypted="scram-sha-256",
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=ScramHash(),
valid_until=None,
groups=None,
**db_args
)
mocks["postgres.user_update"].assert_not_called()
def test_present_create_plain_password(mocks, db_args):
assert postgres_user.present("username", password="password", encrypted=False) == {
"name": "username",
"result": True,
"changes": {"username": "Present"},
"comment": "The user username has been created",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.user_create"].assert_called_once_with(
username="username",
createdb=None,
createroles=None,
encrypted=False,
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword="password",
valid_until=None,
groups=None,
**db_args
)
mocks["postgres.user_update"].assert_not_called()
def test_present_create_md5_password_default_plain(mocks, monkeypatch, md5_pw, db_args):
monkeypatch.setattr(postgres, "_DEFAULT_PASSWORDS_ENCRYPTION", False)
test_present_create_md5_password(mocks, md5_pw, db_args)
def test_present_create_md5_password_default_encrypted(
mocks, monkeypatch, md5_pw, db_args
):
monkeypatch.setattr(postgres, "_DEFAULT_PASSWORDS_ENCRYPTION", True)
assert postgres_user.present("username", password="password") == {
"name": "username",
"result": True,
"changes": {"username": "Present"},
"comment": "The user username has been created",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.user_create"].assert_called_once_with(
username="username",
createdb=None,
createroles=None,
encrypted=True,
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=md5_pw,
valid_until=None,
groups=None,
**db_args
)
mocks["postgres.user_update"].assert_not_called()
def test_present_create_md5_prehashed(mocks, md5_pw, db_args):
assert postgres_user.present("username", password=md5_pw, encrypted=True) == {
"name": "username",
"result": True,
"changes": {"username": "Present"},
"comment": "The user username has been created",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.user_create"].assert_called_once_with(
username="username",
createdb=None,
createroles=None,
encrypted=True,
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=md5_pw,
valid_until=None,
groups=None,
**db_args
)
mocks["postgres.user_update"].assert_not_called()
def test_present_md5_matches(mocks, existing_user):
mocks["postgres.role_get"].return_value = existing_user
assert postgres_user.present("username", password="password", encrypted=True) == {
"name": "username",
"result": True,
"changes": {},
"comment": "User username is already present",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.user_create"].assert_not_called()
mocks["postgres.user_update"].assert_not_called()
def test_present_md5_matches_prehashed(mocks, existing_user, md5_pw):
mocks["postgres.role_get"].return_value = existing_user
assert postgres_user.present("username", password=md5_pw, encrypted=True) == {
"name": "username",
"result": True,
"changes": {},
"comment": "User username is already present",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.user_create"].assert_not_called()
mocks["postgres.user_update"].assert_not_called()
def test_present_scram_matches(mocks, existing_user, scram_pw):
existing_user["password"] = scram_pw
mocks["postgres.role_get"].return_value = existing_user
assert postgres_user.present(
"username", password="password", encrypted="scram-sha-256"
) == {
"name": "username",
"result": True,
"changes": {},
"comment": "User username is already present",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.user_create"].assert_not_called()
mocks["postgres.user_update"].assert_not_called()
def test_present_scram_matches_prehashed(mocks, existing_user, scram_pw):
existing_user["password"] = scram_pw
mocks["postgres.role_get"].return_value = existing_user
assert postgres_user.present(
"username", password=scram_pw, encrypted="scram-sha-256"
) == {
"name": "username",
"result": True,
"changes": {},
"comment": "User username is already present",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.user_create"].assert_not_called()
mocks["postgres.user_update"].assert_not_called()
def test_present_update_md5_password(mocks, existing_user, md5_pw, db_args):
existing_user["password"] = "md500000000000000000000000000000000"
mocks["postgres.role_get"].return_value = existing_user
assert postgres_user.present("username", password="password", encrypted=True) == {
"name": "username",
"result": True,
"changes": {"username": {"password": True}},
"comment": "The user username has been updated",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.user_create"].assert_not_called()
mocks["postgres.user_update"].assert_called_once_with(
username="username",
createdb=None,
createroles=None,
encrypted=True,
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=md5_pw,
valid_until=None,
groups=None,
**db_args
)
def test_present_refresh_scram_password(mocks, existing_user, scram_pw, db_args):
existing_user["password"] = scram_pw
mocks["postgres.role_get"].return_value = existing_user
assert postgres_user.present(
"username",
password="password",
encrypted="scram-sha-256",
refresh_password=True,
) == {
"name": "username",
"result": True,
"changes": {"username": {"password": True}},
"comment": "The user username has been updated",
}
mocks["postgres.role_get"].assert_called_once_with(
"username", return_password=False, **db_args
)
mocks["postgres.user_create"].assert_not_called()
mocks["postgres.user_update"].assert_called_once_with(
username="username",
createdb=None,
createroles=None,
encrypted="scram-sha-256",
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=ScramHash(),
valid_until=None,
groups=None,
**db_args
)
def test_present_update_error(mocks, existing_user):
existing_user["password"] = "md500000000000000000000000000000000"
mocks["postgres.role_get"].return_value = existing_user
mocks["postgres.user_update"].return_value = False
assert postgres_user.present("username", password="password", encrypted=True) == {
"name": "username",
"result": False,
"changes": {},
"comment": "Failed to update user username",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.user_create"].assert_not_called()
mocks["postgres.user_update"].assert_called_once()
def test_present_update_password_no_check(mocks, existing_user, md5_pw, db_args):
del existing_user["password"]
mocks["postgres.role_get"].return_value = existing_user
assert postgres_user.present(
"username", password="password", encrypted=True, refresh_password=True
) == {
"name": "username",
"result": True,
"changes": {"username": {"password": True}},
"comment": "The user username has been updated",
}
mocks["postgres.role_get"].assert_called_once_with(
"username", return_password=False, **db_args
)
mocks["postgres.user_create"].assert_not_called()
mocks["postgres.user_update"].assert_called_once_with(
username="username",
createdb=None,
createroles=None,
encrypted=True,
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=md5_pw,
valid_until=None,
groups=None,
**db_args
)
def test_present_create_default_password(mocks, md5_pw, db_args):
assert postgres_user.present(
"username", default_password="password", encrypted=True
) == {
"name": "username",
"result": True,
"changes": {"username": "Present"},
"comment": "The user username has been created",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.user_create"].assert_called_once_with(
username="username",
createdb=None,
createroles=None,
encrypted=True,
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=md5_pw,
valid_until=None,
groups=None,
**db_args
)
def test_present_create_unused_default_password(mocks, md5_pw, db_args):
assert postgres_user.present(
"username", password="password", default_password="changeme", encrypted=True
) == {
"name": "username",
"result": True,
"changes": {"username": "Present"},
"comment": "The user username has been created",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.user_create"].assert_called_once_with(
username="username",
createdb=None,
createroles=None,
encrypted=True,
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=md5_pw,
valid_until=None,
groups=None,
**db_args
)
mocks["postgres.user_update"].assert_not_called()
def test_present_existing_default_password(mocks, existing_user):
mocks["postgres.role_get"].return_value = existing_user
assert postgres_user.present(
"username", default_password="changeme", encrypted=True, refresh_password=True
) == {
"name": "username",
"result": True,
"changes": {},
"comment": "User username is already present",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.user_create"].assert_not_called()
mocks["postgres.user_update"].assert_not_called()
def test_present_plain_to_scram(mocks, existing_user, db_args):
existing_user["password"] = "password"
mocks["postgres.role_get"].return_value = existing_user
assert postgres_user.present(
"username", password="password", encrypted="scram-sha-256"
) == {
"name": "username",
"result": True,
"changes": {"username": {"password": True}},
"comment": "The user username has been updated",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.user_create"].assert_not_called()
mocks["postgres.user_update"].assert_called_once_with(
username="username",
createdb=None,
createroles=None,
encrypted="scram-sha-256",
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=ScramHash(),
valid_until=None,
groups=None,
**db_args
)
def test_present_plain_to_md5(mocks, existing_user, md5_pw, db_args):
existing_user["password"] = "password"
mocks["postgres.role_get"].return_value = existing_user
assert postgres_user.present("username", password="password", encrypted="md5") == {
"name": "username",
"result": True,
"changes": {"username": {"password": True}},
"comment": "The user username has been updated",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.user_create"].assert_not_called()
mocks["postgres.user_update"].assert_called_once_with(
username="username",
createdb=None,
createroles=None,
encrypted="md5",
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=md5_pw,
valid_until=None,
groups=None,
**db_args
)
def test_present_md5_to_scram(mocks, existing_user, db_args):
mocks["postgres.role_get"].return_value = existing_user
assert postgres_user.present(
"username", password="password", encrypted="scram-sha-256"
) == {
"name": "username",
"result": True,
"changes": {"username": {"password": True}},
"comment": "The user username has been updated",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.user_create"].assert_not_called()
mocks["postgres.user_update"].assert_called_once_with(
username="username",
createdb=None,
createroles=None,
encrypted="scram-sha-256",
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=ScramHash(),
valid_until=None,
groups=None,
**db_args
)
def test_present_scram_to_md5(mocks, existing_user, scram_pw, md5_pw, db_args):
existing_user["password"] = scram_pw
mocks["postgres.role_get"].return_value = existing_user
assert postgres_user.present("username", password="password", encrypted="md5") == {
"name": "username",
"result": True,
"changes": {"username": {"password": True}},
"comment": "The user username has been updated",
}
mocks["postgres.role_get"].assert_called_once()
mocks["postgres.user_create"].assert_not_called()
mocks["postgres.user_update"].assert_called_once_with(
username="username",
createdb=None,
createroles=None,
encrypted="md5",
superuser=None,
login=None,
inherit=None,
replication=None,
rolepassword=md5_pw,
valid_until=None,
groups=None,
**db_args
)
# ==========
# postgres_user.absent
# ==========
def test_absent_delete(mocks, db_args):
mocks["postgres.user_exists"].return_value = True
assert postgres_user.absent("username") == {
"name": "username",
"result": True,
"changes": {"username": "Absent"},
"comment": "User username has been removed",
}
mocks["postgres.user_exists"].assert_called_once_with("username", **db_args)
mocks["postgres.user_remove"].assert_called_once_with("username", **db_args)
@pytest.mark.usefixtures("test_mode")
def test_absent_test(mocks, db_args):
mocks["postgres.user_exists"].return_value = True
assert postgres_user.absent("username") == {
"name": "username",
"result": None,
"changes": {},
"comment": "User username is set to be removed",
}
mocks["postgres.user_exists"].assert_called_once_with("username", **db_args)
mocks["postgres.user_remove"].assert_not_called()
def test_absent_already(mocks, db_args):
mocks["postgres.user_exists"].return_value = False
assert postgres_user.absent("username") == {
"name": "username",
"result": True,
"changes": {},
"comment": "User username is not present, so it cannot be removed",
}
mocks["postgres.user_exists"].assert_called_once_with("username", **db_args)
mocks["postgres.user_remove"].assert_not_called()
def test_absent_error(mocks):
mocks["postgres.user_exists"].return_value = True
mocks["postgres.user_remove"].return_value = False
assert postgres_user.absent("username") == {
"name": "username",
"result": False,
"changes": {},
"comment": "User username failed to be removed",
}
mocks["postgres.user_exists"].assert_called_once()
mocks["postgres.user_remove"].assert_called_once()

View file

@ -1,399 +1,11 @@
import salt.modules.postgres as postgresmod
import salt.states.postgres_extension as postgres_extension
import salt.states.postgres_group as postgres_group
import salt.states.postgres_schema as postgres_schema
import salt.states.postgres_user as postgres_user
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.mock import MagicMock, Mock, patch
from tests.support.unit import TestCase
class PostgresUserTestCase(TestCase, LoaderModuleMockMixin):
def setup_loader_modules(self):
patcher = patch("salt.utils.path.which", Mock(return_value="/usr/bin/pgsql"))
patcher.start()
self.addCleanup(patcher.stop)
return {
postgres_user: {
"__grains__": {"os_family": "Linux"},
"__salt__": {
"config.option": Mock(),
"cmd.run_all": Mock(),
"file.chown": Mock(),
"file.remove": Mock(),
},
"__opts__": {"test": False},
},
}
def test_present__creation(self):
# test=True
with patch.dict(
postgres_user.__salt__,
{
"postgres.role_get": Mock(return_value=None),
"postgres.user_create": MagicMock(),
},
):
with patch.dict(postgres_user.__opts__, {"test": True}):
ret = postgres_user.present("foo")
self.assertEqual(
ret,
{
"comment": "User foo is set to be created",
"changes": {},
"name": "foo",
"result": None,
},
)
self.assertEqual(
postgres_user.__salt__["postgres.user_create"].call_count, 0
)
# test=False
ret = postgres_user.present("foo")
self.assertEqual(
ret,
{
"comment": "The user foo has been created",
"changes": {"foo": "Present"},
"name": "foo",
"result": True,
},
)
postgres_user.__salt__["postgres.user_create"].assert_called_once_with(
username="foo",
superuser=None,
encrypted=True,
runas=None,
inherit=None,
rolepassword=None,
port=None,
replication=None,
host=None,
createroles=None,
user=None,
groups=None,
maintenance_db=None,
login=None,
password=None,
valid_until=None,
createdb=None,
)
def test_present__update(self):
# test=True
with patch.dict(
postgres_user.__salt__,
{
"postgres.role_get": Mock(
return_value={
"can create databases": False,
"can create roles": False,
"can login": False,
"can update system catalogs": False,
"connections": None,
"defaults variables": {},
"expiry time": None,
"inherits privileges": True,
"replication": False,
"superuser": False,
}
),
"postgres.user_update": MagicMock(),
},
):
with patch.dict(postgres_user.__opts__, {"test": True}):
ret = postgres_user.present("foo", login=True, replication=False)
self.assertEqual(
ret,
{
"comment": "User foo is set to be updated",
"changes": {"foo": {"login": True}},
"name": "foo",
"result": None,
},
)
self.assertEqual(
postgres_user.__salt__["postgres.user_update"].call_count, 0
)
# test=False
ret = postgres_user.present("foo", login=True, replication=False)
self.assertEqual(
ret,
{
"comment": "The user foo has been updated",
"changes": {"foo": {"login": True}},
"name": "foo",
"result": True,
},
)
postgres_user.__salt__["postgres.user_update"].assert_called_once_with(
username="foo",
superuser=None,
encrypted=True,
runas=None,
inherit=None,
rolepassword=None,
port=None,
replication=False,
host=None,
createroles=None,
user=None,
groups=None,
maintenance_db=None,
login=True,
password=None,
valid_until=None,
createdb=None,
)
def test_present__no_update(self):
# test=True
with patch.dict(
postgres_user.__salt__,
{
"postgres.role_get": Mock(
return_value={
"can create databases": False,
"can create roles": False,
"can login": False,
"can update system catalogs": False,
"connections": None,
"defaults variables": {},
"expiry time": None,
"inherits privileges": True,
"replication": False,
"superuser": False,
}
),
"postgres.user_update": MagicMock(),
},
):
with patch.dict(postgres_user.__opts__, {"test": True}):
ret = postgres_user.present("foo", login=False, replication=False)
self.assertEqual(
ret,
{
"comment": "User foo is already present",
"changes": {},
"name": "foo",
"result": True,
},
)
self.assertEqual(
postgres_user.__salt__["postgres.user_update"].call_count, 0
)
# test=False
ret = postgres_user.present("foo", login=False, replication=False)
self.assertEqual(
ret,
{
"comment": "User foo is already present",
"changes": {},
"name": "foo",
"result": True,
},
)
self.assertEqual(
postgres_user.__salt__["postgres.user_update"].call_count, 0
)
class PostgresGroupTestCase(TestCase, LoaderModuleMockMixin):
def setup_loader_modules(self):
patcher = patch("salt.utils.path.which", Mock(return_value="/usr/bin/pgsql"))
patcher.start()
self.addCleanup(patcher.stop)
return {
postgres_group: {
"__grains__": {"os_family": "Linux"},
"__salt__": {
"config.option": Mock(),
"cmd.run_all": Mock(),
"file.chown": Mock(),
"file.remove": Mock(),
},
"__opts__": {"test": False},
},
}
def test_present__creation(self):
# test=True
with patch.dict(
postgres_group.__salt__,
{
"postgres.role_get": Mock(return_value=None),
"postgres.group_create": MagicMock(),
},
):
with patch.dict(postgres_group.__opts__, {"test": True}):
ret = postgres_group.present("foo")
self.assertEqual(
ret,
{
"comment": "Group foo is set to be created",
"changes": {},
"name": "foo",
"result": None,
},
)
self.assertEqual(
postgres_group.__salt__["postgres.group_create"].call_count, 0
)
# test=False
ret = postgres_group.present("foo")
self.assertEqual(
ret,
{
"comment": "The group foo has been created",
"changes": {},
"name": "foo",
"result": True,
},
)
postgres_group.__salt__["postgres.group_create"].assert_called_once_with(
superuser=None,
replication=None,
encrypted=True,
runas=None,
inherit=None,
rolepassword=None,
port=None,
groupname="foo",
host=None,
createroles=None,
user=None,
groups=None,
maintenance_db=None,
login=None,
password=None,
createdb=None,
)
def test_present__update(self):
# test=True
with patch.dict(
postgres_group.__salt__,
{
"postgres.role_get": Mock(
return_value={
"can create databases": False,
"can create roles": False,
"can login": False,
"can update system catalogs": False,
"connections": None,
"defaults variables": {},
"expiry time": None,
"inherits privileges": True,
"replication": False,
"superuser": False,
}
),
"postgres.group_update": MagicMock(),
},
):
with patch.dict(postgres_group.__opts__, {"test": True}):
ret = postgres_group.present("foo", login=True, replication=False)
self.assertEqual(
ret,
{
"comment": "Group foo is set to be updated",
"changes": {"foo": {"login": True}},
"name": "foo",
"result": None,
},
)
self.assertEqual(
postgres_group.__salt__["postgres.group_update"].call_count, 0
)
# test=False
ret = postgres_group.present("foo", login=True, replication=False)
self.assertEqual(
ret,
{
"comment": "The group foo has been updated",
"changes": {"foo": {"login": True}},
"name": "foo",
"result": True,
},
)
postgres_group.__salt__["postgres.group_update"].assert_called_once_with(
superuser=None,
replication=False,
encrypted=True,
runas=None,
inherit=None,
rolepassword=None,
port=None,
groupname="foo",
host=None,
createroles=None,
user=None,
groups=None,
maintenance_db=None,
login=True,
password=None,
createdb=None,
)
def test_present__no_update(self):
# test=True
with patch.dict(
postgres_group.__salt__,
{
"postgres.role_get": Mock(
return_value={
"can create databases": False,
"can create roles": False,
"can login": False,
"can update system catalogs": False,
"connections": None,
"defaults variables": {},
"expiry time": None,
"inherits privileges": True,
"replication": False,
"superuser": False,
}
),
"postgres.group_update": MagicMock(),
},
):
with patch.dict(postgres_group.__opts__, {"test": True}):
ret = postgres_group.present("foo", login=False, replication=False)
self.assertEqual(
ret,
{
"comment": "Group foo is already present",
"changes": {},
"name": "foo",
"result": True,
},
)
self.assertEqual(
postgres_group.__salt__["postgres.group_update"].call_count, 0
)
# test=False
ret = postgres_group.present("foo", login=False, replication=False)
self.assertEqual(
ret,
{
"comment": "Group foo is already present",
"changes": {},
"name": "foo",
"result": True,
},
)
self.assertEqual(
postgres_group.__salt__["postgres.group_update"].call_count, 0
)
class PostgresExtensionTestCase(TestCase, LoaderModuleMockMixin):
def setup_loader_modules(self):
patcher = patch("salt.utils.path.which", Mock(return_value="/usr/bin/pgsql"))

View file

@ -1,82 +0,0 @@
# -*- coding: utf-8 -*-
"""
:codeauthor: Jayesh Kariya <jayeshk@saltstack.com>
"""
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
# Import Salt Libs
import salt.states.postgres_group as postgres_group
# Import Salt Testing Libs
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.mock import MagicMock, patch
from tests.support.unit import TestCase
class PostgresGroupTestCase(TestCase, LoaderModuleMockMixin):
"""
Test cases for salt.states.postgres_group
"""
def setup_loader_modules(self):
return {postgres_group: {}}
# 'present' function tests: 1
def test_present(self):
"""
Test to ensure that the named group is present
with the specified privileges.
"""
name = "frank"
ret = {"name": name, "changes": {}, "result": False, "comment": ""}
mock_t = MagicMock(return_value=True)
mock = MagicMock(return_value=None)
with patch.dict(
postgres_group.__salt__,
{"postgres.role_get": mock, "postgres.group_create": mock_t},
):
with patch.dict(postgres_group.__opts__, {"test": True}):
comt = "Group {0} is set to be created".format(name)
ret.update({"comment": comt, "result": None})
self.assertDictEqual(postgres_group.present(name), ret)
with patch.dict(postgres_group.__opts__, {"test": False}):
comt = "The group {0} has been created".format(name)
ret.update({"comment": comt, "result": True})
self.assertDictEqual(postgres_group.present(name), ret)
# 'absent' function tests: 1
def test_absent(self):
"""
Test to ensure that the named group is absent.
"""
name = "frank"
ret = {"name": name, "changes": {}, "result": False, "comment": ""}
mock_t = MagicMock(return_value=True)
mock = MagicMock(side_effect=[True, True, False])
with patch.dict(
postgres_group.__salt__,
{"postgres.user_exists": mock, "postgres.group_remove": mock_t},
):
with patch.dict(postgres_group.__opts__, {"test": True}):
comt = "Group {0} is set to be removed".format(name)
ret.update({"comment": comt, "result": None})
self.assertDictEqual(postgres_group.absent(name), ret)
with patch.dict(postgres_group.__opts__, {"test": False}):
comt = "Group {0} has been removed".format(name)
ret.update(
{"comment": comt, "result": True, "changes": {name: "Absent"}}
)
self.assertDictEqual(postgres_group.absent(name), ret)
comt = "Group {0} is not present, so it cannot be removed".format(name)
ret.update({"comment": comt, "result": True, "changes": {}})
self.assertDictEqual(postgres_group.absent(name), ret)

View file

@ -1,84 +0,0 @@
# -*- coding: utf-8 -*-
"""
:codeauthor: Jayesh Kariya <jayeshk@saltstack.com>
"""
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
# Import Salt Libs
import salt.states.postgres_user as postgres_user
# Import Salt Testing Libs
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.mock import MagicMock, patch
from tests.support.unit import TestCase
class PostgresUserTestCase(TestCase, LoaderModuleMockMixin):
"""
Test cases for salt.states.postgres_user
"""
def setup_loader_modules(self):
return {postgres_user: {}}
# 'present' function tests: 1
def test_present(self):
"""
Test to ensure that the named user is present
with the specified privileges.
"""
name = "frank"
ret = {"name": name, "changes": {}, "result": False, "comment": ""}
mock_t = MagicMock(return_value=True)
mock = MagicMock(return_value=None)
with patch.dict(
postgres_user.__salt__,
{"postgres.role_get": mock, "postgres.user_create": mock_t},
):
with patch.dict(postgres_user.__opts__, {"test": True}):
comt = "User {0} is set to be created".format(name)
ret.update({"comment": comt, "result": None})
self.assertDictEqual(postgres_user.present(name), ret)
with patch.dict(postgres_user.__opts__, {"test": False}):
comt = "The user {0} has been created".format(name)
ret.update(
{"comment": comt, "result": True, "changes": {name: "Present"}}
)
self.assertDictEqual(postgres_user.present(name), ret)
# 'absent' function tests: 1
def test_absent(self):
"""
Test to ensure that the named user is absent.
"""
name = "frank"
ret = {"name": name, "changes": {}, "result": False, "comment": ""}
mock_t = MagicMock(return_value=True)
mock = MagicMock(side_effect=[True, True, False])
with patch.dict(
postgres_user.__salt__,
{"postgres.user_exists": mock, "postgres.user_remove": mock_t},
):
with patch.dict(postgres_user.__opts__, {"test": True}):
comt = "User {0} is set to be removed".format(name)
ret.update({"comment": comt, "result": None})
self.assertDictEqual(postgres_user.absent(name), ret)
with patch.dict(postgres_user.__opts__, {"test": False}):
comt = "User {0} has been removed".format(name)
ret.update(
{"comment": comt, "result": True, "changes": {name: "Absent"}}
)
self.assertDictEqual(postgres_user.absent(name), ret)
comt = "User {0} is not present, so it cannot be removed".format(name)
ret.update({"comment": comt, "result": True, "changes": {}})
self.assertDictEqual(postgres_user.absent(name), ret)