Merge pull request #31670 from terminalmage/issue30528

Write lists of minions targeted by syndic masters to job cache
Mike Place 2016-03-07 11:51:52 -07:00
commit a1f32b71bd
39 changed files with 547 additions and 114 deletions

View file

@ -0,0 +1,6 @@
====================
salt.modules.slsutil
====================
.. automodule:: salt.modules.slsutil
:members:

View file

@ -3,4 +3,5 @@ salt.returners.cassandra_cql_return
===================================
.. automodule:: salt.returners.cassandra_cql_return
:members:
:members:
:exclude-members: save_minions

View file

@ -3,4 +3,5 @@ salt.returners.couchdb_return
=============================
.. automodule:: salt.returners.couchdb_return
:members:
:members:
:exclude-members: save_minions

View file

@ -3,4 +3,5 @@ salt.returners.etcd_return
==========================
.. automodule:: salt.returners.etcd_return
:members:
:members:
:exclude-members: save_minions

View file

@ -4,3 +4,4 @@ salt.returners.influxdb_return
.. automodule:: salt.returners.influxdb_return
:members:
:exclude-members: save_minions

View file

@ -3,4 +3,5 @@ salt.returners.memcache_return
==============================
.. automodule:: salt.returners.memcache_return
:members:
:members:
:exclude-members: save_minions

View file

@ -3,4 +3,5 @@ salt.returners.mongo_future_return
==================================
.. automodule:: salt.returners.mongo_future_return
:members:
:members:
:exclude-members: save_minions

View file

@ -3,4 +3,5 @@ salt.returners.mongo_return
===========================
.. automodule:: salt.returners.mongo_return
:members:
:members:
:exclude-members: save_minions

View file

@ -3,4 +3,5 @@ salt.returners.multi_returner
=============================
.. automodule:: salt.returners.multi_returner
:members:
:members:
:exclude-members: save_minions

View file

@ -3,4 +3,5 @@ salt.returners.mysql
====================
.. automodule:: salt.returners.mysql
:members:
:members:
:exclude-members: save_minions

View file

@ -3,4 +3,5 @@ salt.returners.odbc
===================
.. automodule:: salt.returners.odbc
:members:
:members:
:exclude-members: save_minions

View file

@ -4,3 +4,4 @@ salt.returners.pgjsonb
.. automodule:: salt.returners.pgjsonb
:members:
:exclude-members: save_minions

View file

@ -3,4 +3,5 @@ salt.returners.postgres
=======================
.. automodule:: salt.returners.postgres
:members:
:members:
:exclude-members: save_minions

View file

@ -4,3 +4,4 @@ salt.returners.postgres_local_cache
.. automodule:: salt.returners.postgres_local_cache
:members:
:exclude-members: save_minions

View file

@ -3,4 +3,5 @@ salt.returners.redis_return
===========================
.. automodule:: salt.returners.redis_return
:members:
:members:
:exclude-members: save_minions

View file

@ -3,4 +3,5 @@ salt.returners.sqlite3
=======================
.. automodule:: salt.returners.sqlite3_return
:members:
:members:
:exclude-members: save_minions

View file

@ -6,8 +6,14 @@ from __future__ import absolute_import
# Import python libs
import copy
import logging
import time
# Import salt libs
import salt.defaults.exitcodes
log = logging.getLogger(__name__)
class SaltException(Exception):
'''
@ -98,6 +104,23 @@ class FileserverConfigError(SaltException):
'''
class FileLockError(SaltException):
'''
Used when an error occurs obtaining a file lock
'''
def __init__(self, msg, time_start=None, *args, **kwargs):
super(FileLockError, self).__init__(msg, *args, **kwargs)
if time_start is None:
log.warning(
'time_start should be provided when raising a FileLockError. '
'Defaulting to current time as a fallback, but this may '
'result in an inaccurate timeout.'
)
self.time_start = time.time()
else:
self.time_start = time_start
class SaltInvocationError(SaltException, TypeError):
'''
Used when the wrong number of arguments are sent to modules or invalid

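The time_start attribute added above lets callers measure how long they have been waiting across retries. A minimal sketch of raising and consuming it (the surrounding lock-acquisition code is hypothetical, not part of this changeset):

import time
from salt.exceptions import FileLockError

start = time.time()
try:
    # ... attempt to create a lock file here (illustrative) ...
    raise FileLockError('could not obtain lock', time_start=start)
except FileLockError as exc:
    # Supplying time_start avoids the fallback warning and keeps the
    # elapsed-time calculation anchored to the original attempt.
    elapsed = time.time() - exc.time_start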
View file

@ -1182,10 +1182,32 @@ class AESFuncs(object):
'''
Act on specific events from minions
'''
id_ = load['id']
if load.get('tag', '') == '_salt_error':
log.error('Received minion error from [{minion}]: '
'{data}'.format(minion=load['id'],
data=load['data']['message']))
log.error(
'Received minion error from [{minion}]: {data}'
.format(minion=id_, data=load['data']['message'])
)
for event in load.get('events', []):
event_data = event.get('data', {})
if 'minions' in event_data:
jid = event_data.get('jid')
if not jid:
continue
minions = event_data['minions']
try:
salt.utils.job.store_minions(
self.opts,
jid,
minions,
mminion=self.mminion,
syndic_id=id_)
except (KeyError, salt.exceptions.SaltCacheError) as exc:
log.error(
'Could not add minion(s) {0} for job {1}: {2}'
.format(minions, jid, exc)
)
def _return(self, load):
'''

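The handler above stores syndic-forwarded minion lists whenever an event's data contains both a jid and a minions key. Roughly the shape of such a load, using only the keys the handler references (all values are illustrative):

# Sketch of a syndic-forwarded load as consumed by _minion_event above;
# the values are made up, only the key names come from the code.
load = {
    'id': 'syndic01',                           # id of the syndic master
    'events': [
        {
            'data': {
                'jid': '20160307115152000000',  # job the minions belong to
                'minions': ['web1', 'web2'],    # minions matched below the syndic
            },
        },
    ],
}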
View file

@ -260,6 +260,13 @@ def save_load(jid, load):
raise
def save_minions(jid, minions): # pylint: disable=unused-argument
'''
Included for API consistency
'''
pass
# salt-run jobs.list_jobs FAILED
def get_load(jid):
'''

View file

@ -124,7 +124,8 @@ def _get_ttl():
def prep_jid(nocache=False, passed_jid=None):
'''
Return a job id and prepare the job id directory
This is the function responsible for making sure jids don't collide (unless its passed a jid)
This is the function responsible for making sure jids don't collide (unless
its passed a jid)
So do what you have to do to make sure that stays the case
'''
if passed_jid is None:
@ -195,6 +196,9 @@ def save_load(jid, clear_load):
log.warning('Could not write job cache file for jid: {0}'.format(jid))
return False
jid_doc.value['load'] = clear_load
cb_.replace(str(jid), jid_doc.value, cas=jid_doc.cas, ttl=_get_ttl())
# if you have a tgt, save that for the UI etc
if 'tgt' in clear_load:
ckminions = salt.utils.minions.CkMinions(__opts__)
@ -203,16 +207,30 @@ def save_load(jid, clear_load):
clear_load['tgt'],
clear_load.get('tgt_type', 'glob')
)
# save the minions to a cache so we can see in the UI
save_minions(jid, minions)
def save_minions(jid, minions, syndic_id=None): # pylint: disable=unused-argument
'''
Save/update the minion list for a given jid. The syndic_id argument is
included for API compatibility only.
'''
cb_ = _get_connection()
try:
jid_doc = cb_.get(str(jid))
except couchbase.exceptions.NotFoundError:
log.warning('Could not write job cache file for jid: {0}'.format(jid))
return False
# save the minions to a cache so we can see in the UI
if 'minions' in jid_doc.value:
jid_doc.value['minions'] = sorted(
set(jid_doc.value['minions'] + minions)
)
else:
jid_doc.value['minions'] = minions
jid_doc.value['load'] = clear_load
cb_.replace(str(jid),
jid_doc.value,
cas=jid_doc.cas,
ttl=_get_ttl()
)
cb_.replace(str(jid), jid_doc.value, cas=jid_doc.cas, ttl=_get_ttl())
def get_load(jid):

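Because save_minions above merges into any existing minion list (a sorted set union) rather than overwriting it, repeated calls, for example one per syndic master, accumulate. An illustrative sequence:

# Stored document starts with {'minions': ['web1']} (illustrative).
save_minions('20160307115152000000', ['web2', 'web1'])
# Document now holds {'minions': ['web1', 'web2']}: de-duplicated and sorted.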
View file

@ -367,3 +367,10 @@ def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
Do any work necessary to prepare a JID, including sending a custom id
'''
return passed_jid if passed_jid is not None else salt.utils.jid.gen_jid()
def save_minions(jid, minions): # pylint: disable=unused-argument
'''
Included for API consistency
'''
pass

View file

@ -113,6 +113,13 @@ def save_load(jid, load):
)
def save_minions(jid, minions): # pylint: disable=unused-argument
'''
Included for API consistency
'''
pass
def get_load(jid):
'''
Return the load data that marks a specified jid

View file

@ -149,6 +149,13 @@ def save_load(jid, load):
log.critical('Failed to store load with InfluxDB returner: {0}'.format(ex))
def save_minions(jid, minions): # pylint: disable=unused-argument
'''
Included for API consistency
'''
pass
def get_load(jid):
'''
Return the load data that marks a specified jid

View file

@ -7,16 +7,19 @@ from __future__ import absolute_import
# Import python libs
import errno
import glob
import logging
import os
import shutil
import time
import hashlib
import bisect
import time
# Import salt libs
import salt.payload
import salt.utils
import salt.utils.files
import salt.utils.jid
import salt.exceptions
@ -24,8 +27,12 @@ log = logging.getLogger(__name__)
# load is the published job
LOAD_P = '.load.p'
# the list of minions that the job is targeted to (best effort match on the master side)
# the list of minions that the job is targeted to (best effort match on the
# master side)
MINIONS_P = '.minions.p'
# format string for minion lists forwarded from syndic masters (the placeholder
# will be replaced with the syndic master's id)
SYNDIC_MINIONS_P = '.minions.{0}.p'
# return is the "return" from the minion data
RETURN_P = 'return.p'
# out is the "out" from the minion data
@ -48,9 +55,7 @@ def _jid_dir(jid):
'''
jid = str(jid)
jhash = getattr(hashlib, __opts__['hash_type'])(jid).hexdigest()
return os.path.join(_job_dir(),
jhash[:2],
jhash[2:])
return os.path.join(_job_dir(), jhash[:2], jhash[2:])
def _walk_through(job_dir):
@ -182,7 +187,8 @@ def save_load(jid, clear_load, minions=None, recurse_count=0):
as for salt-ssh)
'''
if recurse_count >= 5:
err = 'save_load could not write job cache file after {0} retries.'.format(recurse_count)
err = ('save_load could not write job cache file after {0} retries.'
.format(recurse_count))
log.error(err)
raise salt.exceptions.SaltCacheError(err)
@ -207,7 +213,9 @@ def save_load(jid, clear_load, minions=None, recurse_count=0):
salt.utils.fopen(os.path.join(jid_dir, LOAD_P), 'w+b')
)
except IOError as exc:
log.warning('Could not write job invocation cache file: {0}'.format(exc))
log.warning(
'Could not write job invocation cache file: %s', exc
)
time.sleep(0.1)
return save_load(jid=jid, clear_load=clear_load,
recurse_count=recurse_count+1)
@ -222,14 +230,37 @@ def save_load(jid, clear_load, minions=None, recurse_count=0):
clear_load.get('tgt_type', 'glob')
)
# save the minions to a cache so we can see in the UI
try:
serial.dump(
minions,
salt.utils.fopen(os.path.join(jid_dir, MINIONS_P), 'w+b')
)
except IOError as exc:
log.warning('Could not write job cache file for minions: {0}'.format(minions))
log.debug('Job cache write failure: {0}'.format(exc))
save_minions(jid, minions)
def save_minions(jid, minions, syndic_id=None):
'''
Save/update the serialized list of minions for a given job
'''
log.debug(
'Adding minions for job %s%s: %s',
jid,
' from syndic master \'{0}\''.format(syndic_id) if syndic_id else '',
minions
)
serial = salt.payload.Serial(__opts__)
jid_dir = _jid_dir(jid)
if syndic_id is not None:
minions_path = os.path.join(
jid_dir,
SYNDIC_MINIONS_P.format(syndic_id)
)
else:
minions_path = os.path.join(jid_dir, MINIONS_P)
try:
serial.dump(minions, salt.utils.fopen(minions_path, 'w+b'))
except IOError as exc:
log.error(
'Failed to write minion list {0} to job cache file {1}: {2}'
.format(minions, minions_path, exc)
)
def get_load(jid):
@ -244,9 +275,22 @@ def get_load(jid):
ret = serial.load(salt.utils.fopen(os.path.join(jid_dir, LOAD_P), 'rb'))
minions_path = os.path.join(jid_dir, MINIONS_P)
if os.path.isfile(minions_path):
ret['Minions'] = serial.load(salt.utils.fopen(minions_path, 'rb'))
minions_cache = [os.path.join(jid_dir, MINIONS_P)]
minions_cache.extend(
glob.glob(os.path.join(jid_dir, SYNDIC_MINIONS_P.format('*')))
)
all_minions = set()
for minions_path in minions_cache:
log.debug('Reading minion list from %s', minions_path)
try:
all_minions.update(
serial.load(salt.utils.fopen(minions_path, 'rb'))
)
except IOError as exc:
salt.utils.files.process_read_exception(exc, minions_path)
if all_minions:
ret['Minions'] = sorted(all_minions)
return ret

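Taken together, save_minions and get_load above give each jid directory one .minions.p file for locally matched minions plus one .minions.<syndic_id>.p file per syndic master, and get_load merges them all. A sketch of the resulting layout (directory names abbreviated, since the real path is derived from the jid hash):

# Illustrative call for a syndic-forwarded list:
save_minions('20160307115152000000', ['web1', 'web2'], syndic_id='syndic01')
#
# Resulting cache layout for that jid (paths shortened):
#   <job_dir>/ab/cdef.../.load.p                # LOAD_P: the published job
#   <job_dir>/ab/cdef.../.minions.p             # MINIONS_P: locally matched minions
#   <job_dir>/ab/cdef.../.minions.syndic01.p    # SYNDIC_MINIONS_P for 'syndic01'
#
# get_load() globs for '.minions.*.p' alongside '.minions.p' and returns the
# sorted union as ret['Minions'].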
View file

@ -139,6 +139,13 @@ def save_load(jid, load):
serv.append('jids', jid)
def save_minions(jid, minions): # pylint: disable=unused-argument
'''
Included for API consistency
'''
pass
def get_load(jid):
'''
Return the load data that marks a specified jid

View file

@ -202,6 +202,13 @@ def save_load(jid, load):
mdb.jobs.insert(load.copy())
def save_minions(jid, minions): # pylint: disable=unused-argument
'''
Included for API consistency
'''
pass
def get_load(jid):
'''
Return the load associated with a given job id

View file

@ -215,3 +215,10 @@ def prep_jid(nocache=False, passed_jid=None): # pylint: disable=unused-argument
Do any work necessary to prepare a JID, including sending a custom id
'''
return passed_jid if passed_jid is not None else salt.utils.jid.gen_jid()
def save_minions(jid, minions): # pylint: disable=unused-argument
'''
Included for API consistency
'''
pass

View file

@ -69,6 +69,13 @@ def save_load(jid, clear_load):
_mminion().returners['{0}.save_load'.format(returner_)](jid, clear_load)
def save_minions(jid, minions): # pylint: disable=unused-argument
'''
Included for API consistency
'''
pass
def get_load(jid):
'''
Merge the load data from all returners

View file

@ -297,6 +297,13 @@ def save_load(jid, load):
pass
def save_minions(jid, minions): # pylint: disable=unused-argument
'''
Included for API consistency
'''
pass
def get_load(jid):
'''
Return the load data that marks a specified jid

View file

@ -217,6 +217,13 @@ def save_load(jid, load):
_close_conn(conn)
def save_minions(jid, minions): # pylint: disable=unused-argument
'''
Included for API consistency
'''
pass
def get_load(jid):
'''
Return the load data that marks a specified jid

View file

@ -282,6 +282,13 @@ def save_load(jid, load):
pass
def save_minions(jid, minions): # pylint: disable=unused-argument
'''
Included for API consistency
'''
pass
def get_load(jid):
'''
Return the load data that marks a specified jid

View file

@ -193,6 +193,13 @@ def save_load(jid, load):
_close_conn(conn)
def save_minions(jid, minions): # pylint: disable=unused-argument
'''
Included for API consistency
'''
pass
def get_load(jid):
'''
Return the load data that marks a specified jid

View file

@ -256,6 +256,13 @@ def save_load(jid, clear_load):
_close_conn(conn)
def save_minions(jid, minions): # pylint: disable=unused-argument
'''
Included for API consistency
'''
pass
def _escape_jid(jid):
'''
Do proper formatting of the jid

View file

@ -115,6 +115,13 @@ def save_load(jid, load):
serv.sadd('jids', jid)
def save_minions(jid, minions): # pylint: disable=unused-argument
'''
Included for API consistency
'''
pass
def get_load(jid):
'''
Return the load data that marks a specified jid

View file

@ -185,6 +185,13 @@ def save_load(jid, load):
_close_conn(conn)
def save_minions(jid, minions): # pylint: disable=unused-argument
'''
Included for API consistency
'''
pass
def get_load(jid):
'''
Return the load from a specified jid

View file

@ -100,18 +100,17 @@ def lookup_jid(jid,
ext_source
The external job cache to use. Default: `None`.
returned
When set to `True`, adds the minions that did return from the command.
Default: `True`.
returned : True
If ``True``, include the minions that did return from the command.
.. versionadded:: 2015.8.0
missing
When set to `True`, adds the minions that did NOT return from the command.
Default: `False`.
missing : False
If ``True``, include the minions that did *not* return from the
command.
display_progress
Displays progress events when set to `True`. Default: `False`.
display_progress : False
If ``True``, fire progress events.
.. versionadded:: 2015.5.0
@ -120,35 +119,41 @@ def lookup_jid(jid,
.. code-block:: bash
salt-run jobs.lookup_jid 20130916125524463507
salt-run jobs.lookup_jid 20130916125524463507 outputter=highstate
salt-run jobs.lookup_jid 20130916125524463507 --out=highstate
'''
ret = {}
mminion = salt.minion.MasterMinion(__opts__)
returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache']))
if display_progress:
__jid_event__.fire_event({'message': 'Querying returner: {0}'.format(returner)}, 'progress')
returner = _get_returner((
__opts__['ext_job_cache'],
ext_source,
__opts__['master_job_cache']
))
try:
data = mminion.returners['{0}.get_jid'.format(returner)](jid)
data = list_job(
jid,
ext_source=ext_source,
display_progress=display_progress
)
except TypeError:
return 'Requested returner could not be loaded. No JIDs could be retrieved.'
return ('Requested returner could not be loaded. '
'No JIDs could be retrieved.')
for minion in data:
targeted_minions = data.get('Minions', [])
returns = data.get('Result', {})
for minion in returns:
if display_progress:
__jid_event__.fire_event({'message': minion}, 'progress')
if u'return' in data[minion]:
if u'return' in returns[minion]:
if returned:
ret[minion] = data[minion].get(u'return')
ret[minion] = returns[minion].get(u'return')
else:
if returned:
ret[minion] = data[minion].get('return')
ret[minion] = returns[minion].get('return')
if missing:
load = mminion.returners['{0}.get_load'.format(returner)](jid)
ckminions = salt.utils.minions.CkMinions(__opts__)
exp = ckminions.check_minions(load['tgt'], load['tgt_type'])
for minion_id in exp:
if minion_id not in data:
ret[minion_id] = 'Minion did not return'
for minion_id in (x for x in targeted_minions if x not in returns):
ret[minion_id] = 'Minion did not return'
# Once we remove the outputter argument in a couple releases, we still
# need to check to see if the 'out' key is present and use it to specify
@ -174,19 +179,37 @@ def lookup_jid(jid,
return ret
def list_job(jid, ext_source=None, outputter=None):
def list_job(jid, ext_source=None, outputter=None, display_progress=False):
'''
List a specific job given by its jid
ext_source
If provided, specifies which external job cache to use.
display_progress : False
If ``True``, fire progress events.
.. versionadded:: 2015.8.8
CLI Example:
.. code-block:: bash
salt-run jobs.list_job 20130916125524463507
salt-run jobs.list_job 20130916125524463507 --out=pprint
'''
ret = {'jid': jid}
mminion = salt.minion.MasterMinion(__opts__)
returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache']))
returner = _get_returner((
__opts__['ext_job_cache'],
ext_source,
__opts__['master_job_cache']
))
if display_progress:
__jid_event__.fire_event(
{'message': 'Querying returner: {0}'.format(returner)},
'progress'
)
job = mminion.returners['{0}.get_load'.format(returner)](jid)
ret.update(_format_jid_instance(jid, job))
@ -223,31 +246,68 @@ def list_jobs(ext_source=None,
List all detectable jobs and associated functions
ext_source
The external job cache to use. Default: `None`.
If provided, specifies which external job cache to use.
**FILTER OPTIONS**
.. note::
If more than one of the below options are used, only jobs which match
*all* of the filters will be returned.
search_metadata
Search the metadata of a job for the provided string of dictionary.
Default: 'None'.
Specify a dictionary to match to the job's metadata. If any of the
key-value pairs in this dictionary match, the job will be returned.
Example:
.. code-block:: bash
salt-run jobs.list_jobs search_metadata='{"foo": "bar", "baz": "qux"}'
search_function
Search the function of a job for the provided string.
Default: 'None'.
Can be passed as a string or a list. Returns jobs which match the
specified function. Globbing is allowed. Example:
.. code-block:: bash
salt-run jobs.list_jobs search_function='test.*'
salt-run jobs.list_jobs search_function='["test.*", "pkg.install"]'
.. versionchanged:: 2015.8.8
Multiple targets can now also be passed as a comma-separated list.
For example:
.. code-block:: bash
salt-run jobs.list_jobs search_function='test.*,pkg.install'
search_target
Search the target of a job for the provided minion name.
Default: 'None'.
Can be passed as a string or a list. Returns jobs which match the
specified minion name. Globbing is allowed. Example:
.. code-block:: bash
salt-run jobs.list_jobs search_target='*.mydomain.tld'
salt-run jobs.list_jobs search_target='["db*", "myminion"]'
.. versionchanged:: 2015.8.8
Multiple targets can now also be passed as a comma-separated list.
For example:
.. code-block:: bash
salt-run jobs.list_jobs search_target='db*,myminion'
start_time
Search for jobs where the start time of the job is greater than
or equal to the provided time stamp. Any timestamp supported
by the Dateutil (required) module can be used.
Default: 'None'.
Accepts any timestamp supported by the dateutil_ Python module (if this
module is not installed, this argument will be ignored). Returns jobs
which started after this timestamp.
end_time
Search for jobs where the start time of the job is less than
or equal to the provided time stamp. Any timestamp supported
by the Dateutil (required) module can be used.
Default: 'None'.
Accepts any timestamp supported by the dateutil_ Python module (if this
module is not installed, this argument will be ignored). Returns jobs
which started before this timestamp.
.. _dateutil: https://pypi.python.org/pypi/python-dateutil
CLI Example:
@ -258,9 +318,16 @@ def list_jobs(ext_source=None,
salt-run jobs.list_jobs start_time='2015, Mar 16 19:00' end_time='2015, Mar 18 22:00'
'''
returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache']))
returner = _get_returner((
__opts__['ext_job_cache'],
ext_source,
__opts__['master_job_cache']
))
if display_progress:
__jid_event__.fire_event({'message': 'Querying returner {0} for jobs.'.format(returner)}, 'progress')
__jid_event__.fire_event(
{'message': 'Querying returner {0} for jobs.'.format(returner)},
'progress'
)
mminion = salt.minion.MasterMinion(__opts__)
ret = mminion.returners['{0}.get_jids'.format(returner)]()
@ -282,23 +349,15 @@ def list_jobs(ext_source=None,
if search_target and _match:
_match = False
if 'Target' in ret[item]:
if isinstance(search_target, list):
for key in search_target:
if fnmatch.fnmatch(ret[item]['Target'], key):
_match = True
elif isinstance(search_target, six.string_types):
if fnmatch.fnmatch(ret[item]['Target'], search_target):
for key in salt.utils.split_input(search_target):
if fnmatch.fnmatch(ret[item]['Target'], key):
_match = True
if search_function and _match:
_match = False
if 'Function' in ret[item]:
if isinstance(search_function, list):
for key in search_function:
if fnmatch.fnmatch(ret[item]['Function'], key):
_match = True
elif isinstance(search_function, six.string_types):
if fnmatch.fnmatch(ret[item]['Function'], search_function):
for key in salt.utils.split_input(search_function):
if fnmatch.fnmatch(ret[item]['Function'], key):
_match = True
if start_time and _match:
@ -309,7 +368,10 @@ def list_jobs(ext_source=None,
if _start_time >= parsed_start_time:
_match = True
else:
log.error('"dateutil" library not available, skipping start_time comparision.')
log.error(
'\'dateutil\' library not available, skipping start_time '
'comparision.'
)
if end_time and _match:
_match = False
@ -319,7 +381,10 @@ def list_jobs(ext_source=None,
if _start_time <= parsed_end_time:
_match = True
else:
log.error('"dateutil" library not available, skipping start_time comparision.')
log.error(
'\'dateutil\' library not available, skipping end_time '
'comparision.'
)
if _match:
mret[item] = ret[item]
@ -349,14 +414,23 @@ def list_jobs_filter(count,
salt-run jobs.list_jobs_filter 100 filter_find_job=False
'''
returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache']))
returner = _get_returner((
__opts__['ext_job_cache'],
ext_source,
__opts__['master_job_cache']
))
if display_progress:
__jid_event__.fire_event({'message': 'Querying returner {0} for jobs.'.format(returner)}, 'progress')
__jid_event__.fire_event(
{'message': 'Querying returner {0} for jobs.'.format(returner)},
'progress'
)
mminion = salt.minion.MasterMinion(__opts__)
fun = '{0}.get_jids_filter'.format(returner)
if fun not in mminion.returners:
raise salt.exceptions.NotImplemented('\'{0}\' returner function not implemented yet.'.format(fun))
raise NotImplementedError(
'\'{0}\' returner function not implemented yet.'.format(fun)
)
ret = mminion.returners[fun](count, filter_find_job)
if outputter:
@ -377,15 +451,21 @@ def print_job(jid, ext_source=None, outputter=None):
'''
ret = {}
returner = _get_returner((__opts__['ext_job_cache'], ext_source, __opts__['master_job_cache']))
returner = _get_returner((
__opts__['ext_job_cache'],
ext_source,
__opts__['master_job_cache']
))
mminion = salt.minion.MasterMinion(__opts__)
try:
job = mminion.returners['{0}.get_load'.format(returner)](jid)
ret[jid] = _format_jid_instance(jid, job)
except TypeError:
ret[jid]['Result'] = ('Requested returner {0} is not available. Jobs cannot be retrieved. '
'Check master log for details.'.format(returner))
ret[jid]['Result'] = (
'Requested returner {0} is not available. Jobs cannot be '
'retrieved. Check master log for details.'.format(returner)
)
return ret
ret[jid]['Result'] = mminion.returners['{0}.get_jid'.format(returner)](jid)
@ -415,20 +495,17 @@ def last_run(ext_source=None,
target=None,
display_progress=False):
'''
List all detectable jobs and associated functions
.. versionadded:: 2015.8.0
List all detectable jobs and associated functions
CLI Example:
.. code-block:: bash
salt-run jobs.last_run
salt-run jobs.last_run target=nodename
salt-run jobs.last_run function='cmd.run'
salt-run jobs.last_run metadata="{'foo': 'bar'}"
'''
@ -437,7 +514,8 @@ def last_run(ext_source=None,
log.info('The metadata parameter must be specified as a dictionary')
return False
_all_jobs = list_jobs(ext_source, outputter, metadata, function, target, display_progress)
_all_jobs = list_jobs(ext_source, outputter, metadata,
function, target, display_progress)
if _all_jobs:
last_job = sorted(_all_jobs)[-1]
return print_job(last_job, ext_source, outputter)
@ -505,5 +583,8 @@ def _walk_through(job_dir, display_progress=False):
job = serial.load(salt.utils.fopen(load_path, 'rb'))
jid = job['jid']
if display_progress:
__jid_event__.fire_event({'message': 'Found JID {0}'.format(jid)}, 'progress')
__jid_event__.fire_event(
{'message': 'Found JID {0}'.format(jid)},
'progress'
)
yield jid, job, t_path, final

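For reference, lookup_jid above now reads the targeted minions and the returns from the dict produced by list_job instead of querying the returner directly. Roughly the shape it expects, using only the keys referenced in the code (values illustrative):

data = {
    'jid': '20130916125524463507',
    'Minions': ['web1', 'web2'],        # targeted minions, including syndic lists
    'Result': {
        'web1': {'return': True},
        # 'web2' absent from Result -> reported as 'Minion did not return'
        # when missing=True
    },
}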
View file

@ -2874,3 +2874,15 @@ def invalid_kwargs(invalid_kwargs, raise_exc=True):
raise SaltInvocationError(msg)
else:
return msg
def split_input(val):
'''
Take an input value and split it into a list, returning the resulting list
'''
if isinstance(val, list):
return val
try:
return val.split(',')
except AttributeError:
return str(val).split(',')

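A few illustrative calls to the split_input helper added above:

split_input('test.*,pkg.install')    # ['test.*', 'pkg.install']
split_input(['db*', 'myminion'])     # returned unchanged
split_input(12345)                   # ['12345'] via the str() fallback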
View file

@ -3,15 +3,23 @@
from __future__ import absolute_import
# Import Python libs
import contextlib
import errno
import logging
import os
import shutil
import subprocess
import time
# Import salt libs
import salt.utils
import salt.modules.selinux
from salt.exceptions import CommandExecutionError, MinionError
from salt.exceptions import CommandExecutionError, FileLockError, MinionError
# Import 3rd-party libs
from salt.ext import six
log = logging.getLogger(__name__)
def recursive_copy(source, dest):
@ -130,3 +138,80 @@ def process_read_exception(exc, path):
exc.errno, path, exc.strerror
)
)
@contextlib.contextmanager
def wait_lock(path, lock_fn=None, timeout=5, sleep=0.1, time_start=None):
'''
Obtain a write lock. If one exists, wait for it to release first
'''
if not isinstance(path, six.string_types):
raise FileLockError('path must be a string')
if lock_fn is None:
lock_fn = path + '.w'
if time_start is None:
time_start = time.time()
obtained_lock = False
def _raise_error(msg, race=False):
'''
Raise a FileLockError
'''
raise FileLockError(msg, time_start=time_start)
try:
if os.path.exists(lock_fn) and not os.path.isfile(lock_fn):
_raise_error(
'lock_fn {0} exists and is not a file'.format(lock_fn)
)
open_flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY
while time.time() - time_start < timeout:
try:
# Use os.open() to obtain filehandle so that we can force an
# exception if the file already exists. Concept found here:
# http://stackoverflow.com/a/10979569
fh_ = os.open(lock_fn, open_flags)
except (IOError, OSError) as exc:
if exc.errno != errno.EEXIST:
_raise_error(
'Error {0} encountered obtaining file lock {1}: {2}'
.format(exc.errno, lock_fn, exc.strerror)
)
log.trace(
'Lock file %s exists, sleeping %f seconds', lock_fn, sleep
)
time.sleep(sleep)
else:
# Write the lock file
with os.fdopen(fh_, 'w'):
pass
# Lock successfully acquired
log.trace('Write lock %s obtained', lock_fn)
obtained_lock = True
# Transfer control back to the code inside the with block
yield
# Exit the loop
break
else:
_raise_error(
'Timeout of {0} seconds exceeded waiting for lock_fn {1} '
'to be released'.format(timeout, lock_fn)
)
except FileLockError:
raise
except Exception as exc:
_raise_error(
'Error encountered obtaining file lock {0}: {1}'.format(
lock_fn,
exc
)
)
finally:
if obtained_lock:
os.remove(lock_fn)
log.trace('Write lock for %s (%s) released', path, lock_fn)

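A minimal usage sketch of the context manager added above, assuming this module is salt/utils/files.py as the process_read_exception reference earlier suggests; the path is illustrative and the lock file defaults to the target path plus '.w':

import salt.utils.files
from salt.exceptions import FileLockError

try:
    with salt.utils.files.wait_lock('/tmp/example.dat', timeout=5):
        # Safe to write the file here; the lock file is removed on exit.
        with open('/tmp/example.dat', 'w') as fp_:
            fp_.write('data')
except FileLockError as exc:
    # Raised if the lock could not be obtained within the timeout;
    # exc.time_start records when the wait began.
    print('could not lock: {0}'.format(exc))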
View file

@ -83,7 +83,9 @@ def store_job(opts, load, event=None, mminion=None):
if 'user' in ret_:
load.update({'user': ret_['user']})
try:
if 'jid' in load and 'get_load' in mminion.returners and not mminion.returners[getfstr](load.get('jid', '')):
if 'jid' in load \
and 'get_load' in mminion.returners \
and not mminion.returners[getfstr](load.get('jid', '')):
mminion.returners[savefstr](load['jid'], load)
mminion.returners[fstr](load)
@ -98,6 +100,26 @@ def store_job(opts, load, event=None, mminion=None):
raise KeyError(emsg)
def store_minions(opts, jid, minions, mminion=None, syndic_id=None):
'''
Store additional minions matched on lower-level masters using the configured
master_job_cache
'''
if mminion is None:
mminion = salt.minion.MasterMinion(opts, states=False, rend=False)
job_cache = opts['master_job_cache']
minions_fstr = '{0}.save_minions'.format(job_cache)
try:
mminion.returners[minions_fstr](jid, minions, syndic_id=syndic_id)
except KeyError:
raise KeyError(
'Returner \'{0}\' does not support function save_minions'.format(
job_cache
)
)
def get_retcode(ret):
'''
Determine a retcode for a given return
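A sketch of how the master-side handler earlier in this changeset invokes the new store_minions helper (opts and mminion come from the running master; the jid, minion names, and syndic id are illustrative):

salt.utils.job.store_minions(
    opts,
    '20160307115152000000',
    ['web1', 'web2'],
    mminion=mminion,
    syndic_id='syndic01')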