Avoid circular import

This commit is contained in:
Daniel A. Wozniak 2024-06-22 07:40:28 -07:00
parent 9c12b06903
commit 5f1b51901c
4 changed files with 8 additions and 34 deletions

View file

@ -57,9 +57,10 @@ class ReqServerChannel:
def __init__(self, opts, transport):
self.opts = opts
self.transport = transport
# The event and master_key attributes will be populated after fork.
self.event = None
self.master_key = None
self.event = salt.utils.event.get_master_event(
self.opts, self.opts["sock_dir"], listen=False
)
self.master_key = salt.crypt.MasterKeys(self.opts)
@property
def aes_key(self):

View file

@ -22,7 +22,6 @@ from jinja2.environment import TemplateModule
from jinja2.exceptions import TemplateRuntimeError
from jinja2.ext import Extension
import salt.fileclient
import salt.utils.data
import salt.utils.files
import salt.utils.json
@ -93,6 +92,8 @@ class SaltCacheLoader(BaseLoader):
or not hasattr(self._file_client, "opts")
or self._file_client.opts["file_roots"] != self.opts["file_roots"]
):
import salt.fileclient
self._file_client = salt.fileclient.get_file_client(
self.opts, self.pillar_rend
)

View file

@ -155,7 +155,8 @@ def test_master_keys_with_cluster_id(tmp_path, master_opts):
def test_pwdata_decrypt():
key_string = dedent(
"""-----BEGIN RSA PRIVATE KEY-----
"""
-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAzhBRyyHa7b63RLE71uKMKgrpulcAJjaIaN68ltXcCvy4w9pi
Kj+4I3Qp6RvUaHOEmymqyjOMjQc6iwpe0scCFqh3nUk5YYaLZ3WAW0htQVlnesgB
ZiBg9PBeTQY/LzqtudL6RCng/AX+fbnCsddlIysRxnUoNVMvz0gAmCY2mnTDjcTt

View file

@ -6,8 +6,6 @@ import pytest
import salt.crypt
from tests.support.mock import MagicMock, MockCall, mock_open, patch
from . import MSG, PRIVKEY_DATA, PUBKEY_DATA, SIG
RSA = pytest.importorskip("Cryptodome.PublicKey.RSA")
try:
@ -96,30 +94,3 @@ def test_gen_keys_with_passphrase(tmp_path):
salt.crypt.gen_keys(key_path, "keyname", 2048)
assert open_priv_wb in m_open.calls
assert open_pub_wb in m_open.calls
def test_sign_message():
key = RSA.importKey(PRIVKEY_DATA)
with patch("salt.crypt.get_rsa_key", return_value=key):
assert SIG == salt.crypt.sign_message("/keydir/keyname.pem", MSG)
def test_sign_message_with_passphrase():
key = RSA.importKey(PRIVKEY_DATA)
with patch("salt.crypt.get_rsa_key", return_value=key):
assert SIG == salt.crypt.sign_message(
"/keydir/keyname.pem", MSG, passphrase="password"
)
def test_verify_signature():
with patch("salt.utils.files.fopen", mock_open(read_data=PUBKEY_DATA)):
assert salt.crypt.verify_signature("/keydir/keyname.pub", MSG, SIG)
def test_bad_key(key_to_test):
"""
Load public key with an invalid header and validate it without m2crypto
"""
key = salt.crypt.get_rsa_pub_key(key_to_test)
assert key.can_encrypt()