Clean up pre-commit

This commit is contained in:
Daniel A. Wozniak 2023-06-28 17:12:28 -07:00 committed by Gareth J. Greenaway
parent 7d38fe0b25
commit 027a29fd0a
7 changed files with 90 additions and 95 deletions

View file

@ -22,6 +22,7 @@ import salt.utils.files
import salt.utils.minions
import salt.utils.stringutils
import salt.utils.verify
import salt.utils.versions
from salt.utils.asynchronous import SyncWrapper
try:
@ -643,7 +644,7 @@ class AsyncPushChannel:
"""
# FIXME for now, just UXD
# Obviously, this makes the factory approach pointless, but we'll extend later
warn_until(
salt.utils.versions.warn_until(
3008,
"AsyncPushChannel is deprecated. Use zeromq or tcp transport instead.",
)
@ -663,7 +664,7 @@ class AsyncPullChannel:
"""
If we have additional IPC transports other than UXD and TCP, add them here
"""
warn_until(
salt.utils.versions.warn_until(
3008,
"AsyncPullChannel is deprecated. Use zeromq or tcp transport instead.",
)

View file

@ -247,9 +247,7 @@ class PublishClient:
"""
raise NotImplementedError
async def connect(
self, publish_port, connect_callback=None, disconnect_callback=None
):
async def connect(self, port=None, connect_callback=None, disconnect_callback=None):
"""
Create a network connection to the the PublishServer or broker.
"""

View file

@ -24,11 +24,13 @@ from salt.utils.versions import warn_until
log = logging.getLogger(__name__)
warn_until(
3008,
"This module is deprecated. Use zeromq or tcp transport instead.",
)
# 'tornado.concurrent.Future' doesn't support
# remove_done_callback() which we would have called
# in the timeout case. Due to this, we have this

View file

