salt-call now returns jobs to master

This is related to #10500.

This finishes @thatch45 work
This commit is contained in:
Mathieu Le Marec - Pasquet 2014-02-20 02:47:17 +01:00
parent 4d7c696648
commit 537f96fe14
5 changed files with 111 additions and 24 deletions

View file

@ -7,6 +7,7 @@ minion modules.
# Import python libs
from __future__ import print_function
import os
import copy
import sys
import logging
import datetime
@ -106,23 +107,37 @@ class Caller(object):
oput = self.minion.functions[fun].__outputter__
if isinstance(oput, string_types):
ret['out'] = oput
if self.opts.get('return', ''):
is_local = self.opts['local'] or self.opts.get(
'file_client', False) == 'local'
returners = self.opts.get('return', '').split(',')
if (not is_local) or returners:
ret['id'] = self.opts['id']
ret['fun'] = fun
ret['fun_args'] = self.opts['arg']
for returner in self.opts['return'].split(','):
try:
ret['success'] = True
self.minion.returners['{0}.returner'.format(returner)](ret)
except Exception:
pass
for returner in returners:
try:
ret['success'] = True
self.minion.returners['{0}.returner'.format(returner)](ret)
except Exception:
pass
# return the job infos back up to the respective minion's master
if not is_local:
try:
mret = ret.copy()
mret['jid'] = 'req'
self.return_pub(mret)
except Exception:
pass
return ret
def return_pub(self, ret):
'''
Return the data up to the master
'''
channel = salt.transport.Channel(self.opts)
channel = salt.transport.Channel.factory(self.opts)
load = {'cmd': '_return', 'id': self.opts['id']}
for key, value in ret.items():
load[key] = value

View file

@ -1232,15 +1232,21 @@ class AESFuncs(object):
return False
if not salt.utils.verify.valid_id(self.opts, load['id']):
return False
new_loadp = False
if load['jid'] == 'req':
# The minion is returning a standalone job, request a jobid
# The minion is returning a standalone job, request a jobid
load['arg'] = load.get('arg', load.get('fun_args', []))
load['tgt_type'] = 'glob'
load['tgt'] = load['id']
load['jid'] = salt.utils.prep_jid(
self.opts['cachedir'],
self.opts['hash_type'],
load.get('nocache', False))
self.opts['cachedir'],
self.opts['hash_type'],
load.get('nocache', False))
new_loadp = load.get('nocache', True) and True
log.info('Got return from {id} for job {jid}'.format(**load))
self.event.fire_event(load, load['jid']) # old dup event
self.event.fire_event(load, tagify([load['jid'], 'ret', load['id']], 'job'))
self.event.fire_event(
load, tagify([load['jid'], 'ret', load['id']], 'job'))
self.event.fire_ret_load(load)
if self.opts['master_ext_job_cache']:
fstr = '{0}.returner'.format(self.opts['master_ext_job_cache'])
@ -1249,12 +1255,17 @@ class AESFuncs(object):
if not self.opts['job_cache'] or self.opts.get('ext_job_cache'):
return
jid_dir = salt.utils.jid_dir(
load['jid'],
self.opts['cachedir'],
self.opts['hash_type']
)
load['jid'],
self.opts['cachedir'],
self.opts['hash_type']
)
if os.path.exists(os.path.join(jid_dir, 'nocache')):
return
if new_loadp:
with salt.utils.fopen(
os.path.join(jid_dir, '.load.p'), 'w+b'
) as fp_:
self.serial.dump(load, fp_)
hn_dir = os.path.join(jid_dir, load['id'])
try:
os.mkdir(hn_dir)

View file

@ -175,7 +175,8 @@ def print_job(job_id):
def _format_job_instance(job):
return {'Function': job['fun'],
'Arguments': list(job.get('arg', [])),
'Target': job['tgt'],
# unlikely but safeguard from invalid returns
'Target': job.get('tgt', 'unknown-target'),
'Target-type': job.get('tgt_type', []),
'User': job.get('user', 'root')}

View file

@ -66,12 +66,18 @@ class ZeroMQChannel(Channel):
minion state execution call
'''
def _do_transfer():
return self.auth.crypticle.loads(
self.sreq.send(self.crypt,
self.auth.crypticle.dumps(load),
tries,
timeout)
)
data = self.sreq.send(
self.crypt,
self.auth.crypticle.dumps(load),
tries,
timeout)
# we may not have always data
# as for example for saltcall ret submission, this is a blind
# communication, we do not subscribe to return events, we just
# upload the results to the master
if data:
data = self.auth.crypticle.loads(data)
return data
try:
return _do_transfer()
except salt.crypt.AuthenticationError:

View file

@ -12,6 +12,7 @@
# Import python libs
import os
import sys
import re
import shutil
import yaml
from datetime import datetime
@ -77,6 +78,59 @@ class CallTest(integration.ShellCase, integration.ShellCaseCommonTestsMixIn):
self.assertIn(expected_comment, ''.join(stdout))
self.assertNotEqual(0, retcode)
@skipIf(sys.platform.startswith('win'), 'This test does not apply on Win')
def test_return(self):
config_dir = '/tmp/salttest'
minion_config_file = os.path.join(config_dir, 'minion')
minion_config = {
'id': 'minion_test_issue_2731',
'master': 'localhost',
'master_port': 64506,
'root_dir': '/tmp/salttest',
'pki_dir': 'pki',
'cachedir': 'cachedir',
'sock_dir': 'minion_sock',
'open_mode': True,
'log_file': '/tmp/salttest/minion_test_issue_2731',
'log_level': 'quiet',
'log_level_logfile': 'info'
}
# Remove existing logfile
if os.path.isfile('/tmp/salttest/minion_test_issue_2731'):
os.unlink('/tmp/salttest/minion_test_issue_2731')
# Let's first test with a master running
open(minion_config_file, 'w').write(
yaml.dump(minion_config, default_flow_style=False)
)
out = self.run_call('-c {0} cmd.run "echo returnTOmaster"'.format(
os.path.join(integration.INTEGRATION_TEST_DIR, 'files', 'conf')))
jobs = [a for a in self.run_run('-c {0} jobs.list_jobs'.format(
os.path.join(integration.INTEGRATION_TEST_DIR, 'files', 'conf')))]
self.assertTrue(True in ['returnTOmaster' in j for j in jobs])
# lookback jid
first_match = [(i, j)
for i, j in enumerate(jobs)
if 'returnTOmaster' in j][0]
jid, idx = None, first_match[0]
while idx > 0:
jid = re.match("('|\")([0-9]+)('|\"):", jobs[idx])
if jid:
jid = jid.group(2)
break
idx -= 1
assert idx > 0
assert jid
master_out = [
a for a in self.run_run('-c {0} jobs.lookup_jid {1}'.format(
os.path.join(integration.INTEGRATION_TEST_DIR,
'files',
'conf'),
jid))]
self.assertTrue(True in ['returnTOmaster' in a for a in master_out])
@skipIf(sys.platform.startswith('win'), 'This test does not apply on Win')
def test_issue_2731_masterless(self):
config_dir = '/tmp/salttest'