Fix wildcard eauth.

Migrate ``tests/integration/client/test_runner.py`` and ``tests/integration/wheel/`` to PyTest
This commit is contained in:
Pedro Algarvio 2021-10-25 11:51:46 +01:00 committed by Megan Wilhite
parent 6e846e721f
commit e33074231b
15 changed files with 440 additions and 373 deletions

View file

@ -942,7 +942,6 @@ def salt_master_factory(
prod_env_pillar_tree_root_dir,
ext_pillar_file_tree_root_dir,
salt_api_account_factory,
salt_auto_account_factory,
):
root_dir = salt_factories.get_root_dir_for_daemon("master")
conf_dir = root_dir / "conf"
@ -988,7 +987,6 @@ def salt_master_factory(
"test.*",
"grains.*",
],
salt_auto_account_factory.username: ["@wheel", "@runner", "test.*"],
}
}
@ -1551,9 +1549,4 @@ def salt_api_account_factory():
return TestAccount(username="saltdev_api", password="saltdev")
@pytest.fixture(scope="session")
def salt_auto_account_factory():
return TestAccount(username="saltdev_auto", password="saltdev")
# <---- Custom Fixtures ----------------------------------------------------------------------------------------------

View file

@ -140,8 +140,8 @@ salt/cli/salt.py:
salt/client/*:
- integration.client.test_kwarg
- integration.client.test_runner
- integration.client.test_standard
- pytests.integration.client.test_runner
salt/cloud/*:
- pytests.functional.cli.test_salt_cloud
@ -307,7 +307,7 @@ salt/utils/vt.py:
- integration.ssh.test_state
salt/wheel/*:
- integration.wheel.test_client
- pytests.integration.wheel.test_client
salt/(minion\.py|channel/.+|transport/.+):
- pytests.scenarios.multimaster.test_multimaster

View file

@ -1,148 +0,0 @@
import pytest
import salt.runner
from tests.support.mixins import AdaptedConfigurationTestCaseMixin
from tests.support.unit import TestCase
@pytest.fixture(scope="module", autouse=True)
def salt_auto_account(salt_auto_account_factory):
with salt_auto_account_factory as account:
yield account
@pytest.mark.windows_whitelisted
class RunnerModuleTest(TestCase, AdaptedConfigurationTestCaseMixin):
# This is really an integration test since it needs a salt-master running
eauth_creds = {
"username": "saltdev_auto",
"password": "saltdev",
"eauth": "auto",
}
def setUp(self):
"""
Configure an eauth user to test with
"""
self.runner = salt.runner.RunnerClient(self.get_config("client_config"))
@pytest.mark.slow_test
def test_eauth(self):
"""
Test executing master_call with lowdata
The choice of using error.error for this is arbitrary and should be
changed to some mocked function that is more testing friendly.
"""
low = {
"client": "runner",
"fun": "error.error",
}
low.update(self.eauth_creds)
self.runner.master_call(**low)
@pytest.mark.slow_test
def test_token(self):
"""
Test executing master_call with lowdata
The choice of using error.error for this is arbitrary and should be
changed to some mocked function that is more testing friendly.
"""
import salt.auth
auth = salt.auth.LoadAuth(self.get_config("client_config"))
token = auth.mk_token(self.eauth_creds)
self.runner.master_call(
**{"client": "runner", "fun": "error.error", "token": token["token"]}
)
@pytest.mark.slow_test
def test_cmd_sync(self):
low = {
"client": "runner",
"fun": "error.error",
}
low.update(self.eauth_creds)
self.runner.cmd_sync(low)
@pytest.mark.slow_test
def test_cmd_async(self):
low = {
"client": "runner",
"fun": "error.error",
}
low.update(self.eauth_creds)
self.runner.cmd_async(low)
@pytest.mark.slow_test
def test_cmd_sync_w_arg(self):
low = {
"fun": "test.arg",
"foo": "Foo!",
"bar": "Bar!",
}
low.update(self.eauth_creds)
ret = self.runner.cmd_sync(low)
self.assertEqual(ret["kwargs"]["foo"], "Foo!")
self.assertEqual(ret["kwargs"]["bar"], "Bar!")
@pytest.mark.slow_test
def test_wildcard_auth(self):
low = {
"username": "the_s0und_of_t3ch",
"password": "willrockyou",
"eauth": "auto",
"fun": "test.arg",
"foo": "Foo!",
"bar": "Bar!",
}
self.runner.cmd_sync(low)
@pytest.mark.slow_test
def test_full_return_kwarg(self):
low = {"fun": "test.arg"}
low.update(self.eauth_creds)
ret = self.runner.cmd_sync(low, full_return=True)
self.assertIn("success", ret["data"])
@pytest.mark.slow_test
def test_cmd_sync_arg_kwarg_parsing(self):
low = {
"client": "runner",
"fun": "test.arg",
"arg": ["foo", "bar=off", "baz={qux: 123}"],
"kwarg": {"quux": "Quux"},
"quuz": "on",
}
low.update(self.eauth_creds)
ret = self.runner.cmd_sync(low)
self.assertEqual(
ret,
{
"args": ["foo"],
"kwargs": {
"bar": False,
"baz": {"qux": 123},
"quux": "Quux",
"quuz": "on",
},
},
)
@pytest.mark.slow_test
def test_invalid_kwargs_are_ignored(self):
low = {
"client": "runner",
"fun": "test.metasyntactic",
"thiskwargisbad": "justpretendimnothere",
}
low.update(self.eauth_creds)
ret = self.runner.cmd_sync(low)
self.assertEqual(ret[0], "foo")

View file

@ -1,117 +0,0 @@
import pytest
import salt.auth
import salt.utils.platform
import salt.wheel
from tests.support.mixins import AdaptedConfigurationTestCaseMixin
from tests.support.unit import TestCase, skipIf
@pytest.fixture(scope="module", autouse=True)
def salt_auto_account(salt_auto_account_factory):
with salt_auto_account_factory as account:
yield account
@pytest.mark.windows_whitelisted
class WheelModuleTest(TestCase, AdaptedConfigurationTestCaseMixin):
eauth_creds = {
"username": "saltdev_auto",
"password": "saltdev",
"eauth": "auto",
}
def setUp(self):
"""
Configure an eauth user to test with
"""
self.wheel = salt.wheel.Wheel(dict(self.get_config("client_config")))
def tearDown(self):
del self.wheel
@pytest.mark.slow_test
def test_master_call(self):
"""
Test executing master_call with lowdata
The choice of using key.list_all for this is arbitrary and should be
changed to some mocked function that is more testing friendly.
"""
low = {"client": "wheel", "fun": "key.list_all", "print_event": False}
low.update(self.eauth_creds)
self.wheel.master_call(**low)
def test_token(self):
"""
Test executing master_call with lowdata
The choice of using key.list_all for this is arbitrary and should be
changed to some mocked function that is more testing friendly.
"""
auth = salt.auth.LoadAuth(dict(self.get_config("client_config")))
token = auth.mk_token(self.eauth_creds)
token = auth.mk_token(
{"username": "saltdev_auto", "password": "saltdev", "eauth": "auto"}
)
self.wheel.master_call(
**{
"client": "wheel",
"fun": "key.list_all",
"token": token["token"],
"print_event": False,
}
)
@pytest.mark.slow_test
def test_cmd_sync(self):
low = {
"client": "wheel",
"fun": "key.list_all",
"print_event": False,
}
low.update(self.eauth_creds)
self.wheel.cmd_sync(low)
# Remove this skipIf when Issue #39616 is resolved
# https://github.com/saltstack/salt/issues/39616
@skipIf(
salt.utils.platform.spawning_platform(),
"Causes pickling error on Windows: Issue #39616",
)
def test_cmd_async(self):
low = {
"client": "wheel_async",
"fun": "key.list_all",
"print_event": False,
}
low.update(self.eauth_creds)
self.wheel.cmd_async(low)
@pytest.mark.slow_test
def test_cmd_sync_w_arg(self):
low = {
"fun": "key.finger",
"match": "*",
"print_event": False,
}
low.update(self.eauth_creds)
ret = self.wheel.cmd_sync(low)
self.assertIn("return", ret.get("data", {}))
def test_wildcard_auth(self):
low = {
"username": "the_s0und_of_t3ch",
"password": "willrockyou",
"eauth": "auto",
"fun": "key.list_all",
"print_event": False,
}
self.wheel.cmd_sync(low)

View file

@ -1,48 +0,0 @@
import pytest
import salt.wheel
from tests.support.mixins import AdaptedConfigurationTestCaseMixin
from tests.support.unit import TestCase
@pytest.mark.windows_whitelisted
@pytest.mark.usefixtures("salt_sub_minion")
class KeyWheelModuleTest(TestCase, AdaptedConfigurationTestCaseMixin):
def setUp(self):
self.wheel = salt.wheel.Wheel(dict(self.get_config("client_config")))
def tearDown(self):
del self.wheel
@pytest.mark.slow_test
def test_list_all(self):
ret = self.wheel.cmd("key.list_all", print_event=False)
for host in ["minion", "sub_minion"]:
self.assertIn(host, ret["minions"])
@pytest.mark.slow_test
def test_gen(self):
ret = self.wheel.cmd(
"key.gen", kwarg={"id_": "soundtechniciansrock"}, print_event=False
)
self.assertIn("pub", ret)
self.assertIn("priv", ret)
try:
self.assertTrue(ret.get("pub", "").startswith("-----BEGIN PUBLIC KEY-----"))
except AssertionError:
self.assertTrue(
ret.get("pub", "").startswith("-----BEGIN RSA PUBLIC KEY-----")
)
self.assertTrue(
ret.get("priv", "").startswith("-----BEGIN RSA PRIVATE KEY-----")
)
@pytest.mark.slow_test
def test_master_key_str(self):
ret = self.wheel.cmd("key.master_key_str", print_event=False)
self.assertIn("local", ret)
self.assertIn("master.pub", ret.get("local"))
self.assertTrue(
ret.get("local").get("master.pub").startswith("-----BEGIN PUBLIC KEY-----")
)

View file

@ -1,51 +0,0 @@
import os
import salt.wheel
from tests.support.mixins import AdaptedConfigurationTestCaseMixin
from tests.support.unit import TestCase
class WheelPillarRootsTest(TestCase, AdaptedConfigurationTestCaseMixin):
def setUp(self):
self.wheel = salt.wheel.Wheel(dict(self.get_config("client_config")))
self.pillar_dir = self.wheel.opts["pillar_roots"]["base"][0]
self.traversed_dir = os.path.dirname(self.pillar_dir)
def tearDown(self):
try:
os.remove(os.path.join(self.pillar_dir, "foo"))
except OSError:
pass
try:
os.remove(os.path.join(self.traversed_dir, "foo"))
except OSError:
pass
del self.wheel
def test_write(self):
ret = self.wheel.cmd(
"pillar_roots.write", kwarg={"data": "foo: bar", "path": "foo"}
)
assert os.path.exists(os.path.join(self.pillar_dir, "foo"))
assert ret.find("Wrote data to file") != -1
def test_write_subdir(self):
ret = self.wheel.cmd(
"pillar_roots.write", kwarg={"data": "foo: bar", "path": "sub/dir/file"}
)
assert os.path.exists(os.path.join(self.pillar_dir, "sub/dir/file"))
assert ret.find("Wrote data to file") != -1
def test_cvr_2021_25282(self):
ret = self.wheel.cmd(
"pillar_roots.write", kwarg={"data": "foo", "path": "../foo"}
)
assert not os.path.exists(os.path.join(self.traversed_dir, "foo"))
assert ret.find("Invalid path") != -1
def test_cvr_2021_25282_subdir(self):
ret = self.wheel.cmd(
"pillar_roots.write", kwarg={"data": "foo", "path": "sub/../../foo"}
)
assert not os.path.exists(os.path.join(self.traversed_dir, "foo"))
assert ret.find("Invalid path") != -1

View file

@ -47,6 +47,11 @@ def salt_eauth_account_factory():
return TestAccount(username="saltdev-eauth")
@pytest.fixture(scope="session")
def salt_auto_account_factory():
return TestAccount(username="saltdev_auto", password="saltdev")
@pytest.fixture(scope="session")
def salt_minion_id():
return random_string("minion-")
@ -212,6 +217,7 @@ def salt_master_factory(
"auto": {
salt_netapi_account_factory.username: ["@wheel", "@runner", "test.*"],
salt_auto_account_factory.username: ["@wheel", "@runner", "test.*"],
"*": ["@wheel", "@runner", "test.*"],
},
}

View file

@ -0,0 +1,27 @@
import pytest
import salt.config
import salt.wheel
@pytest.fixture
def client_config(salt_master):
config = salt.config.client_config(
salt_master.config["conf_file"],
defaults=salt_master.config.copy(),
)
return config
@pytest.fixture(scope="module")
def salt_auto_account(salt_auto_account_factory):
with salt_auto_account_factory as account:
yield account
@pytest.fixture
def auth_creds(salt_auto_account):
return {
"username": salt_auto_account.username,
"password": salt_auto_account.password,
"eauth": "auto",
}

View file

@ -0,0 +1,151 @@
import pytest
import salt.auth
import salt.runner
pytestmark = [
pytest.mark.slow_test,
pytest.mark.skip_if_not_root,
pytest.mark.destructive_test,
pytest.mark.windows_whitelisted,
]
@pytest.fixture
def client(client_config):
return salt.runner.Runner(client_config)
def test_eauth(client, auth_creds):
"""
Test executing master_call with lowdata
The choice of using error.error for this is arbitrary and should be
changed to some mocked function that is more testing friendly.
"""
low = {"client": "runner", "fun": "error.error", **auth_creds}
ret = client.master_call(**low)
assert ret
assert "jid" in ret
assert "tag" in ret
assert ret["tag"] == "salt/run/{}".format(ret["jid"])
def test_token(client, client_config, auth_creds):
"""
Test executing master_call with lowdata
The choice of using error.error for this is arbitrary and should be
changed to some mocked function that is more testing friendly.
"""
auth = salt.auth.LoadAuth(client_config)
token = auth.mk_token(auth_creds)
low = {
"client": "runner",
"fun": "error.error",
"token": token["token"],
}
ret = client.master_call(**low)
assert ret
assert "jid" in ret
assert "tag" in ret
assert ret["tag"] == "salt/run/{}".format(ret["jid"])
def test_cmd_sync(client, auth_creds):
low = {
"client": "runner",
"fun": "error.error",
**auth_creds,
}
ret = client.cmd_sync(low.copy())
assert ret == {}
def test_cmd_async(client, auth_creds):
low = {
"client": "runner",
"fun": "error.error",
**auth_creds,
}
ret = client.cmd_async(low.copy())
assert ret
assert "jid" in ret
assert "tag" in ret
assert ret["tag"] == "salt/run/{}".format(ret["jid"])
def test_cmd_sync_w_arg(client, auth_creds):
low = {"fun": "test.arg", "foo": "Foo!", "bar": "Bar!", **auth_creds}
ret = client.cmd_sync(low.copy())
assert ret
assert "kwargs" in ret
assert ret["kwargs"]["foo"] == "Foo!"
assert ret["kwargs"]["bar"] == "Bar!"
def test_wildcard_auth(client):
# This test passes because of the following master config
# external_auth:
# auto:
# '*':
# - '@wheel'
# - '@runner'
# - test.*
low = {
"fun": "test.arg",
"foo": "Foo!",
"bar": "Bar!",
"username": "the_s0und_of_t3ch",
"password": "willrockyou",
"eauth": "auto",
}
ret = client.cmd_sync(low.copy())
assert ret
assert "kwargs" in ret
assert ret["kwargs"]["foo"] == "Foo!"
assert ret["kwargs"]["bar"] == "Bar!"
def test_full_return_kwarg(client, auth_creds):
low = {"fun": "test.arg", **auth_creds}
ret = client.cmd_sync(low.copy(), full_return=True)
assert ret
assert "data" in ret
assert "success" in ret["data"]
def test_cmd_sync_arg_kwarg_parsing(client, auth_creds):
low = {
"client": "runner",
"fun": "test.arg",
"arg": ["foo", "bar=off", "baz={qux: 123}"],
"kwarg": {"quux": "Quux"},
"quuz": "on",
**auth_creds,
}
ret = client.cmd_sync(low.copy())
assert ret
assert ret == {
"args": ["foo"],
"kwargs": {"bar": False, "baz": {"qux": 123}, "quux": "Quux", "quuz": "on"},
}
def test_invalid_kwargs_are_ignored(client, auth_creds):
low = {
"client": "runner",
"fun": "test.metasyntactic",
"thiskwargisbad": "justpretendimnothere",
**auth_creds,
}
ret = client.cmd_sync(low.copy())
assert ret
assert ret[0] == "foo"

View file

@ -0,0 +1,32 @@
import pytest
import salt.config
import salt.wheel
@pytest.fixture
def client_config(salt_master):
config = salt.config.client_config(
salt_master.config["conf_file"],
defaults=salt_master.config.copy(),
)
return config
@pytest.fixture
def client(client_config):
return salt.wheel.Wheel(client_config)
@pytest.fixture(scope="module")
def salt_auto_account(salt_auto_account_factory):
with salt_auto_account_factory as account:
yield account
@pytest.fixture
def auth_creds(salt_auto_account):
return {
"username": salt_auto_account.username,
"password": salt_auto_account.password,
"eauth": "auto",
}

View file

@ -0,0 +1,130 @@
import pytest
import salt.auth
pytestmark = [
pytest.mark.slow_test,
pytest.mark.skip_if_not_root,
pytest.mark.destructive_test,
pytest.mark.windows_whitelisted,
]
def test_master_call(client, auth_creds, salt_auto_account):
"""
Test executing master_call with lowdata
The choice of using key.list_all for this is arbitrary and should be
changed to some mocked function that is more testing friendly.
"""
low = {"client": "wheel", "fun": "key.list_all", "print_event": False, **auth_creds}
ret = client.master_call(**low)
assert ret
assert "data" in ret
data = ret["data"]
assert data["success"] is True
assert data["user"] == salt_auto_account.username
assert data["fun"] == "wheel.key.list_all"
assert data["return"]
assert data["return"]["local"] == ["master.pem", "master.pub"]
def test_token(client, client_config, auth_creds, salt_auto_account):
"""
Test executing master_call with lowdata
The choice of using key.list_all for this is arbitrary and should be
changed to some mocked function that is more testing friendly.
"""
auth = salt.auth.LoadAuth(client_config)
token = auth.mk_token(auth_creds)
ret = client.master_call(
**{
"client": "wheel",
"fun": "key.list_all",
"token": token["token"],
"print_event": False,
}
)
assert ret
assert "data" in ret
data = ret["data"]
assert data["success"] is True
assert data["user"] == salt_auto_account.username
assert data["fun"] == "wheel.key.list_all"
assert data["return"]
assert data["return"]["local"] == ["master.pem", "master.pub"]
def test_cmd_sync(client, auth_creds, salt_auto_account):
low = {"client": "async", "fun": "key.list_all", "print_event": False, **auth_creds}
ret = client.cmd_sync(low)
assert ret
assert "data" in ret
data = ret["data"]
assert data["success"] is True
assert data["user"] == salt_auto_account.username
assert data["fun"] == "wheel.key.list_all"
assert data["return"]
assert data["return"]["local"] == ["master.pem", "master.pub"]
# Remove this skipIf when https://github.com/saltstack/salt/issues/39616 is resolved
@pytest.mark.skip_on_windows(reason="Causes pickling error on Windows: Issue #39616")
def test_cmd_async(client, auth_creds):
low = {
"client": "wheel_async",
"fun": "key.list_all",
"print_event": False,
**auth_creds,
}
ret = client.cmd_async(low)
# Being an async command, we won't get any data back, just the JID and the TAG
assert ret
assert "jid" in ret
assert "tag" in ret
assert ret["tag"] == "salt/wheel/{}".format(ret["jid"])
def test_cmd_sync_w_arg(client, auth_creds, salt_auto_account):
low = {"fun": "key.finger", "match": "*", "print_event": False, **auth_creds}
ret = client.cmd_sync(low)
assert ret
assert "data" in ret
data = ret["data"]
assert data["success"] is True
assert data["user"] == salt_auto_account.username
assert data["fun"] == "wheel.key.finger"
assert data["return"]
assert data["return"]["local"]
assert "master.pem" in data["return"]["local"]
assert "master.pub" in data["return"]["local"]
def test_wildcard_auth(client):
# This test passes because of the following master config
# external_auth:
# auto:
# '*':
# - '@wheel'
# - '@runner'
# - test.*
username = "the_s0und_of_t3ch"
low = {
"username": username,
"password": "willrockyou",
"eauth": "auto",
"fun": "key.list_all",
"print_event": False,
}
ret = client.cmd_sync(low)
assert ret
assert "data" in ret
data = ret["data"]
assert data["success"] is True
assert data["user"] == username
assert data["fun"] == "wheel.key.list_all"
assert data["return"]
assert data["return"]["local"] == ["master.pem", "master.pub"]

View file

@ -0,0 +1,38 @@
import pytest
pytestmark = [
pytest.mark.slow_test,
pytest.mark.windows_whitelisted,
]
def test_list_all(client, salt_minion, salt_sub_minion):
ret = client.cmd("key.list_all", print_event=False)
assert ret
assert "minions" in ret
assert salt_minion.id in ret["minions"]
assert salt_sub_minion.id in ret["minions"]
def test_gen(client):
ret = client.cmd(
"key.gen", kwarg={"id_": "soundtechniciansrock"}, print_event=False
)
assert ret
assert "pub" in ret
try:
assert ret["pub"].startswith("-----BEGIN PUBLIC KEY-----")
except AttributeError:
assert ret["pub"].startswith("-----BEGIN RSA PUBLIC KEY-----")
assert "priv" in ret
assert ret["priv"].startswith("-----BEGIN RSA PRIVATE KEY-----")
def test_master_key_str(client):
ret = client.cmd("key.master_key_str", print_event=False)
assert ret
assert "local" in ret
data = ret["local"]
assert "master.pub" in data
assert data["master.pub"].startswith("-----BEGIN PUBLIC KEY-----")

View file

@ -0,0 +1,54 @@
import pytest
from saltfactories.utils import random_string
pytestmark = [
pytest.mark.windows_whitelisted,
]
@pytest.fixture
def pillar_file_path(salt_master):
pillar_dir = salt_master.pillar_tree.base.write_path
testfile = pillar_dir / random_string("foo")
try:
yield testfile
finally:
if testfile.exists():
testfile.unlink()
def test_write(client, pillar_file_path):
ret = client.cmd(
"pillar_roots.write", kwarg={"data": "foo: bar", "path": pillar_file_path.name}
)
assert pillar_file_path.is_file()
assert ret.find("Wrote data to file") != -1
def test_write_subdir(client, salt_master):
ret = client.cmd(
"pillar_roots.write", kwarg={"data": "foo: bar", "path": "sub/dir/file"}
)
pillar_file_path = salt_master.pillar_tree.base.write_path / "sub" / "dir" / "file"
assert pillar_file_path.is_file()
assert ret.find("Wrote data to file") != -1
def test_cvr_2021_25282(client, pillar_file_path):
ret = client.cmd(
"pillar_roots.write",
kwarg={"data": "foo", "path": "../{}".format(pillar_file_path.name)},
)
assert not pillar_file_path.parent.parent.joinpath(pillar_file_path.name).is_file()
assert ret.find("Invalid path") != -1
def test_cvr_2021_25282_subdir(client, pillar_file_path):
ret = client.cmd(
"pillar_roots.write",
kwarg={"data": "foo", "path": "../../{}".format(pillar_file_path.name)},
)
assert not pillar_file_path.parent.parent.parent.joinpath(
pillar_file_path.name
).is_file()
assert ret.find("Invalid path") != -1