Merge remote-tracking branch 'upstream/2015.8' into hotfix/merge-forward-develop

This commit is contained in:
Pedro Algarvio 2015-09-06 12:42:58 +01:00
commit 10a9189d83
10 changed files with 141 additions and 52 deletions

View file

@ -820,8 +820,7 @@ class LocalClient(object):
'''
while True:
# CHANGED(driskell): This was previously completely nonblocking.
raw = self.event.get_event(wait=0.01, tag=tag, match_type=match_type, full=True)
raw = self.event.get_event(wait=0.01, tag=tag, match_type=match_type, full=True, no_block=True)
yield raw
def get_iter_returns(

View file

@ -709,13 +709,20 @@ def clone(cwd,
if not isinstance(name, six.string_types):
name = str(name)
command.append(name)
if not os.path.exists(cwd):
os.makedirs(cwd)
clone_cwd = cwd
else:
command.append(cwd)
# Use '/tmp' instead of $HOME (/root for root user) to work around upstream
# git bug. See the following comment on the Salt bug tracker for more info:
# https://github.com/saltstack/salt/issues/15519#issuecomment-128531310
# Use '/tmp' instead of $HOME (/root for root user) to work around
# upstream git bug. See the following comment on the Salt bug tracker
# for more info:
# https://github.com/saltstack/salt/issues/15519#issuecomment-128531310
# On Windows, just fall back to None (runs git clone command using the
# home directory as the cwd).
clone_cwd = '/tmp' if not salt.utils.is_windows() else None
_git_run(command,
cwd='/tmp' if name is None else cwd,
cwd=clone_cwd,
runas=user,
identity=identity,
ignore_retcode=ignore_retcode)

View file

@ -137,7 +137,7 @@ class SPMClient(object):
raise SPMInvocationError('A package file must be specified')
pkg_file = args[1]
if not self.pkgfiles['{0}.path_exists'.format(self.files_prov)](pkg_file):
if not os.path.exists(pkg_file):
raise SPMInvocationError('Package file {0} not found'.format(pkg_file))
comps = pkg_file.split('-')
@ -211,18 +211,15 @@ class SPMClient(object):
member.uname = uname
member.gname = gname
file_ref = formula_tar.extractfile(member)
if member.isdir():
digest = ''
else:
file_hash = hashlib.sha1()
file_hash.update(file_ref.read())
digest = file_hash.hexdigest()
out_path = self.pkgfiles['{0}.install_file'.format(self.files_prov)](
name, formula_tar, member, formula_def, self.files_conn
)
if out_path is not False:
if member.isdir():
digest = ''
else:
file_hash = hashlib.sha1()
digest = self.pkgfiles['{0}.hash_file'.format(self.files_prov)](out_path, file_hash, self.files_conn)
self.pkgdb['{0}.register_file'.format(self.db_prov)](
name,
member,
@ -413,16 +410,14 @@ class SPMClient(object):
if self.pkgfiles['{0}.path_isdir'.format(self.files_prov)](filerow[0]):
dirs.append(filerow[0])
continue
with salt.utils.fopen(filerow[0], 'r') as fh_:
file_hash = hashlib.sha1()
file_hash.update(fh_.read())
digest = file_hash.hexdigest()
if filerow[1] == digest:
log.trace('Removing file {0}'.format(filerow[0]))
self.pkgfiles['{0}.remove_file'.format(self.files_prov)](filerow[0], self.files_conn)
else:
log.trace('Not removing file {0}'.format(filerow[0]))
self.pkgdb['{0}.unregister_file'.format(self.db_prov)](filerow[0], package, self.db_conn)
file_hash = hashlib.sha1()
digest = self.pkgfiles['{0}.hash_file'.format(self.files_prov)](filerow[0], file_hash, self.files_conn)
if filerow[1] == digest:
log.trace('Removing file {0}'.format(filerow[0]))
self.pkgfiles['{0}.remove_file'.format(self.files_prov)](filerow[0], self.files_conn)
else:
log.trace('Not removing file {0}'.format(filerow[0]))
self.pkgdb['{0}.unregister_file'.format(self.db_prov)](filerow[0], package, self.db_conn)
# Clean up directories
for dir_ in sorted(dirs, reverse=True):

View file

@ -123,6 +123,15 @@ def remove_file(path, conn=None):
os.remove(path)
def hash_file(path, hashobj, conn=None):
'''
Get the hexdigest hash value of a file
'''
with salt.utils.fopen(path, 'r') as f:
hashobj.update(f.read())
return hashobj.hexdigest()
def path_exists(path):
'''
Check to see whether the file already exists

View file

