Migrate `rest_tornado` tests to PyTest(those possible)

This commit is contained in:
Pedro Algarvio 2021-09-21 07:53:19 +01:00 committed by Megan Wilhite
parent d92567a192
commit 044007145b
22 changed files with 1868 additions and 1645 deletions

View file

@ -231,7 +231,17 @@ salt/netapi/rest_cherrypy/*:
- integration.netapi.test_client
salt/netapi/rest_tornado/*:
- unit.netapi.test_rest_tornado
- pytests.functional.netapi.rest_tornado.test_auth_handler
- pytests.functional.netapi.rest_tornado.test_base_api_handler
- pytests.functional.netapi.rest_tornado.test_event_listener
- pytests.functional.netapi.rest_tornado.test_utils
- pytests.functional.netapi.rest_tornado.test_webhooks_handler
- pytests.functional.netapi.rest_tornado.test_websockets_handler
- pytests.integration.netapi.rest_tornado.test_events_api_handler
- pytests.integration.netapi.rest_tornado.test_jobs_api_handler
- pytests.integration.netapi.rest_tornado.test_minions_api_handler
- pytests.integration.netapi.rest_tornado.test_root_handler
- pytests.integration.netapi.rest_tornado.test_run_api_handler
- integration.netapi.rest_tornado.test_app
- integration.netapi.test_client

View file

@ -3,23 +3,114 @@ import threading
import time
import pytest
import salt.ext.tornado.ioloop
import salt.auth
import salt.ext.tornado.escape
import salt.ext.tornado.web
import salt.utils.json
import salt.utils.stringutils
from salt.ext.tornado.testing import AsyncHTTPTestCase
from salt.netapi.rest_tornado import saltnado
from tests.support.helpers import TstSuiteLoggingHandler
from tests.support.helpers import TstSuiteLoggingHandler, patched_environ
from tests.support.mixins import AdaptedConfigurationTestCaseMixin
from tests.support.unit import skipIf
from tests.unit.netapi.test_rest_tornado import SaltnadoTestsBase
class SaltnadoIntegrationTestsBase(SaltnadoTestsBase):
class SaltnadoIntegrationTestsBase(
AsyncHTTPTestCase, AdaptedConfigurationTestCaseMixin
):
content_type_map = {
"json": "application/json",
"json-utf8": "application/json; charset=utf-8",
"yaml": "application/x-yaml",
"text": "text/plain",
"form": "application/x-www-form-urlencoded",
"xml": "application/xml",
"real-accept-header-json": "application/json, text/javascript, */*; q=0.01",
"real-accept-header-yaml": "application/x-yaml, text/yaml, */*; q=0.01",
}
auth_creds = (
("username", "saltdev_api"),
("password", "saltdev"),
("eauth", "auto"),
)
@property
def auth_creds_dict(self):
return dict(self.auth_creds)
@property
def opts(self):
return self.get_config("client_config", from_scratch=True)
return self.get_temp_config("client_config")
@property
def mod_opts(self):
return self.get_config("minion", from_scratch=True)
return self.get_temp_config("minion")
@property
def auth(self):
if not hasattr(self, "__auth"):
self.__auth = salt.auth.LoadAuth(self.opts)
return self.__auth
@property
def token(self):
"""Mint and return a valid token for auth_creds"""
return self.auth.mk_token(self.auth_creds_dict)
def setUp(self):
super().setUp()
self.patched_environ = patched_environ(ASYNC_TEST_TIMEOUT="30")
self.patched_environ.__enter__()
self.addCleanup(self.patched_environ.__exit__)
def tearDown(self):
super().tearDown()
if hasattr(self, "http_server"):
del self.http_server
if hasattr(self, "io_loop"):
del self.io_loop
if hasattr(self, "_app"):
del self._app
if hasattr(self, "http_client"):
del self.http_client
if hasattr(self, "__port"):
del self.__port
if hasattr(self, "_AsyncHTTPTestCase__port"):
del self._AsyncHTTPTestCase__port
if hasattr(self, "__auth"):
del self.__auth
if hasattr(self, "_SaltnadoIntegrationTestsBase__auth"):
del self._SaltnadoIntegrationTestsBase__auth
if hasattr(self, "_test_generator"):
del self._test_generator
if hasattr(self, "application"):
del self.application
if hasattr(self, "patched_environ"):
del self.patched_environ
def build_tornado_app(self, urls):
application = salt.ext.tornado.web.Application(urls, debug=True)
application.auth = self.auth
application.opts = self.opts
application.mod_opts = self.mod_opts
return application
def decode_body(self, response):
if response is None:
return response
if response.body:
# Decode it
if response.headers.get("Content-Type") == "application/json":
response._body = response.body.decode("utf-8")
else:
response._body = salt.ext.tornado.escape.native_str(response.body)
return response
def fetch(self, path, **kwargs):
return self.decode_body(super().fetch(path, **kwargs))
def get_app(self):
raise NotImplementedError
@ -40,44 +131,6 @@ class TestSaltAPIHandler(SaltnadoIntegrationTestsBase):
self.application = application
return application
def test_root(self):
"""
Test the root path which returns the list of clients we support
"""
response = self.fetch(
"/",
connect_timeout=30,
request_timeout=30,
)
self.assertEqual(response.code, 200)
response_obj = salt.utils.json.loads(response.body)
self.assertEqual(
sorted(response_obj["clients"]),
["local", "local_async", "runner", "runner_async"],
)
self.assertEqual(response_obj["return"], "Welcome")
@pytest.mark.slow_test
def test_post_no_auth(self):
"""
Test post with no auth token, should 401
"""
# get a token for this test
low = [{"client": "local", "tgt": "*", "fun": "test.ping"}]
response = self.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
headers={"Content-Type": self.content_type_map["json"]},
follow_redirects=False,
connect_timeout=30,
request_timeout=30,
)
self.assertEqual(response.code, 302)
self.assertEqual(response.headers["Location"], "/login")
# Local client tests
@pytest.mark.slow_test
def test_regression_49572(self):
with TstSuiteLoggingHandler() as handler:
@ -110,241 +163,6 @@ class TestSaltAPIHandler(SaltnadoIntegrationTestsBase):
"#49572: regression: set_result on completed event"
)
def test_simple_local_post(self):
"""
Test a basic API of /
"""
low = [{"client": "local", "tgt": "*", "fun": "test.ping"}]
response = self.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
headers={
"Content-Type": self.content_type_map["json"],
saltnado.AUTH_TOKEN_HEADER: self.token["token"],
},
connect_timeout=30,
request_timeout=30,
)
response_obj = salt.utils.json.loads(response.body)
self.assertEqual(len(response_obj["return"]), 1)
# If --proxy is set, it will cause an extra minion_id to be in the
# response. Since there's not a great way to know if the test
# runner's proxy minion is running, and we're not testing proxy
# minions here anyway, just remove it from the response.
response_obj["return"][0].pop("proxytest", None)
self.assertEqual(
response_obj["return"][0], {"minion": True, "sub_minion": True}
)
def test_simple_local_post_no_tgt(self):
"""
POST job with invalid tgt
"""
low = [{"client": "local", "tgt": "minion_we_dont_have", "fun": "test.ping"}]
response = self.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
headers={
"Content-Type": self.content_type_map["json"],
saltnado.AUTH_TOKEN_HEADER: self.token["token"],
},
connect_timeout=30,
request_timeout=30,
)
response_obj = salt.utils.json.loads(response.body)
self.assertEqual(
response_obj["return"],
[
"No minions matched the target. No command was sent, no jid was"
" assigned."
],
)
# local client request body test
def test_simple_local_post_only_dictionary_request(self):
"""
Test a basic API of /
"""
low = {
"client": "local",
"tgt": "*",
"fun": "test.ping",
}
response = self.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
headers={
"Content-Type": self.content_type_map["json"],
saltnado.AUTH_TOKEN_HEADER: self.token["token"],
},
connect_timeout=30,
request_timeout=30,
)
response_obj = salt.utils.json.loads(response.body)
self.assertEqual(len(response_obj["return"]), 1)
# If --proxy is set, it will cause an extra minion_id to be in the
# response. Since there's not a great way to know if the test
# runner's proxy minion is running, and we're not testing proxy
# minions here anyway, just remove it from the response.
response_obj["return"][0].pop("proxytest", None)
self.assertEqual(
response_obj["return"][0], {"minion": True, "sub_minion": True}
)
def test_simple_local_post_invalid_request(self):
"""
Test a basic API of /
"""
low = ["invalid request"]
response = self.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
headers={
"Content-Type": self.content_type_map["json"],
saltnado.AUTH_TOKEN_HEADER: self.token["token"],
},
connect_timeout=30,
request_timeout=30,
)
self.assertEqual(response.code, 400)
# local_async tests
def test_simple_local_async_post(self):
low = [{"client": "local_async", "tgt": "*", "fun": "test.ping"}]
response = self.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
headers={
"Content-Type": self.content_type_map["json"],
saltnado.AUTH_TOKEN_HEADER: self.token["token"],
},
)
response_obj = salt.utils.json.loads(response.body)
ret = response_obj["return"]
ret[0]["minions"] = sorted(ret[0]["minions"])
try:
# If --proxy is set, it will cause an extra minion_id to be in the
# response. Since there's not a great way to know if the test
# runner's proxy minion is running, and we're not testing proxy
# minions here anyway, just remove it from the response.
ret[0]["minions"].remove("proxytest")
except ValueError:
pass
# TODO: verify pub function? Maybe look at how we test the publisher
self.assertEqual(len(ret), 1)
self.assertIn("jid", ret[0])
self.assertEqual(ret[0]["minions"], sorted(["minion", "sub_minion"]))
def test_multi_local_async_post(self):
low = [
{"client": "local_async", "tgt": "*", "fun": "test.ping"},
{"client": "local_async", "tgt": "*", "fun": "test.ping"},
]
response = self.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
headers={
"Content-Type": self.content_type_map["json"],
saltnado.AUTH_TOKEN_HEADER: self.token["token"],
},
)
response_obj = salt.utils.json.loads(response.body)
ret = response_obj["return"]
ret[0]["minions"] = sorted(ret[0]["minions"])
ret[1]["minions"] = sorted(ret[1]["minions"])
try:
# If --proxy is set, it will cause an extra minion_id to be in the
# response. Since there's not a great way to know if the test
# runner's proxy minion is running, and we're not testing proxy
# minions here anyway, just remove it from the response.
ret[0]["minions"].remove("proxytest")
ret[1]["minions"].remove("proxytest")
except ValueError:
pass
self.assertEqual(len(ret), 2)
self.assertIn("jid", ret[0])
self.assertIn("jid", ret[1])
self.assertEqual(ret[0]["minions"], sorted(["minion", "sub_minion"]))
self.assertEqual(ret[1]["minions"], sorted(["minion", "sub_minion"]))
@pytest.mark.slow_test
def test_multi_local_async_post_multitoken(self):
low = [
{"client": "local_async", "tgt": "*", "fun": "test.ping"},
{
"client": "local_async",
"tgt": "*",
"fun": "test.ping",
"token": self.token[
"token"
], # send a different (but still valid token)
},
{
"client": "local_async",
"tgt": "*",
"fun": "test.ping",
"token": "BAD_TOKEN", # send a bad token
},
]
response = self.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
headers={
"Content-Type": self.content_type_map["json"],
saltnado.AUTH_TOKEN_HEADER: self.token["token"],
},
)
response_obj = salt.utils.json.loads(response.body)
ret = response_obj["return"]
ret[0]["minions"] = sorted(ret[0]["minions"])
ret[1]["minions"] = sorted(ret[1]["minions"])
try:
# If --proxy is set, it will cause an extra minion_id to be in the
# response. Since there's not a great way to know if the test
# runner's proxy minion is running, and we're not testing proxy
# minions here anyway, just remove it from the response.
ret[0]["minions"].remove("proxytest")
ret[1]["minions"].remove("proxytest")
except ValueError:
pass
self.assertEqual(len(ret), 3) # make sure we got 3 responses
self.assertIn("jid", ret[0]) # the first 2 are regular returns
self.assertIn("jid", ret[1])
self.assertIn("Failed to authenticate", ret[2]) # bad auth
self.assertEqual(ret[0]["minions"], sorted(["minion", "sub_minion"]))
self.assertEqual(ret[1]["minions"], sorted(["minion", "sub_minion"]))
@pytest.mark.slow_test
def test_simple_local_async_post_no_tgt(self):
low = [
{"client": "local_async", "tgt": "minion_we_dont_have", "fun": "test.ping"}
]
response = self.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
headers={
"Content-Type": self.content_type_map["json"],
saltnado.AUTH_TOKEN_HEADER: self.token["token"],
},
)
response_obj = salt.utils.json.loads(response.body)
self.assertEqual(response_obj["return"], [{}])
@skipIf(True, "Undetermined race condition in test. Temporarily disabled.")
def test_simple_local_post_only_dictionary_request_with_order_masters(self):
"""
@ -380,288 +198,6 @@ class TestSaltAPIHandler(SaltnadoIntegrationTestsBase):
response_obj[0]["return"].pop("proxytest", None)
self.assertEqual(response_obj["return"], [{"minion": True, "sub_minion": True}])
# runner tests
@pytest.mark.slow_test
def test_simple_local_runner_post(self):
low = [{"client": "runner", "fun": "manage.up"}]
response = self.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
headers={
"Content-Type": self.content_type_map["json"],
saltnado.AUTH_TOKEN_HEADER: self.token["token"],
},
connect_timeout=30,
request_timeout=300,
)
response_obj = salt.utils.json.loads(response.body)
self.assertEqual(len(response_obj["return"]), 1)
try:
# If --proxy is set, it will cause an extra minion_id to be in the
# response. Since there's not a great way to know if the test
# runner's proxy minion is running, and we're not testing proxy
# minions here anyway, just remove it from the response.
response_obj["return"][0].remove("proxytest")
except ValueError:
pass
self.assertEqual(
sorted(response_obj["return"][0]), sorted(["minion", "sub_minion"])
)
# runner_async tests
def test_simple_local_runner_async_post(self):
low = [{"client": "runner_async", "fun": "manage.up"}]
response = self.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
headers={
"Content-Type": self.content_type_map["json"],
saltnado.AUTH_TOKEN_HEADER: self.token["token"],
},
connect_timeout=10,
request_timeout=10,
)
response_obj = salt.utils.json.loads(response.body)
self.assertIn("return", response_obj)
self.assertEqual(1, len(response_obj["return"]))
self.assertIn("jid", response_obj["return"][0])
self.assertIn("tag", response_obj["return"][0])
@pytest.mark.flaky(max_runs=4)
class TestMinionSaltAPIHandler(SaltnadoIntegrationTestsBase):
def get_app(self):
urls = [
(r"/minions/(.*)", saltnado.MinionSaltAPIHandler),
(r"/minions", saltnado.MinionSaltAPIHandler),
]
application = self.build_tornado_app(urls)
application.event_listener = saltnado.EventListener({}, self.opts)
return application
def test_get_no_mid(self):
response = self.fetch(
"/minions",
method="GET",
headers={saltnado.AUTH_TOKEN_HEADER: self.token["token"]},
follow_redirects=False,
)
response_obj = salt.utils.json.loads(response.body)
self.assertEqual(len(response_obj["return"]), 1)
# one per minion
self.assertEqual(len(response_obj["return"][0]), 2)
# check a single grain
for minion_id, grains in response_obj["return"][0].items():
self.assertEqual(minion_id, grains["id"])
@pytest.mark.slow_test
def test_get(self):
response = self.fetch(
"/minions/minion",
method="GET",
headers={saltnado.AUTH_TOKEN_HEADER: self.token["token"]},
follow_redirects=False,
)
response_obj = salt.utils.json.loads(response.body)
self.assertEqual(len(response_obj["return"]), 1)
self.assertEqual(len(response_obj["return"][0]), 1)
# check a single grain
self.assertEqual(response_obj["return"][0]["minion"]["id"], "minion")
def test_post(self):
low = [{"tgt": "*minion", "fun": "test.ping"}]
response = self.fetch(
"/minions",
method="POST",
body=salt.utils.json.dumps(low),
headers={
"Content-Type": self.content_type_map["json"],
saltnado.AUTH_TOKEN_HEADER: self.token["token"],
},
)
response_obj = salt.utils.json.loads(response.body)
ret = response_obj["return"]
ret[0]["minions"] = sorted(ret[0]["minions"])
# TODO: verify pub function? Maybe look at how we test the publisher
self.assertEqual(len(ret), 1)
self.assertIn("jid", ret[0])
self.assertEqual(ret[0]["minions"], sorted(["minion", "sub_minion"]))
@pytest.mark.slow_test
def test_post_with_client(self):
# get a token for this test
low = [{"client": "local_async", "tgt": "*minion", "fun": "test.ping"}]
response = self.fetch(
"/minions",
method="POST",
body=salt.utils.json.dumps(low),
headers={
"Content-Type": self.content_type_map["json"],
saltnado.AUTH_TOKEN_HEADER: self.token["token"],
},
)
response_obj = salt.utils.json.loads(response.body)
ret = response_obj["return"]
ret[0]["minions"] = sorted(ret[0]["minions"])
# TODO: verify pub function? Maybe look at how we test the publisher
self.assertEqual(len(ret), 1)
self.assertIn("jid", ret[0])
self.assertEqual(ret[0]["minions"], sorted(["minion", "sub_minion"]))
@pytest.mark.slow_test
def test_post_with_incorrect_client(self):
"""
The /minions endpoint is asynchronous only, so if you try something else
make sure you get an error
"""
# get a token for this test
low = [{"client": "local", "tgt": "*", "fun": "test.ping"}]
response = self.fetch(
"/minions",
method="POST",
body=salt.utils.json.dumps(low),
headers={
"Content-Type": self.content_type_map["json"],
saltnado.AUTH_TOKEN_HEADER: self.token["token"],
},
)
self.assertEqual(response.code, 400)
class TestJobsSaltAPIHandler(SaltnadoIntegrationTestsBase):
def get_app(self):
urls = [
(r"/jobs/(.*)", saltnado.JobsSaltAPIHandler),
(r"/jobs", saltnado.JobsSaltAPIHandler),
]
application = self.build_tornado_app(urls)
application.event_listener = saltnado.EventListener({}, self.opts)
return application
@pytest.mark.slow_test
def test_get(self):
# test with no JID
self.http_client.fetch(
self.get_url("/jobs"),
self.stop,
method="GET",
headers={saltnado.AUTH_TOKEN_HEADER: self.token["token"]},
follow_redirects=False,
)
response = self.wait(timeout=30)
response_obj = salt.utils.json.loads(response.body)["return"][0]
try:
for jid, ret in response_obj.items():
self.assertIn("Function", ret)
self.assertIn("Target", ret)
self.assertIn("Target-type", ret)
self.assertIn("User", ret)
self.assertIn("StartTime", ret)
self.assertIn("Arguments", ret)
except AttributeError as attribute_error:
print(salt.utils.json.loads(response.body))
raise
# test with a specific JID passed in
jid = next(iter(response_obj.keys()))
self.http_client.fetch(
self.get_url("/jobs/{}".format(jid)),
self.stop,
method="GET",
headers={saltnado.AUTH_TOKEN_HEADER: self.token["token"]},
follow_redirects=False,
)
response = self.wait(timeout=30)
response_obj = salt.utils.json.loads(response.body)["return"][0]
self.assertIn("Function", response_obj)
self.assertIn("Target", response_obj)
self.assertIn("Target-type", response_obj)
self.assertIn("User", response_obj)
self.assertIn("StartTime", response_obj)
self.assertIn("Arguments", response_obj)
self.assertIn("Result", response_obj)
# TODO: run all the same tests from the root handler, but for now since they are
# the same code, we'll just sanity check
class TestRunSaltAPIHandler(SaltnadoIntegrationTestsBase):
def get_app(self):
urls = [
("/run", saltnado.RunSaltAPIHandler),
]
application = self.build_tornado_app(urls)
application.event_listener = saltnado.EventListener({}, self.opts)
return application
@pytest.mark.slow_test
def test_get(self):
low = [{"client": "local", "tgt": "*", "fun": "test.ping"}]
response = self.fetch(
"/run",
method="POST",
body=salt.utils.json.dumps(low),
headers={
"Content-Type": self.content_type_map["json"],
saltnado.AUTH_TOKEN_HEADER: self.token["token"],
},
)
response_obj = salt.utils.json.loads(response.body)
self.assertEqual(response_obj["return"], [{"minion": True, "sub_minion": True}])
class TestEventsSaltAPIHandler(SaltnadoIntegrationTestsBase):
def get_app(self):
urls = [
(r"/events", saltnado.EventsSaltAPIHandler),
]
application = self.build_tornado_app(urls)
application.event_listener = saltnado.EventListener({}, self.opts)
# store a reference, for magic later!
self.application = application
self.events_to_fire = 0
return application
@pytest.mark.slow_test
def test_get(self):
self.events_to_fire = 5
response = self.fetch(
"/events",
headers={saltnado.AUTH_TOKEN_HEADER: self.token["token"]},
streaming_callback=self.on_event,
)
def _stop(self):
self.stop()
def on_event(self, event):
event = event.decode("utf-8")
if self.events_to_fire > 0:
self.application.event_listener.event.fire_event(
{"foo": "bar", "baz": "qux"}, "salt/netapi/test"
)
self.events_to_fire -= 1
# once we've fired all the events, lets call it a day
else:
# wait so that we can ensure that the next future is ready to go
# to make sure we don't explode if the next one is ready
salt.ext.tornado.ioloop.IOLoop.current().add_timeout(
time.time() + 0.5, self._stop
)
event = event.strip()
# if we got a retry, just continue
if event != "retry: 400":
tag, data = event.splitlines()
self.assertTrue(tag.startswith("tag: "))
self.assertTrue(data.startswith("data: "))
class TestWebhookSaltAPIHandler(SaltnadoIntegrationTestsBase):
def get_app(self):

View file

@ -0,0 +1,69 @@
import pathlib
import pytest
import salt.config
import tests.support.saltnado as saltnado_support
@pytest.fixture
def client_config(salt_master_factory):
# Make sure we have the tokens directory writable
tokens_dir = pathlib.Path(salt_master_factory.config["cachedir"]) / "tokens"
if not tokens_dir.is_dir():
tokens_dir.mkdir()
config = salt.config.client_config(
salt_master_factory.config["conf_file"],
defaults=salt_master_factory.config.copy(),
)
return config
@pytest.fixture
def minion_config(salt_minion_factory):
return salt_minion_factory.config.copy()
@pytest.fixture
def load_auth(client_config):
return saltnado_support.load_auth(client_config)
@pytest.fixture
def auth_creds():
return saltnado_support.auth_creds()
@pytest.fixture
def auth_creds_dict():
return saltnado_support.auth_creds_dict()
@pytest.fixture
def auth_token(load_auth, auth_creds_dict):
"""
Mint and return a valid token for auth_creds
"""
return saltnado_support.auth_token(load_auth, auth_creds_dict)
@pytest.fixture
def content_type_map():
return saltnado_support.content_type_map()
@pytest.fixture
def app(app_urls, load_auth, client_config, minion_config):
return saltnado_support.build_tornado_app(
app_urls, load_auth, client_config, minion_config
)
@pytest.fixture
def http_server(io_loop, app):
with saltnado_support.TestsHttpServer(io_loop=io_loop, app=app) as server:
yield server
@pytest.fixture
def http_client(http_server):
return http_server.client

View file

@ -0,0 +1,174 @@
import urllib.parse
import pytest
import salt.utils.json
import salt.utils.yaml
from salt.ext.tornado.httpclient import HTTPError
from salt.netapi.rest_tornado import saltnado
@pytest.fixture
def app_urls():
return [
("/login", saltnado.SaltAuthHandler),
]
async def test_get(http_client):
"""
We don't allow gets, so assert we get 401s
"""
with pytest.raises(HTTPError) as exc:
await http_client.fetch("/login")
assert exc.value.code == 401
async def test_login(
http_client, content_type_map, auth_creds, auth_creds_dict, subtests, client_config
):
"""
Test valid logins
"""
with subtests.test("Test in form encoded"):
response = await http_client.fetch(
"/login",
method="POST",
body=urllib.parse.urlencode(auth_creds),
headers={"Content-Type": content_type_map["form"]},
)
cookies = response.headers["Set-Cookie"]
assert response.code == 200
response_obj = salt.utils.json.loads(response.body)["return"][0]
token = response_obj["token"]
assert "session_id={}".format(token) in cookies
perms = response_obj["perms"]
perms_config = client_config["external_auth"]["auto"][
auth_creds_dict["username"]
]
assert set(perms) == set(perms_config)
assert "token" in response_obj # TODO: verify that its valid?
assert response_obj["user"] == auth_creds_dict["username"]
assert response_obj["eauth"] == auth_creds_dict["eauth"]
with subtests.test("Test in JSON"):
response = await http_client.fetch(
"/login",
method="POST",
body=salt.utils.json.dumps(auth_creds_dict),
headers={"Content-Type": content_type_map["json"]},
)
cookies = response.headers["Set-Cookie"]
assert response.code == 200
response_obj = salt.utils.json.loads(response.body)["return"][0]
token = response_obj["token"]
assert "session_id={}".format(token) in cookies
perms = response_obj["perms"]
perms_config = client_config["external_auth"]["auto"][
auth_creds_dict["username"]
]
assert set(perms) == set(perms_config)
assert "token" in response_obj # TODO: verify that its valid?
assert response_obj["user"] == auth_creds_dict["username"]
assert response_obj["eauth"] == auth_creds_dict["eauth"]
with subtests.test("Test in YAML"):
response = await http_client.fetch(
"/login",
method="POST",
body=salt.utils.yaml.safe_dump(auth_creds_dict),
headers={"Content-Type": content_type_map["yaml"]},
)
cookies = response.headers["Set-Cookie"]
assert response.code == 200
response_obj = salt.utils.json.loads(response.body)["return"][0]
token = response_obj["token"]
assert "session_id={}".format(token) in cookies
perms = response_obj["perms"]
perms_config = client_config["external_auth"]["auto"][
auth_creds_dict["username"]
]
assert set(perms) == set(perms_config)
assert "token" in response_obj # TODO: verify that its valid?
assert response_obj["user"] == auth_creds_dict["username"]
assert response_obj["eauth"] == auth_creds_dict["eauth"]
async def test_login_missing_password(http_client, auth_creds_dict, content_type_map):
"""
Test logins with bad/missing passwords
"""
bad_creds = []
for key, val in auth_creds_dict.items():
if key == "password":
continue
bad_creds.append((key, val))
with pytest.raises(HTTPError) as exc:
await http_client.fetch(
"/login",
method="POST",
body=urllib.parse.urlencode(bad_creds),
headers={"Content-Type": content_type_map["form"]},
)
assert exc.value.code == 400
async def test_login_bad_creds(http_client, content_type_map, auth_creds_dict):
"""
Test logins with bad/missing passwords
"""
bad_creds = []
for key, val in auth_creds_dict.items():
if key == "username":
val = val + "foo"
if key == "eauth":
val = "sharedsecret"
bad_creds.append((key, val))
with pytest.raises(HTTPError) as exc:
await http_client.fetch(
"/login",
method="POST",
body=urllib.parse.urlencode(bad_creds),
headers={"Content-Type": content_type_map["form"]},
)
assert exc.value.code == 401
async def test_login_invalid_data_structure(http_client, content_type_map, auth_creds):
"""
Test logins with either list or string JSON payload
"""
with pytest.raises(HTTPError) as exc:
await http_client.fetch(
"/login",
method="POST",
body=salt.utils.json.dumps(auth_creds),
headers={"Content-Type": content_type_map["form"]},
)
assert exc.value.code == 400
with pytest.raises(HTTPError) as exc:
await http_client.fetch(
"/login",
method="POST",
body=salt.utils.json.dumps(42),
headers={"Content-Type": content_type_map["form"]},
)
assert exc.value.code == 400
with pytest.raises(HTTPError) as exc:
await http_client.fetch(
"/login",
method="POST",
body=salt.utils.json.dumps("mystring42"),
headers={"Content-Type": content_type_map["form"]},
)
assert exc.value.code == 400

View file

@ -0,0 +1,387 @@
import urllib.parse
import pytest
import salt.utils.json
import salt.utils.yaml
from salt.ext.tornado.httpclient import HTTPError
from salt.netapi.rest_tornado import saltnado
class StubHandler(saltnado.BaseSaltAPIHandler): # pylint: disable=abstract-method
def get(self, *args, **kwargs):
return self.echo_stuff()
def post(self): # pylint: disable=arguments-differ
return self.echo_stuff()
def echo_stuff(self):
ret_dict = {"foo": "bar"}
attrs = (
"token",
"start",
"connected",
"lowstate",
)
for attr in attrs:
ret_dict[attr] = getattr(self, attr)
self.write(self.serialize(ret_dict))
@pytest.fixture
def app_urls():
return [
("/", StubHandler),
("/(.*)", StubHandler),
]
async def test_accept_content_type(http_client, content_type_map, subtests):
"""
Test the base handler's accept picking
"""
with subtests.test("Send NO accept header, should come back with json"):
response = await http_client.fetch("/")
assert response.headers["Content-Type"] == content_type_map["json"]
assert isinstance(salt.utils.json.loads(response.body), dict)
with subtests.test("Request application/json"):
response = await http_client.fetch(
"/", headers={"Accept": content_type_map["json"]}
)
assert response.headers["Content-Type"] == content_type_map["json"]
assert isinstance(salt.utils.json.loads(response.body), dict)
with subtests.test("Request application/x-yaml"):
response = await http_client.fetch(
"/", headers={"Accept": content_type_map["yaml"]}
)
assert response.headers["Content-Type"] == content_type_map["yaml"]
assert isinstance(salt.utils.yaml.safe_load(response.body), dict)
with subtests.test("Request not supported content-type"):
with pytest.raises(HTTPError) as error:
await http_client.fetch("/", headers={"Accept": content_type_map["xml"]})
assert error.value.code == 406
with subtests.test("Request some JSON with a browser like Accept"):
accept_header = content_type_map["real-accept-header-json"]
response = await http_client.fetch("/", headers={"Accept": accept_header})
assert response.headers["Content-Type"] == content_type_map["json"]
assert isinstance(salt.utils.json.loads(response.body), dict)
with subtests.test("Request some YAML with a browser like Accept"):
accept_header = content_type_map["real-accept-header-yaml"]
response = await http_client.fetch("/", headers={"Accept": accept_header})
assert response.headers["Content-Type"] == content_type_map["yaml"]
assert isinstance(salt.utils.yaml.safe_load(response.body), dict)
async def test_token(http_client):
"""
Test that the token is returned correctly
"""
response = await http_client.fetch("/")
token = salt.utils.json.loads(response.body)["token"]
assert token is None
# send a token as a header
response = await http_client.fetch("/", headers={saltnado.AUTH_TOKEN_HEADER: "foo"})
token = salt.utils.json.loads(response.body)["token"]
assert token == "foo"
# send a token as a cookie
response = await http_client.fetch(
"/", headers={"Cookie": "{}=foo".format(saltnado.AUTH_COOKIE_NAME)}
)
token = salt.utils.json.loads(response.body)["token"]
assert token == "foo"
# send both, make sure its the header
response = await http_client.fetch(
"/",
headers={
saltnado.AUTH_TOKEN_HEADER: "foo",
"Cookie": "{}=bar".format(saltnado.AUTH_COOKIE_NAME),
},
)
token = salt.utils.json.loads(response.body)["token"]
assert token == "foo"
async def test_deserialize(http_client, content_type_map, subtests):
"""
Send various encoded forms of lowstates (and bad ones) to make sure we
handle deserialization correctly
"""
valid_lowstate = [
{"client": "local", "tgt": "*", "fun": "test.fib", "arg": ["10"]},
{"client": "runner", "fun": "jobs.lookup_jid", "jid": "20130603122505459265"},
]
with subtests.test("send as JSON"):
response = await http_client.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(valid_lowstate),
headers={"Content-Type": content_type_map["json"]},
)
assert valid_lowstate == salt.utils.json.loads(response.body)["lowstate"]
with subtests.test("send yaml as json (should break)"):
with pytest.raises(HTTPError) as exc:
await http_client.fetch(
"/",
method="POST",
body=salt.utils.yaml.safe_dump(valid_lowstate),
headers={"Content-Type": content_type_map["json"]},
)
assert exc.value.code == 400
with subtests.test("send as yaml"):
response = await http_client.fetch(
"/",
method="POST",
body=salt.utils.yaml.safe_dump(valid_lowstate),
headers={"Content-Type": content_type_map["yaml"]},
)
assert valid_lowstate == salt.utils.json.loads(response.body)["lowstate"]
with subtests.test("send json as yaml (works since yaml is a superset of json)"):
response = await http_client.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(valid_lowstate),
headers={"Content-Type": content_type_map["yaml"]},
)
assert valid_lowstate == salt.utils.json.loads(response.body)["lowstate"]
with subtests.test("send json as text/plain"):
response = await http_client.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(valid_lowstate),
headers={"Content-Type": content_type_map["text"]},
)
assert valid_lowstate == salt.utils.json.loads(response.body)["lowstate"]
with subtests.test("send form-urlencoded"):
form_lowstate = (
("client", "local"),
("tgt", "*"),
("fun", "test.fib"),
("arg", "10"),
("arg", "foo"),
)
response = await http_client.fetch(
"/",
method="POST",
body=urllib.parse.urlencode(form_lowstate),
headers={"Content-Type": content_type_map["form"]},
)
returned_lowstate = salt.utils.json.loads(response.body)["lowstate"]
assert len(returned_lowstate) == 1
returned_lowstate = returned_lowstate[0]
assert returned_lowstate["client"] == "local"
assert returned_lowstate["tgt"] == "*"
assert returned_lowstate["fun"] == "test.fib"
assert returned_lowstate["arg"] == ["10", "foo"]
with subtests.test("Send json with utf8 charset"):
response = await http_client.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(valid_lowstate),
headers={"Content-Type": content_type_map["json-utf8"]},
)
assert valid_lowstate == salt.utils.json.loads(response.body)["lowstate"]
async def test_get_lowstate(http_client, content_type_map, subtests):
"""
Test transformations low data of the function _get_lowstate
"""
valid_lowstate = [{"client": "local", "tgt": "*", "fun": "test.fib", "arg": ["10"]}]
with subtests.test("Case 1. dictionary type of lowstate"):
request_lowstate = {
"client": "local",
"tgt": "*",
"fun": "test.fib",
"arg": ["10"],
}
response = await http_client.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(request_lowstate),
headers={"Content-Type": content_type_map["json"]},
)
assert valid_lowstate == salt.utils.json.loads(response.body)["lowstate"]
with subtests.test("Case 2. string type of arg"):
request_lowstate = {
"client": "local",
"tgt": "*",
"fun": "test.fib",
"arg": "10",
}
response = await http_client.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(request_lowstate),
headers={"Content-Type": content_type_map["json"]},
)
assert valid_lowstate == salt.utils.json.loads(response.body)["lowstate"]
with subtests.test("Case 3. Combine Case 1 and Case 2."):
request_lowstate = {
"client": "local",
"tgt": "*",
"fun": "test.fib",
"arg": "10",
}
# send as json
response = await http_client.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(request_lowstate),
headers={"Content-Type": content_type_map["json"]},
)
assert valid_lowstate == salt.utils.json.loads(response.body)["lowstate"]
with subtests.test("send as yaml"):
response = await http_client.fetch(
"/",
method="POST",
body=salt.utils.yaml.safe_dump(request_lowstate),
headers={"Content-Type": content_type_map["yaml"]},
)
assert valid_lowstate == salt.utils.json.loads(response.body)["lowstate"]
with subtests.test("send as plain text"):
response = await http_client.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(request_lowstate),
headers={"Content-Type": content_type_map["text"]},
)
assert valid_lowstate == salt.utils.json.loads(response.body)["lowstate"]
with subtests.test("send as form-urlencoded"):
request_form_lowstate = (
("client", "local"),
("tgt", "*"),
("fun", "test.fib"),
("arg", "10"),
)
response = await http_client.fetch(
"/",
method="POST",
body=urllib.parse.urlencode(request_form_lowstate),
headers={"Content-Type": content_type_map["form"]},
)
assert valid_lowstate == salt.utils.json.loads(response.body)["lowstate"]
async def test_cors_origin_wildcard(http_client, app):
"""
Check that endpoints returns Access-Control-Allow-Origin
"""
app.mod_opts["cors_origin"] = "*"
response = await http_client.fetch("/")
assert response.headers["Access-Control-Allow-Origin"] == "*"
async def test_cors_origin_single(http_client, app, subtests):
"""
Check that endpoints returns the Access-Control-Allow-Origin when
only one origins is set
"""
app.mod_opts["cors_origin"] = "http://example.foo"
with subtests.test("Example.foo is an authorized origin"):
response = await http_client.fetch(
"/", headers={"Origin": "http://example.foo"}
)
assert response.headers["Access-Control-Allow-Origin"] == "http://example.foo"
with subtests.test("Example2.foo is not an authorized origin"):
response = await http_client.fetch(
"/", headers={"Origin": "http://example2.foo"}
)
assert response.headers.get("Access-Control-Allow-Origin") is None
async def test_cors_origin_multiple(http_client, app, subtests):
"""
Check that endpoints returns the Access-Control-Allow-Origin when
multiple origins are set
"""
app.mod_opts["cors_origin"] = ["http://example.foo", "http://foo.example"]
with subtests.test("Example.foo is an authorized origin"):
response = await http_client.fetch(
"/", headers={"Origin": "http://example.foo"}
)
assert response.headers["Access-Control-Allow-Origin"] == "http://example.foo"
with subtests.test("Example2.foo is not an authorized origin"):
response = await http_client.fetch(
"/", headers={"Origin": "http://example2.foo"}
)
assert response.headers.get("Access-Control-Allow-Origin") is None
async def test_cors_preflight_request(http_client, app):
"""
Check that preflight request contains right headers
"""
app.mod_opts["cors_origin"] = "*"
request_headers = "X-Auth-Token, accept, content-type"
preflight_headers = {
"Access-Control-Request-Headers": request_headers,
"Access-Control-Request-Method": "GET",
}
response = await http_client.fetch("/", method="OPTIONS", headers=preflight_headers)
headers = response.headers
assert response.code == 204
assert headers["Access-Control-Allow-Headers"] == request_headers
assert headers["Access-Control-Expose-Headers"] == "X-Auth-Token"
assert headers["Access-Control-Allow-Methods"] == "OPTIONS, GET, POST"
assert response.code == 204
async def test_cors_origin_url_with_arguments(app, http_client):
"""
Check that preflight requests works with url with components
like jobs or minions endpoints.
"""
app.mod_opts["cors_origin"] = "*"
request_headers = "X-Auth-Token, accept, content-type"
preflight_headers = {
"Access-Control-Request-Headers": request_headers,
"Access-Control-Request-Method": "GET",
}
response = await http_client.fetch(
"/1234567890", method="OPTIONS", headers=preflight_headers
)
headers = response.headers
assert response.code == 204
assert headers["Access-Control-Allow-Origin"] == "*"

View file

@ -0,0 +1,157 @@
import pytest
import salt.utils.event
from salt.netapi.rest_tornado import saltnado
from tests.support.events import eventpublisher_process
pytestmark = [
pytest.mark.slow_test,
]
class Request:
__slots__ = ("_finished",)
def __init__(self):
self._finished = False
@pytest.fixture
def sock_dir(tmp_path):
yield str(tmp_path)
async def test_simple(sock_dir):
"""
Test getting a few events
"""
with eventpublisher_process(sock_dir):
with salt.utils.event.MasterEvent(sock_dir) as me:
request = Request()
event_listener = saltnado.EventListener(
{}, # we don't use mod_opts, don't save?
{"sock_dir": sock_dir, "transport": "zeromq"},
)
event_future = event_listener.get_event(
request, "evt1"
) # get an event future
me.fire_event({"data": "foo2"}, "evt2") # fire an event we don't want
me.fire_event({"data": "foo1"}, "evt1") # fire an event we do want
await event_future # wait for the future
# check that we got the event we wanted
assert event_future.done()
assert event_future.result()["tag"] == "evt1"
assert event_future.result()["data"]["data"] == "foo1"
async def test_set_event_handler(sock_dir):
"""
Test subscribing events using set_event_handler
"""
with eventpublisher_process(sock_dir):
with salt.utils.event.MasterEvent(sock_dir) as me:
request = Request()
event_listener = saltnado.EventListener(
{}, # we don't use mod_opts, don't save?
{"sock_dir": sock_dir, "transport": "zeromq"},
)
event_future = event_listener.get_event(
request,
tag="evt",
timeout=1,
) # get an event future
me.fire_event({"data": "foo"}, "evt") # fire an event we do want
await event_future # wait for the future
# check that we subscribed the event we wanted
assert len(event_listener.timeout_map) == 0
async def test_timeout(sock_dir):
"""
Make sure timeouts work correctly
"""
with eventpublisher_process(sock_dir):
request = Request()
event_listener = saltnado.EventListener(
{}, # we don't use mod_opts, don't save?
{"sock_dir": sock_dir, "transport": "zeromq"},
)
event_future = event_listener.get_event(
request,
tag="evt1",
timeout=1,
) # get an event future
with pytest.raises(saltnado.TimeoutException):
await event_future # wait for the future
assert event_future.done()
async def test_clean_by_request(sock_dir, io_loop):
"""
Make sure the method clean_by_request clean up every related data in EventListener
request_future_1 : will be timeout-ed by clean_by_request(request1)
request_future_2 : will be finished by me.fire_event and awaiting for it ...
request_future_3 : will be finished by me.fire_event and awaiting for it ...
request_future_4 : will be timeout-ed by clean-by_request(request2)
"""
with eventpublisher_process(sock_dir):
with salt.utils.event.MasterEvent(sock_dir) as me:
request1 = Request()
request2 = Request()
event_listener = saltnado.EventListener(
{}, # we don't use mod_opts, don't save?
{"sock_dir": sock_dir, "transport": "zeromq"},
)
assert 0 == len(event_listener.tag_map)
assert 0 == len(event_listener.request_map)
request_future_1 = event_listener.get_event(request1, tag="evt1")
request_future_2 = event_listener.get_event(request1, tag="evt2")
dummy_request_future_1 = event_listener.get_event(request2, tag="evt3")
dummy_request_future_2 = event_listener.get_event(
request2, timeout=10, tag="evt4"
)
assert 4 == len(event_listener.tag_map)
assert 2 == len(event_listener.request_map)
me.fire_event({"data": "foo2"}, "evt2")
me.fire_event({"data": "foo3"}, "evt3")
await request_future_2
await dummy_request_future_1
event_listener.clean_by_request(request1)
me.fire_event({"data": "foo1"}, "evt1")
assert request_future_1.done()
with pytest.raises(saltnado.TimeoutException):
request_future_1.result()
assert request_future_2.done()
assert request_future_2.result()["tag"] == "evt2"
assert request_future_2.result()["data"]["data"] == "foo2"
assert dummy_request_future_1.done()
assert dummy_request_future_1.result()["tag"] == "evt3"
assert dummy_request_future_1.result()["data"]["data"] == "foo3"
assert not dummy_request_future_2.done()
assert 2 == len(event_listener.tag_map)
assert 1 == len(event_listener.request_map)
event_listener.clean_by_request(request2)
with pytest.raises(saltnado.TimeoutException):
dummy_request_future_2.result()
assert 0 == len(event_listener.tag_map)
assert 0 == len(event_listener.request_map)

View file

@ -0,0 +1,36 @@
import pytest
import salt.utils.json
from salt.netapi.rest_tornado import saltnado
@pytest.fixture
def app_urls():
return [
("/run", saltnado.RunSaltAPIHandler),
]
@pytest.mark.parametrize("client", ["local", "local_async", "runner", "runner_async"])
async def test_authentication_exception_consistency(
http_client, client, content_type_map
):
"""
Test consistency of authentication exception of each clients.
"""
valid_response = {"return": ["Failed to authenticate"]}
request_lowstate = {
"client": client,
"tgt": "*",
"fun": "test.fib",
"arg": ["10"],
}
response = await http_client.fetch(
"/run",
method="POST",
body=salt.utils.json.dumps(request_lowstate),
headers={"Content-Type": content_type_map["json"]},
)
assert salt.utils.json.loads(response.body) == valid_response

View file

@ -0,0 +1,40 @@
import salt.ext.tornado.concurrent
from salt.netapi.rest_tornado import saltnado
async def test_any_future():
"""
Test that the Any Future does what we think it does
"""
# create a few futures
futures = []
for _ in range(3):
future = salt.ext.tornado.concurrent.Future()
futures.append(future)
# create an any future, make sure it isn't immediately done
any_ = saltnado.Any(futures)
assert any_.done() is False
# finish one, lets see who finishes
futures[0].set_result("foo")
await futures[0]
assert any_.done() is True
assert futures[0].done() is True
assert futures[1].done() is False
assert futures[2].done() is False
# make sure it returned the one that finished
assert any_.result() == futures[0]
futures = futures[1:]
# re-wait on some other futures
any_ = saltnado.Any(futures)
futures[0].set_result("foo")
await futures[0]
assert any_.done() is True
assert futures[0].done() is True
assert futures[1].done() is False

View file

@ -0,0 +1,43 @@
import urllib.parse
import pytest
import salt.utils.json
from salt.netapi.rest_tornado import saltnado
from tests.support.mock import MagicMock, patch
@pytest.fixture
def app_urls():
return [
(r"/hook(/.*)?", saltnado.WebhookSaltAPIHandler),
]
async def test_hook_can_handle_get_parameters(http_client, app, content_type_map):
with patch("salt.utils.event.get_event") as get_event:
with patch.dict(app.mod_opts, {"webhook_disable_auth": True}):
event = MagicMock()
event.fire_event.return_value = True
get_event.return_value = event
response = await http_client.fetch(
"/hook/my_service/?param=1&param=2",
body=salt.utils.json.dumps({}),
method="POST",
headers={"Content-Type": content_type_map["json"]},
)
assert response.code == 200
host = urllib.parse.urlparse(response.effective_url).netloc
event.fire_event.assert_called_once_with(
{
"headers": {
"Content-Length": "2",
"Connection": "close",
"Content-Type": "application/json",
"Host": host,
"Accept-Encoding": "gzip",
},
"post": {},
"get": {"param": ["1", "2"]},
},
"salt/netapi/hook/my_service/",
)

View file

@ -0,0 +1,142 @@
import hashlib
import urllib.parse
import pytest
import salt.netapi.rest_tornado as rest_tornado
import salt.utils.json
import salt.utils.yaml
from salt.ext.tornado.httpclient import HTTPError, HTTPRequest
from salt.ext.tornado.websocket import websocket_connect
@pytest.fixture
def app(client_config):
client_config.setdefault("rest_tornado", {})["websockets"] = True
return rest_tornado.get_application(client_config)
@pytest.fixture
def http_server_port(http_server):
return http_server.port
async def test_websocket_handler_upgrade_to_websocket(
http_client, auth_creds, content_type_map, http_server_port
):
response = await http_client.fetch(
"/login",
method="POST",
body=urllib.parse.urlencode(auth_creds),
headers={"Content-Type": content_type_map["form"]},
)
token = salt.utils.json.loads(response.body)["return"][0]["token"]
url = "ws://127.0.0.1:{}/all_events/{}".format(http_server_port, token)
request = HTTPRequest(
url, headers={"Origin": "http://example.com", "Host": "example.com"}
)
ws = await websocket_connect(request)
ws.write_message("websocket client ready")
ws.close()
async def test_websocket_handler_bad_token(client_config, http_server):
"""
A bad token should returns a 401 during a websocket connect
"""
token = "A" * len(
getattr(hashlib, client_config.get("hash_type", "md5"))().hexdigest()
)
url = "ws://127.0.0.1:{}/all_events/{}".format(http_server.port, token)
request = HTTPRequest(
url, headers={"Origin": "http://example.com", "Host": "example.com"}
)
with pytest.raises(HTTPError) as exc:
await websocket_connect(request)
assert exc.value.code == 401
async def test_websocket_handler_cors_origin_wildcard(
app, http_client, auth_creds, content_type_map, http_server_port
):
app.mod_opts["cors_origin"] = "*"
response = await http_client.fetch(
"/login",
method="POST",
body=urllib.parse.urlencode(auth_creds),
headers={"Content-Type": content_type_map["form"]},
)
token = salt.utils.json.loads(response.body)["return"][0]["token"]
url = "ws://127.0.0.1:{}/all_events/{}".format(http_server_port, token)
request = HTTPRequest(
url, headers={"Origin": "http://foo.bar", "Host": "example.com"}
)
ws = await websocket_connect(request)
ws.write_message("websocket client ready")
ws.close()
async def test_cors_origin_single(
app, http_client, auth_creds, content_type_map, http_server_port
):
app.mod_opts["cors_origin"] = "http://example.com"
response = await http_client.fetch(
"/login",
method="POST",
body=urllib.parse.urlencode(auth_creds),
headers={"Content-Type": content_type_map["form"]},
)
token = salt.utils.json.loads(response.body)["return"][0]["token"]
url = "ws://127.0.0.1:{}/all_events/{}".format(http_server_port, token)
# Example.com should works
request = HTTPRequest(
url, headers={"Origin": "http://example.com", "Host": "example.com"}
)
ws = await websocket_connect(request)
ws.write_message("websocket client ready")
ws.close()
# But foo.bar not
request = HTTPRequest(
url, headers={"Origin": "http://foo.bar", "Host": "example.com"}
)
with pytest.raises(HTTPError) as exc:
await websocket_connect(request)
assert exc.value.code == 403
async def test_cors_origin_multiple(
app, http_client, auth_creds, content_type_map, http_server_port
):
app.mod_opts["cors_origin"] = ["http://example.com", "http://foo.bar"]
response = await http_client.fetch(
"/login",
method="POST",
body=urllib.parse.urlencode(auth_creds),
headers={"Content-Type": content_type_map["form"]},
)
token = salt.utils.json.loads(response.body)["return"][0]["token"]
url = "ws://127.0.0.1:{}/all_events/{}".format(http_server_port, token)
# Example.com should works
request = HTTPRequest(
url, headers={"Origin": "http://example.com", "Host": "example.com"}
)
ws = await websocket_connect(request)
ws.write_message("websocket client ready")
ws.close()
# But foo.bar not
request = HTTPRequest(
url, headers={"Origin": "http://foo.bar", "Host": "example.com"}
)
ws = await websocket_connect(request)
ws.write_message("websocket client ready")
ws.close()

View file

@ -0,0 +1,74 @@
import pytest
import salt.config
import tests.support.saltnado as saltnado_support
from salt.netapi.rest_tornado import saltnado
@pytest.fixture
def client_config(salt_master):
config = salt.config.client_config(
salt_master.config["conf_file"],
defaults=salt_master.config.copy(),
)
return config
@pytest.fixture
def minion_config(salt_minion):
return salt_minion.config.copy()
@pytest.fixture
def load_auth(client_config):
return saltnado_support.load_auth(client_config)
@pytest.fixture
def auth_creds():
return saltnado_support.auth_creds()
@pytest.fixture
def auth_creds_dict():
return saltnado_support.auth_creds_dict()
@pytest.fixture
def auth_token(load_auth, auth_creds_dict):
"""
Mint and return a valid token for auth_creds
"""
return saltnado_support.auth_token(load_auth, auth_creds_dict)
@pytest.fixture
def content_type_map():
return saltnado_support.content_type_map()
@pytest.fixture
def app(app_urls, load_auth, client_config, minion_config, salt_sub_minion):
return saltnado_support.build_tornado_app(
app_urls, load_auth, client_config, minion_config, setup_event_listener=True
)
@pytest.fixture
def client_headers(auth_token, content_type_map):
return {
"Content-Type": content_type_map["json"],
saltnado.AUTH_TOKEN_HEADER: auth_token["token"],
}
@pytest.fixture
def http_server(io_loop, app, client_headers):
with saltnado_support.TestsHttpServer(
io_loop=io_loop, app=app, client_headers=client_headers
) as server:
yield server
@pytest.fixture
def http_client(http_server):
return http_server.client

View file

@ -0,0 +1,49 @@
from functools import partial
import pytest
import salt.ext.tornado.gen
from salt.netapi.rest_tornado import saltnado
# TODO: run all the same tests from the root handler, but for now since they are
# the same code, we'll just sanity check
@pytest.fixture
def app_urls():
return [
(r"/events", saltnado.EventsSaltAPIHandler),
]
@pytest.mark.slow_test
async def test_get(http_client, io_loop, app):
events_fired = []
def on_event(events_fired, event):
if len(events_fired) < 6:
event = event.decode("utf-8")
app.event_listener.event.fire_event(
{"foo": "bar", "baz": "qux"}, "salt/netapi/test"
)
events_fired.append(1)
event = event.strip()
# if we got a retry, just continue
if event != "retry: 400":
tag, data = event.splitlines()
assert tag.startswith("tag: ")
assert data.startswith("data: ")
# We spawn the call here because otherwise the fetch method would
# continue reading indefinitely and there would be no wait to
# properly run the assertions or stop the request.
io_loop.spawn_callback(
http_client.fetch,
"/events",
streaming_callback=partial(on_event, events_fired),
request_timeout=30,
)
while len(events_fired) < 5:
await salt.ext.tornado.gen.sleep(1)
assert len(events_fired) >= 5

View file

@ -0,0 +1,60 @@
import pytest
import salt.utils.json
from salt.netapi.rest_tornado import saltnado
@pytest.fixture
def app_urls():
return [
(r"/jobs/(.*)", saltnado.JobsSaltAPIHandler),
(r"/jobs", saltnado.JobsSaltAPIHandler),
]
@pytest.mark.slow_test
@pytest.mark.async_timeout(seconds=120)
async def test_get(http_client, subtests):
# test with no JID
response = await http_client.fetch("/jobs", method="GET", follow_redirects=False)
response_obj = salt.utils.json.loads(response.body)["return"][0]
assert response_obj
assert isinstance(response_obj, dict)
for ret in response_obj.values():
with subtests.test('assert "Function" in ret'):
assert "Function" in ret
with subtests.test('assert "Target" in ret'):
assert "Target" in ret
with subtests.test('assert "Target-type" in ret'):
assert "Target-type" in ret
with subtests.test('assert "User" in ret'):
assert "User" in ret
with subtests.test('assert "StartTime" in ret'):
assert "StartTime" in ret
with subtests.test('assert "Arguments" in ret'):
assert "Arguments" in ret
# test with a specific JID passed in
jid = next(iter(response_obj.keys()))
response = await http_client.fetch(
"/jobs/{}".format(jid),
method="GET",
follow_redirects=False,
)
response_obj = salt.utils.json.loads(response.body)["return"][0]
assert response_obj
assert isinstance(response_obj, dict)
with subtests.test('assert "Function" in response_obj'):
assert "Function" in response_obj
with subtests.test('assert "Target" in response_obj'):
assert "Target" in response_obj
with subtests.test('assert "Target-type" in response_obj'):
assert "Target-type" in response_obj
with subtests.test('assert "User" in response_obj'):
assert "User" in response_obj
with subtests.test('assert "StartTime" in response_obj'):
assert "StartTime" in response_obj
with subtests.test('assert "Arguments" in response_obj'):
assert "Arguments" in response_obj
with subtests.test('assert "Result" in response_obj'):
assert "Result" in response_obj

View file

@ -0,0 +1,94 @@
import pytest
import salt.utils.json
from salt.ext.tornado.httpclient import HTTPError
from salt.netapi.rest_tornado import saltnado
@pytest.fixture
def app_urls():
return [
(r"/minions/(.*)", saltnado.MinionSaltAPIHandler),
(r"/minions", saltnado.MinionSaltAPIHandler),
]
async def test_get_no_mid(http_client):
response = await http_client.fetch(
"/minions",
method="GET",
follow_redirects=False,
)
response_obj = salt.utils.json.loads(response.body)
assert len(response_obj["return"]) == 1
# one per minion
assert len(response_obj["return"][0]) == 2
# check a single grain
for minion_id, grains in response_obj["return"][0].items():
assert minion_id == grains["id"]
@pytest.mark.slow_test
async def test_get(http_client, salt_minion):
response = await http_client.fetch(
"/minions/{}".format(salt_minion.id),
method="GET",
follow_redirects=False,
)
response_obj = salt.utils.json.loads(response.body)
assert len(response_obj["return"]) == 1
assert len(response_obj["return"][0]) == 1
# check a single grain
assert response_obj["return"][0][salt_minion.id]["id"] == salt_minion.id
async def test_post(http_client, salt_minion, salt_sub_minion):
low = [{"tgt": "*minion-*", "fun": "test.ping"}]
response = await http_client.fetch(
"/minions",
method="POST",
body=salt.utils.json.dumps(low),
)
response_obj = salt.utils.json.loads(response.body)
ret = response_obj["return"]
# TODO: verify pub function? Maybe look at how we test the publisher
assert len(ret) == 1
assert "jid" in ret[0]
assert sorted(ret[0]["minions"]) == sorted([salt_minion.id, salt_sub_minion.id])
@pytest.mark.slow_test
async def test_post_with_client(http_client, salt_minion, salt_sub_minion):
# get a token for this test
low = [{"client": "local_async", "tgt": "*minion-*", "fun": "test.ping"}]
response = await http_client.fetch(
"/minions",
method="POST",
body=salt.utils.json.dumps(low),
)
response_obj = salt.utils.json.loads(response.body)
ret = response_obj["return"]
# TODO: verify pub function? Maybe look at how we test the publisher
assert len(ret) == 1
assert "jid" in ret[0]
assert sorted(ret[0]["minions"]) == sorted([salt_minion.id, salt_sub_minion.id])
@pytest.mark.slow_test
async def test_post_with_incorrect_client(http_client):
"""
The /minions endpoint is asynchronous only, so if you try something else
make sure you get an error
"""
# get a token for this test
low = [{"client": "local", "tgt": "*", "fun": "test.ping"}]
with pytest.raises(HTTPError) as exc:
await http_client.fetch(
"/minions",
method="POST",
body=salt.utils.json.dumps(low),
)
assert exc.value.code == 400

View file

@ -0,0 +1,258 @@
import pytest
import salt.utils.json
from salt.ext.tornado.httpclient import HTTPError
from salt.netapi.rest_tornado import saltnado
@pytest.fixture
def app_urls(salt_sub_minion):
return [
("/", saltnado.SaltAPIHandler),
]
async def test_root(http_client):
"""
Test the root path which returns the list of clients we support
"""
response = await http_client.fetch(
"/", connect_timeout=30, request_timeout=30, headers=None
)
assert response.code == 200
response_obj = salt.utils.json.loads(response.body)
assert sorted(response_obj["clients"]) == [
"local",
"local_async",
"runner",
"runner_async",
]
assert response_obj["return"] == "Welcome"
@pytest.mark.slow_test
async def test_post_no_auth(http_client, content_type_map):
"""
Test post with no auth token, should 401
"""
# get a token for this test
low = [{"client": "local", "tgt": "*", "fun": "test.ping"}]
with pytest.raises(HTTPError) as exc:
await http_client.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
headers={"Content-Type": content_type_map["json"]},
follow_redirects=False,
connect_timeout=30,
request_timeout=30,
)
assert exc.value.code == 302
assert exc.value.response.headers["Location"] == "/login"
# Local client tests
async def test_simple_local_post(http_client, salt_minion, salt_sub_minion):
"""
Test a basic API of /
"""
low = [{"client": "local", "tgt": "*", "fun": "test.ping"}]
response = await http_client.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
connect_timeout=30,
request_timeout=30,
)
response_obj = salt.utils.json.loads(response.body)
assert len(response_obj["return"]) == 1
assert response_obj["return"][0] == {salt_minion.id: True, salt_sub_minion.id: True}
async def test_simple_local_post_no_tgt(http_client):
"""
POST job with invalid tgt
"""
low = [{"client": "local", "tgt": "minion_we_dont_have", "fun": "test.ping"}]
response = await http_client.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
connect_timeout=30,
request_timeout=30,
)
response_obj = salt.utils.json.loads(response.body)
assert response_obj["return"] == [
"No minions matched the target. No command was sent, no jid was assigned."
]
# local client request body test
async def test_simple_local_post_only_dictionary_request(
http_client, salt_minion, salt_sub_minion
):
"""
Test a basic API of /
"""
low = {
"client": "local",
"tgt": "*",
"fun": "test.ping",
}
response = await http_client.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
connect_timeout=30,
request_timeout=30,
)
response_obj = salt.utils.json.loads(response.body)
assert len(response_obj["return"]) == 1
assert response_obj["return"][0] == {salt_minion.id: True, salt_sub_minion.id: True}
async def test_simple_local_post_invalid_request(http_client):
"""
Test a basic API of /
"""
low = ["invalid request"]
with pytest.raises(HTTPError) as exc:
await http_client.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
connect_timeout=30,
request_timeout=30,
)
assert exc.value.code == 400
# local_async tests
async def test_simple_local_async_post(http_client, salt_minion, salt_sub_minion):
low = [{"client": "local_async", "tgt": "*", "fun": "test.ping"}]
response = await http_client.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
)
response_obj = salt.utils.json.loads(response.body)
ret = response_obj["return"]
ret[0]["minions"] = sorted(ret[0]["minions"])
# TODO: verify pub function? Maybe look at how we test the publisher
assert len(ret) == 1
assert "jid" in ret[0]
assert ret[0]["minions"] == sorted([salt_minion.id, salt_sub_minion.id])
async def test_multi_local_async_post(http_client, salt_minion, salt_sub_minion):
low = [
{"client": "local_async", "tgt": "*", "fun": "test.ping"},
{"client": "local_async", "tgt": "*", "fun": "test.ping"},
]
response = await http_client.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
)
response_obj = salt.utils.json.loads(response.body)
ret = response_obj["return"]
ret[0]["minions"] = sorted(ret[0]["minions"])
ret[1]["minions"] = sorted(ret[1]["minions"])
assert len(ret) == 2
assert "jid" in ret[0]
assert "jid" in ret[1]
assert ret[0]["minions"] == sorted([salt_minion.id, salt_sub_minion.id])
assert ret[1]["minions"] == sorted([salt_minion.id, salt_sub_minion.id])
@pytest.mark.slow_test
async def test_multi_local_async_post_multitoken(
http_client, auth_token, salt_minion, salt_sub_minion
):
low = [
{"client": "local_async", "tgt": "*", "fun": "test.ping"},
{
"client": "local_async",
"tgt": "*",
"fun": "test.ping",
# send a different (but still valid token)
"token": auth_token["token"],
},
{
"client": "local_async",
"tgt": "*",
"fun": "test.ping",
"token": "BAD_TOKEN", # send a bad token
},
]
response = await http_client.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
)
response_obj = salt.utils.json.loads(response.body)
ret = response_obj["return"]
ret[0]["minions"] = sorted(ret[0]["minions"])
ret[1]["minions"] = sorted(ret[1]["minions"])
assert len(ret) == 3 # make sure we got 3 responses
assert "jid" in ret[0] # the first 2 are regular returns
assert "jid" in ret[1]
assert "Failed to authenticate" in ret[2] # bad auth
assert ret[0]["minions"] == sorted([salt_minion.id, salt_sub_minion.id])
assert ret[1]["minions"] == sorted([salt_minion.id, salt_sub_minion.id])
@pytest.mark.slow_test
async def test_simple_local_async_post_no_tgt(http_client):
low = [{"client": "local_async", "tgt": "minion_we_dont_have", "fun": "test.ping"}]
response = await http_client.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
)
response_obj = salt.utils.json.loads(response.body)
assert response_obj["return"] == [{}]
# runner tests
@pytest.mark.slow_test
async def test_simple_local_runner_post(http_client, salt_minion, salt_sub_minion):
low = [{"client": "runner", "fun": "manage.up"}]
response = await http_client.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
connect_timeout=30,
request_timeout=300,
)
response_obj = salt.utils.json.loads(response.body)
assert len(response_obj["return"]) == 1
assert sorted(response_obj["return"][0]) == sorted(
[salt_minion.id, salt_sub_minion.id]
)
# runner_async tests
async def test_simple_local_runner_async_post(http_client):
low = [{"client": "runner_async", "fun": "manage.up"}]
response = await http_client.fetch(
"/",
method="POST",
body=salt.utils.json.dumps(low),
connect_timeout=10,
request_timeout=10,
)
response_obj = salt.utils.json.loads(response.body)
assert "return" in response_obj
assert 1 == len(response_obj["return"])
assert "jid" in response_obj["return"][0]
assert "tag" in response_obj["return"][0]

View file

@ -0,0 +1,26 @@
import pytest
import salt.utils.json
from salt.netapi.rest_tornado import saltnado
# TODO: run all the same tests from the root handler, but for now since they are
# the same code, we'll just sanity check
@pytest.fixture
def app_urls():
return [
("/run", saltnado.RunSaltAPIHandler),
]
@pytest.mark.slow_test
async def test_get(http_client, salt_minion, salt_sub_minion):
low = [{"client": "local", "tgt": "*", "fun": "test.ping"}]
response = await http_client.fetch(
"/run",
method="POST",
body=salt.utils.json.dumps(low),
)
response_obj = salt.utils.json.loads(response.body)
ret = response_obj["return"]
assert sorted(ret[0]) == sorted([salt_minion.id, salt_sub_minion.id])

151
tests/support/saltnado.py Normal file
View file

@ -0,0 +1,151 @@
import logging
import socket
import attr
import salt.auth
import salt.ext.tornado.escape
import salt.ext.tornado.web
from salt.ext.tornado import netutil
from salt.ext.tornado.httpclient import AsyncHTTPClient, HTTPError
from salt.ext.tornado.httpserver import HTTPServer
from salt.ext.tornado.ioloop import TimeoutError as IOLoopTimeoutError
from salt.netapi.rest_tornado import saltnado
log = logging.getLogger(__name__)
@attr.s
class TestsHttpClient:
address = attr.ib()
io_loop = attr.ib(repr=False)
headers = attr.ib(default=None)
client = attr.ib(init=False, repr=False)
@client.default
def _client_default(self):
return AsyncHTTPClient(self.io_loop)
async def fetch(self, path, **kwargs):
if "headers" not in kwargs and self.headers:
kwargs["headers"] = self.headers.copy()
try:
response = await self.client.fetch(
"{}{}".format(self.address, path), **kwargs
)
return self._decode_body(response)
except HTTPError as exc:
exc.response = self._decode_body(exc.response)
raise
def _decode_body(self, response):
if response is None:
return response
if response.body:
# Decode it
if response.headers.get("Content-Type") == "application/json":
response._body = response.body.decode("utf-8")
else:
response._body = salt.ext.tornado.escape.native_str(response.body)
return response
@attr.s
class TestsHttpServer:
io_loop = attr.ib(repr=False)
app = attr.ib()
protocol = attr.ib(default="http", repr=False)
http_server_options = attr.ib(default=attr.Factory(dict))
sock = attr.ib(init=False, repr=False)
port = attr.ib(init=False, repr=False)
address = attr.ib(init=False)
server = attr.ib(init=False)
client_headers = attr.ib(default=None)
client = attr.ib(init=False, repr=False)
@sock.default
def _sock_default(self):
return netutil.bind_sockets(
None, "127.0.0.1", family=socket.AF_INET, reuse_port=False
)[0]
@port.default
def _port_default(self):
return self.sock.getsockname()[1]
@address.default
def _address_default(self):
return "{}://127.0.0.1:{}".format(self.protocol, self.port)
@server.default
def _server_default(self):
server = HTTPServer(self.app, io_loop=self.io_loop, **self.http_server_options)
server.add_sockets([self.sock])
return server
@client.default
def _client_default(self):
return TestsHttpClient(
address=self.address, io_loop=self.io_loop, headers=self.client_headers
)
def __enter__(self):
return self
def __exit__(self, *_):
self.server.stop()
try:
self.io_loop.run_sync(self.server.close_all_connections, timeout=10)
except IOLoopTimeoutError:
pass
self.client.client.close()
def load_auth(client_config):
return salt.auth.LoadAuth(client_config)
def auth_creds():
return (
("username", "saltdev_api"),
("password", "saltdev"),
("eauth", "auto"),
)
def auth_creds_dict():
return dict(auth_creds())
def auth_token(load_auth, auth_creds_dict):
"""
Mint and return a valid token for auth_creds
"""
return load_auth.mk_token(auth_creds_dict)
def build_tornado_app(
urls, load_auth, client_config, minion_config, setup_event_listener=False
):
application = salt.ext.tornado.web.Application(urls, debug=True)
application.auth = load_auth
application.opts = client_config
application.mod_opts = minion_config
if setup_event_listener:
application.event_listener = saltnado.EventListener(
minion_config, client_config
)
return application
def content_type_map():
return {
"json": "application/json",
"json-utf8": "application/json; charset=utf-8",
"yaml": "application/x-yaml",
"text": "text/plain",
"form": "application/x-www-form-urlencoded",
"xml": "application/xml",
"real-accept-header-json": "application/json, text/javascript, */*; q=0.01",
"real-accept-header-yaml": "application/x-yaml, text/yaml, */*; q=0.01",
}

File diff suppressed because it is too large Load diff