Thrown an error when the tmpdir fixture is used. Migrate to tmp_path

This commit is contained in:
Pedro Algarvio 2022-06-03 15:41:56 +01:00 committed by Megan Wilhite
parent 599872e104
commit 70266f5aa8
13 changed files with 182 additions and 180 deletions

View file

@ -632,6 +632,15 @@ def salt_factories_config():
}
@pytest.fixture
def tmpdir(tmpdir):
raise pytest.UsageError(
"The `tmpdir` fixture uses Pytest's `pypath` implementation which "
"is getting deprecated in favor of `pathlib`. Please use the "
"`tmp_path` fixture instead."
)
# <---- Fixtures Overrides -------------------------------------------------------------------------------------------

View file

@ -2,13 +2,12 @@
Tests for the file state
"""
import pytest
import salt.states.file
@pytest.mark.parametrize("verify_ssl", [True, False])
@pytest.mark.slow_test
def test_get_source_sum_verify_ssl_false(
salt_call_cli, tmpdir, ssl_webserver, verify_ssl
salt_call_cli, tmp_path, ssl_webserver, verify_ssl
):
"""
test verify_ssl with get_source_sum
@ -17,7 +16,7 @@ def test_get_source_sum_verify_ssl_false(
ret = salt_call_cli.run(
"--local",
"file.get_source_sum",
tmpdir.join("test_source_sum.txt").strpath,
str(tmp_path / "test_source_sum.txt"),
web_file,
web_file + ".sha256",
"verify_ssl={}".format(verify_ssl),
@ -34,7 +33,7 @@ def test_get_source_sum_verify_ssl_false(
@pytest.mark.parametrize("verify_ssl", [True, False])
@pytest.mark.slow_test
def test_get_managed_verify_ssl(salt_call_cli, tmpdir, ssl_webserver, verify_ssl):
def test_get_managed_verify_ssl(salt_call_cli, tmp_path, ssl_webserver, verify_ssl):
"""
test verify_ssl with get_managed
"""
@ -42,7 +41,7 @@ def test_get_managed_verify_ssl(salt_call_cli, tmpdir, ssl_webserver, verify_ssl
ret = salt_call_cli.run(
"--local",
"file.get_managed",
tmpdir.join("test_managed.txt").strpath,
str(tmp_path / "test_managed.txt"),
"",
web_file,
web_file + ".sha256",
@ -65,15 +64,15 @@ def test_get_managed_verify_ssl(salt_call_cli, tmpdir, ssl_webserver, verify_ssl
@pytest.mark.parametrize("verify_ssl", [True, False])
@pytest.mark.slow_test
def test_manage_file_verify_ssl(salt_call_cli, tmpdir, ssl_webserver, verify_ssl):
def test_manage_file_verify_ssl(salt_call_cli, tmp_path, ssl_webserver, verify_ssl):
"""
test verify_ssl with manage_file
"""
test_file = tmpdir.join("test_manage_file.txt").strpath
test_file = tmp_path / "test_manage_file.txt"
ret = salt_call_cli.run(
"--local",
"file.manage_file",
test_file,
str(test_file),
"",
"",
ssl_webserver.url("this.txt"),
@ -97,17 +96,17 @@ def test_manage_file_verify_ssl(salt_call_cli, tmpdir, ssl_webserver, verify_ssl
@pytest.mark.parametrize("verify_ssl", [True, False])
@pytest.mark.slow_test
def test_check_managed_changes_verify_ssl(
salt_call_cli, tmpdir, ssl_webserver, verify_ssl
salt_call_cli, tmp_path, ssl_webserver, verify_ssl
):
"""
test verify_ssl with check_managed_changes
"""
test_file = tmpdir.join("test_managed_changes.txt").strpath
test_file = tmp_path / "test_managed_changes.txt"
web_url = ssl_webserver.url("this.txt")
ret = salt_call_cli.run(
"--local",
"file.check_managed_changes",
test_file,
str(test_file),
web_url,
web_url + ".sha256",
"",
@ -123,25 +122,24 @@ def test_check_managed_changes_verify_ssl(
)
if not verify_ssl:
assert ret.data["newfile"] == test_file
assert ret.data["newfile"] == str(test_file)
else:
assert "SSL: CERTIFICATE_VERIFY_FAILED" in ret.stderr
@pytest.mark.parametrize("verify_ssl", [True, False])
@pytest.mark.slow_test
def test_check_file_meta_verify_ssl(salt_call_cli, tmpdir, ssl_webserver, verify_ssl):
def test_check_file_meta_verify_ssl(salt_call_cli, tmp_path, ssl_webserver, verify_ssl):
"""
test verify_ssl with check_file_meta
"""
test_file = tmpdir.join("test_check_file_meta.txt").strpath
with salt.utils.files.fopen(test_file, "w") as fh_:
fh_.write("test check_file_meta")
test_file = tmp_path / "test_check_file_meta.txt"
test_file.write_text("test check_file_meta")
web_url = ssl_webserver.url("this.txt")
ret = salt_call_cli.run(
"--local",
"file.check_file_meta",
test_file,
str(test_file),
"",
web_url,
"{hash_type: 'sha256', 'hsum':"

View file

@ -79,7 +79,7 @@ def test_state_with_import(salt_ssh_cli, state_tree):
@pytest.fixture
def nested_state_tree(base_env_state_tree_root_dir, tmpdir):
def nested_state_tree(base_env_state_tree_root_dir, tmp_path):
top_file = """
base:
'localhost':
@ -93,7 +93,7 @@ def nested_state_tree(base_env_state_tree_root_dir, tmpdir):
- source: salt://foo/file.jinja
- template: jinja
""".format(
tmpdir
tmp_path
)
file_jinja = """
{% from 'foo/map.jinja' import comment %}{{ comment }}

View file

@ -179,7 +179,7 @@ def salt_cli_secondary_wrapper(salt_secondary_master, salt_secondary_minion):
@SKIP_INITIAL_PHOTONOS_FAILURES
@pytest.mark.skip_on_windows
def test_verify_ssl_skip_verify_false(
salt_master, salt_call_cli, tmpdir, ssl_webserver
salt_master, salt_call_cli, tmp_path, ssl_webserver
):
"""
test verify_ssl when its False and True when managing
@ -193,7 +193,7 @@ def test_verify_ssl_skip_verify_false(
- source: {}
- source_hash: {}
""".format(
tmpdir.join("test_verify_ssl_true.txt"), web_file, web_file + ".sha256"
tmp_path / "test_verify_ssl_true.txt", web_file, web_file + ".sha256"
)
false_content = true_content + " - verify_ssl: False"

View file

@ -13,7 +13,7 @@ pytestmark = [
@pytest.fixture
def ssh_target(tmpdir):
def ssh_target(tmp_path):
argv = [
"ssh.set_auth_key",
"root",
@ -23,8 +23,8 @@ def ssh_target(tmpdir):
opts = {
"argv": argv,
"__role": "master",
"cachedir": tmpdir.strpath,
"extension_modules": tmpdir.join("extmods").strpath,
"cachedir": str(tmp_path),
"extension_modules": str(tmp_path / "extmods"),
}
target = {
"passwd": "abc123",

View file

@ -106,7 +106,7 @@ def test_add():
@pytest.mark.slow_test
def test_save(tmpdir):
def test_save():
"""
Test saving beacons.
"""

View file

@ -81,7 +81,7 @@ def test_full_data():
assert publish.publish("*", "publish.salt") == {}
def test_runner(tmpdir):
def test_runner():
"""
Test if it execute a runner on the master and return the data
from the runner function

View file

@ -1,10 +1,13 @@
import os
import logging
import pytest
import salt.modules.tls as tls
from tests.support.helpers import SKIP_INITIAL_PHOTONOS_FAILURES
from tests.support.mock import MagicMock, patch
log = logging.getLogger(__name__)
pytestmark = [
SKIP_INITIAL_PHOTONOS_FAILURES,
]
@ -34,86 +37,72 @@ def tls_test_data():
@pytest.mark.skip_on_windows(reason="Skipping on Windows per Shane's suggestion")
def test_create_ca_permissions_on_cert_and_key(tmpdir, tls_test_data):
def test_create_ca_permissions_on_cert_and_key(tmp_path, tls_test_data):
ca_name = "test_ca"
certp = tmpdir.join(ca_name).join("{}_ca_cert.crt".format(ca_name)).strpath
certk = tmpdir.join(ca_name).join("{}_ca_cert.key".format(ca_name)).strpath
mock_opt = MagicMock(return_value=tmpdir)
certp = tmp_path / ca_name / "{}_ca_cert.crt".format(ca_name)
certk = tmp_path / ca_name / "{}_ca_cert.key".format(ca_name)
mock_opt = MagicMock(return_value=str(tmp_path))
mock_ret = MagicMock(return_value=0)
with patch.dict(
tls.__salt__, {"config.option": mock_opt, "cmd.retcode": mock_ret}
), patch.dict(tls.__opts__, {"hash_type": "sha256", "cachedir": str(tmpdir)}):
), patch.dict(tls.__opts__, {"hash_type": "sha256", "cachedir": str(tmp_path)}):
tls.create_ca(ca_name, days=365, fixmode=False, **tls_test_data["create_ca"])
certp_mode = os.stat(certp).st_mode & 0o7777
certk_mode = os.stat(certk).st_mode & 0o7777
assert 0o644 == certp_mode
assert 0o600 == certk_mode
assert certp.stat().st_mode & 0o7777 == 0o644
assert certk.stat().st_mode & 0o7777 == 0o600
@pytest.mark.skip_on_windows(reason="Skipping on Windows per Shane's suggestion")
def test_create_csr_permissions_on_csr_and_key(tmpdir, tls_test_data):
def test_create_csr_permissions_on_csr_and_key(tmp_path, tls_test_data):
ca_name = "test_ca"
csrp = (
tmpdir.join(ca_name)
.join("certs")
.join("{}.csr".format(tls_test_data["create_ca"]["CN"]))
.strpath
tmp_path / ca_name / "certs" / "{}.csr".format(tls_test_data["create_ca"]["CN"])
)
keyp = (
tmpdir.join(ca_name)
.join("certs")
.join("{}.key".format(tls_test_data["create_ca"]["CN"]))
.strpath
tmp_path / ca_name / "certs" / "{}.key".format(tls_test_data["create_ca"]["CN"])
)
mock_opt = MagicMock(return_value=tmpdir)
mock_opt = MagicMock(return_value=str(tmp_path))
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(
tls.__salt__,
{"config.option": mock_opt, "cmd.retcode": mock_ret, "pillar.get": mock_pgt},
), patch.dict(tls.__opts__, {"hash_type": "sha256", "cachedir": str(tmpdir)}):
tls.create_ca(ca_name, days=365, **tls_test_data["create_ca"])
tls.create_csr(ca_name, **tls_test_data["create_ca"])
), patch.dict(tls.__opts__, {"hash_type": "sha256", "cachedir": str(tmp_path)}):
ca_ret = tls.create_ca(ca_name, days=365, **tls_test_data["create_ca"])
assert ca_ret
csr_ret = tls.create_csr(ca_name, **tls_test_data["create_ca"])
assert csr_ret
csrp_mode = os.stat(csrp).st_mode & 0o7777
keyp_mode = os.stat(keyp).st_mode & 0o7777
assert csrp.exists()
assert keyp.exists()
assert 0o644 == csrp_mode
assert 0o600 == keyp_mode
assert csrp.stat().st_mode & 0o7777 == 0o644
assert keyp.stat().st_mode & 0o7777 == 0o600
@pytest.mark.skip_on_windows(reason="Skipping on Windows per Shane's suggestion")
def test_create_self_signed_cert_permissions_on_csr_cert_and_key(tmpdir, tls_test_data):
def test_create_self_signed_cert_permissions_on_csr_cert_and_key(
tmp_path, tls_test_data
):
ca_name = "test_ca"
certp = (
tmpdir.join(ca_name)
.join("certs")
.join("{}.crt".format(tls_test_data["create_ca"]["CN"]))
.strpath
tmp_path / ca_name / "certs" / "{}.crt".format(tls_test_data["create_ca"]["CN"])
)
keyp = (
tmpdir.join(ca_name)
.join("certs")
.join("{}.key".format(tls_test_data["create_ca"]["CN"]))
.strpath
tmp_path / ca_name / "certs" / "{}.key".format(tls_test_data["create_ca"]["CN"])
)
mock_opt = MagicMock(return_value=tmpdir)
mock_opt = MagicMock(return_value=str(tmp_path))
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(
tls.__salt__,
{"config.option": mock_opt, "cmd.retcode": mock_ret, "pillar.get": mock_pgt},
), patch.dict(tls.__opts__, {"hash_type": "sha256", "cachedir": str(tmpdir)}):
), patch.dict(tls.__opts__, {"hash_type": "sha256", "cachedir": str(tmp_path)}):
tls.create_self_signed_cert(ca_name, days=365, **tls_test_data["create_ca"])
certp_mode = os.stat(certp).st_mode & 0o7777
keyp_mode = os.stat(keyp).st_mode & 0o7777
assert 0o644 == certp_mode
assert 0o600 == keyp_mode
assert certp.stat().st_mode & 0o7777 == 0o644
assert keyp.stat().st_mode & 0o7777 == 0o600

View file

@ -31,12 +31,12 @@ def sections():
return sections
def test_options_present(tmpdir, sections):
def test_options_present(tmp_path, sections):
"""
Test to verify options present when
file does not initially exist
"""
name = tmpdir.join("test.ini").strpath
name = str(tmp_path / "test.ini")
exp_ret = {
"name": name,
@ -49,12 +49,12 @@ def test_options_present(tmpdir, sections):
assert mod_ini_manage.get_ini(name) == sections
def test_options_present_true_no_file(tmpdir, sections):
def test_options_present_true_no_file(tmp_path, sections):
"""
Test to verify options present when
file does not initially exist and test=True
"""
name = tmpdir.join("test_true_no_file.ini").strpath
name = str(tmp_path / "test_true_no_file.ini")
exp_ret = {
"name": name,
@ -73,12 +73,12 @@ def test_options_present_true_no_file(tmpdir, sections):
assert not os.path.exists(name)
def test_options_present_true_file(tmpdir, sections):
def test_options_present_true_file(tmp_path, sections):
"""
Test to verify options present when
file does exist and test=True
"""
name = tmpdir.join("test_true_file.ini").strpath
name = str(tmp_path / "test_true_file.ini")
exp_ret = {
"name": name,

View file

@ -151,21 +151,21 @@ def test_cryptical_dumps_invalid_nonce():
assert master_crypt.loads(ret, nonce="abcde")
def test_verify_signature(tmpdir):
tmpdir.join("foo.pem").write(PRIV_KEY.strip())
tmpdir.join("foo.pub").write(PUB_KEY.strip())
tmpdir.join("bar.pem").write(PRIV_KEY2.strip())
tmpdir.join("bar.pub").write(PUB_KEY2.strip())
def test_verify_signature(tmp_path):
tmp_path.joinpath("foo.pem").write_text(PRIV_KEY.strip())
tmp_path.joinpath("foo.pub").write_text(PUB_KEY.strip())
tmp_path.joinpath("bar.pem").write_text(PRIV_KEY2.strip())
tmp_path.joinpath("bar.pub").write_text(PUB_KEY2.strip())
msg = b"foo bar"
sig = salt.crypt.sign_message(str(tmpdir.join("foo.pem")), msg)
assert salt.crypt.verify_signature(str(tmpdir.join("foo.pub")), msg, sig)
sig = salt.crypt.sign_message(str(tmp_path.joinpath("foo.pem")), msg)
assert salt.crypt.verify_signature(str(tmp_path.joinpath("foo.pub")), msg, sig)
def test_verify_signature_bad_sig(tmpdir):
tmpdir.join("foo.pem").write(PRIV_KEY.strip())
tmpdir.join("foo.pub").write(PUB_KEY.strip())
tmpdir.join("bar.pem").write(PRIV_KEY2.strip())
tmpdir.join("bar.pub").write(PUB_KEY2.strip())
def test_verify_signature_bad_sig(tmp_path):
tmp_path.joinpath("foo.pem").write_text(PRIV_KEY.strip())
tmp_path.joinpath("foo.pub").write_text(PUB_KEY.strip())
tmp_path.joinpath("bar.pem").write_text(PRIV_KEY2.strip())
tmp_path.joinpath("bar.pub").write_text(PUB_KEY2.strip())
msg = b"foo bar"
sig = salt.crypt.sign_message(str(tmpdir.join("foo.pem")), msg)
assert not salt.crypt.verify_signature(str(tmpdir.join("bar.pub")), msg, sig)
sig = salt.crypt.sign_message(str(tmp_path.joinpath("foo.pem")), msg)
assert not salt.crypt.verify_signature(str(tmp_path.joinpath("bar.pub")), msg, sig)

View file

@ -215,42 +215,44 @@ AES_KEY = "8wxWlOaMMQ4d3yT74LL4+hGrGTf65w8VgrcNjLJeLRQ2Q6zMa8ItY2EQUgMKKDb7JY+Rn
@pytest.fixture
def pki_dir(tmpdir):
madir = tmpdir.mkdir("master")
def pki_dir(tmp_path):
_pki_dir = tmp_path / "pki"
_pki_dir.mkdir()
madir = _pki_dir / "master"
madir.mkdir()
mapriv = madir.join("master.pem")
mapriv.write(MASTER_PRIV_KEY.strip())
mapub = madir.join("master.pub")
mapub.write(MASTER_PUB_KEY.strip())
mapriv = madir / "master.pem"
mapriv.write_text(MASTER_PRIV_KEY.strip())
mapub = madir / "master.pub"
mapub.write_text(MASTER_PUB_KEY.strip())
maspriv = madir.join("master_sign.pem")
maspriv.write(MASTER_SIGNING_PRIV.strip())
maspub = madir.join("master_sign.pub")
maspub.write(MASTER_SIGNING_PUB.strip())
maspriv = madir / "master_sign.pem"
maspriv.write_text(MASTER_SIGNING_PRIV.strip())
maspub = madir / "master_sign.pub"
maspub.write_text(MASTER_SIGNING_PUB.strip())
mipub = madir.mkdir("minions").join("minion")
mipub.write(MINION_PUB_KEY.strip())
misdir = madir / "minions"
misdir.mkdir()
misdir.joinpath("minion").write_text(MINION_PUB_KEY.strip())
for sdir in [
"minions_autosign",
"minions_denied",
"minions_pre",
"minions_rejected",
]:
madir.mkdir(sdir)
madir.joinpath(sdir).mkdir()
midir = tmpdir.mkdir("minion")
mipub = midir.join("minion.pub")
mipub.write(MINION_PUB_KEY.strip())
mipriv = midir.join("minion.pem")
mipriv.write(MINION_PRIV_KEY.strip())
mimapriv = midir.join("minion_master.pub")
mimapriv.write(MASTER_PUB_KEY.strip())
mimaspriv = midir.join("master_sign.pub")
mimaspriv.write(MASTER_SIGNING_PUB.strip())
try:
yield tmpdir
finally:
tmpdir.remove()
midir = _pki_dir / "minion"
midir.mkdir()
mipub = midir / "minion.pub"
mipub.write_text(MINION_PUB_KEY.strip())
mipriv = midir / "minion.pem"
mipriv.write_text(MINION_PRIV_KEY.strip())
mimapriv = midir / "minion_master.pub"
mimapriv.write_text(MASTER_PUB_KEY.strip())
mimaspriv = midir / "master_sign.pub"
mimaspriv.write_text(MASTER_SIGNING_PUB.strip())
yield _pki_dir
def test_master_uri():
@ -581,7 +583,7 @@ def test_req_server_chan_encrypt_v2(pki_dir):
"zmq_monitor": False,
"mworker_queue_niceness": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.join("master")),
"pki_dir": str(pki_dir.joinpath("master")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
@ -594,7 +596,7 @@ def test_req_server_chan_encrypt_v2(pki_dir):
assert "key" in ret
assert dictkey in ret
key = salt.crypt.get_rsa_key(str(pki_dir.join("minion", "minion.pem")), None)
key = salt.crypt.get_rsa_key(str(pki_dir.joinpath("minion", "minion.pem")), None)
if HAS_M2:
aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
else:
@ -625,7 +627,7 @@ def test_req_server_chan_encrypt_v1(pki_dir):
"zmq_monitor": False,
"mworker_queue_niceness": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.join("master")),
"pki_dir": str(pki_dir.joinpath("master")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
@ -639,7 +641,7 @@ def test_req_server_chan_encrypt_v1(pki_dir):
assert "key" in ret
assert dictkey in ret
key = salt.crypt.get_rsa_key(str(pki_dir.join("minion", "minion.pem")), None)
key = salt.crypt.get_rsa_key(str(pki_dir.joinpath("minion", "minion.pem")), None)
if HAS_M2:
aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
else:
@ -658,12 +660,12 @@ def test_req_chan_decode_data_dict_entry_v1(pki_dir):
"ret_port": 4506,
"ipv6": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.join("minion")),
"pki_dir": str(pki_dir.joinpath("minion")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
}
master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
master_opts = dict(opts, pki_dir=str(pki_dir.joinpath("master")))
server = salt.channel.server.ReqServerChannel.factory(master_opts)
client = salt.channel.client.ReqChannel.factory(opts, io_loop=mockloop)
dictkey = "pillar"
@ -689,12 +691,12 @@ async def test_req_chan_decode_data_dict_entry_v2(pki_dir):
"ret_port": 4506,
"ipv6": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.join("minion")),
"pki_dir": str(pki_dir.joinpath("minion")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
}
master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
master_opts = dict(opts, pki_dir=str(pki_dir.joinpath("master")))
server = salt.channel.server.ReqServerChannel.factory(master_opts)
client = salt.channel.client.AsyncReqChannel.factory(opts, io_loop=mockloop)
@ -752,12 +754,12 @@ async def test_req_chan_decode_data_dict_entry_v2_bad_nonce(pki_dir):
"ret_port": 4506,
"ipv6": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.join("minion")),
"pki_dir": str(pki_dir.joinpath("minion")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
}
master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
master_opts = dict(opts, pki_dir=str(pki_dir.joinpath("master")))
server = salt.channel.server.ReqServerChannel.factory(master_opts)
client = salt.channel.client.AsyncReqChannel.factory(opts, io_loop=mockloop)
@ -815,12 +817,12 @@ async def test_req_chan_decode_data_dict_entry_v2_bad_signature(pki_dir):
"ret_port": 4506,
"ipv6": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.join("minion")),
"pki_dir": str(pki_dir.joinpath("minion")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
}
master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
master_opts = dict(opts, pki_dir=str(pki_dir.joinpath("master")))
server = salt.channel.server.ReqServerChannel.factory(master_opts)
client = salt.channel.client.AsyncReqChannel.factory(opts, io_loop=mockloop)
@ -894,12 +896,12 @@ async def test_req_chan_decode_data_dict_entry_v2_bad_key(pki_dir):
"ret_port": 4506,
"ipv6": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.join("minion")),
"pki_dir": str(pki_dir.joinpath("minion")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
}
master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
master_opts = dict(opts, pki_dir=str(pki_dir.joinpath("master")))
server = salt.channel.server.ReqServerChannel.factory(master_opts)
client = salt.channel.client.AsyncReqChannel.factory(opts, io_loop=mockloop)
@ -980,7 +982,7 @@ async def test_req_serv_auth_v1(pki_dir):
"ret_port": 4506,
"ipv6": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.join("minion")),
"pki_dir": str(pki_dir.joinpath("minion")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
@ -999,19 +1001,21 @@ async def test_req_serv_auth_v1(pki_dir):
),
"reload": salt.crypt.Crypticle.generate_key_string,
}
master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
master_opts = dict(opts, pki_dir=str(pki_dir.joinpath("master")))
server = salt.channel.server.ReqServerChannel.factory(master_opts)
server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
server.cache_cli = False
server.master_key = salt.crypt.MasterKeys(server.opts)
pub = salt.crypt.get_rsa_pub_key(str(pki_dir.join("minion", "minion.pub")))
pub = salt.crypt.get_rsa_pub_key(str(pki_dir.joinpath("minion", "minion.pub")))
token = salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string())
nonce = uuid.uuid4().hex
# We need to read the public key with fopen otherwise the newlines might
# not match on windows.
with salt.utils.files.fopen(str(pki_dir.join("minion", "minion.pub")), "r") as fp:
with salt.utils.files.fopen(
str(pki_dir.joinpath("minion", "minion.pub")), "r"
) as fp:
pub_key = fp.read()
load = {
@ -1032,7 +1036,7 @@ async def test_req_serv_auth_v2(pki_dir):
"ret_port": 4506,
"ipv6": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.join("minion")),
"pki_dir": str(pki_dir.joinpath("minion")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
@ -1051,19 +1055,21 @@ async def test_req_serv_auth_v2(pki_dir):
),
"reload": salt.crypt.Crypticle.generate_key_string,
}
master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
master_opts = dict(opts, pki_dir=str(pki_dir.joinpath("master")))
server = salt.channel.server.ReqServerChannel.factory(master_opts)
server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
server.cache_cli = False
server.master_key = salt.crypt.MasterKeys(server.opts)
pub = salt.crypt.get_rsa_pub_key(str(pki_dir.join("minion", "minion.pub")))
pub = salt.crypt.get_rsa_pub_key(str(pki_dir.joinpath("minion", "minion.pub")))
token = salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string())
nonce = uuid.uuid4().hex
# We need to read the public key with fopen otherwise the newlines might
# not match on windows.
with salt.utils.files.fopen(str(pki_dir.join("minion", "minion.pub")), "r") as fp:
with salt.utils.files.fopen(
str(pki_dir.joinpath("minion", "minion.pub")), "r"
) as fp:
pub_key = fp.read()
load = {
@ -1086,7 +1092,7 @@ async def test_req_chan_auth_v2(pki_dir, io_loop):
"ret_port": 4506,
"ipv6": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.join("minion")),
"pki_dir": str(pki_dir.joinpath("minion")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
@ -1104,7 +1110,7 @@ async def test_req_chan_auth_v2(pki_dir, io_loop):
),
"reload": salt.crypt.Crypticle.generate_key_string,
}
master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
master_opts = dict(opts, pki_dir=str(pki_dir.joinpath("master")))
master_opts["master_sign_pubkey"] = False
server = salt.channel.server.ReqServerChannel.factory(master_opts)
server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
@ -1134,7 +1140,7 @@ async def test_req_chan_auth_v2_with_master_signing(pki_dir, io_loop):
"ret_port": 4506,
"ipv6": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.join("minion")),
"pki_dir": str(pki_dir.joinpath("minion")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
@ -1152,7 +1158,7 @@ async def test_req_chan_auth_v2_with_master_signing(pki_dir, io_loop):
),
"reload": salt.crypt.Crypticle.generate_key_string,
}
master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
master_opts = dict(opts, pki_dir=str(pki_dir.joinpath("master")))
master_opts["master_sign_pubkey"] = True
master_opts["master_use_pubkey_signature"] = False
master_opts["signing_key_pass"] = True
@ -1167,8 +1173,8 @@ async def test_req_chan_auth_v2_with_master_signing(pki_dir, io_loop):
opts["master"] = "master"
assert (
pki_dir.join("minion", "minion_master.pub").read()
== pki_dir.join("master", "master.pub").read()
pki_dir.joinpath("minion", "minion_master.pub").read_text()
== pki_dir.joinpath("master", "master.pub").read_text()
)
client = salt.channel.client.AsyncReqChannel.factory(opts, io_loop=io_loop)
@ -1189,12 +1195,12 @@ async def test_req_chan_auth_v2_with_master_signing(pki_dir, io_loop):
assert "publish_port" in ret
# Now create a new master key pair and try auth with it.
mapriv = pki_dir.join("master", "master.pem")
mapriv.remove()
mapriv.write(MASTER2_PRIV_KEY.strip())
mapub = pki_dir.join("master", "master.pub")
mapub.remove()
mapub.write(MASTER2_PUB_KEY.strip())
mapriv = pki_dir.joinpath("master", "master.pem")
mapriv.unlink()
mapriv.write_text(MASTER2_PRIV_KEY.strip())
mapub = pki_dir.joinpath("master", "master.pub")
mapub.unlink()
mapub.write_text(MASTER2_PUB_KEY.strip())
server = salt.channel.server.ReqServerChannel.factory(master_opts)
server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
@ -1211,14 +1217,14 @@ async def test_req_chan_auth_v2_with_master_signing(pki_dir, io_loop):
assert "publish_port" in ret
assert (
pki_dir.join("minion", "minion_master.pub").read()
== pki_dir.join("master", "master.pub").read()
pki_dir.joinpath("minion", "minion_master.pub").read_text()
== pki_dir.joinpath("master", "master.pub").read_text()
)
async def test_req_chan_auth_v2_new_minion_with_master_pub(pki_dir, io_loop):
pki_dir.join("master", "minions", "minion").remove()
pki_dir.joinpath("master", "minions", "minion").unlink()
mockloop = MagicMock()
opts = {
"master_uri": "tcp://127.0.0.1:4506",
@ -1226,7 +1232,7 @@ async def test_req_chan_auth_v2_new_minion_with_master_pub(pki_dir, io_loop):
"ret_port": 4506,
"ipv6": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.join("minion")),
"pki_dir": str(pki_dir.joinpath("minion")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
@ -1245,7 +1251,7 @@ async def test_req_chan_auth_v2_new_minion_with_master_pub(pki_dir, io_loop):
),
"reload": salt.crypt.Crypticle.generate_key_string,
}
master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
master_opts = dict(opts, pki_dir=str(pki_dir.joinpath("master")))
master_opts["master_sign_pubkey"] = False
server = salt.channel.server.ReqServerChannel.factory(master_opts)
server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
@ -1267,15 +1273,15 @@ async def test_req_chan_auth_v2_new_minion_with_master_pub(pki_dir, io_loop):
async def test_req_chan_auth_v2_new_minion_with_master_pub_bad_sig(pki_dir, io_loop):
pki_dir.join("master", "minions", "minion").remove()
pki_dir.joinpath("master", "minions", "minion").unlink()
# Give the master a different key than the minion has.
mapriv = pki_dir.join("master", "master.pem")
mapriv.remove()
mapriv.write(MASTER2_PRIV_KEY.strip())
mapub = pki_dir.join("master", "master.pub")
mapub.remove()
mapub.write(MASTER2_PUB_KEY.strip())
mapriv = pki_dir.joinpath("master", "master.pem")
mapriv.unlink()
mapriv.write_text(MASTER2_PRIV_KEY.strip())
mapub = pki_dir.joinpath("master", "master.pub")
mapub.unlink()
mapub.write_text(MASTER2_PUB_KEY.strip())
mockloop = MagicMock()
opts = {
@ -1284,7 +1290,7 @@ async def test_req_chan_auth_v2_new_minion_with_master_pub_bad_sig(pki_dir, io_l
"ret_port": 4506,
"ipv6": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.join("minion")),
"pki_dir": str(pki_dir.joinpath("minion")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
@ -1303,7 +1309,7 @@ async def test_req_chan_auth_v2_new_minion_with_master_pub_bad_sig(pki_dir, io_l
),
"reload": salt.crypt.Crypticle.generate_key_string,
}
master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
master_opts = dict(opts, pki_dir=str(pki_dir.joinpath("master")))
master_opts["master_sign_pubkey"] = False
server = salt.channel.server.ReqServerChannel.factory(master_opts)
server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)
@ -1325,8 +1331,8 @@ async def test_req_chan_auth_v2_new_minion_with_master_pub_bad_sig(pki_dir, io_l
async def test_req_chan_auth_v2_new_minion_without_master_pub(pki_dir, io_loop):
pki_dir.join("master", "minions", "minion").remove()
pki_dir.join("minion", "minion_master.pub").remove()
pki_dir.joinpath("master", "minions", "minion").unlink()
pki_dir.joinpath("minion", "minion_master.pub").unlink()
mockloop = MagicMock()
opts = {
"master_uri": "tcp://127.0.0.1:4506",
@ -1334,7 +1340,7 @@ async def test_req_chan_auth_v2_new_minion_without_master_pub(pki_dir, io_loop):
"ret_port": 4506,
"ipv6": False,
"sock_dir": ".",
"pki_dir": str(pki_dir.join("minion")),
"pki_dir": str(pki_dir.joinpath("minion")),
"id": "minion",
"__role": "minion",
"keysize": 4096,
@ -1353,7 +1359,7 @@ async def test_req_chan_auth_v2_new_minion_without_master_pub(pki_dir, io_loop):
),
"reload": salt.crypt.Crypticle.generate_key_string,
}
master_opts = dict(opts, pki_dir=str(pki_dir.join("master")))
master_opts = dict(opts, pki_dir=str(pki_dir.joinpath("master")))
master_opts["master_sign_pubkey"] = False
server = salt.channel.server.ReqServerChannel.factory(master_opts)
server.auto_key = salt.daemons.masterapi.AutoKey(server.opts)

