Add unit tests for vault util

This commit is contained in:
Christian McHugh 2020-05-04 09:56:33 +01:00 committed by Daniel Wozniak
parent ca7cbd1aa5
commit 075fef3b3d
2 changed files with 337 additions and 118 deletions

View file

@ -248,8 +248,6 @@ def get_cache():
if not connection or "url" not in connection:
return _gen_new_connection()
unlimited_uses = connection.get("unlimited_use_token", False)
# Drop 10 seconds from ttl to be safe
ttl10 = connection["issued"] + connection["lease_duration"] - 10
cur_time = int(round(time.time()))
@ -262,6 +260,7 @@ def get_cache():
else:
log.debug("Token has not expired {} > {}".format(ttl10, cur_time))
unlimited_uses = connection.get("unlimited_use_token", False)
if not unlimited_uses:
current_uses = connection.get("uses", 1)
log.debug("Token has {} uses left".format(current_uses))

View file

@ -6,10 +6,13 @@ Test case for the vault utils module
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import logging
from copy import copy
# Import Salt libs
import salt.utils.vault as vault
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.mock import Mock, patch
from tests.support.mock import ANY, MagicMock, Mock, mock_open, patch
# Import Salt Testing libs
from tests.support.unit import TestCase
@ -29,6 +32,78 @@ class TestVaultUtils(LoaderModuleMockMixin, TestCase):
Test case for the vault utils module
"""
json_success = {
"request_id": "35df4df1-c3d8-b270-0682-ddb0160c7450",
"lease_id": "",
"renewable": False,
"lease_duration": 0,
"data": {
"data": {"something": "myvalue"},
"metadata": {
"created_time": "2020-05-02T07:26:12.180848003Z",
"deletion_time": "",
"destroyed": False,
"version": 1,
},
},
"wrap_info": None,
"warnings": None,
"auth": None,
}
json_denied = {"errors": ["permission denied"]}
cache_single = {
"url": "http://127.0.0.1:8200",
"token": "test",
"verify": None,
"uses": 1,
"lease_duration": 100,
"issued": 3000,
}
cache_uses = {
"url": "http://127.0.0.1:8200",
"token": "test",
"verify": None,
"uses": 10,
"lease_duration": 100,
"issued": 3000,
"unlimited_use_token": False,
}
cache_uses_last = {
"url": "http://127.0.0.1:8200",
"token": "test",
"verify": None,
"uses": 1,
"lease_duration": 100,
"issued": 3000,
"unlimited_use_token": False,
}
cache_unlimited = {
"url": "http://127.0.0.1:8200",
"token": "test",
"verify": None,
"uses": 0,
"lease_duration": 100,
"issued": 3000,
"unlimited_use_token": True,
}
metadata_v2 = {
"accessor": "kv_f8731f1b",
"config": {
"default_lease_ttl": 0,
"force_no_cache": False,
"max_lease_ttl": 0,
},
"description": "key/value secret storage",
"external_entropy_access": False,
"local": False,
"options": {"version": "2"},
"path": "secret/",
"seal_wrap": False,
"type": "kv",
"uuid": "1d9431ac-060a-9b63-4572-3ca7ffd78347",
}
cache_secret_meta = {"vault_secret_path_metadata": {"secret/mything": metadata_v2}}
def setup_loader_modules(self):
return {
vault: {
@ -43,143 +118,288 @@ class TestVaultUtils(LoaderModuleMockMixin, TestCase):
},
},
"file_client": "local",
"cachedir": "somepath",
},
"__grains__": {"id": "test-minion"},
"requests": RequestMock(),
"__context__": {},
}
}
def test_make_request_no_cache(self):
def _mock_json_response(self, data, status_code=200, reason=""):
"""
Given no cache, function should request token and populate cache
Mock helper for http response
"""
expected_context = {
"salt_vault_token": {
"url": "http://127.0.0.1",
"token": "test",
"verify": None,
"issued": 1234,
"ttl": 3600,
}
}
with patch("time.time", return_value=1234):
vault_return = vault.make_request("/secret/my/secret", "key")
self.assertEqual(vault.__context__, expected_context)
response = MagicMock()
response.json = MagicMock(return_value=data)
response.status_code = status_code
response.reason = reason
if status_code == 200:
response.ok = True
else:
response.ok = False
return Mock(return_value=response)
def test_make_request_ttl_cache(self):
def test_make_request_single_use_token_run_ok(self):
"""
Given a valid issued date (greater than time.time result), cache should be re-used
Given single use token in __context__, function should run successful secret lookup with no other modifications
"""
local_context = {
"salt_vault_token": {
"issued": 3000,
"lease_duration": 20,
"token": "atest",
"url": "http://127.1.1.1",
}
}
expected_context = {
"salt_vault_token": {
"token": "atest",
"issued": 3000,
"url": "http://127.1.1.1",
"lease_duration": 20,
}
}
with patch("time.time", return_value=1234):
with patch.dict(vault.__context__, local_context):
mock = self._mock_json_response(self.json_success)
expected_context = {"vault_token": self.cache_single}
expected_headers = {"X-Vault-Token": "test", "Content-Type": "application/json"}
with patch.dict(vault.__context__, expected_context):
with patch("requests.request", mock):
vault_return = vault.make_request("/secret/my/secret", "key")
self.assertDictEqual(vault.__context__, expected_context)
self.assertEqual(vault.__context__, expected_context)
mock.assert_called_with(
"/secret/my/secret",
"http://127.0.0.1:8200/key",
headers=expected_headers,
verify=ANY,
)
self.assertEqual(vault_return.json(), self.json_success)
def test_make_request_expired_ttl_cache(self):
def test_make_request_single_use_token_run_auth_error(self):
"""
Given an expired issued date, function should notice and regenerate token and populate cache
Given single use token in __context__ and login error, function should request token and re-run
"""
local_context = {
"salt_vault_token": {
"issued": 1000,
"lease_duration": 20,
"token": "atest",
"url": "http://127.1.1.1",
}
}
expected_context = {
"salt_vault_token": {
"token": "atest",
"issued": 3000,
"url": "http://127.1.1.1",
"lease_duration": 20,
}
}
with patch("time.time", return_value=1234):
with patch.dict(vault.__context__, local_context):
with patch.object(
vault,
"get_vault_connection",
return_value=expected_context["salt_vault_token"],
):
logging.disable(logging.CRITICAL)
mock = self._mock_json_response(self.json_denied, status_code=400)
expected_context = {"vault_token": self.cache_single}
expected_headers = {"X-Vault-Token": "test", "Content-Type": "application/json"}
with patch.dict(vault.__context__, expected_context):
with patch("requests.request", mock):
with patch.object(vault, "del_cache") as mock_del_cache:
vault_return = vault.make_request("/secret/my/secret", "key")
self.assertDictEqual(vault.__context__, expected_context)
self.assertEqual(vault.__context__, expected_context)
mock.assert_called_with(
"/secret/my/secret",
"http://127.0.0.1:8200/key",
headers=expected_headers,
verify=ANY,
)
self.assertEqual(vault_return.json(), self.json_denied)
mock_del_cache.assert_called()
self.assertEqual(mock.call_count, 2)
logging.disable(logging.NOTSET)
def test_make_request_expired_uses_cache(self):
def test_multi_use_token_successful_run(self):
"""
Given 0 cached uses left, function should notice and regenerate token and populate cache
Given multi-use token, function should get secret and decrement token
"""
local_context = {
"salt_vault_token": {
"uses": 0,
"issued": 1000,
"lease_duration": 20,
"token": "atest",
"url": "http://127.1.1.1",
}
expected_cache_write = {
"url": "http://127.0.0.1:8200",
"token": "test",
"verify": None,
"uses": 9,
"lease_duration": 100,
"issued": 3000,
"unlimited_use_token": False,
}
expected_context = {
"salt_vault_token": {
"token": "atest",
"issued": 3000,
"url": "http://127.1.1.1",
"lease_duration": 20,
}
}
with patch.dict(vault.__context__, local_context):
mock = self._mock_json_response(self.json_success)
expected_headers = {"X-Vault-Token": "test", "Content-Type": "application/json"}
with patch.object(vault, "get_cache") as mock_get_cache:
mock_get_cache.return_value = copy(self.cache_uses)
with patch("requests.request", mock):
with patch.object(vault, "del_cache") as mock_del_cache:
with patch.object(vault, "write_cache") as mock_write_cache:
vault_return = vault.make_request("/secret/my/secret", "key")
mock.assert_called_with(
"/secret/my/secret",
"http://127.0.0.1:8200/key",
headers=expected_headers,
verify=ANY,
)
mock_write_cache.assert_called_with(expected_cache_write)
self.assertEqual(vault_return.json(), self.json_success)
self.assertEqual(mock.call_count, 1)
def test_multi_use_token_last_use(self):
"""
Given last use of multi-use token, function should succeed and flush token cache
"""
mock = self._mock_json_response(self.json_success)
expected_headers = {"X-Vault-Token": "test", "Content-Type": "application/json"}
with patch.object(vault, "get_cache") as mock_get_cache:
mock_get_cache.return_value = self.cache_uses_last
with patch("requests.request", mock):
with patch.object(vault, "del_cache") as mock_del_cache:
with patch.object(vault, "write_cache") as mock_write_cache:
vault_return = vault.make_request("/secret/my/secret", "key")
mock.assert_called_with(
"/secret/my/secret",
"http://127.0.0.1:8200/key",
headers=expected_headers,
verify=ANY,
)
mock_del_cache.assert_called()
self.assertEqual(vault_return.json(), self.json_success)
self.assertEqual(mock.call_count, 1)
def test_unlimited_use_token_no_decrement(self):
"""
Given unlimited-use token, function should succeed not del cache or decrement
"""
mock = self._mock_json_response(self.json_success)
expected_headers = {"X-Vault-Token": "test", "Content-Type": "application/json"}
with patch.object(vault, "get_cache") as mock_get_cache:
mock_get_cache.return_value = self.cache_unlimited
with patch("requests.request", mock):
with patch.object(vault, "del_cache") as mock_del_cache:
with patch.object(vault, "write_cache") as mock_write_cache:
vault_return = vault.make_request("/secret/my/secret", "key")
mock.assert_called_with(
"/secret/my/secret",
"http://127.0.0.1:8200/key",
headers=expected_headers,
verify=ANY,
)
assert (
not mock_del_cache.called
), "del cache should not be called for unlimited use token"
assert (
not mock_write_cache.called
), "write cache should not be called for unlimited use token"
self.assertEqual(vault_return.json(), self.json_success)
self.assertEqual(mock.call_count, 1)
def test_get_cache_standard(self):
"""
test standard first run of no cache file. Should generate new connection and write cache
"""
with patch.object(vault, "_read_cache_file") as mock_read_cache:
mock_read_cache.return_value = {}
with patch.object(
vault,
"get_vault_connection",
return_value=expected_context["salt_vault_token"],
):
vault_return = vault.make_request("/secret/my/secret", "key")
self.assertDictEqual(vault.__context__, expected_context)
vault, "get_vault_connection"
) as mock_get_vault_connection:
mock_get_vault_connection.return_value = self.cache_single
with patch.object(vault, "write_cache") as mock_write_cache:
cache_result = vault.get_cache()
mock_write_cache.assert_called_with(self.cache_single)
def test_make_request_remaining_uses_cache(self):
def test_get_cache_existing_cache_valid(self):
"""
Given remaining uses, function should reuse cache
test standard valid cache file
"""
local_context = {
"salt_vault_token": {
"uses": 3,
"issued": 3000,
"lease_duration": 20,
"token": "atest",
"url": "http://127.1.1.1",
}
}
expected_context = {
"salt_vault_token": {
"uses": 2,
"token": "atest",
"issued": 3000,
"url": "http://127.1.1.1",
"lease_duration": 20,
}
}
with patch("time.time", return_value=1234):
with patch.dict(vault.__context__, local_context):
with patch.object(
vault,
"get_vault_connection",
return_value=expected_context["salt_vault_token"],
):
vault_return = vault.make_request("/secret/my/secret", "key")
self.assertDictEqual(vault.__context__, expected_context)
with patch.object(vault, "_read_cache_file") as mock_read_cache:
mock_read_cache.return_value = self.cache_uses
with patch.object(vault, "write_cache") as mock_write_cache:
with patch.object(vault, "del_cache") as mock_del_cache:
cache_result = vault.get_cache()
assert not mock_write_cache.called
assert not mock_del_cache.called
self.assertEqual(cache_result, self.cache_uses)
def test_get_cache_existing_cache_old(self):
"""
test old cache file
"""
with patch("time.time", return_value=3101):
with patch.object(
vault, "get_vault_connection"
) as mock_get_vault_connection:
mock_get_vault_connection.return_value = self.cache_uses
with patch.object(vault, "_read_cache_file") as mock_read_cache:
mock_read_cache.return_value = self.cache_uses
with patch.object(vault, "write_cache") as mock_write_cache:
with patch.object(vault, "del_cache") as mock_del_cache:
cache_result = vault.get_cache()
assert mock_del_cache.called
assert mock_write_cache.called
self.assertEqual(cache_result, self.cache_uses)
def test_write_cache_standard(self):
"""
Test write cache with standard single use token
"""
function_response = vault.write_cache(self.cache_single)
self.assertEqual(vault.__context__["vault_token"], self.cache_single)
self.assertTrue(function_response)
def test_write_cache_multi_use_token(self):
"""
Test write cache with multi-use token
"""
expected_write = [
'{"url": "http://127.0.0.1:8200", "token": "test", "verify": null, "uses": 10, "lease_duration": 100, "issued": 3000, "unlimited_use_token": false}'
]
with patch("salt.utils.files.fpopen", mock_open()) as mock_fpopen:
function_response = vault.write_cache(self.cache_uses)
assert mock_fpopen.call_count == 1
self.assertEqual(
list(mock_fpopen.filehandles), ["somepath/salt_vault_token"]
)
opens = mock_fpopen.filehandles["somepath/salt_vault_token"]
self.assertEqual(opens[0].write_calls, expected_write)
self.assertTrue(function_response)
def test_write_cache_unlimited_token(self):
"""
Test write cache with unlimited use token
"""
write_data = {
"url": "http://127.0.0.1:8200",
"token": "test",
"verify": None,
"uses": 0,
"lease_duration": 100,
"issued": 3000,
}
expected_write = [
'{"url": "http://127.0.0.1:8200", "token": "test", "verify": null, "uses": 0, "lease_duration": 100, "issued": 3000, "unlimited_use_token": true}'
]
with patch("salt.utils.files.fpopen", mock_open()) as mock_fpopen:
function_response = vault.write_cache(write_data)
assert mock_fpopen.call_count == 1
self.assertEqual(
list(mock_fpopen.filehandles), ["somepath/salt_vault_token"]
)
opens = mock_fpopen.filehandles["somepath/salt_vault_token"]
self.assertEqual(opens[0].write_calls, expected_write)
self.assertTrue(function_response)
def test_path_is_v2(self):
"""
Validated v2 path is detected as vault kv v2
"""
expected_return = {
"v2": True,
"data": "secret/data/mything",
"metadata": "secret/metadata/mything",
"delete": "secret/mything",
"type": "kv",
"destroy": "secret/destroy/mything",
}
with patch.object(vault, "_get_secret_path_metadata") as mock_get_metadata:
mock_get_metadata.return_value = self.metadata_v2
function_return = vault.is_v2("secret/mything")
self.assertEqual(function_return, expected_return)
def test_get_secret_path_metadata_no_cache(self):
"""
test with no cache file
"""
make_request_response = {
"request_id": "b82f2df7-a9b6-920c-0ed2-a3463b996f9e",
"lease_id": "",
"renewable": False,
"lease_duration": 0,
"data": self.metadata_v2,
"wrap_info": None,
"warnings": None,
"auth": None,
}
cache_object = copy(self.cache_uses)
expected_cache_object = copy(self.cache_uses)
expected_cache_object.update(copy(self.cache_secret_meta))
secret_path = "secret/mything"
mock = self._mock_json_response(make_request_response)
with patch.object(vault, "_read_cache_file") as mock_read_cache:
mock_read_cache.return_value = cache_object
with patch.object(vault, "write_cache") as mock_write_cache:
with patch("salt.utils.vault.make_request", mock):
# with patch.object(vault, "make_request") as mock_make_request:
function_result = vault._get_secret_path_metadata(secret_path)
self.assertEqual(function_result, self.metadata_v2)
mock_write_cache.assert_called_with(cache_object)
self.assertEqual(cache_object, expected_cache_object)