Gracefully shutdown worker threads

This commit is contained in:
Daniel A Wozniak 2018-04-24 17:19:28 +00:00 committed by Daniel A. Wozniak
parent 10e30515dc
commit fe340778fa
No known key found for this signature in database
GPG key ID: 166B9D2C06C82D61

View file

@ -12,6 +12,7 @@ import logging
import msgpack
import socket
import os
import queue
import weakref
import time
import traceback
@ -544,6 +545,8 @@ class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.tra
return self._socket
def close(self):
if hasattr(self.req_server, 'stop'):
self.req_server.stop()
if self._socket is not None:
try:
self._socket.shutdown(socket.SHUT_RDWR)
@ -742,15 +745,23 @@ if USE_LOAD_BALANCER:
super(LoadBalancerWorker, self).__init__(
message_handler, *args, **kwargs)
self.socket_queue = socket_queue
self._stop = threading.Event()
self.thread = threading.Thread(target=self.socket_queue_thread)
self.thread.start()
t = threading.Thread(target=self.socket_queue_thread)
t.start()
def stop(self):
self._stop.set()
self.thread.join()
def socket_queue_thread(self):
try:
while True:
client_socket, address = self.socket_queue.get(True, None)
try:
client_socket, address = self.socket_queue.get(True, 1)
except queue.Empty:
if self._stop.is_set():
break
continue
# 'self.io_loop' initialized in super class
# 'tornado.tcpserver.TCPServer'.
# 'self._handle_connection' defined in same super class.