This commit is contained in:
Daniel A. Wozniak 2023-07-21 00:50:10 -07:00 committed by Gareth J. Greenaway
parent 7d50c786e3
commit 156e75f345
10 changed files with 24 additions and 45 deletions

View file

@ -254,7 +254,9 @@ class PublishClient:
"""
raise NotImplementedError
async def connect(self, port=None, connect_callback=None, disconnect_callback=None):
async def connect(
self, port=None, connect_callback=None, disconnect_callback=None, timeout=None
):
"""
Create a network connection to the the PublishServer or broker.
"""

View file

@ -214,7 +214,7 @@ class IPCServer:
log.error("Exception occurred while handling stream: %s", exc)
def handle_connection(self, connection, address):
log.error(
log.trace(
"IPCServer: Handling connection to address: %s",
address if address else connection,
)

View file

@ -225,7 +225,6 @@ class TCPPubClient(salt.transport.base.PublishClient):
"connect",
"connect_uri",
"recv",
# "close",
]
close_methods = [
"close",
@ -289,8 +288,8 @@ class TCPPubClient(salt.transport.base.PublishClient):
"source_port": self.source_port,
}
stream = None
timeout = kwargs.get("timeout", None)
start = time.time()
timeout = kwargs.get("timeout", None)
while stream is None and (not self._closed and not self._closing):
try:
if self.host and self.port:
@ -450,7 +449,7 @@ class TCPPubClient(salt.transport.base.PublishClient):
if self.disconnect_callback:
self.disconnect_callback()
await self.connect()
log.error("Re-connected - continue")
log.debug("Re-connected - continue")
continue
# except AttributeError:
# return
@ -1178,7 +1177,6 @@ class PubServer(tornado.tcpserver.TCPServer):
log.trace(
"TCP PubServer sending payload: topic_list=%r %r", topic_list, package
)
# log.error("PUBLISH PAYLOAD %r", package)
payload = salt.transport.frame.frame_msg(package)
to_remove = []
if topic_list:
@ -1198,7 +1196,6 @@ class PubServer(tornado.tcpserver.TCPServer):
else:
for client in self.clients:
try:
# log.error("PUBLISH CLIENT %r", package)
# Write the packed str
await client.stream.write(payload)
except tornado.iostream.StreamClosedError:
@ -1512,7 +1509,6 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
"""
Publish "load" to minions
"""
# log.error("PUBLISH %r", payload)
if not self.pub_sock:
self.connect()
self.pub_sock.send(payload)
@ -1811,11 +1807,9 @@ class TCPReqClient(salt.transport.base.RequestClient):
if message_id in self.send_future_map:
self.send_future_map.pop(message_id).set_result(body)
# self.remove_message_timeout(message_id)
else:
if self._on_recv is not None:
self.io_loop.spawn_callback(self._on_recv, header, body)
# await self._on_recv(header, body)
else:
log.error(
"Got response for message_id %s that we are not"
@ -1881,12 +1875,6 @@ class TCPReqClient(salt.transport.base.RequestClient):
return self._mid
def remove_message_timeout(self, message_id):
if message_id not in self.send_timeout_map:
return
timeout = self.send_timeout_map.pop(message_id)
self.io_loop.remove_timeout(timeout)
def timeout_message(self, message_id, msg):
if message_id not in self.send_future_map:
return

View file

@ -250,7 +250,9 @@ class PublishClient(salt.transport.base.PublishClient):
self.close()
# TODO: this is the time to see if we are connected, maybe use the req channel to guess?
async def connect(self, port=None, connect_callback=None, disconnect_callback=None):
async def connect(
self, port=None, connect_callback=None, disconnect_callback=None, timeout=None
):
self.connect_called = True
if port is not None:
self.port = port

View file

@ -409,7 +409,7 @@ class SaltEvent:
try:
# self.subscriber.connect(timeout=timeout)
log.debug("Event connect subscriber %r", self.pub_path)
self.subscriber.connect()
self.subscriber.connect(timeout=timeout)
self.cpub = True
except tornado.iostream.StreamClosedError:
log.error("Encountered StreamClosedException")

View file

@ -137,7 +137,7 @@ def _connect_and_publish(
io_loop.stop()
channel.on_recv(cb)
log.error("TEST - RUN PUBLISH")
log.info("TEST - RUN PUBLISH")
io_loop.spawn_callback(
server.publish, {"tgt_type": "glob", "tgt": [channel_minion_id], "WTF": "SON"}
)

View file

@ -118,9 +118,9 @@ async def test_clean_by_request(sock_dir, io_loop):
"""
with eventpublisher_process(sock_dir):
log.error("After event pubserver start")
log.info("After event pubserver start")
with salt.utils.event.MasterEvent(sock_dir) as me:
log.error("After master event start %r", me)
log.info("After master event start %r", me)
request1 = Request()
request2 = Request()
event_listener = saltnado.EventListener(

View file

@ -28,23 +28,23 @@ def server(config):
async def handle_stream(self, stream, address):
try:
log.error("Got stream %r", self.disconnect)
log.info("Got stream %r", self.disconnect)
while self.disconnect is False:
for msg in self.send[:]:
msg = self.send.pop(0)
try:
log.error("Write %r", msg)
log.info("Write %r", msg)
await stream.write(msg)
except tornado.iostream.StreamClosedError:
log.error("Stream Closed Error From Test Server")
break
else:
log.error("SLEEP")
log.info("Sleep")
await asyncio.sleep(1)
log.error("Close stream")
log.info("Close stream")
finally:
stream.close()
log.error("After close stream")
log.info("After close stream")
server = TestServer()
try:
@ -89,42 +89,32 @@ async def test_message_client_reconnect(config, client, server):
# Send one full and one partial msg to the client.
partial = pmsg[:40]
log.error("Send partial %r", partial)
log.info("Send partial %r", partial)
server.send.append(partial)
while not received:
log.error("wait received")
log.info("wait received")
await asyncio.sleep(1)
log.error("assert received")
log.info("assert received")
assert received == [msg]
# log.error("sleep")
# log.info("sleep")
# await asyncio.sleep(1)
# The message client has unpacked one msg and there is a partial msg left in
# the unpacker. Closing the stream now leaves the unpacker in a bad state
# since the rest of the partil message will never be received.
log.error("disconnect")
server.disconnect = True
log.error("sleep")
await asyncio.sleep(1)
log.error("after sleep")
log.error("disconnect false")
server.disconnect = False
log.error("sleep")
await asyncio.sleep(1)
log.error("after sleep")
log.error("Disconnect False")
received = []
# Prior to the fix for #60831, the unpacker would be left in a broken state
# resulting in either a TypeError or BufferFull error from msgpack. The
# rest of this test would fail.
log.error("Send pmsg %r", pmsg)
server.send.append(pmsg)
log.error("After - Send pmsg %r", pmsg)
while not received:
await tornado.gen.sleep(1)
log.error("received %r", received)
assert received == [msg, msg]
server.disconnect = True

View file

@ -45,7 +45,7 @@ def test_minion_hangs_on_master_failure_50814(
break
time.sleep(0.5)
def wait_for_minion(salt_cli, tgt, timeout=30):
def wait_for_minion(salt_cli, tgt, timeout=60):
start = time.time()
while True:
ret = salt_cli.run(

View file

@ -87,7 +87,7 @@ class Collector(salt.utils.process.SignalHandlingProcess):
self.sock.setsockopt(zmq.LINGER, -1)
self.sock.setsockopt(zmq.SUBSCRIBE, b"")
pub_uri = "tcp://{}:{}".format(self.interface, self.port)
log.error("Collector listen %s", pub_uri)
log.info("Collector listen %s", pub_uri)
self.sock.connect(pub_uri)
else:
end = time.time() + 120
@ -105,20 +105,17 @@ class Collector(salt.utils.process.SignalHandlingProcess):
@tornado.gen.coroutine
def _recv(self):
# log.error("RECV %s", self.transport)
if self.transport == "zeromq":
# test_zeromq_filtering requires catching the
# SaltDeserializationError in order to pass.
try:
payload = self.sock.recv(zmq.NOBLOCK)
# log.error("ZMQ Payload is %r", payload)
serial_payload = salt.payload.loads(payload)
raise tornado.gen.Return(serial_payload)
except (zmq.ZMQError, salt.exceptions.SaltDeserializationError):
raise RecvError("ZMQ Error")
else:
for msg in self.unpacker:
# log.error("TCP Payload is %r", msg)
serial_payload = salt.payload.loads(msg["body"])
# raise tornado.gen.Return(msg["body"])
raise tornado.gen.Return(serial_payload)