Fix mock calls

Signed-off-by: Pedro Algarvio <palgarvio@vmware.com>
This commit is contained in:
Pedro Algarvio 2023-06-13 07:37:19 +01:00 committed by Pedro Algarvio
parent 3c415b222f
commit 3506e7fd0e
8 changed files with 38 additions and 42 deletions

View file

@ -1290,17 +1290,17 @@ def test_call_apt_dpkg_lock():
]
cmd_mock = MagicMock(side_effect=cmd_side_effect)
cmd_call = (
cmd_call = [
call(
["dpkg", "-l", "python"],
env={},
ignore_retcode=False,
output_loglevel="quiet",
python_shell=True,
env={},
ignore_retcode=False,
username="Darth Vader",
),
)
expected_calls = [cmd_call * 5]
]
expected_calls = cmd_call * 5
with patch.dict(
aptpkg.__salt__,
@ -1320,7 +1320,7 @@ def test_call_apt_dpkg_lock():
# We should attempt to call the cmd 5 times
assert cmd_mock.call_count == 5
cmd_mock.has_calls(expected_calls)
cmd_mock.assert_has_calls(expected_calls)
def test_services_need_restart_checkrestart_missing():

View file

@ -215,7 +215,7 @@ def test_persist_no_conf_failure():
):
with pytest.raises(CommandExecutionError):
linux_sysctl.persist("net.ipv4.ip_forward", 42, config=None)
fopen_mock.called_once()
fopen_mock.assert_called_once()
def test_persist_no_conf_success():
@ -353,7 +353,7 @@ def test_persist_value_with_spaces_already_set(tmp_path):
"""
config = str(tmp_path / "existing_sysctl_with_spaces.conf")
value = "|/usr/share/kdump-tools/dump-core %p %s %t %e"
config_file_content = "kernel.core_pattern = {}\n".format(value)
config_file_content = f"kernel.core_pattern = {value}\n"
with fopen(config, "w", encoding="utf-8") as config_file:
config_file.write(config_file_content)
mock_run = MagicMock(return_value=value)
@ -383,7 +383,7 @@ def test_persist_value_with_spaces_already_configured(tmp_path):
"""
config = str(tmp_path / "existing_sysctl_with_spaces.conf")
value = "|/usr/share/kdump-tools/dump-core %p %s %t %e"
config_file_content = "kernel.core_pattern = {}\n".format(value)
config_file_content = f"kernel.core_pattern = {value}\n"
with fopen(config, "w", encoding="utf-8") as config_file:
config_file.write(config_file_content)
mock_run = MagicMock(return_value="")
@ -451,7 +451,7 @@ def test_persist_value_with_spaces_update_config(tmp_path):
assert os.path.isfile(config)
with fopen(config, encoding="utf-8") as config_file:
written = config_file.read()
assert written == "kernel.core_pattern = {}\n".format(value)
assert written == f"kernel.core_pattern = {value}\n"
def test_persist_value_with_spaces_new_file(tmp_path):

View file

@ -151,7 +151,7 @@ def test_enable():
):
assert win_ip.enable("Ethernet")
mock_cmd.called_once_with(
mock_cmd.assert_called_once_with(
[
"netsh",
"interface",
@ -180,7 +180,7 @@ def test_disable():
):
assert win_ip.disable("Ethernet")
mock_cmd.called_once_with(
mock_cmd.assert_called_once_with(
[
"netsh",
"interface",

View file

@ -60,7 +60,7 @@ def test_fileserver_duration():
end = time.time()
# Interval is equal to timeout so the _do_update method will be called
# one time.
update.called_once()
update.assert_called_once()
# Timeout is 1 second
duration = end - start
if duration > 2 and salt.utils.platform.spawning_platform():

View file

@ -700,7 +700,9 @@ def test_gen_modules_executors(minion_opts):
with patch("salt.pillar.get_pillar", return_value=MockPillarCompiler()):
with patch("salt.loader.executors") as execmock:
minion.gen_modules()
assert execmock.called_with(minion.opts, minion.functions)
execmock.assert_called_with(
minion.opts, functions=minion.functions, proxy=minion.proxy, context={}
)
finally:
minion.destroy()

View file

@ -38,7 +38,7 @@ def sock_dir(tmp_path):
def _assert_got_event(evt, data, msg=None, expected_failure=False):
assert evt is not None, msg
for key in data:
assert key in evt, "{}: Key {} missing".format(msg, key)
assert key in evt, f"{msg}: Key {key} missing"
assertMsg = "{0}: Key {1} value mismatch, {2} != {3}"
assertMsg = assertMsg.format(msg, key, data[key], evt[key])
if not expected_failure:
@ -59,8 +59,8 @@ def test_minion_event(sock_dir):
:10
]
with salt.utils.event.MinionEvent(opts, listen=False) as me:
assert me.puburi == str(sock_dir / "minion_event_{}_pub.ipc".format(id_hash))
assert me.pulluri == str(sock_dir / "minion_event_{}_pull.ipc".format(id_hash))
assert me.puburi == str(sock_dir / f"minion_event_{id_hash}_pub.ipc")
assert me.pulluri == str(sock_dir / f"minion_event_{id_hash}_pull.ipc")
def test_minion_event_tcp_ipc_mode():
@ -73,8 +73,8 @@ def test_minion_event_tcp_ipc_mode():
def test_minion_event_no_id(sock_dir):
with salt.utils.event.MinionEvent(dict(sock_dir=str(sock_dir)), listen=False) as me:
id_hash = hashlib.sha256(salt.utils.stringutils.to_bytes("")).hexdigest()[:10]
assert me.puburi == str(sock_dir / "minion_event_{}_pub.ipc".format(id_hash))
assert me.pulluri == str(sock_dir / "minion_event_{}_pull.ipc".format(id_hash))
assert me.puburi == str(sock_dir / f"minion_event_{id_hash}_pub.ipc")
assert me.pulluri == str(sock_dir / f"minion_event_{id_hash}_pull.ipc")
@pytest.mark.slow_test
@ -256,9 +256,9 @@ def test_event_many(sock_dir):
with eventpublisher_process(str(sock_dir)):
with salt.utils.event.MasterEvent(str(sock_dir), listen=True) as me:
for i in range(500):
me.fire_event({"data": "{}".format(i)}, "testevents")
me.fire_event({"data": f"{i}"}, "testevents")
evt = me.get_event(tag="testevents")
_assert_got_event(evt, {"data": "{}".format(i)}, "Event {}".format(i))
_assert_got_event(evt, {"data": f"{i}"}, f"Event {i}")
@pytest.mark.slow_test
@ -268,10 +268,10 @@ def test_event_many_backlog(sock_dir):
with salt.utils.event.MasterEvent(str(sock_dir), listen=True) as me:
# Must not exceed zmq HWM
for i in range(500):
me.fire_event({"data": "{}".format(i)}, "testevents")
me.fire_event({"data": f"{i}"}, "testevents")
for i in range(500):
evt = me.get_event(tag="testevents")
_assert_got_event(evt, {"data": "{}".format(i)}, "Event {}".format(i))
_assert_got_event(evt, {"data": f"{i}"}, f"Event {i}")
# Test the fire_master function. As it wraps the underlying fire_event,
@ -300,7 +300,7 @@ def test_connect_pull_should_debug_log_on_StreamClosedError():
event = SaltEvent(node=None)
with patch.object(event, "pusher") as mock_pusher:
with patch.object(
salt.utils.event.log, "debug", auto_spec=True
salt.utils.event.log, "debug", autospec=True
) as mock_log_debug:
mock_pusher.connect.side_effect = tornado.iostream.StreamClosedError
event.connect_pull()
@ -315,10 +315,10 @@ def test_connect_pull_should_error_log_on_other_errors(error):
event = SaltEvent(node=None)
with patch.object(event, "pusher") as mock_pusher:
with patch.object(
salt.utils.event.log, "debug", auto_spec=True
salt.utils.event.log, "debug", autospec=True
) as mock_log_debug:
with patch.object(
salt.utils.event.log, "error", auto_spec=True
salt.utils.event.log, "error", autospec=True
) as mock_log_error:
mock_pusher.connect.side_effect = error
event.connect_pull()

View file

@ -28,7 +28,7 @@ class NilrtIPTestCase(TestCase, LoaderModuleMockMixin):
"salt.modules.nilrt_ip._change_dhcp_config", return_value=True
) as change_dhcp_config_mock:
assert nilrt_ip._change_state("test_interface", "down")
assert change_dhcp_config_mock.called_with("test_interface", False)
change_dhcp_config_mock.assert_called_with("test_interface", False)
def test_change_state_up_state(self):
"""
@ -42,7 +42,7 @@ class NilrtIPTestCase(TestCase, LoaderModuleMockMixin):
"salt.modules.nilrt_ip._change_dhcp_config", return_value=True
) as change_dhcp_config_mock:
assert nilrt_ip._change_state("test_interface", "up")
assert change_dhcp_config_mock.called_with("test_interface")
change_dhcp_config_mock.assert_called_with("test_interface")
def test_set_static_all_with_dns(self):
"""

View file

@ -648,7 +648,6 @@ class TestDisbatchLocal(tornado.testing.AsyncTestCase):
with patch.object(
self.handler.application.event_listener,
"get_event",
autospec=True,
side_effect=fancy_get_event,
), patch.dict(
self.handler.application.opts,
@ -699,7 +698,6 @@ class TestDisbatchLocal(tornado.testing.AsyncTestCase):
with patch.object(
self.handler.application.event_listener,
"get_event",
autospec=True,
side_effect=fancy_get_event,
), patch.object(
self.handler,
@ -730,8 +728,8 @@ class TestDisbatchLocal(tornado.testing.AsyncTestCase):
{
"tag": "fnord",
"data": {
"return": "return from fnord {}".format(i),
"id": "fnord {}".format(i),
"return": f"return from fnord {i}",
"id": f"fnord {i}",
},
}
)
@ -761,7 +759,6 @@ class TestDisbatchLocal(tornado.testing.AsyncTestCase):
with patch.object(
self.handler.application.event_listener,
"get_event",
autospec=True,
side_effect=fancy_get_event,
), patch.object(
self.handler,
@ -795,8 +792,8 @@ class TestDisbatchLocal(tornado.testing.AsyncTestCase):
{
"tag": "fnord",
"data": {
"return": "return from fnord {}".format(i),
"id": "fnord {}".format(i),
"return": f"return from fnord {i}",
"id": f"fnord {i}",
},
}
)
@ -821,7 +818,6 @@ class TestDisbatchLocal(tornado.testing.AsyncTestCase):
with patch.object(
self.handler.application.event_listener,
"get_event",
autospec=True,
side_effect=fancy_get_event,
), patch.dict(
self.handler.application.opts,
@ -844,12 +840,12 @@ class TestDisbatchLocal(tornado.testing.AsyncTestCase):
completed_events = [tornado.gen.Future() for _ in range(10)]
events_by_id = {}
for i, event in enumerate(completed_events):
id_ = "fnord {}".format(i)
id_ = f"fnord {i}"
events_by_id[id_] = event
event.set_result(
{
"tag": "fnord",
"data": {"return": "return from {}".format(id_), "id": id_},
"data": {"return": f"return from {id_}", "id": id_},
}
)
expected_result = {
@ -879,7 +875,6 @@ class TestDisbatchLocal(tornado.testing.AsyncTestCase):
with patch.object(
self.handler.application.event_listener,
"get_event",
autospec=True,
side_effect=fancy_get_event,
), patch.dict(
self.handler.application.opts,
@ -905,12 +900,12 @@ class TestDisbatchLocal(tornado.testing.AsyncTestCase):
events_by_id = {}
# Setup some real-enough looking return data
for i, event in enumerate(completed_events):
id_ = "fnord {}".format(i)
id_ = f"fnord {i}"
events_by_id[id_] = event
event.set_result(
{
"tag": "fnord",
"data": {"return": "return from {}".format(id_), "id": id_},
"data": {"return": f"return from {id_}", "id": id_},
}
)
# Hard coded instead of dynamic to avoid potentially writing a test
@ -972,7 +967,6 @@ class TestDisbatchLocal(tornado.testing.AsyncTestCase):
with patch.object(
self.handler.application.event_listener,
"get_event",
autospec=True,
side_effect=fancy_get_event,
), patch.object(
self.handler,