Address PR comments

This commit is contained in:
Daniel A. Wozniak 2021-12-22 15:09:59 -07:00 committed by Gareth J. Greenaway
parent 56a75eb553
commit 0ebd45a34d
6 changed files with 11 additions and 394 deletions

View file

@ -14,7 +14,7 @@ for sending and receiving messages.
Pub Channel
===========
The pub channel, or publish channel, is how a master sends a job (payload) to a
The pub (or pubish) channel is how a master sends a job (payload) to a
minion. This is a basic pub/sub paradigm, which has specific targeting semantics.
All data which goes across the publish system should be encrypted such that only
members of the Salt cluster can decrypt the published payloads.

View file

@ -181,7 +181,7 @@ class DaemonizedPublishServer(PublishServer):
**kwargs
):
"""
If a deamon is needed to act as a broker impliment it here.
If a daemon is needed to act as a broker implement it here.
:param func publish_payload: A method used to publish the payload
:param func presence_callback: If the transport support presence

View file

@ -621,19 +621,10 @@ class MessageClient:
@salt.ext.tornado.gen.coroutine
def getstream(self, **kwargs):
if self.source_ip or self.source_port:
if salt.ext.tornado.version_info >= (4, 5):
### source_ip and source_port are supported only in Tornado >= 4.5
# See http://www.tornadoweb.org/en/stable/releases/v4.5.0.html
# Otherwise will just ignore these args
kwargs = {
"source_ip": self.source_ip,
"source_port": self.source_port,
}
else:
log.warning(
"If you need a certain source IP/port, consider upgrading"
" Tornado >= 4.5"
)
kwargs = {
"source_ip": self.source_ip,
"source_port": self.source_port,
}
stream = None
while stream is None and not self._closed:
try:
@ -815,380 +806,6 @@ class MessageClient:
raise salt.ext.tornado.gen.Return(recv)
class xMessageClient:
"""
Low-level message sending client
"""
def __init__(
self,
opts,
host,
port,
io_loop=None,
resolver=None,
connect_callback=None,
disconnect_callback=None,
source_ip=None,
source_port=None,
):
self.opts = opts
self.host = host
self.port = port
self.source_ip = source_ip
self.source_port = source_port
self.connect_callback = connect_callback
self.disconnect_callback = disconnect_callback
self.io_loop = io_loop or salt.ext.tornado.ioloop.IOLoop.current()
with salt.utils.asynchronous.current_ioloop(self.io_loop):
self._tcp_client = TCPClientKeepAlive(opts, resolver=resolver)
self._mid = 1
self._max_messages = int((1 << 31) - 2) # number of IDs before we wrap
# TODO: max queue size
self.send_queue = [] # queue of messages to be sent
self.send_future_map = {} # mapping of request_id -> Future
self.send_timeout_map = {} # request_id -> timeout_callback
self._read_until_future = None
self._on_recv = None
self._closing = False
self._connecting_future = self.connect()
self._stream_return_future = salt.ext.tornado.concurrent.Future()
self.io_loop.spawn_callback(self._stream_return)
self.backoff = opts.get("tcp_reconnect_backoff", 1)
def _stop_io_loop(self):
if self.io_loop is not None:
self.io_loop.stop()
# TODO: timeout inflight sessions
def close(self):
if self._closing:
return
self._closing = True
if hasattr(self, "_stream") and not self._stream.closed():
# If _stream_return() hasn't completed, it means the IO
# Loop is stopped (such as when using
# 'salt.utils.asynchronous.SyncWrapper'). Ensure that
# _stream_return() completes by restarting the IO Loop.
# This will prevent potential errors on shutdown.
try:
orig_loop = salt.ext.tornado.ioloop.IOLoop.current()
self.io_loop.make_current()
self._stream.close()
if self._read_until_future is not None:
# This will prevent this message from showing up:
# '[ERROR ] Future exception was never retrieved:
# StreamClosedError'
# This happens because the logic is always waiting to read
# the next message and the associated read future is marked
# 'StreamClosedError' when the stream is closed.
if self._read_until_future.done():
self._read_until_future.exception()
if (
self.io_loop
!= salt.ext.tornado.ioloop.IOLoop.current(instance=False)
or not self._stream_return_future.done()
):
self.io_loop.add_future(
self._stream_return_future,
lambda future: self._stop_io_loop(),
)
self.io_loop.start()
except Exception as e: # pylint: disable=broad-except
log.info("Exception caught in SaltMessageClient.close: %s", str(e))
finally:
orig_loop.make_current()
self._tcp_client.close()
self.io_loop = None
self._read_until_future = None
# Clear callback references to allow the object that they belong to
# to be deleted.
self.connect_callback = None
self.disconnect_callback = None
# pylint: disable=W1701
def __del__(self):
self.close()
# pylint: enable=W1701
def connect(self):
"""
Ask for this client to reconnect to the origin
"""
if hasattr(self, "_connecting_future") and not self._connecting_future.done():
future = self._connecting_future
else:
future = salt.ext.tornado.concurrent.Future()
self._connecting_future = future
self.io_loop.add_callback(self._connect)
# Add the callback only when a new future is created
if self.connect_callback is not None:
def handle_future(future):
response = future.result()
self.io_loop.add_callback(self.connect_callback, response)
future.add_done_callback(handle_future)
return future
@salt.ext.tornado.gen.coroutine
def _connect(self):
"""
Try to connect for the rest of time!
"""
while True:
if self._closing:
break
try:
kwargs = {}
if self.source_ip or self.source_port:
if salt.ext.tornado.version_info >= (4, 5):
### source_ip and source_port are supported only in Tornado >= 4.5
# See http://www.tornadoweb.org/en/stable/releases/v4.5.0.html
# Otherwise will just ignore these args
kwargs = {
"source_ip": self.source_ip,
"source_port": self.source_port,
}
else:
log.warning(
"If you need a certain source IP/port, consider upgrading"
" Tornado >= 4.5"
)
with salt.utils.asynchronous.current_ioloop(self.io_loop):
self._stream = yield self._tcp_client.connect(
self.host, self.port, ssl_options=self.opts.get("ssl"), **kwargs
)
self._connecting_future.set_result(True)
break
except Exception as exc: # pylint: disable=broad-except
log.warning(
"TCP Message Client encountered an exception while connecting to"
" %s:%s: %r, will reconnect in %d seconds",
self.host,
self.port,
exc,
self.backoff,
)
yield salt.ext.tornado.gen.sleep(self.backoff)
# self._connecting_future.set_exception(exc)
@salt.ext.tornado.gen.coroutine
def _stream_return(self):
try:
while not self._closing and (
not self._connecting_future.done()
or self._connecting_future.result() is not True
):
yield self._connecting_future
unpacker = salt.utils.msgpack.Unpacker()
while not self._closing:
try:
self._read_until_future = self._stream.read_bytes(
4096, partial=True
)
wire_bytes = yield self._read_until_future
unpacker.feed(wire_bytes)
for framed_msg in unpacker:
framed_msg = salt.transport.frame.decode_embedded_strs(
framed_msg
)
header = framed_msg["head"]
body = framed_msg["body"]
message_id = header.get("mid")
if message_id in self.send_future_map:
self.send_future_map.pop(message_id).set_result(body)
self.remove_message_timeout(message_id)
else:
if self._on_recv is not None:
self.io_loop.spawn_callback(self._on_recv, header, body)
else:
log.error(
"Got response for message_id %s that we are not"
" tracking",
message_id,
)
except salt.ext.tornado.iostream.StreamClosedError as e:
log.debug(
"tcp stream to %s:%s closed, unable to recv",
self.host,
self.port,
)
for future in self.send_future_map.values():
future.set_exception(e)
self.send_future_map = {}
if self._closing:
return
if self.disconnect_callback:
self.disconnect_callback()
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()
yield self._connecting_future
except TypeError:
# This is an invalid transport
if "detect_mode" in self.opts:
log.info(
"There was an error trying to use TCP transport; "
"attempting to fallback to another transport"
)
else:
raise SaltClientError
except Exception as e: # pylint: disable=broad-except
log.error("Exception parsing response", exc_info=True)
for future in self.send_future_map.values():
future.set_exception(e)
self.send_future_map = {}
if self._closing:
return
if self.disconnect_callback:
self.disconnect_callback()
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()
yield self._connecting_future
finally:
self._stream_return_future.set_result(True)
@salt.ext.tornado.gen.coroutine
def _stream_send(self):
while (
not self._connecting_future.done()
or self._connecting_future.result() is not True
):
yield self._connecting_future
while len(self.send_queue) > 0:
message_id, item = self.send_queue[0]
try:
yield self._stream.write(item)
del self.send_queue[0]
# if the connection is dead, lets fail this send, and make sure we
# attempt to reconnect
except salt.ext.tornado.iostream.StreamClosedError as e:
if message_id in self.send_future_map:
self.send_future_map.pop(message_id).set_exception(e)
self.remove_message_timeout(message_id)
del self.send_queue[0]
if self._closing:
return
if self.disconnect_callback:
self.disconnect_callback()
# if the last connect finished, then we need to make a new one
if self._connecting_future.done():
self._connecting_future = self.connect()
yield self._connecting_future
def _message_id(self):
wrap = False
while self._mid in self.send_future_map:
if self._mid >= self._max_messages:
if wrap:
# this shouldn't ever happen, but just in case
raise Exception("Unable to find available messageid")
self._mid = 1
wrap = True
else:
self._mid += 1
return self._mid
# TODO: return a message object which takes care of multiplexing?
def on_recv(self, callback):
"""
Register a callback for received messages (that we didn't initiate)
"""
if callback is None:
self._on_recv = callback
else:
def wrap_recv(header, body):
callback(body)
self._on_recv = wrap_recv
def remove_message_timeout(self, message_id):
if message_id not in self.send_timeout_map:
return
timeout = self.send_timeout_map.pop(message_id)
self.io_loop.remove_timeout(timeout)
def timeout_message(self, message_id, msg):
if message_id in self.send_timeout_map:
del self.send_timeout_map[message_id]
if message_id in self.send_future_map:
future = self.send_future_map.pop(message_id)
# In a race condition the message might have been sent by the time
# we're timing it out. Make sure the future is not None
if future is not None:
if future.attempts < future.tries:
future.attempts += 1
log.debug(
"SaltReqTimeoutError, retrying. (%s/%s)",
future.attempts,
future.tries,
)
self.send(
msg,
timeout=future.timeout,
tries=future.tries,
future=future,
)
else:
future.set_exception(SaltReqTimeoutError("Message timed out"))
def send(self, msg, timeout=None, callback=None, raw=False, future=None, tries=3):
"""
Send given message, and return a future
"""
message_id = self._message_id()
header = {"mid": message_id}
if future is None:
future = salt.ext.tornado.concurrent.Future()
future.tries = tries
future.attempts = 0
future.timeout = timeout
if callback is not None:
def handle_future(future):
response = future.result()
self.io_loop.add_callback(callback, response)
future.add_done_callback(handle_future)
# Add this future to the mapping
self.send_future_map[message_id] = future
if self.opts.get("detect_mode") is True:
timeout = 1
if timeout is not None:
send_timeout = self.io_loop.call_later(
timeout, self.timeout_message, message_id, msg
)
self.send_timeout_map[message_id] = send_timeout
# if we don't have a send queue, we need to spawn the callback to do the sending
if len(self.send_queue) == 0:
self.io_loop.spawn_callback(self._stream_send)
self.send_queue.append(
(message_id, salt.transport.frame.frame_msg(msg, header=header))
)
return future
class Subscriber:
"""
Client object for use with the TCP publisher server
@ -1299,7 +916,7 @@ class PubServer(salt.ext.tornado.tcpserver.TCPServer):
if topic_list:
for topic in topic_list:
sent = False
for client in list(self.clients):
for client in self.clients:
if topic == client.id_:
try:
# Write the packed str
@ -1307,7 +924,7 @@ class PubServer(salt.ext.tornado.tcpserver.TCPServer):
sent = True
# self.io_loop.add_future(f, lambda f: True)
except salt.ext.tornado.iostream.StreamClosedError:
self.clients.remove(client)
to_remove.append(client)
if not sent:
log.debug("Publish target %s not connected %r", topic, self.clients)
else:

View file

@ -402,6 +402,7 @@ class RequestServer(salt.transport.base.DaemonizedRequestServer):
"""
context = zmq.Context(1)
self._socket = context.socket(zmq.REP)
# Linger -1 means we'll never discard messages.
self._socket.setsockopt(zmq.LINGER, -1)
self._start_zmq_monitor()
@ -726,7 +727,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
):
"""
This method represents the Publish Daemon process. It is intended to be
run inn a thread or process as it creates and runs an it's own ioloop.
run in a thread or process as it creates and runs an it's own ioloop.
"""
ioloop = salt.ext.tornado.ioloop.IOLoop()
ioloop.make_current()

View file

@ -935,7 +935,7 @@ def gen_min(
"salt/fileserver",
"salt/fileserver/__init__.py",
"salt/channel",
"salt/tchannel/__init__.py",
"salt/channel/__init__.py",
"salt/channel/client.py",
"salt/transport", # XXX Are the transport imports still needed?
"salt/transport/__init__.py",