Don't hard-code temporary paths on tests. Refs #2462.

This commit is contained in:
Pedro Algarvio 2012-11-06 11:20:06 +00:00
parent 667f3f0fcb
commit 75644992e2
11 changed files with 254 additions and 62 deletions

View file

@ -10,6 +10,7 @@ import sys
import shutil
import subprocess
import tempfile
import time
try:
import pwd
except ImportError:
@ -24,6 +25,14 @@ import salt.runner
from salt.utils.verify import verify_env
from saltunittest import TestCase
try:
import console
width, height = console.getTerminalSize()
PNUM = width
except:
PNUM = 70
INTEGRATION_TEST_DIR = os.path.dirname(
os.path.normpath(os.path.abspath(__file__))
)
@ -35,6 +44,30 @@ PYEXEC = 'python{0}.{1}'.format(sys.version_info[0], sys.version_info[1])
TMP = os.path.join(tempfile.gettempdir(), 'salt-tests-tmpdir')
FILES = os.path.join(INTEGRATION_TEST_DIR, 'files')
MOCKBIN = os.path.join(INTEGRATION_TEST_DIR, 'mockbin')
MINIONS_CONNECT_TIMEOUT = 60
def print_header(header, sep='~', top=True, bottom=True, inline=False,
centered=False):
'''
Allows some pretty printing of headers on the console, either with a
"ruler" on bottom and/or top, inline, centered, etc.
'''
if top and not inline:
print(sep * PNUM)
if centered and not inline:
fmt = u'{0:^{width}}'
elif inline and not centered:
fmt = u'{0:{sep}<{width}}'
elif inline and centered:
fmt = u'{0:{sep}^{width}}'
else:
fmt = u'{0}'
print(fmt.format(header, sep=sep, width=PNUM))
if bottom and not inline:
print(sep * PNUM)
def run_tests(TestCase):
@ -86,6 +119,7 @@ class TestDaemon(object):
def __init__(self, clean):
self.clean = clean
self.__evt_miconn = multiprocessing.Event()
def __enter__(self):
'''
@ -131,9 +165,11 @@ class TestDaemon(object):
# Point the config values to the correct temporary paths
for name in ('hosts', 'aliases'):
self.master_opts['{0}.file'.format(name)] = os.path.join(TMP, name)
self.minion_opts['{0}.file'.format(name)] = os.path.join(TMP, name)
self.sub_minion_opts['{0}.file'.format(name)] = os.path.join(TMP, name)
optname = '{0}.file'.format(name)
optname_path = os.path.join(TMP, name)
self.master_opts[optname] = optname_path
self.minion_opts[optname] = optname_path
self.sub_minion_opts[optname] = optname_path
verify_env([os.path.join(self.master_opts['pki_dir'], 'minions'),
os.path.join(self.master_opts['pki_dir'], 'minions_pre'),
@ -182,6 +218,22 @@ class TestDaemon(object):
self.syndic_process = multiprocessing.Process(target=syndic.tune_in)
self.syndic_process.start()
# Let's create a local client to ping and sync minions
self.client = salt.client.LocalClient(
os.path.join(INTEGRATION_TEST_DIR, 'files', 'conf', 'master')
)
# Wait for minions to connect back and sync_all
sync_minions = multiprocessing.Process(
target=self.__wait_for_minions_connections,
args=(self.__evt_miconn,)
)
sync_minions.start()
if self.__evt_miconn.wait(MINIONS_CONNECT_TIMEOUT) is not True:
print('WARNING: Minions failed to connect back. Tests requiring '
'them WILL fail')
return self
def __exit__(self, type, value, traceback):
@ -227,6 +279,52 @@ class TestDaemon(object):
if os.path.isdir(TMP):
shutil.rmtree(TMP)
def __wait_for_minions_connections(self, evt):
print_header(
'Waiting at most {0} secs for local minions to connect '
'back'.format(MINIONS_CONNECT_TIMEOUT), sep='=', centered=True
)
targets = set(['minion', 'sub_minion'])
expected_connections = set(targets)
while True:
# If enough time passes, a timeout will be triggered by
# multiprocessing.Event, so, we can have this while True here
targets = self.client.cmd('*', 'test.ping')
for target in targets:
if target not in expected_connections:
# Someone(minion) else "listening"?
continue
expected_connections.remove(target)
print(' * {0} minion connected'.format(target))
if not expected_connections:
# All expected connections have connected
break
time.sleep(1)
# Let's sync all connected minions
print(' * Syncing local minion\'s dynamic data(saltutil.sync_all)')
syncing = set(targets)
jid_info = self.client.run_job(
','.join(targets), 'saltutil.sync_all',
expr_form='list',
timeout=9999999999999999,
)
while syncing:
rdata = self.client.get_returns(jid_info['jid'], syncing, 1)
if rdata:
for idx, (name, output) in enumerate(rdata.iteritems()):
print(' * Synced {0}: {1}'.format(name, output))
# Synced!
try:
syncing.remove(name)
except KeyError:
print(' * {0} already synced??? {1}'.format(
name, output
))
print_header('', sep='=', inline=True)
evt.set()
class ModuleCase(TestCase):
'''
@ -322,7 +420,7 @@ class ModuleCase(TestCase):
pass
else:
if isinstance(res, dict):
if res['result'] == True:
if res['result'] is True:
return
if 'comment' in res:
raise AssertionError(res['comment'])

View file

@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
# vim: sw=4 ts=4 fenc=utf-8
"""
:copyright: © 2012 UfSoft.org - :email:`Pedro Algarvio (pedro@algarvio.me)`
:license: Apache 2.0, see LICENSE for more details
"""
import os
import tempfile
# This tempdir path is defined on tests.integration.__init__
TMP = os.path.join(tempfile.gettempdir(), 'salt-tests-tmpdir')
def get_temp_dir_for_path(path):
return os.path.join(TMP, path)

View file

@ -1,5 +1,4 @@
/tmp/salttest/issue-1876:
{{ salt['runtests_helpers.get_temp_dir_for_path']('issue-1876') }}:
file:
- managed
- source: salt://testfile

View file

@ -1,3 +1,3 @@
/tmp/salttest/issue-1879:
{{ salt['runtests_helpers.get_temp_dir_for_path']('issue-1879') }}:
file:
- touch

View file

@ -1,4 +1,4 @@
/tmp/salttest/issue-1879:
{{ salt['runtests_helpers.get_temp_dir_for_path']('issue-1879') }}:
file.append:
- text: |
# set variable identifying the chroot you work in (used in the prompt below)

View file

@ -1,5 +1,4 @@
/tmp/salttest/issue-1879:
{{ salt['runtests_helpers.get_temp_dir_for_path']('issue-1879') }}:
file.append:
- text: |
# enable bash completion in interactive shells

View file

@ -1,3 +1,3 @@
/tmp/salttest/test.append:
{{ salt['runtests_helpers.get_temp_dir_for_path']('test.append') }}:
file:
- touch

View file

@ -1,4 +1,4 @@
/tmp/salttest/test.append:
{{ salt['runtests_helpers.get_temp_dir_for_path']('test.append') }}:
file.append:
- source: salt://testappend/firstif

View file

@ -1,3 +1,3 @@
/tmp/salttest/test.append:
{{ salt['runtests_helpers.get_temp_dir_for_path']('test.append') }}:
file.append:
- source: salt://testappend/secondif
- source: salt://testappend/secondif

View file

@ -53,12 +53,43 @@ class StateModuleTest(integration.ModuleCase):
'''
Verify that we can append a file's contents
'''
if os.path.isfile('/tmp/salttest/test.append'):
os.unlink('/tmp/salttest/test.append')
testfile = os.path.join(integration.TMP, 'test.append')
if os.path.isfile(testfile):
os.unlink(testfile)
ret = self.run_function('state.sls', mods='testappend')
try:
self.assertTrue(isinstance(ret, dict)), ret
self.assertNotEqual(ret, {})
for key in ret.iterkeys():
self.assertTrue(ret[key]['result'])
except AssertionError:
print ret
raise
ret = self.run_function('state.sls', mods='testappend.step-1')
try:
self.assertTrue(isinstance(ret, dict)), ret
self.assertNotEqual(ret, {})
for key in ret.iterkeys():
self.assertTrue(ret[key]['result'])
except AssertionError:
print ret
raise
ret = self.run_function('state.sls', mods='testappend.step-2')
try:
self.assertTrue(isinstance(ret, dict)), ret
self.assertNotEqual(ret, {})
for key in ret.iterkeys():
self.assertTrue(ret[key]['result'])
except AssertionError:
print ret
raise
self.run_function('state.sls', mods='testappend')
self.run_function('state.sls', mods='testappend.step-1')
self.run_function('state.sls', mods='testappend.step-2')
self.assertMultiLineEqual('''\
# set variable identifying the chroot you work in (used in the prompt below)
if [ -z "$debian_chroot" ] && [ -r /etc/debian_chroot ]; then
@ -69,11 +100,31 @@ fi
if [ -f /etc/bash_completion ] && ! shopt -oq posix; then
. /etc/bash_completion
fi
''', open('/tmp/salttest/test.append', 'r').read())
''', open(testfile, 'r').read())
# Re-append switching order
self.run_function('state.sls', mods='testappend.step-2')
self.run_function('state.sls', mods='testappend.step-1')
ret = self.run_function('state.sls', mods='testappend.step-2')
try:
self.assertTrue(isinstance(ret, dict)), ret
self.assertNotEqual(ret, {})
for key in ret.iterkeys():
self.assertTrue(ret[key]['result'])
except AssertionError:
print ret
raise
ret = self.run_function('state.sls', mods='testappend.step-1')
try:
self.assertTrue(isinstance(ret, dict)), ret
self.assertNotEqual(ret, {})
for key in ret.iterkeys():
self.assertTrue(ret[key]['result'])
except AssertionError:
print ret
raise
self.assertMultiLineEqual('''\
# set variable identifying the chroot you work in (used in the prompt below)
if [ -z "$debian_chroot" ] && [ -r /etc/debian_chroot ]; then
@ -84,7 +135,7 @@ fi
if [ -f /etc/bash_completion ] && ! shopt -oq posix; then
. /etc/bash_completion
fi
''', open('/tmp/salttest/test.append', 'r').read())
''', open(testfile, 'r').read())
def test_issue_1876_syntax_error(self):
'''
@ -100,10 +151,12 @@ fi
- text: foo
'''
testfile = os.path.join(integration.TMP, 'issue-1876')
sls = self.run_function('state.sls', mods='issue-1876')
self.assertIn(
'Name "/tmp/salttest/issue-1876" in sls "issue-1876" contains '
'multiple state decs of the same type', sls
'Name "{0}" in sls "issue-1876" contains multiple state decs of '
'the same type'.format(testfile),
sls
)
def test_issue_1879_too_simple_contains_check(self):
@ -117,32 +170,83 @@ if [ -f /etc/bash_completion ] && ! shopt -oq posix; then
. /etc/bash_completion
fi
'''
testfile = os.path.join(integration.TMP, 'issue-1879')
# Delete if exiting
if os.path.isfile('/tmp/salttest/issue-1879'):
os.unlink('/tmp/salttest/issue-1879')
if os.path.isfile(testfile):
os.unlink(testfile)
# Create the file
self.run_function('state.sls', mods='issue-1879')
ret = self.run_function('state.sls', mods='issue-1879')
try:
self.assertTrue(isinstance(ret, dict)), ret
self.assertNotEqual(ret, {})
for key in ret.iterkeys():
self.assertTrue(ret[key]['result'])
except AssertionError:
print ret
raise
# The first append
self.run_function('state.sls', mods='issue-1879.step-1')
# The seccond append
self.run_function('state.sls', mods='issue-1879.step-2')
ret = self.run_function('state.sls', mods='issue-1879.step-1')
try:
self.assertTrue(isinstance(ret, dict)), ret
self.assertNotEqual(ret, {})
for key in ret.iterkeys():
self.assertTrue(ret[key]['result'])
except AssertionError:
print ret
raise
# The second append
ret = self.run_function('state.sls', mods='issue-1879.step-2')
try:
self.assertTrue(isinstance(ret, dict)), ret
self.assertNotEqual(ret, {})
for key in ret.iterkeys():
self.assertTrue(ret[key]['result'])
except AssertionError:
print ret
raise
# Does it match?
try:
self.assertMultiLineEqual(
contents, open('/tmp/salttest/issue-1879', 'r').read()
contents,
open(testfile, 'r').read()
)
# Make sure we don't re-append existing text
self.run_function('state.sls', mods='issue-1879.step-1')
self.run_function('state.sls', mods='issue-1879.step-2')
ret = self.run_function('state.sls', mods='issue-1879.step-1')
try:
self.assertTrue(isinstance(ret, dict)), ret
self.assertNotEqual(ret, {})
for key in ret.iterkeys():
self.assertTrue(ret[key]['result'])
except AssertionError:
print ret
raise
ret = self.run_function('state.sls', mods='issue-1879.step-2')
try:
self.assertTrue(isinstance(ret, dict)), ret
self.assertNotEqual(ret, {})
for key in ret.iterkeys():
self.assertTrue(ret[key]['result'])
except AssertionError:
print ret
raise
self.assertMultiLineEqual(
contents, open('/tmp/salttest/issue-1879', 'r').read()
contents,
open(testfile, 'r').read()
)
except Exception:
shutil.copy('/tmp/salttest/issue-1879', '/tmp/salttest/issue-1879.bak')
if os.path.exists(testfile):
shutil.copy(testfile, testfile + '.bak')
raise
finally:
os.unlink('/tmp/salttest/issue-1879')
if os.path.exists(testfile):
os.unlink(testfile)
def test_include(self):
fnames = ('/tmp/include-test', '/tmp/to-include-test')

View file

@ -12,14 +12,8 @@ import resource
import tempfile
# Import salt libs
try:
import console
width, height = console.getTerminalSize()
PNUM = width
except:
PNUM = 70
import saltunittest
from integration import TestDaemon
from integration import print_header, PNUM, TestDaemon
try:
import xmlrunner
@ -46,24 +40,6 @@ REQUIRED_OPEN_FILES = 2048
TEST_RESULTS = []
def print_header(header, sep='~', top=True, bottom=True, inline=False,
centered=False):
if top and not inline:
print(sep * PNUM)
if centered and not inline:
fmt = u'{0:^{width}}'
elif inline and not centered:
fmt = u'{0:{sep}<{width}}'
elif inline and centered:
fmt = u'{0:{sep}^{width}}'
else:
fmt = u'{0}'
print(fmt.format(header, sep=sep, width=PNUM))
if bottom and not inline:
print(sep * PNUM)
def run_suite(opts, path, display_name, suffix='[!_]*.py'):
'''