Azure Blob ext_pillar (#55493)

* Fixes #54374

* Added tests for #55493

* Fixed lint issue

* Add pylint exception

* Blacken Salt

* Added sphinx stub

* Fixed sphinx stub

Co-authored-by: Akmod <tjohnson@saltstack.com>
Co-authored-by: Daniel Wozniak <dwozniak@saltstack.com>


@@ -10,6 +10,7 @@ pillar modules
     :toctree:
     :template: autosummary.rst.tmpl
 
+    azureblob
     cmd_json
     cmd_yaml
     cmd_yamlex


@@ -0,0 +1,6 @@
=====================
salt.pillar.azureblob
=====================

.. automodule:: salt.pillar.azureblob
    :members:

salt/pillar/azureblob.py Normal file

@@ -0,0 +1,475 @@
# -*- coding: utf-8 -*-
"""
Use Azure Blob as a Pillar source.
.. versionadded:: Sodium
:maintainer: <devops@eitr.tech>
:maturity: new
:depends:
* `azure-storage-blob <https://pypi.org/project/azure-storage-blob/>`_ >= 12.0.0
The Azure Blob ext_pillar can be configured with the following parameters:
.. code-block:: yaml
ext_pillar:
- azureblob:
container: 'test_container'
connection_string: 'connection_string'
multiple_env: False
environment: 'base'
blob_cache_expire: 30
blob_sync_on_update: True
:param container: The name of the target Azure Blob Container.
:param connection_string: The connection string to use to access the specified Azure Blob Container.
:param multiple_env: Specifies whether the pillar should interpret top level folders as pillar environments.
Defaults to false.
:param environment: Specifies which environment the container represents when in single environment mode. Defaults
to 'base' and is ignored if multiple_env is set as True.
:param blob_cache_expire: Specifies expiration time of the Azure Blob metadata cache file. Defaults to 30s.
:param blob_sync_on_update: Specifies if the cache is synced on update. Defaults to True.
"""
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
import logging
import os
import pickle
import time
from copy import deepcopy
import salt.utils.files
import salt.utils.hashutils
# Import 3rd-party libs
# pylint: disable=import-error,no-name-in-module,redefined-builtin
from salt.ext import six
from salt.ext.six.moves import filter
# Import Salt libs
from salt.pillar import Pillar
# pylint: enable=import-error,no-name-in-module,redefined-builtin
# Import Azure libs
HAS_LIBS = False
try:
from azure.storage.blob import BlobServiceClient
HAS_LIBS = True
except ImportError:
pass
__virtualname__ = "azureblob"
# Set up logging
log = logging.getLogger(__name__)
def __virtual__():
if not HAS_LIBS:
return (
False,
"The following dependency is required to use the Azure Blob ext_pillar: "
"Microsoft Azure Storage Blob >= 12.0.0 ",
)
return __virtualname__
def ext_pillar(
minion_id,
pillar, # pylint: disable=W0613
container,
connection_string,
multiple_env=False,
environment="base",
blob_cache_expire=30,
blob_sync_on_update=True,
):
"""
    Compile the pillar data for the minion from the files in the configured Azure Blob container.
:param container: The name of the target Azure Blob Container.
:param connection_string: The connection string to use to access the specified Azure Blob Container.
:param multiple_env: Specifies whether the pillar should interpret top level folders as pillar environments.
Defaults to false.
:param environment: Specifies which environment the container represents when in single environment mode. Defaults
to 'base' and is ignored if multiple_env is set as True.
:param blob_cache_expire: Specifies expiration time of the Azure Blob metadata cache file. Defaults to 30s.
:param blob_sync_on_update: Specifies if the cache is synced on update. Defaults to True.
"""
# normpath is needed to remove appended '/' if root is empty string.
pillar_dir = os.path.normpath(
os.path.join(_get_cache_dir(), environment, container)
)
if __opts__["pillar_roots"].get(environment, []) == [pillar_dir]:
return {}
metadata = _init(
connection_string, container, multiple_env, environment, blob_cache_expire
)
log.debug("Blob metadata: %s", metadata)
if blob_sync_on_update:
# sync the containers to the local cache
log.info("Syncing local pillar cache from Azure Blob...")
for saltenv, env_meta in six.iteritems(metadata):
for container, files in six.iteritems(_find_files(env_meta)):
for file_path in files:
cached_file_path = _get_cached_file_name(
container, saltenv, file_path
)
log.info("%s - %s : %s", container, saltenv, file_path)
# load the file from Azure Blob if not in the cache or too old
_get_file_from_blob(
connection_string,
metadata,
saltenv,
container,
file_path,
cached_file_path,
)
log.info("Sync local pillar cache from Azure Blob completed.")
opts = deepcopy(__opts__)
opts["pillar_roots"][environment] = (
[os.path.join(pillar_dir, environment)] if multiple_env else [pillar_dir]
)
# Avoid recursively re-adding this same pillar
opts["ext_pillar"] = [x for x in opts["ext_pillar"] if "azureblob" not in x]
pil = Pillar(opts, __grains__, minion_id, environment)
compiled_pillar = pil.compile_pillar(ext=False)
return compiled_pillar
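# Note on the opts override above: pillar_roots[environment] is pointed at the
# local cache directory populated by the sync loop (with the default master
# cachedir that is /var/cache/salt/master/pillar_azureblob/<env>/<container>),
# so the stock Pillar compiler reads the synced blobs like an ordinary pillar
# root, while the azureblob entry is filtered out of ext_pillar to prevent
# infinite recursion.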
def _init(connection_string, container, multiple_env, environment, blob_cache_expire):
"""
.. versionadded:: Sodium
    Connect to Blob Storage, download the metadata for each file in the specified container, and
    cache that metadata to disk.
:param connection_string: The connection string to use to access the specified Azure Blob Container.
:param container: The name of the target Azure Blob Container.
:param multiple_env: Specifies whether the pillar should interpret top level folders as pillar environments.
Defaults to false.
:param environment: Specifies which environment the container represents when in single environment mode. Defaults
to 'base' and is ignored if multiple_env is set as True.
:param blob_cache_expire: Specifies expiration time of the Azure Blob metadata cache file. Defaults to 30s.
"""
cache_file = _get_containers_cache_filename(container)
exp = time.time() - blob_cache_expire
# Check if cache_file exists and its mtime
if os.path.isfile(cache_file):
cache_file_mtime = os.path.getmtime(cache_file)
else:
# If the file does not exist then set mtime to 0 (aka epoch)
cache_file_mtime = 0
expired = cache_file_mtime <= exp
log.debug(
"Blob storage container cache file %s is %sexpired, mtime_diff=%ss, expiration=%ss",
cache_file,
"" if expired else "not ",
cache_file_mtime - exp,
blob_cache_expire,
)
if expired:
pillars = _refresh_containers_cache_file(
connection_string, container, cache_file, multiple_env, environment
)
else:
pillars = _read_containers_cache_file(cache_file)
log.debug("Blob container retrieved pillars %s", pillars)
return pillars
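# Worked example of the expiry check in _init above: with blob_cache_expire=30,
# a cache file whose mtime is 45 seconds old satisfies
# cache_file_mtime <= time.time() - 30, so the metadata is refreshed from
# Azure; a 10-second-old cache file is read back from disk instead.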
def _get_cache_dir():
"""
.. versionadded:: Sodium
Get pillar cache directory. Initialize it if it does not exist.
"""
cache_dir = os.path.join(__opts__["cachedir"], "pillar_azureblob")
if not os.path.isdir(cache_dir):
log.debug("Initializing Azure Blob Pillar Cache")
os.makedirs(cache_dir)
return cache_dir
def _get_cached_file_name(container, saltenv, path):
"""
.. versionadded:: Sodium
Return the cached file name for a container path file.
:param container: The name of the target Azure Blob Container.
:param saltenv: Specifies which environment the container represents.
:param path: The path of the file in the container.
"""
file_path = os.path.join(_get_cache_dir(), saltenv, container, path)
# make sure container and saltenv directories exist
if not os.path.exists(os.path.dirname(file_path)):
os.makedirs(os.path.dirname(file_path))
return file_path
def _get_containers_cache_filename(container):
"""
.. versionadded:: Sodium
Return the filename of the cache for container contents. Create the path if it does not exist.
:param container: The name of the target Azure Blob Container.
"""
cache_dir = _get_cache_dir()
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)
return os.path.join(cache_dir, "{0}-files.cache".format(container))
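# For example, a container named "test" under the default master cachedir is
# cached at /var/cache/salt/master/pillar_azureblob/test-files.cache (this is
# the path asserted by test__get_containers_cache_filename in the tests below).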
def _refresh_containers_cache_file(
connection_string, container, cache_file, multiple_env=False, environment="base"
):
"""
.. versionadded:: Sodium
    Walks the Azure Blob container and caches the metadata for every blob it contains to the local cache file.
:param connection_string: The connection string to use to access the specified Azure Blob Container.
:param container: The name of the target Azure Blob Container.
:param cache_file: The path of where the file will be cached.
:param multiple_env: Specifies whether the pillar should interpret top level folders as pillar environments.
:param environment: Specifies which environment the container represents when in single environment mode. This is
ignored if multiple_env is set as True.
"""
try:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(
connection_string
)
# Create the ContainerClient object
container_client = blob_service_client.get_container_client(container)
except Exception as exc: # pylint: disable=broad-except
log.error("Exception: %s", exc)
return False
metadata = {}
def _walk_blobs(saltenv="base", prefix=None):
# Walk the blobs in the container with a generator
blob_list = container_client.walk_blobs(name_starts_with=prefix)
# Iterate over the generator
while True:
try:
blob = next(blob_list)
except StopIteration:
break
log.debug("Raw blob attributes: %s", blob)
# Directories end with "/".
if blob.name.endswith("/"):
# Recurse into the directory
_walk_blobs(prefix=blob.name)
continue
if multiple_env:
saltenv = "base" if (not prefix or prefix == ".") else prefix[:-1]
if saltenv not in metadata:
metadata[saltenv] = {}
if container not in metadata[saltenv]:
metadata[saltenv][container] = []
metadata[saltenv][container].append(blob)
_walk_blobs(saltenv=environment)
# write the metadata to disk
if os.path.isfile(cache_file):
os.remove(cache_file)
log.debug("Writing Azure blobs pillar cache file")
with salt.utils.files.fopen(cache_file, "wb") as fp_:
pickle.dump(metadata, fp_)
return metadata
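# Illustration of the metadata shape built above: with multiple_env=True and a
# container holding "base/secret.sls" and "dev/secret.sls", the prefix "base/"
# maps to saltenv "base" and "dev/" to saltenv "dev" (BlobProperties entries
# shown as plain dicts for brevity):
#     {"base": {"<container>": [{"name": "base/secret.sls", ...}]},
#      "dev": {"<container>": [{"name": "dev/secret.sls", ...}]}}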
def _read_containers_cache_file(cache_file):
"""
.. versionadded:: Sodium
Return the contents of the containers cache file.
:param cache_file: The path for where the file will be cached.
"""
log.debug("Reading containers cache file")
with salt.utils.files.fopen(cache_file, "rb") as fp_:
data = pickle.load(fp_)
return data
def _find_files(metadata):
"""
.. versionadded:: Sodium
Looks for all the files in the Azure Blob container cache metadata.
:param metadata: The metadata for the container files.
"""
ret = {}
for container, data in six.iteritems(metadata):
if container not in ret:
ret[container] = []
# grab the paths from the metadata
file_paths = [k["name"] for k in data]
# filter out the dirs
ret[container] += [k for k in file_paths if not k.endswith("/")]
return ret
def _find_file_meta(metadata, container, saltenv, path):
"""
.. versionadded:: Sodium
Looks for a file's metadata in the Azure Blob Container cache file.
:param metadata: The metadata for the container files.
:param container: The name of the target Azure Blob Container.
:param saltenv: Specifies which environment the container represents.
:param path: The path of the file in the container.
"""
env_meta = metadata[saltenv] if saltenv in metadata else {}
container_meta = env_meta[container] if container in env_meta else {}
for item_meta in container_meta:
item_meta = dict(item_meta)
if "name" in item_meta and item_meta["name"] == path:
return item_meta
def _get_file_from_blob(
connection_string, metadata, saltenv, container, path, cached_file_path
):
"""
.. versionadded:: Sodium
    Downloads a single file from the Azure Blob container to the local cache, unless an up-to-date copy is already cached.
:param connection_string: The connection string to use to access the specified Azure Blob Container.
:param metadata: The metadata for the container files.
    :param saltenv: Specifies which environment the file belongs to.
:param container: The name of the target Azure Blob Container.
:param path: The path of the file in the container.
:param cached_file_path: The path of where the file will be cached.
"""
# check the local cache...
if os.path.isfile(cached_file_path):
file_meta = _find_file_meta(metadata, container, saltenv, path)
file_md5 = (
"".join(list(filter(str.isalnum, file_meta["etag"]))) if file_meta else None
)
cached_md5 = salt.utils.hashutils.get_hash(cached_file_path, "md5")
        # if the hashes match, the cached copy is current and no download is needed
log.debug(
"Cached file: path=%s, md5=%s, etag=%s",
cached_file_path,
cached_md5,
file_md5,
)
if cached_md5 == file_md5:
return
try:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(
connection_string
)
# Create the ContainerClient object
container_client = blob_service_client.get_container_client(container)
# Create the BlobClient object
blob_client = container_client.get_blob_client(path)
except Exception as exc: # pylint: disable=broad-except
log.error("Exception: %s", exc)
return False
with salt.utils.files.fopen(cached_file_path, "wb") as outfile:
outfile.write(blob_client.download_blob().readall())
return
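
To make the cache-validation step in _get_file_from_blob concrete, here is a minimal, self-contained sketch of the same comparison: the blob's ETag is reduced to its alphanumeric characters and checked against an MD5 of the cached file. This is an illustration only (the file path is a placeholder), and it mirrors the module's assumption that the service's ETag corresponds to an MD5 digest rather than guaranteeing it.

import hashlib

def etag_matches_cached(etag, cached_file_path):
    """Return True if the normalized ETag matches the cached file's MD5."""
    # mirror the module: keep only the alphanumeric characters of the ETag
    normalized = "".join(c for c in etag if c.isalnum())
    with open(cached_file_path, "rb") as fh:
        cached_md5 = hashlib.md5(fh.read()).hexdigest()
    return normalized == cached_md5

When etag_matches_cached returns True, the download is skipped; otherwise the blob is fetched and written over the cached copy, as the module does above.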


@@ -0,0 +1,328 @@
# -*- coding: utf-8 -*-
"""
Tests for the Azure Blob External Pillar.
"""
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import os
import pickle
import tempfile
import time
import salt.config
import salt.loader
# Import Salt Libs
import salt.pillar.azureblob as azureblob
import salt.utils.files
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.mock import MagicMock, patch
# Import Salt Testing libs
from tests.support.unit import TestCase, skipIf
# Import Azure libs
HAS_LIBS = False
try:
from azure.storage.blob import BlobServiceClient
HAS_LIBS = True
except ImportError:
pass
class MockBlob(dict):
"""
Creates a Mock Blob object.
"""
name = ""
def __init__(self):
super(MockBlob, self).__init__(
{
"container": None,
"name": "test.sls",
"prefix": None,
"delimiter": "/",
"results_per_page": None,
"location_mode": None,
}
)
class MockContainerClient(object):
"""
Creates a Mock ContainerClient.
"""
def __init__(self):
pass
def walk_blobs(self, *args, **kwargs):
yield MockBlob()
def get_blob_client(self, *args, **kwargs):
pass
class MockBlobServiceClient(object):
"""
Creates a Mock BlobServiceClient.
"""
def __init__(self):
pass
def get_container_client(self, *args, **kwargs):
container_client = MockContainerClient()
return container_client
@skipIf(HAS_LIBS is False, "The azure.storage.blob module must be installed.")
class AzureBlobTestCase(TestCase, LoaderModuleMockMixin):
"""
TestCase for salt.pillar.azureblob ext_pillar.
"""
def setup_loader_modules(self):
self.opts = salt.config.DEFAULT_MASTER_OPTS.copy()
utils = salt.loader.utils(self.opts)
return {
azureblob: {"__opts__": self.opts, "__utils__": utils},
}
def test__init_expired(self):
"""
Tests the result of _init when the cache is expired.
"""
container = "test"
multiple_env = False
environment = "base"
blob_cache_expire = 0 # The cache will be expired
blob_client = MockBlobServiceClient()
cache_file = tempfile.NamedTemporaryFile()
        # Patches the _get_containers_cache_filename function so that it returns the name of the new tempfile that
        # represents the cache file
with patch.object(
azureblob,
"_get_containers_cache_filename",
MagicMock(return_value=str(cache_file.name)),
):
            # Patches the from_connection_string method of the BlobServiceClient class so that a connection string does
            # not need to be given. Additionally, it returns example blob data used by the ext_pillar.
with patch.object(
BlobServiceClient,
"from_connection_string",
MagicMock(return_value=blob_client),
):
ret = azureblob._init(
"", container, multiple_env, environment, blob_cache_expire
)
cache_file.close()
self.assertEqual(
ret,
{
"base": {
"test": [
{
"container": None,
"name": "test.sls",
"prefix": None,
"delimiter": "/",
"results_per_page": None,
"location_mode": None,
}
]
}
},
)
def test__init_not_expired(self):
"""
Tests the result of _init when the cache is not expired.
"""
container = "test"
multiple_env = False
environment = "base"
blob_cache_expire = (time.time()) * (
time.time()
) # The cache will not be expired
metadata = {
"base": {
"test": [
{"name": "base/secret.sls", "relevant": "include.sls"},
{"name": "blobtest.sls", "irrelevant": "ignore.sls"},
]
}
}
cache_file = tempfile.NamedTemporaryFile()
# Pickles the metadata and stores it in cache_file
        with salt.utils.files.fopen(cache_file.name, "wb") as fp_:
            pickle.dump(metadata, fp_)
        # Patches the _get_containers_cache_filename function so that it returns the name of the new tempfile that
        # represents the cache file
with patch.object(
azureblob,
"_get_containers_cache_filename",
MagicMock(return_value=str(cache_file.name)),
):
            # Patches the _read_containers_cache_file function so that it returns what it normally would if the new
            # tempfile representing the cache file was passed to it
            plugged = azureblob._read_containers_cache_file(cache_file.name)
with patch.object(
azureblob,
"_read_containers_cache_file",
MagicMock(return_value=plugged),
):
ret = azureblob._init(
"", container, multiple_env, environment, blob_cache_expire
)
        cache_file.close()
self.assertEqual(ret, metadata)
def test__get_cache_dir(self):
"""
Tests the result of _get_cache_dir.
"""
ret = azureblob._get_cache_dir()
self.assertEqual(ret, "/var/cache/salt/master/pillar_azureblob")
def test__get_cached_file_name(self):
"""
Tests the result of _get_cached_file_name.
"""
container = "test"
saltenv = "base"
path = "base/secret.sls"
ret = azureblob._get_cached_file_name(container, saltenv, path)
self.assertEqual(
ret, "/var/cache/salt/master/pillar_azureblob/base/test/base/secret.sls"
)
def test__get_containers_cache_filename(self):
"""
Tests the result of _get_containers_cache_filename.
"""
container = "test"
ret = azureblob._get_containers_cache_filename(container)
self.assertEqual(
ret, "/var/cache/salt/master/pillar_azureblob/test-files.cache"
)
def test__refresh_containers_cache_file(self):
"""
Tests the result of _refresh_containers_cache_file to ensure that it successfully copies blob data into a
cache file.
"""
blob_client = MockBlobServiceClient()
container = "test"
cache_file = tempfile.NamedTemporaryFile()
with patch.object(
BlobServiceClient,
"from_connection_string",
MagicMock(return_value=blob_client),
):
ret = azureblob._refresh_containers_cache_file(
"", container, cache_file.name
)
cache_file.close()
self.assertEqual(
ret,
{
"base": {
"test": [
{
"container": None,
"name": "test.sls",
"prefix": None,
"delimiter": "/",
"results_per_page": None,
"location_mode": None,
}
]
}
},
)
def test__read_containers_cache_file(self):
"""
Tests the result of _read_containers_cache_file to make sure that it successfully loads in pickled metadata.
"""
metadata = {
"base": {
"test": [
{"name": "base/secret.sls", "relevant": "include.sls"},
{"name": "blobtest.sls", "irrelevant": "ignore.sls"},
]
}
}
cache_file = tempfile.NamedTemporaryFile()
# Pickles the metadata and stores it in cache_file
        with salt.utils.files.fopen(cache_file.name, "wb") as fp_:
pickle.dump(metadata, fp_)
# Checks to see if _read_containers_cache_file can successfully read the pickled metadata from the cache file
        ret = azureblob._read_containers_cache_file(cache_file.name)
        cache_file.close()
self.assertEqual(ret, metadata)
def test__find_files(self):
"""
        Tests the result of _find_files. Ensures that it only returns files and not directories, and that it
        ignores metadata keys other than the blob names.
"""
metadata = {
"test": [
{"name": "base/secret.sls"},
{"name": "blobtest.sls", "irrelevant": "ignore.sls"},
{"name": "base/"},
]
}
ret = azureblob._find_files(metadata)
self.assertEqual(ret, {"test": ["base/secret.sls", "blobtest.sls"]})
def test__find_file_meta1(self):
"""
Tests the result of _find_file_meta when the metadata contains a blob with the specified path and a blob
without the specified path.
"""
metadata = {
"base": {
"test": [
{"name": "base/secret.sls", "relevant": "include.sls"},
{"name": "blobtest.sls", "irrelevant": "ignore.sls"},
]
}
}
container = "test"
saltenv = "base"
path = "base/secret.sls"
ret = azureblob._find_file_meta(metadata, container, saltenv, path)
self.assertEqual(ret, {"name": "base/secret.sls", "relevant": "include.sls"})
def test__find_file_meta2(self):
"""
Tests the result of _find_file_meta when the saltenv in metadata does not match the specified saltenv.
"""
metadata = {"wrong": {"test": [{"name": "base/secret.sls"}]}}
container = "test"
saltenv = "base"
path = "base/secret.sls"
ret = azureblob._find_file_meta(metadata, container, saltenv, path)
self.assertEqual(ret, None)
def test__find_file_meta3(self):
"""
        Tests the result of _find_file_meta when the container in metadata does not match the specified container.
"""
metadata = {"base": {"wrong": [{"name": "base/secret.sls"}]}}
container = "test"
saltenv = "base"
path = "base/secret.sls"
ret = azureblob._find_file_meta(metadata, container, saltenv, path)
self.assertEqual(ret, None)
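
As a closing illustration of the cache file format exercised by test__refresh_containers_cache_file and test__read_containers_cache_file, the module simply pickles the metadata dict to disk and unpickles it later. A minimal round-trip sketch (the temporary file is a stand-in for the real cache path, which the module derives via _get_containers_cache_filename):

import os
import pickle
import tempfile

# metadata shaped like the fixtures used in the tests above
metadata = {"base": {"test": [{"name": "base/secret.sls"}]}}

with tempfile.NamedTemporaryFile(delete=False) as fp:
    pickle.dump(metadata, fp)  # what _refresh_containers_cache_file writes

with open(fp.name, "rb") as fh:
    assert pickle.load(fh) == metadata  # what _read_containers_cache_file reads

os.unlink(fp.name)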