Merge branch '2017.7' into fix_47125

This commit is contained in:
Mike Place 2018-05-04 09:14:06 -05:00 committed by GitHub
commit c9bab0b8e3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 365 additions and 243 deletions

View file

@ -92,12 +92,9 @@ platforms:
- yum install -y upstart
provisioner:
salt_bootstrap_options: -P -p rsync -y -x python2.7 -X git v<%= version %> >/dev/null
- name: ubuntu-rolling
- name: ubuntu-18.04
driver_config:
image: ubuntu:rolling
run_command: /lib/systemd/systemd
provisioner:
salt_bootstrap_url: https://raw.githubusercontent.com/saltstack/salt-bootstrap/develop/bootstrap-salt.sh
- name: ubuntu-16.04
driver_config:
run_command: /lib/systemd/systemd

View file

@ -38,7 +38,10 @@ from __future__ import division
import re
from docutils import nodes
from docutils.parsers.rst import directives
from sphinx.util.compat import Directive
try:
from sphinx.util.compat import Directive
except ImportError:
from docutils.parsers.rst import Directive
CONTROL_HEIGHT = 30

View file

@ -1,8 +1,9 @@
===========================
Salt 2017.7.6 Release Notes
In Progress: Salt 2017.7.6 Release Notes
===========================
Version 2017.7.6 is a bugfix release for :ref:`2017.7.0 <release-2017-7-0>`.
Version 2017.7.6 is an **unreleased** bugfix release for :ref:`2017.7.0 <release-2017-7-0>`.
This release is still in progress and has not been released yet.
Option to Return to Previous Pillar Include Behavior
----------------------------------------------------

View file

@ -0,0 +1,4 @@
-r base.txt
# Required by Tornado to handle threads stuff.
futures>=2.0

View file

@ -0,0 +1 @@
-r base.txt

View file

