Fixed salt key management with eauth.

Properly pass minion list to wheel command in case of using eauth with
salt-key command. Test included.
This commit is contained in:
Dmitry Kuzmenko 2019-07-31 17:53:51 +03:00 committed by Daniel Wozniak
parent f850d1796d
commit 400e9de43b
5 changed files with 111 additions and 65 deletions

View file

@ -541,7 +541,7 @@ class Resolver(object):
)
print(
"Available eauth types: {0}".format(
", ".join(self.auth.file_mapping.keys())
", ".join([k[:-5] for k in self.auth if k.endswith(".auth")])
)
)
return ret

View file

@ -184,6 +184,8 @@ class KeyCLI(object):
ret.pop("local", None)
return ret
if cmd in ("accept", "reject", "delete") and args is None:
args = self.opts.get("match_dict", {}).get("minions")
fstr = "key.{0}".format(cmd)
fun = self.client.functions[fstr]
args, kwargs = self._get_args_kwargs(fun, args)

View file

@ -11,15 +11,33 @@ import pytest
import salt.utils.files
import salt.utils.platform
import salt.utils.yaml
from salt.ext import six
from tests.support.case import ShellCase
from tests.support.helpers import destructiveTest, skip_if_not_root, slowTest
from tests.support.helpers import (
destructiveTest,
skip_if_not_root,
slowTest,
with_system_user,
)
from tests.support.mixins import ShellCaseCommonTestsMixin
from tests.support.runtests import RUNTIME_VARS
from tests.support.unit import skipIf
USERA = "saltdev"
USERA_PWD = "saltdev"
HASHED_USERA_PWD = "$6$SALTsalt$ZZFD90fKFWq8AGmmX0L3uBtS9fXL62SrTk5zcnQ6EkD6zoiM3kB88G1Zvs0xm/gZ7WXJRs5nsTBybUvGSqZkT."
PUB_KEY = textwrap.dedent(
"""\
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoqIZDtcQtqUNs0wC7qQz
JwFhXAVNT5C8M8zhI+pFtF/63KoN5k1WwAqP2j3LquTG68WpxcBwLtKfd7FVA/Kr
OF3kXDWFnDi+HDchW2lJObgfzLckWNRFaF8SBvFM2dys3CGSgCV0S/qxnRAjrJQb
B3uQwtZ64ncJAlkYpArv3GwsfRJ5UUQnYPDEJwGzMskZ0pHd60WwM1gMlfYmNX5O
RBEjybyNpYDzpda6e6Ypsn6ePGLkP/tuwUf+q9wpbRE3ZwqERC2XRPux+HX2rGP+
mkzpmuHkyi2wV33A9pDfMgRHdln2CLX0KgfRGixUQhW1o+Kmfv2rq4sGwpCgLbTh
NwIDAQAB
-----END PUBLIC KEY-----
"""
)
MIN_NAME = "minibar"
@pytest.mark.windows_whitelisted
@ -30,68 +48,52 @@ class KeyTest(ShellCase, ShellCaseCommonTestsMixin):
_call_binary_ = "salt-key"
def _add_user(self):
"""
helper method to add user
"""
try:
add_user = self.run_call("user.add {0} createhome=False".format(USERA))
add_pwd = self.run_call(
"shadow.set_password {0} '{1}'".format(
USERA,
USERA_PWD if salt.utils.platform.is_darwin() else HASHED_USERA_PWD,
)
)
self.assertTrue(add_user)
self.assertTrue(add_pwd)
user_list = self.run_call("user.list_users")
self.assertIn(USERA, six.text_type(user_list))
except AssertionError:
self.run_call("user.delete {0} remove=True".format(USERA))
self.skipTest("Could not add user or password, skipping test")
def _remove_user(self):
"""
helper method to remove user
"""
user_list = self.run_call("user.list_users")
for user in user_list:
if USERA in user:
self.run_call("user.delete {0} remove=True".format(USERA))
@slowTest
def test_remove_key(self):
"""
test salt-key -d usage
"""
min_name = "minibar"
pki_dir = self.master_opts["pki_dir"]
key = os.path.join(pki_dir, "minions", min_name)
key = os.path.join(pki_dir, "minions", MIN_NAME)
with salt.utils.files.fopen(key, "w") as fp:
fp.write(
textwrap.dedent(
"""\
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoqIZDtcQtqUNs0wC7qQz
JwFhXAVNT5C8M8zhI+pFtF/63KoN5k1WwAqP2j3LquTG68WpxcBwLtKfd7FVA/Kr
OF3kXDWFnDi+HDchW2lJObgfzLckWNRFaF8SBvFM2dys3CGSgCV0S/qxnRAjrJQb
B3uQwtZ64ncJAlkYpArv3GwsfRJ5UUQnYPDEJwGzMskZ0pHd60WwM1gMlfYmNX5O
RBEjybyNpYDzpda6e6Ypsn6ePGLkP/tuwUf+q9wpbRE3ZwqERC2XRPux+HX2rGP+
mkzpmuHkyi2wV33A9pDfMgRHdln2CLX0KgfRGixUQhW1o+Kmfv2rq4sGwpCgLbTh
NwIDAQAB
-----END PUBLIC KEY-----
"""
)
)
fp.write(PUB_KEY)
check_key = self.run_key("-p {0}".format(min_name))
check_key = self.run_key("-p {0}".format(MIN_NAME))
self.assertIn("Accepted Keys:", check_key)
self.assertIn("minibar: -----BEGIN PUBLIC KEY-----", check_key)
self.assertIn("{0}: -----BEGIN PUBLIC KEY-----".format(MIN_NAME), check_key)
remove_key = self.run_key("-d {0} -y".format(min_name))
remove_key = self.run_key("-d {0} -y".format(MIN_NAME))
check_key = self.run_key("-p {0}".format(min_name))
check_key = self.run_key("-p {0}".format(MIN_NAME))
self.assertEqual([], check_key)
@skip_if_not_root
@destructiveTest
@skipIf(salt.utils.platform.is_windows(), "PAM eauth not available on Windows")
@with_system_user(USERA, password=USERA_PWD)
@slowTest
def test_remove_key_eauth(self, username):
"""
test salt-key -d usage
"""
pki_dir = self.master_opts["pki_dir"]
key = os.path.join(pki_dir, "minions", MIN_NAME)
with salt.utils.files.fopen(key, "w") as fp:
fp.write(PUB_KEY)
check_key = self.run_key("-p {0}".format(MIN_NAME))
self.assertIn("Accepted Keys:", check_key)
self.assertIn("{0}: -----BEGIN PUBLIC KEY-----".format(MIN_NAME), check_key)
remove_key = self.run_key(
"-d {0} -y --eauth pam --username {1} --password {2}".format(
MIN_NAME, username, USERA_PWD
)
)
check_key = self.run_key("-p {0}".format(MIN_NAME))
self.assertEqual([], check_key)
@slowTest
@ -209,37 +211,42 @@ class KeyTest(ShellCase, ShellCaseCommonTestsMixin):
@skip_if_not_root
@destructiveTest
@skipIf(salt.utils.platform.is_windows(), "PAM eauth not available on Windows")
@with_system_user(USERA, password=USERA_PWD)
@slowTest
def test_list_acc_eauth(self):
def test_list_acc_eauth(self, username):
"""
test salt-key -l with eauth
"""
self._add_user()
data = self.run_key(
"-l acc --eauth pam --username {0} --password {1}".format(USERA, USERA_PWD)
"-l acc --eauth pam --username {0} --password {1}".format(
username, USERA_PWD
)
)
expect = ["Accepted Keys:", "minion", "sub_minion"]
self.assertEqual(data, expect)
self._remove_user()
@skip_if_not_root
@destructiveTest
@skipIf(salt.utils.platform.is_windows(), "PAM eauth not available on Windows")
@with_system_user(USERA, password=USERA_PWD)
@skipIf(True, "SLOWTEST skip")
@slowTest
def test_list_acc_eauth_bad_creds(self):
def test_list_acc_eauth_bad_creds(self, username):
"""
test salt-key -l with eauth and bad creds
"""
self._add_user()
data = self.run_key(
"-l acc --eauth pam --username {0} --password wrongpassword".format(USERA)
"-l acc --eauth pam --username {0} --password wrongpassword".format(
username
)
)
expect = [
'Authentication failure of type "eauth" occurred for user {0}.'.format(
USERA
username
)
]
self.assertEqual(data, expect)
self._remove_user()
@slowTest
def test_list_acc_wrong_eauth(self):
@ -266,12 +273,12 @@ class KeyTest(ShellCase, ShellCaseCommonTestsMixin):
@slowTest
def test_keys_generation(self):
tempdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
arg_str = "--gen-keys minibar --gen-keys-dir {0}".format(tempdir)
arg_str = "--gen-keys {0} --gen-keys-dir {1}".format(MIN_NAME, tempdir)
self.run_key(arg_str)
try:
key_names = None
if self.master_opts["transport"] in ("zeromq", "tcp"):
key_names = ("minibar.pub", "minibar.pem")
key_names = ("{0}.pub".format(MIN_NAME), "{0}.pem".format(MIN_NAME))
for fname in key_names:
self.assertTrue(os.path.isfile(os.path.join(tempdir, fname)))
finally:

