Fixed memory leak in AsyncTCPReqChannel

`cls.instance_map` was growing unbounded. This was because
`io_loop.close()` was invoked before `stream.close()`. The stategy used
to fix this issue was discussed in pull request #28530 and can be
summarized as follows:

The `close()` method on the async object will be called if it exists.
This will be called before the `io_loop.close()` to give the async object
the opportunity to close its stream. The del of the async object will
continue to happen after io_loop.close(). This is to maintain the fix
from pull request #27343.

Per file change details:
salt/utils/async.py:
- Implemented the close logic described above.

salt/transport/ipc.py:
- Ensured that `close()` may be invoked more than once safely.
- In IPCServer, `self.stream` was never actually used. Removed
references to it to avoid confusion.

salt/transport/zeromq.py:
- Ensured that `close()` may be invoked more than once safely.

salt/transport/tcp.py:
- Ensured that `close()` may be invoked more than once safely.
- Changed `destroy()` methods to `close()` methods since they involved
a stream that should be closed before `io_loop.close()`.
- In SaltMessageClient, added more checks for `self._closing`. This
makes exit cleaner by not attempting to reconnect while closing.
- Removed the code associate with pull request #28113. This code is
no longer needed now that the stream is closed before the io_loop.

Signed-off-by: Sergey Kizunov <sergey.kizunov@ni.com>
This commit is contained in:
Sergey Kizunov 2015-11-05 11:07:10 -06:00 committed by rallytime
parent cf79722260
commit 034cf28e57
4 changed files with 58 additions and 15 deletions

View file

@ -43,9 +43,9 @@ class IPCServer(object):
self.payload_handler = payload_handler
# Placeholders for attributes to be populated by method calls
self.stream = None
self.sock = None
self.io_loop = io_loop or IOLoop.current()
self._closing = False
def start(self):
'''
@ -125,8 +125,9 @@ class IPCServer(object):
Sockets and filehandles should be closed explicitly, to prevent
leaks.
'''
if hasattr(self.stream, 'close'):
self.stream.close()
if self._closing:
return
self._closing = True
if hasattr(self.sock, 'close'):
self.sock.close()
@ -238,6 +239,8 @@ class IPCClient(object):
Sockets and filehandles should be closed explicitly, to prevent
leaks.
'''
if self._closing:
return
self._closing = True
if hasattr(self, 'stream'):
self.stream.close()

View file

@ -113,11 +113,17 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
parse = urlparse.urlparse(self.opts['master_uri'])
host, port = parse.netloc.rsplit(':', 1)
self.master_addr = (host, int(port))
self._closing = False
self.message_client = SaltMessageClient(host, int(port), io_loop=self.io_loop, resolver=resolver)
def close(self):
if self._closing:
return
self._closing = True
self.message_client.close()
def __del__(self):
self.message_client.destroy()
self.close()
def _package_load(self, load):
return {
@ -194,10 +200,17 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran
self.io_loop = kwargs['io_loop'] or tornado.ioloop.IOLoop.current()
self.connected = False
self._closing = False
def close(self):
if self._closing:
return
self._closing = True
if hasattr(self, 'message_client'):
self.message_client.close()
def __del__(self):
if hasattr(self, 'message_client'):
self.message_client.destroy()
self.close()
@tornado.gen.coroutine
def connect(self):
@ -391,20 +404,22 @@ class SaltMessageClient(object):
self.send_future_map = {} # mapping of request_id -> Future
self.send_timeout_map = {} # request_id -> timeout_callback
self._read_until_future = None
self._on_recv = None
self._closing = False
self._connecting_future = self.connect()
self.io_loop.spawn_callback(self._stream_return)
self._on_recv = None
self._closing = False
# TODO: timeout inflight sessions
def destroy(self):
def close(self):
if self._closing:
return
self._closing = True
if hasattr(self, '_stream') and not self._stream.closed():
self._stream.close()
def __del__(self):
self.destroy()
self.close()
def connect(self, callback=None):
'''
@ -444,9 +459,11 @@ class SaltMessageClient(object):
@tornado.gen.coroutine
def _stream_return(self):
while not self._connecting_future.done() or self._connecting_future.result() is not True:
while not self._closing and (
not self._connecting_future.done() or
self._connecting_future.result() is not True):
yield self._connecting_future
while True:
while not self._closing:
try:
framed_msg_len = yield self._stream.read_until(' ')
framed_msg_raw = yield self._stream.read_bytes(int(framed_msg_len.strip()))
@ -468,6 +485,8 @@ class SaltMessageClient(object):
for future in self.send_future_map.itervalues():
future.set_exception(e)
self.send_future_map = {}
if self._closing:
return
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()
@ -477,6 +496,8 @@ class SaltMessageClient(object):
for future in self.send_future_map.itervalues():
future.set_exception(e)
self.send_future_map = {}
if self._closing:
return
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()
@ -494,6 +515,8 @@ class SaltMessageClient(object):
except tornado.iostream.StreamClosedError as e:
self.send_future_map.pop(message_id).set_exception(Exception())
self.remove_message_timeout(message_id)
if self._closing:
return
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()

View file

@ -399,6 +399,11 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.transport.server.ReqServerChannel):
def __init__(self, opts):
salt.transport.server.ReqServerChannel.__init__(self, opts)
self._closing = False
def zmq_device(self):
'''
Multiprocessing target for the zmq queue device
@ -446,6 +451,9 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
'''
Cleanly shutdown the router socket
'''
if self._closing:
return
self._closing = True
if hasattr(self, '_monitor') and self._monitor is not None:
self._monitor.stop()
self._monitor = None

View file

@ -86,6 +86,15 @@ class SyncWrapper(object):
'''
On deletion of the async wrapper, make sure to clean up the async stuff
'''
self.io_loop.close()
if hasattr(self, 'async'):
if hasattr(self.async, 'close'):
# Certain things such as streams should be closed before
# their associated io_loop is closed to allow for proper
# cleanup.
self.async.close()
self.io_loop.close()
# Other things should be deallocated after the io_loop closes.
# See Issue #26889.
del self.async
else:
self.io_loop.close()