Merge pull request #37115 from DSRCorporation/backport/36720_fix_race_condition

Backport/36720 fix race condition
This commit is contained in:
Mike Place 2016-10-21 14:16:15 +09:00 committed by GitHub
commit 274120300d
3 changed files with 178 additions and 117 deletions

View file

@ -278,6 +278,7 @@ class LocalClient(object):
timeout=None,
jid='',
kwarg=None,
listen=False,
**kwargs):
'''
Asynchronously send a command to connected minions
@ -304,6 +305,7 @@ class LocalClient(object):
ret,
jid=jid,
timeout=self._get_timeout(timeout),
listen=listen,
**kwargs)
except SaltClientError:
# Re-raise error with specific message
@ -552,32 +554,38 @@ class LocalClient(object):
function name.
'''
arg = salt.utils.args.condition_input(arg, kwarg)
pub_data = self.run_job(tgt,
fun,
arg,
expr_form,
ret,
timeout,
jid,
**kwargs)
was_listening = self.event.cpub
if not pub_data:
return pub_data
try:
pub_data = self.run_job(tgt,
fun,
arg,
expr_form,
ret,
timeout,
jid,
listen=True,
**kwargs)
ret = {}
for fn_ret in self.get_cli_event_returns(
pub_data['jid'],
pub_data['minions'],
self._get_timeout(timeout),
tgt,
expr_form,
**kwargs):
if not pub_data:
return pub_data
if fn_ret:
for mid, data in six.iteritems(fn_ret):
ret[mid] = data.get('ret', {})
ret = {}
for fn_ret in self.get_cli_event_returns(
pub_data['jid'],
pub_data['minions'],
self._get_timeout(timeout),
tgt,
expr_form,
**kwargs):
return ret
if fn_ret:
for mid, data in six.iteritems(fn_ret):
ret[mid] = data.get('ret', {})
return ret
finally:
if not was_listening:
self.event.close_pub()
def cmd_cli(
self,
@ -602,44 +610,47 @@ class LocalClient(object):
:returns: A generator
'''
arg = salt.utils.args.condition_input(arg, kwarg)
self.pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
**kwargs)
was_listening = self.event.cpub
if not self.pub_data:
yield self.pub_data
else:
try:
for fn_ret in self.get_cli_event_returns(
self.pub_data['jid'],
self.pub_data['minions'],
self._get_timeout(timeout),
tgt,
expr_form,
verbose,
progress,
**kwargs):
try:
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
listen=True,
**kwargs)
if not fn_ret:
continue
if not pub_data:
yield pub_data
else:
try:
for fn_ret in self.get_cli_event_returns(
pub_data['jid'],
pub_data['minions'],
self._get_timeout(timeout),
tgt,
expr_form,
verbose,
progress,
**kwargs):
yield fn_ret
except KeyboardInterrupt:
raise SystemExit(
'\n'
'This job\'s jid is: {0}\n'
'Exiting gracefully on Ctrl-c\n'
'The minions may not have all finished running and any '
'remaining minions will return upon completion. To look '
'up the return data for this job later, run the following '
'command:\n\n'
'salt-run jobs.lookup_jid {0}'.format(self.pub_data['jid'])
)
if not fn_ret:
continue
yield fn_ret
except KeyboardInterrupt:
msg = ('Exiting on Ctrl-C\nThis job\'s jid is:\n{0}\n'
'The minions may not have all finished running and any '
'remaining minions will return upon completion. To '
'look up the return data for this job later run:\n'
'salt-run jobs.lookup_jid {0}').format(pub_data['jid'])
raise SystemExit(msg)
finally:
if not was_listening:
self.event.close_pub()
def cmd_iter(
self,
@ -669,28 +680,37 @@ class LocalClient(object):
{'stewart': {'ret': True}}
'''
arg = salt.utils.args.condition_input(arg, kwarg)
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
**kwargs)
was_listening = self.event.cpub
if not pub_data:
yield pub_data
else:
for fn_ret in self.get_iter_returns(pub_data['jid'],
pub_data['minions'],
timeout=self._get_timeout(timeout),
tgt=tgt,
tgt_type=expr_form,
**kwargs):
if not fn_ret:
continue
yield fn_ret
self._clean_up_subscriptions(pub_data['jid'])
try:
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
listen=True,
**kwargs)
if not pub_data:
yield pub_data
else:
if kwargs.get('yield_pub_data'):
yield pub_data
for fn_ret in self.get_iter_returns(pub_data['jid'],
pub_data['minions'],
timeout=self._get_timeout(timeout),
tgt=tgt,
tgt_type=expr_form,
**kwargs):
if not fn_ret:
continue
yield fn_ret
self._clean_up_subscriptions(pub_data['jid'])
finally:
if not was_listening:
self.event.close_pub()
def cmd_iter_no_block(
self,
@ -727,31 +747,38 @@ class LocalClient(object):
{'stewart': {'ret': True}}
'''
arg = salt.utils.args.condition_input(arg, kwarg)
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
**kwargs)
was_listening = self.event.cpub
if not pub_data:
yield pub_data
else:
for fn_ret in self.get_iter_returns(pub_data['jid'],
pub_data['minions'],
timeout=timeout,
tgt=tgt,
tgt_type=expr_form,
block=False,
**kwargs):
if fn_ret and any([show_jid, verbose]):
for minion in fn_ret.keys():
fn_ret[minion]['jid'] = pub_data['jid']
yield fn_ret
try:
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
listen=True,
**kwargs)
self._clean_up_subscriptions(pub_data['jid'])
if not pub_data:
yield pub_data
else:
for fn_ret in self.get_iter_returns(pub_data['jid'],
pub_data['minions'],
timeout=timeout,
tgt=tgt,
tgt_type=expr_form,
block=False,
**kwargs):
if fn_ret and any([show_jid, verbose]):
for minion in fn_ret.keys():
fn_ret[minion]['jid'] = pub_data['jid']
yield fn_ret
self._clean_up_subscriptions(pub_data['jid'])
finally:
if not was_listening:
self.event.close_pub()
def cmd_full_return(
self,
@ -768,24 +795,31 @@ class LocalClient(object):
Execute a salt command and return
'''
arg = salt.utils.args.condition_input(arg, kwarg)
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
**kwargs)
was_listening = self.event.cpub
if not pub_data:
return pub_data
try:
pub_data = self.run_job(
tgt,
fun,
arg,
expr_form,
ret,
timeout,
listen=True,
**kwargs)
return (self.get_cli_static_event_returns(pub_data['jid'],
pub_data['minions'],
timeout,
tgt,
expr_form,
verbose))
if not pub_data:
return pub_data
return (self.get_cli_static_event_returns(pub_data['jid'],
pub_data['minions'],
timeout,
tgt,
expr_form,
verbose))
finally:
if not was_listening:
self.event.close_pub()
def get_cli_returns(
self,
@ -1449,6 +1483,7 @@ class LocalClient(object):
ret='',
jid='',
timeout=5,
listen=False,
**kwargs):
'''
Take the required arguments and publish the given command.
@ -1498,6 +1533,10 @@ class LocalClient(object):
master_uri=master_uri)
try:
# Ensure that the event subscriber is connected.
# If not, we won't get a response, so error out
if listen and not self.event.connect_pub(timeout=timeout):
raise SaltReqTimeoutError()
payload = channel.send(payload_kwargs, timeout=timeout)
except SaltReqTimeoutError:
raise SaltReqTimeoutError(

View file

@ -344,6 +344,16 @@ class IPCClient(object):
if self.stream is not None and not self.stream.closed():
self.stream.close()
# Remove the entry from the instance map so
# that a closed entry may not be reused.
# This forces this operation even if the reference
# count of the entry has not yet gone to zero.
if self.io_loop in IPCClient.instance_map:
loop_instance_map = IPCClient.instance_map[self.io_loop]
key = str(self.socket_path)
if key in loop_instance_map:
del loop_instance_map[key]
class IPCMessageClient(IPCClient):
'''

View file

@ -345,6 +345,18 @@ class SaltEvent(object):
self.cpub = True
return self.cpub
def close_pub(self):
'''
Close the publish connection (if established)
'''
if not self.cpub:
return
self.subscriber.close()
self.subscriber = None
self.pending_events = []
self.cpub = False
def connect_pull(self, timeout=1):
'''
Establish a connection with the event pull socket