diff --git a/doc/topics/channels/index.rst b/doc/topics/channels/index.rst index 609d62e492c..414b18acfed 100644 --- a/doc/topics/channels/index.rst +++ b/doc/topics/channels/index.rst @@ -14,7 +14,7 @@ for sending and receiving messages. Pub Channel =========== -The pub channel, or publish channel, is how a master sends a job (payload) to a +The pub (or pubish) channel is how a master sends a job (payload) to a minion. This is a basic pub/sub paradigm, which has specific targeting semantics. All data which goes across the publish system should be encrypted such that only members of the Salt cluster can decrypt the published payloads. diff --git a/salt/transport/base.py b/salt/transport/base.py index 7063d46a67f..a063111954d 100644 --- a/salt/transport/base.py +++ b/salt/transport/base.py @@ -181,7 +181,7 @@ class DaemonizedPublishServer(PublishServer): **kwargs ): """ - If a deamon is needed to act as a broker impliment it here. + If a daemon is needed to act as a broker implement it here. :param func publish_payload: A method used to publish the payload :param func presence_callback: If the transport support presence diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py index 75566ccf978..1e6e337577a 100644 --- a/salt/transport/tcp.py +++ b/salt/transport/tcp.py @@ -621,19 +621,10 @@ class MessageClient: @salt.ext.tornado.gen.coroutine def getstream(self, **kwargs): if self.source_ip or self.source_port: - if salt.ext.tornado.version_info >= (4, 5): - ### source_ip and source_port are supported only in Tornado >= 4.5 - # See http://www.tornadoweb.org/en/stable/releases/v4.5.0.html - # Otherwise will just ignore these args - kwargs = { - "source_ip": self.source_ip, - "source_port": self.source_port, - } - else: - log.warning( - "If you need a certain source IP/port, consider upgrading" - " Tornado >= 4.5" - ) + kwargs = { + "source_ip": self.source_ip, + "source_port": self.source_port, + } stream = None while stream is None and not self._closed: try: @@ -815,380 +806,6 @@ class MessageClient: raise salt.ext.tornado.gen.Return(recv) -class xMessageClient: - """ - Low-level message sending client - """ - - def __init__( - self, - opts, - host, - port, - io_loop=None, - resolver=None, - connect_callback=None, - disconnect_callback=None, - source_ip=None, - source_port=None, - ): - self.opts = opts - self.host = host - self.port = port - self.source_ip = source_ip - self.source_port = source_port - self.connect_callback = connect_callback - self.disconnect_callback = disconnect_callback - - self.io_loop = io_loop or salt.ext.tornado.ioloop.IOLoop.current() - - with salt.utils.asynchronous.current_ioloop(self.io_loop): - self._tcp_client = TCPClientKeepAlive(opts, resolver=resolver) - - self._mid = 1 - self._max_messages = int((1 << 31) - 2) # number of IDs before we wrap - - # TODO: max queue size - self.send_queue = [] # queue of messages to be sent - self.send_future_map = {} # mapping of request_id -> Future - self.send_timeout_map = {} # request_id -> timeout_callback - - self._read_until_future = None - self._on_recv = None - self._closing = False - self._connecting_future = self.connect() - self._stream_return_future = salt.ext.tornado.concurrent.Future() - self.io_loop.spawn_callback(self._stream_return) - - self.backoff = opts.get("tcp_reconnect_backoff", 1) - - def _stop_io_loop(self): - if self.io_loop is not None: - self.io_loop.stop() - - # TODO: timeout inflight sessions - def close(self): - if self._closing: - return - self._closing = True - if hasattr(self, "_stream") and not self._stream.closed(): - # If _stream_return() hasn't completed, it means the IO - # Loop is stopped (such as when using - # 'salt.utils.asynchronous.SyncWrapper'). Ensure that - # _stream_return() completes by restarting the IO Loop. - # This will prevent potential errors on shutdown. - try: - orig_loop = salt.ext.tornado.ioloop.IOLoop.current() - self.io_loop.make_current() - self._stream.close() - if self._read_until_future is not None: - # This will prevent this message from showing up: - # '[ERROR ] Future exception was never retrieved: - # StreamClosedError' - # This happens because the logic is always waiting to read - # the next message and the associated read future is marked - # 'StreamClosedError' when the stream is closed. - if self._read_until_future.done(): - self._read_until_future.exception() - if ( - self.io_loop - != salt.ext.tornado.ioloop.IOLoop.current(instance=False) - or not self._stream_return_future.done() - ): - self.io_loop.add_future( - self._stream_return_future, - lambda future: self._stop_io_loop(), - ) - self.io_loop.start() - except Exception as e: # pylint: disable=broad-except - log.info("Exception caught in SaltMessageClient.close: %s", str(e)) - finally: - orig_loop.make_current() - self._tcp_client.close() - self.io_loop = None - self._read_until_future = None - # Clear callback references to allow the object that they belong to - # to be deleted. - self.connect_callback = None - self.disconnect_callback = None - - # pylint: disable=W1701 - def __del__(self): - self.close() - - # pylint: enable=W1701 - - def connect(self): - """ - Ask for this client to reconnect to the origin - """ - if hasattr(self, "_connecting_future") and not self._connecting_future.done(): - future = self._connecting_future - else: - future = salt.ext.tornado.concurrent.Future() - self._connecting_future = future - self.io_loop.add_callback(self._connect) - - # Add the callback only when a new future is created - if self.connect_callback is not None: - - def handle_future(future): - response = future.result() - self.io_loop.add_callback(self.connect_callback, response) - - future.add_done_callback(handle_future) - - return future - - @salt.ext.tornado.gen.coroutine - def _connect(self): - """ - Try to connect for the rest of time! - """ - while True: - if self._closing: - break - try: - kwargs = {} - if self.source_ip or self.source_port: - if salt.ext.tornado.version_info >= (4, 5): - ### source_ip and source_port are supported only in Tornado >= 4.5 - # See http://www.tornadoweb.org/en/stable/releases/v4.5.0.html - # Otherwise will just ignore these args - kwargs = { - "source_ip": self.source_ip, - "source_port": self.source_port, - } - else: - log.warning( - "If you need a certain source IP/port, consider upgrading" - " Tornado >= 4.5" - ) - with salt.utils.asynchronous.current_ioloop(self.io_loop): - self._stream = yield self._tcp_client.connect( - self.host, self.port, ssl_options=self.opts.get("ssl"), **kwargs - ) - self._connecting_future.set_result(True) - break - except Exception as exc: # pylint: disable=broad-except - log.warning( - "TCP Message Client encountered an exception while connecting to" - " %s:%s: %r, will reconnect in %d seconds", - self.host, - self.port, - exc, - self.backoff, - ) - yield salt.ext.tornado.gen.sleep(self.backoff) - # self._connecting_future.set_exception(exc) - - @salt.ext.tornado.gen.coroutine - def _stream_return(self): - try: - while not self._closing and ( - not self._connecting_future.done() - or self._connecting_future.result() is not True - ): - yield self._connecting_future - unpacker = salt.utils.msgpack.Unpacker() - while not self._closing: - try: - self._read_until_future = self._stream.read_bytes( - 4096, partial=True - ) - wire_bytes = yield self._read_until_future - unpacker.feed(wire_bytes) - for framed_msg in unpacker: - framed_msg = salt.transport.frame.decode_embedded_strs( - framed_msg - ) - header = framed_msg["head"] - body = framed_msg["body"] - message_id = header.get("mid") - - if message_id in self.send_future_map: - self.send_future_map.pop(message_id).set_result(body) - self.remove_message_timeout(message_id) - else: - if self._on_recv is not None: - self.io_loop.spawn_callback(self._on_recv, header, body) - else: - log.error( - "Got response for message_id %s that we are not" - " tracking", - message_id, - ) - except salt.ext.tornado.iostream.StreamClosedError as e: - log.debug( - "tcp stream to %s:%s closed, unable to recv", - self.host, - self.port, - ) - for future in self.send_future_map.values(): - future.set_exception(e) - self.send_future_map = {} - if self._closing: - return - if self.disconnect_callback: - self.disconnect_callback() - # if the last connect finished, then we need to make a new one - if self._connecting_future.done(): - self._connecting_future = self.connect() - yield self._connecting_future - except TypeError: - # This is an invalid transport - if "detect_mode" in self.opts: - log.info( - "There was an error trying to use TCP transport; " - "attempting to fallback to another transport" - ) - else: - raise SaltClientError - except Exception as e: # pylint: disable=broad-except - log.error("Exception parsing response", exc_info=True) - for future in self.send_future_map.values(): - future.set_exception(e) - self.send_future_map = {} - if self._closing: - return - if self.disconnect_callback: - self.disconnect_callback() - # if the last connect finished, then we need to make a new one - if self._connecting_future.done(): - self._connecting_future = self.connect() - yield self._connecting_future - finally: - self._stream_return_future.set_result(True) - - @salt.ext.tornado.gen.coroutine - def _stream_send(self): - while ( - not self._connecting_future.done() - or self._connecting_future.result() is not True - ): - yield self._connecting_future - while len(self.send_queue) > 0: - message_id, item = self.send_queue[0] - try: - yield self._stream.write(item) - del self.send_queue[0] - # if the connection is dead, lets fail this send, and make sure we - # attempt to reconnect - except salt.ext.tornado.iostream.StreamClosedError as e: - if message_id in self.send_future_map: - self.send_future_map.pop(message_id).set_exception(e) - self.remove_message_timeout(message_id) - del self.send_queue[0] - if self._closing: - return - if self.disconnect_callback: - self.disconnect_callback() - # if the last connect finished, then we need to make a new one - if self._connecting_future.done(): - self._connecting_future = self.connect() - yield self._connecting_future - - def _message_id(self): - wrap = False - while self._mid in self.send_future_map: - if self._mid >= self._max_messages: - if wrap: - # this shouldn't ever happen, but just in case - raise Exception("Unable to find available messageid") - self._mid = 1 - wrap = True - else: - self._mid += 1 - - return self._mid - - # TODO: return a message object which takes care of multiplexing? - def on_recv(self, callback): - """ - Register a callback for received messages (that we didn't initiate) - """ - if callback is None: - self._on_recv = callback - else: - - def wrap_recv(header, body): - callback(body) - - self._on_recv = wrap_recv - - def remove_message_timeout(self, message_id): - if message_id not in self.send_timeout_map: - return - timeout = self.send_timeout_map.pop(message_id) - self.io_loop.remove_timeout(timeout) - - def timeout_message(self, message_id, msg): - if message_id in self.send_timeout_map: - del self.send_timeout_map[message_id] - if message_id in self.send_future_map: - future = self.send_future_map.pop(message_id) - # In a race condition the message might have been sent by the time - # we're timing it out. Make sure the future is not None - if future is not None: - if future.attempts < future.tries: - future.attempts += 1 - - log.debug( - "SaltReqTimeoutError, retrying. (%s/%s)", - future.attempts, - future.tries, - ) - self.send( - msg, - timeout=future.timeout, - tries=future.tries, - future=future, - ) - - else: - future.set_exception(SaltReqTimeoutError("Message timed out")) - - def send(self, msg, timeout=None, callback=None, raw=False, future=None, tries=3): - """ - Send given message, and return a future - """ - message_id = self._message_id() - header = {"mid": message_id} - - if future is None: - future = salt.ext.tornado.concurrent.Future() - future.tries = tries - future.attempts = 0 - future.timeout = timeout - - if callback is not None: - - def handle_future(future): - response = future.result() - self.io_loop.add_callback(callback, response) - - future.add_done_callback(handle_future) - # Add this future to the mapping - self.send_future_map[message_id] = future - - if self.opts.get("detect_mode") is True: - timeout = 1 - - if timeout is not None: - send_timeout = self.io_loop.call_later( - timeout, self.timeout_message, message_id, msg - ) - self.send_timeout_map[message_id] = send_timeout - - # if we don't have a send queue, we need to spawn the callback to do the sending - if len(self.send_queue) == 0: - self.io_loop.spawn_callback(self._stream_send) - self.send_queue.append( - (message_id, salt.transport.frame.frame_msg(msg, header=header)) - ) - return future - - class Subscriber: """ Client object for use with the TCP publisher server @@ -1299,7 +916,7 @@ class PubServer(salt.ext.tornado.tcpserver.TCPServer): if topic_list: for topic in topic_list: sent = False - for client in list(self.clients): + for client in self.clients: if topic == client.id_: try: # Write the packed str @@ -1307,7 +924,7 @@ class PubServer(salt.ext.tornado.tcpserver.TCPServer): sent = True # self.io_loop.add_future(f, lambda f: True) except salt.ext.tornado.iostream.StreamClosedError: - self.clients.remove(client) + to_remove.append(client) if not sent: log.debug("Publish target %s not connected %r", topic, self.clients) else: diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py index 2c591327d25..963dfd90b5e 100644 --- a/salt/transport/zeromq.py +++ b/salt/transport/zeromq.py @@ -402,6 +402,7 @@ class RequestServer(salt.transport.base.DaemonizedRequestServer): """ context = zmq.Context(1) self._socket = context.socket(zmq.REP) + # Linger -1 means we'll never discard messages. self._socket.setsockopt(zmq.LINGER, -1) self._start_zmq_monitor() @@ -726,7 +727,7 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): ): """ This method represents the Publish Daemon process. It is intended to be - run inn a thread or process as it creates and runs an it's own ioloop. + run in a thread or process as it creates and runs an it's own ioloop. """ ioloop = salt.ext.tornado.ioloop.IOLoop() ioloop.make_current() diff --git a/salt/utils/thin.py b/salt/utils/thin.py index 7515251c2c7..5cbd29b5111 100644 --- a/salt/utils/thin.py +++ b/salt/utils/thin.py @@ -935,7 +935,7 @@ def gen_min( "salt/fileserver", "salt/fileserver/__init__.py", "salt/channel", - "salt/tchannel/__init__.py", + "salt/channel/__init__.py", "salt/channel/client.py", "salt/transport", # XXX Are the transport imports still needed? "salt/transport/__init__.py", diff --git a/tests/integration/files/file/base/Issues_55775.txt.bak b/tests/integration/files/file/base/Issues_55775.txt.bak deleted file mode 100644 index 257cc5642cb..00000000000 --- a/tests/integration/files/file/base/Issues_55775.txt.bak +++ /dev/null @@ -1 +0,0 @@ -foo