mirror of
https://github.com/saltstack/salt.git
synced 2025-04-16 09:40:20 +00:00
Warn on un-closed tranport clients
This commit is contained in:
parent
b06444317f
commit
dcc9976d9b
3 changed files with 41 additions and 18 deletions
|
@ -1,3 +1,6 @@
|
|||
import traceback
|
||||
import warnings
|
||||
|
||||
import salt.ext.tornado.gen
|
||||
|
||||
TRANSPORTS = (
|
||||
|
@ -94,14 +97,32 @@ def publish_client(opts, io_loop):
|
|||
raise Exception("Transport type not found: {}".format(ttype))
|
||||
|
||||
|
||||
class RequestClient:
|
||||
class Transport:
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._trace = "\n".join(traceback.format_stack()[:-1])
|
||||
if not hasattr(self, "_closing"):
|
||||
self._closing = False
|
||||
|
||||
# pylint: disable=W1701
|
||||
def __del__(self):
|
||||
if not self._closing:
|
||||
warnings.warn(
|
||||
f"Unclosed transport {self!r} \n{self._trace}",
|
||||
ResourceWarning,
|
||||
source=self,
|
||||
)
|
||||
|
||||
# pylint: enable=W1701
|
||||
|
||||
|
||||
class RequestClient(Transport):
|
||||
"""
|
||||
The RequestClient transport is used to make requests and get corresponding
|
||||
replies from the RequestServer.
|
||||
"""
|
||||
|
||||
def __init__(self, opts, io_loop, **kwargs):
|
||||
pass
|
||||
super().__init__()
|
||||
|
||||
@salt.ext.tornado.gen.coroutine
|
||||
def send(self, load, timeout=60):
|
||||
|
@ -197,13 +218,13 @@ class DaemonizedPublishServer(PublishServer):
|
|||
raise NotImplementedError
|
||||
|
||||
|
||||
class PublishClient:
|
||||
class PublishClient(Transport):
|
||||
"""
|
||||
The PublishClient receives messages from the PublishServer and runs a callback.
|
||||
"""
|
||||
|
||||
def __init__(self, opts, io_loop, **kwargs):
|
||||
pass
|
||||
super().__init__()
|
||||
|
||||
def on_recv(self, callback):
|
||||
"""
|
||||
|
|
|
@ -213,6 +213,7 @@ class TCPPubClient(salt.transport.base.PublishClient):
|
|||
ttype = "tcp"
|
||||
|
||||
def __init__(self, opts, io_loop, **kwargs): # pylint: disable=W0231
|
||||
super().__init__(opts, io_loop, **kwargs)
|
||||
self.opts = opts
|
||||
self.io_loop = io_loop
|
||||
self.message_client = None
|
||||
|
@ -228,12 +229,6 @@ class TCPPubClient(salt.transport.base.PublishClient):
|
|||
self.message_client.close()
|
||||
self.message_client = None
|
||||
|
||||
# pylint: disable=W1701
|
||||
def __del__(self):
|
||||
self.close()
|
||||
|
||||
# pylint: enable=W1701
|
||||
|
||||
@salt.ext.tornado.gen.coroutine
|
||||
def connect(self, publish_port, connect_callback=None, disconnect_callback=None):
|
||||
self.publish_port = publish_port
|
||||
|
@ -1038,6 +1033,7 @@ class TCPReqClient(salt.transport.base.RequestClient):
|
|||
ttype = "tcp"
|
||||
|
||||
def __init__(self, opts, io_loop, **kwargs): # pylint: disable=W0231
|
||||
super().__init__(opts, io_loop, **kwargs)
|
||||
self.opts = opts
|
||||
self.io_loop = io_loop
|
||||
parse = urllib.parse.urlparse(self.opts["master_uri"])
|
||||
|
@ -1054,6 +1050,7 @@ class TCPReqClient(salt.transport.base.RequestClient):
|
|||
source_ip=opts.get("source_ip"),
|
||||
source_port=opts.get("source_ret_port"),
|
||||
)
|
||||
self._closing = False
|
||||
|
||||
@salt.ext.tornado.gen.coroutine
|
||||
def connect(self):
|
||||
|
@ -1065,4 +1062,7 @@ class TCPReqClient(salt.transport.base.RequestClient):
|
|||
raise salt.ext.tornado.gen.Return(ret)
|
||||
|
||||
def close(self):
|
||||
if self._closing:
|
||||
return
|
||||
self._closing = True
|
||||
self.message_client.close()
|
||||
|
|
|
@ -529,14 +529,8 @@ class AsyncReqMessageClient:
|
|||
# wire up sockets
|
||||
self._init_socket()
|
||||
|
||||
# TODO: timeout all in-flight sessions, or error
|
||||
def close(self):
|
||||
try:
|
||||
if self._closing:
|
||||
return
|
||||
except AttributeError:
|
||||
# We must have been called from __del__
|
||||
# The python interpreter has nuked most attributes already
|
||||
if self._closing:
|
||||
return
|
||||
else:
|
||||
self._closing = True
|
||||
|
@ -661,7 +655,10 @@ class ZeroMQSocketMonitor:
|
|||
def stop(self):
|
||||
if self._socket is None:
|
||||
return
|
||||
self._socket.disable_monitor()
|
||||
try:
|
||||
self._socket.disable_monitor()
|
||||
except zmq.Error:
|
||||
pass
|
||||
self._socket = None
|
||||
self._monitor_socket = None
|
||||
if self._monitor_stream is not None:
|
||||
|
@ -880,6 +877,7 @@ class RequestClient(salt.transport.base.RequestClient):
|
|||
ttype = "zeromq"
|
||||
|
||||
def __init__(self, opts, io_loop): # pylint: disable=W0231
|
||||
super().__init__(opts, io_loop)
|
||||
self.opts = opts
|
||||
master_uri = self.get_master_uri(opts)
|
||||
self.message_client = AsyncReqMessageClient(
|
||||
|
@ -887,6 +885,7 @@ class RequestClient(salt.transport.base.RequestClient):
|
|||
master_uri,
|
||||
io_loop=io_loop,
|
||||
)
|
||||
self._closing = False
|
||||
|
||||
def connect(self):
|
||||
self.message_client.connect()
|
||||
|
@ -898,6 +897,9 @@ class RequestClient(salt.transport.base.RequestClient):
|
|||
raise salt.ext.tornado.gen.Return(ret)
|
||||
|
||||
def close(self):
|
||||
if self._closing:
|
||||
return
|
||||
self._closing = True
|
||||
self.message_client.close()
|
||||
|
||||
@staticmethod
|
||||
|
|
Loading…
Add table
Reference in a new issue