Merge branch '2018.3' into tls-impr

Mike Place 2018-08-17 16:57:09 +02:00 committed by GitHub
commit 22240c073b
144 changed files with 9390 additions and 9623 deletions


@ -51,19 +51,19 @@ def __virtual__():
return __virtualname__
def _get_service(name):
def _name_in_services(name, services):
'''
Get information about a service. If the service is not found, raise an
error
Checks to see if the given service is in the given services.
:param str name: Service label, file name, or full path
:return: The service information for the service, otherwise an Error
:param dict services: The currently available services.
:return: The service information for the service, otherwise
an empty dictionary
:rtype: dict
'''
services = __utils__['mac_utils.available_services']()
name = name.lower()
if name in services:
# Match on label
return services[name]
@ -77,8 +77,50 @@ def _get_service(name):
# Match on basename
return service
# Could not find service
raise CommandExecutionError('Service not found: {0}'.format(name))
return dict()
def _get_service(name):
'''
Get information about a service. If the service is not found, raise an
error
:param str name: Service label, file name, or full path
:return: The service information for the service, otherwise an Error
:rtype: dict
'''
services = __utils__['mac_utils.available_services']()
name = name.lower()
service = _name_in_services(name, services)
# if we found the service we can return it
if service:
return service
# If we got here, the service is not available. Check whether the lookup
# used a cached batch of services; if it did not, this was a fresh scan,
# so raise that the service could not be found.
try:
if not __context__['using_cached_services']:
raise CommandExecutionError('Service not found: {0}'.format(name))
except KeyError:
pass
# The lookup used a cached list, and a service could have been added
# since that list was built, so refresh the available services.
services = __utils__['mac_utils.available_services'](refresh=True)
# check to see if we found the service we are looking for.
service = _name_in_services(name, services)
if not service:
# Could not find the service even after the refresh, so raise.
raise CommandExecutionError('Service not found: {0}'.format(name))
# found it :)
return service
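Stripped of the Salt loader plumbing, the lookup-then-refresh flow above can be sketched on its own. In this sketch, lookup_service, fetch_services and context are hypothetical stand-ins for _get_service, __utils__['mac_utils.available_services'] and __context__, LookupError stands in for CommandExecutionError, and the label/basename matching is simplified to a plain dict lookup:

def lookup_service(name, fetch_services, context):
    # First pass: whatever fetch_services returns, which may be a cached batch.
    services = fetch_services()
    service = services.get(name.lower())
    if service:
        return service
    # The first pass came from a fresh scan, so the service really is missing.
    if not context.get('using_cached_services', False):
        raise LookupError('Service not found: {0}'.format(name))
    # The first pass was cached; rescan once before giving up.
    services = fetch_services(refresh=True)
    service = services.get(name.lower())
    if not service:
        raise LookupError('Service not found: {0}'.format(name))
    return service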
def show(name):
@ -429,7 +471,7 @@ def disabled(name, runas=None, domain='system'):
salt '*' service.disabled org.cups.cupsd
'''
ret = False
disabled = launchctl('print-disabled',
domain,
return_stdout=True,


@ -817,14 +817,21 @@ class SerializerExtension(Extension, object):
return explore(data)
def format_json(self, value, sort_keys=True, indent=None):
return Markup(salt.utils.json.dumps(value, sort_keys=sort_keys, indent=indent).strip())
json_txt = salt.utils.json.dumps(value, sort_keys=sort_keys, indent=indent).strip()
try:
return Markup(json_txt)
except UnicodeDecodeError:
return Markup(salt.utils.stringutils.to_unicode(json_txt))
def format_yaml(self, value, flow_style=True):
yaml_txt = salt.utils.yaml.safe_dump(
value, default_flow_style=flow_style).strip()
if yaml_txt.endswith('\n...'):
if yaml_txt.endswith(str('\n...')): # future lint: disable=blacklisted-function
yaml_txt = yaml_txt[:len(yaml_txt)-4]
return Markup(yaml_txt)
try:
return Markup(yaml_txt)
except UnicodeDecodeError:
return Markup(salt.utils.stringutils.to_unicode(yaml_txt))
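Both filters above share the same fallback: try Markup() on the serialized text and, if that raises UnicodeDecodeError (a Python 2 byte string containing non-ASCII data), decode explicitly and wrap the result. A minimal sketch of that pattern, assuming markupsafe's Markup (the class jinja2 re-exports) and a to_unicode callable comparable to salt.utils.stringutils.to_unicode:

from markupsafe import Markup

def safe_markup(text, to_unicode):
    # Markup() over a Python 2 byte string with non-ASCII data raises
    # UnicodeDecodeError; decode explicitly and wrap the result instead.
    try:
        return Markup(text)
    except UnicodeDecodeError:
        return Markup(to_unicode(text))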
def format_xml(self, value):
"""Render a formatted multi-line XML string from a complex Python


@ -15,7 +15,6 @@ import time
# Import Salt Libs
import salt.modules.cmdmod
import salt.utils.args
import salt.utils.decorators as decorators
import salt.utils.files
import salt.utils.path
import salt.utils.platform
@ -299,14 +298,18 @@ def launchctl(sub_cmd, *args, **kwargs):
return ret['stdout'] if return_stdout else True
def _available_services():
def _available_services(refresh=False):
'''
This is a helper function needed for testing. We are using the memoziation
decorator on the `available_services` function, which causes the function
to run once and then return the results of the first run on subsequent
calls. This causes problems when trying to test the functionality of the
`available_services` function.
This is a helper function for getting the available macOS services.
'''
try:
if __context__['available_services'] and not refresh:
log.debug('Found context for available services.')
__context__['using_cached_services'] = True
return __context__['available_services']
except KeyError:
pass
launchd_paths = [
'/Library/LaunchAgents',
'/Library/LaunchDaemons',
@ -359,14 +362,22 @@ def _available_services():
'file_path': true_path,
'plist': plist}
return _available_services
# Store this in __context__ because gathering services is time consuming.
# This fixes https://github.com/saltstack/salt/issues/48414
__context__['available_services'] = _available_services
# this is a fresh gathering of services, set cached to false
__context__['using_cached_services'] = False
return __context__['available_services']
@decorators.memoize
def available_services():
def available_services(refresh=False):
'''
Return a dictionary of all available services on the system
:param bool refresh: If ``True``, re-scan the available services instead of
returning the data cached on the first run.
Returns:
dict: All available services
@ -377,4 +388,5 @@ def available_services():
import salt.utils.mac_service
salt.utils.mac_service.available_services()
'''
return _available_services()
log.debug('Loading available services')
return _available_services(refresh)
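Stripped of the Salt plumbing, the __context__-based caching that replaces the @decorators.memoize decorator looks roughly like the sketch below, where context is a plain dict standing in for __context__ and scan_launchd is a hypothetical stand-in for the plist scan:

def available_services_cached(context, scan_launchd, refresh=False):
    # Serve the cached result unless the caller explicitly asks for a rescan.
    if not refresh and 'available_services' in context:
        context['using_cached_services'] = True
        return context['available_services']
    # Fresh scan: cache it and record that the data did not come from the cache.
    context['available_services'] = scan_launchd()
    context['using_cached_services'] = False
    return context['available_services']

# Hypothetical usage: the second call is served from the dict, the third rescans.
ctx = {}
available_services_cached(ctx, lambda: {'org.cups.cupsd': {}})
available_services_cached(ctx, lambda: {'org.cups.cupsd': {}})
available_services_cached(ctx, lambda: {'org.cups.cupsd': {}}, refresh=True)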


@ -6,26 +6,43 @@ Tests for the Openstack Cloud Provider
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import logging
import os
# Import Salt Testing libs
from tests.support.case import ModuleCase
from tests.support.case import ModuleCase, ShellCase
from tests.support.paths import FILES
from tests.support.unit import skipIf
from tests.support.helpers import destructiveTest
from tests.support.helpers import destructiveTest, expensiveTest, generate_random_name
from tests.support.mixins import SaltReturnAssertsMixin
# Import Salt Libs
from salt.config import cloud_providers_config
log = logging.getLogger(__name__)
NO_KEYSTONE = False
try:
import keystoneclient # pylint: disable=import-error,unused-import
from libcloud.common.openstack_identity import OpenStackIdentity_3_0_Connection
from libcloud.common.openstack_identity import OpenStackIdentityTokenScope
HAS_KEYSTONE = True
except ImportError:
NO_KEYSTONE = True
HAS_KEYSTONE = True
# Import Third-Party Libs
try:
import shade # pylint: disable=unused-import
HAS_SHADE = True
except ImportError:
HAS_SHADE = False
# Create the cloud instance name to be used throughout the tests
INSTANCE_NAME = generate_random_name('CLOUD-TEST-')
PROVIDER_NAME = 'openstack'
DRIVER_NAME = 'openstack'
@skipIf(
NO_KEYSTONE,
not HAS_KEYSTONE,
'Please install keystoneclient and a keystone server before running '
'openstack integration tests.'
)
@ -156,3 +173,81 @@ class OpenstackTest(ModuleCase, SaltReturnAssertsMixin):
tenant_name='admin')
driver.authenticate()
self.assertTrue(driver.auth_token)
@skipIf(not HAS_SHADE, 'openstack driver requires `shade`')
class RackspaceTest(ShellCase):
'''
Integration tests for the Rackspace cloud provider using the Openstack driver
'''
@expensiveTest
def setUp(self):
'''
Sets up the test requirements
'''
super(RackspaceTest, self).setUp()
# check if appropriate cloud provider and profile files are present
profile_str = 'openstack-config'
providers = self.run_cloud('--list-providers')
if profile_str + ':' not in providers:
self.skipTest(
'Configuration file for {0} was not found. Check {0}.conf files '
'in tests/integration/files/conf/cloud.*.d/ to run these tests.'
.format(PROVIDER_NAME)
)
# check if personal access token, ssh_key_file, and ssh_key_names are present
config = cloud_providers_config(
os.path.join(
FILES,
'conf',
'cloud.providers.d',
PROVIDER_NAME + '.conf'
)
)
region_name = config[profile_str][DRIVER_NAME].get('region_name')
auth = config[profile_str][DRIVER_NAME].get('auth')
cloud = config[profile_str][DRIVER_NAME].get('cloud')
if not region_name or not (auth or cloud):
self.skipTest(
'A region_name and (auth or cloud) must be provided to run these '
'tests. Check tests/integration/files/conf/cloud.providers.d/{0}.conf'
.format(PROVIDER_NAME)
)
def test_instance(self):
'''
Test creating an instance on rackspace with the openstack driver
'''
# check if instance with salt installed returned
try:
self.assertIn(
INSTANCE_NAME,
[i.strip() for i in self.run_cloud('-p rackspace-test {0}'.format(INSTANCE_NAME), timeout=500)]
)
except AssertionError:
self.run_cloud('-d {0} --assume-yes'.format(INSTANCE_NAME), timeout=500)
raise
# delete the instance
try:
self.assertIn(
INSTANCE_NAME + ':',
[i.strip() for i in self.run_cloud('-d {0} --assume-yes'.format(INSTANCE_NAME), timeout=500)]
)
except AssertionError:
raise
def tearDown(self):
'''
Clean up after tests
'''
query = self.run_cloud('--query')
ret = ' {0}:'.format(INSTANCE_NAME)
# if test instance is still present, delete it
if ret in query:
self.run_cloud('-d {0} --assume-yes'.format(INSTANCE_NAME), timeout=500)


@ -31,7 +31,7 @@ peer:
- 'test.*'
ext_pillar:
- test_ext_pillar_opts:
- ext_pillar_opts:
- test_issue_5951_actual_file_roots_in_opts
config_opt:


@ -15,11 +15,11 @@ import logging
log = logging.getLogger(__name__)
# DRY up the name we use
MY_NAME = 'test_ext_pillar_opts'
MY_NAME = 'ext_pillar_opts'
def __virtual__():
log.debug('Loaded external pillar {0} as {1}'.format(__name__, MY_NAME))
log.debug('Loaded external pillar %s as %s', __name__, MY_NAME)
return True


@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
def myfunction():
grains = {}
grains['a_custom'] = {'k1': 'v1'}
return grains


@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
def myfunction():
grains = {}
grains['a_custom'] = {'k2': 'v2'}
return grains


@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
def myfunction():
grains = {}
grains['a_custom'] = {'k1': 'v1'}
return grains


@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
def myfunction():
grains = {}
grains['a_custom'] = {'k2': 'v2'}
return grains


@ -0,0 +1,6 @@
{% set result = {"Question": "Quieres Café?"} %}
test:
module.run:
- name: test.echo
- text: '{{ result | json }}'


@ -0,0 +1,6 @@
{% set result = {"Question": "Quieres Café?"} %}
test:
module.run:
- name: test.echo
- text: "{{ result | yaml }}"


@ -48,7 +48,7 @@ class PillarModuleTest(ModuleCase):
def test_issue_5951_actual_file_roots_in_opts(self):
self.assertIn(
TMP_STATE_TREE,
self.run_function('pillar.data')['test_ext_pillar_opts']['file_roots']['base']
self.run_function('pillar.data')['ext_pillar_opts']['file_roots']['base']
)
def test_pillar_items(self):


@ -12,7 +12,7 @@ from salt.netapi.rest_tornado import saltnado
from salt.utils.versions import StrictVersion
# Import Salt Testing Libs
from tests.unit.netapi.rest_tornado.test_handlers import SaltnadoTestCase
from tests.unit.netapi.test_rest_tornado import SaltnadoTestCase
from tests.support.helpers import flaky
from tests.support.unit import skipIf


@ -0,0 +1,231 @@
# -*- coding: utf-8 -*-
'''
Tests for the MySQL states
'''
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
# Import Salt Testing libs
from tests.support.case import ModuleCase
from tests.support.unit import skipIf
from tests.support.helpers import destructiveTest
from tests.support.mixins import SaltReturnAssertsMixin
# Import salt libs
import salt.utils.path
from salt.ext import six
NO_MYSQL = False
try:
import MySQLdb # pylint: disable=import-error,unused-import
except ImportError:
NO_MYSQL = True
if not salt.utils.path.which('mysqladmin'):
NO_MYSQL = True
@skipIf(
NO_MYSQL,
'Please install MySQL bindings and a MySQL Server before running '
'MySQL integration tests.'
)
class MysqlDatabaseStateTest(ModuleCase, SaltReturnAssertsMixin):
'''
Validate the mysql_database state
'''
user = 'root'
password = 'poney'
@destructiveTest
def setUp(self):
'''
Test presence of MySQL server, enforce a root password
'''
super(MysqlDatabaseStateTest, self).setUp()
NO_MYSQL_SERVER = True
# now ensure we know the mysql root password
# at least one of these two should work
ret1 = self.run_state(
'cmd.run',
name='mysqladmin --host="localhost" -u '
+ self.user
+ ' flush-privileges password "'
+ self.password
+ '"'
)
ret2 = self.run_state(
'cmd.run',
name='mysqladmin --host="localhost" -u '
+ self.user
+ ' --password="'
+ self.password
+ '" flush-privileges password "'
+ self.password
+ '"'
)
key, value = ret2.popitem()
if value['result']:
NO_MYSQL_SERVER = False
else:
self.skipTest('No MySQL Server running, or no root access on it.')
def _test_database(self, db_name, second_db_name, test_conn, **kwargs):
'''
Create db two times, test conn, remove it two times
'''
# In case of...
ret = self.run_state('mysql_database.absent',
name=db_name,
**kwargs
)
ret = self.run_state('mysql_database.present',
name=db_name,
**kwargs
)
self.assertSaltTrueReturn(ret)
self.assertInSaltComment(
'The database ' + db_name + ' has been created',
ret
)
#2nd run
ret = self.run_state('mysql_database.present',
name=second_db_name,
**kwargs
)
self.assertSaltTrueReturn(ret)
self.assertInSaltComment(
'Database ' + db_name + ' is already present',
ret
)
if test_conn:
# test root connection
ret = self.run_function(
'mysql.query',
database=db_name,
query='SELECT 1',
**kwargs
)
if not isinstance(ret, dict) or 'results' not in ret:
raise AssertionError(
('Unexpected result while testing connection'
' on db \'{0}\': {1}').format(
db_name,
repr(ret)
)
)
self.assertEqual([['1']], ret['results'])
# Now removing databases
kwargs.pop('character_set')
kwargs.pop('collate')
ret = self.run_state('mysql_database.absent',
name=db_name,
**kwargs
)
self.assertSaltTrueReturn(ret)
self.assertInSaltComment(
'Database ' + db_name + ' has been removed',
ret
)
#2nd run
ret = self.run_state('mysql_database.absent',
name=second_db_name,
** kwargs
)
self.assertSaltTrueReturn(ret)
self.assertInSaltComment(
'Database ' + db_name + ' is not present, so it cannot be removed',
ret
)
self.assertSaltStateChangesEqual(ret, {})
@destructiveTest
def test_present_absent(self):
'''
mysql_database.present
'''
self._test_database(
'testdb1',
'testdb1',
test_conn=True,
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8'
)
# TODO: test with variations on collate and charset, check for db alter
# once it will be done in mysql_database.present state
@destructiveTest
def test_present_absent_fuzzy(self):
'''
mysql_database.present with utf-8 and fuzzy db name
'''
# this is : ":() ;,?@=`&'\
dbname_fuzzy = '":() ;,?@=`&/\'\\'
# \xe6\xa8\x99\ = \u6a19 = 標
# this is : "();,?:@=`&/標'\
dbname_utf8 = '"();,?@=`&//\xe6\xa8\x99\'\\'
dbname_unicode = u'"();,?@=`&//\u6a19\'\\'
self._test_database(
dbname_fuzzy,
dbname_fuzzy,
test_conn=True,
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8'
)
# FIXME: MySQLdb bugs on dbnames with utf-8?
self._test_database(
dbname_utf8,
dbname_unicode,
test_conn=False,
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
#saltenv={"LC_ALL": "en_US.utf8"}
)
@destructiveTest
@skipIf(True, 'This test needs issue #8947 to be fixed first')
def test_utf8_from_sls_file(self):
'''
Try to create/destroy a utf-8 database name from an sls file #8947
'''
expected_result = {
'mysql_database_|-A_|-foo \xe6\xba\x96`bar_|-present': {
'__run_num__': 0,
'comment': 'The database foo \xe6\xba\x96`bar has been created',
'result': True},
'mysql_database_|-B_|-foo \xe6\xba\x96`bar_|-absent': {
'__run_num__': 1,
'comment': 'Database foo \xe6\xba\x96`bar has been removed',
'result': True},
}
result = {}
ret = self.run_function('state.sls', mods='mysql_utf8')
if not isinstance(ret, dict):
raise AssertionError(
('Unexpected result while testing external mysql utf8 sls'
': {0}').format(
repr(ret)
)
)
for item, descr in six.iteritems(ret):
result[item] = {
'__run_num__': descr['__run_num__'],
'comment': descr['comment'],
'result': descr['result']
}
self.assertEqual(expected_result, result)


@ -30,211 +30,6 @@ if not salt.utils.path.which('mysqladmin'):
NO_MYSQL = True
@skipIf(
NO_MYSQL,
'Please install MySQL bindings and a MySQL Server before running'
'MySQL integration tests.'
)
class MysqlDatabaseStateTest(ModuleCase, SaltReturnAssertsMixin):
'''
Validate the mysql_database state
'''
user = 'root'
password = 'poney'
@destructiveTest
def setUp(self):
'''
Test presence of MySQL server, enforce a root password
'''
super(MysqlDatabaseStateTest, self).setUp()
NO_MYSQL_SERVER = True
# now ensure we know the mysql root password
# one of theses two at least should work
ret1 = self.run_state(
'cmd.run',
name='mysqladmin --host="localhost" -u '
+ self.user
+ ' flush-privileges password "'
+ self.password
+ '"'
)
ret2 = self.run_state(
'cmd.run',
name='mysqladmin --host="localhost" -u '
+ self.user
+ ' --password="'
+ self.password
+ '" flush-privileges password "'
+ self.password
+ '"'
)
key, value = ret2.popitem()
if value['result']:
NO_MYSQL_SERVER = False
else:
self.skipTest('No MySQL Server running, or no root access on it.')
def _test_database(self, db_name, second_db_name, test_conn, **kwargs):
'''
Create db two times, test conn, remove it two times
'''
# In case of...
ret = self.run_state('mysql_database.absent',
name=db_name,
**kwargs
)
ret = self.run_state('mysql_database.present',
name=db_name,
**kwargs
)
self.assertSaltTrueReturn(ret)
self.assertInSaltComment(
'The database ' + db_name + ' has been created',
ret
)
#2nd run
ret = self.run_state('mysql_database.present',
name=second_db_name,
**kwargs
)
self.assertSaltTrueReturn(ret)
self.assertInSaltComment(
'Database ' + db_name + ' is already present',
ret
)
if test_conn:
# test root connection
ret = self.run_function(
'mysql.query',
database=db_name,
query='SELECT 1',
**kwargs
)
if not isinstance(ret, dict) or 'results' not in ret:
raise AssertionError(
('Unexpected result while testing connection'
' on db \'{0}\': {1}').format(
db_name,
repr(ret)
)
)
self.assertEqual([['1']], ret['results'])
# Now removing databases
kwargs.pop('character_set')
kwargs.pop('collate')
ret = self.run_state('mysql_database.absent',
name=db_name,
**kwargs
)
self.assertSaltTrueReturn(ret)
self.assertInSaltComment(
'Database ' + db_name + ' has been removed',
ret
)
#2nd run
ret = self.run_state('mysql_database.absent',
name=second_db_name,
** kwargs
)
self.assertSaltTrueReturn(ret)
self.assertInSaltComment(
'Database ' + db_name + ' is not present, so it cannot be removed',
ret
)
self.assertSaltStateChangesEqual(ret, {})
@destructiveTest
def test_present_absent(self):
'''
mysql_database.present
'''
self._test_database(
'testdb1',
'testdb1',
test_conn=True,
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8'
)
# TODO: test with variations on collate and charset, check for db alter
# once it will be done in mysql_database.present state
@destructiveTest
def test_present_absent_fuzzy(self):
'''
mysql_database.present with utf-8 andf fuzzy db name
'''
# this is : ":() ;,?@=`&'\
dbname_fuzzy = '":() ;,?@=`&/\'\\'
# \xe6\xa8\x99\ = \u6a19 = 標
# this is : "();,?:@=`&/標'\
dbname_utf8 = '"();,?@=`&//\xe6\xa8\x99\'\\'
dbname_unicode = u'"();,?@=`&//\u6a19\'\\'
self._test_database(
dbname_fuzzy,
dbname_fuzzy,
test_conn=True,
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8'
)
# FIXME: MySQLdb bugs on dbnames with utf-8?
self._test_database(
dbname_utf8,
dbname_unicode,
test_conn=False,
character_set='utf8',
collate='utf8_general_ci',
connection_user=self.user,
connection_pass=self.password,
connection_charset='utf8',
#saltenv={"LC_ALL": "en_US.utf8"}
)
@destructiveTest
@skipIf(True, 'This tests needs issue #8947 to be fixed first')
def test_utf8_from_sls_file(self):
'''
Try to create/destroy an utf-8 database name from an sls file #8947
'''
expected_result = {
'mysql_database_|-A_|-foo \xe6\xba\x96`bar_|-present': {
'__run_num__': 0,
'comment': 'The database foo \xe6\xba\x96`bar has been created',
'result': True},
'mysql_database_|-B_|-foo \xe6\xba\x96`bar_|-absent': {
'__run_num__': 1,
'comment': 'Database foo \xe6\xba\x96`bar has been removed',
'result': True},
}
result = {}
ret = self.run_function('state.sls', mods='mysql_utf8')
if not isinstance(ret, dict):
raise AssertionError(
('Unexpected result while testing external mysql utf8 sls'
': {0}').format(
repr(ret)
)
)
for item, descr in six.iteritems(ret):
result[item] = {
'__run_num__': descr['__run_num__'],
'comment': descr['comment'],
'result': descr['result']
}
self.assertEqual(expected_result, result)
@skipIf(
NO_MYSQL,
'Please install MySQL bindings and a MySQL Server before running'


@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-
'''
Test the ssh_auth states
'''
# Import python libs
from __future__ import absolute_import, unicode_literals, print_function
import os
# Import Salt Testing libs
from tests.support.case import ModuleCase
from tests.support.mixins import SaltReturnAssertsMixin
from tests.support.runtests import RUNTIME_VARS
from tests.support.helpers import (
destructiveTest,
with_system_user,
skip_if_not_root
)
# Import salt libs
import salt.utils.files
class SSHAuthStateTests(ModuleCase, SaltReturnAssertsMixin):
@destructiveTest
@skip_if_not_root
@with_system_user('issue_7409', on_existing='delete', delete=True)
def test_issue_7409_no_linebreaks_between_keys(self, username):
userdetails = self.run_function('user.info', [username])
user_ssh_dir = os.path.join(userdetails['home'], '.ssh')
authorized_keys_file = os.path.join(user_ssh_dir, 'authorized_keys')
ret = self.run_state(
'file.managed',
name=authorized_keys_file,
user=username,
makedirs=True,
contents_newline=False,
# Explicitly omit the ending line break
contents='ssh-rsa AAAAB3NzaC1kc3MAAACBAL0sQ9fJ5bYTEyY== root'
)
ret = self.run_state(
'ssh_auth.present',
name='AAAAB3NzaC1kcQ9J5bYTEyZ==',
enc='ssh-rsa',
user=username,
comment=username
)
self.assertSaltTrueReturn(ret)
self.assertSaltStateChangesEqual(
ret, {'AAAAB3NzaC1kcQ9J5bYTEyZ==': 'New'}
)
with salt.utils.files.fopen(authorized_keys_file, 'r') as fhr:
self.assertEqual(
fhr.read(),
'ssh-rsa AAAAB3NzaC1kc3MAAACBAL0sQ9fJ5bYTEyY== root\n'
'ssh-rsa AAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)
@destructiveTest
@skip_if_not_root
@with_system_user('issue_10198', on_existing='delete', delete=True)
def test_issue_10198_keyfile_from_another_env(self, username=None):
userdetails = self.run_function('user.info', [username])
user_ssh_dir = os.path.join(userdetails['home'], '.ssh')
authorized_keys_file = os.path.join(user_ssh_dir, 'authorized_keys')
key_fname = 'issue_10198.id_rsa.pub'
# Create the keyfile that we expect to get back on the state call
with salt.utils.files.fopen(os.path.join(RUNTIME_VARS.TMP_PRODENV_STATE_TREE, key_fname), 'w') as kfh:
kfh.write(
'ssh-rsa AAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)
# Create a bogus key file on base environment
with salt.utils.files.fopen(os.path.join(RUNTIME_VARS.TMP_STATE_TREE, key_fname), 'w') as kfh:
kfh.write(
'ssh-rsa BAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)
ret = self.run_state(
'ssh_auth.present',
name='Setup Keys',
source='salt://{0}?saltenv=prod'.format(key_fname),
enc='ssh-rsa',
user=username,
comment=username
)
self.assertSaltTrueReturn(ret)
with salt.utils.files.fopen(authorized_keys_file, 'r') as fhr:
self.assertEqual(
fhr.read(),
'ssh-rsa AAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)
os.unlink(authorized_keys_file)
ret = self.run_state(
'ssh_auth.present',
name='Setup Keys',
source='salt://{0}'.format(key_fname),
enc='ssh-rsa',
user=username,
comment=username,
saltenv='prod'
)
self.assertSaltTrueReturn(ret)
with salt.utils.files.fopen(authorized_keys_file, 'r') as fhr:
self.assertEqual(
fhr.read(),
'ssh-rsa AAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)


@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
'''
Test the ssh_known_hosts state
Test the ssh_known_hosts states
'''
# Import python libs
@ -12,15 +12,7 @@ import shutil
from tests.support.case import ModuleCase
from tests.support.mixins import SaltReturnAssertsMixin
from tests.support.runtests import RUNTIME_VARS
from tests.support.helpers import (
destructiveTest,
with_system_user,
skip_if_binaries_missing,
skip_if_not_root
)
# Import salt libs
import salt.utils.files
from tests.support.helpers import skip_if_binaries_missing
KNOWN_HOSTS = os.path.join(RUNTIME_VARS.TMP, 'known_hosts')
GITHUB_FINGERPRINT = '9d:38:5b:83:a9:17:52:92:56:1a:5e:c4:d4:81:8e:0a:ca:51:a2:64:f1:74:20:11:2e:f8:8a:c3:a1:39:49:8f'
@ -154,98 +146,3 @@ class SSHKnownHostsStateTest(ModuleCase, SaltReturnAssertsMixin):
# test again
ret = self.run_state('ssh_known_hosts.absent', test=True, **kwargs)
self.assertSaltTrueReturn(ret)
class SSHAuthStateTests(ModuleCase, SaltReturnAssertsMixin):
@destructiveTest
@skip_if_not_root
@with_system_user('issue_7409', on_existing='delete', delete=True)
def test_issue_7409_no_linebreaks_between_keys(self, username):
userdetails = self.run_function('user.info', [username])
user_ssh_dir = os.path.join(userdetails['home'], '.ssh')
authorized_keys_file = os.path.join(user_ssh_dir, 'authorized_keys')
ret = self.run_state(
'file.managed',
name=authorized_keys_file,
user=username,
makedirs=True,
contents_newline=False,
# Explicit no ending line break
contents='ssh-rsa AAAAB3NzaC1kc3MAAACBAL0sQ9fJ5bYTEyY== root'
)
ret = self.run_state(
'ssh_auth.present',
name='AAAAB3NzaC1kcQ9J5bYTEyZ==',
enc='ssh-rsa',
user=username,
comment=username
)
self.assertSaltTrueReturn(ret)
self.assertSaltStateChangesEqual(
ret, {'AAAAB3NzaC1kcQ9J5bYTEyZ==': 'New'}
)
with salt.utils.files.fopen(authorized_keys_file, 'r') as fhr:
self.assertEqual(
fhr.read(),
'ssh-rsa AAAAB3NzaC1kc3MAAACBAL0sQ9fJ5bYTEyY== root\n'
'ssh-rsa AAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)
@destructiveTest
@skip_if_not_root
@with_system_user('issue_10198', on_existing='delete', delete=True)
def test_issue_10198_keyfile_from_another_env(self, username=None):
userdetails = self.run_function('user.info', [username])
user_ssh_dir = os.path.join(userdetails['home'], '.ssh')
authorized_keys_file = os.path.join(user_ssh_dir, 'authorized_keys')
key_fname = 'issue_10198.id_rsa.pub'
# Create the keyfile that we expect to get back on the state call
with salt.utils.files.fopen(os.path.join(RUNTIME_VARS.TMP_PRODENV_STATE_TREE, key_fname), 'w') as kfh:
kfh.write(
'ssh-rsa AAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)
# Create a bogus key file on base environment
with salt.utils.files.fopen(os.path.join(RUNTIME_VARS.TMP_STATE_TREE, key_fname), 'w') as kfh:
kfh.write(
'ssh-rsa BAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)
ret = self.run_state(
'ssh_auth.present',
name='Setup Keys',
source='salt://{0}?saltenv=prod'.format(key_fname),
enc='ssh-rsa',
user=username,
comment=username
)
self.assertSaltTrueReturn(ret)
with salt.utils.files.fopen(authorized_keys_file, 'r') as fhr:
self.assertEqual(
fhr.read(),
'ssh-rsa AAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)
os.unlink(authorized_keys_file)
ret = self.run_state(
'ssh_auth.present',
name='Setup Keys',
source='salt://{0}'.format(key_fname),
enc='ssh-rsa',
user=username,
comment=username,
saltenv='prod'
)
self.assertSaltTrueReturn(ret)
with salt.utils.files.fopen(authorized_keys_file, 'r') as fhr:
self.assertEqual(
fhr.read(),
'ssh-rsa AAAAB3NzaC1kcQ9J5bYTEyZ== {0}\n'.format(username)
)


@ -160,7 +160,7 @@ TEST_SUITES = {
'path': 'integration/netapi'},
'cloud_provider':
{'display_name': 'Cloud Provider',
'path': 'integration/cloud/providers'},
'path': 'integration/cloud/clouds'},
'minion':
{'display_name': 'Minion',
'path': 'integration/minion'},


@ -78,7 +78,10 @@ def no_symlinks():
return not HAS_SYMLINKS
output = ''
try:
output = subprocess.check_output('git config --get core.symlinks', shell=True)
output = subprocess.Popen(
['git', 'config', '--get', 'core.symlinks'],
cwd=TMP,
stdout=subprocess.PIPE).communicate()[0]
except OSError as exc:
if exc.errno != errno.ENOENT:
raise
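The replacement shown above swaps shell=True string parsing for an explicit argument list and a pinned working directory. In isolation the pattern is roughly the following sketch, where the helper name and the boolean return are illustrative rather than the helper's actual contract:

import subprocess

def git_core_symlinks(repo_dir):
    # Invoke git directly (no shell parsing) and run it in a known directory,
    # so the query is unambiguous and unaffected by the caller's cwd.
    proc = subprocess.Popen(
        ['git', 'config', '--get', 'core.symlinks'],
        cwd=repo_dir,
        stdout=subprocess.PIPE)
    output = proc.communicate()[0]
    return output.strip() == b'true'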


@ -713,3 +713,23 @@ class JinjaFiltersTest(object):
self.assertIn('module_|-test_|-test.echo_|-run', ret)
self.assertEqual(ret['module_|-test_|-test.echo_|-run']['changes'],
_expected)
def test_yaml(self):
'''
test yaml filter
'''
_expected = {'ret': "{Question: 'Quieres Café?'}"}
ret = self.run_function('state.sls', ['jinja_filters.yaml'])
self.assertIn('module_|-test_|-test.echo_|-run', ret)
self.assertEqual(ret['module_|-test_|-test.echo_|-run']['changes'],
_expected)
def test_json(self):
'''
test json filter
'''
_expected = {'ret': '{"Question": "Quieres Café?"}'}
ret = self.run_function('state.sls', ['jinja_filters.json'])
self.assertIn('module_|-test_|-test.echo_|-run', ret)
self.assertEqual(ret['module_|-test_|-test.echo_|-run']['changes'],
_expected)


@ -3,7 +3,7 @@
:synopsis: Base class for kernelpkg modules
:platform: Linux
:maturity: develop
versionadded:: 2018.3.0
.. versionadded:: 2018.3.0
'''
# pylint: disable=invalid-name,no-member


@ -7,6 +7,11 @@ from __future__ import absolute_import
from tests.support.unit import skipIf, TestCase
from tests.support.mock import NO_MOCK, NO_MOCK_REASON
from tests.support.mixins import LoaderModuleMockMixin
try:
from pyroute2 import IPDB
HAS_PYROUTE2 = True
except ImportError:
HAS_PYROUTE2 = False
# Salt libs
import salt.beacons.network_settings as network_settings
@ -43,3 +48,18 @@ class NetworkSettingsBeaconTestCase(TestCase, LoaderModuleMockMixin):
ret = network_settings.validate(config)
self.assertEqual(ret, (True, 'Valid beacon configuration'))
@skipIf(not HAS_PYROUTE2, 'no pyroute2 installed, skipping')
class Pyroute2TestCase(TestCase):
def test_interface_dict_fields(self):
with IPDB() as ipdb:
for attr in network_settings.ATTRS:
# ipdb.interfaces is a dict-like object, that
# contains interface definitions. Interfaces can
# be referenced both with indices and names.
#
# ipdb.interfaces[1] is an interface with index 1,
# that is the loopback interface.
self.assertIn(attr, ipdb.interfaces[1])


@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
'''
unit tests for the localfs cache
unit tests for salt.cache
'''
# Import Python libs


@ -0,0 +1,152 @@
# -*- coding: utf-8 -*-
'''
:codeauthor: :email:`Daniel Wallace <dwallace@saltstack.com>`
'''
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import os
import shutil
import tempfile
# Import Salt Testing libs
from tests.support.unit import skipIf, TestCase
from tests.support.case import ShellCase
from tests.support.mock import NO_MOCK, NO_MOCK_REASON, patch, MagicMock
from tests.support.paths import TMP
# Import Salt libs
import salt.config
import salt.roster
import salt.utils.files
import salt.utils.path
import salt.utils.thin
import salt.utils.yaml
from salt.client import ssh
ROSTER = '''
localhost:
host: 127.0.0.1
port: 2827
self:
host: 0.0.0.0
port: 42
'''
@skipIf(NO_MOCK, NO_MOCK_REASON)
@skipIf(not salt.utils.path.which('ssh'), "No ssh binary found in path")
class SSHPasswordTests(ShellCase):
def test_password_failure(self):
'''
Check password failures when trying to deploy keys
'''
opts = salt.config.client_config(self.get_config_file_path('master'))
opts['list_hosts'] = False
opts['argv'] = ['test.ping']
opts['selected_target_option'] = 'glob'
opts['tgt'] = 'localhost'
opts['arg'] = []
roster = os.path.join(self.get_config_dir(), 'roster')
handle_ssh_ret = [
{'localhost': {'retcode': 255, 'stderr': u'Permission denied (publickey).\r\n', 'stdout': ''}},
]
expected = {'localhost': 'Permission denied (publickey)'}
display_output = MagicMock()
with patch('salt.roster.get_roster_file', MagicMock(return_value=roster)), \
patch('salt.client.ssh.SSH.handle_ssh', MagicMock(return_value=handle_ssh_ret)), \
patch('salt.client.ssh.SSH.key_deploy', MagicMock(return_value=expected)), \
patch('salt.output.display_output', display_output):
client = ssh.SSH(opts)
ret = next(client.run_iter())
with self.assertRaises(SystemExit):
client.run()
display_output.assert_called_once_with(expected, 'nested', opts)
self.assertIs(ret, handle_ssh_ret[0])
class SSHRosterDefaults(TestCase):
def test_roster_defaults_flat(self):
'''
Test Roster Defaults on the flat roster
'''
tempdir = tempfile.mkdtemp(dir=TMP)
expected = {
'self': {
'host': '0.0.0.0',
'user': 'daniel',
'port': 42,
},
'localhost': {
'host': '127.0.0.1',
'user': 'daniel',
'port': 2827,
},
}
try:
root_dir = os.path.join(tempdir, 'foo', 'bar')
os.makedirs(root_dir)
fpath = os.path.join(root_dir, 'config')
with salt.utils.files.fopen(fpath, 'w') as fp_:
fp_.write(
'''
roster_defaults:
user: daniel
'''
)
opts = salt.config.master_config(fpath)
with patch('salt.roster.get_roster_file', MagicMock(return_value=ROSTER)):
with patch('salt.template.compile_template', MagicMock(return_value=salt.utils.yaml.safe_load(ROSTER))):
roster = salt.roster.Roster(opts=opts)
self.assertEqual(roster.targets('*', 'glob'), expected)
finally:
if os.path.isdir(tempdir):
shutil.rmtree(tempdir)
@skipIf(NO_MOCK, NO_MOCK_REASON)
class SSHSingleTests(TestCase):
def setUp(self):
self.tmp_cachedir = tempfile.mkdtemp(dir=TMP)
def test_single_opts(self):
''' Sanity check for ssh.Single options
'''
argv = ['ssh.set_auth_key', 'root', 'hobn+amNAXSBTiOXEqlBjGB...rsa root@master']
opts = {
'argv': argv,
'__role': 'master',
'cachedir': self.tmp_cachedir,
'extension_modules': os.path.join(self.tmp_cachedir, 'extmods'),
}
target = {
'passwd': 'abc123',
'ssh_options': None,
'sudo': False,
'identities_only': False,
'host': 'login1',
'user': 'root',
'timeout': 65,
'remote_port_forwards': None,
'sudo_user': '',
'port': '22',
'priv': '/etc/salt/pki/master/ssh/salt-ssh.rsa'
}
single = ssh.Single(
opts,
opts['argv'],
'localhost',
mods={},
fsclient=None,
thin=salt.utils.thin.thin_path(opts['cachedir']),
mine=False,
**target)
self.assertEqual(single.shell._ssh_opts(), '')
self.assertEqual(single.shell._cmd_str('date +%s'), 'ssh login1 '
'-o KbdInteractiveAuthentication=no -o '
'PasswordAuthentication=yes -o ConnectTimeout=65 -o Port=22 '
'-o IdentityFile=/etc/salt/pki/master/ssh/salt-ssh.rsa '
'-o User=root date +%s')


@ -1,140 +0,0 @@
# -*- coding: utf-8 -*-
'''
tests.unit.api_config_test
'''
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
# Import Salt Testing libs
from tests.support.unit import skipIf, TestCase
from tests.support.helpers import destructiveTest
from tests.support.mock import (
MagicMock,
NO_MOCK,
NO_MOCK_REASON,
patch
)
# Import Salt libs
import salt.config
import salt.utils.platform
import salt.syspaths
MOCK_MASTER_DEFAULT_OPTS = {
'log_file': '{0}/var/log/salt/master'.format(salt.syspaths.ROOT_DIR),
'pidfile': '{0}/var/run/salt-master.pid'.format(salt.syspaths.ROOT_DIR),
'root_dir': format(salt.syspaths.ROOT_DIR)
}
if salt.utils.platform.is_windows():
MOCK_MASTER_DEFAULT_OPTS = {
'log_file': '{0}\\var\\log\\salt\\master'.format(
salt.syspaths.ROOT_DIR),
'pidfile': '{0}\\var\\run\\salt-master.pid'.format(
salt.syspaths.ROOT_DIR),
'root_dir': format(salt.syspaths.ROOT_DIR)
}
@skipIf(NO_MOCK, NO_MOCK_REASON)
class APIConfigTestCase(TestCase):
'''
TestCase for the api_config function in salt.config.__init__.py
'''
def setUp(self):
# Copy DEFAULT_API_OPTS to restore after the test
self.default_api_opts = salt.config.DEFAULT_API_OPTS.copy()
def tearDown(self):
# Reset DEFAULT_API_OPTS settings as to not interfere with other unit tests
salt.config.DEFAULT_API_OPTS = self.default_api_opts
def test_api_config_log_file_values(self):
'''
Tests the opts value of the 'log_file' after running through the
various default dict updates. 'log_file' should be updated to match
the DEFAULT_API_OPTS 'api_logfile' value.
'''
with patch('salt.config.client_config', MagicMock(return_value=MOCK_MASTER_DEFAULT_OPTS)):
expected = '{0}/var/log/salt/api'.format(
salt.syspaths.ROOT_DIR if salt.syspaths.ROOT_DIR != '/' else '')
if salt.utils.platform.is_windows():
expected = '{0}\\var\\log\\salt\\api'.format(
salt.syspaths.ROOT_DIR)
ret = salt.config.api_config('/some/fake/path')
self.assertEqual(ret['log_file'], expected)
def test_api_config_pidfile_values(self):
'''
Tests the opts value of the 'pidfile' after running through the
various default dict updates. 'pidfile' should be updated to match
the DEFAULT_API_OPTS 'api_pidfile' value.
'''
with patch('salt.config.client_config', MagicMock(return_value=MOCK_MASTER_DEFAULT_OPTS)):
expected = '{0}/var/run/salt-api.pid'.format(
salt.syspaths.ROOT_DIR if salt.syspaths.ROOT_DIR != '/' else '')
if salt.utils.platform.is_windows():
expected = '{0}\\var\\run\\salt-api.pid'.format(
salt.syspaths.ROOT_DIR)
ret = salt.config.api_config('/some/fake/path')
self.assertEqual(ret['pidfile'], expected)
@destructiveTest
def test_master_config_file_overrides_defaults(self):
'''
Tests the opts value of the api config values after running through the
various default dict updates that should be overridden by settings in
the user's master config file.
'''
foo_dir = '/foo/bar/baz'
hello_dir = '/hello/world'
if salt.utils.platform.is_windows():
foo_dir = 'c:\\foo\\bar\\baz'
hello_dir = 'c:\\hello\\world'
mock_master_config = {
'api_pidfile': foo_dir,
'api_logfile': hello_dir,
'rest_timeout': 5
}
mock_master_config.update(MOCK_MASTER_DEFAULT_OPTS.copy())
with patch('salt.config.client_config',
MagicMock(return_value=mock_master_config)):
ret = salt.config.api_config('/some/fake/path')
self.assertEqual(ret['rest_timeout'], 5)
self.assertEqual(ret['api_pidfile'], foo_dir)
self.assertEqual(ret['pidfile'], foo_dir)
self.assertEqual(ret['api_logfile'], hello_dir)
self.assertEqual(ret['log_file'], hello_dir)
@destructiveTest
def test_api_config_prepend_root_dirs_return(self):
'''
Tests the opts value of the api_logfile, log_file, api_pidfile, and pidfile
when a custom root directory is used. This ensures that each of these
values is present in the list of opts keys that should have the root_dir
prepended when the api_config function returns the opts dictionary.
'''
mock_log = '/mock/root/var/log/salt/api'
mock_pid = '/mock/root/var/run/salt-api.pid'
mock_master_config = MOCK_MASTER_DEFAULT_OPTS.copy()
mock_master_config['root_dir'] = '/mock/root/'
if salt.utils.platform.is_windows():
mock_log = 'c:\\mock\\root\\var\\log\\salt\\api'
mock_pid = 'c:\\mock\\root\\var\\run\\salt-api.pid'
mock_master_config['root_dir'] = 'c:\\mock\\root'
with patch('salt.config.client_config',
MagicMock(return_value=mock_master_config)):
ret = salt.config.api_config('/some/fake/path')
self.assertEqual(ret['api_logfile'], mock_log)
self.assertEqual(ret['log_file'], mock_log)
self.assertEqual(ret['api_pidfile'], mock_pid)
self.assertEqual(ret['pidfile'], mock_pid)

File diff suppressed because it is too large


@ -1,361 +0,0 @@
# -*- coding: utf-8 -*-
'''
:codeauthor: Mike Place <mp@saltstack.com>
'''
# Import Python libs
from __future__ import absolute_import
import errno
import logging
import os
import shutil
# Import Salt Testing libs
from tests.integration import AdaptedConfigurationTestCaseMixin
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.paths import TMP
from tests.support.unit import TestCase, skipIf
from tests.support.mock import MagicMock, patch, NO_MOCK, NO_MOCK_REASON
# Import salt libs
import salt.utils.files
from salt import fileclient
from salt.ext import six
log = logging.getLogger(__name__)
SALTENVS = ('base', 'dev')
FS_ROOT = os.path.join(TMP, 'fileclient_fs_root')
CACHE_ROOT = os.path.join(TMP, 'fileclient_cache_root')
SUBDIR = 'subdir'
SUBDIR_FILES = ('foo.txt', 'bar.txt', 'baz.txt')
def _get_file_roots():
return dict(
[(x, [os.path.join(FS_ROOT, x)]) for x in SALTENVS]
)
MOCKED_OPTS = {
'file_roots': _get_file_roots(),
'fileserver_backend': ['roots'],
'cachedir': CACHE_ROOT,
'file_client': 'local',
}
@skipIf(NO_MOCK, NO_MOCK_REASON)
class FileClientTest(TestCase, AdaptedConfigurationTestCaseMixin, LoaderModuleMockMixin):
def setup_loader_modules(self):
return {fileclient: {'__opts__': MOCKED_OPTS}}
def setUp(self):
self.file_client = fileclient.Client(self.master_opts)
def tearDown(self):
del self.file_client
def test_file_list_emptydirs(self):
'''
Ensure that the fileclient class won't allow a direct call to file_list_emptydirs()
'''
with self.assertRaises(NotImplementedError):
self.file_client.file_list_emptydirs()
def test_get_file(self):
'''
Ensure that the fileclient class won't allow a direct call to get_file()
'''
with self.assertRaises(NotImplementedError):
self.file_client.get_file(None)
def test_get_file_client(self):
minion_opts = self.get_temp_config('minion')
minion_opts['file_client'] = 'remote'
with patch('salt.fileclient.RemoteClient', MagicMock(return_value='remote_client')):
ret = fileclient.get_file_client(minion_opts)
self.assertEqual('remote_client', ret)
@skipIf(NO_MOCK, NO_MOCK_REASON)
class FileclientCacheTest(TestCase, AdaptedConfigurationTestCaseMixin, LoaderModuleMockMixin):
'''
Tests for the fileclient caching. The LocalClient is the only thing we can
test as it is the only way we can mock the fileclient (the tests run from
the minion process, so the master cannot be mocked from test code).
'''
def setup_loader_modules(self):
return {fileclient: {'__opts__': MOCKED_OPTS}}
def setUp(self):
'''
No need to add a dummy foo.txt to muddy up the github repo, just make
our own fileserver root on-the-fly.
'''
def _new_dir(path):
'''
Add a new dir at ``path`` using os.makedirs. If the directory
already exists, remove it recursively and then try to create it
again.
'''
try:
os.makedirs(path)
except OSError as exc:
if exc.errno == errno.EEXIST:
# Just in case a previous test was interrupted, remove the
# directory and try adding it again.
shutil.rmtree(path)
os.makedirs(path)
else:
raise
# Crete the FS_ROOT
for saltenv in SALTENVS:
saltenv_root = os.path.join(FS_ROOT, saltenv)
# Make sure we have a fresh root dir for this saltenv
_new_dir(saltenv_root)
path = os.path.join(saltenv_root, 'foo.txt')
with salt.utils.files.fopen(path, 'w') as fp_:
fp_.write(
'This is a test file in the \'{0}\' saltenv.\n'
.format(saltenv)
)
subdir_abspath = os.path.join(saltenv_root, SUBDIR)
os.makedirs(subdir_abspath)
for subdir_file in SUBDIR_FILES:
path = os.path.join(subdir_abspath, subdir_file)
with salt.utils.files.fopen(path, 'w') as fp_:
fp_.write(
'This is file \'{0}\' in subdir \'{1} from saltenv '
'\'{2}\''.format(subdir_file, SUBDIR, saltenv)
)
# Create the CACHE_ROOT
_new_dir(CACHE_ROOT)
def tearDown(self):
'''
Remove the directories created for these tests
'''
shutil.rmtree(FS_ROOT)
shutil.rmtree(CACHE_ROOT)
def test_cache_dir(self):
'''
Ensure entire directory is cached to correct location
'''
patched_opts = dict((x, y) for x, y in six.iteritems(self.minion_opts))
patched_opts.update(MOCKED_OPTS)
with patch.dict(fileclient.__opts__, patched_opts):
client = fileclient.get_file_client(fileclient.__opts__, pillar=False)
for saltenv in SALTENVS:
self.assertTrue(
client.cache_dir(
'salt://{0}'.format(SUBDIR),
saltenv,
cachedir=None
)
)
for subdir_file in SUBDIR_FILES:
cache_loc = os.path.join(fileclient.__opts__['cachedir'],
'files',
saltenv,
SUBDIR,
subdir_file)
# Double check that the content of the cached file
# identifies it as being from the correct saltenv. The
# setUp function creates the file with the name of the
# saltenv mentioned in the file, so a simple 'in' check is
# sufficient here. If opening the file raises an exception,
# this is a problem, so we are not catching the exception
# and letting it be raised so that the test fails.
with salt.utils.files.fopen(cache_loc) as fp_:
content = fp_.read()
log.debug('cache_loc = %s', cache_loc)
log.debug('content = %s', content)
self.assertTrue(subdir_file in content)
self.assertTrue(SUBDIR in content)
self.assertTrue(saltenv in content)
def test_cache_dir_with_alternate_cachedir_and_absolute_path(self):
'''
Ensure entire directory is cached to correct location when an alternate
cachedir is specified and that cachedir is an absolute path
'''
patched_opts = dict((x, y) for x, y in six.iteritems(self.minion_opts))
patched_opts.update(MOCKED_OPTS)
alt_cachedir = os.path.join(TMP, 'abs_cachedir')
with patch.dict(fileclient.__opts__, patched_opts):
client = fileclient.get_file_client(fileclient.__opts__, pillar=False)
for saltenv in SALTENVS:
self.assertTrue(
client.cache_dir(
'salt://{0}'.format(SUBDIR),
saltenv,
cachedir=alt_cachedir
)
)
for subdir_file in SUBDIR_FILES:
cache_loc = os.path.join(alt_cachedir,
'files',
saltenv,
SUBDIR,
subdir_file)
# Double check that the content of the cached file
# identifies it as being from the correct saltenv. The
# setUp function creates the file with the name of the
# saltenv mentioned in the file, so a simple 'in' check is
# sufficient here. If opening the file raises an exception,
# this is a problem, so we are not catching the exception
# and letting it be raised so that the test fails.
with salt.utils.files.fopen(cache_loc) as fp_:
content = fp_.read()
log.debug('cache_loc = %s', cache_loc)
log.debug('content = %s', content)
self.assertTrue(subdir_file in content)
self.assertTrue(SUBDIR in content)
self.assertTrue(saltenv in content)
def test_cache_dir_with_alternate_cachedir_and_relative_path(self):
'''
Ensure entire directory is cached to correct location when an alternate
cachedir is specified and that cachedir is a relative path
'''
patched_opts = dict((x, y) for x, y in six.iteritems(self.minion_opts))
patched_opts.update(MOCKED_OPTS)
alt_cachedir = 'foo'
with patch.dict(fileclient.__opts__, patched_opts):
client = fileclient.get_file_client(fileclient.__opts__, pillar=False)
for saltenv in SALTENVS:
self.assertTrue(
client.cache_dir(
'salt://{0}'.format(SUBDIR),
saltenv,
cachedir=alt_cachedir
)
)
for subdir_file in SUBDIR_FILES:
cache_loc = os.path.join(fileclient.__opts__['cachedir'],
alt_cachedir,
'files',
saltenv,
SUBDIR,
subdir_file)
# Double check that the content of the cached file
# identifies it as being from the correct saltenv. The
# setUp function creates the file with the name of the
# saltenv mentioned in the file, so a simple 'in' check is
# sufficient here. If opening the file raises an exception,
# this is a problem, so we are not catching the exception
# and letting it be raised so that the test fails.
with salt.utils.files.fopen(cache_loc) as fp_:
content = fp_.read()
log.debug('cache_loc = %s', cache_loc)
log.debug('content = %s', content)
self.assertTrue(subdir_file in content)
self.assertTrue(SUBDIR in content)
self.assertTrue(saltenv in content)
def test_cache_file(self):
'''
Ensure file is cached to correct location
'''
patched_opts = dict((x, y) for x, y in six.iteritems(self.minion_opts))
patched_opts.update(MOCKED_OPTS)
with patch.dict(fileclient.__opts__, patched_opts):
client = fileclient.get_file_client(fileclient.__opts__, pillar=False)
for saltenv in SALTENVS:
self.assertTrue(
client.cache_file('salt://foo.txt', saltenv, cachedir=None)
)
cache_loc = os.path.join(
fileclient.__opts__['cachedir'], 'files', saltenv, 'foo.txt')
# Double check that the content of the cached file identifies
# it as being from the correct saltenv. The setUp function
# creates the file with the name of the saltenv mentioned in
# the file, so a simple 'in' check is sufficient here. If
# opening the file raises an exception, this is a problem, so
# we are not catching the exception and letting it be raised so
# that the test fails.
with salt.utils.files.fopen(cache_loc) as fp_:
content = fp_.read()
log.debug('cache_loc = %s', cache_loc)
log.debug('content = %s', content)
self.assertTrue(saltenv in content)
def test_cache_file_with_alternate_cachedir_and_absolute_path(self):
'''
Ensure file is cached to correct location when an alternate cachedir is
specified and that cachedir is an absolute path
'''
patched_opts = dict((x, y) for x, y in six.iteritems(self.minion_opts))
patched_opts.update(MOCKED_OPTS)
alt_cachedir = os.path.join(TMP, 'abs_cachedir')
with patch.dict(fileclient.__opts__, patched_opts):
client = fileclient.get_file_client(fileclient.__opts__, pillar=False)
for saltenv in SALTENVS:
self.assertTrue(
client.cache_file('salt://foo.txt',
saltenv,
cachedir=alt_cachedir)
)
cache_loc = os.path.join(alt_cachedir,
'files',
saltenv,
'foo.txt')
# Double check that the content of the cached file identifies
# it as being from the correct saltenv. The setUp function
# creates the file with the name of the saltenv mentioned in
# the file, so a simple 'in' check is sufficient here. If
# opening the file raises an exception, this is a problem, so
# we are not catching the exception and letting it be raised so
# that the test fails.
with salt.utils.files.fopen(cache_loc) as fp_:
content = fp_.read()
log.debug('cache_loc = %s', cache_loc)
log.debug('content = %s', content)
self.assertTrue(saltenv in content)
def test_cache_file_with_alternate_cachedir_and_relative_path(self):
'''
Ensure file is cached to correct location when an alternate cachedir is
specified and that cachedir is a relative path
'''
patched_opts = dict((x, y) for x, y in six.iteritems(self.minion_opts))
patched_opts.update(MOCKED_OPTS)
alt_cachedir = 'foo'
with patch.dict(fileclient.__opts__, patched_opts):
client = fileclient.get_file_client(fileclient.__opts__, pillar=False)
for saltenv in SALTENVS:
self.assertTrue(
client.cache_file('salt://foo.txt',
saltenv,
cachedir=alt_cachedir)
)
cache_loc = os.path.join(fileclient.__opts__['cachedir'],
alt_cachedir,
'files',
saltenv,
'foo.txt')
# Double check that the content of the cached file identifies
# it as being from the correct saltenv. The setUp function
# creates the file with the name of the saltenv mentioned in
# the file, so a simple 'in' check is sufficient here. If
# opening the file raises an exception, this is a problem, so
# we are not catching the exception and letting it be raised so
# that the test fails.
with salt.utils.files.fopen(cache_loc) as fp_:
content = fp_.read()
log.debug('cache_loc = %s', cache_loc)
log.debug('content = %s', content)
self.assertTrue(saltenv in content)


@ -1,140 +0,0 @@
# -*- coding: utf-8 -*-
'''
integration.loader.globals
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Test Salt's loader regarding globals that it should pack in
'''
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
import inspect
# Import Salt Testing libs
from tests.support.case import ModuleCase
# Import salt libs
import salt.loader
import salt.utils.yaml
# Import 3rd-party libs
from salt.ext import six
class LoaderGlobalsTest(ModuleCase):
'''
Test all of the globals that the loader is responsible for adding to modules
This shouldn't be done here, but should rather be done per module type (in the cases where they are used)
so they can check ALL globals that they have (or should have) access to.
This is intended as a shorter term way of testing these so we don't break the loader
'''
def _verify_globals(self, mod_dict):
'''
Verify that the globals listed in the doc string (from the test) are in these modules
'''
# find the globals
global_vars = []
for val in six.itervalues(mod_dict):
# only find salty globals
if val.__module__.startswith('salt.loaded') and hasattr(val, '__globals__'):
global_vars.append(val.__globals__)
# if we couldn't find any, then we have no modules -- so something is broken
self.assertNotEqual(global_vars, [], msg='No modules were loaded.')
# get the names of the globals you should have
func_name = inspect.stack()[1][3]
names = next(six.itervalues(salt.utils.yaml.safe_load(getattr(self, func_name).__doc__)))
# Now, test each module!
for item in global_vars:
for name in names:
self.assertIn(name, list(item.keys()))
def test_auth(self):
'''
Test that auth mods have:
- __pillar__
- __grains__
- __salt__
- __context__
'''
self._verify_globals(salt.loader.auth(self.master_opts))
def test_runners(self):
'''
Test that runners have:
- __pillar__
- __salt__
- __opts__
- __grains__
- __context__
'''
self._verify_globals(salt.loader.runner(self.master_opts))
def test_returners(self):
'''
Test that returners have:
- __salt__
- __opts__
- __pillar__
- __grains__
- __context__
'''
self._verify_globals(salt.loader.returners(self.master_opts, {}))
def test_pillars(self):
'''
Test that pillars have:
- __salt__
- __opts__
- __pillar__
- __grains__
- __context__
'''
self._verify_globals(salt.loader.pillars(self.master_opts, {}))
def test_tops(self):
'''
Test that tops have: []
'''
self._verify_globals(salt.loader.tops(self.master_opts))
def test_outputters(self):
'''
Test that outputters have:
- __opts__
- __pillar__
- __grains__
- __context__
'''
self._verify_globals(salt.loader.outputters(self.master_opts))
def test_serializers(self):
'''
Test that serializers have: []
'''
self._verify_globals(salt.loader.serializers(self.master_opts))
def test_states(self):
'''
Test that states:
- __pillar__
- __salt__
- __opts__
- __grains__
- __context__
'''
self._verify_globals(salt.loader.states(self.master_opts, {}, {}, {}))
def test_renderers(self):
'''
Test that renderers have:
- __salt__ # Execution functions (i.e. __salt__['test.echo']('foo'))
- __grains__ # Grains (i.e. __grains__['os'])
- __pillar__ # Pillar data (i.e. __pillar__['foo'])
- __opts__ # Minion configuration options
- __context__ # Context dict shared amongst all modules of the same type
'''
self._verify_globals(salt.loader.render(self.master_opts, {}))


@ -1,40 +0,0 @@
# -*- coding: utf-8 -*-
'''
integration.loader.interfaces
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Test Salt's loader
'''
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
# Import Salt Testing libs
from tests.support.unit import TestCase
# Import Salt libs
from salt.ext import six
from salt.config import minion_config
import salt.loader
# TODO: the rest of the public interfaces
class RawModTest(TestCase):
'''
Test the interface of raw_mod
'''
def setUp(self):
self.opts = minion_config(None)
def tearDown(self):
del self.opts
def test_basic(self):
testmod = salt.loader.raw_mod(self.opts, 'test', None)
for k, v in six.iteritems(testmod):
self.assertEqual(k.split('.')[0], 'test')
def test_bad_name(self):
testmod = salt.loader.raw_mod(self.opts, 'module_we_do_not_have', None)
self.assertEqual(testmod, {})


@ -71,7 +71,7 @@ class InspectorCollectorTestCase(TestCase):
inspector = Inspector(cachedir=os.sep + 'test',
piddir=os.sep + 'test',
pidfilename='bar.pid')
tree_root = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'inspectlib', 'tree_test')
tree_root = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'tree_test')
expected_tree = ([os.sep + os.sep.join(['a', 'a', 'dummy.a']),
os.sep + os.sep.join(['a', 'b', 'dummy.b']),
os.sep + os.sep.join(['b', 'b.1']),


@ -3,7 +3,7 @@
:synopsis: Unit Tests for 'module.aptkernelpkg'
:platform: Linux
:maturity: develop
versionadded:: 2018.3.0
.. versionadded:: 2018.3.0
'''
# pylint: disable=invalid-name,no-member
@ -18,7 +18,7 @@ try:
from tests.support.mock import MagicMock, patch, NO_MOCK, NO_MOCK_REASON
# Import Salt Libs
from tests.unit.modules.test_kernelpkg import KernelPkgTestCase
from tests.support.kernelpkg import KernelPkgTestCase
import salt.modules.kernelpkg_linux_apt as kernelpkg
from salt.exceptions import CommandExecutionError
HAS_MODULES = True


@ -3,7 +3,7 @@
:synopsis: Unit Tests for 'module.yumkernelpkg'
:platform: Linux
:maturity: develop
versionadded:: 2018.3.0
.. versionadded:: 2018.3.0
'''
# pylint: disable=invalid-name,no-member
@ -17,7 +17,7 @@ try:
from tests.support.mock import MagicMock, patch, NO_MOCK, NO_MOCK_REASON
# Import Salt Libs
from tests.unit.modules.test_kernelpkg import KernelPkgTestCase
from tests.support.kernelpkg import KernelPkgTestCase
import salt.modules.kernelpkg_linux_yum as kernelpkg
import salt.modules.yumpkg as pkg
from salt.exceptions import CommandExecutionError


@ -1,20 +0,0 @@
# coding: utf-8
# Import Python libs
from __future__ import absolute_import, unicode_literals, print_function
# Import Salt Testing libs
from tests.support.case import ModuleCase
# Import Salt libs
import salt.loader
class NetworkUtilsTestCase(ModuleCase):
def test_is_private(self):
__salt__ = salt.loader.raw_mod(self.minion_opts, 'network', None)
self.assertTrue(__salt__['network.is_private']('10.0.0.1'), True)
def test_is_loopback(self):
__salt__ = salt.loader.raw_mod(self.minion_opts, 'network', None)
self.assertTrue(__salt__['network.is_loopback']('127.0.0.1'), True)
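The two checks exercised here mirror what the standard library's ipaddress module exposes directly; a quick reference for the expected semantics (plain Python, not Salt code):

    import ipaddress

    # 10.0.0.0/8 is RFC 1918 private space, 127.0.0.0/8 is the loopback block
    assert ipaddress.ip_address(u'10.0.0.1').is_private
    assert ipaddress.ip_address(u'127.0.0.1').is_loopback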

View file

@@ -1,27 +0,0 @@
# -*- coding: UTF-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
from tests.support.unit import TestCase
from tests.support.unit import skipIf
from salt.beacons.network_settings import ATTRS
try:
from pyroute2 import IPDB
HAS_PYROUTE2 = True
except ImportError:
HAS_PYROUTE2 = False
@skipIf(not HAS_PYROUTE2, 'no pyroute2 installed, skipping')
class Pyroute2TestCase(TestCase):
def test_interface_dict_fields(self):
with IPDB() as ipdb:
for attr in ATTRS:
# ipdb.interfaces is a dict-like object, that
# contains interface definitions. Interfaces can
# be referenced both with indices and names.
#
# ipdb.interfaces[1] is an interface with index 1,
# that is the loopback interface.
self.assertIn(attr, ipdb.interfaces[1])
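A minimal standalone version of the same lookup, for anyone reproducing the check by hand (requires pyroute2; 'ifname' and 'mtu' are example keys and an assumption for illustration):

    from pyroute2 import IPDB

    with IPDB() as ipdb:
        lo = ipdb.interfaces[1]            # index 1 is the loopback interface
        # interface entries are dict-like, so membership and item access work
        print(lo['ifname'], lo['mtu'])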

View file

@@ -1 +0,0 @@
# -*- coding: utf-8 -*-

View file

@@ -1,223 +0,0 @@
# coding: utf-8
# Import Python Libs
from __future__ import absolute_import
import os
# Import Salt Testing Libs
from tests.support.unit import skipIf
# Import 3rd-party libs
# pylint: disable=import-error
try:
import tornado.testing
import tornado.concurrent
from tornado.testing import AsyncTestCase
HAS_TORNADO = True
except ImportError:
HAS_TORNADO = False
# Let's create a fake AsyncHTTPTestCase so we can properly skip the test case
class AsyncTestCase(object):
pass
from salt.ext.six.moves import range # pylint: disable=redefined-builtin
# pylint: enable=import-error
try:
from salt.netapi.rest_tornado import saltnado
HAS_TORNADO = True
except ImportError:
HAS_TORNADO = False
# Import utility lib from tests
import salt.utils.event
from tests.unit.utils.test_event import eventpublisher_process, SOCK_DIR # pylint: disable=import-error
@skipIf(HAS_TORNADO is False, 'The tornado package needs to be installed')
class TestSaltnadoUtils(AsyncTestCase):
def test_any_future(self):
'''
Test that the Any Future does what we think it does
'''
# create a few futures
futures = []
for x in range(0, 3):
future = tornado.concurrent.Future()
future.add_done_callback(self.stop)
futures.append(future)
# create an any future, make sure it isn't immediately done
any_ = saltnado.Any(futures)
self.assertIs(any_.done(), False)
# finish one, let's see which future finishes
futures[0].set_result('foo')
self.wait()
self.assertIs(any_.done(), True)
self.assertIs(futures[0].done(), True)
self.assertIs(futures[1].done(), False)
self.assertIs(futures[2].done(), False)
# make sure it returned the one that finished
self.assertEqual(any_.result(), futures[0])
futures = futures[1:]
# re-wait on some other futures
any_ = saltnado.Any(futures)
futures[0].set_result('foo')
self.wait()
self.assertIs(any_.done(), True)
self.assertIs(futures[0].done(), True)
self.assertIs(futures[1].done(), False)
@skipIf(HAS_TORNADO is False, 'The tornado package needs to be installed')
class TestEventListener(AsyncTestCase):
def setUp(self):
if not os.path.exists(SOCK_DIR):
os.makedirs(SOCK_DIR)
super(TestEventListener, self).setUp()
def test_simple(self):
'''
Test getting a few events
'''
with eventpublisher_process():
me = salt.utils.event.MasterEvent(SOCK_DIR)
event_listener = saltnado.EventListener({}, # we don't use mod_opts, don't save?
{'sock_dir': SOCK_DIR,
'transport': 'zeromq'})
self._finished = False # fit to event_listener's behavior
event_future = event_listener.get_event(self, 'evt1', callback=self.stop) # get an event future
me.fire_event({'data': 'foo2'}, 'evt2') # fire an event we don't want
me.fire_event({'data': 'foo1'}, 'evt1') # fire an event we do want
self.wait() # wait for the future
# check that we got the event we wanted
self.assertTrue(event_future.done())
self.assertEqual(event_future.result()['tag'], 'evt1')
self.assertEqual(event_future.result()['data']['data'], 'foo1')
def test_set_event_handler(self):
'''
Test subscribing events using set_event_handler
'''
with eventpublisher_process():
me = salt.utils.event.MasterEvent(SOCK_DIR)
event_listener = saltnado.EventListener({}, # we don't use mod_opts, don't save?
{'sock_dir': SOCK_DIR,
'transport': 'zeromq'})
self._finished = False # fit to event_listener's behavior
event_future = event_listener.get_event(self,
tag='evt',
callback=self.stop,
timeout=1,
) # get an event future
me.fire_event({'data': 'foo'}, 'evt') # fire an event we do want
self.wait()
# check that we subscribed the event we wanted
self.assertEqual(len(event_listener.timeout_map), 0)
def test_timeout(self):
'''
Make sure timeouts work correctly
'''
with eventpublisher_process():
event_listener = saltnado.EventListener({}, # we don't use mod_opts, don't save?
{'sock_dir': SOCK_DIR,
'transport': 'zeromq'})
self._finished = False # fit to event_listener's behavior
event_future = event_listener.get_event(self,
tag='evt1',
callback=self.stop,
timeout=1,
) # get an event future
self.wait()
self.assertTrue(event_future.done())
with self.assertRaises(saltnado.TimeoutException):
event_future.result()
def test_clean_by_request(self):
'''
Make sure the clean_by_request method cleans up all related data in EventListener
request_future_1 : will be timed out by clean_by_request(self)
request_future_2 : will be finished by me.fire_event ...
dummy_request_future_1 : will be finished by me.fire_event ...
dummy_request_future_2 : will be timed out by clean_by_request(dummy_request)
'''
class DummyRequest(object):
'''
Dummy request object to simulate the request object
'''
@property
def _finished(self):
'''
Simulate _finished of the request object
'''
return False
# Inner functions cannot rebind names in the enclosing scope, so use a mutable container
cnt = [0]
def stop():
'''
To realize the scenario of this test, define a custom stop method that calls
self.stop only after two events have finished.
'''
cnt[0] += 1
if cnt[0] == 2:
self.stop()
with eventpublisher_process():
me = salt.utils.event.MasterEvent(SOCK_DIR)
event_listener = saltnado.EventListener({}, # we don't use mod_opts, don't save?
{'sock_dir': SOCK_DIR,
'transport': 'zeromq'})
self.assertEqual(0, len(event_listener.tag_map))
self.assertEqual(0, len(event_listener.request_map))
self._finished = False # fit to event_listener's behavior
dummy_request = DummyRequest()
request_future_1 = event_listener.get_event(self, tag='evt1')
request_future_2 = event_listener.get_event(self, tag='evt2', callback=lambda f: stop())
dummy_request_future_1 = event_listener.get_event(dummy_request, tag='evt3', callback=lambda f: stop())
dummy_request_future_2 = event_listener.get_event(dummy_request, timeout=10, tag='evt4')
self.assertEqual(4, len(event_listener.tag_map))
self.assertEqual(2, len(event_listener.request_map))
me.fire_event({'data': 'foo2'}, 'evt2')
me.fire_event({'data': 'foo3'}, 'evt3')
self.wait()
event_listener.clean_by_request(self)
me.fire_event({'data': 'foo1'}, 'evt1')
self.assertTrue(request_future_1.done())
with self.assertRaises(saltnado.TimeoutException):
request_future_1.result()
self.assertTrue(request_future_2.done())
self.assertEqual(request_future_2.result()['tag'], 'evt2')
self.assertEqual(request_future_2.result()['data']['data'], 'foo2')
self.assertTrue(dummy_request_future_1.done())
self.assertEqual(dummy_request_future_1.result()['tag'], 'evt3')
self.assertEqual(dummy_request_future_1.result()['data']['data'], 'foo3')
self.assertFalse(dummy_request_future_2.done())
self.assertEqual(2, len(event_listener.tag_map))
self.assertEqual(1, len(event_listener.request_map))
event_listener.clean_by_request(dummy_request)
with self.assertRaises(saltnado.TimeoutException):
dummy_request_future_2.result()
self.assertEqual(0, len(event_listener.tag_map))
self.assertEqual(0, len(event_listener.request_map))

View file

@@ -12,12 +12,12 @@ from tests.support.unit import TestCase, skipIf
# Import Salt libs
import salt.auth
import salt.utils.event
import salt.utils.json
import salt.utils.yaml
from salt.ext.six.moves import map # pylint: disable=import-error
from salt.ext.six.moves import map, range # pylint: disable=import-error
from tests.unit.utils.test_event import eventpublisher_process, SOCK_DIR # pylint: disable=import-error
try:
import salt.netapi.rest_tornado as rest_tornado
from salt.netapi.rest_tornado import saltnado
HAS_TORNADO = True
except ImportError:
HAS_TORNADO = False
@@ -28,14 +28,19 @@ try:
import tornado.escape
import tornado.testing
import tornado.concurrent
from tornado.testing import AsyncHTTPTestCase, gen_test
from tornado.testing import AsyncTestCase, AsyncHTTPTestCase, gen_test
from tornado.httpclient import HTTPRequest, HTTPError
from tornado.websocket import websocket_connect
import salt.netapi.rest_tornado as rest_tornado
from salt.netapi.rest_tornado import saltnado
HAS_TORNADO = True
except ImportError:
HAS_TORNADO = False
# Let's create a fake AsyncHTTPTestCase so we can properly skip the test case
# Create fake test case classes so we can properly skip the test case
class AsyncTestCase(object):
pass
class AsyncHTTPTestCase(object):
pass
@@ -46,7 +51,7 @@ from salt.ext.six.moves.urllib.parse import urlencode, urlparse # pylint: disab
from tests.support.mock import NO_MOCK, NO_MOCK_REASON, MagicMock, patch
@skipIf(HAS_TORNADO is False, 'The tornado package needs to be installed') # pylint: disable=W0223
@skipIf(not HAS_TORNADO, 'The tornado package needs to be installed') # pylint: disable=W0223
class SaltnadoTestCase(TestCase, AdaptedConfigurationTestCaseMixin, AsyncHTTPTestCase):
'''
Mixin to hold some shared things
@@ -648,7 +653,7 @@ class TestSaltRunHandler(SaltnadoTestCase):
self.assertEqual(valid_response, salt.utils.json.loads(response.body))
@skipIf(HAS_TORNADO is False, 'The tornado package needs to be installed') # pylint: disable=W0223
@skipIf(not HAS_TORNADO, 'The tornado package needs to be installed') # pylint: disable=W0223
class TestWebsocketSaltAPIHandler(SaltnadoTestCase):
def get_app(self):
@@ -753,3 +758,191 @@ class TestWebsocketSaltAPIHandler(SaltnadoTestCase):
ws = yield websocket_connect(request)
ws.write_message('websocket client ready')
ws.close()
@skipIf(not HAS_TORNADO, 'The tornado package needs to be installed')
class TestSaltnadoUtils(AsyncTestCase):
def test_any_future(self):
'''
Test that the Any Future does what we think it does
'''
# create a few futures
futures = []
for x in range(0, 3):
future = tornado.concurrent.Future()
future.add_done_callback(self.stop)
futures.append(future)
# create an any future, make sure it isn't immediately done
any_ = saltnado.Any(futures)
self.assertIs(any_.done(), False)
# finish one, let's see which future finishes
futures[0].set_result('foo')
self.wait()
self.assertIs(any_.done(), True)
self.assertIs(futures[0].done(), True)
self.assertIs(futures[1].done(), False)
self.assertIs(futures[2].done(), False)
# make sure it returned the one that finished
self.assertEqual(any_.result(), futures[0])
futures = futures[1:]
# re-wait on some other futures
any_ = saltnado.Any(futures)
futures[0].set_result('foo')
self.wait()
self.assertIs(any_.done(), True)
self.assertIs(futures[0].done(), True)
self.assertIs(futures[1].done(), False)
@skipIf(not HAS_TORNADO, 'The tornado package needs to be installed')
class TestEventListener(AsyncTestCase):
def setUp(self):
if not os.path.exists(SOCK_DIR):
os.makedirs(SOCK_DIR)
super(TestEventListener, self).setUp()
def test_simple(self):
'''
Test getting a few events
'''
with eventpublisher_process():
me = salt.utils.event.MasterEvent(SOCK_DIR)
event_listener = saltnado.EventListener({}, # we don't use mod_opts, don't save?
{'sock_dir': SOCK_DIR,
'transport': 'zeromq'})
self._finished = False # fit to event_listener's behavior
event_future = event_listener.get_event(self, 'evt1', callback=self.stop) # get an event future
me.fire_event({'data': 'foo2'}, 'evt2') # fire an event we don't want
me.fire_event({'data': 'foo1'}, 'evt1') # fire an event we do want
self.wait() # wait for the future
# check that we got the event we wanted
self.assertTrue(event_future.done())
self.assertEqual(event_future.result()['tag'], 'evt1')
self.assertEqual(event_future.result()['data']['data'], 'foo1')
def test_set_event_handler(self):
'''
Test subscribing events using set_event_handler
'''
with eventpublisher_process():
me = salt.utils.event.MasterEvent(SOCK_DIR)
event_listener = saltnado.EventListener({}, # we don't use mod_opts, don't save?
{'sock_dir': SOCK_DIR,
'transport': 'zeromq'})
self._finished = False # fit to event_listener's behavior
event_future = event_listener.get_event(self,
tag='evt',
callback=self.stop,
timeout=1,
) # get an event future
me.fire_event({'data': 'foo'}, 'evt') # fire an event we do want
self.wait()
# check that we subscribed the event we wanted
self.assertEqual(len(event_listener.timeout_map), 0)
def test_timeout(self):
'''
Make sure timeouts work correctly
'''
with eventpublisher_process():
event_listener = saltnado.EventListener({}, # we don't use mod_opts, don't save?
{'sock_dir': SOCK_DIR,
'transport': 'zeromq'})
self._finished = False # fit to event_listener's behavior
event_future = event_listener.get_event(self,
tag='evt1',
callback=self.stop,
timeout=1,
) # get an event future
self.wait()
self.assertTrue(event_future.done())
with self.assertRaises(saltnado.TimeoutException):
event_future.result()
def test_clean_by_request(self):
'''
Make sure the clean_by_request method cleans up all related data in EventListener
request_future_1 : will be timed out by clean_by_request(self)
request_future_2 : will be finished by me.fire_event ...
dummy_request_future_1 : will be finished by me.fire_event ...
dummy_request_future_2 : will be timed out by clean_by_request(dummy_request)
'''
class DummyRequest(object):
'''
Dummy request object to simulate the request object
'''
@property
def _finished(self):
'''
Simulate _finished of the request object
'''
return False
# Inner functions cannot rebind names in the enclosing scope, so use a mutable container
cnt = [0]
def stop():
'''
To realize the scenario of this test, define a custom stop method that calls
self.stop only after two events have finished.
'''
cnt[0] += 1
if cnt[0] == 2:
self.stop()
with eventpublisher_process():
me = salt.utils.event.MasterEvent(SOCK_DIR)
event_listener = saltnado.EventListener({}, # we don't use mod_opts, don't save?
{'sock_dir': SOCK_DIR,
'transport': 'zeromq'})
self.assertEqual(0, len(event_listener.tag_map))
self.assertEqual(0, len(event_listener.request_map))
self._finished = False # fit to event_listener's behavior
dummy_request = DummyRequest()
request_future_1 = event_listener.get_event(self, tag='evt1')
request_future_2 = event_listener.get_event(self, tag='evt2', callback=lambda f: stop())
dummy_request_future_1 = event_listener.get_event(dummy_request, tag='evt3', callback=lambda f: stop())
dummy_request_future_2 = event_listener.get_event(dummy_request, timeout=10, tag='evt4')
self.assertEqual(4, len(event_listener.tag_map))
self.assertEqual(2, len(event_listener.request_map))
me.fire_event({'data': 'foo2'}, 'evt2')
me.fire_event({'data': 'foo3'}, 'evt3')
self.wait()
event_listener.clean_by_request(self)
me.fire_event({'data': 'foo1'}, 'evt1')
self.assertTrue(request_future_1.done())
with self.assertRaises(saltnado.TimeoutException):
request_future_1.result()
self.assertTrue(request_future_2.done())
self.assertEqual(request_future_2.result()['tag'], 'evt2')
self.assertEqual(request_future_2.result()['data']['data'], 'foo2')
self.assertTrue(dummy_request_future_1.done())
self.assertEqual(dummy_request_future_1.result()['tag'], 'evt3')
self.assertEqual(dummy_request_future_1.result()['data']['data'], 'foo3')
self.assertFalse(dummy_request_future_2.done())
self.assertEqual(2, len(event_listener.tag_map))
self.assertEqual(1, len(event_listener.request_map))
event_listener.clean_by_request(dummy_request)
with self.assertRaises(saltnado.TimeoutException):
dummy_request_future_2.result()
self.assertEqual(0, len(event_listener.tag_map))
self.assertEqual(0, len(event_listener.request_map))
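Taken together, these tests cover the whole EventListener lifecycle. A condensed sketch of the pattern they rely on (sock_dir is a placeholder; the request object only needs a _finished attribute, mirroring the DummyRequest helper above):

    from salt.netapi.rest_tornado import saltnado

    class FakeRequest(object):
        _finished = False

    def wait_for_tag(sock_dir, tag, timeout=5):
        listener = saltnado.EventListener({}, {'sock_dir': sock_dir,
                                               'transport': 'zeromq'})
        request = FakeRequest()
        # The returned future resolves when an event with this tag fires, or
        # raises saltnado.TimeoutException once the timeout elapses.
        future = listener.get_event(request, tag=tag, timeout=timeout)
        return listener, request, future

    # When the request goes away, clean_by_request(request) times out any of
    # its still-pending futures and clears them from tag_map/request_map.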

Some files were not shown because too many files have changed in this diff.