Merge pull request #28646 from rallytime/bp-28614

Back-port #28614 to 2015.8
This commit is contained in:
Mike Place 2015-11-06 11:19:08 -07:00
commit 7531bc7334
4 changed files with 58 additions and 15 deletions

View file

@ -43,9 +43,9 @@ class IPCServer(object):
self.payload_handler = payload_handler
# Placeholders for attributes to be populated by method calls
self.stream = None
self.sock = None
self.io_loop = io_loop or IOLoop.current()
self._closing = False
def start(self):
'''
@ -125,8 +125,9 @@ class IPCServer(object):
Sockets and filehandles should be closed explicitly, to prevent
leaks.
'''
if hasattr(self.stream, 'close'):
self.stream.close()
if self._closing:
return
self._closing = True
if hasattr(self.sock, 'close'):
self.sock.close()
@ -238,6 +239,8 @@ class IPCClient(object):
Sockets and filehandles should be closed explicitly, to prevent
leaks.
'''
if self._closing:
return
self._closing = True
if hasattr(self, 'stream'):
self.stream.close()

View file

@ -113,11 +113,17 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
parse = urlparse.urlparse(self.opts['master_uri'])
host, port = parse.netloc.rsplit(':', 1)
self.master_addr = (host, int(port))
self._closing = False
self.message_client = SaltMessageClient(host, int(port), io_loop=self.io_loop, resolver=resolver)
def close(self):
if self._closing:
return
self._closing = True
self.message_client.close()
def __del__(self):
self.message_client.destroy()
self.close()
def _package_load(self, load):
return {
@ -194,10 +200,17 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran
self.io_loop = kwargs['io_loop'] or tornado.ioloop.IOLoop.current()
self.connected = False
self._closing = False
def close(self):
if self._closing:
return
self._closing = True
if hasattr(self, 'message_client'):
self.message_client.close()
def __del__(self):
if hasattr(self, 'message_client'):
self.message_client.destroy()
self.close()
@tornado.gen.coroutine
def connect(self):
@ -391,20 +404,22 @@ class SaltMessageClient(object):
self.send_future_map = {} # mapping of request_id -> Future
self.send_timeout_map = {} # request_id -> timeout_callback
self._read_until_future = None
self._on_recv = None
self._closing = False
self._connecting_future = self.connect()
self.io_loop.spawn_callback(self._stream_return)
self._on_recv = None
self._closing = False
# TODO: timeout inflight sessions
def destroy(self):
def close(self):
if self._closing:
return
self._closing = True
if hasattr(self, '_stream') and not self._stream.closed():
self._stream.close()
def __del__(self):
self.destroy()
self.close()
def connect(self, callback=None):
'''
@ -444,9 +459,11 @@ class SaltMessageClient(object):
@tornado.gen.coroutine
def _stream_return(self):
while not self._connecting_future.done() or self._connecting_future.result() is not True:
while not self._closing and (
not self._connecting_future.done() or
self._connecting_future.result() is not True):
yield self._connecting_future
while True:
while not self._closing:
try:
framed_msg_len = yield self._stream.read_until(' ')
framed_msg_raw = yield self._stream.read_bytes(int(framed_msg_len.strip()))
@ -468,6 +485,8 @@ class SaltMessageClient(object):
for future in self.send_future_map.itervalues():
future.set_exception(e)
self.send_future_map = {}
if self._closing:
return
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()
@ -477,6 +496,8 @@ class SaltMessageClient(object):
for future in self.send_future_map.itervalues():
future.set_exception(e)
self.send_future_map = {}
if self._closing:
return
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()
@ -494,6 +515,8 @@ class SaltMessageClient(object):
except tornado.iostream.StreamClosedError as e:
self.send_future_map.pop(message_id).set_exception(Exception())
self.remove_message_timeout(message_id)
if self._closing:
return
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()

View file

@ -399,6 +399,11 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.transport.server.ReqServerChannel):
def __init__(self, opts):
salt.transport.server.ReqServerChannel.__init__(self, opts)
self._closing = False
def zmq_device(self):
'''
Multiprocessing target for the zmq queue device
@ -446,6 +451,9 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
'''
Cleanly shutdown the router socket
'''
if self._closing:
return
self._closing = True
if hasattr(self, '_monitor') and self._monitor is not None:
self._monitor.stop()
self._monitor = None

View file

@ -86,6 +86,15 @@ class SyncWrapper(object):
'''
On deletion of the async wrapper, make sure to clean up the async stuff
'''
self.io_loop.close()
if hasattr(self, 'async'):
if hasattr(self.async, 'close'):
# Certain things such as streams should be closed before
# their associated io_loop is closed to allow for proper
# cleanup.
self.async.close()
self.io_loop.close()
# Other things should be deallocated after the io_loop closes.
# See Issue #26889.
del self.async
else:
self.io_loop.close()