@ -57,6 +57,7 @@ import os
import time
import errno
import signal
import fnmatch
import hashlib
import logging
import datetime
@ -259,7 +260,7 @@ class SaltEvent(object):
)
return puburi, pulluri
def subscribe(self, tag, match_type=None):
def subscribe(self, tag=None, match_type=None):
'''
Subscribe to events matching the passed tag.
@ -269,15 +270,17 @@ class SaltEvent(object):
to get_event from discarding a response required by a subsequent call
to get_event.
'''
if tag is None:
return
match_func = self._get_match_func(match_type)
self.pending_tags.append([tag, match_func])
return
def unsubscribe(self, tag, match_type=None):
'''
Un-subscribe to events matching the passed tag.
'''
if tag is None:
return
match_func = self._get_match_func(match_type)
self.pending_tags.remove([tag, match_func])
@ -288,8 +291,6 @@ class SaltEvent(object):
if any(pmatch_func(evt['tag'], ptag) for ptag, pmatch_func in self.pending_tags):
self.pending_events.append(evt)
return
def connect_pub(self):
'''
Establish the publish connection
@ -389,20 +390,34 @@ class SaltEvent(object):
'''
return self.cache_regex.get(search_tag).search(event_tag) is not None
def _get_event(self, wait, tag, match_func=None):
def _match_tag_fnmatch(self, event_tag, search_tag):
'''
Check if the event_tag matches the search check.
Uses fnmatch to check.
Return True (matches) or False (no match)
'''
return fnmatch.fnmatch(event_tag, search_tag)
def _get_event(self, wait, tag, match_func=None, no_block=False):
if match_func is None:
match_func = self._get_match_func()
start = time.time()
timeout_at = start + wait
while not wait or time.time() <= timeout_at:
run_once = False
if no_block is True:
wait = 0
while (run_once is False and not wait) or time.time() <= timeout_at:
if no_block is True:
if run_once is True:
break
# Trigger that at least a single iteration has gone through
run_once = True
try:
# convert to milliseconds
socks = dict(self.poller.poll(wait * 1000))
if socks.get(self.sub) != zmq.POLLIN:
continue
# Please do not use non-blocking mode here. Reliability is
# more important than pure speed on the event bus.
ret = self.get_event_block()
except KeyboardInterrupt:
return {'tag': 'salt/event/exit', 'data': {}}
@ -426,7 +441,14 @@ class SaltEvent(object):
log.trace('_get_event() waited {0} seconds and received nothing'.format(wait * 1000))
return None
def get_event(self, wait=5, tag='', full=False, match_type=None):
def get_event(self,
wait=5,
tag='',
full=False,
use_pending=None,
pending_tags=None,
match_type=None,
no_block=False):
'''
Get a single publication.
IF no publication available THEN block for up to wait seconds
@ -446,9 +468,16 @@ class SaltEvent(object):
- 'endswith' : search for event tags that end with tag
- 'find' : search for event tags that contain tag
- 'regex' : regex search '^' + tag event tags
- 'fnmatch' : fnmatch tag event tags matching
Default is opts['event_match_type'] or 'startswith'
.. versionadded:: Boron
.. versionadded:: Beryllium
no_block
Define if getting the event should be a blocking call or not.
Defaults to False to keep backwards compatibility.
.. versionadded:: Beryllium
Notes:
@ -464,12 +493,23 @@ class SaltEvent(object):
request, it MUST subscribe the result to ensure the response is not lost
should other regions of code call get_event for other purposes.
'''
if use_pending is not None:
salt.utils.warn_until(
'Nitrogen',
'The \'use_pending\' keyword argument is deprecated and is simply ignored. '
'Please stop using it since it\'s support will be removed in {version}.'
)
if pending_tags is not None:
salt.utils.warn_until(
'Nitrogen',
'The \'pending_tags\' keyword argument is deprecated and is simply ignored. '
'Please stop using it since it\'s support will be removed in {version}.'
)
match_func = self._get_match_func(match_type)
ret = self._check_pending(tag, match_func)
if ret is None:
ret = self._get_event(wait, tag, match_func)
ret = self._get_event(wait, tag, match_func, no_block)
if ret is None or full:
return ret
@ -751,13 +791,13 @@ class AsyncEventPublisher(object):
# We're already trying the default system path, stop now!
raise
if not os.path.isdir(default_minion_sock_dir):
try:
os.makedirs(default_minion_sock_dir, 0o755)
except OSError as exc:
log.error('Could not create SOCK_DIR: {0}'.format(exc))
# Let's stop at this stage
raise
if not os.path.isdir(default_minion_sock_dir):
try:
os.makedirs(default_minion_sock_dir, 0o755)
except OSError as exc:
log.error('Could not create SOCK_DIR: {0}'.format(exc))
# Let's stop at this stage
raise
# Create the pull socket
self.epull_sock = self.context.socket(zmq.PULL)

View file

