mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge pull request #28646 from rallytime/bp-28614
Back-port #28614 to 2015.8
This commit is contained in:
commit
7531bc7334
4 changed files with 58 additions and 15 deletions
|
@ -43,9 +43,9 @@ class IPCServer(object):
|
|||
self.payload_handler = payload_handler
|
||||
|
||||
# Placeholders for attributes to be populated by method calls
|
||||
self.stream = None
|
||||
self.sock = None
|
||||
self.io_loop = io_loop or IOLoop.current()
|
||||
self._closing = False
|
||||
|
||||
def start(self):
|
||||
'''
|
||||
|
@ -125,8 +125,9 @@ class IPCServer(object):
|
|||
Sockets and filehandles should be closed explicitly, to prevent
|
||||
leaks.
|
||||
'''
|
||||
if hasattr(self.stream, 'close'):
|
||||
self.stream.close()
|
||||
if self._closing:
|
||||
return
|
||||
self._closing = True
|
||||
if hasattr(self.sock, 'close'):
|
||||
self.sock.close()
|
||||
|
||||
|
@ -238,6 +239,8 @@ class IPCClient(object):
|
|||
Sockets and filehandles should be closed explicitly, to prevent
|
||||
leaks.
|
||||
'''
|
||||
if self._closing:
|
||||
return
|
||||
self._closing = True
|
||||
if hasattr(self, 'stream'):
|
||||
self.stream.close()
|
||||
|
|
|
@ -113,11 +113,17 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
|
|||
parse = urlparse.urlparse(self.opts['master_uri'])
|
||||
host, port = parse.netloc.rsplit(':', 1)
|
||||
self.master_addr = (host, int(port))
|
||||
|
||||
self._closing = False
|
||||
self.message_client = SaltMessageClient(host, int(port), io_loop=self.io_loop, resolver=resolver)
|
||||
|
||||
def close(self):
|
||||
if self._closing:
|
||||
return
|
||||
self._closing = True
|
||||
self.message_client.close()
|
||||
|
||||
def __del__(self):
|
||||
self.message_client.destroy()
|
||||
self.close()
|
||||
|
||||
def _package_load(self, load):
|
||||
return {
|
||||
|
@ -194,10 +200,17 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran
|
|||
|
||||
self.io_loop = kwargs['io_loop'] or tornado.ioloop.IOLoop.current()
|
||||
self.connected = False
|
||||
self._closing = False
|
||||
|
||||
def close(self):
|
||||
if self._closing:
|
||||
return
|
||||
self._closing = True
|
||||
if hasattr(self, 'message_client'):
|
||||
self.message_client.close()
|
||||
|
||||
def __del__(self):
|
||||
if hasattr(self, 'message_client'):
|
||||
self.message_client.destroy()
|
||||
self.close()
|
||||
|
||||
@tornado.gen.coroutine
|
||||
def connect(self):
|
||||
|
@ -391,20 +404,22 @@ class SaltMessageClient(object):
|
|||
self.send_future_map = {} # mapping of request_id -> Future
|
||||
self.send_timeout_map = {} # request_id -> timeout_callback
|
||||
|
||||
self._read_until_future = None
|
||||
self._on_recv = None
|
||||
self._closing = False
|
||||
self._connecting_future = self.connect()
|
||||
self.io_loop.spawn_callback(self._stream_return)
|
||||
|
||||
self._on_recv = None
|
||||
self._closing = False
|
||||
|
||||
# TODO: timeout inflight sessions
|
||||
def destroy(self):
|
||||
def close(self):
|
||||
if self._closing:
|
||||
return
|
||||
self._closing = True
|
||||
if hasattr(self, '_stream') and not self._stream.closed():
|
||||
self._stream.close()
|
||||
|
||||
def __del__(self):
|
||||
self.destroy()
|
||||
self.close()
|
||||
|
||||
def connect(self, callback=None):
|
||||
'''
|
||||
|
@ -444,9 +459,11 @@ class SaltMessageClient(object):
|
|||
|
||||
@tornado.gen.coroutine
|
||||
def _stream_return(self):
|
||||
while not self._connecting_future.done() or self._connecting_future.result() is not True:
|
||||
while not self._closing and (
|
||||
not self._connecting_future.done() or
|
||||
self._connecting_future.result() is not True):
|
||||
yield self._connecting_future
|
||||
while True:
|
||||
while not self._closing:
|
||||
try:
|
||||
framed_msg_len = yield self._stream.read_until(' ')
|
||||
framed_msg_raw = yield self._stream.read_bytes(int(framed_msg_len.strip()))
|
||||
|
@ -468,6 +485,8 @@ class SaltMessageClient(object):
|
|||
for future in self.send_future_map.itervalues():
|
||||
future.set_exception(e)
|
||||
self.send_future_map = {}
|
||||
if self._closing:
|
||||
return
|
||||
# if the last connect finished, then we need to make a new one
|
||||
if self._connecting_future.done():
|
||||
self._connecting_future = self.connect()
|
||||
|
@ -477,6 +496,8 @@ class SaltMessageClient(object):
|
|||
for future in self.send_future_map.itervalues():
|
||||
future.set_exception(e)
|
||||
self.send_future_map = {}
|
||||
if self._closing:
|
||||
return
|
||||
# if the last connect finished, then we need to make a new one
|
||||
if self._connecting_future.done():
|
||||
self._connecting_future = self.connect()
|
||||
|
@ -494,6 +515,8 @@ class SaltMessageClient(object):
|
|||
except tornado.iostream.StreamClosedError as e:
|
||||
self.send_future_map.pop(message_id).set_exception(Exception())
|
||||
self.remove_message_timeout(message_id)
|
||||
if self._closing:
|
||||
return
|
||||
# if the last connect finished, then we need to make a new one
|
||||
if self._connecting_future.done():
|
||||
self._connecting_future = self.connect()
|
||||
|
|
|
@ -399,6 +399,11 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
|
|||
|
||||
|
||||
class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.transport.server.ReqServerChannel):
|
||||
|
||||
def __init__(self, opts):
|
||||
salt.transport.server.ReqServerChannel.__init__(self, opts)
|
||||
self._closing = False
|
||||
|
||||
def zmq_device(self):
|
||||
'''
|
||||
Multiprocessing target for the zmq queue device
|
||||
|
@ -446,6 +451,9 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
|
|||
'''
|
||||
Cleanly shutdown the router socket
|
||||
'''
|
||||
if self._closing:
|
||||
return
|
||||
self._closing = True
|
||||
if hasattr(self, '_monitor') and self._monitor is not None:
|
||||
self._monitor.stop()
|
||||
self._monitor = None
|
||||
|
|
|
@ -86,6 +86,15 @@ class SyncWrapper(object):
|
|||
'''
|
||||
On deletion of the async wrapper, make sure to clean up the async stuff
|
||||
'''
|
||||
self.io_loop.close()
|
||||
if hasattr(self, 'async'):
|
||||
if hasattr(self.async, 'close'):
|
||||
# Certain things such as streams should be closed before
|
||||
# their associated io_loop is closed to allow for proper
|
||||
# cleanup.
|
||||
self.async.close()
|
||||
self.io_loop.close()
|
||||
# Other things should be deallocated after the io_loop closes.
|
||||
# See Issue #26889.
|
||||
del self.async
|
||||
else:
|
||||
self.io_loop.close()
|
||||
|
|
Loading…
Add table
Reference in a new issue