mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge pull request #34683 from cachedout/issue_34215
Fix publisher leak
This commit is contained in:
commit
6ca9ffa7c7
4 changed files with 72 additions and 6 deletions
|
@ -248,6 +248,14 @@ CLI option, only sets this to a single file for all salt commands.
|
|||
# ZMQ high-water-mark for EventPublisher pub socket
|
||||
#event_publisher_pub_hwm: 10000
|
||||
|
||||
# The master may allocate memory per-event and not
|
||||
# reclaim it.
|
||||
# To set a high-water mark for memory allocation, use
|
||||
# ipc_write_buffer to set a high-water mark for message
|
||||
# buffering.
|
||||
# Value: In bytes. Set to 'dynamic' to have Salt select
|
||||
# a value for you. Default is disabled.
|
||||
# ipc_write_buffer: 'dynamic'
|
||||
|
||||
|
||||
##### Security settings #####
|
||||
|
|
|
@ -42,6 +42,13 @@ import salt.utils.sdb
|
|||
from salt.utils.locales import sdecode
|
||||
import salt.defaults.exitcodes
|
||||
|
||||
try:
|
||||
import psutil
|
||||
HAS_PSUTIL = True
|
||||
except ImportError:
|
||||
HAS_PSUTIL = False
|
||||
import salt.grains.core
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
_DFLT_LOG_DATEFMT = '%H:%M:%S'
|
||||
|
@ -61,6 +68,31 @@ else:
|
|||
_DFLT_IPC_MODE = 'ipc'
|
||||
_MASTER_TRIES = 1
|
||||
|
||||
|
||||
def _gather_buffer_space():
|
||||
'''
|
||||
Gather some system data and then calculate
|
||||
buffer space.
|
||||
|
||||
Result is in bytes.
|
||||
'''
|
||||
if HAS_PSUTIL:
|
||||
# Oh good, we have psutil. This will be quick.
|
||||
total_mem = psutil.virtual_memory().total
|
||||
else:
|
||||
# We need to load up some grains. This will be slow.
|
||||
os_data = salt.grains.core.os_data()
|
||||
grains = salt.grains.core._memdata(os_data)
|
||||
total_mem = grains['mem_total']
|
||||
# Return the higher number between 5% of the system memory and 100MB
|
||||
return max([total_mem * 0.05, 10 << 20])
|
||||
|
||||
# For the time being this will be a fixed calculation
|
||||
# TODO: Allow user configuration
|
||||
_DFLT_IPC_WBUFFER = _gather_buffer_space() * .5
|
||||
# TODO: Reserved for future use
|
||||
_DFLT_IPC_RBUFFER = _gather_buffer_space() * .5
|
||||
|
||||
FLO_DIR = os.path.join(
|
||||
os.path.dirname(os.path.dirname(__file__)),
|
||||
'daemons', 'flo')
|
||||
|
@ -445,6 +477,10 @@ VALID_OPTS = {
|
|||
# ZMQ HWM for EventPublisher pub socket
|
||||
'event_publisher_pub_hwm': int,
|
||||
|
||||
# IPC buffer size
|
||||
# Refs https://github.com/saltstack/salt/issues/34215
|
||||
'ipc_write_buffer': int,
|
||||
|
||||
# The number of MWorker processes for a master to startup. This number needs to scale up as
|
||||
# the number of connected minions increases.
|
||||
'worker_threads': int,
|
||||
|
@ -934,6 +970,7 @@ DEFAULT_MINION_OPTS = {
|
|||
'mine_return_job': False,
|
||||
'mine_interval': 60,
|
||||
'ipc_mode': _DFLT_IPC_MODE,
|
||||
'ipc_write_buffer': _DFLT_IPC_WBUFFER,
|
||||
'ipv6': False,
|
||||
'file_buffer_size': 262144,
|
||||
'tcp_pub_port': 4510,
|
||||
|
@ -1177,6 +1214,7 @@ DEFAULT_MASTER_OPTS = {
|
|||
'minion_data_cache': True,
|
||||
'enforce_mine_cache': False,
|
||||
'ipc_mode': _DFLT_IPC_MODE,
|
||||
'ipc_write_buffer': _DFLT_IPC_WBUFFER,
|
||||
'ipv6': False,
|
||||
'tcp_master_pub_port': 4512,
|
||||
'tcp_master_pull_port': 4513,
|
||||
|
@ -2955,6 +2993,11 @@ def apply_minion_config(overrides=None,
|
|||
if 'beacons' not in opts:
|
||||
opts['beacons'] = {}
|
||||
|
||||
if overrides.get('ipc_write_buffer', '') == 'dynamic':
|
||||
opts['ipc_write_buffer'] = _DFLT_IPC_WBUFFER
|
||||
if 'ipc_write_buffer' not in overrides:
|
||||
opts['ipc_write_buffer'] = 0
|
||||
|
||||
# if there is no schedule option yet, add an empty scheduler
|
||||
if 'schedule' not in opts:
|
||||
opts['schedule'] = {}
|
||||
|
@ -3029,7 +3072,10 @@ def apply_master_config(overrides=None, defaults=None):
|
|||
)
|
||||
opts['token_dir'] = os.path.join(opts['cachedir'], 'tokens')
|
||||
opts['syndic_dir'] = os.path.join(opts['cachedir'], 'syndics')
|
||||
|
||||
if overrides.get('ipc_write_buffer', '') == 'dynamic':
|
||||
opts['ipc_write_buffer'] = _DFLT_IPC_WBUFFER
|
||||
if 'ipc_write_buffer' not in overrides:
|
||||
opts['ipc_write_buffer'] = 0
|
||||
using_ip_for_id = False
|
||||
append_master = False
|
||||
if not opts.get('id'):
|
||||
|
|
|
@ -431,9 +431,10 @@ class IPCMessagePublisher(object):
|
|||
A Tornado IPC Publisher similar to Tornado's TCPServer class
|
||||
but using either UNIX domain sockets or TCP sockets
|
||||
'''
|
||||
def __init__(self, socket_path, io_loop=None):
|
||||
def __init__(self, opts, socket_path, io_loop=None):
|
||||
'''
|
||||
Create a new Tornado IPC server
|
||||
:param dict opts: Salt options
|
||||
:param str/int socket_path: Path on the filesystem for the
|
||||
socket to bind to. This socket does
|
||||
not need to exist prior to calling
|
||||
|
@ -444,6 +445,7 @@ class IPCMessagePublisher(object):
|
|||
for a tcp localhost connection.
|
||||
:param IOLoop io_loop: A Tornado ioloop to handle scheduling
|
||||
'''
|
||||
self.opts = opts
|
||||
self.socket_path = socket_path
|
||||
self._started = False
|
||||
|
||||
|
@ -506,10 +508,18 @@ class IPCMessagePublisher(object):
|
|||
def handle_connection(self, connection, address):
|
||||
log.trace('IPCServer: Handling connection to address: {0}'.format(address))
|
||||
try:
|
||||
stream = IOStream(
|
||||
connection,
|
||||
io_loop=self.io_loop,
|
||||
)
|
||||
if self.opts['ipc_write_buffer'] > 0:
|
||||
log.trace('Setting IPC connection write buffer: {0}'.format((self.opts['ipc_write_buffer'])))
|
||||
stream = IOStream(
|
||||
connection,
|
||||
io_loop=self.io_loop,
|
||||
max_write_buffer_size=self.opts['ipc_write_buffer']
|
||||
)
|
||||
else:
|
||||
stream = IOStream(
|
||||
connection,
|
||||
io_loop=self.io_loop
|
||||
)
|
||||
self.streams.add(stream)
|
||||
except Exception as exc:
|
||||
log.error('IPC streaming error: {0}'.format(exc))
|
||||
|
|
|
@ -865,6 +865,7 @@ class AsyncEventPublisher(object):
|
|||
raise
|
||||
|
||||
self.publisher = salt.transport.ipc.IPCMessagePublisher(
|
||||
self.opts,
|
||||
epub_uri,
|
||||
io_loop=self.io_loop
|
||||
)
|
||||
|
@ -953,6 +954,7 @@ class EventPublisher(salt.utils.process.SignalHandlingMultiprocessingProcess):
|
|||
)
|
||||
|
||||
self.publisher = salt.transport.ipc.IPCMessagePublisher(
|
||||
self.opts,
|
||||
epub_uri,
|
||||
io_loop=self.io_loop
|
||||
)
|
||||
|
|
Loading…
Add table
Reference in a new issue