mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Fix tests not properly fixed during the merge-forward
Signed-off-by: Pedro Algarvio <palgarvio@vmware.com>
This commit is contained in:
parent
890df5021b
commit
2631170a6f
2 changed files with 19 additions and 13 deletions
|
@ -7,6 +7,7 @@ import time
|
|||
from contextlib import contextmanager
|
||||
|
||||
import pytest
|
||||
import tornado.ioloop
|
||||
from saltfactories.utils import random_string
|
||||
|
||||
import salt.transport.zeromq
|
||||
|
@ -32,9 +33,15 @@ class PubServerChannelSender:
|
|||
self.payload_list = payload_list
|
||||
|
||||
def run(self):
|
||||
loop = tornado.ioloop.IOLoop()
|
||||
loop.add_callback(self._run, loop)
|
||||
loop.start()
|
||||
|
||||
async def _run(self, loop):
|
||||
for payload in self.payload_list:
|
||||
self.pub_server_channel.publish(payload)
|
||||
time.sleep(2)
|
||||
await self.pub_server_channel.publish(payload)
|
||||
await asyncio.sleep(2)
|
||||
loop.stop()
|
||||
|
||||
|
||||
def generate_msg_list(msg_cnt, minions_list, broadcast):
|
||||
|
|
|
@ -7,7 +7,6 @@ import socket
|
|||
import time
|
||||
|
||||
import pytest
|
||||
import tornado.gen
|
||||
import tornado.ioloop
|
||||
import tornado.iostream
|
||||
import zmq
|
||||
|
@ -118,8 +117,7 @@ class Collector(salt.utils.process.SignalHandlingProcess):
|
|||
break
|
||||
self.sock = tornado.iostream.IOStream(sock)
|
||||
|
||||
@tornado.gen.coroutine
|
||||
def _recv(self):
|
||||
async def _recv(self):
|
||||
if self.transport == "zeromq":
|
||||
# test_zeromq_filtering requires catching the
|
||||
# SaltDeserializationError in order to pass.
|
||||
|
@ -138,26 +136,25 @@ class Collector(salt.utils.process.SignalHandlingProcess):
|
|||
log.debug(
|
||||
"Publish received for not this minion: %s", message_target
|
||||
)
|
||||
raise salt.ext.tornado.gen.Return(None)
|
||||
return
|
||||
serial_payload = salt.payload.loads(messages[1])
|
||||
else:
|
||||
raise Exception("Invalid number of messages")
|
||||
raise salt.ext.tornado.gen.Return(serial_payload)
|
||||
return serial_payload
|
||||
except (zmq.ZMQError, salt.exceptions.SaltDeserializationError):
|
||||
raise RecvError("ZMQ Error")
|
||||
else:
|
||||
for msg in self.unpacker:
|
||||
serial_payload = salt.payload.loads(msg["body"])
|
||||
raise tornado.gen.Return(serial_payload)
|
||||
byts = yield self.sock.read_bytes(8096, partial=True)
|
||||
return serial_payload
|
||||
byts = await self.sock.read_bytes(8096, partial=True)
|
||||
self.unpacker.feed(byts)
|
||||
for msg in self.unpacker:
|
||||
serial_payload = salt.payload.loads(msg["body"])
|
||||
raise tornado.gen.Return(serial_payload)
|
||||
return serial_payload
|
||||
raise RecvError("TCP Error")
|
||||
|
||||
@tornado.gen.coroutine
|
||||
def _run(self, loop):
|
||||
async def _run(self, loop):
|
||||
try:
|
||||
self._setup_listener()
|
||||
except Exception: # pylint: disable=broad-except
|
||||
|
@ -179,7 +176,7 @@ class Collector(salt.utils.process.SignalHandlingProcess):
|
|||
log.error("Receive timeout reached in test collector!")
|
||||
break
|
||||
try:
|
||||
payload = yield self._recv()
|
||||
payload = await self._recv()
|
||||
except RecvError:
|
||||
time.sleep(0.03)
|
||||
else:
|
||||
|
@ -329,6 +326,7 @@ class PubServerChannelProcess(salt.utils.process.SignalHandlingProcess):
|
|||
self.publish({"tgt_type": "glob", "tgt": "*", "jid": -1, "start": True})
|
||||
if self.collector.running.wait(1) is True:
|
||||
break
|
||||
time.sleep(0.5)
|
||||
attempts -= 1
|
||||
else:
|
||||
pytest.fail("Failed to confirm the collector has started")
|
||||
|
@ -341,6 +339,7 @@ class PubServerChannelProcess(salt.utils.process.SignalHandlingProcess):
|
|||
self.publish({"tgt_type": "glob", "tgt": "*", "jid": -1, "stop": True})
|
||||
if self.collector.stop_running.wait(1) is True:
|
||||
break
|
||||
time.sleep(0.5)
|
||||
attempts -= 1
|
||||
else:
|
||||
pytest.fail("Failed to confirm the collector has stopped")
|
||||
|
|
Loading…
Add table
Reference in a new issue