mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge branch '2017.7' into '2018.3'
Conflicts: - salt/state.py - salt/utils/files.py - tests/integration/modules/test_pip.py
This commit is contained in:
commit
590c7fc13f
11 changed files with 181 additions and 65 deletions
|
@ -372,7 +372,12 @@ def facts(puppet=False):
|
|||
'''
|
||||
ret = {}
|
||||
opt_puppet = '--puppet' if puppet else ''
|
||||
output = __salt__['cmd.run']('facter {0}'.format(opt_puppet))
|
||||
cmd_ret = __salt__['cmd.run_all']('facter {0}'.format(opt_puppet))
|
||||
|
||||
if cmd_ret['retcode'] != 0:
|
||||
raise CommandExecutionError(cmd_ret['stderr'])
|
||||
|
||||
output = cmd_ret['stdout']
|
||||
|
||||
# Loop over the facter output and properly
|
||||
# parse it into a nice dictionary for using
|
||||
|
@ -398,9 +403,13 @@ def fact(name, puppet=False):
|
|||
salt '*' puppet.fact kernel
|
||||
'''
|
||||
opt_puppet = '--puppet' if puppet else ''
|
||||
ret = __salt__['cmd.run'](
|
||||
ret = __salt__['cmd.run_all'](
|
||||
'facter {0} {1}'.format(opt_puppet, name),
|
||||
python_shell=False)
|
||||
if not ret:
|
||||
|
||||
if ret['retcode'] != 0:
|
||||
raise CommandExecutionError(ret['stderr'])
|
||||
|
||||
if not ret['stdout']:
|
||||
return ''
|
||||
return ret
|
||||
return ret['stdout']
|
||||
|
|
|
@ -1504,6 +1504,9 @@ def runner(name, arg=None, kwarg=None, full_return=False, saltenv='base', jid=No
|
|||
if 'saltenv' in aspec.args:
|
||||
kwarg['saltenv'] = saltenv
|
||||
|
||||
if name in ['state.orchestrate', 'state.orch', 'state.sls']:
|
||||
kwarg['orchestration_jid'] = jid
|
||||
|
||||
if jid:
|
||||
salt.utils.event.fire_args(
|
||||
__opts__,
|
||||
|
|
|
@ -48,9 +48,10 @@ def set_servers(*servers):
|
|||
update_cmd = ['W32tm', '/config', '/update']
|
||||
|
||||
for cmd in server_cmd, reliable_cmd, update_cmd:
|
||||
ret = __salt__['cmd.run'](cmd, python_shell=False)
|
||||
if 'command completed successfully' not in ret:
|
||||
return False
|
||||
__salt__['cmd.run'](cmd, python_shell=False)
|
||||
|
||||
if not sorted(list(servers)) == get_servers():
|
||||
return False
|
||||
|
||||
__salt__['service.restart'](service_name)
|
||||
return True
|
||||
|
@ -71,7 +72,7 @@ def get_servers():
|
|||
for line in lines:
|
||||
try:
|
||||
if line.startswith('NtpServer:'):
|
||||
_, ntpsvrs = line.rstrip(' (Local)').split(':', 1)
|
||||
_, ntpsvrs = line.rsplit(' (', 1)[0].split(':', 1)
|
||||
return sorted(ntpsvrs.split())
|
||||
except ValueError as e:
|
||||
return False
|
||||
|
|
|
@ -232,7 +232,7 @@ class Runner(RunnerClient):
|
|||
else:
|
||||
user = salt.utils.user.get_specific_user()
|
||||
|
||||
if low['fun'] in ('state.orchestrate', 'state.orch'):
|
||||
if low['fun'] in ['state.orchestrate', 'state.orch', 'state.sls']:
|
||||
low['kwarg']['orchestration_jid'] = async_pub['jid']
|
||||
|
||||
# Run the runner!
|
||||
|
|
|
@ -103,6 +103,7 @@ def orchestrate(mods,
|
|||
saltenv=saltenv,
|
||||
pillarenv=pillarenv,
|
||||
pillar_enc=pillar_enc,
|
||||
__pub_jid=orchestration_jid,
|
||||
orchestration_jid=orchestration_jid)
|
||||
ret = {'data': {minion.opts['id']: running}, 'outputter': 'highstate'}
|
||||
res = __utils__['state.check_result'](ret['data'])
|
||||
|
|
|
@ -42,6 +42,7 @@ import salt.utils.platform
|
|||
import salt.utils.process
|
||||
import salt.utils.url
|
||||
import salt.syspaths as syspaths
|
||||
from salt.serializers.msgpack import serialize as msgpack_serialize, deserialize as msgpack_deserialize
|
||||
from salt.template import compile_template, compile_template_str
|
||||
from salt.exceptions import (
|
||||
SaltRenderError,
|
||||
|
@ -57,7 +58,6 @@ import salt.utils.yamlloader as yamlloader
|
|||
from salt.ext import six
|
||||
from salt.ext.six.moves import map, range, reload_module
|
||||
# pylint: enable=import-error,no-name-in-module,redefined-builtin
|
||||
import msgpack
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -179,7 +179,7 @@ def _calculate_fake_duration():
|
|||
start_time = local_start_time.time().isoformat()
|
||||
delta = (utc_finish_time - utc_start_time)
|
||||
# duration in milliseconds.microseconds
|
||||
duration = (delta.seconds * 1000000 + delta.microseconds)/1000.0
|
||||
duration = (delta.seconds * 1000000 + delta.microseconds) / 1000.0
|
||||
|
||||
return start_time, duration
|
||||
|
||||
|
@ -1710,27 +1710,20 @@ class State(object):
|
|||
errors.extend(req_in_errors)
|
||||
return req_in_high, errors
|
||||
|
||||
def _call_parallel_target(self, cdata, low):
|
||||
def _call_parallel_target(self, name, cdata, low):
|
||||
'''
|
||||
The target function to call that will create the parallel thread/process
|
||||
'''
|
||||
# we need to re-record start/end duration here because it is impossible to
|
||||
# correctly calculate further down the chain
|
||||
utc_start_time = datetime.datetime.utcnow()
|
||||
|
||||
tag = _gen_tag(low)
|
||||
try:
|
||||
ret = self.states[cdata['full']](*cdata['args'],
|
||||
**cdata['kwargs'])
|
||||
except Exception:
|
||||
trb = traceback.format_exc()
|
||||
# There are a number of possibilities to not have the cdata
|
||||
# populated with what we might have expected, so just be smart
|
||||
# enough to not raise another KeyError as the name is easily
|
||||
# guessable and fallback in all cases to present the real
|
||||
# exception to the user
|
||||
if len(cdata['args']) > 0:
|
||||
name = cdata['args'][0]
|
||||
elif 'name' in cdata['kwargs']:
|
||||
name = cdata['kwargs']['name']
|
||||
else:
|
||||
name = low.get('name', low.get('__id__'))
|
||||
ret = {
|
||||
'result': False,
|
||||
'name': name,
|
||||
|
@ -1738,6 +1731,13 @@ class State(object):
|
|||
'comment': 'An exception occurred in this state: {0}'.format(
|
||||
trb)
|
||||
}
|
||||
|
||||
utc_finish_time = datetime.datetime.utcnow()
|
||||
delta = (utc_finish_time - utc_start_time)
|
||||
# duration in milliseconds.microseconds
|
||||
duration = (delta.seconds * 1000000 + delta.microseconds) / 1000.0
|
||||
ret['duration'] = duration
|
||||
|
||||
troot = os.path.join(self.opts['cachedir'], self.jid)
|
||||
tfile = os.path.join(troot, _clean_tag(tag))
|
||||
if not os.path.isdir(troot):
|
||||
|
@ -1748,17 +1748,26 @@ class State(object):
|
|||
# and the attempt, we are safe to pass
|
||||
pass
|
||||
with salt.utils.files.fopen(tfile, 'wb+') as fp_:
|
||||
fp_.write(msgpack.dumps(ret))
|
||||
fp_.write(msgpack_serialize(ret))
|
||||
|
||||
def call_parallel(self, cdata, low):
|
||||
'''
|
||||
Call the state defined in the given cdata in parallel
|
||||
'''
|
||||
# There are a number of possibilities to not have the cdata
|
||||
# populated with what we might have expected, so just be smart
|
||||
# enough to not raise another KeyError as the name is easily
|
||||
# guessable and fallback in all cases to present the real
|
||||
# exception to the user
|
||||
name = (cdata.get('args') or [None])[0] or cdata['kwargs'].get('name')
|
||||
if not name:
|
||||
name = low.get('name', low.get('__id__'))
|
||||
|
||||
proc = salt.utils.process.MultiprocessingProcess(
|
||||
target=self._call_parallel_target,
|
||||
args=(cdata, low))
|
||||
args=(name, cdata, low))
|
||||
proc.start()
|
||||
ret = {'name': cdata['args'][0],
|
||||
ret = {'name': name,
|
||||
'result': None,
|
||||
'changes': {},
|
||||
'comment': 'Started in a separate process',
|
||||
|
@ -1903,12 +1912,10 @@ class State(object):
|
|||
# enough to not raise another KeyError as the name is easily
|
||||
# guessable and fallback in all cases to present the real
|
||||
# exception to the user
|
||||
if len(cdata['args']) > 0:
|
||||
name = cdata['args'][0]
|
||||
elif 'name' in cdata['kwargs']:
|
||||
name = cdata['kwargs']['name']
|
||||
else:
|
||||
name = (cdata.get('args') or [None])[0] or cdata['kwargs'].get('name')
|
||||
if not name:
|
||||
name = low.get('name', low.get('__id__'))
|
||||
|
||||
ret = {
|
||||
'result': False,
|
||||
'name': name,
|
||||
|
@ -1949,7 +1956,7 @@ class State(object):
|
|||
ret['start_time'] = local_start_time.time().isoformat()
|
||||
delta = (utc_finish_time - utc_start_time)
|
||||
# duration in milliseconds.microseconds
|
||||
duration = (delta.seconds * 1000000 + delta.microseconds)/1000.0
|
||||
duration = (delta.seconds * 1000000 + delta.microseconds) / 1000.0
|
||||
ret['duration'] = duration
|
||||
ret['__id__'] = low['__id__']
|
||||
log.info(
|
||||
|
@ -2117,7 +2124,7 @@ class State(object):
|
|||
while True:
|
||||
if self.reconcile_procs(running):
|
||||
break
|
||||
time.sleep(0.01)
|
||||
time.sleep(0.0001)
|
||||
ret = dict(list(disabled.items()) + list(running.items()))
|
||||
return ret
|
||||
|
||||
|
@ -2196,7 +2203,7 @@ class State(object):
|
|||
'changes': {}}
|
||||
try:
|
||||
with salt.utils.files.fopen(ret_cache, 'rb') as fp_:
|
||||
ret = msgpack.loads(fp_.read())
|
||||
ret = msgpack_deserialize(fp_.read())
|
||||
except (OSError, IOError):
|
||||
ret = {'result': False,
|
||||
'comment': 'Parallel cache failure',
|
||||
|
@ -2309,15 +2316,17 @@ class State(object):
|
|||
run_dict = self.pre
|
||||
else:
|
||||
run_dict = running
|
||||
|
||||
while True:
|
||||
if self.reconcile_procs(run_dict):
|
||||
break
|
||||
time.sleep(0.0001)
|
||||
|
||||
for chunk in chunks:
|
||||
tag = _gen_tag(chunk)
|
||||
if tag not in run_dict:
|
||||
req_stats.add('unmet')
|
||||
continue
|
||||
if run_dict[tag].get('proc'):
|
||||
# Run in parallel, first wait for a touch and then recheck
|
||||
time.sleep(0.01)
|
||||
return self.check_requisite(low, running, chunks, pre)
|
||||
if r_state.startswith('onfail'):
|
||||
if run_dict[tag]['result'] is True:
|
||||
req_stats.add('onfail') # At least one state is OK
|
||||
|
|
|
@ -16,7 +16,6 @@ import stat
|
|||
import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
import urllib
|
||||
|
||||
# Import Salt libs
|
||||
import salt.utils.path
|
||||
|
@ -29,6 +28,7 @@ from salt.utils.decorators.jinja import jinja_filter
|
|||
# Import 3rd-party libs
|
||||
from salt.ext import six
|
||||
from salt.ext.six.moves import range
|
||||
from salt.ext.six.moves.urllib.parse import quote # pylint: disable=no-name-in-module
|
||||
try:
|
||||
import fcntl
|
||||
HAS_FCNTL = True
|
||||
|
@ -583,7 +583,7 @@ def safe_filename_leaf(file_basename):
|
|||
:codeauthor: Damon Atkins <https://github.com/damon-atkins>
|
||||
'''
|
||||
def _replace(re_obj):
|
||||
return urllib.quote(re_obj.group(0), safe='')
|
||||
return quote(re_obj.group(0), safe='')
|
||||
if not isinstance(file_basename, six.text_type):
|
||||
# the following string is not prefixed with u
|
||||
return re.sub('[\\\\:/*?"<>|]',
|
||||
|
|
|
@ -48,7 +48,7 @@ class PipModuleTest(ModuleCase):
|
|||
'''
|
||||
return any(w in ret for w in ['URLError', 'Download error'])
|
||||
|
||||
def pip_successful_install(self, target, expect=('tox', 'pep8',)):
|
||||
def pip_successful_install(self, target, expect=('irc3-plugins-test', 'pep8',)):
|
||||
'''
|
||||
isolate regex for extracting `successful install` message from pip
|
||||
'''
|
||||
|
@ -103,7 +103,7 @@ class PipModuleTest(ModuleCase):
|
|||
with salt.utils.files.fopen(req1_filename, 'w') as f:
|
||||
f.write('-r requirements1b.txt\n')
|
||||
with salt.utils.files.fopen(req1b_filename, 'w') as f:
|
||||
f.write('tox\n')
|
||||
f.write('irc3-plugins-test\n')
|
||||
with salt.utils.files.fopen(req2_filename, 'w') as f:
|
||||
f.write('-r requirements2b.txt\n')
|
||||
with salt.utils.files.fopen(req2b_filename, 'w') as f:
|
||||
|
@ -141,7 +141,7 @@ class PipModuleTest(ModuleCase):
|
|||
with salt.utils.files.fopen(req1_filename, 'w') as f:
|
||||
f.write('-r requirements1b.txt\n')
|
||||
with salt.utils.files.fopen(req1b_filename, 'w') as f:
|
||||
f.write('tox\n')
|
||||
f.write('irc3-plugins-test\n')
|
||||
with salt.utils.files.fopen(req2_filename, 'w') as f:
|
||||
f.write('-r requirements2b.txt\n')
|
||||
with salt.utils.files.fopen(req2b_filename, 'w') as f:
|
||||
|
@ -174,7 +174,7 @@ class PipModuleTest(ModuleCase):
|
|||
req2_filename = os.path.join(self.venv_dir, 'requirements2.txt')
|
||||
|
||||
with salt.utils.files.fopen(req1_filename, 'w') as f:
|
||||
f.write('tox\n')
|
||||
f.write('irc3-plugins-test\n')
|
||||
with salt.utils.files.fopen(req2_filename, 'w') as f:
|
||||
f.write('pep8\n')
|
||||
|
||||
|
@ -211,7 +211,7 @@ class PipModuleTest(ModuleCase):
|
|||
req2_filepath = os.path.join(req_cwd, req2_filename)
|
||||
|
||||
with salt.utils.files.fopen(req1_filepath, 'w') as f:
|
||||
f.write('tox\n')
|
||||
f.write('irc3-plugins-test\n')
|
||||
with salt.utils.files.fopen(req2_filepath, 'w') as f:
|
||||
f.write('pep8\n')
|
||||
|
||||
|
|
|
@ -6,10 +6,12 @@ Tests for the state runner
|
|||
# Import Python Libs
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
import errno
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import signal
|
||||
import tempfile
|
||||
import time
|
||||
import textwrap
|
||||
import threading
|
||||
from salt.ext.six.moves import queue
|
||||
|
@ -31,6 +33,8 @@ import salt.utils.yaml
|
|||
# Import 3rd-party libs
|
||||
from salt.ext import six
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class StateRunnerTest(ShellCase):
|
||||
'''
|
||||
|
@ -364,3 +368,85 @@ class OrchEventTest(ShellCase):
|
|||
finally:
|
||||
del listener
|
||||
signal.alarm(0)
|
||||
|
||||
def test_parallel_orchestrations(self):
|
||||
'''
|
||||
Test to confirm that the parallel state requisite works in orch
|
||||
we do this by running 10 test.sleep's of 10 seconds, and ensure it only takes roughly 10s
|
||||
'''
|
||||
self.write_conf({
|
||||
'fileserver_backend': ['roots'],
|
||||
'file_roots': {
|
||||
'base': [self.base_env],
|
||||
},
|
||||
})
|
||||
|
||||
orch_sls = os.path.join(self.base_env, 'test_par_orch.sls')
|
||||
|
||||
with salt.utils.fopen(orch_sls, 'w') as fp_:
|
||||
fp_.write(textwrap.dedent('''
|
||||
{% for count in range(1, 20) %}
|
||||
|
||||
sleep {{ count }}:
|
||||
module.run:
|
||||
- name: test.sleep
|
||||
- length: 10
|
||||
- parallel: True
|
||||
|
||||
{% endfor %}
|
||||
|
||||
sleep 21:
|
||||
module.run:
|
||||
- name: test.sleep
|
||||
- length: 10
|
||||
- parallel: True
|
||||
- require:
|
||||
- module: sleep 1
|
||||
'''))
|
||||
|
||||
orch_sls = os.path.join(self.base_env, 'test_par_orch.sls')
|
||||
|
||||
listener = salt.utils.event.get_event(
|
||||
'master',
|
||||
sock_dir=self.master_opts['sock_dir'],
|
||||
transport=self.master_opts['transport'],
|
||||
opts=self.master_opts)
|
||||
|
||||
start_time = time.time()
|
||||
jid = self.run_run_plus(
|
||||
'state.orchestrate',
|
||||
'test_par_orch',
|
||||
__reload_config=True).get('jid')
|
||||
|
||||
if jid is None:
|
||||
raise Exception('jid missing from run_run_plus output')
|
||||
|
||||
signal.signal(signal.SIGALRM, self.alarm_handler)
|
||||
signal.alarm(self.timeout)
|
||||
received = False
|
||||
try:
|
||||
while True:
|
||||
event = listener.get_event(full=True)
|
||||
if event is None:
|
||||
continue
|
||||
|
||||
# if we receive the ret for this job before self.timeout (60),
|
||||
# the test is implicitly sucessful; if it were happening in serial it would be
|
||||
# atleast 110 seconds.
|
||||
if event['tag'] == 'salt/run/{0}/ret'.format(jid):
|
||||
received = True
|
||||
# Don't wrap this in a try/except. We want to know if the
|
||||
# data structure is different from what we expect!
|
||||
ret = event['data']['return']['data']['master']
|
||||
for state in ret:
|
||||
data = ret[state]
|
||||
# we expect each duration to be greater than 10s
|
||||
self.assertTrue(data['duration'] > 10000)
|
||||
break
|
||||
|
||||
# confirm that the total runtime is roughly 30s (leaving 10s of buffer)
|
||||
self.assertTrue((time.time() - start_time) < 40)
|
||||
finally:
|
||||
self.assertTrue(received)
|
||||
del listener
|
||||
signal.alarm(0)
|
||||
|
|
|
@ -160,22 +160,27 @@ class PuppetTestCase(TestCase, LoaderModuleMockMixin):
|
|||
'''
|
||||
Test to run facter and return the results
|
||||
'''
|
||||
mock_lst = MagicMock(return_value=[])
|
||||
with patch.dict(puppet.__salt__, {'cmd.run': mock_lst}):
|
||||
mock_lst = MagicMock(return_value="True")
|
||||
with patch.dict(puppet.__salt__, {'cmd.run': mock_lst}):
|
||||
mock = MagicMock(return_value=["a", "b"])
|
||||
with patch.object(puppet, '_format_fact', mock):
|
||||
self.assertDictEqual(puppet.facts(), {'a': 'b'})
|
||||
mock = MagicMock(return_value={
|
||||
'retcode': 0,
|
||||
'stdout': "1\n2"
|
||||
})
|
||||
with patch.dict(puppet.__salt__, {'cmd.run_all': mock}):
|
||||
mock = MagicMock(side_effect=[
|
||||
['a', 'b'],
|
||||
['c', 'd'],
|
||||
])
|
||||
with patch.object(puppet, '_format_fact', mock):
|
||||
self.assertDictEqual(puppet.facts(), {'a': 'b', 'c': 'd'})
|
||||
|
||||
def test_fact(self):
|
||||
'''
|
||||
Test to run facter for a specific fact
|
||||
'''
|
||||
mock_lst = MagicMock(return_value=[])
|
||||
with patch.dict(puppet.__salt__, {'cmd.run': mock_lst}):
|
||||
mock_lst = MagicMock(side_effect=[False, True])
|
||||
with patch.dict(puppet.__salt__, {'cmd.run': mock_lst}):
|
||||
self.assertEqual(puppet.fact("salt"), "")
|
||||
mock = MagicMock(side_effect=[
|
||||
{'retcode': 0, 'stdout': False},
|
||||
{'retcode': 0, 'stdout': True},
|
||||
])
|
||||
with patch.dict(puppet.__salt__, {'cmd.run_all': mock}):
|
||||
self.assertEqual(puppet.fact('salt'), '')
|
||||
|
||||
self.assertTrue(puppet.fact("salt"))
|
||||
self.assertTrue(puppet.fact('salt'))
|
||||
|
|
|
@ -31,23 +31,25 @@ class WinNtpTestCase(TestCase, LoaderModuleMockMixin):
|
|||
'''
|
||||
Test if it set Windows to use a list of NTP servers
|
||||
'''
|
||||
# Windows Time (W32Time) service is not started
|
||||
# Windows Time (W32Time) service fails to start
|
||||
mock_service = MagicMock(return_value=False)
|
||||
mock_cmd = MagicMock(return_value='Failure')
|
||||
with patch.dict(win_ntp.__salt__, {'service.status': mock_service,
|
||||
'service.start': mock_service,
|
||||
'cmd.run': mock_cmd}):
|
||||
'service.start': mock_service}):
|
||||
self.assertFalse(win_ntp.set_servers('pool.ntp.org'))
|
||||
|
||||
# Windows Time service is running
|
||||
# Fail to set NTP servers
|
||||
mock_service = MagicMock(return_value=True)
|
||||
mock_cmd = MagicMock(return_value='Failure')
|
||||
mock_cmd = MagicMock(side_effect=['Failure', 'Failure', 'Failure', 'NtpServer: time.windows.com,0x8'])
|
||||
with patch.dict(win_ntp.__salt__, {'service.status': mock_service,
|
||||
'service.start': mock_service,
|
||||
'cmd.run': mock_cmd}):
|
||||
self.assertFalse(win_ntp.set_servers('pool.ntp.org'))
|
||||
|
||||
mock_cmd = MagicMock(return_value='command completed successfully')
|
||||
# Windows Time service is running
|
||||
# Successfully set NTP servers
|
||||
mock_cmd = MagicMock(side_effect=['Success', 'Success', 'Success', 'NtpServer: pool.ntp.org'])
|
||||
with patch.dict(win_ntp.__salt__, {'service.status': mock_service,
|
||||
'service.start': mock_service,
|
||||
'service.restart': mock_service,
|
||||
'cmd.run': mock_cmd}):
|
||||
self.assertTrue(win_ntp.set_servers('pool.ntp.org'))
|
||||
|
|
Loading…
Add table
Reference in a new issue