From 027a29fd0a3b7a59722e4b7f9637e5e15dab2b6b Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Wed, 28 Jun 2023 17:12:28 -0700 Subject: [PATCH] Clean up pre-commit --- salt/channel/client.py | 5 +- salt/transport/base.py | 4 +- salt/transport/ipc.py | 2 + salt/transport/tcp.py | 29 +++----- salt/transport/zeromq.py | 66 ++++++++--------- tests/pytests/unit/test_master.py | 70 ++++++++++--------- .../unit/transport/test_publish_client.py | 9 ++- 7 files changed, 90 insertions(+), 95 deletions(-) diff --git a/salt/channel/client.py b/salt/channel/client.py index d0040c80414..8d029470686 100644 --- a/salt/channel/client.py +++ b/salt/channel/client.py @@ -22,6 +22,7 @@ import salt.utils.files import salt.utils.minions import salt.utils.stringutils import salt.utils.verify +import salt.utils.versions from salt.utils.asynchronous import SyncWrapper try: @@ -643,7 +644,7 @@ class AsyncPushChannel: """ # FIXME for now, just UXD # Obviously, this makes the factory approach pointless, but we'll extend later - warn_until( + salt.utils.versions.warn_until( 3008, "AsyncPushChannel is deprecated. Use zeromq or tcp transport instead.", ) @@ -663,7 +664,7 @@ class AsyncPullChannel: """ If we have additional IPC transports other than UXD and TCP, add them here """ - warn_until( + salt.utils.versions.warn_until( 3008, "AsyncPullChannel is deprecated. Use zeromq or tcp transport instead.", ) diff --git a/salt/transport/base.py b/salt/transport/base.py index f9633775ab0..3e695d82d45 100644 --- a/salt/transport/base.py +++ b/salt/transport/base.py @@ -247,9 +247,7 @@ class PublishClient: """ raise NotImplementedError - async def connect( - self, publish_port, connect_callback=None, disconnect_callback=None - ): + async def connect(self, port=None, connect_callback=None, disconnect_callback=None): """ Create a network connection to the the PublishServer or broker. """ diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index c1b48e044d1..d698f1380ab 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -24,11 +24,13 @@ from salt.utils.versions import warn_until log = logging.getLogger(__name__) + warn_until( 3008, "This module is deprecated. Use zeromq or tcp transport instead.", ) + # 'tornado.concurrent.Future' doesn't support # remove_done_callback() which we would have called # in the timeout case. Due to this, we have this diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py index 240743d8d8a..f1e3cf531e1 100644 --- a/salt/transport/tcp.py +++ b/salt/transport/tcp.py @@ -14,6 +14,7 @@ import queue import select import socket import threading +import time import urllib import tornado @@ -324,7 +325,6 @@ class TCPPubClient(salt.transport.base.PublishClient): port=None, connect_callback=None, disconnect_callback=None, - background=False, ): if port is not None: self.port = port @@ -332,10 +332,7 @@ class TCPPubClient(salt.transport.base.PublishClient): self.connect_callback = None if disconnect_callback: self.disconnect_callback = None - if background: - self.io_loop.spawn_callback(self._connect) - else: - await self._connect() + await self._connect() def _decode_messages(self, messages): if not isinstance(messages, dict): @@ -407,9 +404,6 @@ class TCPPubClient(salt.transport.base.PublishClient): self.disconnect_callback() self.unpacker = salt.utils.msgpack.Unpacker() continue - except Exception: - log.error("Other exception", exc_info=True) - log.error("on recv got msg %r", msg) callback(msg) def on_recv(self, callback): @@ -694,7 +688,7 @@ class MessageClient: source_ip=None, source_port=None, ): - warn_until( + salt.utils.versions.warn_until( 3008, "MessageClient has been deprecated and will be removed.", ) @@ -1844,7 +1838,7 @@ class TCPReqClient(salt.transport.base.RequestClient): if future is not None: future.set_exception(SaltReqTimeoutError("Message timed out")) - async def send(self, msg, timeout=None, callback=None, raw=False, reply=True): + async def send(self, load, timeout=60): # , callback=None, raw=False, reply=True): await self.connect() if self._closing: raise ClosingError() @@ -1854,13 +1848,12 @@ class TCPReqClient(salt.transport.base.RequestClient): header = {"mid": message_id} future = tornado.concurrent.Future() - if callback is not None: + # if callback is not None: + # def handle_future(future): + # response = future.result() + # self.io_loop.add_callback(callback, response) + # future.add_done_callback(handle_future) - def handle_future(future): - response = future.result() - self.io_loop.add_callback(callback, response) - - future.add_done_callback(handle_future) # Add this future to the mapping self.send_future_map[message_id] = future @@ -1868,9 +1861,9 @@ class TCPReqClient(salt.transport.base.RequestClient): timeout = 1 if timeout is not None: - self.io_loop.call_later(timeout, self.timeout_message, message_id, msg) + self.io_loop.call_later(timeout, self.timeout_message, message_id, load) - item = salt.transport.frame.frame_msg(msg, header=header) + item = salt.transport.frame.frame_msg(load, header=header) async def _do_send(): await self.connect() diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py index 90774206b2d..ce64265cb8b 100644 --- a/salt/transport/zeromq.py +++ b/salt/transport/zeromq.py @@ -550,7 +550,7 @@ class RequestServer(salt.transport.base.DaemonizedRequestServer): await self._socket.send(self.encode_payload(reply)) except TimeoutError: continue - except Exception: + except Exception as exc: # pylint: disable=broad-except log.error("Exception in request handler", exc_info=True) break @@ -930,7 +930,6 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): while True: try: package = await self.daemon_pull_sock.recv() - # payload = salt.payload.loads(package) await publish_payload(package) except Exception as exc: # pylint: disable=broad-except log.error( @@ -938,46 +937,37 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer): ) async def publish_payload(self, payload, topic_list=None): - try: - log.trace("Publish payload %r", payload) - # payload = salt.payload.dumps(payload) - if self.opts["zmq_filtering"]: - if topic_list: - for topic in topic_list: - log.trace( - "Sending filtered data over publisher %s", self.pub_uri - ) - # zmq filters are substring match, hash the topic - # to avoid collisions - htopic = salt.utils.stringutils.to_bytes( - hashlib.sha1( - salt.utils.stringutils.to_bytes(topic) - ).hexdigest() - ) - await self.dpub_sock.send(htopic, flags=zmq.SNDMORE) - await self.dpub_sock.send(payload) - log.trace("Filtered data has been sent") - # Syndic broadcast - if self.opts.get("order_masters"): - log.trace("Sending filtered data to syndic") - await self.dpub_sock.send(b"syndic", flags=zmq.SNDMORE) - await self.dpub_sock.send(payload) - log.trace("Filtered data has been sent to syndic") - # otherwise its a broadcast - else: - # TODO: constants file for "broadcast" - log.trace( - "Sending broadcasted data over publisher %s", self.pub_uri + log.trace("Publish payload %r", payload) + # payload = salt.payload.dumps(payload) + if self.opts["zmq_filtering"]: + if topic_list: + for topic in topic_list: + log.trace("Sending filtered data over publisher %s", self.pub_uri) + # zmq filters are substring match, hash the topic + # to avoid collisions + htopic = salt.utils.stringutils.to_bytes( + hashlib.sha1(salt.utils.stringutils.to_bytes(topic)).hexdigest() ) - await self.dpub_sock.send(b"broadcast", flags=zmq.SNDMORE) + await self.dpub_sock.send(htopic, flags=zmq.SNDMORE) await self.dpub_sock.send(payload) - log.trace("Broadcasted data has been sent") + log.trace("Filtered data has been sent") + # Syndic broadcast + if self.opts.get("order_masters"): + log.trace("Sending filtered data to syndic") + await self.dpub_sock.send(b"syndic", flags=zmq.SNDMORE) + await self.dpub_sock.send(payload) + log.trace("Filtered data has been sent to syndic") + # otherwise its a broadcast else: - log.trace("Sending ZMQ-unfiltered data over publisher %s", self.pub_uri) + # TODO: constants file for "broadcast" + log.trace("Sending broadcasted data over publisher %s", self.pub_uri) + await self.dpub_sock.send(b"broadcast", flags=zmq.SNDMORE) await self.dpub_sock.send(payload) - log.trace("Unfiltered data has been sent") - except Exception as exc: # pylint: disable=broad-except - log.error("pub payload %s", exc, exc_info=True) + log.trace("Broadcasted data has been sent") + else: + log.trace("Sending ZMQ-unfiltered data over publisher %s", self.pub_uri) + await self.dpub_sock.send(payload) + log.trace("Unfiltered data has been sent") def pre_fork(self, process_manager): """ diff --git a/tests/pytests/unit/test_master.py b/tests/pytests/unit/test_master.py index 451d92b8c84..7436474b16c 100644 --- a/tests/pytests/unit/test_master.py +++ b/tests/pytests/unit/test_master.py @@ -25,7 +25,6 @@ def maintenence_opts(master_opts): yield opts - @pytest.fixture def maintenence(maintenence_opts): """ @@ -36,7 +35,7 @@ def maintenence(maintenence_opts): yield maintenence finally: pass - + @pytest.fixture def clear_funcs(master_opts): @@ -204,6 +203,7 @@ def test_when_syndic_return_processes_load_then_correct_values_should_be_returne encrypted_requests._syndic_return(payload) fake_return.assert_called_with(expected_return) + def test_aes_funcs_white(): """ Validate methods exposed on AESFuncs exist and are callable @@ -217,6 +217,7 @@ def test_aes_funcs_white(): finally: aes_funcs.destroy() + def test_transport_methods(): class Foo(salt.master.TransportMethods): expose_methods = ["bar"] @@ -282,6 +283,7 @@ def test_aes_funcs_black(): finally: aes_funcs.destroy() + def test_clear_funcs_white(): """ Validate methods exposed on ClearFuncs exist and are callable @@ -295,6 +297,7 @@ def test_clear_funcs_white(): finally: clear_funcs.destroy() + def test_clear_funcs_black(): """ Validate methods on ClearFuncs that should not be called remotely @@ -344,6 +347,7 @@ def test_clear_funcs_black(): finally: clear_funcs.destroy() + def test_clear_funcs_get_method(clear_funcs): assert getattr(clear_funcs, "_send_pub", None) is not None assert clear_funcs.get_method("_send_pub") is None @@ -363,6 +367,7 @@ def test_runner_token_not_authenticated(clear_funcs): ret = clear_funcs.runner({"token": "asdfasdfasdfasdf"}) assert mock_ret == ret + @pytest.mark.slow_test def test_runner_token_authorization_error(clear_funcs): """ @@ -407,9 +412,7 @@ def test_runner_token_salt_invocation_error(clear_funcs): with patch( "salt.auth.LoadAuth.authenticate_token", MagicMock(return_value=mock_token) - ), patch( - "salt.auth.LoadAuth.get_auth_list", MagicMock(return_value=["testing"]) - ): + ), patch("salt.auth.LoadAuth.get_auth_list", MagicMock(return_value=["testing"])): ret = clear_funcs.runner(clear_load) assert mock_ret == ret @@ -431,6 +434,7 @@ def test_runner_eauth_not_authenticated(clear_funcs): ret = clear_funcs.runner({"eauth": "foo"}) assert mock_ret == ret + @pytest.mark.slow_test def test_runner_eauth_authorization_error(clear_funcs): """ @@ -469,13 +473,12 @@ def test_runner_eauth_salt_invocation_error(clear_funcs): } with patch( "salt.auth.LoadAuth.authenticate_eauth", MagicMock(return_value=True) - ), patch( - "salt.auth.LoadAuth.get_auth_list", MagicMock(return_value=["testing"]) - ): + ), patch("salt.auth.LoadAuth.get_auth_list", MagicMock(return_value=["testing"])): ret = clear_funcs.runner(clear_load) assert mock_ret == ret + @pytest.mark.slow_test def test_runner_user_not_authenticated(clear_funcs): """ @@ -490,8 +493,10 @@ def test_runner_user_not_authenticated(clear_funcs): ret = clear_funcs.runner({}) assert mock_ret == ret + # wheel tests + @pytest.mark.slow_test def test_wheel_token_not_authenticated(clear_funcs): """ @@ -504,7 +509,8 @@ def test_wheel_token_not_authenticated(clear_funcs): } } ret = clear_funcs.wheel({"token": "asdfasdfasdfasdf"}) - assert mock_ret == ret + assert mock_ret == ret + @pytest.mark.slow_test def test_wheel_token_authorization_error(clear_funcs): @@ -529,7 +535,8 @@ def test_wheel_token_authorization_error(clear_funcs): ), patch("salt.auth.LoadAuth.get_auth_list", MagicMock(return_value=[])): ret = clear_funcs.wheel(clear_load) - assert mock_ret == ret + assert mock_ret == ret + @pytest.mark.slow_test def test_wheel_token_salt_invocation_error(clear_funcs): @@ -549,12 +556,11 @@ def test_wheel_token_salt_invocation_error(clear_funcs): with patch( "salt.auth.LoadAuth.authenticate_token", MagicMock(return_value=mock_token) - ), patch( - "salt.auth.LoadAuth.get_auth_list", MagicMock(return_value=["testing"]) - ): + ), patch("salt.auth.LoadAuth.get_auth_list", MagicMock(return_value=["testing"])): ret = clear_funcs.wheel(clear_load) - assert mock_ret == ret + assert mock_ret == ret + @pytest.mark.slow_test def test_wheel_eauth_not_authenticated(clear_funcs): @@ -572,6 +578,7 @@ def test_wheel_eauth_not_authenticated(clear_funcs): ret = clear_funcs.wheel({"eauth": "foo"}) assert mock_ret == ret + @pytest.mark.slow_test def test_wheel_eauth_authorization_error(clear_funcs): """ @@ -594,6 +601,7 @@ def test_wheel_eauth_authorization_error(clear_funcs): assert mock_ret == ret + @pytest.mark.slow_test def test_wheel_eauth_salt_invocation_error(clear_funcs): """ @@ -609,13 +617,12 @@ def test_wheel_eauth_salt_invocation_error(clear_funcs): } with patch( "salt.auth.LoadAuth.authenticate_eauth", MagicMock(return_value=True) - ), patch( - "salt.auth.LoadAuth.get_auth_list", MagicMock(return_value=["testing"]) - ): + ), patch("salt.auth.LoadAuth.get_auth_list", MagicMock(return_value=["testing"])): ret = clear_funcs.wheel(clear_load) assert mock_ret == ret + @pytest.mark.slow_test def test_wheel_user_not_authenticated(clear_funcs): """ @@ -630,8 +637,10 @@ def test_wheel_user_not_authenticated(clear_funcs): ret = clear_funcs.wheel({}) assert mock_ret == ret + # publish tests + @pytest.mark.slow_test async def test_publish_user_is_blacklisted(clear_funcs): """ @@ -648,6 +657,7 @@ async def test_publish_user_is_blacklisted(clear_funcs): ): assert mock_ret == await clear_funcs.publish({"user": "foo", "fun": "test.arg"}) + @pytest.mark.slow_test async def test_publish_cmd_blacklisted(clear_funcs): """ @@ -661,11 +671,10 @@ async def test_publish_cmd_blacklisted(clear_funcs): } with patch( "salt.acl.PublisherACL.user_is_blacklisted", MagicMock(return_value=False) - ), patch( - "salt.acl.PublisherACL.cmd_is_blacklisted", MagicMock(return_value=True) - ): + ), patch("salt.acl.PublisherACL.cmd_is_blacklisted", MagicMock(return_value=True)): assert mock_ret == await clear_funcs.publish({"user": "foo", "fun": "test.arg"}) + @pytest.mark.slow_test async def test_publish_token_not_authenticated(clear_funcs): """ @@ -685,11 +694,10 @@ async def test_publish_token_not_authenticated(clear_funcs): } with patch( "salt.acl.PublisherACL.user_is_blacklisted", MagicMock(return_value=False) - ), patch( - "salt.acl.PublisherACL.cmd_is_blacklisted", MagicMock(return_value=False) - ): + ), patch("salt.acl.PublisherACL.cmd_is_blacklisted", MagicMock(return_value=False)): assert mock_ret == await clear_funcs.publish(load) + @pytest.mark.slow_test async def test_publish_token_authorization_error(clear_funcs): """ @@ -723,6 +731,7 @@ async def test_publish_token_authorization_error(clear_funcs): ): assert mock_ret == await clear_funcs.publish(load) + @pytest.mark.slow_test async def test_publish_eauth_not_authenticated(clear_funcs): """ @@ -742,11 +751,10 @@ async def test_publish_eauth_not_authenticated(clear_funcs): } with patch( "salt.acl.PublisherACL.user_is_blacklisted", MagicMock(return_value=False) - ), patch( - "salt.acl.PublisherACL.cmd_is_blacklisted", MagicMock(return_value=False) - ): + ), patch("salt.acl.PublisherACL.cmd_is_blacklisted", MagicMock(return_value=False)): assert mock_ret == await clear_funcs.publish(load) + @pytest.mark.slow_test async def test_publish_eauth_authorization_error(clear_funcs): """ @@ -777,6 +785,7 @@ async def test_publish_eauth_authorization_error(clear_funcs): ): assert mock_ret == await clear_funcs.publish(load) + @pytest.mark.slow_test async def test_publish_user_not_authenticated(clear_funcs): """ @@ -791,11 +800,10 @@ async def test_publish_user_not_authenticated(clear_funcs): } with patch( "salt.acl.PublisherACL.user_is_blacklisted", MagicMock(return_value=False) - ), patch( - "salt.acl.PublisherACL.cmd_is_blacklisted", MagicMock(return_value=False) - ): + ), patch("salt.acl.PublisherACL.cmd_is_blacklisted", MagicMock(return_value=False)): assert mock_ret == await clear_funcs.publish(load) + @pytest.mark.slow_test async def test_publish_user_authenticated_missing_auth_list(clear_funcs): """ @@ -906,9 +914,7 @@ def test_run_func(maintenence): with patch("salt.master.time", mocked_time), patch( "salt.utils.process", autospec=True - ), patch( - "salt.master.Maintenance._post_fork_init", mocked__post_fork_init - ), patch( + ), patch("salt.master.Maintenance._post_fork_init", mocked__post_fork_init), patch( "salt.daemons.masterapi.clean_old_jobs", mocked_clean_old_jobs ), patch( "salt.daemons.masterapi.clean_expired_tokens", mocked_clean_expired_tokens diff --git a/tests/pytests/unit/transport/test_publish_client.py b/tests/pytests/unit/transport/test_publish_client.py index a96db798514..f07e1e129d1 100644 --- a/tests/pytests/unit/transport/test_publish_client.py +++ b/tests/pytests/unit/transport/test_publish_client.py @@ -171,7 +171,10 @@ async def test_publish_client_connect_server_down(transport, io_loop): elif transport == "tcp": client = salt.transport.tcp.TCPPubClient(opts, io_loop, host=host, port=port) try: - await client.connect(background=True) + # XXX: This is an implimentation detail of the tcp transport. + # XXX: This is an implimentation detail of the tcp transport. + # await client.connect(port) + io_loop.spawn_callback(client.connect) except TimeoutError: pass except Exception: @@ -227,7 +230,9 @@ async def test_publish_client_connect_server_comes_up(transport, io_loop): import tornado client = salt.transport.tcp.TCPPubClient(opts, io_loop, host=host, port=port) - await client.connect(port, background=True) + # XXX: This is an implimentation detail of the tcp transport. + # await client.connect(port) + io_loop.spawn_callback(client.connect) assert client._stream is None await asyncio.sleep(2)