mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
parent
a69b124aaa
commit
70b5ae9b1d
1 changed files with 35 additions and 8 deletions
|
@ -150,7 +150,11 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
|
|||
# Return controle back to the caller, continue when authentication succeeds
|
||||
yield self.auth.authenticate()
|
||||
# Return control to the caller. When send() completes, resume by populating ret with the Future.result
|
||||
ret = yield self.message_client.send(self._package_load(self.auth.crypticle.dumps(load)), timeout=timeout)
|
||||
ret = yield self.message_client.send(
|
||||
self._package_load(self.auth.crypticle.dumps(load)),
|
||||
timeout=timeout,
|
||||
tries=tries,
|
||||
)
|
||||
key = self.auth.get_keys()
|
||||
cipher = PKCS1_OAEP.new(key)
|
||||
aes = cipher.decrypt(ret['key'])
|
||||
|
@ -175,9 +179,11 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
|
|||
@tornado.gen.coroutine
|
||||
def _do_transfer():
|
||||
# Yield control to the caller. When send() completes, resume by populating data with the Future.result
|
||||
data = yield self.message_client.send(self._package_load(self.auth.crypticle.dumps(load)),
|
||||
timeout=timeout,
|
||||
)
|
||||
data = yield self.message_client.send(
|
||||
self._package_load(self.auth.crypticle.dumps(load)),
|
||||
timeout=timeout,
|
||||
tries=tries,
|
||||
)
|
||||
# we may not have always data
|
||||
# as for example for saltcall ret submission, this is a blind
|
||||
# communication, we do not subscribe to return events, we just
|
||||
|
@ -206,7 +212,12 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
|
|||
:param int tries: The number of times to make before failure
|
||||
:param int timeout: The number of seconds on a response before failing
|
||||
'''
|
||||
ret = yield self.message_client.send(self._package_load(load), timeout=timeout)
|
||||
ret = yield self.message_client.send(
|
||||
self._package_load(load),
|
||||
timeout=timeout,
|
||||
tries=tries,
|
||||
)
|
||||
|
||||
raise tornado.gen.Return(ret)
|
||||
|
||||
@tornado.gen.coroutine
|
||||
|
@ -818,15 +829,31 @@ class AsyncReqMessageClient(object):
|
|||
|
||||
:raises: SaltReqTimeoutError
|
||||
'''
|
||||
future = self.send_future_map.pop(message)
|
||||
del self.send_timeout_map[message]
|
||||
self.send_future_map.pop(message).set_exception(SaltReqTimeoutError('Message timed out'))
|
||||
if future.attempts < future.tries:
|
||||
future.attempts += 1
|
||||
log.debug('SaltReqTimeoutError, retrying. ({0}/{1})'.format(future.attempts, future.tries))
|
||||
self.send(
|
||||
message,
|
||||
timeout=future.timeout,
|
||||
tries=future.tries,
|
||||
future=future,
|
||||
)
|
||||
|
||||
def send(self, message, timeout=None, callback=None):
|
||||
else:
|
||||
future.set_exception(SaltReqTimeoutError('Message timed out'))
|
||||
|
||||
def send(self, message, timeout=None, tries=3, future=None, callback=None):
|
||||
'''
|
||||
Return a future which will be completed when the message has a response
|
||||
'''
|
||||
message = self.serial.dumps(message)
|
||||
future = tornado.concurrent.Future()
|
||||
if future is None:
|
||||
future = tornado.concurrent.Future()
|
||||
future.tries = tries
|
||||
future.attempts = 0
|
||||
future.timeout = timeout
|
||||
if callback is not None:
|
||||
def handle_future(future):
|
||||
response = future.result()
|
||||
|
|
Loading…
Add table
Reference in a new issue