mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Always get default option settings from salt.config
It is a poor practice to duplicate settings and scatter them through multiple files. Configuration options have been centralize in salt.config, yet salt.utils.event repeats many of those settings for the benefit of unit tests that pass only partial "opts" parameters. Rather than change all of the unit tests (and other callers) to provide sufficient settings in the "opts" parameters, the various classes that need opts should start with the default options provided by salt.config as a base and then the passed-in "opts" parameter to __init__() functions should override the base options. * Use the appropriate salt.config.DEFAULT_MASTER_OPTS or salt.config.DEFAULT_MINION_OPTS. * Remove all of the spurrious, redundant default settings from salt.utils.event. * Drop use of "opts.get(KEY)" in favor of "opts[KEY]" - KeyError exceptions are a good thing because they indicate that an appropriate default is not in salt.config. TODO: Perform similar clean-ups through other files in the code base. Conflicts: salt/utils/event.py
This commit is contained in:
parent
94ee6f88af
commit
4e53ef0bf6
1 changed files with 45 additions and 32 deletions
|
@ -78,6 +78,7 @@ except ImportError:
|
|||
pass
|
||||
|
||||
# Import salt libs
|
||||
import salt.config
|
||||
import salt.payload
|
||||
import salt.loader
|
||||
import salt.utils
|
||||
|
@ -119,7 +120,7 @@ def get_event(node, sock_dir=None, transport='zeromq', opts=None, listen=True):
|
|||
'''
|
||||
Return an event object suitable for the named transport
|
||||
'''
|
||||
sock_dir = sock_dir or opts.get('sock_dir', None)
|
||||
sock_dir = sock_dir or opts['sock_dir']
|
||||
# TODO: AIO core is separate from transport
|
||||
if transport in ('zeromq', 'tcp'):
|
||||
if node == 'master':
|
||||
|
@ -179,13 +180,22 @@ class SaltEvent(object):
|
|||
self.poller = zmq.Poller()
|
||||
self.cpub = False
|
||||
self.cpush = False
|
||||
|
||||
if opts is None:
|
||||
opts = {}
|
||||
self.opts = opts
|
||||
if node == 'master':
|
||||
self.opts = salt.config.DEFAULT_MASTER_OPTS.copy()
|
||||
else:
|
||||
self.opts = salt.config.DEFAULT_MINION_OPTS.copy()
|
||||
self.opts.update(opts)
|
||||
|
||||
if sock_dir is None:
|
||||
sock_dir = opts.get('sock_dir', None)
|
||||
if salt.utils.is_windows() and 'ipc_mode' not in opts:
|
||||
opts['ipc_mode'] = 'tcp'
|
||||
sock_dir = self.opts['sock_dir']
|
||||
else:
|
||||
self.opts['sock_dir'] = sock_dir
|
||||
|
||||
if salt.utils.is_windows() and not hasattr(self.opts, 'ipc_mode'):
|
||||
self.opts['ipc_mode'] = 'tcp'
|
||||
self.puburi, self.pulluri = self.__load_uri(sock_dir, node)
|
||||
self.pending_tags = []
|
||||
self.pending_events = []
|
||||
|
@ -210,17 +220,17 @@ class SaltEvent(object):
|
|||
Return the string URI for the location of the pull and pub sockets to
|
||||
use for firing and listening to events
|
||||
'''
|
||||
hash_type = getattr(hashlib, self.opts.get('hash_type', 'md5'))
|
||||
hash_type = getattr(hashlib, self.opts['hash_type'])
|
||||
# Only use the first 10 chars to keep longer hashes from exceeding the
|
||||
# max socket path length.
|
||||
id_hash = hash_type(salt.utils.to_bytes(self.opts.get('id', ''))).hexdigest()[:10]
|
||||
if node == 'master':
|
||||
if self.opts.get('ipc_mode', '') == 'tcp':
|
||||
if self.opts['ipc_mode'] == 'tcp':
|
||||
puburi = 'tcp://127.0.0.1:{0}'.format(
|
||||
self.opts.get('tcp_master_pub_port', 4512)
|
||||
self.opts['tcp_master_pub_port']
|
||||
)
|
||||
pulluri = 'tcp://127.0.0.1:{0}'.format(
|
||||
self.opts.get('tcp_master_pull_port', 4513)
|
||||
self.opts['tcp_master_pull_port']
|
||||
)
|
||||
else:
|
||||
puburi = 'ipc://{0}'.format(os.path.join(
|
||||
|
@ -234,12 +244,12 @@ class SaltEvent(object):
|
|||
))
|
||||
salt.utils.zeromq.check_ipc_path_max_len(pulluri)
|
||||
else:
|
||||
if self.opts.get('ipc_mode', '') == 'tcp':
|
||||
if self.opts['ipc_mode'] == 'tcp':
|
||||
puburi = 'tcp://127.0.0.1:{0}'.format(
|
||||
self.opts.get('tcp_pub_port', 4510)
|
||||
self.opts['tcp_pub_port']
|
||||
)
|
||||
pulluri = 'tcp://127.0.0.1:{0}'.format(
|
||||
self.opts.get('tcp_pull_port', 4511)
|
||||
self.opts['tcp_pull_port']
|
||||
)
|
||||
else:
|
||||
puburi = 'ipc://{0}'.format(os.path.join(
|
||||
|
@ -297,10 +307,10 @@ class SaltEvent(object):
|
|||
'''
|
||||
self.sub = self.context.socket(zmq.SUB)
|
||||
try:
|
||||
self.sub.setsockopt(zmq.HWM, self.opts.get('salt_event_pub_hwm', 0))
|
||||
self.sub.setsockopt(zmq.HWM, self.opts['salt_event_pub_hwm'])
|
||||
except AttributeError:
|
||||
self.sub.setsockopt(zmq.SNDHWM, self.opts.get('salt_event_pub_hwm', 0))
|
||||
self.sub.setsockopt(zmq.RCVHWM, self.opts.get('salt_event_pub_hwm', 0))
|
||||
self.sub.setsockopt(zmq.SNDHWM, self.opts['salt_event_pub_hwm'])
|
||||
self.sub.setsockopt(zmq.RCVHWM, self.opts['salt_event_pub_hwm'])
|
||||
self.sub.connect(self.puburi)
|
||||
self.poller.register(self.sub, zmq.POLLIN)
|
||||
self.sub.setsockopt_string(zmq.SUBSCRIBE, u'')
|
||||
|
@ -568,7 +578,7 @@ class SaltEvent(object):
|
|||
tagend = TAGEND
|
||||
serialized_data = salt.utils.dicttrim.trim_dict(
|
||||
self.serial.dumps(data),
|
||||
self.opts.get('max_event_size', 1048576),
|
||||
self.opts['max_event_size'],
|
||||
is_msgpacked=True,
|
||||
)
|
||||
log.debug('Sending event - data = {0}'.format(data))
|
||||
|
@ -714,7 +724,7 @@ class MinionEvent(SaltEvent):
|
|||
'''
|
||||
def __init__(self, opts, listen=True):
|
||||
super(MinionEvent, self).__init__(
|
||||
'minion', sock_dir=opts.get('sock_dir', None), opts=opts, listen=listen)
|
||||
'minion', sock_dir=opts['sock_dir'], opts=opts, listen=listen)
|
||||
|
||||
|
||||
class AsyncEventPublisher(object):
|
||||
|
@ -724,13 +734,15 @@ class AsyncEventPublisher(object):
|
|||
TODO: remove references to "minion_event" whenever we need to use this for other things
|
||||
'''
|
||||
def __init__(self, opts, publish_handler, io_loop=None):
|
||||
self.opts = opts
|
||||
self.opts = salt.config.DEFAULT_MINION_OPTS.copy()
|
||||
self.opts.update(opts)
|
||||
|
||||
self.publish_handler = publish_handler
|
||||
|
||||
self.io_loop = io_loop or zmq.eventloop.ioloop.ZMQIOLoop()
|
||||
self.context = zmq.Context()
|
||||
|
||||
hash_type = getattr(hashlib, self.opts.get('hash_type', 'md5'))
|
||||
hash_type = getattr(hashlib, self.opts['hash_type'])
|
||||
# Only use the first 10 chars to keep longer hashes from exceeding the
|
||||
# max socket path length.
|
||||
id_hash = hash_type(salt.utils.to_bytes(self.opts.get('id', ''))).hexdigest()[:10]
|
||||
|
@ -749,7 +761,7 @@ class AsyncEventPublisher(object):
|
|||
|
||||
self.epub_sock = self.context.socket(zmq.PUB)
|
||||
|
||||
if self.opts.get('ipc_mode', '') == 'tcp':
|
||||
if self.opts['ipc_mode'] == 'tcp':
|
||||
epub_uri = 'tcp://127.0.0.1:{0}'.format(
|
||||
self.opts['tcp_pub_port']
|
||||
)
|
||||
|
@ -804,7 +816,7 @@ class AsyncEventPublisher(object):
|
|||
self.epull_sock = self.context.socket(zmq.PULL)
|
||||
|
||||
# Securely bind the event sockets
|
||||
if self.opts.get('ipc_mode', '') != 'tcp':
|
||||
if self.opts['ipc_mode'] != 'tcp':
|
||||
old_umask = os.umask(0o177)
|
||||
try:
|
||||
log.info('Starting pub socket on {0}'.format(epub_uri))
|
||||
|
@ -812,7 +824,7 @@ class AsyncEventPublisher(object):
|
|||
log.info('Starting pull socket on {0}'.format(epull_uri))
|
||||
self.epull_sock.bind(epull_uri)
|
||||
finally:
|
||||
if self.opts.get('ipc_mode', '') != 'tcp':
|
||||
if self.opts['ipc_mode'] != 'tcp':
|
||||
os.umask(old_umask)
|
||||
|
||||
self.stream = zmq.eventloop.zmqstream.ZMQStream(self.epull_sock, io_loop=self.io_loop)
|
||||
|
@ -860,7 +872,8 @@ class EventPublisher(multiprocessing.Process):
|
|||
'''
|
||||
def __init__(self, opts):
|
||||
super(EventPublisher, self).__init__()
|
||||
self.opts = opts
|
||||
self.opts = salt.config.DEFAULT_MASTER_OPTS.copy()
|
||||
self.opts.update(opts)
|
||||
|
||||
def run(self):
|
||||
'''
|
||||
|
@ -873,18 +886,18 @@ class EventPublisher(multiprocessing.Process):
|
|||
# Prepare the master event publisher
|
||||
self.epub_sock = self.context.socket(zmq.PUB)
|
||||
try:
|
||||
self.epub_sock.setsockopt(zmq.HWM, self.opts.get('event_publisher_pub_hwm', 0))
|
||||
self.epub_sock.setsockopt(zmq.HWM, self.opts['event_publisher_pub_hwm'])
|
||||
except AttributeError:
|
||||
self.epub_sock.setsockopt(zmq.SNDHWM, self.opts.get('event_publisher_pub_hwm', 0))
|
||||
self.epub_sock.setsockopt(zmq.RCVHWM, self.opts.get('event_publisher_pub_hwm', 0))
|
||||
self.epub_sock.setsockopt(zmq.SNDHWM, self.opts['event_publisher_pub_hwm'])
|
||||
self.epub_sock.setsockopt(zmq.RCVHWM, self.opts['event_publisher_pub_hwm'])
|
||||
# Prepare master event pull socket
|
||||
self.epull_sock = self.context.socket(zmq.PULL)
|
||||
if self.opts.get('ipc_mode', '') == 'tcp':
|
||||
if self.opts['ipc_mode'] == 'tcp':
|
||||
epub_uri = 'tcp://127.0.0.1:{0}'.format(
|
||||
self.opts.get('tcp_master_pub_port', 4512)
|
||||
self.opts['tcp_master_pub_port']
|
||||
)
|
||||
epull_uri = 'tcp://127.0.0.1:{0}'.format(
|
||||
self.opts.get('tcp_master_pull_port', 4513)
|
||||
self.opts['tcp_master_pull_port']
|
||||
)
|
||||
else:
|
||||
epub_uri = 'ipc://{0}'.format(
|
||||
|
@ -901,9 +914,9 @@ class EventPublisher(multiprocessing.Process):
|
|||
try:
|
||||
self.epull_sock.bind(epull_uri)
|
||||
self.epub_sock.bind(epub_uri)
|
||||
if (self.opts.get('ipc_mode', '') != 'tcp' and (
|
||||
self.opts.get('client_acl') or
|
||||
self.opts.get('external_auth'))):
|
||||
if (self.opts['ipc_mode'] != 'tcp' and (
|
||||
self.opts['client_acl'] or
|
||||
self.opts['external_auth'])):
|
||||
os.chmod(os.path.join(
|
||||
self.opts['sock_dir'], 'master_event_pub.ipc'), 0o666)
|
||||
finally:
|
||||
|
|
Loading…
Add table
Reference in a new issue