Create file if doesnt exist ini.options_present

This commit is contained in:
ch3ll 2020-08-31 14:54:32 -04:00 committed by Daniel Wozniak
parent 98ba07a618
commit 137dbce30f
5 changed files with 204 additions and 33 deletions

1
changelog/34236.fixed Normal file
View file

@ -0,0 +1 @@
Create ini file if does not exist when using ini.options_present state module.

View file

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""
Edit ini files
@ -10,7 +9,6 @@ Edit ini files
(for example /etc/sysctl.conf)
"""
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
import logging
import os
@ -24,7 +22,6 @@ import salt.utils.stringutils
from salt.exceptions import CommandExecutionError
# Import 3rd-party libs
from salt.ext import six
from salt.utils.odict import OrderedDict
log = logging.getLogger(__name__)
@ -169,7 +166,7 @@ def get_section(file_name, section, separator="="):
"""
inifile = _Ini.get_ini_file(file_name, separator=separator)
ret = {}
for key, value in six.iteritems(inifile.get(section, {})):
for key, value in inifile.get(section, {}).items():
if key[0] != "#":
ret.update({key: value})
return ret
@ -200,7 +197,7 @@ def remove_section(file_name, section, separator="="):
section = inifile.pop(section)
inifile.flush()
ret = {}
for key, value in six.iteritems(section):
for key, value in section.items():
if key[0] != "#":
ret.update({key: value})
return ret
@ -233,7 +230,7 @@ def get_ini(file_name, separator="="):
:return: regular dict
"""
ret = {}
for key, val in six.iteritems(odict):
for key, val in odict.items():
if key[0] != "#":
if isinstance(val, (dict, OrderedDict)):
ret.update({key: ini_odict2dict(val)})
@ -247,7 +244,7 @@ def get_ini(file_name, separator="="):
class _Section(OrderedDict):
def __init__(self, name, inicontents="", separator="=", commenter="#"):
super(_Section, self).__init__(self)
super().__init__(self)
self.name = name
self.inicontents = inicontents
self.sep = separator
@ -255,7 +252,7 @@ class _Section(OrderedDict):
opt_regx_prefix = r"(\s*)(.+?)\s*"
opt_regx_suffix = r"\s*(.*)\s*"
self.opt_regx_str = r"{0}(\{1}){2}".format(
self.opt_regx_str = r"{}(\{}){}".format(
opt_regx_prefix, self.sep, opt_regx_suffix
)
self.opt_regx = re.compile(self.opt_regx_str)
@ -275,7 +272,7 @@ class _Section(OrderedDict):
# Match comments
com_match = COM_REGX.match(opt_str)
if com_match:
name = "#comment{0}".format(comment_count)
name = "#comment{}".format(comment_count)
self.com = com_match.group(1)
comment_count += 1
self.update({name: opt_str})
@ -299,7 +296,7 @@ class _Section(OrderedDict):
self.update({name: value})
continue
# Anything remaining is a mystery.
name = "#unknown{0}".format(unknown_count)
name = "#unknown{}".format(unknown_count)
self.update({name: opt_str})
unknown_count += 1
@ -311,7 +308,7 @@ class _Section(OrderedDict):
# and to make sure options without sectons go above any section
options_backup = OrderedDict()
comment_index = None
for key, value in six.iteritems(self):
for key, value in self.items():
if comment_index is not None:
options_backup.update({key: value})
continue
@ -323,13 +320,13 @@ class _Section(OrderedDict):
for key in options_backup:
self.pop(key)
self.pop(comment_index, None)
super(_Section, self).update({opt_key: None})
for key, value in six.iteritems(options_backup):
super(_Section, self).update({key: value})
super().update({opt_key: None})
for key, value in options_backup.items():
super().update({key: value})
def update(self, update_dict):
changes = {}
for key, value in six.iteritems(update_dict):
for key, value in update_dict.items():
# Ensure the value is either a _Section or a string
if isinstance(value, (dict, OrderedDict)):
sect = _Section(
@ -339,7 +336,7 @@ class _Section(OrderedDict):
value = sect
value_plain = value.as_dict()
else:
value = six.text_type(value)
value = str(value)
value_plain = value
if key not in self:
@ -349,7 +346,7 @@ class _Section(OrderedDict):
if not isinstance(value, _Section):
self._uncomment_if_commented(key)
super(_Section, self).update({key: value})
super().update({key: value})
else:
curr_value = self.get(key, None)
if isinstance(curr_value, _Section):
@ -361,31 +358,30 @@ class _Section(OrderedDict):
changes.update(
{key: {"before": curr_value, "after": value_plain}}
)
super(_Section, self).update({key: value})
super().update({key: value})
return changes
def gen_ini(self):
yield "{0}[{1}]{0}".format(os.linesep, self.name)
sections_dict = OrderedDict()
for name, value in six.iteritems(self):
for name, value in self.items():
# Handle Comment Lines
if COM_REGX.match(name):
yield "{0}{1}".format(value, os.linesep)
yield "{}{}".format(value, os.linesep)
# Handle Sections
elif isinstance(value, _Section):
sections_dict.update({name: value})
# Key / Value pairs
# Adds spaces between the separator
else:
yield "{0}{1}{2}{3}".format(
yield "{}{}{}{}".format(
name,
" {0} ".format(self.sep) if self.sep != " " else self.sep,
" {} ".format(self.sep) if self.sep != " " else self.sep,
value,
os.linesep,
)
for name, value in six.iteritems(sections_dict):
for line in value.gen_ini():
yield line
for name, value in sections_dict.items():
yield from value.gen_ini()
def as_ini(self):
return "".join(self.gen_ini())
@ -394,11 +390,14 @@ class _Section(OrderedDict):
return dict(self)
def dump(self):
print(six.text_type(self))
print(str(self))
def __repr__(self, _repr_running=None):
_repr_running = _repr_running or {}
super_repr = super(_Section, self).__repr__(_repr_running)
try:
super_repr = super().__repr__(_repr_running)
except TypeError:
super_repr = super().__repr__()
return os.linesep.join((super_repr, salt.utils.json.dumps(self, indent=4)))
def __str__(self):
@ -414,14 +413,19 @@ class _Section(OrderedDict):
class _Ini(_Section):
def refresh(self, inicontents=None):
if inicontents is None:
if not os.path.exists(self.name):
log.trace(
"File {} does not exist and will be created".format(self.name)
)
return
try:
with salt.utils.files.fopen(self.name) as rfh:
inicontents = salt.utils.stringutils.to_unicode(rfh.read())
except (OSError, IOError) as exc:
except OSError as exc:
if __opts__["test"] is False:
raise CommandExecutionError(
"Unable to open file '{0}'. "
"Exception: {1}".format(self.name, exc)
"Unable to open file '{}'. "
"Exception: {}".format(self.name, exc)
)
if not inicontents:
return
@ -432,7 +436,7 @@ class _Ini(_Section):
inicontents.reverse()
# Pop anything defined outside of a section (ie. at the top of
# the ini file).
super(_Ini, self).refresh(inicontents.pop())
super().refresh(inicontents.pop())
for section_name, sect_ini in self._gen_tuples(inicontents):
try:
sect_obj = _Section(section_name, sect_ini, separator=self.sep)
@ -451,9 +455,9 @@ class _Ini(_Section):
if ini_gen_list:
ini_gen_list[0] = ini_gen_list[0].lstrip(os.linesep)
outfile.writelines(salt.utils.data.encode(ini_gen_list))
except (OSError, IOError) as exc:
except OSError as exc:
raise CommandExecutionError(
"Unable to write file '{0}'. " "Exception: {1}".format(self.name, exc)
"Unable to write file '{}'. " "Exception: {}".format(self.name, exc)
)
@staticmethod

View file

@ -0,0 +1,59 @@
"""
Integration tests for the ini_manage state
"""
import pytest
def test_options_present(salt_call_cli):
"""
test ini.options_present when the file
does not exist and then run it again
when it does exist and run it again when
we want to add more sections to the ini
"""
with pytest.helpers.temp_file("ini_file.ini") as tpath:
content = """
test_ini:
ini.options_present:
- name: {}
- sections:
general:
server_hostname: foo.com
server_port: 1234
""".format(
tpath
)
with pytest.helpers.temp_state_file("manage_ini.sls", content) as sfpath:
ret = salt_call_cli.run("--local", "state.apply", "manage_ini")
assert ret.json[next(iter(ret.json))]["changes"] == {
"general": {
"before": None,
"after": {"server_hostname": "foo.com", "server_port": "1234"},
}
}
content = """
test_ini:
ini.options_present:
- name: {}
- sections:
general:
server_hostname: foo.com
server_port: 1234
server_user: saltfoo
""".format(
tpath
)
with pytest.helpers.temp_state_file("manage_ini.sls", content) as sfpath:
# check to see adding a new section works
ret = salt_call_cli.run("--local", "state.apply", "manage_ini")
assert ret.json[next(iter(ret.json))]["changes"] == {
"general": {"server_user": {"before": None, "after": "saltfoo"}}
}
# check when no changes are expected
ret = salt_call_cli.run("--local", "state.apply", "manage_ini")
assert ret.json[next(iter(ret.json))]["changes"] == {}

View file

@ -0,0 +1,8 @@
import salt.modules.ini_manage
def test_section_req():
"""
Test the __repr__ in the _Section class
"""
assert repr(salt.modules.ini_manage._Section("test")) == "_Section()\n{}"

View file

@ -0,0 +1,99 @@
import copy
import os
import pytest
import salt.modules.ini_manage as mod_ini_manage
import salt.states.ini_manage as ini_manage
from tests.support.mock import patch
@pytest.fixture()
def sections():
sections = {"general": {"hostname": "myserver.com", "port": "1234"}}
return sections
@pytest.fixture(autouse=True)
def setup_loader(request):
setup_loader_modules = {
ini_manage: {
"__salt__": {
"ini.get_ini": mod_ini_manage.get_ini,
"ini.set_option": mod_ini_manage.set_option,
},
"__opts__": {"test": False},
},
mod_ini_manage: {"__opts__": {"test": False}},
}
with pytest.helpers.loader_mock(request, setup_loader_modules) as loader_mock:
yield loader_mock
def test_options_present(tmpdir, sections):
"""
Test to verify options present when
file does not initially exist
"""
name = tmpdir.join("test.ini").strpath
exp_ret = {
"name": name,
"changes": {"general": {"before": None, "after": sections["general"]}},
"result": True,
"comment": "Changes take effect",
}
assert ini_manage.options_present(name, sections) == exp_ret
assert os.path.exists(name)
assert mod_ini_manage.get_ini(name) == sections
def test_options_present_true_no_file(tmpdir, sections):
"""
Test to verify options present when
file does not initially exist and test=True
"""
name = tmpdir.join("test_true_no_file.ini").strpath
exp_ret = {
"name": name,
"changes": {},
"result": None,
"comment": "Changed key hostname in section general.\n"
"Changed key port in section general.\n",
}
with patch.dict(ini_manage.__opts__, {"test": True}), patch.dict(
mod_ini_manage.__opts__, {"test": True}
):
assert ini_manage.options_present(name, sections) == exp_ret
assert not os.path.exists(name)
def test_options_present_true_file(tmpdir, sections):
"""
Test to verify options present when
file does exist and test=True
"""
name = tmpdir.join("test_true_file.ini").strpath
exp_ret = {
"name": name,
"changes": {},
"result": None,
"comment": "Unchanged key hostname in section general.\n"
"Unchanged key port in section general.\n"
"Changed key user in section general.\n",
}
ini_manage.options_present(name, sections)
new_section = copy.deepcopy(sections)
new_section["general"]["user"] = "saltuser"
with patch.dict(ini_manage.__opts__, {"test": True}), patch.dict(
mod_ini_manage.__opts__, {"test": True}
):
assert ini_manage.options_present(name, new_section) == exp_ret
assert os.path.exists(name)
assert mod_ini_manage.get_ini(name) == sections