@ -14,6 +14,7 @@ import queue
import select
import socket
import threading
import time
import urllib
import tornado
@ -324,7 +325,6 @@ class TCPPubClient(salt.transport.base.PublishClient):
port=None,
connect_callback=None,
disconnect_callback=None,
background=False,
):
if port is not None:
self.port = port
@ -332,10 +332,7 @@ class TCPPubClient(salt.transport.base.PublishClient):
self.connect_callback = None
if disconnect_callback:
self.disconnect_callback = None
if background:
self.io_loop.spawn_callback(self._connect)
else:
await self._connect()
await self._connect()
def _decode_messages(self, messages):
if not isinstance(messages, dict):
@ -407,9 +404,6 @@ class TCPPubClient(salt.transport.base.PublishClient):
self.disconnect_callback()
self.unpacker = salt.utils.msgpack.Unpacker()
continue
except Exception:
log.error("Other exception", exc_info=True)
log.error("on recv got msg %r", msg)
callback(msg)
def on_recv(self, callback):
@ -694,7 +688,7 @@ class MessageClient:
source_ip=None,
source_port=None,
):
warn_until(
salt.utils.versions.warn_until(
3008,
"MessageClient has been deprecated and will be removed.",
)
@ -1844,7 +1838,7 @@ class TCPReqClient(salt.transport.base.RequestClient):
if future is not None:
future.set_exception(SaltReqTimeoutError("Message timed out"))
async def send(self, msg, timeout=None, callback=None, raw=False, reply=True):
async def send(self, load, timeout=60): # , callback=None, raw=False, reply=True):
await self.connect()
if self._closing:
raise ClosingError()
@ -1854,13 +1848,12 @@ class TCPReqClient(salt.transport.base.RequestClient):
header = {"mid": message_id}
future = tornado.concurrent.Future()
if callback is not None:
# if callback is not None:
# def handle_future(future):
# response = future.result()
# self.io_loop.add_callback(callback, response)
# future.add_done_callback(handle_future)
def handle_future(future):
response = future.result()
self.io_loop.add_callback(callback, response)
future.add_done_callback(handle_future)
# Add this future to the mapping
self.send_future_map[message_id] = future
@ -1868,9 +1861,9 @@ class TCPReqClient(salt.transport.base.RequestClient):
timeout = 1
if timeout is not None:
self.io_loop.call_later(timeout, self.timeout_message, message_id, msg)
self.io_loop.call_later(timeout, self.timeout_message, message_id, load)
item = salt.transport.frame.frame_msg(msg, header=header)
item = salt.transport.frame.frame_msg(load, header=header)
async def _do_send():
await self.connect()

View file

@ -550,7 +550,7 @@ class RequestServer(salt.transport.base.DaemonizedRequestServer):
await self._socket.send(self.encode_payload(reply))
except TimeoutError:
continue
except Exception:
except Exception as exc: # pylint: disable=broad-except
log.error("Exception in request handler", exc_info=True)
break
@ -930,7 +930,6 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
while True:
try:
package = await self.daemon_pull_sock.recv()
# payload = salt.payload.loads(package)
await publish_payload(package)
except Exception as exc: # pylint: disable=broad-except
log.error(
@ -938,46 +937,37 @@ class PublishServer(salt.transport.base.DaemonizedPublishServer):
)
async def publish_payload(self, payload, topic_list=None):
try:
log.trace("Publish payload %r", payload)
# payload = salt.payload.dumps(payload)
if self.opts["zmq_filtering"]:
if topic_list:
for topic in topic_list:
log.trace(
"Sending filtered data over publisher %s", self.pub_uri
)
# zmq filters are substring match, hash the topic
# to avoid collisions
htopic = salt.utils.stringutils.to_bytes(
hashlib.sha1(
salt.utils.stringutils.to_bytes(topic)
).hexdigest()
)
await self.dpub_sock.send(htopic, flags=zmq.SNDMORE)
await self.dpub_sock.send(payload)
log.trace("Filtered data has been sent")
# Syndic broadcast
if self.opts.get("order_masters"):
log.trace("Sending filtered data to syndic")
await self.dpub_sock.send(b"syndic", flags=zmq.SNDMORE)
await self.dpub_sock.send(payload)
log.trace("Filtered data has been sent to syndic")
# otherwise its a broadcast
else:
# TODO: constants file for "broadcast"
log.trace(
"Sending broadcasted data over publisher %s", self.pub_uri
log.trace("Publish payload %r", payload)
# payload = salt.payload.dumps(payload)
if self.opts["zmq_filtering"]:
if topic_list:
for topic in topic_list:
log.trace("Sending filtered data over publisher %s", self.pub_uri)
# zmq filters are substring match, hash the topic
# to avoid collisions
htopic = salt.utils.stringutils.to_bytes(
hashlib.sha1(salt.utils.stringutils.to_bytes(topic)).hexdigest()
)
await self.dpub_sock.send(b"broadcast", flags=zmq.SNDMORE)
await self.dpub_sock.send(htopic, flags=zmq.SNDMORE)
await self.dpub_sock.send(payload)
log.trace("Broadcasted data has been sent")
log.trace("Filtered data has been sent")
# Syndic broadcast
if self.opts.get("order_masters"):
log.trace("Sending filtered data to syndic")
await self.dpub_sock.send(b"syndic", flags=zmq.SNDMORE)
await self.dpub_sock.send(payload)
log.trace("Filtered data has been sent to syndic")
# otherwise its a broadcast
else:
log.trace("Sending ZMQ-unfiltered data over publisher %s", self.pub_uri)
# TODO: constants file for "broadcast"
log.trace("Sending broadcasted data over publisher %s", self.pub_uri)
await self.dpub_sock.send(b"broadcast", flags=zmq.SNDMORE)
await self.dpub_sock.send(payload)
log.trace("Unfiltered data has been sent")
except Exception as exc: # pylint: disable=broad-except
log.error("pub payload %s", exc, exc_info=True)
log.trace("Broadcasted data has been sent")
else:
log.trace("Sending ZMQ-unfiltered data over publisher %s", self.pub_uri)
await self.dpub_sock.send(payload)
log.trace("Unfiltered data has been sent")
def pre_fork(self, process_manager):
"""

View file

@ -25,7 +25,6 @@ def maintenence_opts(master_opts):
yield opts
@pytest.fixture
def maintenence(maintenence_opts):
"""
@ -36,7 +35,7 @@ def maintenence(maintenence_opts):
yield maintenence
finally:
pass
@pytest.fixture
def clear_funcs(master_opts):
@ -204,6 +203,7 @@ def test_when_syndic_return_processes_load_then_correct_values_should_be_returne
encrypted_requests._syndic_return(payload)
fake_return.assert_called_with(expected_return)
def test_aes_funcs_white():
"""
Validate methods exposed on AESFuncs exist and are callable
@ -217,6 +217,7 @@ def test_aes_funcs_white():
finally:
aes_funcs.destroy()
def test_transport_methods():
class Foo(salt.master.TransportMethods):
expose_methods = ["bar"]
@ -282,6 +283,7 @@ def test_aes_funcs_black():
finally:
aes_funcs.destroy()
def test_clear_funcs_white():
"""
Validate methods exposed on ClearFuncs exist and are callable
@ -295,6 +297,7 @@ def test_clear_funcs_white():
finally:
clear_funcs.destroy()
def test_clear_funcs_black():
"""
Validate methods on ClearFuncs that should not be called remotely
@ -344,6 +347,7 @@ def test_clear_funcs_black():
finally:
clear_funcs.destroy()
def test_clear_funcs_get_method(clear_funcs):
assert getattr(clear_funcs, "_send_pub", None) is not None
assert clear_funcs.get_method("_send_pub") is None
@ -363,6 +367,7 @@ def test_runner_token_not_authenticated(clear_funcs):
ret = clear_funcs.runner({"token": "asdfasdfasdfasdf"})
assert mock_ret == ret
@pytest.mark.slow_test
def test_runner_token_authorization_error(clear_funcs):
"""
@ -407,9 +412,7 @@ def test_runner_token_salt_invocation_error(clear_funcs):
with patch(
"salt.auth.LoadAuth.authenticate_token", MagicMock(return_value=mock_token)
), patch(
"salt.auth.LoadAuth.get_auth_list", MagicMock(return_value=["testing"])
):
), patch("salt.auth.LoadAuth.get_auth_list", MagicMock(return_value=["testing"])):
ret = clear_funcs.runner(clear_load)
assert mock_ret == ret
@ -431,6 +434,7 @@ def test_runner_eauth_not_authenticated(clear_funcs):
ret = clear_funcs.runner({"eauth": "foo"})
assert mock_ret == ret
@pytest.mark.slow_test
def test_runner_eauth_authorization_error(clear_funcs):
"""
@ -469,13 +473,12 @@ def test_runner_eauth_salt_invocation_error(clear_funcs):
}
with patch(
"salt.auth.LoadAuth.authenticate_eauth", MagicMock(return_value=True)
), patch(
"salt.auth.LoadAuth.get_auth_list", MagicMock(return_value=["testing"])
):
), patch("salt.auth.LoadAuth.get_auth_list", MagicMock(return_value=["testing"])):
ret = clear_funcs.runner(clear_load)
assert mock_ret == ret
@pytest.mark.slow_test
def test_runner_user_not_authenticated(clear_funcs):
"""
@ -490,8 +493,10 @@ def test_runner_user_not_authenticated(clear_funcs):
ret = clear_funcs.runner({})
assert mock_ret == ret
# wheel tests
@pytest.mark.slow_test
def test_wheel_token_not_authenticated(clear_funcs):
"""
@ -504,7 +509,8 @@ def test_wheel_token_not_authenticated(clear_funcs):
}
}
ret = clear_funcs.wheel({"token": "asdfasdfasdfasdf"})
assert mock_ret == ret
assert mock_ret == ret
@pytest.mark.slow_test
def test_wheel_token_authorization_error(clear_funcs):
@ -529,7 +535,8 @@ def test_wheel_token_authorization_error(clear_funcs):
), patch("salt.auth.LoadAuth.get_auth_list", MagicMock(return_value=[])):
ret = clear_funcs.wheel(clear_load)
assert mock_ret == ret
assert mock_ret == ret
@pytest.mark.slow_test
def test_wheel_token_salt_invocation_error(clear_funcs):
@ -549,12 +556,11 @@ def test_wheel_token_salt_invocation_error(clear_funcs):
with patch(
"salt.auth.LoadAuth.authenticate_token", MagicMock(return_value=mock_token)
), patch(
"salt.auth.LoadAuth.get_auth_list", MagicMock(return_value=["testing"])
):
), patch("salt.auth.LoadAuth.get_auth_list", MagicMock(return_value=["testing"])):
ret = clear_funcs.wheel(clear_load)
assert mock_ret == ret
assert mock_ret == ret
@pytest.mark.slow_test
def test_wheel_eauth_not_authenticated(clear_funcs):
@ -572,6 +578,7 @@ def test_wheel_eauth_not_authenticated(clear_funcs):
ret = clear_funcs.wheel({"eauth": "foo"})
assert mock_ret == ret
@pytest.mark.slow_test
def test_wheel_eauth_authorization_error(clear_funcs):
"""
@ -594,6 +601,7 @@ def test_wheel_eauth_authorization_error(clear_funcs):
assert mock_ret == ret
@pytest.mark.slow_test
def test_wheel_eauth_salt_invocation_error(clear_funcs):
"""
@ -609,13 +617,12 @@ def test_wheel_eauth_salt_invocation_error(clear_funcs):
}
with patch(
"salt.auth.LoadAuth.authenticate_eauth", MagicMock(return_value=True)
), patch(
"salt.auth.LoadAuth.get_auth_list", MagicMock(return_value=["testing"])
):
), patch("salt.auth.LoadAuth.get_auth_list", MagicMock(return_value=["testing"])):
ret = clear_funcs.wheel(clear_load)
assert mock_ret == ret
@pytest.mark.slow_test
def test_wheel_user_not_authenticated(clear_funcs):
"""
@ -630,8 +637,10 @@ def test_wheel_user_not_authenticated(clear_funcs):
ret = clear_funcs.wheel({})
assert mock_ret == ret
# publish tests
@pytest.mark.slow_test
async def test_publish_user_is_blacklisted(clear_funcs):
"""
@ -648,6 +657,7 @@ async def test_publish_user_is_blacklisted(clear_funcs):
):
assert mock_ret == await clear_funcs.publish({"user": "foo", "fun": "test.arg"})
@pytest.mark.slow_test
async def test_publish_cmd_blacklisted(clear_funcs):
"""
@ -661,11 +671,10 @@ async def test_publish_cmd_blacklisted(clear_funcs):
}
with patch(
"salt.acl.PublisherACL.user_is_blacklisted", MagicMock(return_value=False)
), patch(
"salt.acl.PublisherACL.cmd_is_blacklisted", MagicMock(return_value=True)
):
), patch("salt.acl.PublisherACL.cmd_is_blacklisted", MagicMock(return_value=True)):
assert mock_ret == await clear_funcs.publish({"user": "foo", "fun": "test.arg"})
@pytest.mark.slow_test
async def test_publish_token_not_authenticated(clear_funcs):
"""
@ -685,11 +694,10 @@ async def test_publish_token_not_authenticated(clear_funcs):
}
with patch(
"salt.acl.PublisherACL.user_is_blacklisted", MagicMock(return_value=False)
), patch(
"salt.acl.PublisherACL.cmd_is_blacklisted", MagicMock(return_value=False)
):
), patch("salt.acl.PublisherACL.cmd_is_blacklisted", MagicMock(return_value=False)):
assert mock_ret == await clear_funcs.publish(load)
@pytest.mark.slow_test
async def test_publish_token_authorization_error(clear_funcs):
"""
@ -723,6 +731,7 @@ async def test_publish_token_authorization_error(clear_funcs):
):
assert mock_ret == await clear_funcs.publish(load)
@pytest.mark.slow_test
async def test_publish_eauth_not_authenticated(clear_funcs):
"""
@ -742,11 +751,10 @@ async def test_publish_eauth_not_authenticated(clear_funcs):
}
with patch(
"salt.acl.PublisherACL.user_is_blacklisted", MagicMock(return_value=False)
), patch(
"salt.acl.PublisherACL.cmd_is_blacklisted", MagicMock(return_value=False)
):
), patch("salt.acl.PublisherACL.cmd_is_blacklisted", MagicMock(return_value=False)):
assert mock_ret == await clear_funcs.publish(load)
@pytest.mark.slow_test
async def test_publish_eauth_authorization_error(clear_funcs):
"""
@ -777,6 +785,7 @@ async def test_publish_eauth_authorization_error(clear_funcs):
):
assert mock_ret == await clear_funcs.publish(load)
@pytest.mark.slow_test
async def test_publish_user_not_authenticated(clear_funcs):
"""
@ -791,11 +800,10 @@ async def test_publish_user_not_authenticated(clear_funcs):
}
with patch(
"salt.acl.PublisherACL.user_is_blacklisted", MagicMock(return_value=False)
), patch(
"salt.acl.PublisherACL.cmd_is_blacklisted", MagicMock(return_value=False)
):
), patch("salt.acl.PublisherACL.cmd_is_blacklisted", MagicMock(return_value=False)):
assert mock_ret == await clear_funcs.publish(load)
@pytest.mark.slow_test
async def test_publish_user_authenticated_missing_auth_list(clear_funcs):
"""
@ -906,9 +914,7 @@ def test_run_func(maintenence):
with patch("salt.master.time", mocked_time), patch(
"salt.utils.process", autospec=True
), patch(
"salt.master.Maintenance._post_fork_init", mocked__post_fork_init
), patch(
), patch("salt.master.Maintenance._post_fork_init", mocked__post_fork_init), patch(
"salt.daemons.masterapi.clean_old_jobs", mocked_clean_old_jobs
), patch(
"salt.daemons.masterapi.clean_expired_tokens", mocked_clean_expired_tokens

View file

@ -171,7 +171,10 @@ async def test_publish_client_connect_server_down(transport, io_loop):
elif transport == "tcp":
client = salt.transport.tcp.TCPPubClient(opts, io_loop, host=host, port=port)
try:
await client.connect(background=True)
# XXX: This is an implimentation detail of the tcp transport.
# XXX: This is an implimentation detail of the tcp transport.
# await client.connect(port)
io_loop.spawn_callback(client.connect)
except TimeoutError:
pass
except Exception:
@ -227,7 +230,9 @@ async def test_publish_client_connect_server_comes_up(transport, io_loop):
import tornado
client = salt.transport.tcp.TCPPubClient(opts, io_loop, host=host, port=port)
await client.connect(port, background=True)
# XXX: This is an implimentation detail of the tcp transport.
# await client.connect(port)
io_loop.spawn_callback(client.connect)
assert client._stream is None
await asyncio.sleep(2)