fix: Enable port modification in state selinux.port_policy_present

This commit is contained in:
Jason Woods 2020-02-14 14:11:26 +00:00 committed by Daniel Wozniak
parent c1ac77d842
commit a1ae9cccf5

View file

@ -1,5 +1,6 @@
"""
:codeauthor: Jayesh Kariya <jayeshk@saltstack.com>
:codeauthor: Jason Woods <devel@jasonwoods.me.uk>
"""
import pytest
@ -118,3 +119,337 @@ def test_boolean():
ret.update({"comment": comt, "result": False})
ret.update({"changes": {}})
assert selinux.boolean(name, value) == ret
def test_port_policy_present():
"""
Test to set up an SELinux port.
"""
name = "tcp/8080"
protocol = "tcp"
port = "8080"
ret = {"name": name, "changes": {}, "result": False, "comment": ""}
# Test when already present with same sel_type
mock_add = MagicMock(return_value={"retcode": 0})
mock_modify = MagicMock(return_value={"retcode": 0})
mock_get = MagicMock(
return_value={
"sel_type": "http_cache_port_t",
"protocol": "tcp",
"port": "8080",
}
)
with patch.dict(
selinux.__salt__,
{
"selinux.port_get_policy": mock_get,
"selinux.port_add_policy": mock_add,
"selinux.port_modify_policy": mock_modify,
},
):
with patch.dict(selinux.__opts__, {"test": False}):
comt = (
f'SELinux policy for "{name}" already present '
+ f'with specified sel_type "http_cache_port_t", protocol "None" '
+ f'and port "None".'
)
ret.update({"comment": comt, "result": True})
assert selinux.port_policy_present(name, "http_cache_port_t") == ret
comt = (
f'SELinux policy for "name" already present '
+ f'with specified sel_type "http_cache_port_t", protocol "{protocol}" '
+ f'and port "{port}".'
)
ret.update({"comment": comt, "changes": {}, "result": True, "name": "name"})
assert (
selinux.port_policy_present("name", "http_cache_port_t", protocol, port)
== ret
)
ret.update({"name": name})
# Test adding new port policy
mock_add = MagicMock(return_value={"retcode": 0})
mock_modify = MagicMock(return_value={"retcode": 0})
mock_get = MagicMock(
side_effect=[
None,
None,
None,
{"sel_type": "http_cache_port_t", "protocol": "tcp", "port": "8080"},
]
)
with patch.dict(
selinux.__salt__,
{
"selinux.port_get_policy": mock_get,
"selinux.port_add_policy": mock_add,
"selinux.port_modify_policy": mock_modify,
},
):
with patch.dict(selinux.__opts__, {"test": True}):
ret.update({"comment": "", "result": None})
assert selinux.port_policy_present(name, "http_cache_port_t") == ret
with patch.dict(selinux.__opts__, {"test": False}):
ret.update(
{
"comment": "",
"changes": {
"old": None,
"new": {
"sel_type": "http_cache_port_t",
"protocol": "tcp",
"port": "8080",
},
},
"result": True,
}
)
assert selinux.port_policy_present(name, "http_cache_port_t") == ret
# Test modifying policy to a new sel_type
mock_add = MagicMock(return_value={"retcode": 0})
mock_modify = MagicMock(return_value={"retcode": 0})
mock_get = MagicMock(
side_effect=[
None,
None,
{"sel_type": "http_cache_port_t", "protocol": "tcp", "port": "8080"},
{"sel_type": "http_port_t", "protocol": "tcp", "port": "8080"},
]
)
with patch.dict(
selinux.__salt__,
{
"selinux.port_get_policy": mock_get,
"selinux.port_add_policy": mock_add,
"selinux.port_modify_policy": mock_modify,
},
):
with patch.dict(selinux.__opts__, {"test": True}):
ret.update({"comment": "", "changes": {}, "result": None})
assert selinux.port_policy_present(name, "http_port_t") == ret
with patch.dict(selinux.__opts__, {"test": False}):
ret.update(
{
"comment": "",
"changes": {
"old": {
"sel_type": "http_cache_port_t",
"protocol": "tcp",
"port": "8080",
},
"new": {
"sel_type": "http_port_t",
"protocol": "tcp",
"port": "8080",
},
},
"result": True,
}
)
assert selinux.port_policy_present(name, "http_port_t") == ret
# Test adding new port policy with custom name and using protocol and port parameters
mock_add = MagicMock(return_value={"retcode": 0})
mock_modify = MagicMock(return_value={"retcode": 0})
mock_get = MagicMock(
side_effect=[
None,
None,
{"sel_type": "http_cache_port_t", "protocol": "tcp", "port": "8081"},
]
)
with patch.dict(
selinux.__salt__,
{
"selinux.port_get_policy": mock_get,
"selinux.port_add_policy": mock_add,
"selinux.port_modify_policy": mock_modify,
},
):
with patch.dict(selinux.__opts__, {"test": False}):
ret.update(
{
"name": "required_protocol_port",
"comment": "",
"changes": {
"old": None,
"new": {
"sel_type": "http_cache_port_t",
"protocol": "tcp",
"port": "8081",
},
},
"result": True,
}
)
assert (
selinux.port_policy_present(
"required_protocol_port",
"http_cache_port_t",
protocol="tcp",
port="8081",
)
== ret
)
# Test failure of adding new policy
mock_add = MagicMock(return_value={"retcode": 1})
mock_modify = MagicMock(return_value={"retcode": 1})
mock_get = MagicMock(return_value=None)
with patch.dict(
selinux.__salt__,
{
"selinux.port_get_policy": mock_get,
"selinux.port_add_policy": mock_add,
"selinux.port_modify_policy": mock_modify,
},
):
with patch.dict(selinux.__opts__, {"test": False}):
comt = "Error adding new policy: {'retcode': 1}"
ret.update({"name": name, "comment": comt, "changes": {}, "result": False})
assert selinux.port_policy_present(name, "http_cache_port_t") == ret
def test_port_policy_absent():
"""
Test to delete an SELinux port.
"""
name = "tcp/8080"
protocol = "tcp"
port = "8080"
ret = {"name": name, "changes": {}, "result": False, "comment": ""}
# Test policy already removed
mock_delete = MagicMock(return_value={"retcode": 0})
mock_get = MagicMock(return_value=None)
with patch.dict(
selinux.__salt__,
{
"selinux.port_get_policy": mock_get,
"selinux.port_delete_policy": mock_delete,
},
):
with patch.dict(selinux.__opts__, {"test": False}):
comt = (
f'SELinux policy for "{name}" already absent '
+ f'with specified sel_type "http_cache_port_t", protocol "None" '
+ f'and port "None".'
)
ret.update({"comment": comt, "changes": {}, "result": True})
assert selinux.port_policy_absent(name, "http_cache_port_t") == ret
comt = (
f'SELinux policy for "name" already absent '
+ f'with specified sel_type "http_cache_port_t", protocol "{protocol}" '
+ f'and port "{port}".'
)
ret.update({"comment": comt, "changes": {}, "result": True, "name": "name"})
assert (
selinux.port_policy_absent("name", "http_cache_port_t", protocol, port)
== ret
)
ret.update({"name": name})
# Test removing a policy
mock_delete = MagicMock(return_value={"retcode": 0})
mock_get = MagicMock(
side_effect=[
{"sel_type": "http_cache_port_t", "protocol": "tcp", "port": "8080"},
{"sel_type": "http_cache_port_t", "protocol": "tcp", "port": "8080"},
None,
]
)
with patch.dict(
selinux.__salt__,
{
"selinux.port_get_policy": mock_get,
"selinux.port_delete_policy": mock_delete,
},
):
with patch.dict(selinux.__opts__, {"test": True}):
ret.update({"comment": "", "result": None})
assert selinux.port_policy_absent(name, "http_cache_port_t") == ret
with patch.dict(selinux.__opts__, {"test": False}):
ret.update(
{
"comment": "",
"changes": {
"old": {
"sel_type": "http_cache_port_t",
"protocol": "tcp",
"port": "8080",
},
"new": None,
},
"result": True,
}
)
assert selinux.port_policy_absent(name, "http_cache_port_t") == ret
# Test removing a policy using custom name and with protocol and port parameters
mock_delete = MagicMock(return_value={"retcode": 0})
mock_get = MagicMock(
side_effect=[
{"sel_type": "http_cache_port_t", "protocol": "tcp", "port": "8081"},
None,
]
)
with patch.dict(
selinux.__salt__,
{
"selinux.port_get_policy": mock_get,
"selinux.port_delete_policy": mock_delete,
},
):
with patch.dict(selinux.__opts__, {"test": False}):
ret.update(
{
"name": "required_protocol_port",
"comment": "",
"changes": {
"old": {
"sel_type": "http_cache_port_t",
"protocol": "tcp",
"port": "8081",
},
"new": None,
},
"result": True,
}
)
assert (
selinux.port_policy_absent(
"required_protocol_port",
"http_cache_port_t",
protocol="tcp",
port="8081",
)
== ret
)
# Test failure to delete a policy
mock_delete = MagicMock(return_value={"retcode": 2})
mock_get = MagicMock(
return_value={
"sel_type": "http_cache_port_t",
"protocol": "tcp",
"port": "8080",
}
)
with patch.dict(
selinux.__salt__,
{
"selinux.port_get_policy": mock_get,
"selinux.port_delete_policy": mock_delete,
},
):
with patch.dict(selinux.__opts__, {"test": False}):
comt = "Error deleting policy: {'retcode': 2}"
ret.update({"name": name, "comment": comt, "changes": {}, "result": False})
assert selinux.port_policy_absent(name, "http_cache_port_t") == ret