mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge pull request #37115 from DSRCorporation/backport/36720_fix_race_condition
Backport/36720 fix race condition
This commit is contained in:
commit
274120300d
3 changed files with 178 additions and 117 deletions
|
@ -278,6 +278,7 @@ class LocalClient(object):
|
|||
timeout=None,
|
||||
jid='',
|
||||
kwarg=None,
|
||||
listen=False,
|
||||
**kwargs):
|
||||
'''
|
||||
Asynchronously send a command to connected minions
|
||||
|
@ -304,6 +305,7 @@ class LocalClient(object):
|
|||
ret,
|
||||
jid=jid,
|
||||
timeout=self._get_timeout(timeout),
|
||||
listen=listen,
|
||||
**kwargs)
|
||||
except SaltClientError:
|
||||
# Re-raise error with specific message
|
||||
|
@ -552,32 +554,38 @@ class LocalClient(object):
|
|||
function name.
|
||||
'''
|
||||
arg = salt.utils.args.condition_input(arg, kwarg)
|
||||
pub_data = self.run_job(tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
jid,
|
||||
**kwargs)
|
||||
was_listening = self.event.cpub
|
||||
|
||||
if not pub_data:
|
||||
return pub_data
|
||||
try:
|
||||
pub_data = self.run_job(tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
jid,
|
||||
listen=True,
|
||||
**kwargs)
|
||||
|
||||
ret = {}
|
||||
for fn_ret in self.get_cli_event_returns(
|
||||
pub_data['jid'],
|
||||
pub_data['minions'],
|
||||
self._get_timeout(timeout),
|
||||
tgt,
|
||||
expr_form,
|
||||
**kwargs):
|
||||
if not pub_data:
|
||||
return pub_data
|
||||
|
||||
if fn_ret:
|
||||
for mid, data in six.iteritems(fn_ret):
|
||||
ret[mid] = data.get('ret', {})
|
||||
ret = {}
|
||||
for fn_ret in self.get_cli_event_returns(
|
||||
pub_data['jid'],
|
||||
pub_data['minions'],
|
||||
self._get_timeout(timeout),
|
||||
tgt,
|
||||
expr_form,
|
||||
**kwargs):
|
||||
|
||||
return ret
|
||||
if fn_ret:
|
||||
for mid, data in six.iteritems(fn_ret):
|
||||
ret[mid] = data.get('ret', {})
|
||||
return ret
|
||||
finally:
|
||||
if not was_listening:
|
||||
self.event.close_pub()
|
||||
|
||||
def cmd_cli(
|
||||
self,
|
||||
|
@ -602,44 +610,47 @@ class LocalClient(object):
|
|||
:returns: A generator
|
||||
'''
|
||||
arg = salt.utils.args.condition_input(arg, kwarg)
|
||||
self.pub_data = self.run_job(
|
||||
tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
**kwargs)
|
||||
was_listening = self.event.cpub
|
||||
|
||||
if not self.pub_data:
|
||||
yield self.pub_data
|
||||
else:
|
||||
try:
|
||||
for fn_ret in self.get_cli_event_returns(
|
||||
self.pub_data['jid'],
|
||||
self.pub_data['minions'],
|
||||
self._get_timeout(timeout),
|
||||
tgt,
|
||||
expr_form,
|
||||
verbose,
|
||||
progress,
|
||||
**kwargs):
|
||||
try:
|
||||
pub_data = self.run_job(
|
||||
tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
listen=True,
|
||||
**kwargs)
|
||||
|
||||
if not fn_ret:
|
||||
continue
|
||||
if not pub_data:
|
||||
yield pub_data
|
||||
else:
|
||||
try:
|
||||
for fn_ret in self.get_cli_event_returns(
|
||||
pub_data['jid'],
|
||||
pub_data['minions'],
|
||||
self._get_timeout(timeout),
|
||||
tgt,
|
||||
expr_form,
|
||||
verbose,
|
||||
progress,
|
||||
**kwargs):
|
||||
|
||||
yield fn_ret
|
||||
except KeyboardInterrupt:
|
||||
raise SystemExit(
|
||||
'\n'
|
||||
'This job\'s jid is: {0}\n'
|
||||
'Exiting gracefully on Ctrl-c\n'
|
||||
'The minions may not have all finished running and any '
|
||||
'remaining minions will return upon completion. To look '
|
||||
'up the return data for this job later, run the following '
|
||||
'command:\n\n'
|
||||
'salt-run jobs.lookup_jid {0}'.format(self.pub_data['jid'])
|
||||
)
|
||||
if not fn_ret:
|
||||
continue
|
||||
|
||||
yield fn_ret
|
||||
except KeyboardInterrupt:
|
||||
msg = ('Exiting on Ctrl-C\nThis job\'s jid is:\n{0}\n'
|
||||
'The minions may not have all finished running and any '
|
||||
'remaining minions will return upon completion. To '
|
||||
'look up the return data for this job later run:\n'
|
||||
'salt-run jobs.lookup_jid {0}').format(pub_data['jid'])
|
||||
raise SystemExit(msg)
|
||||
finally:
|
||||
if not was_listening:
|
||||
self.event.close_pub()
|
||||
|
||||
def cmd_iter(
|
||||
self,
|
||||
|
@ -669,28 +680,37 @@ class LocalClient(object):
|
|||
{'stewart': {'ret': True}}
|
||||
'''
|
||||
arg = salt.utils.args.condition_input(arg, kwarg)
|
||||
pub_data = self.run_job(
|
||||
tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
**kwargs)
|
||||
was_listening = self.event.cpub
|
||||
|
||||
if not pub_data:
|
||||
yield pub_data
|
||||
else:
|
||||
for fn_ret in self.get_iter_returns(pub_data['jid'],
|
||||
pub_data['minions'],
|
||||
timeout=self._get_timeout(timeout),
|
||||
tgt=tgt,
|
||||
tgt_type=expr_form,
|
||||
**kwargs):
|
||||
if not fn_ret:
|
||||
continue
|
||||
yield fn_ret
|
||||
self._clean_up_subscriptions(pub_data['jid'])
|
||||
try:
|
||||
pub_data = self.run_job(
|
||||
tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
listen=True,
|
||||
**kwargs)
|
||||
|
||||
if not pub_data:
|
||||
yield pub_data
|
||||
else:
|
||||
if kwargs.get('yield_pub_data'):
|
||||
yield pub_data
|
||||
for fn_ret in self.get_iter_returns(pub_data['jid'],
|
||||
pub_data['minions'],
|
||||
timeout=self._get_timeout(timeout),
|
||||
tgt=tgt,
|
||||
tgt_type=expr_form,
|
||||
**kwargs):
|
||||
if not fn_ret:
|
||||
continue
|
||||
yield fn_ret
|
||||
self._clean_up_subscriptions(pub_data['jid'])
|
||||
finally:
|
||||
if not was_listening:
|
||||
self.event.close_pub()
|
||||
|
||||
def cmd_iter_no_block(
|
||||
self,
|
||||
|
@ -727,31 +747,38 @@ class LocalClient(object):
|
|||
{'stewart': {'ret': True}}
|
||||
'''
|
||||
arg = salt.utils.args.condition_input(arg, kwarg)
|
||||
pub_data = self.run_job(
|
||||
tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
**kwargs)
|
||||
was_listening = self.event.cpub
|
||||
|
||||
if not pub_data:
|
||||
yield pub_data
|
||||
else:
|
||||
for fn_ret in self.get_iter_returns(pub_data['jid'],
|
||||
pub_data['minions'],
|
||||
timeout=timeout,
|
||||
tgt=tgt,
|
||||
tgt_type=expr_form,
|
||||
block=False,
|
||||
**kwargs):
|
||||
if fn_ret and any([show_jid, verbose]):
|
||||
for minion in fn_ret.keys():
|
||||
fn_ret[minion]['jid'] = pub_data['jid']
|
||||
yield fn_ret
|
||||
try:
|
||||
pub_data = self.run_job(
|
||||
tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
listen=True,
|
||||
**kwargs)
|
||||
|
||||
self._clean_up_subscriptions(pub_data['jid'])
|
||||
if not pub_data:
|
||||
yield pub_data
|
||||
else:
|
||||
for fn_ret in self.get_iter_returns(pub_data['jid'],
|
||||
pub_data['minions'],
|
||||
timeout=timeout,
|
||||
tgt=tgt,
|
||||
tgt_type=expr_form,
|
||||
block=False,
|
||||
**kwargs):
|
||||
if fn_ret and any([show_jid, verbose]):
|
||||
for minion in fn_ret.keys():
|
||||
fn_ret[minion]['jid'] = pub_data['jid']
|
||||
yield fn_ret
|
||||
|
||||
self._clean_up_subscriptions(pub_data['jid'])
|
||||
finally:
|
||||
if not was_listening:
|
||||
self.event.close_pub()
|
||||
|
||||
def cmd_full_return(
|
||||
self,
|
||||
|
@ -768,24 +795,31 @@ class LocalClient(object):
|
|||
Execute a salt command and return
|
||||
'''
|
||||
arg = salt.utils.args.condition_input(arg, kwarg)
|
||||
pub_data = self.run_job(
|
||||
tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
**kwargs)
|
||||
was_listening = self.event.cpub
|
||||
|
||||
if not pub_data:
|
||||
return pub_data
|
||||
try:
|
||||
pub_data = self.run_job(
|
||||
tgt,
|
||||
fun,
|
||||
arg,
|
||||
expr_form,
|
||||
ret,
|
||||
timeout,
|
||||
listen=True,
|
||||
**kwargs)
|
||||
|
||||
return (self.get_cli_static_event_returns(pub_data['jid'],
|
||||
pub_data['minions'],
|
||||
timeout,
|
||||
tgt,
|
||||
expr_form,
|
||||
verbose))
|
||||
if not pub_data:
|
||||
return pub_data
|
||||
|
||||
return (self.get_cli_static_event_returns(pub_data['jid'],
|
||||
pub_data['minions'],
|
||||
timeout,
|
||||
tgt,
|
||||
expr_form,
|
||||
verbose))
|
||||
finally:
|
||||
if not was_listening:
|
||||
self.event.close_pub()
|
||||
|
||||
def get_cli_returns(
|
||||
self,
|
||||
|
@ -1449,6 +1483,7 @@ class LocalClient(object):
|
|||
ret='',
|
||||
jid='',
|
||||
timeout=5,
|
||||
listen=False,
|
||||
**kwargs):
|
||||
'''
|
||||
Take the required arguments and publish the given command.
|
||||
|
@ -1498,6 +1533,10 @@ class LocalClient(object):
|
|||
master_uri=master_uri)
|
||||
|
||||
try:
|
||||
# Ensure that the event subscriber is connected.
|
||||
# If not, we won't get a response, so error out
|
||||
if listen and not self.event.connect_pub(timeout=timeout):
|
||||
raise SaltReqTimeoutError()
|
||||
payload = channel.send(payload_kwargs, timeout=timeout)
|
||||
except SaltReqTimeoutError:
|
||||
raise SaltReqTimeoutError(
|
||||
|
|
|
@ -344,6 +344,16 @@ class IPCClient(object):
|
|||
if self.stream is not None and not self.stream.closed():
|
||||
self.stream.close()
|
||||
|
||||
# Remove the entry from the instance map so
|
||||
# that a closed entry may not be reused.
|
||||
# This forces this operation even if the reference
|
||||
# count of the entry has not yet gone to zero.
|
||||
if self.io_loop in IPCClient.instance_map:
|
||||
loop_instance_map = IPCClient.instance_map[self.io_loop]
|
||||
key = str(self.socket_path)
|
||||
if key in loop_instance_map:
|
||||
del loop_instance_map[key]
|
||||
|
||||
|
||||
class IPCMessageClient(IPCClient):
|
||||
'''
|
||||
|
|
|
@ -345,6 +345,18 @@ class SaltEvent(object):
|
|||
self.cpub = True
|
||||
return self.cpub
|
||||
|
||||
def close_pub(self):
|
||||
'''
|
||||
Close the publish connection (if established)
|
||||
'''
|
||||
if not self.cpub:
|
||||
return
|
||||
|
||||
self.subscriber.close()
|
||||
self.subscriber = None
|
||||
self.pending_events = []
|
||||
self.cpub = False
|
||||
|
||||
def connect_pull(self, timeout=1):
|
||||
'''
|
||||
Establish a connection with the event pull socket
|
||||
|
|
Loading…
Add table
Reference in a new issue