mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Gracefully shutdown worker threads
This commit is contained in:
parent
10e30515dc
commit
fe340778fa
1 changed files with 15 additions and 4 deletions
|
@ -12,6 +12,7 @@ import logging
|
|||
import msgpack
|
||||
import socket
|
||||
import os
|
||||
import queue
|
||||
import weakref
|
||||
import time
|
||||
import traceback
|
||||
|
@ -544,6 +545,8 @@ class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.tra
|
|||
return self._socket
|
||||
|
||||
def close(self):
|
||||
if hasattr(self.req_server, 'stop'):
|
||||
self.req_server.stop()
|
||||
if self._socket is not None:
|
||||
try:
|
||||
self._socket.shutdown(socket.SHUT_RDWR)
|
||||
|
@ -742,15 +745,23 @@ if USE_LOAD_BALANCER:
|
|||
super(LoadBalancerWorker, self).__init__(
|
||||
message_handler, *args, **kwargs)
|
||||
self.socket_queue = socket_queue
|
||||
self._stop = threading.Event()
|
||||
self.thread = threading.Thread(target=self.socket_queue_thread)
|
||||
self.thread.start()
|
||||
|
||||
t = threading.Thread(target=self.socket_queue_thread)
|
||||
t.start()
|
||||
def stop(self):
|
||||
self._stop.set()
|
||||
self.thread.join()
|
||||
|
||||
def socket_queue_thread(self):
|
||||
try:
|
||||
while True:
|
||||
client_socket, address = self.socket_queue.get(True, None)
|
||||
|
||||
try:
|
||||
client_socket, address = self.socket_queue.get(True, 1)
|
||||
except queue.Empty:
|
||||
if self._stop.is_set():
|
||||
break
|
||||
continue
|
||||
# 'self.io_loop' initialized in super class
|
||||
# 'tornado.tcpserver.TCPServer'.
|
||||
# 'self._handle_connection' defined in same super class.
|
||||
|
|
Loading…
Add table
Reference in a new issue