From 2631170a6f0a2051b0c709a49ea98b00caf2f4b3 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Fri, 26 Jan 2024 06:20:11 +0000 Subject: [PATCH] Fix tests not properly fixed during the merge-forward Signed-off-by: Pedro Algarvio --- .../zeromq/test_pub_server_channel.py | 11 ++++++++-- tests/support/pytest/transport.py | 21 +++++++++---------- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/tests/pytests/functional/transport/zeromq/test_pub_server_channel.py b/tests/pytests/functional/transport/zeromq/test_pub_server_channel.py index 790d3ce2bde..2fc630d6efc 100644 --- a/tests/pytests/functional/transport/zeromq/test_pub_server_channel.py +++ b/tests/pytests/functional/transport/zeromq/test_pub_server_channel.py @@ -7,6 +7,7 @@ import time from contextlib import contextmanager import pytest +import tornado.ioloop from saltfactories.utils import random_string import salt.transport.zeromq @@ -32,9 +33,15 @@ class PubServerChannelSender: self.payload_list = payload_list def run(self): + loop = tornado.ioloop.IOLoop() + loop.add_callback(self._run, loop) + loop.start() + + async def _run(self, loop): for payload in self.payload_list: - self.pub_server_channel.publish(payload) - time.sleep(2) + await self.pub_server_channel.publish(payload) + await asyncio.sleep(2) + loop.stop() def generate_msg_list(msg_cnt, minions_list, broadcast): diff --git a/tests/support/pytest/transport.py b/tests/support/pytest/transport.py index bfb69baa3ad..2b4e62cc24d 100644 --- a/tests/support/pytest/transport.py +++ b/tests/support/pytest/transport.py @@ -7,7 +7,6 @@ import socket import time import pytest -import tornado.gen import tornado.ioloop import tornado.iostream import zmq @@ -118,8 +117,7 @@ class Collector(salt.utils.process.SignalHandlingProcess): break self.sock = tornado.iostream.IOStream(sock) - @tornado.gen.coroutine - def _recv(self): + async def _recv(self): if self.transport == "zeromq": # test_zeromq_filtering requires catching the # SaltDeserializationError in order to pass. @@ -138,26 +136,25 @@ class Collector(salt.utils.process.SignalHandlingProcess): log.debug( "Publish received for not this minion: %s", message_target ) - raise salt.ext.tornado.gen.Return(None) + return serial_payload = salt.payload.loads(messages[1]) else: raise Exception("Invalid number of messages") - raise salt.ext.tornado.gen.Return(serial_payload) + return serial_payload except (zmq.ZMQError, salt.exceptions.SaltDeserializationError): raise RecvError("ZMQ Error") else: for msg in self.unpacker: serial_payload = salt.payload.loads(msg["body"]) - raise tornado.gen.Return(serial_payload) - byts = yield self.sock.read_bytes(8096, partial=True) + return serial_payload + byts = await self.sock.read_bytes(8096, partial=True) self.unpacker.feed(byts) for msg in self.unpacker: serial_payload = salt.payload.loads(msg["body"]) - raise tornado.gen.Return(serial_payload) + return serial_payload raise RecvError("TCP Error") - @tornado.gen.coroutine - def _run(self, loop): + async def _run(self, loop): try: self._setup_listener() except Exception: # pylint: disable=broad-except @@ -179,7 +176,7 @@ class Collector(salt.utils.process.SignalHandlingProcess): log.error("Receive timeout reached in test collector!") break try: - payload = yield self._recv() + payload = await self._recv() except RecvError: time.sleep(0.03) else: @@ -329,6 +326,7 @@ class PubServerChannelProcess(salt.utils.process.SignalHandlingProcess): self.publish({"tgt_type": "glob", "tgt": "*", "jid": -1, "start": True}) if self.collector.running.wait(1) is True: break + time.sleep(0.5) attempts -= 1 else: pytest.fail("Failed to confirm the collector has started") @@ -341,6 +339,7 @@ class PubServerChannelProcess(salt.utils.process.SignalHandlingProcess): self.publish({"tgt_type": "glob", "tgt": "*", "jid": -1, "stop": True}) if self.collector.stop_running.wait(1) is True: break + time.sleep(0.5) attempts -= 1 else: pytest.fail("Failed to confirm the collector has stopped")