import asyncio import pytest import salt.config import salt.transport.tcp from tests.support.mock import MagicMock, patch @pytest.fixture def minion_opts(tmp_path): """ Default minion configuration with relative temporary paths to not require root permissions. """ root_dir = tmp_path / "minion" opts = salt.config.DEFAULT_MINION_OPTS.copy() opts["__role"] = "minion" opts["root_dir"] = str(root_dir) opts["master_uri"] = "tcp://{ip}:{port}".format( ip="127.0.0.1", port=opts["master_port"] ) for name in ("cachedir", "pki_dir", "sock_dir", "conf_dir"): dirpath = root_dir / name dirpath.mkdir(parents=True) opts[name] = str(dirpath) opts["log_file"] = "logs/minion.log" return opts @pytest.fixture def master_opts(tmp_path): """ Default master configuration with relative temporary paths to not require root permissions. """ root_dir = tmp_path / "master" opts = salt.config.master_config(None) opts["__role"] = "master" opts["root_dir"] = str(root_dir) for name in ("cachedir", "pki_dir", "sock_dir", "conf_dir"): dirpath = root_dir / name dirpath.mkdir(parents=True) opts[name] = str(dirpath) opts["log_file"] = "logs/master.log" return opts @pytest.fixture def syndic_opts(tmp_path): """ Default master configuration with relative temporary paths to not require root permissions. """ root_dir = tmp_path / "syndic" opts = salt.config.DEFAULT_MINION_OPTS.copy() opts["syndic_master"] = "127.0.0.1" opts["__role"] = "minion" opts["root_dir"] = str(root_dir) for name in ("cachedir", "pki_dir", "sock_dir", "conf_dir"): dirpath = root_dir / name dirpath.mkdir(parents=True) opts[name] = str(dirpath) opts["log_file"] = "logs/syndic.log" return opts @pytest.fixture def mocked_tcp_pub_client(): transport = MagicMock(spec=salt.transport.tcp.TCPPubClient) transport.connect = MagicMock() future = asyncio.Future() transport.connect.return_value = future future.set_result(True) with patch("salt.transport.tcp.TCPPubClient", transport): yield