Request server basic test for all transports

This commit is contained in:
Daniel A. Wozniak 2023-08-13 15:23:21 -07:00 committed by Daniel Wozniak
parent e84dbda010
commit 85c282e51a

View file

@ -0,0 +1,61 @@
import asyncio
import pytest
import salt.transport
import salt.utils.process
def transport_ids(value):
return "Transport({})".format(value)
@pytest.fixture(params=("zeromq", "tcp", "ws"), ids=transport_ids)
def transport(request):
return request.param
@pytest.fixture
def process_manager():
pm = salt.utils.process.ProcessManager()
try:
yield pm
finally:
pm.terminate()
async def test_request_server(
io_loop, minion_opts, master_opts, transport, process_manager
):
minion_opts["transport"] = master_opts["transport"] = transport
# Needed by tcp transport's RequestClient
minion_opts[
"master_uri"
] = f"tcp://{master_opts['interface']}:{master_opts['ret_port']}"
req_server = salt.transport.request_server(master_opts)
req_server.pre_fork(process_manager)
reqmsg = {"req": "test"}
repmsg = {"result": "success"}
requests = []
async def handler(message):
requests.append(message)
return repmsg
req_server.post_fork(handler, io_loop)
req_client = salt.transport.request_client(minion_opts, io_loop)
try:
ret = await req_client.send({"req": "test"})
assert [reqmsg] == requests
assert repmsg == ret
finally:
req_client.close()
req_server.close()
# Yield to loop in order to allow cleanup methods to finish.
await asyncio.sleep(0.3)