Merge pull request #30343 from rallytime/fix-2015.8

Fix 2015.8 from incomplete back-port
This commit is contained in:
Nicole Thomas 2016-01-13 14:56:26 -07:00
commit 6079a96e6e
4 changed files with 70 additions and 40 deletions

View file

@ -811,3 +811,9 @@
############################################
# Which returner(s) will be used for minion's result:
#return: mysql
###### Miscellaneous settings ######
############################################
# Default match type for filtering events tags: startswith, endswith, find, regex, fnmatch
#event_match_type: startswith

View file

@ -655,3 +655,9 @@
############################################
# Which returner(s) will be used for minion's result:
#return: mysql
###### Miscellaneous settings ######
############################################
# Default match type for filtering events tags: startswith, endswith, find, regex, fnmatch
#event_match_type: startswith

View file

@ -387,6 +387,9 @@ VALID_OPTS = {
# Events matching a tag in this list should never be sent to an event returner.
'event_return_blacklist': list,
# default match type for filtering events tags: startswith, endswith, find, regex, fnmatch
'event_match_type': str,
# This pidfile to write out to when a daemon starts
'pidfile': str,
@ -781,7 +784,7 @@ DEFAULT_MINION_OPTS = {
'user': 'root',
'root_dir': salt.syspaths.ROOT_DIR,
'pki_dir': os.path.join(salt.syspaths.CONFIG_DIR, 'pki', 'minion'),
'id': None,
'id': '',
'cachedir': os.path.join(salt.syspaths.CACHE_DIR, 'minion'),
'cache_jobs': False,
'grains_cache': False,
@ -966,6 +969,7 @@ DEFAULT_MINION_OPTS = {
'salt_event_pub_hwm': 2000,
# ZMQ HWM for EventPublisher pub socket - different for minion vs. master
'event_publisher_pub_hwm': 1000,
'event_match_type': 'startswith',
}
DEFAULT_MASTER_OPTS = {
@ -1115,6 +1119,7 @@ DEFAULT_MASTER_OPTS = {
'event_return_queue': 0,
'event_return_whitelist': [],
'event_return_blacklist': [],
'event_match_type': 'startswith',
'serial': 'msgpack',
'state_verbose': True,
'state_output': 'full',
@ -2742,7 +2747,7 @@ def apply_minion_config(overrides=None,
# No ID provided. Will getfqdn save us?
using_ip_for_id = False
if opts['id'] is None:
if not opts.get('id'):
opts['id'], using_ip_for_id = get_id(
opts,
cache_minion_id=cache_minion_id)
@ -2859,7 +2864,7 @@ def apply_master_config(overrides=None, defaults=None):
using_ip_for_id = False
append_master = False
if opts.get('id') is None:
if not opts.get('id'):
opts['id'], using_ip_for_id = get_id(
opts,
cache_minion_id=None)

View file

@ -78,6 +78,7 @@ except ImportError:
pass
# Import salt libs
import salt.config
import salt.payload
import salt.loader
import salt.utils
@ -119,7 +120,7 @@ def get_event(node, sock_dir=None, transport='zeromq', opts=None, listen=True):
'''
Return an event object suitable for the named transport
'''
sock_dir = sock_dir or opts.get('sock_dir', None)
sock_dir = sock_dir or opts['sock_dir']
# TODO: AIO core is separate from transport
if transport in ('zeromq', 'tcp'):
if node == 'master':
@ -179,13 +180,22 @@ class SaltEvent(object):
self.poller = zmq.Poller()
self.cpub = False
self.cpush = False
if opts is None:
opts = {}
self.opts = opts
if node == 'master':
self.opts = salt.config.DEFAULT_MASTER_OPTS.copy()
else:
self.opts = salt.config.DEFAULT_MINION_OPTS.copy()
self.opts.update(opts)
if sock_dir is None:
sock_dir = opts.get('sock_dir', None)
if salt.utils.is_windows() and 'ipc_mode' not in opts:
opts['ipc_mode'] = 'tcp'
sock_dir = self.opts['sock_dir']
else:
self.opts['sock_dir'] = sock_dir
if salt.utils.is_windows() and not hasattr(self.opts, 'ipc_mode'):
self.opts['ipc_mode'] = 'tcp'
self.puburi, self.pulluri = self.__load_uri(sock_dir, node)
self.pending_tags = []
self.pending_events = []
@ -210,17 +220,13 @@ class SaltEvent(object):
Return the string URI for the location of the pull and pub sockets to
use for firing and listening to events
'''
hash_type = getattr(hashlib, self.opts.get('hash_type', 'md5'))
# Only use the first 10 chars to keep longer hashes from exceeding the
# max socket path length.
id_hash = hash_type(salt.utils.to_bytes(self.opts.get('id', ''))).hexdigest()[:10]
if node == 'master':
if self.opts.get('ipc_mode', '') == 'tcp':
if self.opts['ipc_mode'] == 'tcp':
puburi = 'tcp://127.0.0.1:{0}'.format(
self.opts.get('tcp_master_pub_port', 4512)
self.opts['tcp_master_pub_port']
)
pulluri = 'tcp://127.0.0.1:{0}'.format(
self.opts.get('tcp_master_pull_port', 4513)
self.opts['tcp_master_pull_port']
)
else:
puburi = 'ipc://{0}'.format(os.path.join(
@ -234,14 +240,18 @@ class SaltEvent(object):
))
salt.utils.zeromq.check_ipc_path_max_len(pulluri)
else:
if self.opts.get('ipc_mode', '') == 'tcp':
if self.opts['ipc_mode'] == 'tcp':
puburi = 'tcp://127.0.0.1:{0}'.format(
self.opts.get('tcp_pub_port', 4510)
self.opts['tcp_pub_port']
)
pulluri = 'tcp://127.0.0.1:{0}'.format(
self.opts.get('tcp_pull_port', 4511)
self.opts['tcp_pull_port']
)
else:
hash_type = getattr(hashlib, self.opts['hash_type'])
# Only use the first 10 chars to keep longer hashes from exceeding the
# max socket path length.
id_hash = hash_type(salt.utils.to_bytes(self.opts['id'])).hexdigest()[:10]
puburi = 'ipc://{0}'.format(os.path.join(
sock_dir,
'minion_event_{0}_pub.ipc'.format(id_hash)
@ -297,10 +307,10 @@ class SaltEvent(object):
'''
self.sub = self.context.socket(zmq.SUB)
try:
self.sub.setsockopt(zmq.HWM, self.opts.get('salt_event_pub_hwm'))
self.sub.setsockopt(zmq.HWM, self.opts['salt_event_pub_hwm'])
except AttributeError:
self.sub.setsockopt(zmq.SNDHWM, self.opts.get('salt_event_pub_hwm'))
self.sub.setsockopt(zmq.RCVHWM, self.opts.get('salt_event_pub_hwm'))
self.sub.setsockopt(zmq.SNDHWM, self.opts['salt_event_pub_hwm'])
self.sub.setsockopt(zmq.RCVHWM, self.opts['salt_event_pub_hwm'])
self.sub.connect(self.puburi)
self.poller.register(self.sub, zmq.POLLIN)
self.sub.setsockopt_string(zmq.SUBSCRIBE, u'')
@ -330,7 +340,7 @@ class SaltEvent(object):
def _get_match_func(self, match_type=None):
if match_type is None:
match_type = self.opts.get('event_match_type', 'startswith')
match_type = self.opts['event_match_type']
return getattr(self, '_match_tag_{0}'.format(match_type), None)
def _check_pending(self, tag, match_func=None):
@ -568,7 +578,7 @@ class SaltEvent(object):
tagend = TAGEND
serialized_data = salt.utils.dicttrim.trim_dict(
self.serial.dumps(data),
self.opts.get('max_event_size', 1048576),
self.opts['max_event_size'],
is_msgpacked=True,
)
log.debug('Sending event - data = {0}'.format(data))
@ -714,7 +724,7 @@ class MinionEvent(SaltEvent):
'''
def __init__(self, opts, listen=True):
super(MinionEvent, self).__init__(
'minion', sock_dir=opts.get('sock_dir', None), opts=opts, listen=listen)
'minion', sock_dir=opts.get('sock_dir'), opts=opts, listen=listen)
class AsyncEventPublisher(object):
@ -724,16 +734,18 @@ class AsyncEventPublisher(object):
TODO: remove references to "minion_event" whenever we need to use this for other things
'''
def __init__(self, opts, publish_handler, io_loop=None):
self.opts = opts
self.opts = salt.config.DEFAULT_MINION_OPTS.copy()
self.opts.update(opts)
self.publish_handler = publish_handler
self.io_loop = io_loop or zmq.eventloop.ioloop.ZMQIOLoop()
self.context = zmq.Context()
hash_type = getattr(hashlib, self.opts.get('hash_type', 'md5'))
hash_type = getattr(hashlib, self.opts['hash_type'])
# Only use the first 10 chars to keep longer hashes from exceeding the
# max socket path length.
id_hash = hash_type(salt.utils.to_bytes(self.opts.get('id', ''))).hexdigest()[:10]
id_hash = hash_type(salt.utils.to_bytes(self.opts['id'])).hexdigest()[:10]
epub_sock_path = os.path.join(
self.opts['sock_dir'],
'minion_event_{0}_pub.ipc'.format(id_hash)
@ -749,7 +761,7 @@ class AsyncEventPublisher(object):
self.epub_sock = self.context.socket(zmq.PUB)
if self.opts.get('ipc_mode', '') == 'tcp':
if self.opts['ipc_mode'] == 'tcp':
epub_uri = 'tcp://127.0.0.1:{0}'.format(
self.opts['tcp_pub_port']
)
@ -804,7 +816,7 @@ class AsyncEventPublisher(object):
self.epull_sock = self.context.socket(zmq.PULL)
# Securely bind the event sockets
if self.opts.get('ipc_mode', '') != 'tcp':
if self.opts['ipc_mode'] != 'tcp':
old_umask = os.umask(0o177)
try:
log.info('Starting pub socket on {0}'.format(epub_uri))
@ -812,7 +824,7 @@ class AsyncEventPublisher(object):
log.info('Starting pull socket on {0}'.format(epull_uri))
self.epull_sock.bind(epull_uri)
finally:
if self.opts.get('ipc_mode', '') != 'tcp':
if self.opts['ipc_mode'] != 'tcp':
os.umask(old_umask)
self.stream = zmq.eventloop.zmqstream.ZMQStream(self.epull_sock, io_loop=self.io_loop)
@ -860,7 +872,8 @@ class EventPublisher(multiprocessing.Process):
'''
def __init__(self, opts):
super(EventPublisher, self).__init__()
self.opts = opts
self.opts = salt.config.DEFAULT_MASTER_OPTS.copy()
self.opts.update(opts)
def run(self):
'''
@ -873,18 +886,18 @@ class EventPublisher(multiprocessing.Process):
# Prepare the master event publisher
self.epub_sock = self.context.socket(zmq.PUB)
try:
self.epub_sock.setsockopt(zmq.HWM, self.opts.get('event_publisher_pub_hwm'))
self.epub_sock.setsockopt(zmq.HWM, self.opts['event_publisher_pub_hwm'])
except AttributeError:
self.epub_sock.setsockopt(zmq.SNDHWM, self.opts.get('event_publisher_pub_hwm'))
self.epub_sock.setsockopt(zmq.RCVHWM, self.opts.get('event_publisher_pub_hwm'))
self.epub_sock.setsockopt(zmq.SNDHWM, self.opts['event_publisher_pub_hwm'])
self.epub_sock.setsockopt(zmq.RCVHWM, self.opts['event_publisher_pub_hwm'])
# Prepare master event pull socket
self.epull_sock = self.context.socket(zmq.PULL)
if self.opts.get('ipc_mode', '') == 'tcp':
if self.opts['ipc_mode'] == 'tcp':
epub_uri = 'tcp://127.0.0.1:{0}'.format(
self.opts.get('tcp_master_pub_port', 4512)
self.opts['tcp_master_pub_port']
)
epull_uri = 'tcp://127.0.0.1:{0}'.format(
self.opts.get('tcp_master_pull_port', 4513)
self.opts['tcp_master_pull_port']
)
else:
epub_uri = 'ipc://{0}'.format(
@ -901,9 +914,9 @@ class EventPublisher(multiprocessing.Process):
try:
self.epull_sock.bind(epull_uri)
self.epub_sock.bind(epub_uri)
if (self.opts.get('ipc_mode', '') != 'tcp' and (
self.opts.get('client_acl') or
self.opts.get('external_auth'))):
if (self.opts['ipc_mode'] != 'tcp' and (
self.opts['client_acl'] or
self.opts['external_auth'])):
os.chmod(os.path.join(
self.opts['sock_dir'], 'master_event_pub.ipc'), 0o666)
finally: