Merge pull request #47070 from terminalmage/with_tempdir

Use decorators for temp files/dirs in test suite
This commit is contained in:
Nicole Thomas 2018-04-19 10:01:47 -04:00 committed by GitHub
commit 6257862bbb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 1777 additions and 2205 deletions

View file

@ -1,3 +0,0 @@
{{ salt['runtests_helpers.get_salt_temp_dir_for_path']('test.append') }}:
file:
- touch

View file

@ -1,4 +0,0 @@
issue-2227:
file.append:
- name: {{ salt['runtests_helpers.get_salt_temp_dir_for_path']('test.append') }}
- text: HISTTIMEFORMAT='%F %T '

View file

@ -1,4 +0,0 @@
{{ salt['runtests_helpers.get_salt_temp_dir_for_path']('test.append') }}:
file.append:
- source: salt://testappend/firstif

View file

@ -1,3 +0,0 @@
{{ salt['runtests_helpers.get_salt_temp_dir_for_path']('test.append') }}:
file.append:
- source: salt://testappend/secondif

View file

@ -44,7 +44,7 @@ class CPModuleTest(ModuleCase):
super(CPModuleTest, self).run_function(*args, **kwargs)
)
@with_tempfile
@with_tempfile()
def test_get_file(self, tgt):
'''
cp.get_file
@ -76,7 +76,7 @@ class CPModuleTest(ModuleCase):
self.assertIn('KNIGHT: They\'re nervous, sire.', data)
self.assertNotIn('bacon', data)
@with_tempfile
@with_tempfile()
def test_get_file_templated_paths(self, tgt):
'''
cp.get_file
@ -94,7 +94,7 @@ class CPModuleTest(ModuleCase):
self.assertIn('Gromit', data)
self.assertNotIn('bacon', data)
@with_tempfile
@with_tempfile()
def test_get_file_gzipped(self, tgt):
'''
cp.get_file
@ -137,7 +137,7 @@ class CPModuleTest(ModuleCase):
self.assertIn('KNIGHT: They\'re nervous, sire.', data)
self.assertNotIn('bacon', data)
@with_tempfile
@with_tempfile()
def test_get_template(self, tgt):
'''
cp.get_template
@ -186,7 +186,7 @@ class CPModuleTest(ModuleCase):
# cp.get_url tests
@with_tempfile
@with_tempfile()
def test_get_url(self, tgt):
'''
cp.get_url with salt:// source given
@ -277,7 +277,7 @@ class CPModuleTest(ModuleCase):
self.assertIn('KNIGHT: They\'re nervous, sire.', data)
self.assertNotIn('bacon', data)
@with_tempfile
@with_tempfile()
def test_get_url_https(self, tgt):
'''
cp.get_url with https:// source given
@ -619,7 +619,7 @@ class CPModuleTest(ModuleCase):
self.assertEqual(
sha256_hash['hsum'], hashlib.sha256(data).hexdigest())
@with_tempfile
@with_tempfile()
def test_get_file_from_env_predefined(self, tgt):
'''
cp.get_file
@ -634,7 +634,7 @@ class CPModuleTest(ModuleCase):
finally:
os.unlink(tgt)
@with_tempfile
@with_tempfile()
def test_get_file_from_env_in_url(self, tgt):
tgt = os.path.join(paths.TMP, 'cheese')
try:

View file

@ -11,6 +11,7 @@ import time
# Import Salt Testing libs
from tests.support.case import ModuleCase
from tests.support.helpers import with_tempdir
from tests.support.unit import skipIf
from tests.support.paths import TMP, TMP_PILLAR_TREE
from tests.support.mixins import SaltReturnAssertsMixin
@ -187,21 +188,24 @@ class StateModuleTest(ModuleCase, SaltReturnAssertsMixin):
ret = self.run_function('state.run_request')
self.assertEqual(ret, {})
def test_issue_1896_file_append_source(self):
@with_tempdir()
def test_issue_1896_file_append_source(self, base_dir):
'''
Verify that we can append a file's contents
'''
testfile = os.path.join(TMP, 'test.append')
if os.path.isfile(testfile):
os.unlink(testfile)
testfile = os.path.join(base_dir, 'test.append')
ret = self.run_function('state.sls', mods='testappend')
ret = self.run_state('file.touch', name=testfile)
self.assertSaltTrueReturn(ret)
ret = self.run_function('state.sls', mods='testappend.step-1')
ret = self.run_state(
'file.append',
name=testfile,
source='salt://testappend/firstif')
self.assertSaltTrueReturn(ret)
ret = self.run_function('state.sls', mods='testappend.step-2')
ret = self.run_state(
'file.append',
name=testfile,
source='salt://testappend/secondif')
self.assertSaltTrueReturn(ret)
with salt.utils.files.fopen(testfile, 'r') as fp_:
@ -224,14 +228,17 @@ class StateModuleTest(ModuleCase, SaltReturnAssertsMixin):
contents = os.linesep.join(new_contents)
contents += os.linesep
self.assertMultiLineEqual(
contents, testfile_contents)
self.assertMultiLineEqual(contents, testfile_contents)
# Re-append switching order
ret = self.run_function('state.sls', mods='testappend.step-2')
ret = self.run_state(
'file.append',
name=testfile,
source='salt://testappend/secondif')
self.assertSaltTrueReturn(ret)
ret = self.run_function('state.sls', mods='testappend.step-1')
ret = self.run_state(
'file.append',
name=testfile,
source='salt://testappend/firstif')
self.assertSaltTrueReturn(ret)
with salt.utils.files.fopen(testfile, 'r') as fp_:

File diff suppressed because it is too large Load diff

View file

@ -5,18 +5,15 @@ Tests for the Git state
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import errno
import functools
import inspect
import os
import shutil
import socket
import string
import tempfile
# Import Salt Testing libs
from tests.support.case import ModuleCase
from tests.support.paths import TMP
from tests.support.helpers import with_tempdir
from tests.support.mixins import SaltReturnAssertsMixin
# Import salt libs
@ -98,173 +95,150 @@ class GitTest(ModuleCase, SaltReturnAssertsMixin):
# Reset the dns timeout after the test is over
socket.setdefaulttimeout(None)
def test_latest(self):
@with_tempdir(create=False)
def test_latest(self, target):
'''
git.latest
'''
name = os.path.join(TMP, 'salt_repo')
try:
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
target=name
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(name, '.git')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
target=target
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(target, '.git')))
def test_latest_with_rev_and_submodules(self):
@with_tempdir(create=False)
def test_latest_with_rev_and_submodules(self, target):
'''
git.latest
'''
name = os.path.join(TMP, 'salt_repo')
try:
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='develop',
target=name,
submodules=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(name, '.git')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='develop',
target=target,
submodules=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(target, '.git')))
def test_latest_failure(self):
@with_tempdir(create=False)
def test_latest_failure(self, target):
'''
git.latest
'''
name = os.path.join(TMP, 'salt_repo')
try:
ret = self.run_state(
'git.latest',
name='https://youSpelledGitHubWrong.com/saltstack/salt-test-repo.git',
rev='develop',
target=name,
submodules=True
)
self.assertSaltFalseReturn(ret)
self.assertFalse(os.path.isdir(os.path.join(name, '.git')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.latest',
name='https://youSpelledGitHubWrong.com/saltstack/salt-test-repo.git',
rev='develop',
target=target,
submodules=True
)
self.assertSaltFalseReturn(ret)
self.assertFalse(os.path.isdir(os.path.join(target, '.git')))
def test_latest_empty_dir(self):
@with_tempdir()
def test_latest_empty_dir(self, target):
'''
git.latest
'''
name = os.path.join(TMP, 'salt_repo')
if not os.path.isdir(name):
os.mkdir(name)
try:
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='develop',
target=name,
submodules=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(name, '.git')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='develop',
target=target,
submodules=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(target, '.git')))
def test_latest_unless_no_cwd_issue_6800(self):
@with_tempdir(create=False)
def test_latest_unless_no_cwd_issue_6800(self, target):
'''
cwd=target was being passed to _run_check which blew up if
target dir did not already exist.
'''
name = os.path.join(TMP, 'salt_repo')
if os.path.isdir(name):
shutil.rmtree(name)
try:
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='develop',
target=name,
unless='test -e {0}'.format(name),
submodules=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(name, '.git')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='develop',
target=target,
unless='test -e {0}'.format(target),
submodules=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(target, '.git')))
def test_numeric_rev(self):
@with_tempdir(create=False)
def test_numeric_rev(self, target):
'''
git.latest with numeric revision
'''
name = os.path.join(TMP, 'salt_repo')
try:
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev=0.11,
target=name,
submodules=True,
timeout=120
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(name, '.git')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev=0.11,
target=target,
submodules=True,
timeout=120
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(target, '.git')))
def test_latest_with_local_changes(self):
@with_tempdir(create=False)
def test_latest_with_local_changes(self, target):
'''
Ensure that we fail the state when there are local changes and succeed
when force_reset is True.
'''
name = os.path.join(TMP, 'salt_repo')
try:
# Clone repo
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
target=name
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(name, '.git')))
# Clone repo
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
target=target
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(target, '.git')))
# Make change to LICENSE file.
with salt.utils.files.fopen(os.path.join(name, 'LICENSE'), 'a') as fp_:
fp_.write('Lorem ipsum dolor blah blah blah....\n')
# Make change to LICENSE file.
with salt.utils.files.fopen(os.path.join(target, 'LICENSE'), 'a') as fp_:
fp_.write('Lorem ipsum dolor blah blah blah....\n')
# Make sure that we now have uncommitted changes
self.assertTrue(self.run_function('git.diff', [name, 'HEAD']))
# Make sure that we now have uncommitted changes
self.assertTrue(self.run_function('git.diff', [target, 'HEAD']))
# Re-run state with force_reset=False
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
target=name,
force_reset=False
)
self.assertSaltTrueReturn(ret)
self.assertEqual(
ret[next(iter(ret))]['comment'],
('Repository {0} is up-to-date, but with local changes. Set '
'\'force_reset\' to True to purge local changes.'.format(name))
)
# Re-run state with force_reset=False
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
target=target,
force_reset=False
)
self.assertSaltTrueReturn(ret)
self.assertEqual(
ret[next(iter(ret))]['comment'],
('Repository {0} is up-to-date, but with local changes. Set '
'\'force_reset\' to True to purge local changes.'.format(target))
)
# Now run the state with force_reset=True
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
target=name,
force_reset=True
)
self.assertSaltTrueReturn(ret)
# Now run the state with force_reset=True
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
target=target,
force_reset=True
)
self.assertSaltTrueReturn(ret)
# Make sure that we no longer have uncommitted changes
self.assertFalse(self.run_function('git.diff', [name, 'HEAD']))
finally:
shutil.rmtree(name, ignore_errors=True)
# Make sure that we no longer have uncommitted changes
self.assertFalse(self.run_function('git.diff', [target, 'HEAD']))
@uses_git_opts
def test_latest_fast_forward(self):
@with_tempdir(create=False)
@with_tempdir(create=False)
@with_tempdir(create=False)
def test_latest_fast_forward(self, mirror_dir, admin_dir, clone_dir):
'''
Test running git.latest state a second time after changes have been
made to the remote repo.
@ -273,98 +247,87 @@ class GitTest(ModuleCase, SaltReturnAssertsMixin):
return self.run_function('git.rev_parse', [cwd, 'HEAD'])
repo_url = 'https://{0}/saltstack/salt-test-repo.git'.format(self.__domain)
mirror_dir = os.path.join(TMP, 'salt_repo_mirror')
mirror_url = 'file://' + mirror_dir
admin_dir = os.path.join(TMP, 'salt_repo_admin')
clone_dir = os.path.join(TMP, 'salt_repo')
try:
# Mirror the repo
self.run_function(
'git.clone', [mirror_dir], url=repo_url, opts='--mirror')
# Mirror the repo
self.run_function(
'git.clone', [mirror_dir], url=repo_url, opts='--mirror')
# Make sure the directory for the mirror now exists
self.assertTrue(os.path.exists(mirror_dir))
# Make sure the directory for the mirror now exists
self.assertTrue(os.path.exists(mirror_dir))
# Clone the mirror twice, once to the admin location and once to
# the clone_dir
ret = self.run_state('git.latest', name=mirror_url, target=admin_dir)
self.assertSaltTrueReturn(ret)
ret = self.run_state('git.latest', name=mirror_url, target=clone_dir)
self.assertSaltTrueReturn(ret)
# Clone the mirror twice, once to the admin location and once to
# the clone_dir
ret = self.run_state('git.latest', name=mirror_url, target=admin_dir)
self.assertSaltTrueReturn(ret)
ret = self.run_state('git.latest', name=mirror_url, target=clone_dir)
self.assertSaltTrueReturn(ret)
# Make a change to the repo by editing the file in the admin copy
# of the repo and committing.
head_pre = _head(admin_dir)
with salt.utils.files.fopen(os.path.join(admin_dir, 'LICENSE'), 'a') as fp_:
fp_.write('Hello world!')
self.run_function(
'git.commit', [admin_dir, 'added a line'],
git_opts='-c user.name="Foo Bar" -c user.email=foo@bar.com',
opts='-a',
)
# Make sure HEAD is pointing to a new SHA so we know we properly
# committed our change.
head_post = _head(admin_dir)
self.assertNotEqual(head_pre, head_post)
# Make a change to the repo by editing the file in the admin copy
# of the repo and committing.
head_pre = _head(admin_dir)
with salt.utils.files.fopen(os.path.join(admin_dir, 'LICENSE'), 'a') as fp_:
fp_.write('Hello world!')
self.run_function(
'git.commit', [admin_dir, 'added a line'],
git_opts='-c user.name="Foo Bar" -c user.email=foo@bar.com',
opts='-a',
)
# Make sure HEAD is pointing to a new SHA so we know we properly
# committed our change.
head_post = _head(admin_dir)
self.assertNotEqual(head_pre, head_post)
# Push the change to the mirror
# NOTE: the test will fail if the salt-test-repo's default branch
# is changed.
self.run_function('git.push', [admin_dir, 'origin', 'develop'])
# Push the change to the mirror
# NOTE: the test will fail if the salt-test-repo's default branch
# is changed.
self.run_function('git.push', [admin_dir, 'origin', 'develop'])
# Re-run the git.latest state on the clone_dir
ret = self.run_state('git.latest', name=mirror_url, target=clone_dir)
self.assertSaltTrueReturn(ret)
# Re-run the git.latest state on the clone_dir
ret = self.run_state('git.latest', name=mirror_url, target=clone_dir)
self.assertSaltTrueReturn(ret)
# Make sure that the clone_dir now has the correct SHA
self.assertEqual(head_post, _head(clone_dir))
# Make sure that the clone_dir now has the correct SHA
self.assertEqual(head_post, _head(clone_dir))
finally:
for path in (mirror_dir, admin_dir, clone_dir):
shutil.rmtree(path, ignore_errors=True)
def _changed_local_branch_helper(self, rev, hint):
@with_tempdir(create=False)
def _changed_local_branch_helper(self, target, rev, hint):
'''
We're testing two almost identical cases, the only thing that differs
is the rev used for the git.latest state.
'''
name = os.path.join(TMP, 'salt_repo')
try:
# Clone repo
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev=rev,
target=name
)
self.assertSaltTrueReturn(ret)
# Clone repo
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev=rev,
target=target
)
self.assertSaltTrueReturn(ret)
# Check out a new branch in the clone and make a commit, to ensure
# that when we re-run the state, it is not a fast-forward change
self.run_function('git.checkout', [name, 'new_branch'], opts='-b')
with salt.utils.files.fopen(os.path.join(name, 'foo'), 'w'):
pass
self.run_function('git.add', [name, '.'])
self.run_function(
'git.commit', [name, 'add file'],
git_opts='-c user.name="Foo Bar" -c user.email=foo@bar.com',
)
# Check out a new branch in the clone and make a commit, to ensure
# that when we re-run the state, it is not a fast-forward change
self.run_function('git.checkout', [target, 'new_branch'], opts='-b')
with salt.utils.files.fopen(os.path.join(target, 'foo'), 'w'):
pass
self.run_function('git.add', [target, '.'])
self.run_function(
'git.commit', [target, 'add file'],
git_opts='-c user.name="Foo Bar" -c user.email=foo@bar.com',
)
# Re-run the state, this should fail with a specific hint in the
# comment field.
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev=rev,
target=name
)
self.assertSaltFalseReturn(ret)
# Re-run the state, this should fail with a specific hint in the
# comment field.
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev=rev,
target=target
)
self.assertSaltFalseReturn(ret)
comment = ret[next(iter(ret))]['comment']
self.assertTrue(hint in comment)
finally:
shutil.rmtree(name, ignore_errors=True)
comment = ret[next(iter(ret))]['comment']
self.assertTrue(hint in comment)
@uses_git_opts
def test_latest_changed_local_branch_rev_head(self):
@ -375,7 +338,7 @@ class GitTest(ModuleCase, SaltReturnAssertsMixin):
This test will fail if the default branch for the salt-test-repo is
ever changed.
'''
self._changed_local_branch_helper(
self._changed_local_branch_helper( # pylint: disable=no-value-for-parameter
'HEAD',
'The default remote branch (develop) differs from the local '
'branch (new_branch)'
@ -387,162 +350,136 @@ class GitTest(ModuleCase, SaltReturnAssertsMixin):
Test for presence of hint in failure message when the local branch has
been changed and a non-HEAD rev is specified
'''
self._changed_local_branch_helper(
self._changed_local_branch_helper( # pylint: disable=no-value-for-parameter
'develop',
'The desired rev (develop) differs from the name of the local '
'branch (new_branch)'
)
@uses_git_opts
def test_latest_updated_remote_rev(self):
@with_tempdir(create=False)
@with_tempdir()
def test_latest_updated_remote_rev(self, name, target):
'''
Ensure that we don't exit early when checking for a fast-forward
'''
name = tempfile.mkdtemp(dir=TMP)
target = os.path.join(TMP, 'test_latest_updated_remote_rev')
# Initialize a new git repository
self.run_function('git.init', [name])
try:
# Add and commit a file
with salt.utils.files.fopen(os.path.join(name, 'foo.txt'), 'w') as fp_:
fp_.write('Hello world\n')
self.run_function('git.add', [name, '.'])
self.run_function(
'git.commit', [name, 'initial commit'],
git_opts='-c user.name="Foo Bar" -c user.email=foo@bar.com',
)
# Add and commit a file
with salt.utils.files.fopen(os.path.join(name, 'foo.txt'), 'w') as fp_:
fp_.write('Hello world\n')
self.run_function('git.add', [name, '.'])
self.run_function(
'git.commit', [name, 'initial commit'],
git_opts='-c user.name="Foo Bar" -c user.email=foo@bar.com',
)
# Run the state to clone the repo we just created
ret = self.run_state(
'git.latest',
name=name,
target=target,
)
self.assertSaltTrueReturn(ret)
# Run the state to clone the repo we just created
ret = self.run_state(
'git.latest',
name=name,
target=target,
)
self.assertSaltTrueReturn(ret)
# Add another commit
with salt.utils.files.fopen(os.path.join(name, 'foo.txt'), 'w') as fp_:
fp_.write('Added a line\n')
self.run_function(
'git.commit', [name, 'added a line'],
git_opts='-c user.name="Foo Bar" -c user.email=foo@bar.com',
opts='-a',
)
# Add another commit
with salt.utils.files.fopen(os.path.join(name, 'foo.txt'), 'w') as fp_:
fp_.write('Added a line\n')
self.run_function(
'git.commit', [name, 'added a line'],
git_opts='-c user.name="Foo Bar" -c user.email=foo@bar.com',
opts='-a',
)
# Run the state again. It should pass, if it doesn't then there was
# a problem checking whether or not the change is a fast-forward.
ret = self.run_state(
'git.latest',
name=name,
target=target,
)
self.assertSaltTrueReturn(ret)
finally:
for path in (name, target):
try:
shutil.rmtree(path)
except OSError as exc:
if exc.errno != errno.ENOENT:
raise exc
# Run the state again. It should pass, if it doesn't then there was
# a problem checking whether or not the change is a fast-forward.
ret = self.run_state(
'git.latest',
name=name,
target=target,
)
self.assertSaltTrueReturn(ret)
def test_latest_depth(self):
@with_tempdir(create=False)
def test_latest_depth(self, target):
'''
Test running git.latest state using the "depth" argument to limit the
history. See #45394.
'''
name = os.path.join(TMP, 'salt_repo')
try:
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='HEAD',
target=name,
depth=1
)
# HEAD is not a branch, this should fail
self.assertSaltFalseReturn(ret)
self.assertIn(
'must be set to the name of a branch',
ret[next(iter(ret))]['comment']
)
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='HEAD',
target=target,
depth=1
)
# HEAD is not a branch, this should fail
self.assertSaltFalseReturn(ret)
self.assertIn(
'must be set to the name of a branch',
ret[next(iter(ret))]['comment']
)
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='non-default-branch',
target=name,
depth=1
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(name, '.git')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.latest',
name='https://{0}/saltstack/salt-test-repo.git'.format(self.__domain),
rev='non-default-branch',
target=target,
depth=1
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isdir(os.path.join(target, '.git')))
def test_present(self):
@with_tempdir(create=False)
def test_present(self, name):
'''
git.present
'''
name = os.path.join(TMP, 'salt_repo')
try:
ret = self.run_state(
'git.present',
name=name,
bare=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isfile(os.path.join(name, 'HEAD')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.present',
name=name,
bare=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isfile(os.path.join(name, 'HEAD')))
def test_present_failure(self):
@with_tempdir()
def test_present_failure(self, name):
'''
git.present
'''
name = os.path.join(TMP, 'salt_repo')
if not os.path.isdir(name):
os.mkdir(name)
try:
fname = os.path.join(name, 'stoptheprocess')
fname = os.path.join(name, 'stoptheprocess')
with salt.utils.files.fopen(fname, 'a'):
pass
with salt.utils.files.fopen(fname, 'a'):
pass
ret = self.run_state(
'git.present',
name=name,
bare=True
)
self.assertSaltFalseReturn(ret)
self.assertFalse(os.path.isfile(os.path.join(name, 'HEAD')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.present',
name=name,
bare=True
)
self.assertSaltFalseReturn(ret)
self.assertFalse(os.path.isfile(os.path.join(name, 'HEAD')))
def test_present_empty_dir(self):
@with_tempdir()
def test_present_empty_dir(self, name):
'''
git.present
'''
name = os.path.join(TMP, 'salt_repo')
if not os.path.isdir(name):
os.mkdir(name)
try:
ret = self.run_state(
'git.present',
name=name,
bare=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isfile(os.path.join(name, 'HEAD')))
finally:
shutil.rmtree(name, ignore_errors=True)
ret = self.run_state(
'git.present',
name=name,
bare=True
)
self.assertSaltTrueReturn(ret)
self.assertTrue(os.path.isfile(os.path.join(name, 'HEAD')))
def test_config_set_value_with_space_character(self):
@with_tempdir()
def test_config_set_value_with_space_character(self, name):
'''
git.config
'''
name = tempfile.mkdtemp(dir=TMP)
self.addCleanup(shutil.rmtree, name, ignore_errors=True)
self.run_function('git.init', [name])
ret = self.run_state(
@ -560,17 +497,14 @@ class LocalRepoGitTest(ModuleCase, SaltReturnAssertsMixin):
'''
Tests which do no require connectivity to github.com
'''
def test_renamed_default_branch(self):
@with_tempdir()
@with_tempdir()
@with_tempdir()
def test_renamed_default_branch(self, repo, admin, target):
'''
Test the case where the remote branch has been removed
https://github.com/saltstack/salt/issues/36242
'''
repo = tempfile.mkdtemp(dir=TMP)
admin = tempfile.mkdtemp(dir=TMP)
name = tempfile.mkdtemp(dir=TMP)
for dirname in (repo, admin, name):
self.addCleanup(shutil.rmtree, dirname, ignore_errors=True)
# Create bare repo
self.run_function('git.init', [repo], bare=True)
# Clone bare repo
@ -596,7 +530,7 @@ class LocalRepoGitTest(ModuleCase, SaltReturnAssertsMixin):
ret = self.run_state(
'git.latest',
name=repo,
target=name,
target=target,
rev='develop',
)
self.assertSaltFalseReturn(ret)
@ -610,11 +544,11 @@ class LocalRepoGitTest(ModuleCase, SaltReturnAssertsMixin):
'(which will ensure that the named branch is created '
'if it does not already exist).\n\n'
'Changes already made: {0} cloned to {1}'
.format(repo, name)
.format(repo, target)
)
self.assertEqual(
ret[next(iter(ret))]['changes'],
{'new': '{0} => {1}'.format(repo, name)}
{'new': '{0} => {1}'.format(repo, target)}
)
# Run git.latest state again. This should fail again, with a different
@ -622,7 +556,7 @@ class LocalRepoGitTest(ModuleCase, SaltReturnAssertsMixin):
ret = self.run_state(
'git.latest',
name=repo,
target=name,
target=target,
rev='develop',
)
self.assertSaltFalseReturn(ret)
@ -644,7 +578,7 @@ class LocalRepoGitTest(ModuleCase, SaltReturnAssertsMixin):
ret = self.run_state(
'git.latest',
name=repo,
target=name,
target=target,
rev='develop',
branch='develop',
)

View file

@ -12,7 +12,7 @@
# pylint: disable=repr-flag-used-in-string,wrong-import-order
# Import Python libs
from __future__ import absolute_import
from __future__ import absolute_import, print_function, unicode_literals
import base64
import errno
import functools
@ -20,6 +20,7 @@ import inspect
import logging
import os
import random
import shutil
import signal
import socket
import string
@ -53,6 +54,9 @@ from tests.support.unit import skip, _id
from tests.support.mock import patch
from tests.support.paths import FILES, TMP
# Import Salt libs
import salt.utils.files
log = logging.getLogger(__name__)
@ -955,22 +959,61 @@ def with_system_user_and_group(username, group,
return decorator
def with_tempfile(func):
'''
Generates a tempfile and cleans it up when test completes.
'''
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
fd_, name = tempfile.mkstemp(prefix='__salt.test.', dir=TMP)
os.close(fd_)
del fd_
ret = func(self, name, *args, **kwargs)
try:
class WithTempfile(object):
def __init__(self, **kwargs):
self.create = kwargs.pop('create', True)
if 'dir' not in kwargs:
kwargs['dir'] = TMP
if 'prefix' not in kwargs:
kwargs['prefix'] = '__salt.test.'
self.kwargs = kwargs
def __call__(self, func):
self.func = func
return functools.wraps(func)(
lambda testcase, *args, **kwargs: self.wrap(testcase, *args, **kwargs) # pylint: disable=W0108
)
def wrap(self, testcase, *args, **kwargs):
name = salt.utils.files.mkstemp(**self.kwargs)
if not self.create:
os.remove(name)
except Exception:
pass
return ret
return wrapper
try:
return self.func(testcase, name, *args, **kwargs)
finally:
try:
os.remove(name)
except OSError:
pass
with_tempfile = WithTempfile
class WithTempdir(object):
def __init__(self, **kwargs):
self.create = kwargs.pop('create', True)
if 'dir' not in kwargs:
kwargs['dir'] = TMP
self.kwargs = kwargs
def __call__(self, func):
self.func = func
return functools.wraps(func)(
lambda testcase, *args, **kwargs: self.wrap(testcase, *args, **kwargs) # pylint: disable=W0108
)
def wrap(self, testcase, *args, **kwargs):
tempdir = tempfile.mkdtemp(**self.kwargs)
if not self.create:
os.rmdir(tempdir)
try:
return self.func(testcase, tempdir, *args, **kwargs)
finally:
shutil.rmtree(tempdir, ignore_errors=True)
with_tempdir = WithTempdir
def requires_system_grains(func):

File diff suppressed because it is too large Load diff

View file

@ -7,9 +7,6 @@ from __future__ import absolute_import, unicode_literals, print_function
# Import Python libs
import shutil
import tempfile
import os
import logging
try:
# We're not going to actually use OpenSSL, we just want to check that
@ -20,8 +17,8 @@ except Exception:
NO_PYOPENSSL = True
# Import Salt Testing Libs
from tests.support.helpers import with_tempdir
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.paths import TMP
from tests.support.unit import TestCase, skipIf
from tests.support.mock import (
mock_open,
@ -309,367 +306,327 @@ class TLSAddTestCase(TestCase, LoaderModuleMockMixin):
remove_not_in_result(ret, result)
self.assertEqual(result, ret)
def test_create_ca(self):
@with_tempdir()
def test_create_ca(self, ca_path):
'''
Test creating CA cert
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
ca_name = 'test_ca'
certp = '{0}/{1}/{2}_ca_cert.crt'.format(
ca_path,
ca_name,
ca_name)
certk = '{0}/{1}/{2}_ca_cert.key'.format(
ca_path,
ca_name,
ca_name)
ret = 'Created Private Key: "{0}." Created CA "{1}": "{2}."'.format(
certk, ca_name, certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
with patch.dict(tls.__salt__, {'config.option': mock_opt, 'cmd.retcode': mock_ret}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256', 'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
ca_name = 'test_ca'
certp = '{0}/{1}/{2}_ca_cert.crt'.format(
ca_path,
ca_name,
ca_name)
certk = '{0}/{1}/{2}_ca_cert.key'.format(
ca_path,
ca_name,
ca_name)
ret = 'Created Private Key: "{0}." Created CA "{1}": "{2}."'.format(
certk, ca_name, certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
with patch.dict(tls.__salt__, {'config.option': mock_opt, 'cmd.retcode': mock_ret}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256', 'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
def test_recreate_ca(self):
@with_tempdir()
def test_recreate_ca(self, ca_path):
'''
Test creating CA cert when one already exists
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
ca_name = 'test_ca'
certp = '{0}/{1}/{2}_ca_cert.crt'.format(
ca_path,
ca_name,
ca_name)
certk = '{0}/{1}/{2}_ca_cert.key'.format(
ca_path,
ca_name,
ca_name)
ret = 'Created Private Key: "{0}." Created CA "{1}": "{2}."'.format(
certk, ca_name, certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
with patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)), \
patch.dict(tls.__salt__, {'config.option': mock_opt, 'cmd.retcode': mock_ret}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256', 'cachedir': ca_path}), \
patch.dict(_TLS_TEST_DATA['create_ca'], {'replace': True}):
tls.create_ca(ca_name)
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
ca_name = 'test_ca'
certp = '{0}/{1}/{2}_ca_cert.crt'.format(
ca_path,
ca_name,
ca_name)
certk = '{0}/{1}/{2}_ca_cert.key'.format(
ca_path,
ca_name,
ca_name)
ret = 'Created Private Key: "{0}." Created CA "{1}": "{2}."'.format(
certk, ca_name, certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
with patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)), \
patch.dict(tls.__salt__, {'config.option': mock_opt, 'cmd.retcode': mock_ret}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256', 'cachedir': ca_path}), \
patch.dict(_TLS_TEST_DATA['create_ca'], {'replace': True}):
tls.create_ca(ca_name)
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
def test_create_csr(self):
@with_tempdir()
def test_create_csr(self, ca_path):
'''
Test creating certificate signing request
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.csr'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created CSR for "{1}": "{2}."').format(
certk, _TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt, 'cmd.retcode': mock_ret, 'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256', 'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
self.assertEqual(
tls.create_csr(
ca_name,
**_TLS_TEST_DATA['create_ca']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.csr'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created CSR for "{1}": "{2}."').format(
certk, _TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt, 'cmd.retcode': mock_ret, 'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256', 'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
self.assertEqual(
tls.create_csr(
ca_name,
**_TLS_TEST_DATA['create_ca']),
ret)
def test_recreate_csr(self):
@with_tempdir()
def test_recreate_csr(self, ca_path):
'''
Test creating certificate signing request when one already exists
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.csr'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created CSR for "{1}": "{2}."').format(
certk, _TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch.dict(_TLS_TEST_DATA['create_ca'], {'replace': True}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
self.assertEqual(
tls.create_csr(
ca_name,
**_TLS_TEST_DATA['create_ca']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.csr'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created CSR for "{1}": "{2}."').format(
certk, _TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch.dict(_TLS_TEST_DATA['create_ca'], {'replace': True}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
self.assertEqual(
tls.create_csr(
ca_name,
**_TLS_TEST_DATA['create_ca']),
ret)
def test_create_self_signed_cert(self):
@with_tempdir()
def test_create_self_signed_cert(self, ca_path):
'''
Test creating self signed certificate
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
tls_dir = 'test_tls'
certp = '{0}/{1}/certs/{2}.crt'.format(
ca_path,
tls_dir,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
tls_dir,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created Certificate: "{1}."').format(
certk, certp)
mock_opt = MagicMock(return_value=ca_path)
with patch.dict(tls.__salt__, {'config.option': mock_opt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
self.assertEqual(
tls.create_self_signed_cert(
tls_dir=tls_dir,
days=365,
**_TLS_TEST_DATA['create_ca']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
tls_dir = 'test_tls'
certp = '{0}/{1}/certs/{2}.crt'.format(
ca_path,
tls_dir,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
tls_dir,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created Certificate: "{1}."').format(
certk, certp)
mock_opt = MagicMock(return_value=ca_path)
with patch.dict(tls.__salt__, {'config.option': mock_opt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
self.assertEqual(
tls.create_self_signed_cert(
tls_dir=tls_dir,
days=365,
**_TLS_TEST_DATA['create_ca']),
ret)
def test_recreate_self_signed_cert(self):
@with_tempdir()
def test_recreate_self_signed_cert(self, ca_path):
'''
Test creating self signed certificate when one already exists
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
tls_dir = 'test_tls'
certp = '{0}/{1}/certs/{2}.crt'.format(
ca_path,
tls_dir,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
tls_dir,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created Certificate: "{1}."').format(
certk, certp)
mock_opt = MagicMock(return_value=ca_path)
with patch.dict(tls.__salt__, {'config.option': mock_opt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
self.assertEqual(
tls.create_self_signed_cert(
tls_dir=tls_dir,
days=365,
**_TLS_TEST_DATA['create_ca']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
tls_dir = 'test_tls'
certp = '{0}/{1}/certs/{2}.crt'.format(
ca_path,
tls_dir,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
tls_dir,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created Certificate: "{1}."').format(
certk, certp)
mock_opt = MagicMock(return_value=ca_path)
with patch.dict(tls.__salt__, {'config.option': mock_opt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
self.assertEqual(
tls.create_self_signed_cert(
tls_dir=tls_dir,
days=365,
**_TLS_TEST_DATA['create_ca']),
ret)
def test_create_ca_signed_cert(self):
@with_tempdir()
def test_create_ca_signed_cert(self, ca_path):
'''
Test signing certificate from request
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.crt'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = 'Created Certificate for "{0}": "{1}"'.format(
_TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name, **_TLS_TEST_DATA['create_ca'])
self.assertEqual(
tls.create_ca_signed_cert(
ca_name,
_TLS_TEST_DATA['create_ca']['CN']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.crt'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = 'Created Certificate for "{0}": "{1}"'.format(
_TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name, **_TLS_TEST_DATA['create_ca'])
self.assertEqual(
tls.create_ca_signed_cert(
ca_name,
_TLS_TEST_DATA['create_ca']['CN']),
ret)
def test_recreate_ca_signed_cert(self):
@with_tempdir()
def test_recreate_ca_signed_cert(self, ca_path):
'''
Test signing certificate from request when certificate exists
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.crt'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = 'Created Certificate for "{0}": "{1}"'.format(
_TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
tls.create_ca_signed_cert(ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
self.assertEqual(
tls.create_ca_signed_cert(
ca_name,
_TLS_TEST_DATA['create_ca']['CN'],
replace=True),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.crt'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = 'Created Certificate for "{0}": "{1}"'.format(
_TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
tls.create_ca_signed_cert(ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
self.assertEqual(
tls.create_ca_signed_cert(
ca_name,
_TLS_TEST_DATA['create_ca']['CN'],
replace=True),
ret)
def test_create_pkcs12(self):
@with_tempdir()
def test_create_pkcs12(self, ca_path):
'''
Test creating pkcs12
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.p12'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = 'Created PKCS#12 Certificate for "{0}": "{1}"'.format(
_TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name, **_TLS_TEST_DATA['create_ca'])
tls.create_ca_signed_cert(ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
self.assertEqual(
tls.create_pkcs12(ca_name,
_TLS_TEST_DATA['create_ca']['CN'],
'password'),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.p12'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = 'Created PKCS#12 Certificate for "{0}": "{1}"'.format(
_TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name, **_TLS_TEST_DATA['create_ca'])
tls.create_ca_signed_cert(ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
self.assertEqual(
tls.create_pkcs12(ca_name,
_TLS_TEST_DATA['create_ca']['CN'],
'password'),
ret)
def test_recreate_pkcs12(self):
@with_tempdir()
def test_recreate_pkcs12(self, ca_path):
'''
Test creating pkcs12 when it already exists
'''
ca_path = tempfile.mkdtemp(dir=TMP)
try:
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.p12'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = 'Created PKCS#12 Certificate for "{0}": "{1}"'.format(
_TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch.dict(_TLS_TEST_DATA['create_ca'], {'replace': True}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
tls.create_ca_signed_cert(ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ca_name = 'test_ca'
certp = '{0}/{1}/certs/{2}.p12'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = 'Created PKCS#12 Certificate for "{0}": "{1}"'.format(
_TLS_TEST_DATA['create_ca']['CN'], certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
mock_pgt = MagicMock(return_value=False)
with patch.dict(tls.__salt__, {'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}), \
patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}), \
patch.dict(_TLS_TEST_DATA['create_ca'], {'replace': True}), \
patch('salt.modules.tls.maybe_fix_ssl_version',
MagicMock(return_value=True)):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
tls.create_ca_signed_cert(ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
tls.create_pkcs12(ca_name,
_TLS_TEST_DATA['create_ca']['CN'],
'password')
self.assertEqual(
tls.create_pkcs12(ca_name,
_TLS_TEST_DATA['create_ca']['CN'],
'password')
self.assertEqual(
tls.create_pkcs12(ca_name,
_TLS_TEST_DATA[
'create_ca']['CN'],
'password',
replace=True),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
_TLS_TEST_DATA[
'create_ca']['CN'],
'password',
replace=True),
ret)
def test_pyOpenSSL_version(self):
'''
@ -701,13 +658,13 @@ class TLSAddTestCase(TestCase, LoaderModuleMockMixin):
self.assertEqual(tls.get_extensions('server'), pillarval)
self.assertEqual(tls.get_extensions('client'), pillarval)
def test_pyOpenSSL_version_destructive(self):
@with_tempdir()
def test_pyOpenSSL_version_destructive(self, ca_path):
'''
Test extension logic with different pyOpenSSL versions
'''
pillarval = {'csr': {'extendedKeyUsage': 'serverAuth'}}
mock_pgt = MagicMock(return_value=pillarval)
ca_path = tempfile.mkdtemp(dir=TMP)
ca_name = 'test_ca'
certp = '{0}/{1}/{2}_ca_cert.crt'.format(
ca_path,
@ -721,105 +678,97 @@ class TLSAddTestCase(TestCase, LoaderModuleMockMixin):
certk, ca_name, certp)
mock_opt = MagicMock(return_value=ca_path)
mock_ret = MagicMock(return_value=0)
try:
with patch.dict(tls.__salt__, {
'config.option': mock_opt,
'cmd.retcode': mock_ret}):
with patch.dict(tls.__opts__, {
'hash_type': 'sha256',
'cachedir': ca_path}):
with patch.dict(_TLS_TEST_DATA['create_ca'],
{'replace': True}):
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.1.1'),
'X509_EXT_ENABLED': False}):
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.14.1'),
'X509_EXT_ENABLED': True}):
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.15.1'),
'X509_EXT_ENABLED': True}):
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
with patch.dict(tls.__salt__, {
'config.option': mock_opt,
'cmd.retcode': mock_ret}):
with patch.dict(tls.__opts__, {
'hash_type': 'sha256',
'cachedir': ca_path}):
with patch.dict(_TLS_TEST_DATA['create_ca'],
{'replace': True}):
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.1.1'),
'X509_EXT_ENABLED': False}):
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.14.1'),
'X509_EXT_ENABLED': True}):
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.15.1'),
'X509_EXT_ENABLED': True}):
self.assertEqual(
tls.create_ca(
ca_name,
days=365,
fixmode=False,
**_TLS_TEST_DATA['create_ca']),
ret)
try:
certp = '{0}/{1}/certs/{2}.csr'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created CSR for "{1}": "{2}."').format(
certk, _TLS_TEST_DATA['create_ca']['CN'], certp)
with patch.dict(tls.__salt__, {
'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}):
with patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}):
with patch.dict(_TLS_TEST_DATA['create_ca'], {
'subjectAltName': 'DNS:foo.bar',
'replace': True}):
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.1.1'),
'X509_EXT_ENABLED': False}):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
self.assertRaises(ValueError,
tls.create_csr,
ca_name,
**_TLS_TEST_DATA['create_ca'])
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.14.1'),
'X509_EXT_ENABLED': True}):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
self.assertEqual(
tls.create_csr(
ca_name,
**_TLS_TEST_DATA['create_ca']),
ret)
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.15.1'),
'X509_EXT_ENABLED': True}):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
self.assertEqual(
tls.create_csr(
ca_name,
**_TLS_TEST_DATA['create_ca']),
ret)
finally:
if os.path.isdir(ca_path):
shutil.rmtree(ca_path)
certp = '{0}/{1}/certs/{2}.csr'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
certk = '{0}/{1}/certs/{2}.key'.format(
ca_path,
ca_name,
_TLS_TEST_DATA['create_ca']['CN'])
ret = ('Created Private Key: "{0}." '
'Created CSR for "{1}": "{2}."').format(
certk, _TLS_TEST_DATA['create_ca']['CN'], certp)
with patch.dict(tls.__salt__, {
'config.option': mock_opt,
'cmd.retcode': mock_ret,
'pillar.get': mock_pgt}):
with patch.dict(tls.__opts__, {'hash_type': 'sha256',
'cachedir': ca_path}):
with patch.dict(_TLS_TEST_DATA['create_ca'], {
'subjectAltName': 'DNS:foo.bar',
'replace': True}):
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.1.1'),
'X509_EXT_ENABLED': False}):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
self.assertRaises(ValueError,
tls.create_csr,
ca_name,
**_TLS_TEST_DATA['create_ca'])
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.14.1'),
'X509_EXT_ENABLED': True}):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
self.assertEqual(
tls.create_csr(
ca_name,
**_TLS_TEST_DATA['create_ca']),
ret)
with patch.dict(tls.__dict__, {
'OpenSSL_version':
LooseVersion('0.15.1'),
'X509_EXT_ENABLED': True}):
tls.create_ca(ca_name)
tls.create_csr(ca_name)
self.assertEqual(
tls.create_csr(
ca_name,
**_TLS_TEST_DATA['create_ca']),
ret)

View file

@ -10,6 +10,7 @@ import textwrap
import copy
# Import Salt Testing libs
from tests.support.helpers import with_tempdir
from tests.support.unit import TestCase
from tests.support.paths import TMP
@ -34,6 +35,7 @@ class CommonTestCaseBoilerplate(TestCase):
def setUp(self):
self.root_dir = tempfile.mkdtemp(dir=TMP)
self.addCleanup(shutil.rmtree, self.root_dir, ignore_errors=True)
self.state_tree_dir = os.path.join(self.root_dir, 'state_tree')
self.cache_dir = os.path.join(self.root_dir, 'cachedir')
if not os.path.isdir(self.root_dir):
@ -291,167 +293,130 @@ class PyDSLRendererTestCase(CommonTestCaseBoilerplate):
self.assertEqual(result['C']['cmd'][1]['require'][0]['cmd'], 'A')
self.assertEqual(result['B']['file'][1]['require'][0]['cmd'], 'C')
def test_pipe_through_stateconf(self):
dirpath = tempfile.mkdtemp(dir=TMP)
if not os.path.isdir(dirpath):
self.skipTest(
'The temporary directory \'{0}\' was not created'.format(
dirpath
)
)
@with_tempdir()
def test_pipe_through_stateconf(self, dirpath):
output = os.path.join(dirpath, 'output')
try:
write_to(os.path.join(dirpath, 'xxx.sls'), textwrap.dedent(
'''#!stateconf -os yaml . jinja
.X:
cmd.run:
- name: echo X >> {0}
- cwd: /
.Y:
cmd.run:
- name: echo Y >> {0}
- cwd: /
.Z:
cmd.run:
- name: echo Z >> {0}
- cwd: /
'''.format(output.replace('\\', '/'))))
write_to(os.path.join(dirpath, 'yyy.sls'), textwrap.dedent('''\
#!pydsl|stateconf -ps
write_to(os.path.join(dirpath, 'xxx.sls'), textwrap.dedent(
'''#!stateconf -os yaml . jinja
.X:
cmd.run:
- name: echo X >> {0}
- cwd: /
.Y:
cmd.run:
- name: echo Y >> {0}
- cwd: /
.Z:
cmd.run:
- name: echo Z >> {0}
- cwd: /
'''.format(output.replace('\\', '/'))))
write_to(os.path.join(dirpath, 'yyy.sls'), textwrap.dedent('''\
#!pydsl|stateconf -ps
__pydsl__.set(ordered=True)
state('.D').cmd.run('echo D >> {0}', cwd='/')
state('.E').cmd.run('echo E >> {0}', cwd='/')
state('.F').cmd.run('echo F >> {0}', cwd='/')
'''.format(output.replace('\\', '/'))))
__pydsl__.set(ordered=True)
state('.D').cmd.run('echo D >> {0}', cwd='/')
state('.E').cmd.run('echo E >> {0}', cwd='/')
state('.F').cmd.run('echo F >> {0}', cwd='/')
'''.format(output.replace('\\', '/'))))
write_to(os.path.join(dirpath, 'aaa.sls'), textwrap.dedent('''\
#!pydsl|stateconf -ps
write_to(os.path.join(dirpath, 'aaa.sls'), textwrap.dedent('''\
#!pydsl|stateconf -ps
include('xxx', 'yyy')
include('xxx', 'yyy')
# make all states in xxx run BEFORE states in this sls.
extend(state('.start').stateconf.require(stateconf='xxx::goal'))
# make all states in xxx run BEFORE states in this sls.
extend(state('.start').stateconf.require(stateconf='xxx::goal'))
# make all states in yyy run AFTER this sls.
extend(state('.goal').stateconf.require_in(stateconf='yyy::start'))
# make all states in yyy run AFTER this sls.
extend(state('.goal').stateconf.require_in(stateconf='yyy::start'))
__pydsl__.set(ordered=True)
__pydsl__.set(ordered=True)
state('.A').cmd.run('echo A >> {0}', cwd='/')
state('.B').cmd.run('echo B >> {0}', cwd='/')
state('.C').cmd.run('echo C >> {0}', cwd='/')
'''.format(output.replace('\\', '/'))))
state('.A').cmd.run('echo A >> {0}', cwd='/')
state('.B').cmd.run('echo B >> {0}', cwd='/')
state('.C').cmd.run('echo C >> {0}', cwd='/')
'''.format(output.replace('\\', '/'))))
self.state_highstate({'base': ['aaa']}, dirpath)
with salt.utils.files.fopen(output, 'r') as f:
self.assertEqual(''.join(f.read().split()), "XYZABCDEF")
self.state_highstate({'base': ['aaa']}, dirpath)
with salt.utils.files.fopen(output, 'r') as f:
self.assertEqual(''.join(f.read().split()), "XYZABCDEF")
finally:
shutil.rmtree(dirpath, ignore_errors=True)
def test_compile_time_state_execution(self):
@with_tempdir()
def test_compile_time_state_execution(self, dirpath):
if not sys.stdin.isatty():
self.skipTest('Not attached to a TTY')
dirpath = tempfile.mkdtemp(dir=TMP)
if not os.path.isdir(dirpath):
self.skipTest(
'The temporary directory \'{0}\' was not created'.format(
dirpath
)
)
try:
# The Windows shell will include any spaces before the redirect
# in the text that is redirected.
# For example: echo hello > test.txt will contain "hello "
write_to(os.path.join(dirpath, 'aaa.sls'), textwrap.dedent('''\
#!pydsl
# The Windows shell will include any spaces before the redirect
# in the text that is redirected.
# For example: echo hello > test.txt will contain "hello "
write_to(os.path.join(dirpath, 'aaa.sls'), textwrap.dedent('''\
#!pydsl
__pydsl__.set(ordered=True)
A = state('A')
A.cmd.run('echo hehe>{0}/zzz.txt', cwd='/')
A.file.managed('{0}/yyy.txt', source='salt://zzz.txt')
A()
A()
__pydsl__.set(ordered=True)
A = state('A')
A.cmd.run('echo hehe>{0}/zzz.txt', cwd='/')
A.file.managed('{0}/yyy.txt', source='salt://zzz.txt')
A()
A()
state().cmd.run('echo hoho>>{0}/yyy.txt', cwd='/')
state().cmd.run('echo hoho>>{0}/yyy.txt', cwd='/')
A.file.managed('{0}/xxx.txt', source='salt://zzz.txt')
A()
'''.format(dirpath.replace('\\', '/'))))
self.state_highstate({'base': ['aaa']}, dirpath)
with salt.utils.files.fopen(os.path.join(dirpath, 'yyy.txt'), 'rt') as f:
self.assertEqual(f.read(), 'hehe' + os.linesep + 'hoho' + os.linesep)
with salt.utils.files.fopen(os.path.join(dirpath, 'xxx.txt'), 'rt') as f:
self.assertEqual(f.read(), 'hehe' + os.linesep)
finally:
shutil.rmtree(dirpath, ignore_errors=True)
A.file.managed('{0}/xxx.txt', source='salt://zzz.txt')
A()
'''.format(dirpath.replace('\\', '/'))))
self.state_highstate({'base': ['aaa']}, dirpath)
with salt.utils.files.fopen(os.path.join(dirpath, 'yyy.txt'), 'rt') as f:
self.assertEqual(f.read(), 'hehe' + os.linesep + 'hoho' + os.linesep)
with salt.utils.files.fopen(os.path.join(dirpath, 'xxx.txt'), 'rt') as f:
self.assertEqual(f.read(), 'hehe' + os.linesep)
def test_nested_high_state_execution(self):
dirpath = tempfile.mkdtemp(dir=TMP)
if not os.path.isdir(dirpath):
self.skipTest(
'The temporary directory \'{0}\' was not created'.format(
dirpath
)
)
@with_tempdir()
def test_nested_high_state_execution(self, dirpath):
output = os.path.join(dirpath, 'output')
try:
write_to(os.path.join(dirpath, 'aaa.sls'), textwrap.dedent('''\
#!pydsl
__salt__['state.sls']('bbb')
state().cmd.run('echo bbbbbb', cwd='/')
'''))
write_to(os.path.join(dirpath, 'bbb.sls'), textwrap.dedent(
'''
# {{ salt['state.sls']('ccc') }}
test:
cmd.run:
- name: echo bbbbbbb
- cwd: /
'''))
write_to(os.path.join(dirpath, 'ccc.sls'), textwrap.dedent(
'''
#!pydsl
state().cmd.run('echo ccccc', cwd='/')
'''))
self.state_highstate({'base': ['aaa']}, dirpath)
finally:
shutil.rmtree(dirpath, ignore_errors=True)
write_to(os.path.join(dirpath, 'aaa.sls'), textwrap.dedent('''\
#!pydsl
__salt__['state.sls']('bbb')
state().cmd.run('echo bbbbbb', cwd='/')
'''))
write_to(os.path.join(dirpath, 'bbb.sls'), textwrap.dedent(
'''
# {{ salt['state.sls']('ccc') }}
test:
cmd.run:
- name: echo bbbbbbb
- cwd: /
'''))
write_to(os.path.join(dirpath, 'ccc.sls'), textwrap.dedent(
'''
#!pydsl
state().cmd.run('echo ccccc', cwd='/')
'''))
self.state_highstate({'base': ['aaa']}, dirpath)
def test_repeat_includes(self):
dirpath = tempfile.mkdtemp(dir=TMP)
if not os.path.isdir(dirpath):
self.skipTest(
'The temporary directory \'{0}\' was not created'.format(
dirpath
)
)
@with_tempdir()
def test_repeat_includes(self, dirpath):
output = os.path.join(dirpath, 'output')
try:
write_to(os.path.join(dirpath, 'b.sls'), textwrap.dedent('''\
#!pydsl
include('c')
include('d')
'''))
write_to(os.path.join(dirpath, 'c.sls'), textwrap.dedent('''\
#!pydsl
modtest = include('e')
modtest.success
'''))
write_to(os.path.join(dirpath, 'd.sls'), textwrap.dedent('''\
#!pydsl
modtest = include('e')
modtest.success
'''))
write_to(os.path.join(dirpath, 'e.sls'), textwrap.dedent('''\
#!pydsl
success = True
'''))
self.state_highstate({'base': ['b']}, dirpath)
self.state_highstate({'base': ['c', 'd']}, dirpath)
finally:
shutil.rmtree(dirpath, ignore_errors=True)
write_to(os.path.join(dirpath, 'b.sls'), textwrap.dedent('''\
#!pydsl
include('c')
include('d')
'''))
write_to(os.path.join(dirpath, 'c.sls'), textwrap.dedent('''\
#!pydsl
modtest = include('e')
modtest.success
'''))
write_to(os.path.join(dirpath, 'd.sls'), textwrap.dedent('''\
#!pydsl
modtest = include('e')
modtest.success
'''))
write_to(os.path.join(dirpath, 'e.sls'), textwrap.dedent('''\
#!pydsl
success = True
'''))
self.state_highstate({'base': ['b']}, dirpath)
self.state_highstate({'base': ['c', 'd']}, dirpath)
def write_to(fpath, content):

View file

@ -6,15 +6,13 @@ Unit Tests for functions located in salt.utils.files.py.
# Import python libs
from __future__ import absolute_import, unicode_literals, print_function
import os
import shutil
import tempfile
# Import Salt libs
import salt.utils.files
from salt.ext import six
# Import Salt Testing libs
from tests.support.paths import TMP
from tests.support.helpers import with_tempdir
from tests.support.unit import TestCase, skipIf
from tests.support.mock import (
patch,
@ -43,33 +41,30 @@ class FilesUtilTestCase(TestCase):
error = True
self.assertFalse(error, 'salt.utils.files.safe_rm raised exception when it should not have')
def test_safe_walk_symlink_recursion(self):
tmp = tempfile.mkdtemp(dir=TMP)
try:
if os.stat(tmp).st_ino == 0:
self.skipTest('inodes not supported in {0}'.format(tmp))
os.mkdir(os.path.join(tmp, 'fax'))
os.makedirs(os.path.join(tmp, 'foo/bar'))
os.symlink('../..', os.path.join(tmp, 'foo/bar/baz'))
os.symlink('foo', os.path.join(tmp, 'root'))
expected = [
(os.path.join(tmp, 'root'), ['bar'], []),
(os.path.join(tmp, 'root/bar'), ['baz'], []),
(os.path.join(tmp, 'root/bar/baz'), ['fax', 'foo', 'root'], []),
(os.path.join(tmp, 'root/bar/baz/fax'), [], []),
]
paths = []
for root, dirs, names in salt.utils.files.safe_walk(os.path.join(tmp, 'root')):
paths.append((root, sorted(dirs), names))
if paths != expected:
raise AssertionError(
'\n'.join(
['got:'] + [repr(p) for p in paths] +
['', 'expected:'] + [repr(p) for p in expected]
)
@with_tempdir()
def test_safe_walk_symlink_recursion(self, tmp):
if os.stat(tmp).st_ino == 0:
self.skipTest('inodes not supported in {0}'.format(tmp))
os.mkdir(os.path.join(tmp, 'fax'))
os.makedirs(os.path.join(tmp, 'foo/bar'))
os.symlink('../..', os.path.join(tmp, 'foo/bar/baz'))
os.symlink('foo', os.path.join(tmp, 'root'))
expected = [
(os.path.join(tmp, 'root'), ['bar'], []),
(os.path.join(tmp, 'root/bar'), ['baz'], []),
(os.path.join(tmp, 'root/bar/baz'), ['fax', 'foo', 'root'], []),
(os.path.join(tmp, 'root/bar/baz/fax'), [], []),
]
paths = []
for root, dirs, names in salt.utils.files.safe_walk(os.path.join(tmp, 'root')):
paths.append((root, sorted(dirs), names))
if paths != expected:
raise AssertionError(
'\n'.join(
['got:'] + [repr(p) for p in paths] +
['', 'expected:'] + [repr(p) for p in expected]
)
finally:
shutil.rmtree(tmp)
)
@skipIf(not six.PY3, 'This test only applies to Python 3')
def test_fopen_with_disallowed_fds(self):

View file

@ -132,7 +132,7 @@ class JSONTestCase(TestCase):
# Loading it should be equal to the original data
self.assertEqual(salt.utils.json.loads(ret), self.data)
@with_tempfile
@with_tempfile()
def test_dump_load(self, json_out):
'''
Test dumping to and loading from a file handle