View file

@ -255,6 +255,29 @@ class ShellCase(TestCase, AdaptedConfigurationTestCaseMixin, ScriptPathMixin):
log.debug("Result of run_call for command '%s': %s", arg_str, ret)
return ret
def run_function(
self,
function,
arg=(),
with_retcode=False,
catch_stderr=False,
local=False,
timeout=RUN_TIMEOUT,
**kwargs
):
"""
Execute function with salt-call.
This function is added for compatibility with ModuleCase. This makes it possible to use
decorators like @with_system_user.
"""
arg_str = "{0} {1} {2}".format(
function,
" ".join((str(arg_) for arg_ in arg)),
" ".join(("{0}={1}".format(*item) for item in kwargs.items())),
)
return self.run_call(arg_str, with_retcode, catch_stderr, local, timeout)
def run_cloud(self, arg_str, catch_stderr=False, timeout=None):
"""
Execute salt-cloud
@ -896,6 +919,7 @@ class SSHCase(ShellCase):
def _arg_str(self, function, arg):
return "{0} {1}".format(function, " ".join(arg))
# pylint: disable=arguments-differ
def run_function(
self, function, arg=(), timeout=180, wipe=True, raw=False, **kwargs
):
@ -921,6 +945,7 @@ class SSHCase(ShellCase):
except Exception: # pylint: disable=broad-except
return ret
# pylint: enable=arguments-differ
def custom_roster(self, new_roster, data):
"""
helper method to create a custom roster to use for a ssh test

View file

@ -38,6 +38,7 @@ import salt.ext.tornado.ioloop
import salt.ext.tornado.web
import salt.utils.files
import salt.utils.platform
import salt.utils.pycrypto
import salt.utils.stringutils
import salt.utils.versions
from salt.ext import six
@ -747,6 +748,17 @@ def with_system_user(
username
)
)
if not salt.utils.platform.is_windows() and password is not None:
if salt.utils.platform.is_darwin():
hashed_password = password
else:
hashed_password = salt.utils.pycrypto.gen_hash(
crypt_salt="SALTsalt", password=password
)
hashed_password = "'{0}'".format(hashed_password)
add_pwd = cls.run_function(
"shadow.set_password", [username, hashed_password]
)
failure = None
try: