Merge pull request #52699 from aplanas/fix_brtfs

btrfs: Add properties state
This commit is contained in:
Daniel Wozniak 2020-04-22 12:01:00 -07:00 committed by GitHub
commit a9ebb98d5d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 1155 additions and 0 deletions

View file

@ -66,6 +66,7 @@ state modules
boto_sqs
boto_vpc
bower
btrfs
cabal
ceph
chef

View file

@ -0,0 +1,6 @@
=================
salt.states.btrfs
=================
.. automodule:: salt.states.btrfs
:members:

377
salt/states/btrfs.py Normal file
View file

@ -0,0 +1,377 @@
# -*- coding: utf-8 -*-
"""
:maintainer: Alberto Planas <aplanas@suse.com>
:maturity: new
:depends: None
:platform: Linux
"""
from __future__ import absolute_import, print_function, unicode_literals
import functools
import logging
import os.path
import tempfile
import traceback
from salt.exceptions import CommandExecutionError
log = logging.getLogger(__name__)
__virtualname__ = "btrfs"
def _mount(device, use_default):
"""
Mount the device in a temporary place.
"""
opts = "defaults" if use_default else "subvol=/"
dest = tempfile.mkdtemp()
res = __states__["mount.mounted"](
dest, device=device, fstype="btrfs", opts=opts, persist=False
)
if not res["result"]:
log.error("Cannot mount device %s in %s", device, dest)
_umount(dest)
return None
return dest
def _umount(path):
"""
Umount and clean the temporary place.
"""
__states__["mount.unmounted"](path)
__utils__["files.rm_rf"](path)
def _is_default(path, dest, name):
"""
Check if the subvolume is the current default.
"""
subvol_id = __salt__["btrfs.subvolume_show"](path)[name]["subvolume id"]
def_id = __salt__["btrfs.subvolume_get_default"](dest)["id"]
return subvol_id == def_id
def _set_default(path, dest, name):
"""
Set the subvolume as the current default.
"""
subvol_id = __salt__["btrfs.subvolume_show"](path)[name]["subvolume id"]
return __salt__["btrfs.subvolume_set_default"](subvol_id, dest)
def _is_cow(path):
"""
Check if the subvolume is copy on write
"""
dirname = os.path.dirname(path)
return "C" not in __salt__["file.lsattr"](dirname)[path]
def _unset_cow(path):
"""
Disable the copy on write in a subvolume
"""
return __salt__["file.chattr"](path, operator="add", attributes="C")
def __mount_device(action):
"""
Small decorator to makes sure that the mount and umount happends in
a transactional way.
"""
@functools.wraps(action)
def wrapper(*args, **kwargs):
name = kwargs["name"]
device = kwargs["device"]
use_default = kwargs.get("use_default", False)
ret = {
"name": name,
"result": False,
"changes": {},
"comment": ["Some error happends during the operation."],
}
try:
if device:
dest = _mount(device, use_default)
if not dest:
msg = "Device {} cannot be mounted".format(device)
ret["comment"].append(msg)
kwargs["__dest"] = dest
ret = action(*args, **kwargs)
except Exception as e: # pylint: disable=broad-except
log.error("""Traceback: {}""".format(traceback.format_exc()))
ret["comment"].append(e)
finally:
if device:
_umount(dest)
return ret
return wrapper
@__mount_device
def subvolume_created(
name,
device,
qgroupids=None,
set_default=False,
copy_on_write=True,
force_set_default=True,
__dest=None,
):
"""
Makes sure that a btrfs subvolume is present.
name
Name of the subvolume to add
device
Device where to create the subvolume
qgroupids
Add the newly created subcolume to a qgroup. This parameter
is a list
set_default
If True, this new subvolume will be set as default when
mounted, unless subvol option in mount is used
copy_on_write
If false, set the subvolume with chattr +C
force_set_default
If false and the subvolume is already present, it will not
force it as default if ``set_default`` is True
"""
ret = {
"name": name,
"result": False,
"changes": {},
"comment": [],
}
path = os.path.join(__dest, name)
exists = __salt__["btrfs.subvolume_exists"](path)
if exists:
ret["comment"].append("Subvolume {} already present".format(name))
# Resolve first the test case. The check is not complete, but at
# least we will report if a subvolume needs to be created. Can
# happend that the subvolume is there, but we also need to set it
# as default, or persist in fstab.
if __opts__["test"]:
ret["result"] = None
if not exists:
ret["changes"][name] = "Subvolume {} will be created".format(name)
return ret
if not exists:
# Create the directories where the subvolume lives
_path = os.path.dirname(path)
res = __states__["file.directory"](_path, makedirs=True)
if not res["result"]:
ret["comment"].append("Error creating {} directory".format(_path))
return ret
try:
__salt__["btrfs.subvolume_create"](name, dest=__dest, qgroupids=qgroupids)
except CommandExecutionError:
ret["comment"].append("Error creating subvolume {}".format(name))
return ret
ret["changes"][name] = "Created subvolume {}".format(name)
# If the volume was already present, we can opt-out the check for
# default subvolume.
if (
(not exists or (exists and force_set_default))
and set_default
and not _is_default(path, __dest, name)
):
ret["changes"][name + "_default"] = _set_default(path, __dest, name)
if not copy_on_write and _is_cow(path):
ret["changes"][name + "_no_cow"] = _unset_cow(path)
ret["result"] = True
return ret
@__mount_device
def subvolume_deleted(name, device, commit=False, __dest=None):
"""
Makes sure that a btrfs subvolume is removed.
name
Name of the subvolume to remove
device
Device where to remove the subvolume
commit
Wait until the transaction is over
"""
ret = {
"name": name,
"result": False,
"changes": {},
"comment": [],
}
path = os.path.join(__dest, name)
exists = __salt__["btrfs.subvolume_exists"](path)
if not exists:
ret["comment"].append("Subvolume {} already missing".format(name))
if __opts__["test"]:
ret["result"] = None
if exists:
ret["changes"][name] = "Subvolume {} will be removed".format(name)
return ret
# If commit is set, we wait until all is over
commit = "after" if commit else None
if not exists:
try:
__salt__["btrfs.subvolume_delete"](path, commit=commit)
except CommandExecutionError:
ret["comment"].append("Error removing subvolume {}".format(name))
return ret
ret["changes"][name] = "Removed subvolume {}".format(name)
ret["result"] = True
return ret
def _diff_properties(expected, current):
"""Calculate the difference between the current and the expected
properties
* 'expected' is expressed in a dictionary like: {'property': value}
* 'current' contains the same format retuned by 'btrfs.properties'
If the property is not available, will throw an exception.
"""
difference = {}
for _property, value in expected.items():
current_value = current[_property]["value"]
if value is False and current_value == "N/A":
needs_update = False
elif value != current_value:
needs_update = True
else:
needs_update = False
if needs_update:
difference[_property] = value
return difference
@__mount_device
def properties(name, device, use_default=False, __dest=None, **properties):
"""
Makes sure that a list of properties are set in a subvolume, file
or device.
name
Name of the object to change
device
Device where the object lives, if None, the device will be in
name
use_default
If True, this subvolume will be resolved to the default
subvolume assigned during the create operation
properties
Dictionary of properties
Valid properties are 'ro', 'label' or 'compression'. Check the
documentation to see where those properties are valid for each
object.
"""
ret = {
"name": name,
"result": False,
"changes": {},
"comment": [],
}
# 'name' will have always the name of the object that we want to
# change, but if the object is a device, we do not repeat it again
# in 'device'. This makes device sometimes optional.
if device:
if os.path.isabs(name):
path = os.path.join(__dest, os.path.relpath(name, os.path.sep))
else:
path = os.path.join(__dest, name)
else:
path = name
if not os.path.exists(path):
ret["comment"].append("Object {} not found".format(name))
return ret
# Convert the booleans to lowercase
properties = {
k: v if type(v) is not bool else str(v).lower() for k, v in properties.items()
}
current_properties = {}
try:
current_properties = __salt__["btrfs.properties"](path)
except CommandExecutionError as e:
ret["comment"].append("Error reading properties from {}".format(name))
ret["comment"].append("Current error {}".format(e))
return ret
try:
properties_to_set = _diff_properties(properties, current_properties)
except KeyError:
ret["comment"].append("Some property not found in {}".format(name))
return ret
if __opts__["test"]:
ret["result"] = None
if properties_to_set:
ret["changes"] = properties_to_set
else:
msg = "No properties will be changed in {}".format(name)
ret["comment"].append(msg)
return ret
if properties_to_set:
_properties = ",".join(
"{}={}".format(k, v) for k, v in properties_to_set.items()
)
__salt__["btrfs.properties"](path, set=_properties)
current_properties = __salt__["btrfs.properties"](path)
properties_failed = _diff_properties(properties, current_properties)
if properties_failed:
msg = "Properties {} failed to be changed in {}".format(
properties_failed, name
)
ret["comment"].append(msg)
return ret
ret["comment"].append("Properties changed in {}".format(name))
ret["changes"] = properties_to_set
else:
ret["comment"].append("Properties not changed in {}".format(name))
ret["result"] = True
return ret

