Merge pull request #34683 from cachedout/issue_34215

Fix publisher leak
Mike Place 2016-07-20 07:57:10 -06:00 committed by GitHub
commit 6ca9ffa7c7
4 changed files with 72 additions and 6 deletions

conf/master

@@ -248,6 +248,14 @@ CLI option, only sets this to a single file for all salt commands.

# ZMQ high-water-mark for EventPublisher pub socket
#event_publisher_pub_hwm: 10000

# The master may allocate memory per-event and not reclaim it.
# To limit this, use ipc_write_buffer to set a high-water mark
# for message buffering.
# Value: in bytes. Set to 'dynamic' to have Salt select
# a value for you. Default is disabled.
# ipc_write_buffer: 'dynamic'
##### Security settings #####
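For illustration, the ipc_write_buffer option documented above can be enabled with either form (the explicit byte count below is just an example):

    ipc_write_buffer: 'dynamic'
    # or an explicit size in bytes:
    ipc_write_buffer: 10485760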

salt/config/__init__.py

@@ -42,6 +42,13 @@ import salt.utils.sdb
from salt.utils.locales import sdecode
import salt.defaults.exitcodes

try:
    import psutil
    HAS_PSUTIL = True
except ImportError:
    HAS_PSUTIL = False

import salt.grains.core

log = logging.getLogger(__name__)

_DFLT_LOG_DATEFMT = '%H:%M:%S'
@@ -61,6 +68,31 @@ else:
    _DFLT_IPC_MODE = 'ipc'

_MASTER_TRIES = 1


def _gather_buffer_space():
    '''
    Gather some system data and then calculate
    buffer space.

    Result is in bytes.
    '''
    if HAS_PSUTIL:
        # Oh good, we have psutil. This will be quick.
        total_mem = psutil.virtual_memory().total
    else:
        # We need to load up some grains. This will be slow.
        os_data = salt.grains.core.os_data()
        grains = salt.grains.core._memdata(os_data)
        total_mem = grains['mem_total']
    # Return the higher number between 5% of the system memory and 10 MiB
    return max([total_mem * 0.05, 10 << 20])

# For the time being this will be a fixed calculation
# TODO: Allow user configuration
_DFLT_IPC_WBUFFER = _gather_buffer_space() * .5
# TODO: Reserved for future use
_DFLT_IPC_RBUFFER = _gather_buffer_space() * .5

FLO_DIR = os.path.join(
    os.path.dirname(os.path.dirname(__file__)),
    'daemons', 'flo')
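As a worked example of the sizing above (the host size is hypothetical): on a machine with 8 GiB of RAM, 5% comfortably clears the 10 MiB floor, so the dynamic write-buffer default comes out to roughly 205 MiB:

    total_mem = 8 * (1 << 30)                         # hypothetical host: 8 GiB in bytes
    buffer_space = max([total_mem * 0.05, 10 << 20])  # 5% of RAM vs. the 10 MiB floor -> ~410 MiB
    ipc_write_buffer = buffer_space * .5              # _DFLT_IPC_WBUFFER -> ~205 MiB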
@@ -445,6 +477,10 @@ VALID_OPTS = {
    # ZMQ HWM for EventPublisher pub socket
    'event_publisher_pub_hwm': int,

    # IPC buffer size
    # Refs https://github.com/saltstack/salt/issues/34215
    'ipc_write_buffer': int,

    # The number of MWorker processes for a master to startup. This number needs to scale up as
    # the number of connected minions increases.
    'worker_threads': int,
@@ -934,6 +970,7 @@ DEFAULT_MINION_OPTS = {
    'mine_return_job': False,
    'mine_interval': 60,
    'ipc_mode': _DFLT_IPC_MODE,
    'ipc_write_buffer': _DFLT_IPC_WBUFFER,
    'ipv6': False,
    'file_buffer_size': 262144,
    'tcp_pub_port': 4510,
@@ -1177,6 +1214,7 @@ DEFAULT_MASTER_OPTS = {
    'minion_data_cache': True,
    'enforce_mine_cache': False,
    'ipc_mode': _DFLT_IPC_MODE,
    'ipc_write_buffer': _DFLT_IPC_WBUFFER,
    'ipv6': False,
    'tcp_master_pub_port': 4512,
    'tcp_master_pull_port': 4513,
@@ -2955,6 +2993,11 @@ def apply_minion_config(overrides=None,
    if 'beacons' not in opts:
        opts['beacons'] = {}

    if overrides.get('ipc_write_buffer', '') == 'dynamic':
        opts['ipc_write_buffer'] = _DFLT_IPC_WBUFFER
    if 'ipc_write_buffer' not in overrides:
        opts['ipc_write_buffer'] = 0

    # if there is no schedule option yet, add an empty scheduler
    if 'schedule' not in opts:
        opts['schedule'] = {}
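A sketch of how this resolves the new option (the overrides dicts shown are hypothetical; apply_master_config below repeats the same logic):

    opts = apply_minion_config({'ipc_write_buffer': 'dynamic'})  # -> _DFLT_IPC_WBUFFER (memory-derived)
    opts = apply_minion_config({'ipc_write_buffer': 10 << 20})   # -> 10485760; an explicit integer is kept as-is
    opts = apply_minion_config({})                               # -> 0; option absent means buffering disabled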
@@ -3029,7 +3072,10 @@ def apply_master_config(overrides=None, defaults=None):
    )
    opts['token_dir'] = os.path.join(opts['cachedir'], 'tokens')
    opts['syndic_dir'] = os.path.join(opts['cachedir'], 'syndics')

    if overrides.get('ipc_write_buffer', '') == 'dynamic':
        opts['ipc_write_buffer'] = _DFLT_IPC_WBUFFER
    if 'ipc_write_buffer' not in overrides:
        opts['ipc_write_buffer'] = 0

    using_ip_for_id = False
    append_master = False
    if not opts.get('id'):

salt/transport/ipc.py

@@ -431,9 +431,10 @@ class IPCMessagePublisher(object):
    A Tornado IPC Publisher similar to Tornado's TCPServer class
    but using either UNIX domain sockets or TCP sockets
    '''
-   def __init__(self, socket_path, io_loop=None):
+   def __init__(self, opts, socket_path, io_loop=None):
        '''
        Create a new Tornado IPC server

        :param dict opts: Salt options
        :param str/int socket_path: Path on the filesystem for the
                                    socket to bind to. This socket does
                                    not need to exist prior to calling
@@ -444,6 +445,7 @@ class IPCMessagePublisher(object):
                                    for a tcp localhost connection.
        :param IOLoop io_loop: A Tornado ioloop to handle scheduling
        '''
        self.opts = opts
        self.socket_path = socket_path
        self._started = False
@@ -506,10 +508,18 @@ class IPCMessagePublisher(object):
    def handle_connection(self, connection, address):
        log.trace('IPCServer: Handling connection to address: {0}'.format(address))
        try:
-           stream = IOStream(
-               connection,
-               io_loop=self.io_loop,
-           )
+           if self.opts['ipc_write_buffer'] > 0:
+               log.trace('Setting IPC connection write buffer: {0}'.format(self.opts['ipc_write_buffer']))
+               stream = IOStream(
+                   connection,
+                   io_loop=self.io_loop,
+                   max_write_buffer_size=self.opts['ipc_write_buffer']
+               )
+           else:
+               stream = IOStream(
+                   connection,
+                   io_loop=self.io_loop
+               )
            self.streams.add(stream)
        except Exception as exc:
            log.error('IPC streaming error: {0}'.format(exc))
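This cap is the crux of the leak fix: Tornado's IOStream raises StreamBufferFullError once pending writes exceed max_write_buffer_size, rather than queuing data for a stalled subscriber indefinitely. A minimal sketch of that behavior (the connected socket and payload are hypothetical):

    from tornado.iostream import IOStream, StreamBufferFullError

    stream = IOStream(sock, max_write_buffer_size=10 << 20)  # hypothetical socket, 10 MiB cap
    try:
        stream.write(payload)  # hypothetical event bytes
    except StreamBufferFullError:
        # Without the cap, bytes for a slow or dead subscriber would pile up
        # in the publisher process forever; with it, the write fails fast.
        stream.close()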

salt/utils/event.py

@@ -865,6 +865,7 @@ class AsyncEventPublisher(object):
                raise

        self.publisher = salt.transport.ipc.IPCMessagePublisher(
            self.opts,
            epub_uri,
            io_loop=self.io_loop
        )
@@ -953,6 +954,7 @@ class EventPublisher(salt.utils.process.SignalHandlingMultiprocessingProcess):
        )
        self.publisher = salt.transport.ipc.IPCMessagePublisher(
            self.opts,
            epub_uri,
            io_loop=self.io_loop
        )
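For any external code constructing the publisher directly, note that the opts dict is now the first positional argument. A usage sketch (the socket path and buffer size are hypothetical):

    import salt.transport.ipc

    opts = {'ipc_write_buffer': 10 << 20}  # cap each subscriber stream at 10 MiB
    publisher = salt.transport.ipc.IPCMessagePublisher(
        opts,
        '/var/run/salt/master/master_event_pub.ipc',  # hypothetical socket path
    )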