mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge pull request #30343 from rallytime/fix-2015.8
Fix 2015.8 from incomplete back-port
This commit is contained in:
commit
6079a96e6e
4 changed files with 70 additions and 40 deletions
|
@ -811,3 +811,9 @@
|
|||
############################################
|
||||
# Which returner(s) will be used for minion's result:
|
||||
#return: mysql
|
||||
|
||||
|
||||
###### Miscellaneous settings ######
|
||||
############################################
|
||||
# Default match type for filtering events tags: startswith, endswith, find, regex, fnmatch
|
||||
#event_match_type: startswith
|
||||
|
|
|
@ -655,3 +655,9 @@
|
|||
############################################
|
||||
# Which returner(s) will be used for minion's result:
|
||||
#return: mysql
|
||||
|
||||
|
||||
###### Miscellaneous settings ######
|
||||
############################################
|
||||
# Default match type for filtering events tags: startswith, endswith, find, regex, fnmatch
|
||||
#event_match_type: startswith
|
||||
|
|
|
@ -387,6 +387,9 @@ VALID_OPTS = {
|
|||
# Events matching a tag in this list should never be sent to an event returner.
|
||||
'event_return_blacklist': list,
|
||||
|
||||
# default match type for filtering events tags: startswith, endswith, find, regex, fnmatch
|
||||
'event_match_type': str,
|
||||
|
||||
# This pidfile to write out to when a daemon starts
|
||||
'pidfile': str,
|
||||
|
||||
|
@ -781,7 +784,7 @@ DEFAULT_MINION_OPTS = {
|
|||
'user': 'root',
|
||||
'root_dir': salt.syspaths.ROOT_DIR,
|
||||
'pki_dir': os.path.join(salt.syspaths.CONFIG_DIR, 'pki', 'minion'),
|
||||
'id': None,
|
||||
'id': '',
|
||||
'cachedir': os.path.join(salt.syspaths.CACHE_DIR, 'minion'),
|
||||
'cache_jobs': False,
|
||||
'grains_cache': False,
|
||||
|
@ -966,6 +969,7 @@ DEFAULT_MINION_OPTS = {
|
|||
'salt_event_pub_hwm': 2000,
|
||||
# ZMQ HWM for EventPublisher pub socket - different for minion vs. master
|
||||
'event_publisher_pub_hwm': 1000,
|
||||
'event_match_type': 'startswith',
|
||||
}
|
||||
|
||||
DEFAULT_MASTER_OPTS = {
|
||||
|
@ -1115,6 +1119,7 @@ DEFAULT_MASTER_OPTS = {
|
|||
'event_return_queue': 0,
|
||||
'event_return_whitelist': [],
|
||||
'event_return_blacklist': [],
|
||||
'event_match_type': 'startswith',
|
||||
'serial': 'msgpack',
|
||||
'state_verbose': True,
|
||||
'state_output': 'full',
|
||||
|
@ -2742,7 +2747,7 @@ def apply_minion_config(overrides=None,
|
|||
|
||||
# No ID provided. Will getfqdn save us?
|
||||
using_ip_for_id = False
|
||||
if opts['id'] is None:
|
||||
if not opts.get('id'):
|
||||
opts['id'], using_ip_for_id = get_id(
|
||||
opts,
|
||||
cache_minion_id=cache_minion_id)
|
||||
|
@ -2859,7 +2864,7 @@ def apply_master_config(overrides=None, defaults=None):
|
|||
|
||||
using_ip_for_id = False
|
||||
append_master = False
|
||||
if opts.get('id') is None:
|
||||
if not opts.get('id'):
|
||||
opts['id'], using_ip_for_id = get_id(
|
||||
opts,
|
||||
cache_minion_id=None)
|
||||
|
|
|
@ -78,6 +78,7 @@ except ImportError:
|
|||
pass
|
||||
|
||||
# Import salt libs
|
||||
import salt.config
|
||||
import salt.payload
|
||||
import salt.loader
|
||||
import salt.utils
|
||||
|
@ -119,7 +120,7 @@ def get_event(node, sock_dir=None, transport='zeromq', opts=None, listen=True):
|
|||
'''
|
||||
Return an event object suitable for the named transport
|
||||
'''
|
||||
sock_dir = sock_dir or opts.get('sock_dir', None)
|
||||
sock_dir = sock_dir or opts['sock_dir']
|
||||
# TODO: AIO core is separate from transport
|
||||
if transport in ('zeromq', 'tcp'):
|
||||
if node == 'master':
|
||||
|
@ -179,13 +180,22 @@ class SaltEvent(object):
|
|||
self.poller = zmq.Poller()
|
||||
self.cpub = False
|
||||
self.cpush = False
|
||||
|
||||
if opts is None:
|
||||
opts = {}
|
||||
self.opts = opts
|
||||
if node == 'master':
|
||||
self.opts = salt.config.DEFAULT_MASTER_OPTS.copy()
|
||||
else:
|
||||
self.opts = salt.config.DEFAULT_MINION_OPTS.copy()
|
||||
self.opts.update(opts)
|
||||
|
||||
if sock_dir is None:
|
||||
sock_dir = opts.get('sock_dir', None)
|
||||
if salt.utils.is_windows() and 'ipc_mode' not in opts:
|
||||
opts['ipc_mode'] = 'tcp'
|
||||
sock_dir = self.opts['sock_dir']
|
||||
else:
|
||||
self.opts['sock_dir'] = sock_dir
|
||||
|
||||
if salt.utils.is_windows() and not hasattr(self.opts, 'ipc_mode'):
|
||||
self.opts['ipc_mode'] = 'tcp'
|
||||
self.puburi, self.pulluri = self.__load_uri(sock_dir, node)
|
||||
self.pending_tags = []
|
||||
self.pending_events = []
|
||||
|
@ -210,17 +220,13 @@ class SaltEvent(object):
|
|||
Return the string URI for the location of the pull and pub sockets to
|
||||
use for firing and listening to events
|
||||
'''
|
||||
hash_type = getattr(hashlib, self.opts.get('hash_type', 'md5'))
|
||||
# Only use the first 10 chars to keep longer hashes from exceeding the
|
||||
# max socket path length.
|
||||
id_hash = hash_type(salt.utils.to_bytes(self.opts.get('id', ''))).hexdigest()[:10]
|
||||
if node == 'master':
|
||||
if self.opts.get('ipc_mode', '') == 'tcp':
|
||||
if self.opts['ipc_mode'] == 'tcp':
|
||||
puburi = 'tcp://127.0.0.1:{0}'.format(
|
||||
self.opts.get('tcp_master_pub_port', 4512)
|
||||
self.opts['tcp_master_pub_port']
|
||||
)
|
||||
pulluri = 'tcp://127.0.0.1:{0}'.format(
|
||||
self.opts.get('tcp_master_pull_port', 4513)
|
||||
self.opts['tcp_master_pull_port']
|
||||
)
|
||||
else:
|
||||
puburi = 'ipc://{0}'.format(os.path.join(
|
||||
|
@ -234,14 +240,18 @@ class SaltEvent(object):
|
|||
))
|
||||
salt.utils.zeromq.check_ipc_path_max_len(pulluri)
|
||||
else:
|
||||
if self.opts.get('ipc_mode', '') == 'tcp':
|
||||
if self.opts['ipc_mode'] == 'tcp':
|
||||
puburi = 'tcp://127.0.0.1:{0}'.format(
|
||||
self.opts.get('tcp_pub_port', 4510)
|
||||
self.opts['tcp_pub_port']
|
||||
)
|
||||
pulluri = 'tcp://127.0.0.1:{0}'.format(
|
||||
self.opts.get('tcp_pull_port', 4511)
|
||||
self.opts['tcp_pull_port']
|
||||
)
|
||||
else:
|
||||
hash_type = getattr(hashlib, self.opts['hash_type'])
|
||||
# Only use the first 10 chars to keep longer hashes from exceeding the
|
||||
# max socket path length.
|
||||
id_hash = hash_type(salt.utils.to_bytes(self.opts['id'])).hexdigest()[:10]
|
||||
puburi = 'ipc://{0}'.format(os.path.join(
|
||||
sock_dir,
|
||||
'minion_event_{0}_pub.ipc'.format(id_hash)
|
||||
|
@ -297,10 +307,10 @@ class SaltEvent(object):
|
|||
'''
|
||||
self.sub = self.context.socket(zmq.SUB)
|
||||
try:
|
||||
self.sub.setsockopt(zmq.HWM, self.opts.get('salt_event_pub_hwm'))
|
||||
self.sub.setsockopt(zmq.HWM, self.opts['salt_event_pub_hwm'])
|
||||
except AttributeError:
|
||||
self.sub.setsockopt(zmq.SNDHWM, self.opts.get('salt_event_pub_hwm'))
|
||||
self.sub.setsockopt(zmq.RCVHWM, self.opts.get('salt_event_pub_hwm'))
|
||||
self.sub.setsockopt(zmq.SNDHWM, self.opts['salt_event_pub_hwm'])
|
||||
self.sub.setsockopt(zmq.RCVHWM, self.opts['salt_event_pub_hwm'])
|
||||
self.sub.connect(self.puburi)
|
||||
self.poller.register(self.sub, zmq.POLLIN)
|
||||
self.sub.setsockopt_string(zmq.SUBSCRIBE, u'')
|
||||
|
@ -330,7 +340,7 @@ class SaltEvent(object):
|
|||
|
||||
def _get_match_func(self, match_type=None):
|
||||
if match_type is None:
|
||||
match_type = self.opts.get('event_match_type', 'startswith')
|
||||
match_type = self.opts['event_match_type']
|
||||
return getattr(self, '_match_tag_{0}'.format(match_type), None)
|
||||
|
||||
def _check_pending(self, tag, match_func=None):
|
||||
|
@ -568,7 +578,7 @@ class SaltEvent(object):
|
|||
tagend = TAGEND
|
||||
serialized_data = salt.utils.dicttrim.trim_dict(
|
||||
self.serial.dumps(data),
|
||||
self.opts.get('max_event_size', 1048576),
|
||||
self.opts['max_event_size'],
|
||||
is_msgpacked=True,
|
||||
)
|
||||
log.debug('Sending event - data = {0}'.format(data))
|
||||
|
@ -714,7 +724,7 @@ class MinionEvent(SaltEvent):
|
|||
'''
|
||||
def __init__(self, opts, listen=True):
|
||||
super(MinionEvent, self).__init__(
|
||||
'minion', sock_dir=opts.get('sock_dir', None), opts=opts, listen=listen)
|
||||
'minion', sock_dir=opts.get('sock_dir'), opts=opts, listen=listen)
|
||||
|
||||
|
||||
class AsyncEventPublisher(object):
|
||||
|
@ -724,16 +734,18 @@ class AsyncEventPublisher(object):
|
|||
TODO: remove references to "minion_event" whenever we need to use this for other things
|
||||
'''
|
||||
def __init__(self, opts, publish_handler, io_loop=None):
|
||||
self.opts = opts
|
||||
self.opts = salt.config.DEFAULT_MINION_OPTS.copy()
|
||||
self.opts.update(opts)
|
||||
|
||||
self.publish_handler = publish_handler
|
||||
|
||||
self.io_loop = io_loop or zmq.eventloop.ioloop.ZMQIOLoop()
|
||||
self.context = zmq.Context()
|
||||
|
||||
hash_type = getattr(hashlib, self.opts.get('hash_type', 'md5'))
|
||||
hash_type = getattr(hashlib, self.opts['hash_type'])
|
||||
# Only use the first 10 chars to keep longer hashes from exceeding the
|
||||
# max socket path length.
|
||||
id_hash = hash_type(salt.utils.to_bytes(self.opts.get('id', ''))).hexdigest()[:10]
|
||||
id_hash = hash_type(salt.utils.to_bytes(self.opts['id'])).hexdigest()[:10]
|
||||
epub_sock_path = os.path.join(
|
||||
self.opts['sock_dir'],
|
||||
'minion_event_{0}_pub.ipc'.format(id_hash)
|
||||
|
@ -749,7 +761,7 @@ class AsyncEventPublisher(object):
|
|||
|
||||
self.epub_sock = self.context.socket(zmq.PUB)
|
||||
|
||||
if self.opts.get('ipc_mode', '') == 'tcp':
|
||||
if self.opts['ipc_mode'] == 'tcp':
|
||||
epub_uri = 'tcp://127.0.0.1:{0}'.format(
|
||||
self.opts['tcp_pub_port']
|
||||
)
|
||||
|
@ -804,7 +816,7 @@ class AsyncEventPublisher(object):
|
|||
self.epull_sock = self.context.socket(zmq.PULL)
|
||||
|
||||
# Securely bind the event sockets
|
||||
if self.opts.get('ipc_mode', '') != 'tcp':
|
||||
if self.opts['ipc_mode'] != 'tcp':
|
||||
old_umask = os.umask(0o177)
|
||||
try:
|
||||
log.info('Starting pub socket on {0}'.format(epub_uri))
|
||||
|
@ -812,7 +824,7 @@ class AsyncEventPublisher(object):
|
|||
log.info('Starting pull socket on {0}'.format(epull_uri))
|
||||
self.epull_sock.bind(epull_uri)
|
||||
finally:
|
||||
if self.opts.get('ipc_mode', '') != 'tcp':
|
||||
if self.opts['ipc_mode'] != 'tcp':
|
||||
os.umask(old_umask)
|
||||
|
||||
self.stream = zmq.eventloop.zmqstream.ZMQStream(self.epull_sock, io_loop=self.io_loop)
|
||||
|
@ -860,7 +872,8 @@ class EventPublisher(multiprocessing.Process):
|
|||
'''
|
||||
def __init__(self, opts):
|
||||
super(EventPublisher, self).__init__()
|
||||
self.opts = opts
|
||||
self.opts = salt.config.DEFAULT_MASTER_OPTS.copy()
|
||||
self.opts.update(opts)
|
||||
|
||||
def run(self):
|
||||
'''
|
||||
|
@ -873,18 +886,18 @@ class EventPublisher(multiprocessing.Process):
|
|||
# Prepare the master event publisher
|
||||
self.epub_sock = self.context.socket(zmq.PUB)
|
||||
try:
|
||||
self.epub_sock.setsockopt(zmq.HWM, self.opts.get('event_publisher_pub_hwm'))
|
||||
self.epub_sock.setsockopt(zmq.HWM, self.opts['event_publisher_pub_hwm'])
|
||||
except AttributeError:
|
||||
self.epub_sock.setsockopt(zmq.SNDHWM, self.opts.get('event_publisher_pub_hwm'))
|
||||
self.epub_sock.setsockopt(zmq.RCVHWM, self.opts.get('event_publisher_pub_hwm'))
|
||||
self.epub_sock.setsockopt(zmq.SNDHWM, self.opts['event_publisher_pub_hwm'])
|
||||
self.epub_sock.setsockopt(zmq.RCVHWM, self.opts['event_publisher_pub_hwm'])
|
||||
# Prepare master event pull socket
|
||||
self.epull_sock = self.context.socket(zmq.PULL)
|
||||
if self.opts.get('ipc_mode', '') == 'tcp':
|
||||
if self.opts['ipc_mode'] == 'tcp':
|
||||
epub_uri = 'tcp://127.0.0.1:{0}'.format(
|
||||
self.opts.get('tcp_master_pub_port', 4512)
|
||||
self.opts['tcp_master_pub_port']
|
||||
)
|
||||
epull_uri = 'tcp://127.0.0.1:{0}'.format(
|
||||
self.opts.get('tcp_master_pull_port', 4513)
|
||||
self.opts['tcp_master_pull_port']
|
||||
)
|
||||
else:
|
||||
epub_uri = 'ipc://{0}'.format(
|
||||
|
@ -901,9 +914,9 @@ class EventPublisher(multiprocessing.Process):
|
|||
try:
|
||||
self.epull_sock.bind(epull_uri)
|
||||
self.epub_sock.bind(epub_uri)
|
||||
if (self.opts.get('ipc_mode', '') != 'tcp' and (
|
||||
self.opts.get('client_acl') or
|
||||
self.opts.get('external_auth'))):
|
||||
if (self.opts['ipc_mode'] != 'tcp' and (
|
||||
self.opts['client_acl'] or
|
||||
self.opts['external_auth'])):
|
||||
os.chmod(os.path.join(
|
||||
self.opts['sock_dir'], 'master_event_pub.ipc'), 0o666)
|
||||
finally:
|
||||
|
|
Loading…
Add table
Reference in a new issue