mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge branch '2019.2' into fix_test_symlink_2019.2
This commit is contained in:
commit
8e1c882715
9 changed files with 89 additions and 117 deletions
|
@ -20,7 +20,6 @@ from salt.exceptions import (
|
|||
CommandExecutionError, MinionError
|
||||
)
|
||||
import salt.client
|
||||
import salt.crypt
|
||||
import salt.loader
|
||||
import salt.payload
|
||||
import salt.transport.client
|
||||
|
|
|
@ -63,10 +63,12 @@ def _ctl_cmd(cmd, name, conf_file, bin_env):
|
|||
|
||||
|
||||
def _get_return(ret):
|
||||
if ret['retcode'] == 0:
|
||||
return ret['stdout']
|
||||
else:
|
||||
return ''
|
||||
retmsg = ret['stdout']
|
||||
if ret['retcode'] != 0:
|
||||
# This is a non 0 exit code
|
||||
if 'ERROR' not in retmsg:
|
||||
retmsg = 'ERROR: {}'.format(retmsg)
|
||||
return retmsg
|
||||
|
||||
|
||||
def start(name='all', user=None, conf_file=None, bin_env=None):
|
||||
|
|
|
@ -19,7 +19,6 @@ import inspect
|
|||
import salt.loader
|
||||
import salt.fileclient
|
||||
import salt.minion
|
||||
import salt.crypt
|
||||
import salt.transport.client
|
||||
import salt.utils.args
|
||||
import salt.utils.cache
|
||||
|
|
|
@ -2736,7 +2736,7 @@ def managed(name,
|
|||
try:
|
||||
if __opts__['test']:
|
||||
if 'file.check_managed_changes' in __salt__:
|
||||
ret['pchanges'] = __salt__['file.check_managed_changes'](
|
||||
ret['changes'] = __salt__['file.check_managed_changes'](
|
||||
name,
|
||||
source,
|
||||
source_hash,
|
||||
|
@ -2767,15 +2767,17 @@ def managed(name,
|
|||
reset=win_perms_reset)
|
||||
except CommandExecutionError as exc:
|
||||
if exc.strerror.startswith('Path not found'):
|
||||
ret['pchanges'] = '{0} will be created'.format(name)
|
||||
ret['changes'] = {name: 'will be created'}
|
||||
|
||||
if isinstance(ret['pchanges'], tuple):
|
||||
ret['result'], ret['comment'] = ret['pchanges']
|
||||
elif ret['pchanges']:
|
||||
if isinstance(ret['changes'], tuple):
|
||||
ret['result'], ret['comment'] = ret['changes']
|
||||
elif ret['changes']:
|
||||
ret['result'] = None
|
||||
ret['comment'] = 'The file {0} is set to be changed'.format(name)
|
||||
if 'diff' in ret['pchanges'] and not show_changes:
|
||||
ret['pchanges']['diff'] = '<show_changes=False>'
|
||||
ret['comment'] += ('\nNote: No changes made, actual changes may\n'
|
||||
'be different due to other states.')
|
||||
if 'diff' in ret['changes'] and not show_changes:
|
||||
ret['changes']['diff'] = '<show_changes=False>'
|
||||
else:
|
||||
ret['result'] = True
|
||||
ret['comment'] = 'The file {0} is in the correct state'.format(name)
|
||||
|
|
|
@ -10,7 +10,7 @@ import logging
|
|||
import socket
|
||||
import weakref
|
||||
import time
|
||||
import threading
|
||||
import sys
|
||||
|
||||
# Import 3rd-party libs
|
||||
import msgpack
|
||||
|
@ -85,6 +85,11 @@ class FutureWithTimeout(tornado.concurrent.Future):
|
|||
self.set_exception(exc)
|
||||
|
||||
|
||||
class IPCExceptionProxy(object):
|
||||
def __init__(self, orig_info):
|
||||
self.orig_info = orig_info
|
||||
|
||||
|
||||
class IPCServer(object):
|
||||
'''
|
||||
A Tornado IPC server very similar to Tornado's TCPServer class
|
||||
|
@ -244,36 +249,7 @@ class IPCClient(object):
|
|||
case it is used as the port for a tcp
|
||||
localhost connection.
|
||||
'''
|
||||
|
||||
# Create singleton map between two sockets
|
||||
instance_map = weakref.WeakKeyDictionary()
|
||||
|
||||
def __new__(cls, socket_path, io_loop=None):
|
||||
io_loop = io_loop or tornado.ioloop.IOLoop.current()
|
||||
if io_loop not in IPCClient.instance_map:
|
||||
IPCClient.instance_map[io_loop] = weakref.WeakValueDictionary()
|
||||
loop_instance_map = IPCClient.instance_map[io_loop]
|
||||
|
||||
# FIXME
|
||||
key = six.text_type(socket_path)
|
||||
|
||||
client = loop_instance_map.get(key)
|
||||
if client is None:
|
||||
log.debug('Initializing new IPCClient for path: %s', key)
|
||||
client = object.__new__(cls)
|
||||
# FIXME
|
||||
client.__singleton_init__(io_loop=io_loop, socket_path=socket_path)
|
||||
client._instance_key = key
|
||||
loop_instance_map[key] = client
|
||||
client._refcount = 1
|
||||
client._refcount_lock = threading.RLock()
|
||||
else:
|
||||
log.debug('Re-using IPCClient for %s', key)
|
||||
with client._refcount_lock:
|
||||
client._refcount += 1
|
||||
return client
|
||||
|
||||
def __singleton_init__(self, socket_path, io_loop=None):
|
||||
def __init__(self, socket_path, io_loop=None):
|
||||
'''
|
||||
Create a new IPC client
|
||||
|
||||
|
@ -292,10 +268,6 @@ class IPCClient(object):
|
|||
encoding = 'utf-8'
|
||||
self.unpacker = msgpack.Unpacker(encoding=encoding)
|
||||
|
||||
def __init__(self, socket_path, io_loop=None):
|
||||
# Handled by singleton __new__
|
||||
pass
|
||||
|
||||
def connected(self):
|
||||
return self.stream is not None and not self.stream.closed()
|
||||
|
||||
|
@ -367,16 +339,11 @@ class IPCClient(object):
|
|||
|
||||
def __del__(self):
|
||||
try:
|
||||
with self._refcount_lock:
|
||||
# Make sure we actually close no matter if something
|
||||
# went wrong with our ref counting
|
||||
self._refcount = 1
|
||||
try:
|
||||
self.close()
|
||||
except socket.error as exc:
|
||||
if exc.errno != errno.EBADF:
|
||||
# If its not a bad file descriptor error, raise
|
||||
raise
|
||||
self.close()
|
||||
except socket.error as exc:
|
||||
if exc.errno != errno.EBADF:
|
||||
# If its not a bad file descriptor error, raise
|
||||
raise
|
||||
except TypeError:
|
||||
# This is raised when Python's GC has collected objects which
|
||||
# would be needed when calling self.close()
|
||||
|
@ -391,16 +358,6 @@ class IPCClient(object):
|
|||
if self._closing:
|
||||
return
|
||||
|
||||
if self._refcount > 1:
|
||||
# Decrease refcount
|
||||
with self._refcount_lock:
|
||||
self._refcount -= 1
|
||||
log.debug(
|
||||
'This is not the last %s instance. Not closing yet.',
|
||||
self.__class__.__name__
|
||||
)
|
||||
return
|
||||
|
||||
self._closing = True
|
||||
|
||||
log.debug('Closing %s instance', self.__class__.__name__)
|
||||
|
@ -408,17 +365,6 @@ class IPCClient(object):
|
|||
if self.stream is not None and not self.stream.closed():
|
||||
self.stream.close()
|
||||
|
||||
# Remove the entry from the instance map so
|
||||
# that a closed entry may not be reused.
|
||||
# This forces this operation even if the reference
|
||||
# count of the entry has not yet gone to zero.
|
||||
if self.io_loop in self.__class__.instance_map:
|
||||
loop_instance_map = self.__class__.instance_map[self.io_loop]
|
||||
if self._instance_key in loop_instance_map:
|
||||
del loop_instance_map[self._instance_key]
|
||||
if not loop_instance_map:
|
||||
del self.__class__.instance_map[self.io_loop]
|
||||
|
||||
|
||||
class IPCMessageClient(IPCClient):
|
||||
'''
|
||||
|
@ -637,12 +583,13 @@ class IPCMessageSubscriberService(IPCClient):
|
|||
|
||||
To use this refer to IPCMessageSubscriber documentation.
|
||||
'''
|
||||
def __singleton_init__(self, socket_path, io_loop=None):
|
||||
super(IPCMessageSubscriberService, self).__singleton_init__(
|
||||
def __init__(self, socket_path, io_loop=None):
|
||||
super(IPCMessageSubscriberService, self).__init__(
|
||||
socket_path, io_loop=io_loop)
|
||||
self.saved_data = []
|
||||
self._read_in_progress = Lock()
|
||||
self.handlers = weakref.WeakSet()
|
||||
self.read_stream_future = None
|
||||
|
||||
def _subscribe(self, handler):
|
||||
self.handlers.add(handler)
|
||||
|
@ -670,16 +617,16 @@ class IPCMessageSubscriberService(IPCClient):
|
|||
if timeout is None:
|
||||
timeout = 5
|
||||
|
||||
read_stream_future = None
|
||||
self.read_stream_future = None
|
||||
while self._has_subscribers():
|
||||
if read_stream_future is None:
|
||||
read_stream_future = self.stream.read_bytes(4096, partial=True)
|
||||
if self.read_stream_future is None:
|
||||
self.read_stream_future = self.stream.read_bytes(4096, partial=True)
|
||||
|
||||
try:
|
||||
wire_bytes = yield FutureWithTimeout(self.io_loop,
|
||||
read_stream_future,
|
||||
self.read_stream_future,
|
||||
timeout)
|
||||
read_stream_future = None
|
||||
self.read_stream_future = None
|
||||
|
||||
self.unpacker.feed(wire_bytes)
|
||||
msgs = [msg['body'] for msg in self.unpacker]
|
||||
|
@ -694,6 +641,7 @@ class IPCMessageSubscriberService(IPCClient):
|
|||
break
|
||||
except Exception as exc:
|
||||
log.error('Exception occurred in Subscriber while handling stream: %s', exc)
|
||||
exc = IPCExceptionProxy(sys.exc_info())
|
||||
self._feed_subscribers([exc])
|
||||
break
|
||||
|
||||
|
@ -718,7 +666,7 @@ class IPCMessageSubscriberService(IPCClient):
|
|||
except Exception as exc:
|
||||
log.error('Exception occurred while Subscriber connecting: %s', exc)
|
||||
yield tornado.gen.sleep(1)
|
||||
self._read(timeout)
|
||||
yield self._read(timeout)
|
||||
|
||||
def close(self):
|
||||
'''
|
||||
|
@ -726,8 +674,11 @@ class IPCMessageSubscriberService(IPCClient):
|
|||
Sockets and filehandles should be closed explicitly, to prevent
|
||||
leaks.
|
||||
'''
|
||||
if not self._closing:
|
||||
super(IPCMessageSubscriberService, self).close()
|
||||
super(IPCMessageSubscriberService, self).close()
|
||||
if self.read_stream_future is not None and self.read_stream_future.done():
|
||||
exc = self.read_stream_future.exception()
|
||||
if exc and not isinstance(exc, tornado.iostream.StreamClosedError):
|
||||
log.error("Read future returned exception %r", exc)
|
||||
|
||||
def __del__(self):
|
||||
if IPCMessageSubscriberService in globals():
|
||||
|
@ -801,8 +752,8 @@ class IPCMessageSubscriber(object):
|
|||
raise tornado.gen.Return(None)
|
||||
if data is None:
|
||||
break
|
||||
elif isinstance(data, Exception):
|
||||
raise data
|
||||
elif isinstance(data, IPCExceptionProxy):
|
||||
six.reraise(*data.orig_info)
|
||||
elif callback:
|
||||
self.service.io_loop.spawn_callback(callback, data)
|
||||
else:
|
||||
|
@ -821,6 +772,7 @@ class IPCMessageSubscriber(object):
|
|||
|
||||
def close(self):
|
||||
self.service.unsubscribe(self)
|
||||
self.service.close()
|
||||
|
||||
def __del__(self):
|
||||
self.close()
|
||||
|
|
|
@ -34,7 +34,7 @@ import salt.transport.client
|
|||
import salt.transport.server
|
||||
import salt.transport.mixins.auth
|
||||
from salt.ext import six
|
||||
from salt.exceptions import SaltReqTimeoutError
|
||||
from salt.exceptions import SaltReqTimeoutError, SaltException
|
||||
from salt._compat import ipaddress
|
||||
|
||||
from salt.utils.zeromq import zmq, ZMQDefaultLoop, install_zmq, ZMQ_VERSION_INFO, LIBZMQ_VERSION_INFO
|
||||
|
@ -260,12 +260,18 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
|
|||
|
||||
@property
|
||||
def master_uri(self):
|
||||
if 'master_uri' in self.opts:
|
||||
return self.opts['master_uri']
|
||||
|
||||
# if by chance master_uri is not there..
|
||||
if 'master_ip' in self.opts:
|
||||
return _get_master_uri(self.opts['master_ip'],
|
||||
self.opts['master_port'],
|
||||
source_ip=self.opts.get('source_ip'),
|
||||
source_port=self.opts.get('source_ret_port'))
|
||||
return self.opts['master_uri']
|
||||
|
||||
# if we've reached here something is very abnormal
|
||||
raise SaltException('ReqChannel: missing master_uri/master_ip in self.opts')
|
||||
|
||||
def _package_load(self, load):
|
||||
return {
|
||||
|
|
|
@ -459,6 +459,21 @@ class FileTest(ModuleCase, SaltReturnAssertsMixin):
|
|||
changes = next(six.itervalues(ret))['changes']
|
||||
self.assertEqual('<show_changes=False>', changes['diff'])
|
||||
|
||||
def test_managed_show_changes_true(self):
|
||||
'''
|
||||
file.managed test interface
|
||||
'''
|
||||
name = os.path.join(TMP, 'grail_not_scene33')
|
||||
with salt.utils.files.fopen(name, 'wb') as fp_:
|
||||
fp_.write(b'test_managed_show_changes_false\n')
|
||||
|
||||
ret = self.run_state(
|
||||
'file.managed', name=name, source='salt://grail/scene33',
|
||||
)
|
||||
|
||||
changes = next(six.itervalues(ret))['changes']
|
||||
self.assertIn('diff', changes)
|
||||
|
||||
@skipIf(IS_WINDOWS, 'Don\'t know how to fix for Windows')
|
||||
def test_managed_escaped_file_path(self):
|
||||
'''
|
||||
|
|
|
@ -86,13 +86,14 @@ class IPCMessageClient(BaseIPCReqCase):
|
|||
'''
|
||||
|
||||
def _get_channel(self):
|
||||
channel = salt.transport.ipc.IPCMessageClient(
|
||||
socket_path=self.socket_path,
|
||||
io_loop=self.io_loop,
|
||||
)
|
||||
channel.connect(callback=self.stop)
|
||||
self.wait()
|
||||
return channel
|
||||
if not hasattr(self, 'channel') or self.channel is None:
|
||||
self.channel = salt.transport.ipc.IPCMessageClient(
|
||||
socket_path=self.socket_path,
|
||||
io_loop=self.io_loop,
|
||||
)
|
||||
self.channel.connect(callback=self.stop)
|
||||
self.wait()
|
||||
return self.channel
|
||||
|
||||
def setUp(self):
|
||||
super(IPCMessageClient, self).setUp()
|
||||
|
@ -107,6 +108,8 @@ class IPCMessageClient(BaseIPCReqCase):
|
|||
if exc.errno != errno.EBADF:
|
||||
# If its not a bad file descriptor error, raise
|
||||
raise
|
||||
finally:
|
||||
self.channel = None
|
||||
|
||||
def test_singleton(self):
|
||||
channel = self._get_channel()
|
||||
|
@ -120,23 +123,6 @@ class IPCMessageClient(BaseIPCReqCase):
|
|||
self.wait()
|
||||
self.assertEqual(self.payloads[0], msg)
|
||||
|
||||
def test_last_singleton_instance_closes(self):
|
||||
channel = self._get_channel()
|
||||
msg = {'foo': 'bar', 'stop': True}
|
||||
log.debug('Sending msg1')
|
||||
self.channel.send(msg)
|
||||
self.wait()
|
||||
self.assertEqual(self.payloads[0], msg)
|
||||
channel.close()
|
||||
# Since this is a singleton, and only the last singleton instance
|
||||
# should actually close the connection, the next code should still
|
||||
# work and not timeout
|
||||
msg = {'bar': 'foo', 'stop': True}
|
||||
log.debug('Sending msg2')
|
||||
self.channel.send(msg)
|
||||
self.wait()
|
||||
self.assertEqual(self.payloads[1], msg)
|
||||
|
||||
def test_basic_send(self):
|
||||
msg = {'foo': 'bar', 'stop': True}
|
||||
self.channel.send(msg)
|
||||
|
|
|
@ -145,6 +145,17 @@ class ClearReqTestCases(BaseZMQReqCase, ReqChannelMixin):
|
|||
'''
|
||||
raise tornado.gen.Return((payload, {'fun': 'send_clear'}))
|
||||
|
||||
def test_master_uri_override(self):
|
||||
'''
|
||||
ensure master_uri kwarg is respected
|
||||
'''
|
||||
# minion_config should be 127.0.0.1, we want a different uri that still connects
|
||||
uri = 'tcp://{master_ip}:{master_port}'.format(master_ip='localhost', master_port=self.minion_config['master_port'])
|
||||
|
||||
channel = salt.transport.Channel.factory(self.minion_config, master_uri=uri)
|
||||
self.assertIn('localhost', channel.master_uri)
|
||||
del channel
|
||||
|
||||
|
||||
@flaky
|
||||
@skipIf(ON_SUSE, 'Skipping until https://github.com/saltstack/salt/issues/32902 gets fixed')
|
||||
|
|
Loading…
Add table
Reference in a new issue