@ -791,6 +791,7 @@ class SaltDistribution(distutils.dist.Distribution):
* salt-cp
* salt-minion
* salt-unity
* salt-proxy
When packaged for salt-ssh, the following scripts should be installed:
* salt-call
@ -1031,6 +1032,7 @@ class SaltDistribution(distutils.dist.Distribution):
if IS_WINDOWS_PLATFORM:
scripts.extend(['scripts/salt-cp',
'scripts/salt-minion',
'scripts/salt-proxy',
'scripts/salt-unity'])
return scripts
@ -1046,6 +1048,7 @@ class SaltDistribution(distutils.dist.Distribution):
'scripts/salt-ssh',
'scripts/salt-syndic',
'scripts/salt-unity',
'scripts/salt-proxy',
'scripts/spm'])
return scripts

View file

@ -35,7 +35,6 @@ INTEGRATION_TEST_DIR = os.path.dirname(
os.path.normpath(os.path.abspath(__file__))
)
CODE_DIR = os.path.dirname(os.path.dirname(INTEGRATION_TEST_DIR))
SALT_LIBS = os.path.dirname(CODE_DIR)
# Import Salt Testing libs
from salttesting import TestCase
@ -46,7 +45,7 @@ from salttesting.helpers import requires_sshd_server
from salttesting.helpers import ensure_in_syspath, RedirectStdStreams
# Update sys.path
ensure_in_syspath(CODE_DIR, SALT_LIBS)
ensure_in_syspath(CODE_DIR)
# Import Salt libs
import salt

View file

@ -29,6 +29,9 @@ try:
except ImportError:
HAS_AZURE = False
if HAS_AZURE and not hasattr(azure, '__version__'):
import azure.common
def __random_name(size=6):
'''
@ -53,7 +56,6 @@ def __has_required_azure():
if hasattr(azure, '__version__'):
version = LooseVersion(azure.__version__)
else:
import azure.common
version = LooseVersion(azure.common.__version__)
if HAS_AZURE is True and REQUIRED_AZURE <= version:
return True

View file

@ -29,11 +29,13 @@ __opts__ = {
'spm_build_dir': os.path.join(_TMP_SPM, 'build'),
'spm_build_exclude': ['.git'],
'spm_db_provider': 'sqlite3',
'spm_files_provider': 'roots',
'spm_files_provider': 'local',
'spm_db': os.path.join(_TMP_SPM, 'packages.db'),
'extension_modules': os.path.join(_TMP_SPM, 'modules'),
'file_roots': {'base': [os.path.join(_TMP_SPM, 'salt')]},
'pillar_roots': {'base': [os.path.join(_TMP_SPM, 'pillar')]},
'file_roots': {'base': [_TMP_SPM, ]},
'formula_path': os.path.join(_TMP_SPM, 'spm'),
'pillar_path': os.path.join(_TMP_SPM, 'pillar'),
'reactor_path': os.path.join(_TMP_SPM, 'reactor'),
'assume_yes': True,
'force': False,
}

View file

@ -169,6 +169,31 @@ class TestSaltEvent(TestCase):
evt1 = me.get_event(tag='evt1')
self.assertGotEvent(evt1, {'data': 'foo1'})
def test_event_single_no_block(self):
'''Test a single event is received, no block'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR, listen=True)
start = time.time()
finish = start + 5
evt1 = me.get_event(wait=0, tag='evt1', no_block=True)
# We should get None and way before the 5 seconds wait since it's
# non-blocking, otherwise it would wait for an event which we
# didn't even send
self.assertIsNone(evt1, None)
self.assertLess(start, finish)
me.fire_event({'data': 'foo1'}, 'evt1')
evt1 = me.get_event(wait=0, tag='evt1')
self.assertGotEvent(evt1, {'data': 'foo1'})
def test_event_single_wait_0_no_block_False(self):
'''Test a single event is received with wait=0 and no_block=False and doesn't spin the while loop'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR, listen=True)
me.fire_event({'data': 'foo1'}, 'evt1')
# This is too fast and will be None but assures we're not blocking
evt1 = me.get_event(wait=0, tag='evt1', no_block=False)
self.assertGotEvent(evt1, {'data': 'foo1'})
def test_event_timeout(self):
'''Test no event is received if the timeout is reached'''
with eventpublisher_process():
@ -184,7 +209,7 @@ class TestSaltEvent(TestCase):
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR, listen=True)
with eventsender_process({'data': 'foo2'}, 'evt2', 5):
evt = me.get_event(tag='evt2', wait=0)
evt = me.get_event(tag='evt2', wait=0, no_block=False)
self.assertGotEvent(evt, {'data': 'foo2'})
def test_event_matching(self):
@ -211,6 +236,14 @@ class TestSaltEvent(TestCase):
evt1 = me.get_event(tag='')
self.assertGotEvent(evt1, {'data': 'foo1'})
def test_event_matching_all_when_tag_is_None(self):
'''Test event matching all when not passing a tag'''
with eventpublisher_process():
me = event.MasterEvent(SOCK_DIR, listen=True)
me.fire_event({'data': 'foo1'}, 'evt1')
evt1 = me.get_event()
self.assertGotEvent(evt1, {'data': 'foo1'})
def test_event_not_subscribed(self):
'''Test get_event drops non-subscribed events'''
with eventpublisher_process():