From 156e75f34544678b6b86cc19741fc372aedae95f Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Fri, 21 Jul 2023 00:50:10 -0700 Subject: [PATCH] Clean up --- salt/transport/base.py | 4 ++- salt/transport/ipc.py | 2 +- salt/transport/tcp.py | 16 ++--------- salt/transport/zeromq.py | 4 ++- salt/utils/event.py | 2 +- .../pytests/functional/channel/test_server.py | 2 +- .../rest_tornado/test_event_listener.py | 4 +-- .../transport/tcp/test_message_client.py | 28 ++++++------------- .../multimaster/test_offline_master.py | 2 +- tests/support/pytest/transport.py | 5 +--- 10 files changed, 24 insertions(+), 45 deletions(-) diff --git a/salt/transport/base.py b/salt/transport/base.py index a1e236176ef..93a4ed57c53 100644 --- a/salt/transport/base.py +++ b/salt/transport/base.py @@ -254,7 +254,9 @@ class PublishClient: """ raise NotImplementedError - async def connect(self, port=None, connect_callback=None, disconnect_callback=None): + async def connect( + self, port=None, connect_callback=None, disconnect_callback=None, timeout=None + ): """ Create a network connection to the the PublishServer or broker. """ diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index 40d996ea234..8493b6548b0 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -214,7 +214,7 @@ class IPCServer: log.error("Exception occurred while handling stream: %s", exc) def handle_connection(self, connection, address): - log.error( + log.trace( "IPCServer: Handling connection to address: %s", address if address else connection, ) diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py index 433250ddf46..d079432d1c3 100644 --- a/salt/transport/tcp.py +++ b/salt/transport/tcp.py @@ -225,7 +225,6 @@ class TCPPubClient(salt.transport.base.PublishClient): "connect", "connect_uri", "recv", - # "close", ] close_methods = [ "close", @@ -289,8 +288,8 @@ class TCPPubClient(salt.transport.base.PublishClient): "source_port": self.source_port, } stream = None - timeout = kwargs.get("timeout", None) start = time.time() + timeout = kwargs.get("timeout", None) while stream is None and (not self._closed and not self._closing): try: if self.host and self.port: @@ -450,7 +449,7 @@ class TCPPubClient(salt.transport.base.PublishClient): if self.disconnect_callback: self.disconnect_callback() await self.connect() - log.error("Re-connected - continue") + log.debug("Re-connected - continue") continue # except AttributeError: # return @@ -1178,7 +1177,6 @@ class PubServer(tornado.tcpserver.TCPServer): log.trace( "TCP PubServer sending payload: topic_list=%r %r", topic_list, package ) - # log.error("PUBLISH PAYLOAD %r", package) payload = salt.transport.frame.frame_msg(package) to_remove = [] if topic_list: @@ -1198,7 +1196,6 @@ class PubServer(tornado.tcpserver.TCPServer): else: for client in self.clients: try: - # log.error("PUBLISH CLIENT %r", package) # Write the packed str await client.stream.write(payload) except tornado.iostream.StreamClosedError: @@ -1512,7 +1509,6 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer): """ Publish "load" to minions """ - # log.error("PUBLISH %r", payload) if not self.pub_sock: self.connect() self.pub_sock.send(payload) @@ -1811,11 +1807,9 @@ class TCPReqClient(salt.transport.base.RequestClient): if message_id in self.send_future_map: self.send_future_map.pop(message_id).set_result(body) - # self.remove_message_timeout(message_id) else: if self._on_recv is not None: self.io_loop.spawn_callback(self._on_recv, header, body) - # await self._on_recv(header, body) else: log.error( "Got response for message_id %s that we are not" @@ -1881,12 +1875,6 @@ class TCPReqClient(salt.transport.base.RequestClient): return self._mid - def remove_message_timeout(self, message_id): - if message_id not in self.send_timeout_map: - return - timeout = self.send_timeout_map.pop(message_id) - self.io_loop.remove_timeout(timeout) - def timeout_message(self, message_id, msg): if message_id not in self.send_future_map: return diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py index 87c23ddec82..ab7a7c97ab3 100644 --- a/salt/transport/zeromq.py +++ b/salt/transport/zeromq.py @@ -250,7 +250,9 @@ class PublishClient(salt.transport.base.PublishClient): self.close() # TODO: this is the time to see if we are connected, maybe use the req channel to guess? - async def connect(self, port=None, connect_callback=None, disconnect_callback=None): + async def connect( + self, port=None, connect_callback=None, disconnect_callback=None, timeout=None + ): self.connect_called = True if port is not None: self.port = port diff --git a/salt/utils/event.py b/salt/utils/event.py index 32c37ed3293..b1438ce12b0 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -409,7 +409,7 @@ class SaltEvent: try: # self.subscriber.connect(timeout=timeout) log.debug("Event connect subscriber %r", self.pub_path) - self.subscriber.connect() + self.subscriber.connect(timeout=timeout) self.cpub = True except tornado.iostream.StreamClosedError: log.error("Encountered StreamClosedException") diff --git a/tests/pytests/functional/channel/test_server.py b/tests/pytests/functional/channel/test_server.py index f13284504ab..8db1f64c057 100644 --- a/tests/pytests/functional/channel/test_server.py +++ b/tests/pytests/functional/channel/test_server.py @@ -137,7 +137,7 @@ def _connect_and_publish( io_loop.stop() channel.on_recv(cb) - log.error("TEST - RUN PUBLISH") + log.info("TEST - RUN PUBLISH") io_loop.spawn_callback( server.publish, {"tgt_type": "glob", "tgt": [channel_minion_id], "WTF": "SON"} ) diff --git a/tests/pytests/functional/netapi/rest_tornado/test_event_listener.py b/tests/pytests/functional/netapi/rest_tornado/test_event_listener.py index c47578357c5..c921dac1341 100644 --- a/tests/pytests/functional/netapi/rest_tornado/test_event_listener.py +++ b/tests/pytests/functional/netapi/rest_tornado/test_event_listener.py @@ -118,9 +118,9 @@ async def test_clean_by_request(sock_dir, io_loop): """ with eventpublisher_process(sock_dir): - log.error("After event pubserver start") + log.info("After event pubserver start") with salt.utils.event.MasterEvent(sock_dir) as me: - log.error("After master event start %r", me) + log.info("After master event start %r", me) request1 = Request() request2 = Request() event_listener = saltnado.EventListener( diff --git a/tests/pytests/functional/transport/tcp/test_message_client.py b/tests/pytests/functional/transport/tcp/test_message_client.py index c9b1b302a1b..b7403d4b6fb 100644 --- a/tests/pytests/functional/transport/tcp/test_message_client.py +++ b/tests/pytests/functional/transport/tcp/test_message_client.py @@ -28,23 +28,23 @@ def server(config): async def handle_stream(self, stream, address): try: - log.error("Got stream %r", self.disconnect) + log.info("Got stream %r", self.disconnect) while self.disconnect is False: for msg in self.send[:]: msg = self.send.pop(0) try: - log.error("Write %r", msg) + log.info("Write %r", msg) await stream.write(msg) except tornado.iostream.StreamClosedError: log.error("Stream Closed Error From Test Server") break else: - log.error("SLEEP") + log.info("Sleep") await asyncio.sleep(1) - log.error("Close stream") + log.info("Close stream") finally: stream.close() - log.error("After close stream") + log.info("After close stream") server = TestServer() try: @@ -89,42 +89,32 @@ async def test_message_client_reconnect(config, client, server): # Send one full and one partial msg to the client. partial = pmsg[:40] - log.error("Send partial %r", partial) + log.info("Send partial %r", partial) server.send.append(partial) while not received: - log.error("wait received") + log.info("wait received") await asyncio.sleep(1) - log.error("assert received") + log.info("assert received") assert received == [msg] - # log.error("sleep") + # log.info("sleep") # await asyncio.sleep(1) # The message client has unpacked one msg and there is a partial msg left in # the unpacker. Closing the stream now leaves the unpacker in a bad state # since the rest of the partil message will never be received. - log.error("disconnect") server.disconnect = True - log.error("sleep") await asyncio.sleep(1) - log.error("after sleep") - log.error("disconnect false") server.disconnect = False - log.error("sleep") await asyncio.sleep(1) - log.error("after sleep") - log.error("Disconnect False") received = [] # Prior to the fix for #60831, the unpacker would be left in a broken state # resulting in either a TypeError or BufferFull error from msgpack. The # rest of this test would fail. - log.error("Send pmsg %r", pmsg) server.send.append(pmsg) - log.error("After - Send pmsg %r", pmsg) while not received: await tornado.gen.sleep(1) - log.error("received %r", received) assert received == [msg, msg] server.disconnect = True diff --git a/tests/pytests/scenarios/multimaster/test_offline_master.py b/tests/pytests/scenarios/multimaster/test_offline_master.py index fe8f6d28bf0..c8ea679328b 100644 --- a/tests/pytests/scenarios/multimaster/test_offline_master.py +++ b/tests/pytests/scenarios/multimaster/test_offline_master.py @@ -45,7 +45,7 @@ def test_minion_hangs_on_master_failure_50814( break time.sleep(0.5) - def wait_for_minion(salt_cli, tgt, timeout=30): + def wait_for_minion(salt_cli, tgt, timeout=60): start = time.time() while True: ret = salt_cli.run( diff --git a/tests/support/pytest/transport.py b/tests/support/pytest/transport.py index c0f01bdfbc6..8b21ab640f2 100644 --- a/tests/support/pytest/transport.py +++ b/tests/support/pytest/transport.py @@ -87,7 +87,7 @@ class Collector(salt.utils.process.SignalHandlingProcess): self.sock.setsockopt(zmq.LINGER, -1) self.sock.setsockopt(zmq.SUBSCRIBE, b"") pub_uri = "tcp://{}:{}".format(self.interface, self.port) - log.error("Collector listen %s", pub_uri) + log.info("Collector listen %s", pub_uri) self.sock.connect(pub_uri) else: end = time.time() + 120 @@ -105,20 +105,17 @@ class Collector(salt.utils.process.SignalHandlingProcess): @tornado.gen.coroutine def _recv(self): - # log.error("RECV %s", self.transport) if self.transport == "zeromq": # test_zeromq_filtering requires catching the # SaltDeserializationError in order to pass. try: payload = self.sock.recv(zmq.NOBLOCK) - # log.error("ZMQ Payload is %r", payload) serial_payload = salt.payload.loads(payload) raise tornado.gen.Return(serial_payload) except (zmq.ZMQError, salt.exceptions.SaltDeserializationError): raise RecvError("ZMQ Error") else: for msg in self.unpacker: - # log.error("TCP Payload is %r", msg) serial_payload = salt.payload.loads(msg["body"]) # raise tornado.gen.Return(msg["body"]) raise tornado.gen.Return(serial_payload)