@ -47,11 +47,6 @@ from salt.exceptions import (
# Import third party libs
import salt.ext.six as six
# pylint: disable=import-error
try:
import zmq
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
# Try to import range from https://github.com/ytoolshed/range
HAS_RANGE = False

View file

@ -56,11 +56,7 @@ try:
HAS_WINSHELL = True
except ImportError:
HAS_WINSHELL = False
try:
import zmq
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
from salt.utils.zeromq import zmq
# The directory where salt thin is deployed
DEFAULT_THIN_DIR = '/var/tmp/.%%USER%%_%%FQDNUUID%%_salt'
@ -207,7 +203,7 @@ class SSH(object):
'''
def __init__(self, opts):
pull_sock = os.path.join(opts['sock_dir'], 'master_event_pull.ipc')
if os.path.isfile(pull_sock) and HAS_ZMQ:
if os.path.exists(pull_sock) and zmq:
self.event = salt.utils.event.get_event(
'master',
opts['sock_dir'],

View file

@ -15,15 +15,12 @@ import errno
# Import ioflo libs
import ioflo.base.deeding
# Import third party libs
try:
import zmq
import salt.master
import salt.crypt
import salt.daemons.masterapi
import salt.payload
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
from salt.utils.zeromq import zmq
import salt.master
import salt.crypt
import salt.daemons.masterapi
import salt.payload
log = logging.getLogger(__name__)
@ -159,7 +156,7 @@ class SaltZmqPublisher(ioflo.base.deeding.Deed):
'''
Set up tracking value(s)
'''
if not HAS_ZMQ:
if not zmq:
return
self.created = False
self.serial = salt.payload.Serial(self.opts.value)

View file

@ -173,11 +173,7 @@ from __future__ import absolute_import
import logging
# Import third party libraries
try:
import zmq
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
from salt.utils.zeromq import zmq
try:
# pylint: disable=W0611
@ -209,7 +205,7 @@ def __virtual__():
'''
Load only if napalm-logs is installed.
'''
if not HAS_NAPALM_LOGS or not HAS_ZMQ:
if not HAS_NAPALM_LOGS or not zmq:
return (False, 'napalm_syslog could not be loaded. \
Please install napalm-logs library amd ZeroMQ.')
return True

View file

@ -28,23 +28,9 @@ except ImportError:
# pylint: disable=import-error,no-name-in-module,redefined-builtin
import salt.ext.six as six
from salt.ext.six.moves import range
from salt.utils.zeromq import zmq, ZMQDefaultLoop, install_zmq, ZMQ_VERSION_INFO
# pylint: enable=import-error,no-name-in-module,redefined-builtin
try:
import zmq
import zmq.eventloop.ioloop
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
import tornado
TORNADO_50 = tornado.version_info >= (5,)
from salt.utils.async import LOOP_CLASS
import tornado.gen # pylint: disable=F0401
# Import salt libs
@ -378,23 +364,13 @@ class Master(SMaster):
:param dict: The salt options
'''
if HAS_ZMQ:
# Warn if ZMQ < 3.2
try:
zmq_version_info = zmq.zmq_version_info()
except AttributeError:
# PyZMQ <= 2.1.9 does not have zmq_version_info, fall back to
# using zmq.zmq_version() and build a version info tuple.
zmq_version_info = tuple(
[int(x) for x in zmq.zmq_version().split('.')]
)
if zmq_version_info < (3, 2):
log.warning(
'You have a version of ZMQ less than ZMQ 3.2! There are '
'known connection keep-alive issues with ZMQ < 3.2 which '
'may result in loss of contact with minions. Please '
'upgrade your ZMQ!'
)
if zmq and ZMQ_VERSION_INFO < (3, 2):
log.warning(
'You have a version of ZMQ less than ZMQ 3.2! There are '
'known connection keep-alive issues with ZMQ < 3.2 which '
'may result in loss of contact with minions. Please '
'upgrade your ZMQ!'
)
SMaster.__init__(self, opts)
def __set_max_open_files(self):
@ -858,9 +834,8 @@ class MWorker(SignalHandlingMultiprocessingProcess):
Bind to the local port
'''
# using ZMQIOLoop since we *might* need zmq in there
if HAS_ZMQ and not TORNADO_50:
zmq.eventloop.ioloop.install()
self.io_loop = LOOP_CLASS()
install_zmq()
self.io_loop = ZMQDefaultLoop()
self.io_loop.make_current()
for req_channel in self.req_channels:
req_channel.post_fork(self._handle_payload, io_loop=self.io_loop) # TODO: cleaner? Maybe lazily?

View file

@ -30,23 +30,10 @@ if six.PY3:
else:
import salt.ext.ipaddress as ipaddress
from salt.ext.six.moves import range
from salt.utils.zeromq import zmq, ZMQDefaultLoop, install_zmq, ZMQ_VERSION_INFO
# pylint: enable=no-name-in-module,redefined-builtin
from salt.utils.async import LOOP_CLASS
# Import third party libs
try:
import zmq
# TODO: cleanup
import zmq.eventloop.ioloop
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
import tornado
TORNADO_50 = tornado.version_info >= (5,)
HAS_RANGE = False
try:
@ -620,7 +607,7 @@ class MinionBase(object):
if self.opts['transport'] == 'detect':
self.opts['detect_mode'] = True
for trans in ('zeromq', 'tcp'):
if trans == 'zeromq' and not HAS_ZMQ:
if trans == 'zeromq' and not zmq:
continue
self.opts['transport'] = trans
pub_channel = salt.transport.client.AsyncPubChannel.factory(self.opts, **factory_kwargs)
@ -657,10 +644,8 @@ class SMinion(MinionBase):
# Clean out the proc directory (default /var/cache/salt/minion/proc)
if (self.opts.get('file_client', 'remote') == 'remote'
or self.opts.get('use_master_when_local', False)):
if self.opts['transport'] == 'zeromq' and HAS_ZMQ and not TORNADO_50:
io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
else:
io_loop = LOOP_CLASS.current()
install_zmq()
io_loop = ZMQDefaultLoop.current()
io_loop.run_sync(
lambda: self.eval_master(self.opts, failed=True)
)
@ -806,9 +791,8 @@ class MinionManager(MinionBase):
self.minions = []
self.jid_queue = []
if HAS_ZMQ and not TORNADO_50:
zmq.eventloop.ioloop.install()
self.io_loop = LOOP_CLASS.current()
install_zmq()
self.io_loop = ZMQDefaultLoop.current()
self.process_manager = ProcessManager(name='MultiMinionProcessManager')
self.io_loop.spawn_callback(self.process_manager.run, async=True)
@ -955,23 +939,14 @@ class Minion(MinionBase):
self.periodic_callbacks = {}
if io_loop is None:
if HAS_ZMQ and not TORNADO_50:
zmq.eventloop.ioloop.install()
self.io_loop = LOOP_CLASS.current()
install_zmq()
self.io_loop = ZMQDefaultLoop.current()
else:
self.io_loop = io_loop
# Warn if ZMQ < 3.2
if HAS_ZMQ:
try:
zmq_version_info = zmq.zmq_version_info()
except AttributeError:
# PyZMQ <= 2.1.9 does not have zmq_version_info, fall back to
# using zmq.zmq_version() and build a version info tuple.
zmq_version_info = tuple(
[int(x) for x in zmq.zmq_version().split('.')] # pylint: disable=no-member
)
if zmq_version_info < (3, 2):
if zmq:
if ZMQ_VERSION_INFO < (3, 2):
log.warning(
'You have a version of ZMQ less than ZMQ 3.2! There are '
'known connection keep-alive issues with ZMQ < 3.2 which '
@ -2636,9 +2611,8 @@ class SyndicManager(MinionBase):
self.jid_forward_cache = set()
if io_loop is None:
if HAS_ZMQ and not TORNADO_50:
zmq.eventloop.ioloop.install()
self.io_loop = LOOP_CLASS.current()
install_zmq()
self.io_loop = ZMQDefaultLoop.current()
else:
self.io_loop = io_loop

View file

@ -2302,10 +2302,10 @@ def create_route(route_table_id=None, destination_cidr_block=None,
'must be provided.')
if not _exactly_one((gateway_id, internet_gateway_name, instance_id, interface_id, vpc_peering_connection_id,
nat_gateway_id, nat_gateway_subnet_id, nat_gateway_subnet_name)):
nat_gateway_id, nat_gateway_subnet_id, nat_gateway_subnet_name, vpc_peering_connection_name)):
raise SaltInvocationError('Only one of gateway_id, internet_gateway_name, instance_id, '
'interface_id, vpc_peering_connection_id, nat_gateway_id, '
'nat_gateway_subnet_id or nat_gateway_subnet_name may be provided.')
'nat_gateway_subnet_id, nat_gateway_subnet_name or vpc_peering_connection_name may be provided.')
if destination_cidr_block is None:
raise SaltInvocationError('destination_cidr_block is required.')

View file

@ -453,8 +453,8 @@ def delval(key, destructive=False):
.. versionadded:: 0.17.0
Delete a grain value from the grains config file. This will just set the
grain value to `None`. To completely remove the grain run `grains.delkey`
of pass `destructive=True` to `grains.delval`.
grain value to ``None``. To completely remove the grain, run ``grains.delkey``
or pass ``destructive=True`` to ``grains.delval``.
key
The grain key from which to delete the value.

View file

@ -3679,7 +3679,7 @@ def _checkAllAdmxPolicies(policy_class,
if ENABLED_VALUE_XPATH(admx_policy) and this_policy_setting == 'Not Configured':
# some policies have a disabled list but not an enabled list
# added this to address those issues
if DISABLED_LIST_XPATH(admx_policy):
if DISABLED_LIST_XPATH(admx_policy) or DISABLED_VALUE_XPATH(admx_policy):
element_only_enabled_disabled = False
explicit_enable_disable_value_setting = True
if _checkValueItemParent(admx_policy,
@ -3689,14 +3689,14 @@ def _checkAllAdmxPolicies(policy_class,
ENABLED_VALUE_XPATH,
policy_filedata):
this_policy_setting = 'Enabled'
log.debug('{0} is enabled'.format(this_policyname))
log.debug('{0} is enabled by detected ENABLED_VALUE_XPATH'.format(this_policyname))
if this_policynamespace not in policy_vals:
policy_vals[this_policynamespace] = {}
policy_vals[this_policynamespace][this_policyname] = this_policy_setting
if DISABLED_VALUE_XPATH(admx_policy) and this_policy_setting == 'Not Configured':
# some policies have a disabled list but not an enabled list
# added this to address those issues
if ENABLED_LIST_XPATH(admx_policy):
if ENABLED_LIST_XPATH(admx_policy) or ENABLED_VALUE_XPATH(admx_policy):
element_only_enabled_disabled = False
explicit_enable_disable_value_setting = True
if _checkValueItemParent(admx_policy,
@ -3706,25 +3706,27 @@ def _checkAllAdmxPolicies(policy_class,
DISABLED_VALUE_XPATH,
policy_filedata):
this_policy_setting = 'Disabled'
log.debug('{0} is disabled'.format(this_policyname))
log.debug('{0} is disabled by detected DISABLED_VALUE_XPATH'.format(this_policyname))
if this_policynamespace not in policy_vals:
policy_vals[this_policynamespace] = {}
policy_vals[this_policynamespace][this_policyname] = this_policy_setting
if ENABLED_LIST_XPATH(admx_policy) and this_policy_setting == 'Not Configured':
element_only_enabled_disabled = False
explicit_enable_disable_value_setting = True
if DISABLED_LIST_XPATH(admx_policy) or DISABLED_VALUE_XPATH(admx_policy):
element_only_enabled_disabled = False
explicit_enable_disable_value_setting = True
if _checkListItem(admx_policy, this_policyname, this_key, ENABLED_LIST_XPATH, policy_filedata):
this_policy_setting = 'Enabled'
log.debug('{0} is enabled'.format(this_policyname))
log.debug('{0} is enabled by detected ENABLED_LIST_XPATH'.format(this_policyname))
if this_policynamespace not in policy_vals:
policy_vals[this_policynamespace] = {}
policy_vals[this_policynamespace][this_policyname] = this_policy_setting
if DISABLED_LIST_XPATH(admx_policy) and this_policy_setting == 'Not Configured':
element_only_enabled_disabled = False
explicit_enable_disable_value_setting = True
if ENABLED_LIST_XPATH(admx_policy) or ENABLED_VALUE_XPATH(admx_policy):
element_only_enabled_disabled = False
explicit_enable_disable_value_setting = True
if _checkListItem(admx_policy, this_policyname, this_key, DISABLED_LIST_XPATH, policy_filedata):
this_policy_setting = 'Disabled'
log.debug('{0} is disabled'.format(this_policyname))
log.debug('{0} is disabled by detected DISABLED_LIST_XPATH'.format(this_policyname))
if this_policynamespace not in policy_vals:
policy_vals[this_policynamespace] = {}
policy_vals[this_policynamespace][this_policyname] = this_policy_setting
@ -3739,7 +3741,7 @@ def _checkAllAdmxPolicies(policy_class,
'1')),
policy_filedata):
this_policy_setting = 'Enabled'
log.debug('{0} is enabled'.format(this_policyname))
log.debug('{0} is enabled by no explicit enable/disable list or value'.format(this_policyname))
if this_policynamespace not in policy_vals:
policy_vals[this_policynamespace] = {}
policy_vals[this_policynamespace][this_policyname] = this_policy_setting
@ -3750,7 +3752,7 @@ def _checkAllAdmxPolicies(policy_class,
check_deleted=True)),
policy_filedata):
this_policy_setting = 'Disabled'
log.debug('{0} is disabled'.format(this_policyname))
log.debug('{0} is disabled by no explicit enable/disable list or value'.format(this_policyname))
if this_policynamespace not in policy_vals:
policy_vals[this_policynamespace] = {}
policy_vals[this_policynamespace][this_policyname] = this_policy_setting

View file

@ -429,6 +429,9 @@ class BaseSaltAPIHandler(tornado.web.RequestHandler): # pylint: disable=W0223
'runner_async': None, # empty, since we use the same client as `runner`
}
if not hasattr(self, 'ckminions'):
self.ckminions = salt.utils.minions.CkMinions(self.application.opts)
@property
def token(self):
'''
@ -921,7 +924,8 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
chunk['jid'] = salt.utils.jid.gen_jid()
# Subscribe returns from minions before firing a job
future_minion_map = self.subscribe_minion_returns(chunk['jid'], chunk['tgt'])
minions = set(self.ckminions.check_minions(chunk['tgt'], chunk.get('tgt_type', 'glob')))
future_minion_map = self.subscribe_minion_returns(chunk['jid'], minions)
f_call = self._format_call_run_job_async(chunk)
# fire a job off
@ -940,9 +944,9 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
pass
raise tornado.gen.Return('No minions matched the target. No command was sent, no jid was assigned.')
syndic_min_wait = None
# wait syndic a while to avoid missing published events
if self.application.opts['order_masters']:
syndic_min_wait = tornado.gen.sleep(self.application.opts['syndic_wait'])
yield tornado.gen.sleep(self.application.opts['syndic_wait'])
# To ensure job_not_running and all_return are terminated by each other, communicate using a future
is_finished = Future()
@ -952,10 +956,6 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
f_call['kwargs']['tgt_type'],
is_finished)
# if we have a min_wait, do that
if syndic_min_wait is not None:
yield syndic_min_wait
minion_returns_future = self.sanitize_minion_returns(future_minion_map, pub_data['minions'], is_finished)
yield job_not_running_future
@ -992,7 +992,7 @@ class SaltAPIHandler(BaseSaltAPIHandler): # pylint: disable=W0223
chunk_ret = {}
while True:
f = yield Any(future_minion_map.keys() + [is_finished])
f = yield Any(list(future_minion_map.keys()) + [is_finished])
try:
# When finished entire routine, cleanup other futures and return result
if f is is_finished:

