Run the test daemons from CLI

This should help with running the test suite on Windows machines
Pedro Algarvio 2016-05-06 19:49:08 +01:00
parent 336fbb03bc
commit 184c3c83ad
3 changed files with 570 additions and 47 deletions


@@ -17,8 +17,10 @@ import signal
import shutil
import pprint
import atexit
import socket
import logging
import tempfile
import threading
import subprocess
import multiprocessing
from hashlib import md5
@@ -27,6 +29,10 @@ try:
import pwd
except ImportError:
pass
try:
import SocketServer as socketserver
except ImportError:
import socketserver
STATE_FUNCTION_RUNNING_RE = re.compile(
r'''The function (?:"|')(?P<state_func>.*)(?:"|') is running as PID '''
@@ -56,12 +62,13 @@ import salt.runner
import salt.output
import salt.version
import salt.utils
import salt.utils.vt as vt
import salt.utils.process
import salt.log.setup as salt_log_setup
from salt.utils import fopen, get_colors
from salt.utils.verify import verify_env
from salt.utils.immutabletypes import freeze
from salt.utils.process import MultiprocessingProcess
from salt.utils.process import MultiprocessingProcess, SignalHandlingMultiprocessingProcess
from salt.exceptions import SaltClientError
try:
@@ -72,10 +79,15 @@ except ImportError:
# Import 3rd-party libs
import yaml
import msgpack
import salt.ext.six as six
if salt.utils.is_windows():
import win32api
from tornado import gen
from tornado import ioloop
from tornado import concurrent
SYS_TMP_DIR = os.path.realpath(
# Avoid ${TMPDIR} and gettempdir() on MacOS as they yield a base path too long
@@ -93,7 +105,30 @@ TMP_PRODENV_STATE_TREE = os.path.join(SYS_TMP_DIR, 'salt-temp-prodenv-state-tree
TMP_CONF_DIR = os.path.join(TMP, 'config')
CONF_DIR = os.path.join(INTEGRATION_TEST_DIR, 'files', 'conf')
PILLAR_DIR = os.path.join(FILES, 'pillar')
TMP_SCRIPT_DIR = os.path.join(SYS_TMP_DIR, 'scripts')
ENGINES_DIR = os.path.join(FILES, 'engines')
LOG_HANDLERS_DIR = os.path.join(FILES, 'log_handlers')
SCRIPT_TEMPLATES = {
'salt': [
'from salt.scripts import salt_main\n',
'if __name__ == \'__main__\':'
' salt_main()'
],
'salt-api': [
'import salt.cli\n',
'def main():',
' sapi = salt.cli.SaltAPI()',
' sapi.run()\n',
'if __name__ == \'__main__\':',
' main()'
],
'common': [
'from salt.scripts import salt_{0}\n',
'if __name__ == \'__main__\':',
' salt_{0}()'
]
}
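
For reference, rendering the 'common' template for a daemon such as salt-minion yields a thin wrapper script roughly like the following (illustrative only; get_script_path() below writes the real file into TMP_SCRIPT_DIR):

# Illustrative result of '\n'.join(SCRIPT_TEMPLATES['common']).format('minion')
from salt.scripts import salt_minion

if __name__ == '__main__':
    salt_minion()
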
RUNTIME_CONFIGS = {}
log = logging.getLogger(__name__)
@@ -109,6 +144,20 @@ def cleanup_runtime_config_instance(to_cleanup):
atexit.register(cleanup_runtime_config_instance, RUNTIME_CONFIGS)
def get_unused_localhost_port():
'''
Return a random unused port on localhost
'''
usock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
usock.bind(('127.0.0.1', 0))
port = usock.getsockname()[1]
usock.close()
return port
SALT_LOG_PORT = get_unused_localhost_port()
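
get_unused_localhost_port() relies on the OS handing out a free ephemeral port when a socket binds to port 0; a small hedged usage sketch (not part of this commit) for reserving throwaway per-daemon ports:

# Hedged usage sketch: grab two throwaway ports for test daemons. The socket is
# closed before the port is handed out, so a small reuse race exists, which is
# acceptable for test scaffolding.
master_check_port = get_unused_localhost_port()
minion_check_port = get_unused_localhost_port()
print('master={0} minion={1}'.format(master_check_port, minion_check_port))
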
def run_tests(*test_cases, **kwargs):
'''
Run integration tests for the chosen test cases.
@@ -170,6 +219,276 @@ def run_tests(*test_cases, **kwargs):
parser.finalize(0)
class ThreadingMixIn(socketserver.ThreadingMixIn):
daemon_threads = True
class ThreadedSocketServer(ThreadingMixIn, socketserver.TCPServer):
def server_activate(self):
self.shutting_down = threading.Event()
socketserver.TCPServer.server_activate(self)
#super(ThreadedSocketServer, self).server_activate()
def server_close(self):
self.shutting_down.set()
socketserver.TCPServer.server_close(self)
#super(ThreadedSocketServer, self).server_close()
class SocketServerRequestHandler(socketserver.StreamRequestHandler):
def handle(self):
unpacker = msgpack.Unpacker(encoding='utf-8')
while not self.server.shutting_down.is_set():
try:
wire_bytes = self.request.recv(1024)
if not wire_bytes:
break
unpacker.feed(wire_bytes)
for record_dict in unpacker:
record = logging.makeLogRecord(record_dict)
logger = logging.getLogger(record.name)
logger.handle(record)
except (EOFError, KeyboardInterrupt, SystemExit):
break
except Exception as exc:
log.exception(exc)
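
The wire format the handler consumes is simply msgpack-encoded LogRecord dictionaries; a minimal hedged client sketch follows (not part of this commit; the pytest_log_handler file added further down does the same job inside the daemons, so this only illustrates the format):

# Hedged client sketch: emit one record to the log server started by TestDaemon
import socket
import logging
import msgpack

record = logging.LogRecord('runtests', logging.INFO, __file__, 0,
                           'hello from a daemon', None, None)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('localhost', SALT_LOG_PORT))
client.sendall(msgpack.dumps(record.__dict__, encoding='utf-8'))
client.close()
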
class SaltScriptBase(object):
'''
Base class for Salt CLI scripts
'''
cli_script_name = None
def __init__(self,
config,
config_dir,
bin_dir_path,
io_loop=None):
self.config = config
self.config_dir = config_dir
self.bin_dir_path = bin_dir_path
self._io_loop = io_loop
@property
def io_loop(self):
'''
Return an IOLoop
'''
if self._io_loop is None:
self._io_loop = ioloop.IOLoop.current()
return self._io_loop
def get_script_path(self, script_name):
'''
Return the path to a testing runtime script
'''
if not os.path.isdir(TMP_SCRIPT_DIR):
os.makedirs(TMP_SCRIPT_DIR)
script_path = os.path.join(TMP_SCRIPT_DIR, script_name)
if not os.path.isfile(script_path):
log.debug('Generating {0}'.format(script_path))
# Late import
import salt.utils
with salt.utils.fopen(script_path, 'w') as sfh:
script_template = SCRIPT_TEMPLATES.get(script_name, None)
if script_template is None:
script_template = SCRIPT_TEMPLATES.get('common', None)
if script_template is None:
raise RuntimeError(
'{0} does not know how to handle the {1} script'.format(
self.__class__.__name__,
script_name
)
)
sfh.write(
#'#!/usr/bin/env python{0}.{1}\n'.format(*sys.version_info),
'\n'.join(script_template).format(script_name.replace('salt-', ''))
)
return script_path
def get_script_args(self): # pylint: disable=no-self-use
'''
Returns any additional arguments to pass to the CLI script
'''
return []
class SaltDaemonScriptBase(SaltScriptBase):
'''
Base class for Salt Daemon CLI scripts
'''
def __init__(self, *args, **kwargs):
super(SaltDaemonScriptBase, self).__init__(*args, **kwargs)
self._running = multiprocessing.Event()
self._connectable = multiprocessing.Event()
self._process = None
def is_alive(self):
'''
Returns true if the process is alive
'''
return self._running.is_set()
def get_check_ports(self): # pylint: disable=no-self-use
'''
Return a list of ports to check against to ensure the daemon is running
'''
return []
def start(self):
'''
Start the daemon subprocess
'''
self._process = SignalHandlingMultiprocessingProcess(
target=self._start, args=(self._running,))
self._process.start()
self._running.set()
return True
def _start(self, running_event):
'''
The actual start method, run in a separate process
'''
log.info('Starting pytest %s DAEMON', self.__class__.__name__)
proc_args = [
sys.executable,
self.get_script_path(self.cli_script_name),
'-c',
self.config_dir,
] + self.get_script_args()
log.info('Running \'%s\' from %s...', ' '.join(proc_args), self.__class__.__name__)
terminal = vt.Terminal(proc_args,
stream_stdout=False,
log_stdout=True,
#log_stdout_level='warning',
stream_stderr=False,
log_stderr=True,
#log_stderr_level='warning'
cwd=CODE_DIR,
env={'PYTHONPATH': CODE_DIR}
)
self.pid = terminal.pid
while running_event.is_set() and terminal.has_unread_data:
# We're not actually interested in processing the output, just consume it
terminal.recv()
time.sleep(0.125)
# Let's close the terminal now that we're done with it
terminal.close(kill=True)
self.exitcode = terminal.exitstatus
def terminate(self):
'''
Terminate the started daemon
'''
self._running.clear()
self._connectable.clear()
self._process.terminate()
time.sleep(0.0125)
self._process.join()
def wait_until_running(self, timeout=None):
'''
Blocking call to wait for the daemon to start listening
'''
if self._connectable.is_set():
return True
try:
return self.io_loop.run_sync(self._wait_until_running, timeout=timeout)
except ioloop.TimeoutError:
return False
@gen.coroutine
def _wait_until_running(self):
'''
The actual, coroutine aware, call to wait for the daemon to start listening
'''
check_ports = self.get_check_ports()
log.debug(
'%s is checking the following ports to assure running status: %s',
self.__class__.__name__,
check_ports
)
while self._running.is_set():
if not check_ports:
self._connectable.set()
break
for port in set(check_ports):
log.debug('Checking connectable status on port: %s', port)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn = sock.connect_ex(('localhost', port))
if conn == 0:
log.debug('Port %s is connectable!', port)
check_ports.remove(port)
sock.shutdown(socket.SHUT_RDWR)
sock.close()
del sock
yield gen.sleep(0.125)
# A final sleep to allow the ioloop to do other things
yield gen.sleep(0.125)
log.info('All ports checked. %s running!', self.__class__.__name__)
raise gen.Return(self._connectable.is_set())
class SaltMinion(SaltDaemonScriptBase):
'''
Class which runs the salt-minion daemon
'''
cli_script_name = 'salt-minion'
def get_script_args(self):
#return ['--disable-keepalive', '-l', 'debug']
return ['--disable-keepalive', '-l', 'quiet']
def get_check_ports(self):
return set([self.config['runtests_conn_check_port']])
class SaltMaster(SaltDaemonScriptBase):
'''
Class which runs the salt-master daemon
'''
cli_script_name = 'salt-master'
def get_check_ports(self):
return set([self.config['runtests_conn_check_port']])
#return set([self.config['ret_port'],
#            self.config['publish_port'],
#            self.config['gc']])
def get_script_args(self):
#return ['-l', 'debug']
return ['-l', 'quiet']
class SaltSyndic(SaltDaemonScriptBase):
'''
Class which runs the salt-syndic daemon
'''
cli_script_name = 'salt-syndic'
def get_script_args(self):
#return ['-l', 'debug']
return ['-l', 'quiet']
def get_check_ports(self):
return set([self.config['runtests_conn_check_port']])
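
Together these classes give every daemon the same start / wait / terminate lifecycle; a hedged sketch of driving one outside TestDaemon (minion_opts, TMP_CONF_DIR and SCRIPT_DIR are placeholders here, not part of this commit):

# Hedged lifecycle sketch (placeholder opts/paths)
minion = SaltMinion(minion_opts, TMP_CONF_DIR, SCRIPT_DIR)
minion.start()                                  # spawns a child process that runs the CLI script via vt.Terminal
if not minion.wait_until_running(timeout=15):   # blocks until the check port answers
    raise RuntimeError('salt-minion never became connectable')
try:
    pass  # exercise the live daemon here
finally:
    minion.terminate()
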
class TestDaemon(object):
'''
Set up the master and minion daemons, and run related cases
@@ -186,7 +505,7 @@ class TestDaemon(object):
'''
# Setup the multiprocessing logging queue listener
salt_log_setup.setup_multiprocessing_logging_listener(
self.parser.options
self.master_opts
)
# Set up PATH to mockbin
@@ -258,25 +577,20 @@ class TestDaemon(object):
'''
Fire up the daemons used for zeromq tests
'''
self.master_process = self.start_daemon(salt.master.Master,
self.master_opts,
'start')
self.log_server = ThreadedSocketServer(('localhost', SALT_LOG_PORT), SocketServerRequestHandler)
self.log_server_process = threading.Thread(target=self.log_server.serve_forever)
self.log_server_process.daemon = True
self.log_server_process.start()
self.minion_process = self.start_daemon(salt.minion.Minion,
self.minion_opts,
'tune_in')
self.sub_minion_process = self.start_daemon(salt.minion.Minion,
self.sub_minion_opts,
'tune_in')
self.smaster_process = self.start_daemon(salt.master.Master,
self.syndic_master_opts,
'start')
self.syndic_process = self.start_daemon(salt.minion.Syndic,
self.syndic_opts,
'tune_in')
self.master_process = SaltMaster(self.master_opts, TMP_CONF_DIR, SCRIPT_DIR)
self.minion_process = SaltMinion(self.minion_opts, TMP_CONF_DIR, SCRIPT_DIR)
self.sub_minion_process = SaltMinion(self.sub_minion_opts, os.path.join(TMP_CONF_DIR, 'sub-minion'), SCRIPT_DIR)
self.smaster_process = SaltMaster(self.syndic_master_opts, os.path.join(TMP_CONF_DIR, 'syndic-master'), SCRIPT_DIR)
self.syndic_process = SaltSyndic(self.syndic_opts, TMP_CONF_DIR, SCRIPT_DIR)
for process in (self.master_process, self.minion_process, self.sub_minion_process,
self.smaster_process, self.syndic_process):
process.start()
process.wait_until_running(timeout=15)
def start_raet_daemons(self):
'''
@@ -501,6 +815,8 @@ class TestDaemon(object):
if os.path.isdir(TMP_CONF_DIR):
shutil.rmtree(TMP_CONF_DIR)
os.makedirs(TMP_CONF_DIR)
os.makedirs(os.path.join(TMP_CONF_DIR, 'sub-minion'))
os.makedirs(os.path.join(TMP_CONF_DIR, 'syndic-master'))
print(' * Transplanting configuration files to \'{0}\''.format(TMP_CONF_DIR))
if salt.utils.is_windows():
running_tests_user = win32api.GetUserName()
@@ -524,10 +840,12 @@ class TestDaemon(object):
sub_minion_opts = salt.config._read_conf_file(os.path.join(CONF_DIR, 'sub_minion'))
sub_minion_opts['root_dir'] = os.path.join(TMP, 'sub-minion-root')
sub_minion_opts['user'] = running_tests_user
sub_minion_opts['id'] = 'sub_minion'
syndic_master_opts = salt.config._read_conf_file(os.path.join(CONF_DIR, 'syndic_master'))
syndic_master_opts['user'] = running_tests_user
syndic_master_opts['root_dir'] = os.path.join(TMP, 'syndic-master-root')
syndic_master_opts['id'] = 'syndic_master'
if transport == 'raet':
master_opts['transport'] = 'raet'
@@ -590,6 +908,32 @@ class TestDaemon(object):
minion_opts[optname] = optname_path
sub_minion_opts[optname] = optname_path
master_opts['runtests_log_port'] = \
minion_opts['runtests_log_port'] = \
sub_minion_opts['runtests_log_port'] = \
syndic_opts['runtests_log_port'] = \
syndic_master_opts['runtests_log_port'] = SALT_LOG_PORT
master_opts['runtests_conn_check_port'] = get_unused_localhost_port()
minion_opts['runtests_conn_check_port'] = get_unused_localhost_port()
sub_minion_opts['runtests_conn_check_port'] = get_unused_localhost_port()
syndic_opts['runtests_conn_check_port'] = get_unused_localhost_port()
syndic_master_opts['runtests_conn_check_port'] = get_unused_localhost_port()
for conf in (master_opts, minion_opts, sub_minion_opts, syndic_opts, syndic_master_opts):
if 'engines' not in conf:
conf['engines'] = []
conf['engines'].append({'salt_runtests': {}})
if 'engines_dirs' not in conf:
conf['engines_dirs'] = []
conf['engines_dirs'].insert(0, ENGINES_DIR)
if 'log_handlers_dirs' not in conf:
conf['log_handlers_dirs'] = []
conf['log_handlers_dirs'].insert(0, LOG_HANDLERS_DIR)
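
After this loop every daemon's opts carry the hooks that the two support files added below rely on; the injected keys end up looking roughly like this (port values are illustrative):

# Illustrative shape of the injected keys (actual port numbers vary per run)
{
    'runtests_log_port': SALT_LOG_PORT,
    'runtests_conn_check_port': 64001,        # unique per daemon
    'engines': [{'salt_runtests': {}}],
    'engines_dirs': [ENGINES_DIR],            # prepended to any existing entries
    'log_handlers_dirs': [LOG_HANDLERS_DIR],  # likewise
}
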
# ----- Transcribe Configuration ---------------------------------------------------------------------------->
for entry in os.listdir(CONF_DIR):
if entry in ('master', 'minion', 'sub_minion', 'syndic_master'):
@@ -613,6 +957,14 @@ class TestDaemon(object):
salt.utils.fopen(os.path.join(TMP_CONF_DIR, entry), 'w').write(
yaml.dump(computed_config, default_flow_style=False)
)
sub_minion_computed_config = copy.deepcopy(sub_minion_opts)
salt.utils.fopen(os.path.join(TMP_CONF_DIR, 'sub-minion', 'minion'), 'w').write(
yaml.dump(sub_minion_computed_config, default_flow_style=False)
)
syndic_master_computed_config = copy.deepcopy(syndic_master_opts)
salt.utils.fopen(os.path.join(TMP_CONF_DIR, 'syndic-master', 'master'), 'w').write(
yaml.dump(syndic_master_computed_config, default_flow_style=False)
)
# <---- Transcribe Configuration -----------------------------------------------------------------------------
# ----- Verify Environment ---------------------------------------------------------------------------------->
@@ -624,8 +976,8 @@ class TestDaemon(object):
os.path.join(TMP_CONF_DIR, 'syndic'),
minion_config_path
)
sub_minion_opts = salt.config.minion_config(os.path.join(TMP_CONF_DIR, 'sub_minion'))
syndic_master_opts = salt.config.master_config(os.path.join(TMP_CONF_DIR, 'syndic_master'))
sub_minion_opts = salt.config.minion_config(os.path.join(TMP_CONF_DIR, 'sub-minion', 'minion'))
syndic_master_opts = salt.config.master_config(os.path.join(TMP_CONF_DIR, 'syndic-master', 'master'))
RUNTIME_CONFIGS['master'] = freeze(master_opts)
RUNTIME_CONFIGS['minion'] = freeze(minion_opts)
@@ -687,22 +1039,35 @@ class TestDaemon(object):
'''
Kill the minion and master processes
'''
salt.utils.process.clean_proc(self.sub_minion_process, wait_for_kill=50)
self.sub_minion_process.join()
salt.utils.process.clean_proc(self.minion_process, wait_for_kill=50)
self.minion_process.join()
salt.utils.process.clean_proc(self.master_process, wait_for_kill=50)
self.master_process.join()
self.sub_minion_process.terminate()
self.minion_process.terminate()
self.master_process.terminate()
try:
salt.utils.process.clean_proc(self.syndic_process, wait_for_kill=50)
self.syndic_process.join()
self.syndic_process.terminate()
except AttributeError:
pass
try:
salt.utils.process.clean_proc(self.smaster_process, wait_for_kill=50)
self.smaster_process.join()
self.smaster_process.terminate()
except AttributeError:
pass
#salt.utils.process.clean_proc(self.sub_minion_process, wait_for_kill=50)
#self.sub_minion_process.join()
#salt.utils.process.clean_proc(self.minion_process, wait_for_kill=50)
#self.minion_process.join()
#salt.utils.process.clean_proc(self.master_process, wait_for_kill=50)
#self.master_process.join()
#try:
# salt.utils.process.clean_proc(self.syndic_process, wait_for_kill=50)
# self.syndic_process.join()
#except AttributeError:
# pass
#try:
# salt.utils.process.clean_proc(self.smaster_process, wait_for_kill=50)
# self.smaster_process.join()
#except AttributeError:
# pass
self.log_server.server_close()
self.log_server.shutdown()
self._exit_mockbin()
self._exit_ssh()
# Shutdown the multiprocessing logging queue listener
@@ -715,23 +1080,24 @@ class TestDaemon(object):
'''
def setup_minions(self):
return
# Wait for minions to connect back
wait_minion_connections = MultiprocessingProcess(
target=self.wait_for_minion_connections,
args=(self.minion_targets, self.MINIONS_CONNECT_TIMEOUT)
)
wait_minion_connections.start()
wait_minion_connections.join()
wait_minion_connections.terminate()
if wait_minion_connections.exitcode > 0:
print(
'\n {LIGHT_RED}*{ENDC} ERROR: Minions failed to connect'.format(
**self.colors
)
)
return False
#wait_minion_connections = MultiprocessingProcess(
# target=self.wait_for_minion_connections,
# args=(self.minion_targets, self.MINIONS_CONNECT_TIMEOUT)
#)
#wait_minion_connections.start()
#wait_minion_connections.join()
#wait_minion_connections.terminate()
#if wait_minion_connections.exitcode > 0:
# print(
# '\n {LIGHT_RED}*{ENDC} ERROR: Minions failed to connect'.format(
# **self.colors
# )
# )
# return False
del wait_minion_connections
#del wait_minion_connections
sync_needed = self.parser.options.clean
if self.parser.options.clean is False:
@@ -1072,6 +1438,10 @@ class AdaptedConfigurationTestCaseMixIn(object):
return TMP_CONF_DIR
def get_config_file_path(self, filename):
if filename == 'syndic_master':
return os.path.join(TMP_CONF_DIR, 'syndic-master', 'master')
if filename == 'sub_minion':
return os.path.join(TMP_CONF_DIR, 'sub-minion', 'minion')
return os.path.join(TMP_CONF_DIR, filename)
@property


@@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-
'''
:codeauthor: :email:`Pedro Algarvio (pedro@algarvio.me)`
:copyright: © 2015 by the SaltStack Team, see AUTHORS for more details.
:license: Apache 2.0, see LICENSE for more details.
pytestsalt.engines.pytest_engine
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Simple salt engine which will set up a socket to accept connections allowing us to know
when a daemon is up and running
'''
# Import python libs
from __future__ import absolute_import
import socket
import logging
# Import salt libs
import salt.utils.event
# Import 3rd-party libs
from tornado import gen
from tornado import ioloop
from tornado import netutil
log = logging.getLogger(__name__)
__virtualname__ = 'salt_runtests'
def __virtual__():
return 'runtests_conn_check_port' in __opts__ # pylint: disable=undefined-variable
def start():
# Create our own IOLoop, we're in another process
io_loop = ioloop.IOLoop()
io_loop.make_current()
pytest_engine = PyTestEngine(__opts__, io_loop) # pylint: disable=undefined-variable
io_loop.add_callback(pytest_engine.start)
io_loop.start()
class PyTestEngine(object):
def __init__(self, opts, io_loop):
self.opts = opts
self.io_loop = io_loop
self.sock = None
@gen.coroutine
def start(self):
if self.opts['__role'] == 'minion':
yield self.listen_to_minion_connected_event()
port = int(self.opts['runtests_conn_check_port'])
log.info('Starting Pytest Engine(role=%s) on port %s', self.opts['__role'], port)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.setblocking(0)
# bind the socket to localhost on the config provided port
self.sock.bind(('localhost', port))
# become a server socket
self.sock.listen(5)
netutil.add_accept_handler(
self.sock,
self.handle_connection,
io_loop=self.io_loop,
)
def handle_connection(self, connection, address):
log.warning('Accepted connection from %s. Role: %s', address, self.opts['__role'])
# We just need to know that the daemon running the engine is alive...
connection.shutdown(socket.SHUT_RDWR) # pylint: disable=no-member
connection.close()
@gen.coroutine
def listen_to_minion_connected_event(self):
log.info('Listening for minion connected event...')
minion_start_event_match = 'salt/minion/{0}/start'.format(self.opts['id'])
event_bus = salt.utils.event.get_master_event(self.opts,
self.opts['sock_dir'],
listen=True)
event_bus.subscribe(minion_start_event_match)
while True:
event = event_bus.get_event(full=True, no_block=True)
if event is not None and event['tag'] == minion_start_event_match:
log.info('Got minion connected event: %s', event)
break
yield gen.sleep(0.25)
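
From the harness side, confirming a daemon is up therefore reduces to a plain TCP connect against its runtests_conn_check_port, which is what SaltDaemonScriptBase._wait_until_running() above does; a bare-bones hedged equivalent (not part of this commit):

# Bare-bones hedged equivalent of the port probe
import socket

def daemon_is_up(port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        return sock.connect_ex(('localhost', port)) == 0
    finally:
        sock.close()
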


@@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
'''
:codeauthor: :email:`Pedro Algarvio (pedro@algarvio.me)`
:copyright: © 2016 by the SaltStack Team, see AUTHORS for more details.
:license: Apache 2.0, see LICENSE for more details.
pytestsalt.salt.log_handlers.pytest_log_handler
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Salt External Logging Handler
'''
# Import python libs
from __future__ import absolute_import
import socket
import threading
import logging
import msgpack
import salt.log.setup
from salt.ext.six.moves.queue import Queue
__virtualname__ = 'runtests_log_handler'
log = logging.getLogger(__name__)
def __virtual__():
if 'runtests_log_port' not in __opts__:
return False, "'runtests_log_port' not in options"
return True
def setup_handlers():
queue = Queue()
handler = salt.log.setup.QueueHandler(queue)
handler.setLevel(1)
process_queue_thread = threading.Thread(target=process_queue, args=(__opts__['runtests_log_port'], queue))
process_queue_thread.daemon = True
process_queue_thread.start()
return handler
def process_queue(port, queue):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', port))
while True:
try:
record = queue.get()
if record is None:
# A sentinel to stop processing the queue
break
# Just log everything, filtering will happen on the main process
# logging handlers
sock.sendall(msgpack.dumps(record.__dict__, encoding='utf-8'))
except (EOFError, KeyboardInterrupt, SystemExit):
break
except Exception as exc: # pylint: disable=broad-except
log.warning(
'An exception occurred in the pytest salt logging '
'queue thread: {0}'.format(exc),
exc_info_on_loglevel=logging.DEBUG
)
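
To see the two new files working together, here is a hedged, isolated exercise of process_queue() against the log server from the first file; SALT_LOG_PORT and an already-listening ThreadedSocketServer are assumed, and inside the daemons this wiring is done by setup_handlers() instead:

# Hedged, isolated exercise of process_queue(); assumes the ThreadedSocketServer
# above is already listening on SALT_LOG_PORT (names here are illustrative only).
import logging
import threading
from salt.ext.six.moves.queue import Queue

q = Queue()
sender = threading.Thread(target=process_queue, args=(SALT_LOG_PORT, q))
sender.daemon = True
sender.start()
q.put(logging.LogRecord('runtests', logging.INFO, __file__, 0, 'ping', None, None))
q.put(None)   # sentinel: process_queue() stops cleanly
sender.join()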