diff --git a/tests/pytests/unit/channel/test_request_channel.py b/tests/pytests/unit/channel/test_request_channel.py index 86b58cbd2e5..68bda4700ae 100644 --- a/tests/pytests/unit/channel/test_request_channel.py +++ b/tests/pytests/unit/channel/test_request_channel.py @@ -356,14 +356,13 @@ def run_loop_in_thread(loop, evt): """ asyncio.set_event_loop(loop.asyncio_loop) - @tornado.gen.coroutine - def stopper(): - yield tornado.gen.sleep(0.1) + async def stopper(): + await asyncio.sleep(0.1) while True: if not evt.is_set(): loop.stop() break - yield tornado.gen.sleep(0.3) + await asyncio.sleep(0.3) loop.add_callback(evt.set) loop.add_callback(stopper) @@ -463,7 +462,7 @@ def test_payload_handling_exception(temp_salt_minion, temp_salt_master): with MockSaltMinionMaster(temp_salt_minion, temp_salt_master) as minion_master: with patch.object(minion_master.mock, "_handle_payload_hook") as _mock: _mock.side_effect = Exception() - ret = minion_master.channel.send({}, timeout=2, tries=1) + ret = minion_master.channel.send({}, timeout=15, tries=1) assert ret == "Some exception handling minion payload" diff --git a/tests/pytests/unit/transport/test_publish_client.py b/tests/pytests/unit/transport/test_publish_client.py index 3a4ee21a9fd..b0441426cfc 100644 --- a/tests/pytests/unit/transport/test_publish_client.py +++ b/tests/pytests/unit/transport/test_publish_client.py @@ -2,8 +2,11 @@ :codeauthor: Thomas Jackson """ +import asyncio import hashlib import logging +import socket +import time import pytest import tornado.ioloop @@ -186,8 +189,6 @@ async def test_publish_client_connect_server_comes_up(transport, io_loop): host = "127.0.0.1" port = 11122 if transport == "zeromq": - import asyncio - import zmq ctx = zmq.asyncio.Context() @@ -200,11 +201,11 @@ async def test_publish_client_connect_server_comes_up(transport, io_loop): await client.connect() assert client._socket - socket = ctx.socket(zmq.PUB) - socket.setsockopt(zmq.BACKLOG, 1000) - socket.setsockopt(zmq.LINGER, -1) - socket.setsockopt(zmq.SNDHWM, 1000) - socket.bind(uri) + sock = ctx.socket(zmq.PUB) + sock.setsockopt(zmq.BACKLOG, 1000) + sock.setsockopt(zmq.LINGER, -1) + sock.setsockopt(zmq.SNDHWM, 1000) + sock.bind(uri) await asyncio.sleep(20) async def recv(): @@ -212,17 +213,15 @@ async def test_publish_client_connect_server_comes_up(transport, io_loop): task = asyncio.create_task(recv()) # Sleep to allow zmq to do it's thing. - await socket.send(msg) + await sock.send(msg) await task response = task.result() assert response client.close() - socket.close() + sock.close() await asyncio.sleep(0.03) ctx.term() elif transport == "tcp": - import asyncio - import socket client = salt.transport.tcp.TCPPubClient(opts, io_loop, host=host, port=port) # XXX: This is an implimentation detail of the tcp transport. @@ -240,7 +239,19 @@ async def test_publish_client_connect_server_comes_up(transport, io_loop): msg = salt.payload.dumps({"meh": 123}) msg = salt.transport.frame.frame_msg(msg, header=None) - conn, addr = sock.accept() + + # This loop and timeout is needed to reliably run this test on windows. + start = time.monotonic() + while True: + try: + conn, addr = sock.accept() + except BlockingIOError: + await asyncio.sleep(0.3) + if time.monotonic() - start > 30: + raise TimeoutError("No connection after 30 seconds") + else: + break + conn.send(msg) response = await client.recv() assert response