View file

@ -153,6 +153,16 @@ def __virtual__():
return 'pkg.install' in __salt__
def _warn_virtual(virtual):
return [
'The following package(s) are "virtual package" names: {0}. These '
'will no longer be supported as of the Fluorine release. Please '
'update your SLS file(s) to use the actual package name.'.format(
', '.join(virtual)
)
]
def _get_comparison_spec(pkgver):
'''
Return a tuple containing the comparison operator and the version. If no
@ -529,6 +539,11 @@ def _find_install_targets(name=None,
was_refreshed = True
refresh = False
def _get_virtual(desired):
return [x for x in desired if cur_pkgs.get(x, []) == ['1']]
virtual_pkgs = []
if any((pkgs, sources)):
if pkgs:
desired = _repack_pkgs(pkgs, normalize=normalize)
@ -546,6 +561,8 @@ def _find_install_targets(name=None,
'comment': 'Invalidly formatted \'{0}\' parameter. See '
'minion log.'.format('pkgs' if pkgs
else 'sources')}
virtual_pkgs = _get_virtual(desired)
to_unpurge = _find_unpurge_targets(desired)
else:
if salt.utils.is_windows():
@ -566,6 +583,7 @@ def _find_install_targets(name=None,
else:
desired = {name: version}
virtual_pkgs = _get_virtual(desired)
to_unpurge = _find_unpurge_targets(desired)
# FreeBSD pkg supports `openjdk` and `java/openjdk7` package names
@ -582,22 +600,28 @@ def _find_install_targets(name=None,
and not reinstall \
and not pkg_verify:
# The package is installed and is the correct version
return {'name': name,
'changes': {},
'result': True,
'comment': 'Version {0} of package \'{1}\' is already '
'installed'.format(version, name)}
ret = {'name': name,
'changes': {},
'result': True,
'comment': 'Version {0} of package \'{1}\' is already '
'installed'.format(version, name)}
if virtual_pkgs:
ret['warnings'] = _warn_virtual(virtual_pkgs)
return ret
# if cver is not an empty string, the package is already installed
elif cver and version is None \
and not reinstall \
and not pkg_verify:
# The package is installed
return {'name': name,
'changes': {},
'result': True,
'comment': 'Package {0} is already '
'installed'.format(name)}
ret = {'name': name,
'changes': {},
'result': True,
'comment': 'Package {0} is already '
'installed'.format(name)}
if virtual_pkgs:
ret['warnings'] = _warn_virtual(virtual_pkgs)
return ret
version_spec = False
if not sources:
@ -635,10 +659,13 @@ def _find_install_targets(name=None,
if comments:
if len(comments) > 1:
comments.append('')
return {'name': name,
'changes': {},
'result': False,
'comment': '. '.join(comments).rstrip()}
ret = {'name': name,
'changes': {},
'result': False,
'comment': '. '.join(comments).rstrip()}
if virtual_pkgs:
ret['warnings'] = _warn_virtual(virtual_pkgs)
return ret
# Resolve the latest package version for any packages with "latest" in the
# package version
@ -770,10 +797,13 @@ def _find_install_targets(name=None,
problems.append(failed_verify)
if problems:
return {'name': name,
'changes': {},
'result': False,
'comment': ' '.join(problems)}
ret = {'name': name,
'changes': {},
'result': False,
'comment': ' '.join(problems)}
if virtual_pkgs:
ret['warnings'] = _warn_virtual(virtual_pkgs)
return ret
if not any((targets, to_unpurge, to_reinstall)):
# All specified packages are installed
@ -788,6 +818,8 @@ def _find_install_targets(name=None,
'comment': msg}
if warnings:
ret.setdefault('warnings', []).extend(warnings)
if virtual_pkgs:
ret.setdefault('warnings', []).extend(_warn_virtual(virtual_pkgs))
return ret
return (desired, targets, to_unpurge, to_reinstall, altered_files,
@ -1720,6 +1752,25 @@ def installed(
and x not in to_reinstall]
failed = [x for x in failed if x in targets]
# Check for virtual packages in list of desired packages
if not sources:
try:
virtual_pkgs = []
for pkgname in [next(iter(x)) for x in pkgs] if pkgs else [name]:
cver = new_pkgs.get(pkgname, [])
if '1' in cver:
virtual_pkgs.append(pkgname)
if virtual_pkgs:
warnings.extend(_warn_virtual(virtual_pkgs))
except Exception:
# This is just some temporary code to warn the user about using
# virtual packages. Don't let an exception break the entire
# state.
log.debug(
'Failed to detect virtual packages after running '
'pkg.install', exc_info=True
)
# If there was nothing unpurged, just set the changes dict to the contents
# of changes['installed'].
if not changes.get('purge_desired'):

View file

@ -30,13 +30,11 @@ import salt.transport.server
import salt.transport.mixins.auth
from salt.exceptions import SaltReqTimeoutError
import zmq
from salt.utils.zeromq import zmq, ZMQDefaultLoop, install_zmq, ZMQ_VERSION_INFO, LIBZMQ_VERSION_INFO
import zmq.error
import zmq.eventloop.ioloop
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
import zmq.eventloop.zmqstream
try:
import zmq.utils.monitor
HAS_ZMQ_MONITOR = True
@ -61,6 +59,42 @@ except ImportError:
log = logging.getLogger(__name__)
def _get_master_uri(master_ip,
master_port,
source_ip=None,
source_port=None):
'''
Return the ZeroMQ URI to connect the Minion to the Master.
It supports different source IP / port, given the ZeroMQ syntax:
// Connecting using a IP address and bind to an IP address
rc = zmq_connect(socket, "tcp://192.168.1.17:5555;192.168.1.1:5555"); assert (rc == 0);
Source: http://api.zeromq.org/4-1:zmq-tcp
'''
if LIBZMQ_VERSION_INFO >= (4, 1, 6) and ZMQ_VERSION_INFO >= (16, 0, 1):
# The source:port syntax for ZeroMQ has been added in libzmq 4.1.6
# which is included in the pyzmq wheels starting with 16.0.1.
if source_ip or source_port:
if source_ip and source_port:
return 'tcp://{source_ip}:{source_port};{master_ip}:{master_port}'.format(
source_ip=source_ip, source_port=source_port,
master_ip=master_ip, master_port=master_port)
elif source_ip and not source_port:
return 'tcp://{source_ip}:0;{master_ip}:{master_port}'.format(
source_ip=source_ip,
master_ip=master_ip, master_port=master_port)
elif not source_ip and source_port:
return 'tcp://0.0.0.0:{source_port};{master_ip}:{master_port}'.format(
source_port=source_port,
master_ip=master_ip, master_port=master_port)
if source_ip or source_port:
log.warning('Unable to connect to the Master using a specific source IP / port')
log.warning('Consider upgrading to pyzmq >= 16.0.1 and libzmq >= 4.1.6')
return 'tcp://{master_ip}:{master_port}'.format(
master_ip=master_ip, master_port=master_port)
class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
'''
Encapsulate sending routines to ZeroMQ.
@ -79,9 +113,8 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
# do we have any mapping for this io_loop
io_loop = kwargs.get('io_loop')
if io_loop is None:
if not TORNADO_50:
zmq.eventloop.ioloop.install()
io_loop = tornado.ioloop.IOLoop.current()
install_zmq()
io_loop = ZMQDefaultLoop.current()
if io_loop not in cls.instance_map:
cls.instance_map[io_loop] = weakref.WeakValueDictionary()
loop_instance_map = cls.instance_map[io_loop]
@ -96,7 +129,8 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
obj = object.__new__(cls)
obj.__singleton_init__(opts, **kwargs)
loop_instance_map[key] = obj
log.trace('Inserted key into loop_instance_map id {0} for key {1} and process {2}'.format(id(loop_instance_map), key, os.getpid()))
log.trace('Inserted key into loop_instance_map id %s for key %s and process %s',
id(loop_instance_map), key, os.getpid())
else:
log.debug('Re-using AsyncZeroMQReqChannel for {0}'.format(key))
return obj
@ -148,9 +182,8 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
self._io_loop = kwargs.get('io_loop')
if self._io_loop is None:
if not TORNADO_50:
zmq.eventloop.ioloop.install()
self._io_loop = tornado.ioloop.IOLoop.current()
install_zmq()
self._io_loop = ZMQDefaultLoop.current()
if self.crypt != 'clear':
# we don't need to worry about auth as a kwarg, since its a singleton
@ -290,19 +323,14 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
**kwargs):
self.opts = opts
self.ttype = 'zeromq'
self.io_loop = kwargs.get('io_loop')
if self.io_loop is None:
if not TORNADO_50:
zmq.eventloop.ioloop.install()
self.io_loop = tornado.ioloop.IOLoop.current()
self.hexid = hashlib.sha1(six.b(self.opts['id'])).hexdigest()
install_zmq()
self.io_loop = ZMQDefaultLoop.current()
self.hexid = hashlib.sha1(salt.utils.to_bytes(self.opts['id'])).hexdigest()
self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self.io_loop)
self.serial = salt.payload.Serial(self.opts)
self.context = zmq.Context()
self._socket = self.context.socket(zmq.SUB)
@ -334,8 +362,7 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
if self.opts['recon_randomize']:
recon_delay = randint(self.opts['recon_default'],
self.opts['recon_default'] + self.opts['recon_max']
)
self.opts['recon_default'] + self.opts['recon_max'])
log.debug("Generated random reconnect delay between '{0}ms' and '{1}ms' ({2})".format(
self.opts['recon_default'],
@ -449,7 +476,8 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
return self.stream.on_recv(wrap_callback)
class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.transport.server.ReqServerChannel):
class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin,
salt.transport.server.ReqServerChannel):
def __init__(self, opts):
salt.transport.server.ReqServerChannel.__init__(self, opts)
@ -469,13 +497,7 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
# IPv6 sockets work for both IPv6 and IPv4 addresses
self.clients.setsockopt(zmq.IPV4ONLY, 0)
self.clients.setsockopt(zmq.BACKLOG, self.opts.get('zmq_backlog', 1000))
if HAS_ZMQ_MONITOR and self.opts['zmq_monitor']:
# Socket monitor shall be used the only for debug purposes so using threading doesn't look too bad here
import threading
self._monitor = ZeroMQSocketMonitor(self.clients)
t = threading.Thread(target=self._monitor.start_poll)
t.start()
self._start_zmq_monitor()
self.workers = self.context.socket(zmq.DEALER)
if self.opts.get('ipc_mode', '') == 'tcp':
@ -489,7 +511,6 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
log.info('Setting up the master communication server')
self.clients.bind(self.uri)
self.workers.bind(self.w_uri)
while True:
@ -512,10 +533,11 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
return
log.info('MWorkerQueue under PID %s is closing', os.getpid())
self._closing = True
if hasattr(self, '_monitor') and self._monitor is not None:
# pylint: disable=E0203
if getattr(self, '_monitor', None) is not None:
self._monitor.stop()
self._monitor = None
if hasattr(self, '_w_monitor') and self._w_monitor is not None:
if getattr(self, '_w_monitor', None) is not None:
self._w_monitor.stop()
self._w_monitor = None
if hasattr(self, 'clients') and self.clients.closed is False:
@ -528,6 +550,7 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
self._socket.close()
if hasattr(self, 'context') and self.context.closed is False:
self.context.term()
# pylint: enable=E0203
def pre_fork(self, process_manager):
'''
@ -538,6 +561,21 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
salt.transport.mixins.auth.AESReqServerMixin.pre_fork(self, process_manager)
process_manager.add_process(self.zmq_device)
def _start_zmq_monitor(self):
'''
Starts ZMQ monitor for debugging purposes.
:return:
'''
# Socket monitor shall be used the only for debug
# purposes so using threading doesn't look too bad here
if HAS_ZMQ_MONITOR and self.opts['zmq_monitor']:
log.debug('Starting ZMQ monitor')
import threading
self._w_monitor = ZeroMQSocketMonitor(self._socket)
threading.Thread(target=self._w_monitor.start_poll).start()
log.debug('ZMQ monitor has been started started')
def post_fork(self, payload_handler, io_loop):
'''
After forking we need to create all of the local sockets to listen to the
@ -552,12 +590,7 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
self.context = zmq.Context(1)
self._socket = self.context.socket(zmq.REP)
if HAS_ZMQ_MONITOR and self.opts['zmq_monitor']:
# Socket monitor shall be used the only for debug purposes so using threading doesn't look too bad here
import threading
self._w_monitor = ZeroMQSocketMonitor(self._socket)
t = threading.Thread(target=self._w_monitor.start_poll)
t.start()
self._start_zmq_monitor()
if self.opts.get('ipc_mode', '') == 'tcp':
self.w_uri = 'tcp://127.0.0.1:{0}'.format(
@ -763,27 +796,35 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
# Catch and handle EINTR from when this process is sent
# SIGUSR1 gracefully so we don't choke and die horribly
try:
log.trace('Getting data from puller %s', pull_uri)
package = pull_sock.recv()
unpacked_package = salt.payload.unpackage(package)
if six.PY3:
unpacked_package = salt.transport.frame.decode_embedded_strs(unpacked_package)
payload = unpacked_package['payload']
log.trace('Accepted unpacked package from puller')
if self.opts['zmq_filtering']:
# if you have a specific topic list, use that
if 'topic_lst' in unpacked_package:
for topic in unpacked_package['topic_lst']:
log.trace('Sending filtered data over publisher %s', pub_uri)
# zmq filters are substring match, hash the topic
# to avoid collisions
htopic = hashlib.sha1(topic).hexdigest()
pub_sock.send(htopic, flags=zmq.SNDMORE)
pub_sock.send(payload)
log.trace('Filtered data has been sent')
# otherwise its a broadcast
else:
# TODO: constants file for "broadcast"
log.trace('Sending broadcasted data over publisher %s', pub_uri)
pub_sock.send('broadcast', flags=zmq.SNDMORE)
pub_sock.send(payload)
log.trace('Broadcasted data has been sent')
else:
log.trace('Sending ZMQ-unfiltered data over publisher %s', pub_uri)
pub_sock.send(payload)
log.trace('Unfiltered data has been sent')
except zmq.ZMQError as exc:
if exc.errno == errno.EINTR:
continue
@ -901,14 +942,12 @@ class AsyncReqMessageClient(object):
self.addr = addr
self.linger = linger
if io_loop is None:
if not TORNADO_50:
zmq.eventloop.ioloop.install()
tornado.ioloop.IOLoop.current()
install_zmq()
ZMQDefaultLoop.current()
else:
self.io_loop = io_loop
self.serial = salt.payload.Serial(self.opts)
self.context = zmq.Context()
# wire up sockets
@ -987,7 +1026,8 @@ class AsyncReqMessageClient(object):
try:
ret = yield future
except: # pylint: disable=W0702
except Exception as err: # pylint: disable=W0702
log.debug('Re-init ZMQ socket: %s', err)
self._init_socket() # re-init the zmq socket (no other way in zmq)
del self.send_queue[0]
continue

View file

@ -7,26 +7,9 @@ from __future__ import absolute_import
import tornado.ioloop
import tornado.concurrent
# attempt to use zmq-- if we have it otherwise fallback to tornado loop
try:
import zmq.eventloop.ioloop
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
import tornado
TORNADO_50 = tornado.version_info >= (5,)
if HAS_ZMQ and not TORNADO_50:
LOOP_CLASS = zmq.eventloop.ioloop.ZMQIOLoop
else:
import tornado.ioloop
LOOP_CLASS = tornado.ioloop.IOLoop
import contextlib
from salt.utils import zeromq
@contextlib.contextmanager
@ -60,7 +43,7 @@ class SyncWrapper(object):
if kwargs is None:
kwargs = {}
self.io_loop = LOOP_CLASS()
self.io_loop = zeromq.ZMQDefaultLoop()
kwargs['io_loop'] = self.io_loop
with current_ioloop(self.io_loop):

View file

@ -18,11 +18,7 @@ import salt.utils.dictupdate
# Import third party libs
from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
try:
import zmq
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
from salt.utils.zeromq import zmq
log = logging.getLogger(__name__)

View file

@ -29,12 +29,8 @@ from salt.utils.cache import CacheCli as cache_cli
from salt.utils.process import MultiprocessingProcess
# Import third party libs
import salt.ext.six as six
try:
import zmq
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
from salt.ext import six
from salt.utils.zeromq import zmq
log = logging.getLogger(__name__)

View file

@ -908,7 +908,7 @@ class SaltNova(object):
try:
ret[item.name] = {
'id': item.id,
'status': 'Running'
'state': 'Running'
}
except TypeError:
pass

View file

@ -889,6 +889,7 @@ class Schedule(object):
for global_key, value in six.iteritems(func_globals):
self.functions[mod_name].__globals__[global_key] = value
self.functions.pack['__context__']['retcode'] = 0
ret['return'] = self.functions[func](*args, **kwargs)
# runners do not provide retcode

View file

@ -3,21 +3,60 @@
# Import Python libs
from __future__ import absolute_import
# Import Salt libs
import logging
import tornado.ioloop
from salt.exceptions import SaltSystemExit
# Import 3rd-party libs
log = logging.getLogger(__name__)
try:
import zmq
HAS_ZMQ = True
except ImportError:
HAS_ZMQ = False
zmq = None
log.debug('ZMQ module is not found')
ZMQDefaultLoop = None
ZMQ_VERSION_INFO = (-1, -1, -1)
LIBZMQ_VERSION_INFO = (-1, -1, -1)
try:
if zmq:
ZMQ_VERSION_INFO = tuple([int(v_el) for v_el in zmq.__version__.split('.')])
LIBZMQ_VERSION_INFO = tuple([int(v_el) for v_el in zmq.zmq_version().split('.')])
if ZMQ_VERSION_INFO[0] > 16: # 17.0.x+ deprecates zmq's ioloops
ZMQDefaultLoop = tornado.ioloop.IOLoop
except Exception:
log.exception('Error while getting LibZMQ/PyZMQ library version')
if ZMQDefaultLoop is None:
try:
import zmq.eventloop.ioloop
# Support for ZeroMQ 13.x
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
if tornado.version_info < (5,):
ZMQDefaultLoop = zmq.eventloop.ioloop.ZMQIOLoop
except ImportError:
ZMQDefaultLoop = None
if ZMQDefaultLoop is None:
ZMQDefaultLoop = tornado.ioloop.IOLoop
def install_zmq():
'''
While pyzmq 17 no longer needs any special integration for tornado,
older version still need one.
:return:
'''
if zmq and ZMQ_VERSION_INFO[0] < 17:
if tornado.version_info < (5,):
zmq.eventloop.ioloop.install()
def check_ipc_path_max_len(uri):
# The socket path is limited to 107 characters on Solaris and
# Linux, and 103 characters on BSD-based systems.
if not HAS_ZMQ:
if zmq is None:
return
ipc_path_max_len = getattr(zmq, 'IPC_PATH_MAX_LEN', 103)
if ipc_path_max_len and len(uri) > ipc_path_max_len:

View file

@ -6,6 +6,7 @@ from __future__ import absolute_import
# Import Salt Testing libs
from tests.support.case import ModuleCase
from tests.support.helpers import destructiveTest
from tests.support.unit import skipIf
# Import Salt libs
import salt.utils
@ -30,10 +31,32 @@ class ServiceModuleTest(ModuleCase):
self.service_name = 'org.ntp.ntpd'
if int(os_release.split('.')[1]) >= 13:
self.service_name = 'com.apple.AirPlayXPCHelper'
elif salt.utils.is_windows():
self.service_name = 'Spooler'
if salt.utils.which(cmd_name) is None:
self.pre_srv_status = self.run_function('service.status', [self.service_name])
self.pre_srv_enabled = True if self.service_name in self.run_function('service.get_enabled') else False
if salt.utils.which(cmd_name) is None and not salt.utils.is_windows():
self.skipTest('{0} is not installed'.format(cmd_name))
def tearDown(self):
post_srv_status = self.run_function('service.status', [self.service_name])
post_srv_enabled = True if self.service_name in self.run_function('service.get_enabled') else False
if post_srv_status != self.pre_srv_status:
if self.pre_srv_status:
self.run_function('service.enable', [self.service_name])
else:
self.run_function('service.disable', [self.service_name])
if post_srv_enabled != self.pre_srv_enabled:
if self.pre_srv_enabled:
self.run_function('service.enable', [self.service_name])
else:
self.run_function('service.disable', [self.service_name])
del self.service_name
def test_service_status_running(self):
'''
test service.status execution module
@ -53,3 +76,37 @@ class ServiceModuleTest(ModuleCase):
check_service = self.run_function('service.status', [self.service_name])
self.assertFalse(check_service)
def test_service_restart(self):
'''
test service.restart
'''
self.assertTrue(self.run_function('service.restart', [self.service_name]))
def test_service_enable(self):
'''
test service.get_enabled and service.enable module
'''
# disable service before test
self.assertTrue(self.run_function('service.disable', [self.service_name]))
self.assertTrue(self.run_function('service.enable', [self.service_name]))
self.assertIn(self.service_name, self.run_function('service.get_enabled'))
def test_service_disable(self):
'''
test service.get_disabled and service.disable module
'''
# enable service before test
self.assertTrue(self.run_function('service.enable', [self.service_name]))
self.assertTrue(self.run_function('service.disable', [self.service_name]))
self.assertIn(self.service_name, self.run_function('service.get_disabled'))
@skipIf(not salt.utils.is_windows(), 'Windows Only Test')
def test_service_get_service_name(self):
'''
test service.get_service_name
'''
ret = self.run_function('service.get_service_name')
self.assertIn(self.service_name, ret.values())

View file

@ -18,13 +18,9 @@ from tests.support.helpers import flaky
from tests.support.unit import skipIf
# Import 3rd-party libs
import salt.ext.six as six
try:
import zmq
from zmq.eventloop.ioloop import ZMQIOLoop
HAS_ZMQ_IOLOOP = True
except ImportError:
HAS_ZMQ_IOLOOP = False
from salt.ext import six
from salt.utils.zeromq import zmq, ZMQDefaultLoop as ZMQIOLoop
HAS_ZMQ_IOLOOP = bool(zmq)
def json_loads(data):
@ -53,6 +49,7 @@ class TestSaltAPIHandler(_SaltnadoIntegrationTestCase):
application = self.build_tornado_app(urls)
application.event_listener = saltnado.EventListener({}, self.opts)
self.application = application
return application
def test_root(self):
@ -89,8 +86,6 @@ class TestSaltAPIHandler(_SaltnadoIntegrationTestCase):
self.assertEqual(response.code, 302)
self.assertEqual(response.headers['Location'], '/login')
# Local client tests
@skipIf(True, 'to be re-enabled when #23623 is merged')
def test_simple_local_post(self):
'''
Test a basic API of /
@ -108,7 +103,8 @@ class TestSaltAPIHandler(_SaltnadoIntegrationTestCase):
request_timeout=30,
)
response_obj = json_loads(response.body)
self.assertEqual(response_obj['return'], [{'minion': True, 'sub_minion': True}])
self.assertEqual(len(response_obj['return']), 1)
self.assertEqual(response_obj['return'][0], {'minion': True, 'sub_minion': True})
def test_simple_local_post_no_tgt(self):
'''
@ -129,8 +125,6 @@ class TestSaltAPIHandler(_SaltnadoIntegrationTestCase):
response_obj = json_loads(response.body)
self.assertEqual(response_obj['return'], ["No minions matched the target. No command was sent, no jid was assigned."])
# local client request body test
@skipIf(True, 'Undetermined race condition in test. Temporarily disabled.')
def test_simple_local_post_only_dictionary_request(self):
'''
Test a basic API of /
@ -148,7 +142,8 @@ class TestSaltAPIHandler(_SaltnadoIntegrationTestCase):
request_timeout=30,
)
response_obj = json_loads(response.body)
self.assertEqual(response_obj['return'], [{'minion': True, 'sub_minion': True}])
self.assertEqual(len(response_obj['return']), 1)
self.assertEqual(response_obj['return'][0], {'minion': True, 'sub_minion': True})
def test_simple_local_post_invalid_request(self):
'''
@ -263,6 +258,28 @@ class TestSaltAPIHandler(_SaltnadoIntegrationTestCase):
response_obj = json_loads(response.body)
self.assertEqual(response_obj['return'], [{}])
def test_simple_local_post_only_dictionary_request_with_order_masters(self):
'''
Test a basic API of /
'''
low = {'client': 'local',
'tgt': '*',
'fun': 'test.ping',
}
response = self.fetch('/',
method='POST',
body=salt.utils.json.dumps(low),
headers={'Content-Type': self.content_type_map['json'],
saltnado.AUTH_TOKEN_HEADER: self.token['token']},
connect_timeout=30,
request_timeout=30,
)
response_obj = salt.utils.json.loads(response.body)
self.application.opts['order_masters'] = []
self.application.opts['syndic_wait'] = 5
self.assertEqual(response_obj['return'], [{'minion': True, 'sub_minion': True}])
# runner tests
def test_simple_local_runner_post(self):
low = [{'client': 'runner',

View file

@ -21,6 +21,7 @@ integration.modules.test_ntp
integration.modules.test_pillar
integration.modules.test_pkg
integration.modules.test_publish
integration.modules.test_service
integration.modules.test_state
integration.modules.test_status
integration.modules.test_sysmod