Merge pull request #28410 from jacksontj/2015.8

Add retries to the zeromq.AsyncReqMessageClient
This commit is contained in:
Mike Place 2015-10-29 12:05:50 -06:00
commit 7ead823731

View file

@ -150,7 +150,11 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
# Return control back to the caller, continue when authentication succeeds
yield self.auth.authenticate()
# Return control to the caller. When send() completes, resume by populating ret with the Future.result
ret = yield self.message_client.send(self._package_load(self.auth.crypticle.dumps(load)), timeout=timeout)
ret = yield self.message_client.send(
self._package_load(self.auth.crypticle.dumps(load)),
timeout=timeout,
tries=tries,
)
key = self.auth.get_keys()
cipher = PKCS1_OAEP.new(key)
aes = cipher.decrypt(ret['key'])
@ -175,9 +179,11 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
@tornado.gen.coroutine
def _do_transfer():
# Yield control to the caller. When send() completes, resume by populating data with the Future.result
data = yield self.message_client.send(self._package_load(self.auth.crypticle.dumps(load)),
timeout=timeout,
)
data = yield self.message_client.send(
self._package_load(self.auth.crypticle.dumps(load)),
timeout=timeout,
tries=tries,
)
# we may not always have data
# as for example for saltcall ret submission, this is a blind
# communication, we do not subscribe to return events, we just
@ -206,7 +212,12 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
:param int tries: The number of attempts to make before failure
:param int timeout: The number of seconds to wait for a response before failing
'''
ret = yield self.message_client.send(self._package_load(load), timeout=timeout)
ret = yield self.message_client.send(
self._package_load(load),
timeout=timeout,
tries=tries,
)
raise tornado.gen.Return(ret)
@tornado.gen.coroutine
@ -818,15 +829,31 @@ class AsyncReqMessageClient(object):
:raises: SaltReqTimeoutError
'''
future = self.send_future_map.pop(message)
del self.send_timeout_map[message]
self.send_future_map.pop(message).set_exception(SaltReqTimeoutError('Message timed out'))
if future.attempts < future.tries:
future.attempts += 1
log.debug('SaltReqTimeoutError, retrying. ({0}/{1})'.format(future.attempts, future.tries))
self.send(
message,
timeout=future.timeout,
tries=future.tries,
future=future,
)
def send(self, message, timeout=None, callback=None):
else:
future.set_exception(SaltReqTimeoutError('Message timed out'))
def send(self, message, timeout=None, tries=3, future=None, callback=None):
'''
Return a future which will be completed when the message has a response
'''
message = self.serial.dumps(message)
future = tornado.concurrent.Future()
if future is None:
future = tornado.concurrent.Future()
future.tries = tries
future.attempts = 0
future.timeout = timeout
if callback is not None:
def handle_future(future):
response = future.result()