Factor out sum and sorting of permissions into separate functions.

Fixes #56495

Additionally, the same logic was applied to the rest_cherrypy netapi
This commit is contained in:
Pedro Algarvio 2021-07-14 15:52:41 +01:00 committed by Megan Wilhite
parent 9bccfcf9c4
commit 44ffb72840
8 changed files with 653 additions and 29 deletions

2
changelog/56495.fixed Normal file
View file

@ -0,0 +1,2 @@
Factor out sum and sorting of permissions into separate functions.
Additionally, the same logic was applied to the rest_cherrypy netapi

View file

@ -24,6 +24,40 @@ from salt.defaults import DEFAULT_TARGET_DELIM
log = logging.getLogger(__name__)
def sorted_permissions(perms):
"""
Return a sorted list of the passed in permissions, de-duplicating in the process
"""
_str_perms = []
_non_str_perms = []
for entry in perms:
if isinstance(entry, str):
if entry in _str_perms:
continue
_str_perms.append(entry)
continue
if entry in _non_str_perms:
continue
_non_str_perms.append(entry)
return sorted(_str_perms) + sorted(_non_str_perms, key=repr)
def sum_permissions(token, eauth):
"""
Returns the sum of '*', user-specific and group specific permissions
"""
perms = eauth.get(token["name"], [])
perms.extend(eauth.get("*", []))
if "groups" in token and token["groups"]:
user_groups = set(token["groups"])
eauth_groups = {i.rstrip("%") for i in eauth.keys() if i.endswith("%")}
for group in user_groups & eauth_groups:
perms.extend(eauth["{}%".format(group)])
return perms
class NetapiClient:
"""
Provide a uniform method of accessing the various client interfaces in Salt

View file

@ -1899,18 +1899,8 @@ class Login(LowDataAdapter):
if token["eauth"] == "django" and "^model" in eauth:
perms = token["auth_list"]
else:
# Get sum of '*' perms, user-specific perms, and group-specific perms
perms = eauth.get(token["name"], []).copy()
perms.extend(eauth.get("*", []))
if "groups" in token and token["groups"]:
user_groups = set(token["groups"])
eauth_groups = {
i.rstrip("%") for i in eauth.keys() if i.endswith("%")
}
for group in user_groups & eauth_groups:
perms.extend(eauth["{}%".format(group)])
perms = salt.netapi.sum_permissions(token, eauth)
perms = salt.netapi.sorted_permissions(perms)
if not perms:
logger.debug("Eauth permission list not found.")

View file

@ -756,18 +756,8 @@ class SaltAuthHandler(BaseSaltAPIHandler): # pylint: disable=W0223
# Grab eauth config for the current backend for the current user
try:
eauth = self.application.opts["external_auth"][token["eauth"]]
# Get sum of '*' perms, user-specific perms, and group-specific perms
perms = eauth.get(token["name"], [])
perms.extend(eauth.get("*", []))
if "groups" in token and token["groups"]:
user_groups = set(token["groups"])
eauth_groups = {i.rstrip("%") for i in eauth.keys() if i.endswith("%")}
for group in user_groups & eauth_groups:
perms.extend(eauth["{}%".format(group)])
perms = sorted(list(set(perms)))
perms = salt.netapi.sum_permissions(token, eauth)
perms = salt.netapi.sorted_permissions(perms)
# If we can't find the creds, then they aren't authorized
except KeyError:
self.send_error(401)

View file

@ -226,11 +226,10 @@ salt/modules/*_sysctl.py:
- integration.modules.test_sysctl
salt/netapi/rest_cherrypy/*:
- pytests.integration.netapi.test_client
- pytests.integration.netapi.test_ssh_client
- pytests.functional.netapi.rest_cherrypy.test_auth
- pytests.functional.netapi.rest_cherrypy.test_auth_pam
- pytests.functional.netapi.rest_cherrypy.test_cors
- pytests.functional.netapi.rest_cherrypy.test_external_auth_syntax
- pytests.functional.netapi.rest_cherrypy.test_in_formats
- pytests.functional.netapi.rest_cherrypy.test_out_formats
- pytests.integration.netapi.rest_cherrypy.test_arg_kwarg
@ -238,14 +237,15 @@ salt/netapi/rest_cherrypy/*:
- pytests.integration.netapi.rest_cherrypy.test_jobs
- pytests.integration.netapi.rest_cherrypy.test_run
- pytests.integration.netapi.rest_cherrypy.test_webhook_disable_auth
- pytests.integration.netapi.test_client
- pytests.integration.netapi.test_ssh_client
salt/netapi/rest_tornado/*:
- integration.netapi.rest_tornado.test_app
- pytests.integration.netapi.test_client
- pytests.integration.netapi.test_ssh_client
- pytests.functional.netapi.rest_tornado.test_auth_handler
- pytests.functional.netapi.rest_tornado.test_base_api_handler
- pytests.functional.netapi.rest_tornado.test_event_listener
- pytests.functional.netapi.rest_tornado.test_external_auth_syntax
- pytests.functional.netapi.rest_tornado.test_utils
- pytests.functional.netapi.rest_tornado.test_webhooks_handler
- pytests.functional.netapi.rest_tornado.test_websockets_handler
@ -254,6 +254,8 @@ salt/netapi/rest_tornado/*:
- pytests.integration.netapi.rest_tornado.test_minions_api_handler
- pytests.integration.netapi.rest_tornado.test_root_handler
- pytests.integration.netapi.rest_tornado.test_run_api_handler
- pytests.integration.netapi.test_client
- pytests.integration.netapi.test_ssh_client
salt/output/*:
- integration.output.test_output

View file

@ -3,7 +3,7 @@ import pathlib
import pytest
import salt.config
import tests.support.netapi as netapi
from saltfactories.utils.ports import get_unused_localhost_port
from pytestshellutils.utils.ports import get_unused_localhost_port
@pytest.fixture

View file

@ -0,0 +1,299 @@
import urllib.parse
import attr
import pytest
import salt.utils.json
import salt.utils.yaml
import tests.support.netapi as netapi
pytestmark = [
pytest.mark.destructive_test,
pytest.mark.skip_if_not_root,
]
ACCOUNT_USERNAME = "saltdev-syntax"
ACCOUNT_GROUP_NAME = "{}-group".format(ACCOUNT_USERNAME)
@attr.s(frozen=True, slots=True)
class ExternalAuthConfig:
eauth = attr.ib()
pam_key = attr.ib(repr=False)
pam_config = attr.ib(repr=False)
expected_perms = attr.ib(repr=False)
fixture_id = attr.ib(repr=False)
auto = attr.ib(init=False)
pam = attr.ib(init=False)
@auto.default
def _set_auto(self):
return {
"*": ["grains.*"],
ACCOUNT_USERNAME: ["@wheel"],
"{}%".format(ACCOUNT_GROUP_NAME): ["@runner"],
}
@pam.default
def _set_pam(self):
return {self.pam_key: self.pam_config}
@pytest.fixture(scope="module")
def netapi_account():
with pytest.helpers.create_account(
username=ACCOUNT_USERNAME, password="saltdev", group_name=ACCOUNT_GROUP_NAME
) as account:
yield account
def external_auth_ids(value):
return value.fixture_id
@pytest.fixture(
params=(
# By User
ExternalAuthConfig(
eauth="pam",
pam_key=ACCOUNT_USERNAME,
pam_config=["test.*"],
expected_perms=["test.*"],
fixture_id="by-user-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key=ACCOUNT_USERNAME,
pam_config=["test.*"],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-user-auto",
),
# By Group
ExternalAuthConfig(
eauth="pam",
pam_key="{}%".format(ACCOUNT_GROUP_NAME),
pam_config=["grains.*"],
expected_perms=["grains.*"],
fixture_id="by-group-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key="{}%".format(ACCOUNT_GROUP_NAME),
pam_config=["@wheel", "grains.*"],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-group-auto",
),
# By user, by minion
ExternalAuthConfig(
eauth="pam",
pam_key=ACCOUNT_USERNAME,
pam_config=[{"G@id:master2": ["@jobs"]}, {"G@id:master1": ["@jobs"]}],
expected_perms=[{"G@id:master1": ["@jobs"]}, {"G@id:master2": ["@jobs"]}],
fixture_id="by-user-by-minion-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key=ACCOUNT_USERNAME,
pam_config=[{"G@id:master2": ["@jobs"]}, {"G@id:master1": ["@jobs"]}],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-user-by-minion-auto",
),
# By user, by wheel
ExternalAuthConfig(
eauth="pam",
pam_key=ACCOUNT_USERNAME,
pam_config=["@wheel"],
expected_perms=["@wheel"],
fixture_id="by-user-by-@wheel-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key=ACCOUNT_USERNAME,
pam_config=["@wheel"],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-user-by-@wheel-auto",
),
# By user, by runner
ExternalAuthConfig(
eauth="pam",
pam_key=ACCOUNT_USERNAME,
pam_config=["@runner"],
expected_perms=["@runner"],
fixture_id="by-user-by-@runner-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key=ACCOUNT_USERNAME,
pam_config=["@runner"],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-user-by-@runner-auto",
),
# By user, by jobs
ExternalAuthConfig(
eauth="pam",
pam_key=ACCOUNT_USERNAME,
pam_config=["@jobs"],
expected_perms=["@jobs"],
fixture_id="by-user-by-@jobs-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key=ACCOUNT_USERNAME,
pam_config=["@jobs"],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-user-by-@jobs-auto",
),
# By group, by wheel
ExternalAuthConfig(
eauth="pam",
pam_key="{}%".format(ACCOUNT_GROUP_NAME),
pam_config=["@wheel"],
expected_perms=["@wheel"],
fixture_id="by-group-by-@wheel-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key="{}%".format(ACCOUNT_GROUP_NAME),
pam_config=["@wheel"],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-group-by-@wheel-auto",
),
# By group, by runner
ExternalAuthConfig(
eauth="pam",
pam_key="{}%".format(ACCOUNT_GROUP_NAME),
pam_config=["@runner"],
expected_perms=["@runner"],
fixture_id="by-group-by-@runner-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key="{}%".format(ACCOUNT_GROUP_NAME),
pam_config=["@runner"],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-group-by-@runner-auto",
),
# By group, by jobs
ExternalAuthConfig(
eauth="pam",
pam_key="{}%".format(ACCOUNT_GROUP_NAME),
pam_config=["@jobs"],
expected_perms=["@jobs"],
fixture_id="by-group-by-@jobs-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key="{}%".format(ACCOUNT_GROUP_NAME),
pam_config=["@jobs"],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-group-by-@jobs-auto",
),
# By user, by wheel/runner/jobs module
ExternalAuthConfig(
eauth="pam",
pam_key=ACCOUNT_USERNAME,
pam_config=[{"@runner": ["active"]}],
expected_perms=[{"@runner": ["active"]}],
fixture_id="by-user-by-@wheel/@runner/@jobs-module-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key=ACCOUNT_USERNAME,
pam_config=[{"@runner": ["active"]}],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-user-by-@wheel/@runner/@jobs-module-auto",
),
# By user, module, args & kwargs
ExternalAuthConfig(
eauth="pam",
pam_key=ACCOUNT_USERNAME,
pam_config=[
{
"*": [
{
"my_mod.*": {
"args": ["a1.*", ".*", "a3.*"],
"kwargs": {"kwa": "kwa.*", "kwb": "kwb"},
}
}
]
}
],
expected_perms=[
{
"*": [
{
"my_mod.*": {
"args": ["a1.*", ".*", "a3.*"],
"kwargs": {"kwa": "kwa.*", "kwb": "kwb"},
}
}
]
}
],
fixture_id="by-user-by-module-args-kwargs-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key=ACCOUNT_USERNAME,
pam_config=[
{
"*": [
{
"my_mod.*": {
"args": ["a1.*", ".*", "a3.*"],
"kwargs": {"kwa": "kwa.*", "kwb": "kwb"},
}
}
]
}
],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-user-by-module-args-kwargs-auto",
),
),
ids=external_auth_ids,
)
def external_auth(request):
return request.param
@pytest.fixture
def auth_creds(external_auth, netapi_account):
return {
"username": netapi_account.username,
"password": netapi_account.password,
"eauth": external_auth.eauth,
}
@pytest.fixture
def client_config(client_config, external_auth):
client_config["external_auth"] = {
"auto": external_auth.auto,
"pam": external_auth.pam,
}
return client_config
@pytest.fixture
def http_server(io_loop, app, netapi_port, content_type_map):
with netapi.TestsTornadoHttpServer(
io_loop=io_loop,
app=app,
port=netapi_port,
client_headers={"Content-Type": content_type_map["form"]},
) as server:
yield server
async def test_perms(http_client, auth_creds, external_auth):
response = await http_client.fetch(
"/login",
method="POST",
body=urllib.parse.urlencode(auth_creds),
)
assert response.code == 200
response_obj = salt.utils.json.loads(response.body)["return"][0]
perms = response_obj["perms"]
assert perms == external_auth.expected_perms

View file

@ -0,0 +1,307 @@
import urllib.parse
import attr
import pytest
import salt.utils.json
import salt.utils.yaml
import tests.support.netapi as netapi
from salt.netapi.rest_tornado import saltnado
pytestmark = [
pytest.mark.destructive_test,
pytest.mark.skip_if_not_root,
]
ACCOUNT_USERNAME = "saltdev-syntax"
ACCOUNT_GROUP_NAME = "{}-group".format(ACCOUNT_USERNAME)
@attr.s(frozen=True, slots=True)
class ExternalAuthConfig:
eauth = attr.ib()
pam_key = attr.ib(repr=False)
pam_config = attr.ib(repr=False)
expected_perms = attr.ib(repr=False)
fixture_id = attr.ib(repr=False)
auto = attr.ib(init=False)
pam = attr.ib(init=False)
@auto.default
def _set_auto(self):
return {
"*": ["grains.*"],
ACCOUNT_USERNAME: ["@wheel"],
"{}%".format(ACCOUNT_GROUP_NAME): ["@runner"],
}
@pam.default
def _set_pam(self):
return {self.pam_key: self.pam_config}
@pytest.fixture(scope="module")
def netapi_account():
with pytest.helpers.create_account(
username=ACCOUNT_USERNAME, password="saltdev", group_name=ACCOUNT_GROUP_NAME
) as account:
yield account
def external_auth_ids(value):
return value.fixture_id
@pytest.fixture(
params=(
# By User
ExternalAuthConfig(
eauth="pam",
pam_key=ACCOUNT_USERNAME,
pam_config=["test.*"],
expected_perms=["test.*"],
fixture_id="by-user-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key=ACCOUNT_USERNAME,
pam_config=["test.*"],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-user-auto",
),
# By Group
ExternalAuthConfig(
eauth="pam",
pam_key="{}%".format(ACCOUNT_GROUP_NAME),
pam_config=["grains.*"],
expected_perms=["grains.*"],
fixture_id="by-group-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key="{}%".format(ACCOUNT_GROUP_NAME),
pam_config=["@wheel", "grains.*"],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-group-auto",
),
# By user, by minion
ExternalAuthConfig(
eauth="pam",
pam_key=ACCOUNT_USERNAME,
pam_config=[{"G@id:master2": ["@jobs"]}, {"G@id:master1": ["@jobs"]}],
expected_perms=[{"G@id:master1": ["@jobs"]}, {"G@id:master2": ["@jobs"]}],
fixture_id="by-user-by-minion-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key=ACCOUNT_USERNAME,
pam_config=[{"G@id:master2": ["@jobs"]}, {"G@id:master1": ["@jobs"]}],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-user-by-minion-auto",
),
# By user, by wheel
ExternalAuthConfig(
eauth="pam",
pam_key=ACCOUNT_USERNAME,
pam_config=["@wheel"],
expected_perms=["@wheel"],
fixture_id="by-user-by-@wheel-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key=ACCOUNT_USERNAME,
pam_config=["@wheel"],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-user-by-@wheel-auto",
),
# By user, by runner
ExternalAuthConfig(
eauth="pam",
pam_key=ACCOUNT_USERNAME,
pam_config=["@runner"],
expected_perms=["@runner"],
fixture_id="by-user-by-@runner-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key=ACCOUNT_USERNAME,
pam_config=["@runner"],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-user-by-@runner-auto",
),
# By user, by jobs
ExternalAuthConfig(
eauth="pam",
pam_key=ACCOUNT_USERNAME,
pam_config=["@jobs"],
expected_perms=["@jobs"],
fixture_id="by-user-by-@jobs-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key=ACCOUNT_USERNAME,
pam_config=["@jobs"],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-user-by-@jobs-auto",
),
# By group, by wheel
ExternalAuthConfig(
eauth="pam",
pam_key="{}%".format(ACCOUNT_GROUP_NAME),
pam_config=["@wheel"],
expected_perms=["@wheel"],
fixture_id="by-group-by-@wheel-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key="{}%".format(ACCOUNT_GROUP_NAME),
pam_config=["@wheel"],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-group-by-@wheel-auto",
),
# By group, by runner
ExternalAuthConfig(
eauth="pam",
pam_key="{}%".format(ACCOUNT_GROUP_NAME),
pam_config=["@runner"],
expected_perms=["@runner"],
fixture_id="by-group-by-@runner-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key="{}%".format(ACCOUNT_GROUP_NAME),
pam_config=["@runner"],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-group-by-@runner-auto",
),
# By group, by jobs
ExternalAuthConfig(
eauth="pam",
pam_key="{}%".format(ACCOUNT_GROUP_NAME),
pam_config=["@jobs"],
expected_perms=["@jobs"],
fixture_id="by-group-by-@jobs-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key="{}%".format(ACCOUNT_GROUP_NAME),
pam_config=["@jobs"],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-group-by-@jobs-auto",
),
# By user, by wheel/runner/jobs module
ExternalAuthConfig(
eauth="pam",
pam_key=ACCOUNT_USERNAME,
pam_config=[{"@runner": ["active"]}],
expected_perms=[{"@runner": ["active"]}],
fixture_id="by-user-by-@wheel/@runner/@jobs-module-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key=ACCOUNT_USERNAME,
pam_config=[{"@runner": ["active"]}],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-user-by-@wheel/@runner/@jobs-module-auto",
),
# By user, module, args & kwargs
ExternalAuthConfig(
eauth="pam",
pam_key=ACCOUNT_USERNAME,
pam_config=[
{
"*": [
{
"my_mod.*": {
"args": ["a1.*", ".*", "a3.*"],
"kwargs": {"kwa": "kwa.*", "kwb": "kwb"},
}
}
]
}
],
expected_perms=[
{
"*": [
{
"my_mod.*": {
"args": ["a1.*", ".*", "a3.*"],
"kwargs": {"kwa": "kwa.*", "kwb": "kwb"},
}
}
]
}
],
fixture_id="by-user-by-module-args-kwargs-pam",
),
ExternalAuthConfig(
eauth="auto",
pam_key=ACCOUNT_USERNAME,
pam_config=[
{
"*": [
{
"my_mod.*": {
"args": ["a1.*", ".*", "a3.*"],
"kwargs": {"kwa": "kwa.*", "kwb": "kwb"},
}
}
]
}
],
expected_perms=["@wheel", "grains.*"],
fixture_id="by-user-by-module-args-kwargs-auto",
),
),
ids=external_auth_ids,
)
def external_auth(request):
return request.param
@pytest.fixture
def auth_creds(external_auth, netapi_account):
return {
"username": netapi_account.username,
"password": netapi_account.password,
"eauth": external_auth.eauth,
}
@pytest.fixture
def client_config(client_config, external_auth):
client_config["external_auth"] = {
"auto": external_auth.auto,
"pam": external_auth.pam,
}
return client_config
@pytest.fixture
def http_server(io_loop, app, netapi_port, content_type_map):
with netapi.TestsTornadoHttpServer(
io_loop=io_loop,
app=app,
port=netapi_port,
client_headers={"Content-Type": content_type_map["form"]},
) as server:
yield server
@pytest.fixture
def app_urls():
return [
("/login", saltnado.SaltAuthHandler),
]
async def test_perms(http_client, auth_creds, external_auth):
response = await http_client.fetch(
"/login",
method="POST",
body=urllib.parse.urlencode(auth_creds),
)
assert response.code == 200
response_obj = salt.utils.json.loads(response.body)["return"][0]
perms = response_obj["perms"]
assert perms == external_auth.expected_perms