This commit is contained in:
Daniel A. Wozniak 2023-06-30 14:58:31 -07:00 committed by Gareth J. Greenaway
parent d931728bfe
commit 7b58472599
3 changed files with 21 additions and 75 deletions

View file

@ -265,8 +265,8 @@ class TCPPubClient(salt.transport.base.PublishClient):
# pylint: disable=W1701
def __del__(self):
self.close()
if not self._closing:
warnings.warn("%r not closed", self)
# pylint: enable=W1701
async def getstream(self, **kwargs):
@ -389,17 +389,14 @@ class TCPPubClient(salt.transport.base.PublishClient):
self._read_in_progress.release()
async def on_recv_handler(self, callback):
log.error("ON RECV HANDLER")
while not self._stream:
await asyncio.sleep(0.003)
while True:
try:
log.error("ON RECV HANDLER - RECV")
msg = await self.recv()
log.error("ON RECV HANDLER - RECVED")
logit = True
except tornado.iostream.StreamClosedError:
log.error("Stream Closed")
log.trace("Stream closed, reconnecting.")
self._stream.close()
self._stream = None
await self._connect()
@ -407,8 +404,8 @@ class TCPPubClient(salt.transport.base.PublishClient):
self.disconnect_callback()
self.unpacker = salt.utils.msgpack.Unpacker()
continue
except:
log.error("Stream Closed", exc_info=True)
except Exception: # py-lint: disable=broad-except
log.error("Unhandled exception in on_recv handler.", exc_info=True)
callback(msg)
def on_recv(self, callback):
@ -755,8 +752,8 @@ class MessageClient:
# pylint: disable=W1701
def __del__(self):
self.io_loop.spawn_callback(self.close)
if not self._closing:
warnings.warn("%r not closed", self)
# pylint: enable=W1701
async def getstream(self, **kwargs):
@ -1024,8 +1021,8 @@ class Subscriber:
# pylint: disable=W1701
def __del__(self):
self.close()
if not self._closing:
warnings.warn("%r not closed", self)
# pylint: enable=W1701
@ -1278,13 +1275,8 @@ class TCPPuller:
# pylint: disable=W1701
def __del__(self):
try:
self.close()
except TypeError:
# This is raised when Python's GC has collected objects which
# would be needed when calling self.close()
pass
if not self._closing:
warnings.warn("%r not closed", self)
# pylint: enable=W1701
def __enter__(self):
@ -1304,7 +1296,6 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
backlog = 128
async_methods = [
"publish",
# "close",
]
close_methods = [
"close",
@ -1313,39 +1304,12 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
def __init__(self, opts, **kwargs):
self.opts = opts
self.pub_sock = None
# Set up Salt IPC server
# if self.opts.get("ipc_mode", "") == "tcp":
# self.pull_uri = int(self.opts.get("tcp_master_publish_pull", 4514))
# else:
# self.pull_uri = os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
# interface = self.opts.get("interface", "127.0.0.1")
# self.publish_port = self.opts.get("publish_port", 4560)
# self.pub_uri = f"tcp://{interface}:{self.publish_port}"
self.pub_host = kwargs.get("pub_host", None)
self.pub_port = kwargs.get("pub_port", None)
self.pub_path = kwargs.get("pub_path", None)
# if pub_path:
# self.pub_path = pub_path
# self.pub_uri = f"ipc://{pub_path}"
# else:
# self.pub_uri = f"tcp://{pub_host}:{pub_port}"
# self.publish_port = self.opts.get("publish_port", 4560)
self.pull_host = kwargs.get("pull_host", None)
self.pull_port = kwargs.get("pull_port", None)
self.pull_path = kwargs.get("pull_path", None)
# if pull_path:
# self.pull_uri = f"ipc://{pull_path}"
# else:
# self.pull_uri = f"tcp://{pub_host}:{pub_port}"
# log.error(
# "TCPPubServer %r %s %s",
# self,
# self.pull_uri,
# #self.publish_port,
# self.pub_uri,
# )
@property
def topic_support(self):
@ -1454,21 +1418,7 @@ class TCPPublishServer(salt.transport.base.DaemonizedPublishServer):
Publish "load" to minions
"""
if not self.pub_sock:
log.error("CONNECT")
self.connect()
# if self.opts.get("ipc_mode", "") == "tcp":
# pull_uri = int(self.opts.get("tcp_master_publish_pull", 4514))
# else:
# pull_uri = os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
# if not self.pub_sock:
# self.pub_sock = salt.utils.asynchronous.SyncWrapper(
# salt.transport.ipc.IPCMessageClient,
# (pull_uri,),
# loop_kwarg="io_loop",
# )
# self.pub_sock.connect()
# await self.pub_sock.send(payload)
log.error("publish payload")
self.pub_sock.send(payload)
def close(self):
@ -1635,13 +1585,8 @@ class _TCPPubServerPublisher:
# pylint: disable=W1701
def __del__(self):
try:
self.close()
except TypeError:
# This is raised when Python's GC has collected objects which
# would be needed when calling self.close()
pass
if not self._closing:
warnings.warn("%r not closed", self)
# pylint: enable=W1701
def __enter__(self):

View file

@ -1110,7 +1110,7 @@ class RequestClient(salt.transport.base.RequestClient):
ret = await self.socket.recv()
return salt.payload.loads(ret)
async def send(self, message, timeout=None, callback=None):
async def send(self, message, timeout=None):
"""
Return a future which will be completed when the message has a response
"""
@ -1118,10 +1118,7 @@ class RequestClient(salt.transport.base.RequestClient):
await self.connect()
await self.sending.acquire()
try:
response = await asyncio.wait_for(self._send_recv(message), timeout=timeout)
if callback:
callback(response)
return response
return await asyncio.wait_for(self._send_recv(message), timeout=timeout)
except TimeoutError:
self.close()
except Exception:

View file

@ -8,6 +8,7 @@
Test support helpers
"""
import asyncio
import base64
import builtins
import errno
@ -1430,7 +1431,7 @@ class Webserver:
Threading target which stands up the tornado application
"""
self.ioloop = tornado.ioloop.IOLoop()
self.ioloop.make_current()
asyncio.set_event_loop(self.ioloop.asyncio_loop)
if self.handler == tornado.web.StaticFileHandler:
self.application = tornado.web.Application(
[(r"/(.*)", self.handler, {"path": self.root})]
@ -1445,7 +1446,10 @@ class Webserver:
if self.port is None:
return False
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
return sock.connect_ex(("127.0.0.1", self.port)) == 0
try:
return sock.connect_ex(("127.0.0.1", self.port)) == 0
finally:
sock.close()
def url(self, path):
"""