View file

@ -18,17 +18,17 @@ from salt.utils.templates import render_jinja_tmpl
@pytest.fixture
def minion_opts(tmpdir):
def minion_opts(tmp_path):
_opts = salt.config.DEFAULT_MINION_OPTS.copy()
_opts.update(
{
"cachedir": tmpdir.join("jinja-template-cache").strpath,
"cachedir": str(tmp_path / "jinja-template-cache"),
"file_buffer_size": 1048576,
"file_client": "local",
"file_ignore_regex": None,
"file_ignore_glob": None,
"file_roots": {"test": [tmpdir.join("templates").strpath]},
"pillar_roots": {"test": [tmpdir.join("templates").strpath]},
"file_roots": {"test": [str(tmp_path / "templates")]},
"pillar_roots": {"test": [str(tmp_path / "templates")]},
"fileserver_backend": ["roots"],
"hash_type": "md5",
"extension_modules": os.path.join(

View file

@ -3,7 +3,7 @@ import salt.utils.msgpack
from tests.support.mock import MagicMock, patch
def test_load_encoding(tmpdir):
def test_load_encoding(tmp_path):
"""
test when using msgpack version >= 1.0.0 we
can still load/dump when using unsupported
@ -12,13 +12,13 @@ def test_load_encoding(tmpdir):
https://github.com/msgpack/msgpack-python/blob/master/ChangeLog.rst
"""
fname = tmpdir.join("test_load_encoding.txt")
fname = str(tmp_path / "test_load_encoding.txt")
kwargs = {"encoding": "utf-8"}
data = [1, 2, 3]
with patch.object(salt.utils.msgpack, "version", (1, 0, 0)):
with salt.utils.files.fopen(fname.strpath, "wb") as wfh:
with salt.utils.files.fopen(fname, "wb") as wfh:
salt.utils.msgpack.dump(data, wfh)
with salt.utils.files.fopen(fname.strpath, "rb") as rfh:
with salt.utils.files.fopen(fname, "rb") as rfh:
ret = salt.utils.msgpack.load(rfh, **kwargs)
assert ret == data
@ -27,12 +27,12 @@ def test_load_encoding(tmpdir):
@pytest.mark.parametrize(
"version,encoding", [((2, 1, 3), False), ((1, 0, 0), False), ((0, 6, 2), True)]
)
def test_load_multiple_versions(version, encoding, tmpdir):
def test_load_multiple_versions(version, encoding, tmp_path):
"""
test when using msgpack on multiple versions that
we only remove encoding on >= 1.0.0
"""
fname = tmpdir.join("test_load_multipl_versions.txt")
fname = str(tmp_path / "test_load_multipl_versions.txt")
with patch.object(salt.utils.msgpack, "version", version):
data = [1, 2, 3]
@ -44,14 +44,14 @@ def test_load_multiple_versions(version, encoding, tmpdir):
kwargs = {"encoding": "utf-8"}
with patch_dump, patch_load:
with salt.utils.files.fopen(fname.strpath, "wb") as wfh:
with salt.utils.files.fopen(fname, "wb") as wfh:
salt.utils.msgpack.dump(data, wfh, encoding="utf-8")
if encoding:
assert "encoding" in mock_dump.call_args.kwargs
else:
assert "encoding" not in mock_dump.call_args.kwargs
with salt.utils.files.fopen(fname.strpath, "rb") as rfh:
with salt.utils.files.fopen(fname, "rb") as rfh:
salt.utils.msgpack.load(rfh, **kwargs)
if encoding:
assert "encoding" in mock_load.call_args.kwargs