Merge pull request #47279 from dwoz/py3_build_fix

Gracefully shutdown worker threads
This commit is contained in:
Mike Place 2018-04-25 16:15:42 -05:00 committed by GitHub
commit e0765f5719
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -32,6 +32,7 @@ import salt.transport.client
import salt.transport.server
import salt.transport.mixins.auth
import salt.ext.six as six
from salt.ext.six.moves import queue # pylint: disable=import-error
from salt.exceptions import SaltReqTimeoutError, SaltClientError
from salt.transport import iter_transport_opts
@ -556,6 +557,11 @@ class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.tra
raise exc
self._socket.close()
self._socket = None
if hasattr(self.req_server, 'stop'):
try:
self.req_server.stop()
except Exception as exc:
log.debug("TCPReqServerChannel close generated an exception: %s", str(exc))
def __del__(self):
self.close()
@ -742,15 +748,23 @@ if USE_LOAD_BALANCER:
super(LoadBalancerWorker, self).__init__(
message_handler, *args, **kwargs)
self.socket_queue = socket_queue
self._stop = threading.Event()
self.thread = threading.Thread(target=self.socket_queue_thread)
self.thread.start()
t = threading.Thread(target=self.socket_queue_thread)
t.start()
def stop(self):
self._stop.set()
self.thread.join()
def socket_queue_thread(self):
try:
while True:
client_socket, address = self.socket_queue.get(True, None)
try:
client_socket, address = self.socket_queue.get(True, 1)
except queue.Empty:
if self._stop.is_set():
break
continue
# 'self.io_loop' initialized in super class
# 'tornado.tcpserver.TCPServer'.
# 'self._handle_connection' defined in same super class.