mirror of
https://github.com/saltstack/salt-bootstrap.git
synced 2025-04-16 09:40:21 +00:00
220 lines
6.9 KiB
Python
220 lines
6.9 KiB
Python
# -*- coding: utf-8 -*-
|
|
'''
|
|
bootstrap.unittesting
|
|
~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
Unit testing related classes, helpers.
|
|
|
|
:codeauthor: :email:`Pedro Algarvio (pedro@algarvio.me)`
|
|
:copyright: © 2013 by the SaltStack Team, see AUTHORS for more details.
|
|
:license: Apache 2.0, see LICENSE for more details.
|
|
'''
|
|
|
|
# Import python libs
|
|
import os
|
|
import sys
|
|
import fcntl
|
|
import signal
|
|
import tempfile
|
|
import subprocess
|
|
from datetime import datetime, timedelta
|
|
|
|
# Import salt bootstrap libs
|
|
from bootstrap.ext.os_data import GRAINS
|
|
|
|
|
|
# support python < 2.7 via unittest2
|
|
if sys.version_info < (2, 7):
|
|
try:
|
|
from unittest2 import (
|
|
TestLoader,
|
|
TextTestRunner,
|
|
TestCase,
|
|
expectedFailure,
|
|
TestSuite,
|
|
skipIf,
|
|
TestResult,
|
|
Testrogram
|
|
)
|
|
except ImportError:
|
|
raise SystemExit('You need to install unittest2 to run the salt tests')
|
|
else:
|
|
from unittest import (
|
|
TestLoader,
|
|
TextTestRunner,
|
|
TestCase,
|
|
expectedFailure,
|
|
TestSuite,
|
|
skipIf,
|
|
TestResult,
|
|
TestProgram
|
|
)
|
|
|
|
|
|
TEST_DIR = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
|
|
EXT_DIR = os.path.join(TEST_DIR, 'ext')
|
|
PARENT_DIR = os.path.dirname(TEST_DIR)
|
|
BOOTSTRAP_SCRIPT_PATH = os.path.join(PARENT_DIR, 'bootstrap-salt.sh')
|
|
|
|
|
|
class NonBlockingPopen(subprocess.Popen):
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self.stream_stds = kwargs.pop('stream_stds', False)
|
|
super(NonBlockingPopen, self).__init__(*args, **kwargs)
|
|
if self.stdout is not None and self.stream_stds:
|
|
fod = self.stdout.fileno()
|
|
fol = fcntl.fcntl(fod, fcntl.F_GETFL)
|
|
fcntl.fcntl(fod, fcntl.F_SETFL, fol | os.O_NONBLOCK)
|
|
self.obuff = ''
|
|
|
|
if self.stderr is not None and self.stream_stds:
|
|
fed = self.stderr.fileno()
|
|
fel = fcntl.fcntl(fed, fcntl.F_GETFL)
|
|
fcntl.fcntl(fed, fcntl.F_SETFL, fel | os.O_NONBLOCK)
|
|
self.ebuff = ''
|
|
|
|
def poll(self):
|
|
poll = super(NonBlockingPopen, self).poll()
|
|
|
|
if self.stdout is not None and self.stream_stds:
|
|
try:
|
|
obuff = self.stdout.read()
|
|
self.obuff += obuff
|
|
sys.stdout.write(obuff)
|
|
except IOError, err:
|
|
if err.errno not in (11, 35):
|
|
# We only handle Resource not ready properly, any other
|
|
# raise the exception
|
|
raise
|
|
if self.stderr is not None and self.stream_stds:
|
|
try:
|
|
ebuff = self.stderr.read()
|
|
self.ebuff += ebuff
|
|
sys.stderr.write(ebuff)
|
|
except IOError, err:
|
|
if err.errno not in (11, 35):
|
|
# We only handle Resource not ready properly, any other
|
|
# raise the exception
|
|
raise
|
|
|
|
if poll is None:
|
|
# Not done yet
|
|
return poll
|
|
|
|
if not self.stream_stds:
|
|
# Allow the same attribute access even though not streaming to stds
|
|
try:
|
|
self.obuff = self.stdout.read()
|
|
except IOError, err:
|
|
if err.errno not in (11, 35):
|
|
# We only handle Resource not ready properly, any other
|
|
# raise the exception
|
|
raise
|
|
try:
|
|
self.ebuff = self.stderr.read()
|
|
except IOError, err:
|
|
if err.errno not in (11, 35):
|
|
# We only handle Resource not ready properly, any other
|
|
# raise the exception
|
|
raise
|
|
return poll
|
|
|
|
|
|
class BootstrapTestCase(TestCase):
|
|
def run_script(self,
|
|
script=BOOTSTRAP_SCRIPT_PATH,
|
|
args=(),
|
|
cwd=PARENT_DIR,
|
|
timeout=None,
|
|
executable='/bin/sh',
|
|
stream_stds=False):
|
|
|
|
cmd = [script] + list(args)
|
|
|
|
outbuff = errbuff = ''
|
|
|
|
popen_kwargs = {
|
|
'cwd': cwd,
|
|
'shell': True,
|
|
'stderr': subprocess.PIPE,
|
|
'stdout': subprocess.PIPE,
|
|
'close_fds': True,
|
|
'executable': executable,
|
|
|
|
'stream_stds': stream_stds,
|
|
|
|
# detach from parent group (no more inherited signals!)
|
|
#'preexec_fn': os.setpgrp
|
|
}
|
|
|
|
cmd = ' '.join(filter(None, [script] + list(args)))
|
|
|
|
process = NonBlockingPopen(cmd, **popen_kwargs)
|
|
|
|
if timeout is not None:
|
|
stop_at = datetime.now() + timedelta(seconds=timeout)
|
|
term_sent = False
|
|
|
|
while process.poll() is None:
|
|
|
|
if timeout is not None:
|
|
now = datetime.now()
|
|
|
|
if now > stop_at:
|
|
if term_sent is False:
|
|
# Kill the process group since sending the term signal
|
|
# would only terminate the shell, not the command
|
|
# executed in the shell
|
|
os.killpg(os.getpgid(process.pid), signal.SIGINT)
|
|
term_sent = True
|
|
continue
|
|
|
|
# As a last resort, kill the process group
|
|
os.killpg(os.getpgid(process.pid), signal.SIGKILL)
|
|
|
|
return 1, [
|
|
'Process took more than {0} seconds to complete. '
|
|
'Process Killed! Current STDOUT: \n{1}'.format(
|
|
timeout, process.obuff
|
|
)
|
|
], [
|
|
'Process took more than {0} seconds to complete. '
|
|
'Process Killed! Current STDERR: \n{1}'.format(
|
|
timeout, process.ebuff
|
|
)
|
|
]
|
|
|
|
process.communicate()
|
|
|
|
try:
|
|
return (
|
|
process.returncode,
|
|
process.obuff.splitlines(),
|
|
process.ebuff.splitlines()
|
|
)
|
|
finally:
|
|
try:
|
|
process.terminate()
|
|
except OSError:
|
|
# process already terminated
|
|
pass
|
|
|
|
def assert_script_result(self, fail_msg, expected_rcs, process_details):
|
|
if not isinstance(expected_rcs, (tuple, list)):
|
|
expected_rcs = (expected_rcs,)
|
|
|
|
rc, out, err = process_details
|
|
if rc not in expected_rcs:
|
|
err_msg = '{0}:\n'.format(fail_msg)
|
|
if out:
|
|
err_msg = '{0}STDOUT:\n{1}\n'.format(err_msg, '\n'.join(out))
|
|
if err:
|
|
err_msg = '{0}STDERR:\n{1}\n'.format(err_msg, '\n'.join(err))
|
|
if not err and not out:
|
|
err_msg = (
|
|
'{0} No stdout nor stderr captured. Exit code: {1}'.format(
|
|
err_msg, rc
|
|
)
|
|
)
|
|
raise AssertionError(err_msg.rstrip())
|