View file

@ -0,0 +1,771 @@
# -*- coding: utf-8 -*-
"""
:maintainer: Alberto Planas <aplanas@suse.com>
:platform: Linux
"""
# Import Python Libs
from __future__ import absolute_import, print_function, unicode_literals
import pytest
import salt.states.btrfs as btrfs
import salt.utils.platform
from salt.exceptions import CommandExecutionError
# Import Salt Testing Libs
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.mock import MagicMock, patch
from tests.support.unit import TestCase, skipIf
@skipIf(salt.utils.platform.is_windows(), "Non-Windows feature")
class BtrfsTestCase(TestCase, LoaderModuleMockMixin):
"""
Test cases for salt.states.btrfs
"""
def setup_loader_modules(self):
return {btrfs: {"__salt__": {}, "__states__": {}, "__utils__": {}}}
@patch("salt.states.btrfs._umount")
@patch("tempfile.mkdtemp")
def test__mount_fails(self, mkdtemp, umount):
"""
Test mounting a device in a temporary place.
"""
mkdtemp.return_value = "/tmp/xxx"
states_mock = {
"mount.mounted": MagicMock(return_value={"result": False}),
}
with patch.dict(btrfs.__states__, states_mock):
assert btrfs._mount("/dev/sda1", use_default=False) is None
mkdtemp.assert_called_once()
states_mock["mount.mounted"].assert_called_with(
"/tmp/xxx",
device="/dev/sda1",
fstype="btrfs",
opts="subvol=/",
persist=False,
)
umount.assert_called_with("/tmp/xxx")
@patch("salt.states.btrfs._umount")
@patch("tempfile.mkdtemp")
def test__mount(self, mkdtemp, umount):
"""
Test mounting a device in a temporary place.
"""
mkdtemp.return_value = "/tmp/xxx"
states_mock = {
"mount.mounted": MagicMock(return_value={"result": True}),
}
with patch.dict(btrfs.__states__, states_mock):
assert btrfs._mount("/dev/sda1", use_default=False) == "/tmp/xxx"
mkdtemp.assert_called_once()
states_mock["mount.mounted"].assert_called_with(
"/tmp/xxx",
device="/dev/sda1",
fstype="btrfs",
opts="subvol=/",
persist=False,
)
umount.assert_not_called()
@patch("salt.states.btrfs._umount")
@patch("tempfile.mkdtemp")
def test__mount_use_default(self, mkdtemp, umount):
"""
Test mounting a device in a temporary place.
"""
mkdtemp.return_value = "/tmp/xxx"
states_mock = {
"mount.mounted": MagicMock(return_value={"result": True}),
}
with patch.dict(btrfs.__states__, states_mock):
assert btrfs._mount("/dev/sda1", use_default=True) == "/tmp/xxx"
mkdtemp.assert_called_once()
states_mock["mount.mounted"].assert_called_with(
"/tmp/xxx",
device="/dev/sda1",
fstype="btrfs",
opts="defaults",
persist=False,
)
umount.assert_not_called()
def test__umount(self):
"""
Test umounting and cleanning temporary place.
"""
states_mock = {
"mount.unmounted": MagicMock(),
}
utils_mock = {
"files.rm_rf": MagicMock(),
}
with patch.dict(btrfs.__states__, states_mock), patch.dict(
btrfs.__utils__, utils_mock
):
btrfs._umount("/tmp/xxx")
states_mock["mount.unmounted"].assert_called_with("/tmp/xxx")
utils_mock["files.rm_rf"].assert_called_with("/tmp/xxx")
def test__is_default_not_default(self):
"""
Test if the subvolume is the current default.
"""
salt_mock = {
"btrfs.subvolume_show": MagicMock(
return_value={"@/var": {"subvolume id": "256"}}
),
"btrfs.subvolume_get_default": MagicMock(return_value={"id": "5"}),
}
with patch.dict(btrfs.__salt__, salt_mock):
assert not btrfs._is_default("/tmp/xxx/@/var", "/tmp/xxx", "@/var")
salt_mock["btrfs.subvolume_show"].assert_called_with("/tmp/xxx/@/var")
salt_mock["btrfs.subvolume_get_default"].assert_called_with("/tmp/xxx")
def test__is_default(self):
"""
Test if the subvolume is the current default.
"""
salt_mock = {
"btrfs.subvolume_show": MagicMock(
return_value={"@/var": {"subvolume id": "256"}}
),
"btrfs.subvolume_get_default": MagicMock(return_value={"id": "256"}),
}
with patch.dict(btrfs.__salt__, salt_mock):
assert btrfs._is_default("/tmp/xxx/@/var", "/tmp/xxx", "@/var")
salt_mock["btrfs.subvolume_show"].assert_called_with("/tmp/xxx/@/var")
salt_mock["btrfs.subvolume_get_default"].assert_called_with("/tmp/xxx")
def test__set_default(self):
"""
Test setting a subvolume as the current default.
"""
salt_mock = {
"btrfs.subvolume_show": MagicMock(
return_value={"@/var": {"subvolume id": "256"}}
),
"btrfs.subvolume_set_default": MagicMock(return_value=True),
}
with patch.dict(btrfs.__salt__, salt_mock):
assert btrfs._set_default("/tmp/xxx/@/var", "/tmp/xxx", "@/var")
salt_mock["btrfs.subvolume_show"].assert_called_with("/tmp/xxx/@/var")
salt_mock["btrfs.subvolume_set_default"].assert_called_with(
"256", "/tmp/xxx"
)
def test__is_cow_not_cow(self):
"""
Test if the subvolume is copy on write.
"""
salt_mock = {
"file.lsattr": MagicMock(return_value={"/tmp/xxx/@/var": ["C"]}),
}
with patch.dict(btrfs.__salt__, salt_mock):
assert not btrfs._is_cow("/tmp/xxx/@/var")
salt_mock["file.lsattr"].assert_called_with("/tmp/xxx/@")
def test__is_cow(self):
"""
Test if the subvolume is copy on write.
"""
salt_mock = {
"file.lsattr": MagicMock(return_value={"/tmp/xxx/@/var": []}),
}
with patch.dict(btrfs.__salt__, salt_mock):
assert btrfs._is_cow("/tmp/xxx/@/var")
salt_mock["file.lsattr"].assert_called_with("/tmp/xxx/@")
def test__unset_cow(self):
"""
Test disabling the subvolume as copy on write.
"""
salt_mock = {
"file.chattr": MagicMock(return_value=True),
}
with patch.dict(btrfs.__salt__, salt_mock):
assert btrfs._unset_cow("/tmp/xxx/@/var")
salt_mock["file.chattr"].assert_called_with(
"/tmp/xxx/@/var", operator="add", attributes="C"
)
@skipIf(salt.utils.platform.is_windows(), "Skip on Windows")
@patch("salt.states.btrfs._umount")
@patch("salt.states.btrfs._mount")
def test_subvolume_created_exists(self, mount, umount):
"""
Test creating a subvolume.
"""
mount.return_value = "/tmp/xxx"
salt_mock = {
"btrfs.subvolume_exists": MagicMock(return_value=True),
}
opts_mock = {
"test": False,
}
with patch.dict(btrfs.__salt__, salt_mock), patch.dict(
btrfs.__opts__, opts_mock
):
assert btrfs.subvolume_created(name="@/var", device="/dev/sda1") == {
"name": "@/var",
"result": True,
"changes": {},
"comment": ["Subvolume @/var already present"],
}
salt_mock["btrfs.subvolume_exists"].assert_called_with("/tmp/xxx/@/var")
mount.assert_called_once()
umount.assert_called_once()
@skipIf(salt.utils.platform.is_windows(), "Skip on Windows")
@patch("salt.states.btrfs._umount")
@patch("salt.states.btrfs._mount")
def test_subvolume_created_exists_test(self, mount, umount):
"""
Test creating a subvolume.
"""
mount.return_value = "/tmp/xxx"
salt_mock = {
"btrfs.subvolume_exists": MagicMock(return_value=True),
}
opts_mock = {
"test": True,
}
with patch.dict(btrfs.__salt__, salt_mock), patch.dict(
btrfs.__opts__, opts_mock
):
assert btrfs.subvolume_created(name="@/var", device="/dev/sda1") == {
"name": "@/var",
"result": None,
"changes": {},
"comment": ["Subvolume @/var already present"],
}
salt_mock["btrfs.subvolume_exists"].assert_called_with("/tmp/xxx/@/var")
mount.assert_called_once()
umount.assert_called_once()
@skipIf(salt.utils.platform.is_windows(), "Skip on Windows")
@patch("salt.states.btrfs._is_default")
@patch("salt.states.btrfs._umount")
@patch("salt.states.btrfs._mount")
def test_subvolume_created_exists_was_default(self, mount, umount, is_default):
"""
Test creating a subvolume.
"""
mount.return_value = "/tmp/xxx"
is_default.return_value = True
salt_mock = {
"btrfs.subvolume_exists": MagicMock(return_value=True),
}
opts_mock = {
"test": False,
}
with patch.dict(btrfs.__salt__, salt_mock), patch.dict(
btrfs.__opts__, opts_mock
):
assert btrfs.subvolume_created(
name="@/var", device="/dev/sda1", set_default=True
) == {
"name": "@/var",
"result": True,
"changes": {},
"comment": ["Subvolume @/var already present"],
}
salt_mock["btrfs.subvolume_exists"].assert_called_with("/tmp/xxx/@/var")
mount.assert_called_once()
umount.assert_called_once()
@skipIf(salt.utils.platform.is_windows(), "Skip on Windows")
@patch("salt.states.btrfs._set_default")
@patch("salt.states.btrfs._is_default")
@patch("salt.states.btrfs._umount")
@patch("salt.states.btrfs._mount")
def test_subvolume_created_exists_set_default(
self, mount, umount, is_default, set_default
):
"""
Test creating a subvolume.
"""
mount.return_value = "/tmp/xxx"
is_default.return_value = False
set_default.return_value = True
salt_mock = {
"btrfs.subvolume_exists": MagicMock(return_value=True),
}
opts_mock = {
"test": False,
}
with patch.dict(btrfs.__salt__, salt_mock), patch.dict(
btrfs.__opts__, opts_mock
):
assert btrfs.subvolume_created(
name="@/var", device="/dev/sda1", set_default=True
) == {
"name": "@/var",
"result": True,
"changes": {"@/var_default": True},
"comment": ["Subvolume @/var already present"],
}
salt_mock["btrfs.subvolume_exists"].assert_called_with("/tmp/xxx/@/var")
mount.assert_called_once()
umount.assert_called_once()
@skipIf(salt.utils.platform.is_windows(), "Skip on Windows")
@patch("salt.states.btrfs._set_default")
@patch("salt.states.btrfs._is_default")
@patch("salt.states.btrfs._umount")
@patch("salt.states.btrfs._mount")
def test_subvolume_created_exists_set_default_no_force(
self, mount, umount, is_default, set_default
):
"""
Test creating a subvolume.
"""
mount.return_value = "/tmp/xxx"
is_default.return_value = False
set_default.return_value = True
salt_mock = {
"btrfs.subvolume_exists": MagicMock(return_value=True),
}
opts_mock = {
"test": False,
}
with patch.dict(btrfs.__salt__, salt_mock), patch.dict(
btrfs.__opts__, opts_mock
):
assert btrfs.subvolume_created(
name="@/var",
device="/dev/sda1",
set_default=True,
force_set_default=False,
) == {
"name": "@/var",
"result": True,
"changes": {},
"comment": ["Subvolume @/var already present"],
}
salt_mock["btrfs.subvolume_exists"].assert_called_with("/tmp/xxx/@/var")
mount.assert_called_once()
umount.assert_called_once()
@skipIf(salt.utils.platform.is_windows(), "Skip on Windows")
@patch("salt.states.btrfs._is_cow")
@patch("salt.states.btrfs._umount")
@patch("salt.states.btrfs._mount")
def test_subvolume_created_exists_no_cow(self, mount, umount, is_cow):
"""
Test creating a subvolume.
"""
mount.return_value = "/tmp/xxx"
is_cow.return_value = False
salt_mock = {
"btrfs.subvolume_exists": MagicMock(return_value=True),
}
opts_mock = {
"test": False,
}
with patch.dict(btrfs.__salt__, salt_mock), patch.dict(
btrfs.__opts__, opts_mock
):
assert btrfs.subvolume_created(
name="@/var", device="/dev/sda1", copy_on_write=False
) == {
"name": "@/var",
"result": True,
"changes": {},
"comment": ["Subvolume @/var already present"],
}
salt_mock["btrfs.subvolume_exists"].assert_called_with("/tmp/xxx/@/var")
mount.assert_called_once()
umount.assert_called_once()
@skipIf(salt.utils.platform.is_windows(), "Skip on Windows")
@patch("salt.states.btrfs._unset_cow")
@patch("salt.states.btrfs._is_cow")
@patch("salt.states.btrfs._umount")
@patch("salt.states.btrfs._mount")
def test_subvolume_created_exists_unset_cow(self, mount, umount, is_cow, unset_cow):
"""
Test creating a subvolume.
"""
mount.return_value = "/tmp/xxx"
is_cow.return_value = True
unset_cow.return_value = True
salt_mock = {
"btrfs.subvolume_exists": MagicMock(return_value=True),
}
opts_mock = {
"test": False,
}
with patch.dict(btrfs.__salt__, salt_mock), patch.dict(
btrfs.__opts__, opts_mock
):
assert btrfs.subvolume_created(
name="@/var", device="/dev/sda1", copy_on_write=False
) == {
"name": "@/var",
"result": True,
"changes": {"@/var_no_cow": True},
"comment": ["Subvolume @/var already present"],
}
salt_mock["btrfs.subvolume_exists"].assert_called_with("/tmp/xxx/@/var")
mount.assert_called_once()
umount.assert_called_once()
@skipIf(salt.utils.platform.is_windows(), "Skip on Windows")
@patch("salt.states.btrfs._umount")
@patch("salt.states.btrfs._mount")
def test_subvolume_created(self, mount, umount):
"""
Test creating a subvolume.
"""
mount.return_value = "/tmp/xxx"
salt_mock = {
"btrfs.subvolume_exists": MagicMock(return_value=False),
"btrfs.subvolume_create": MagicMock(),
}
states_mock = {
"file.directory": MagicMock(return_value={"result": True}),
}
opts_mock = {
"test": False,
}
with patch.dict(btrfs.__salt__, salt_mock), patch.dict(
btrfs.__states__, states_mock
), patch.dict(btrfs.__opts__, opts_mock):
assert btrfs.subvolume_created(name="@/var", device="/dev/sda1") == {
"name": "@/var",
"result": True,
"changes": {"@/var": "Created subvolume @/var"},
"comment": [],
}
salt_mock["btrfs.subvolume_exists"].assert_called_with("/tmp/xxx/@/var")
salt_mock["btrfs.subvolume_create"].assert_called_once()
mount.assert_called_once()
umount.assert_called_once()
@skipIf(salt.utils.platform.is_windows(), "Skip on Windows")
@patch("salt.states.btrfs._umount")
@patch("salt.states.btrfs._mount")
def test_subvolume_created_fails_directory(self, mount, umount):
"""
Test creating a subvolume.
"""
mount.return_value = "/tmp/xxx"
salt_mock = {
"btrfs.subvolume_exists": MagicMock(return_value=False),
}
states_mock = {
"file.directory": MagicMock(return_value={"result": False}),
}
opts_mock = {
"test": False,
}
with patch.dict(btrfs.__salt__, salt_mock), patch.dict(
btrfs.__states__, states_mock
), patch.dict(btrfs.__opts__, opts_mock):
assert btrfs.subvolume_created(name="@/var", device="/dev/sda1") == {
"name": "@/var",
"result": False,
"changes": {},
"comment": ["Error creating /tmp/xxx/@ directory"],
}
salt_mock["btrfs.subvolume_exists"].assert_called_with("/tmp/xxx/@/var")
mount.assert_called_once()
umount.assert_called_once()
@skipIf(salt.utils.platform.is_windows(), "Skip on Windows")
@patch("salt.states.btrfs._umount")
@patch("salt.states.btrfs._mount")
def test_subvolume_created_fails(self, mount, umount):
"""
Test creating a subvolume.
"""
mount.return_value = "/tmp/xxx"
salt_mock = {
"btrfs.subvolume_exists": MagicMock(return_value=False),
"btrfs.subvolume_create": MagicMock(side_effect=CommandExecutionError),
}
states_mock = {
"file.directory": MagicMock(return_value={"result": True}),
}
opts_mock = {
"test": False,
}
with patch.dict(btrfs.__salt__, salt_mock), patch.dict(
btrfs.__states__, states_mock
), patch.dict(btrfs.__opts__, opts_mock):
assert btrfs.subvolume_created(name="@/var", device="/dev/sda1") == {
"name": "@/var",
"result": False,
"changes": {},
"comment": ["Error creating subvolume @/var"],
}
salt_mock["btrfs.subvolume_exists"].assert_called_with("/tmp/xxx/@/var")
salt_mock["btrfs.subvolume_create"].assert_called_once()
mount.assert_called_once()
umount.assert_called_once()
def test_diff_properties_fails(self):
"""
Test when diff_properties do not found a property
"""
expected = {"wrong_property": True}
current = {
"compression": {
"description": "Set/get compression for a file or directory",
"value": "N/A",
},
"label": {"description": "Set/get label of device.", "value": "N/A"},
"ro": {
"description": "Set/get read-only flag or subvolume",
"value": "N/A",
},
}
with pytest.raises(Exception):
btrfs._diff_properties(expected, current)
def test_diff_properties_enable_ro(self):
"""
Test when diff_properties enable one single property
"""
expected = {"ro": True}
current = {
"compression": {
"description": "Set/get compression for a file or directory",
"value": "N/A",
},
"label": {"description": "Set/get label of device.", "value": "N/A"},
"ro": {
"description": "Set/get read-only flag or subvolume",
"value": "N/A",
},
}
assert btrfs._diff_properties(expected, current) == {"ro": True}
def test_diff_properties_only_enable_ro(self):
"""
Test when diff_properties is half ready
"""
expected = {"ro": True, "label": "mylabel"}
current = {
"compression": {
"description": "Set/get compression for a file or directory",
"value": "N/A",
},
"label": {"description": "Set/get label of device.", "value": "mylabel"},
"ro": {
"description": "Set/get read-only flag or subvolume",
"value": "N/A",
},
}
assert btrfs._diff_properties(expected, current) == {"ro": True}
def test_diff_properties_disable_ro(self):
"""
Test when diff_properties enable one single property
"""
expected = {"ro": False}
current = {
"compression": {
"description": "Set/get compression for a file or directory",
"value": "N/A",
},
"label": {"description": "Set/get label of device.", "value": "N/A"},
"ro": {
"description": "Set/get read-only flag or subvolume",
"value": True,
},
}
assert btrfs._diff_properties(expected, current) == {"ro": False}
def test_diff_properties_emty_na(self):
"""
Test when diff_properties is already disabled as N/A
"""
expected = {"ro": False}
current = {
"compression": {
"description": "Set/get compression for a file or directory",
"value": "N/A",
},
"label": {"description": "Set/get label of device.", "value": "N/A"},
"ro": {
"description": "Set/get read-only flag or subvolume",
"value": "N/A",
},
}
assert btrfs._diff_properties(expected, current) == {}
@patch("salt.states.btrfs._umount")
@patch("salt.states.btrfs._mount")
@patch("os.path.exists")
def test_properties_subvolume_not_exists(self, exists, mount, umount):
"""
Test when subvolume is not present
"""
exists.return_value = False
mount.return_value = "/tmp/xxx"
assert btrfs.properties(name="@/var", device="/dev/sda1") == {
"name": "@/var",
"result": False,
"changes": {},
"comment": ["Object @/var not found"],
}
mount.assert_called_once()
umount.assert_called_once()
@patch("salt.states.btrfs._umount")
@patch("salt.states.btrfs._mount")
@patch("os.path.exists")
def test_properties_default_root_subvolume(self, exists, mount, umount):
"""
Test when root subvolume resolves to another subvolume
"""
exists.return_value = False
mount.return_value = "/tmp/xxx"
assert btrfs.properties(name="/", device="/dev/sda1") == {
"name": "/",
"result": False,
"changes": {},
"comment": ["Object / not found"],
}
exists.assert_called_with("/tmp/xxx/.")
@patch("os.path.exists")
def test_properties_device_fail(self, exists):
"""
Test when we try to set a device that is not pressent
"""
exists.return_value = False
assert btrfs.properties(name="/dev/sda1", device=None) == {
"name": "/dev/sda1",
"result": False,
"changes": {},
"comment": ["Object /dev/sda1 not found"],
}
@patch("salt.states.btrfs._umount")
@patch("salt.states.btrfs._mount")
@patch("os.path.exists")
def test_properties_subvolume_fail(self, exists, mount, umount):
"""
Test setting a wrong property in a subvolume
"""
exists.return_value = True
mount.return_value = "/tmp/xxx"
salt_mock = {
"btrfs.properties": MagicMock(
side_effect=[
{
"ro": {
"description": "Set/get read-only flag or subvolume",
"value": "N/A",
},
}
]
),
}
opts_mock = {
"test": False,
}
with patch.dict(btrfs.__salt__, salt_mock), patch.dict(
btrfs.__opts__, opts_mock
):
assert btrfs.properties(
name="@/var", device="/dev/sda1", wrond_property=True
) == {
"name": "@/var",
"result": False,
"changes": {},
"comment": ["Some property not found in @/var"],
}
salt_mock["btrfs.properties"].assert_called_with("/tmp/xxx/@/var")
mount.assert_called_once()
umount.assert_called_once()
@patch("salt.states.btrfs._umount")
@patch("salt.states.btrfs._mount")
@patch("os.path.exists")
def test_properties_enable_ro_subvolume(self, exists, mount, umount):
"""
Test setting a ro property in a subvolume
"""
exists.return_value = True
mount.return_value = "/tmp/xxx"
salt_mock = {
"btrfs.properties": MagicMock(
side_effect=[
{
"ro": {
"description": "Set/get read-only flag or subvolume",
"value": "N/A",
},
},
None,
{
"ro": {
"description": "Set/get read-only flag or subvolume",
"value": "true",
},
},
]
),
}
opts_mock = {
"test": False,
}
with patch.dict(btrfs.__salt__, salt_mock), patch.dict(
btrfs.__opts__, opts_mock
):
assert btrfs.properties(name="@/var", device="/dev/sda1", ro=True) == {
"name": "@/var",
"result": True,
"changes": {"ro": "true"},
"comment": ["Properties changed in @/var"],
}
salt_mock["btrfs.properties"].assert_any_call("/tmp/xxx/@/var")
salt_mock["btrfs.properties"].assert_any_call(
"/tmp/xxx/@/var", set="ro=true"
)
mount.assert_called_once()
umount.assert_called_once()
@patch("salt.states.btrfs._umount")
@patch("salt.states.btrfs._mount")
@patch("os.path.exists")
def test_properties_test(self, exists, mount, umount):
"""
Test setting a property in test mode.
"""
exists.return_value = True
mount.return_value = "/tmp/xxx"
salt_mock = {
"btrfs.properties": MagicMock(
side_effect=[
{
"ro": {
"description": "Set/get read-only flag or subvolume",
"value": "N/A",
},
},
]
),
}
opts_mock = {
"test": True,
}
with patch.dict(btrfs.__salt__, salt_mock), patch.dict(
btrfs.__opts__, opts_mock
):
assert btrfs.properties(name="@/var", device="/dev/sda1", ro=True) == {
"name": "@/var",
"result": None,
"changes": {"ro": "true"},
"comment": [],
}
salt_mock["btrfs.properties"].assert_called_with("/tmp/xxx/@/var")
mount.assert_called_once()
umount.assert_called_once()