Merge pull request #41989 from rallytime/merge-2017.7

[2017.7] Merge forward from 2016.11 to 2017.7
This commit is contained in:
Nicole Thomas 2017-06-28 17:12:00 -06:00 committed by GitHub
commit 8ed53b818e
14 changed files with 957 additions and 445 deletions

View file

@ -77,6 +77,7 @@ MOCK_MODULES = [
'yaml.nodes',
'yaml.parser',
'yaml.scanner',
'salt.utils.yamlloader',
'zmq',
'zmq.eventloop',
'zmq.eventloop.ioloop',
@ -125,6 +126,7 @@ MOCK_MODULES = [
'ClusterShell',
'ClusterShell.NodeSet',
'django',
'docker',
'libvirt',
'MySQLdb',
'MySQLdb.cursors',
@ -174,7 +176,7 @@ MOCK_MODULES = [
for mod_name in MOCK_MODULES:
if mod_name == 'psutil':
mock = Mock(mapping={'total': 0}) # Otherwise it will crash Sphinx
mock = Mock(mapping={'total': 0, 'version_info': (0, 6,0)}) # Otherwise it will crash Sphinx
else:
mock = Mock()
sys.modules[mod_name] = mock
@ -244,7 +246,7 @@ on_saltstack = 'SALT_ON_SALTSTACK' in os.environ
project = 'Salt'
version = salt.version.__version__
latest_release = '2016.11.5' # latest release
latest_release = '2016.11.6' # latest release
previous_release = '2016.3.6' # latest release from previous branch
previous_release_dir = '2016.3' # path on web server for previous branch
next_release = '' # next release

View file

@ -902,6 +902,8 @@ what you are doing! Transports are explained in :ref:`Salt Transports
transport: zeromq
.. conf_master:: transport_opts
``transport_opts``
------------------
@ -920,6 +922,27 @@ what you are doing! Transports are explained in :ref:`Salt Transports
ret_port: 4606
zeromq: []
.. conf_master:: sock_pool_size
``sock_pool_size``
------------------
Default: 1
To avoid blocking waiting while writing a data to a socket, we support
socket pool for Salt applications. For example, a job with a large number
of target host list can cause long period blocking waiting. The option
is used by ZMQ and TCP transports, and the other transport methods don't
need the socket pool by definition. Most of Salt tools, including CLI,
are enough to use a single bucket of socket pool. On the other hands,
it is highly recommended to set the size of socket pool larger than 1
for other Salt applications, especially Salt API, which must write data
to socket concurrently.
.. code-block:: yaml
sock_pool_size: 15
.. _salt-ssh-configuration:

File diff suppressed because it is too large Load diff

View file

@ -205,6 +205,9 @@ VALID_OPTS = {
# The directory containing unix sockets for things like the event bus
'sock_dir': str,
# The pool size of unix sockets, it is necessary to avoid blocking waiting for zeromq and tcp communications.
'sock_pool_size': int,
# Specifies how the file server should backup files, if enabled. The backups
# live in the cache dir.
'backup_mode': str,
@ -1088,6 +1091,7 @@ DEFAULT_MINION_OPTS = {
'grains_deep_merge': False,
'conf_file': os.path.join(salt.syspaths.CONFIG_DIR, 'minion'),
'sock_dir': os.path.join(salt.syspaths.SOCK_DIR, 'minion'),
'sock_pool_size': 1,
'backup_mode': '',
'renderer': 'yaml_jinja',
'renderer_whitelist': [],
@ -1332,6 +1336,7 @@ DEFAULT_MASTER_OPTS = {
'user': _MASTER_USER,
'worker_threads': 5,
'sock_dir': os.path.join(salt.syspaths.SOCK_DIR, 'master'),
'sock_pool_size': 1,
'ret_port': 4506,
'timeout': 5,
'keep_jobs': 24,
@ -2268,6 +2273,7 @@ def syndic_config(master_config_path,
'sock_dir': os.path.join(
opts['cachedir'], opts.get('syndic_sock_dir', opts['sock_dir'])
),
'sock_pool_size': master_opts['sock_pool_size'],
'cachedir': master_opts['cachedir'],
}
opts.update(syndic_opts)

View file

@ -1116,6 +1116,7 @@ _OS_NAME_MAP = {
'nilrt': 'NILinuxRT',
'nilrt-xfce': 'NILinuxRT-XFCE',
'manjaro': 'Manjaro',
'manjarolin': 'Manjaro',
'antergos': 'Antergos',
'sles': 'SUSE',
'void': 'Void',

View file

@ -376,7 +376,7 @@ def extracted(name,
.. versionadded:: 2016.11.0
source_hash_update
source_hash_update : False
Set this to ``True`` if archive should be extracted if source_hash has
changed. This would extract regardless of the ``if_missing`` parameter.
@ -871,10 +871,10 @@ def extracted(name,
if source_hash:
try:
source_sum = __salt__['file.get_source_sum'](
source=source_match,
source_hash=source_hash,
source_hash_name=source_hash_name,
saltenv=__env__)
source=source_match,
source_hash=source_hash,
source_hash_name=source_hash_name,
saltenv=__env__)
except CommandExecutionError as exc:
ret['comment'] = exc.strerror
return ret
@ -895,7 +895,7 @@ def extracted(name,
# Prevent a traceback from attempting to read from a directory path
salt.utils.rm_rf(cached_source)
existing_cached_source_sum = _read_cached_checksum(cached_source) \
existing_cached_source_sum = _read_cached_checksum(cached_source)
if source_is_local:
# No need to download archive, it's local to the minion
@ -962,15 +962,16 @@ def extracted(name,
)
return file_result
if source_hash:
_update_checksum(cached_source)
else:
log.debug(
'Archive %s is already in cache',
salt.utils.url.redact_http_basic_auth(source_match)
)
if source_hash and source_hash_update and not skip_verify:
# Create local hash sum file if we're going to track sum update
_update_checksum(cached_source)
if archive_format == 'zip' and not password:
log.debug('Checking %s to see if it is password-protected',
source_match)
@ -1174,6 +1175,15 @@ def extracted(name,
created_destdir = False
if extraction_needed:
if source_is_local and source_hash and not skip_verify:
ret['result'] = __salt__['file.check_hash'](source_match, source_sum['hsum'])
if not ret['result']:
ret['comment'] = \
'{0} does not match the desired source_hash {1}'.format(
source_match, source_sum['hsum']
)
return ret
if __opts__['test']:
ret['result'] = None
ret['comment'] = \

View file

@ -66,17 +66,17 @@ def installed(name,
'''
Verify that the correct versions of composer dependencies are present.
dir
Directory location of the composer.json file.
name
Directory location of the ``composer.json`` file.
composer
Location of the composer.phar file. If not set composer will
just execute "composer" as if it is installed globally.
(i.e. /path/to/composer.phar)
Location of the ``composer.phar`` file. If not set composer will
just execute ``composer`` as if it is installed globally.
(i.e. ``/path/to/composer.phar``)
php
Location of the php executable to use with composer.
(i.e. /usr/bin/php)
(i.e. ``/usr/bin/php``)
user
Which system user to run composer as.
@ -84,32 +84,32 @@ def installed(name,
.. versionadded:: 2014.1.4
prefer_source
--prefer-source option of composer.
``--prefer-source`` option of composer.
prefer_dist
--prefer-dist option of composer.
``--prefer-dist`` option of composer.
no_scripts
--no-scripts option of composer.
``--no-scripts`` option of composer.
no_plugins
--no-plugins option of composer.
``--no-plugins`` option of composer.
optimize
--optimize-autoloader option of composer. Recommended for production.
``--optimize-autoloader`` option of composer. Recommended for production.
no_dev
--no-dev option for composer. Recommended for production.
``--no-dev`` option for composer. Recommended for production.
quiet
--quiet option for composer. Whether or not to return output from composer.
``--quiet`` option for composer. Whether or not to return output from composer.
composer_home
$COMPOSER_HOME environment variable
``$COMPOSER_HOME`` environment variable
always_check
If True, _always_ run `composer install` in the directory. This is the
default behavior. If False, only run `composer install` if there is no
If ``True``, *always* run ``composer install`` in the directory. This is the
default behavior. If ``False``, only run ``composer install`` if there is no
vendor directory present.
'''
ret = {'name': name, 'result': None, 'comment': '', 'changes': {}}
@ -193,17 +193,17 @@ def update(name,
Composer update the directory to ensure we have the latest versions
of all project dependencies.
dir
Directory location of the composer.json file.
name
Directory location of the ``composer.json`` file.
composer
Location of the composer.phar file. If not set composer will
just execute "composer" as if it is installed globally.
Location of the ``composer.phar`` file. If not set composer will
just execute ``composer`` as if it is installed globally.
(i.e. /path/to/composer.phar)
php
Location of the php executable to use with composer.
(i.e. /usr/bin/php)
(i.e. ``/usr/bin/php``)
user
Which system user to run composer as.
@ -211,28 +211,28 @@ def update(name,
.. versionadded:: 2014.1.4
prefer_source
--prefer-source option of composer.
``--prefer-source`` option of composer.
prefer_dist
--prefer-dist option of composer.
``--prefer-dist`` option of composer.
no_scripts
--no-scripts option of composer.
``--no-scripts`` option of composer.
no_plugins
--no-plugins option of composer.
``--no-plugins`` option of composer.
optimize
--optimize-autoloader option of composer. Recommended for production.
``--optimize-autoloader`` option of composer. Recommended for production.
no_dev
--no-dev option for composer. Recommended for production.
``--no-dev`` option for composer. Recommended for production.
quiet
--quiet option for composer. Whether or not to return output from composer.
``--quiet`` option for composer. Whether or not to return output from composer.
composer_home
$COMPOSER_HOME environment variable
``$COMPOSER_HOME`` environment variable
'''
ret = {'name': name, 'result': None, 'comment': '', 'changes': {}}

View file

@ -3,9 +3,13 @@
Encapsulate the different transports available to Salt.
'''
from __future__ import absolute_import
import logging
# Import third party libs
import salt.ext.six as six
from salt.ext.six.moves import range
log = logging.getLogger(__name__)
def iter_transport_opts(opts):
@ -47,3 +51,19 @@ class Channel(object):
# salt.transport.channel.Channel.factory()
from salt.transport.client import ReqChannel
return ReqChannel.factory(opts, **kwargs)
class MessageClientPool(object):
def __init__(self, tgt, opts, args=None, kwargs=None):
sock_pool_size = opts['sock_pool_size'] if 'sock_pool_size' in opts else 1
if sock_pool_size < 1:
log.warn('sock_pool_size is not correctly set, \
the option should be greater than 0 but, {0}'.format(sock_pool_size))
sock_pool_size = 1
if args is None:
args = ()
if kwargs is None:
kwargs = {}
self.message_clients = [tgt(*args, **kwargs) for _ in range(sock_pool_size)]

View file

@ -267,9 +267,9 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
host, port = parse.netloc.rsplit(':', 1)
self.master_addr = (host, int(port))
self._closing = False
self.message_client = SaltMessageClient(
self.opts, host, int(port), io_loop=self.io_loop,
resolver=resolver)
self.message_client = SaltMessageClientPool(self.opts,
args=(self.opts, host, int(port),),
kwargs={'io_loop': self.io_loop, 'resolver': resolver})
def close(self):
if self._closing:
@ -404,7 +404,7 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran
def _do_transfer():
msg = self._package_load(self.auth.crypticle.dumps(load))
package = salt.transport.frame.frame_msg(msg, header=None)
yield self.message_client._stream.write(package)
yield self.message_client.write_to_stream(package)
raise tornado.gen.Return(True)
if force_auth or not self.auth.authenticated:
@ -494,13 +494,12 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran
if not self.auth.authenticated:
yield self.auth.authenticate()
if self.auth.authenticated:
self.message_client = SaltMessageClient(
self.message_client = SaltMessageClientPool(
self.opts,
self.opts['master_ip'],
int(self.auth.creds['publish_port']),
io_loop=self.io_loop,
connect_callback=self.connect_callback,
disconnect_callback=self.disconnect_callback)
args=(self.opts, self.opts['master_ip'], int(self.auth.creds['publish_port']),),
kwargs={'io_loop': self.io_loop,
'connect_callback': self.connect_callback,
'disconnect_callback': self.disconnect_callback})
yield self.message_client.connect() # wait for the client to be connected
self.connected = True
# TODO: better exception handling...
@ -776,6 +775,43 @@ class TCPClientKeepAlive(tornado.tcpclient.TCPClient):
return stream.connect(addr)
class SaltMessageClientPool(salt.transport.MessageClientPool):
'''
Wrapper class of SaltMessageClient to avoid blocking waiting while writing data to socket.
'''
def __init__(self, opts, args=None, kwargs=None):
super(SaltMessageClientPool, self).__init__(SaltMessageClient, opts, args=args, kwargs=kwargs)
def __del__(self):
self.close()
def close(self):
for message_client in self.message_clients:
message_client.close()
self.message_clients = []
@tornado.gen.coroutine
def connect(self):
futures = []
for message_client in self.message_clients:
futures.append(message_client.connect())
for future in futures:
yield future
raise tornado.gen.Return(None)
def on_recv(self, *args, **kwargs):
for message_client in self.message_clients:
message_client.on_recv(*args, **kwargs)
def send(self, *args, **kwargs):
message_clients = sorted(self.message_clients, key=lambda x: len(x.send_queue))
return message_clients[0].send(*args, **kwargs)
def write_to_stream(self, *args, **kwargs):
message_clients = sorted(self.message_clients, key=lambda x: len(x.send_queue))
return message_clients[0]._stream.write(*args, **kwargs)
# TODO consolidate with IPCClient
# TODO: limit in-flight messages.
# TODO: singleton? Something to not re-create the tcp connection so much

View file

@ -118,8 +118,9 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
# copied. The reason is the same as the io_loop skip above.
setattr(result, key,
AsyncReqMessageClientPool(result.opts,
self.master_uri,
io_loop=result._io_loop))
args=(result.opts, self.master_uri,),
kwargs={'io_loop': self._io_loop}))
continue
setattr(result, key, copy.deepcopy(self.__dict__[key], memo))
return result
@ -156,9 +157,8 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
# we don't need to worry about auth as a kwarg, since its a singleton
self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self._io_loop)
self.message_client = AsyncReqMessageClientPool(self.opts,
self.master_uri,
io_loop=self._io_loop,
)
args=(self.opts, self.master_uri,),
kwargs={'io_loop': self._io_loop})
def __del__(self):
'''
@ -847,32 +847,24 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
context.term()
# TODO: unit tests!
class AsyncReqMessageClientPool(object):
def __init__(self, opts, addr, linger=0, io_loop=None, socket_pool=1):
self.opts = opts
self.addr = addr
self.linger = linger
self.io_loop = io_loop
self.socket_pool = socket_pool
self.message_clients = []
class AsyncReqMessageClientPool(salt.transport.MessageClientPool):
'''
Wrapper class of AsyncReqMessageClientPool to avoid blocking waiting while writing data to socket.
'''
def __init__(self, opts, args=None, kwargs=None):
super(AsyncReqMessageClientPool, self).__init__(AsyncReqMessageClient, opts, args=args, kwargs=kwargs)
def __del__(self):
self.destroy()
def destroy(self):
for message_client in self.message_clients:
message_client.destroy()
self.message_clients = []
def __del__(self):
self.destroy()
def send(self, message, timeout=None, tries=3, future=None, callback=None, raw=False):
if len(self.message_clients) < self.socket_pool:
message_client = AsyncReqMessageClient(self.opts, self.addr, self.linger, self.io_loop)
self.message_clients.append(message_client)
return message_client.send(message, timeout, tries, future, callback, raw)
else:
available_clients = sorted(self.message_clients, key=lambda x: len(x.send_queue))
return available_clients[0].send(message, timeout, tries, future, callback, raw)
def send(self, *args, **kwargs):
message_clients = sorted(self.message_clients, key=lambda x: len(x.send_queue))
return message_clients[0].send(*args, **kwargs)
# TODO: unit tests!

View file

@ -12,6 +12,7 @@ import os
from tests.support.case import ModuleCase
from tests.support.helpers import skip_if_not_root, Webserver
from tests.support.mixins import SaltReturnAssertsMixin
from tests.support.paths import FILES
# Import salt libs
import salt.utils
@ -24,8 +25,12 @@ if salt.utils.is_windows():
else:
ARCHIVE_DIR = '/tmp/archive'
ARCHIVE_NAME = 'custom.tar.gz'
ARCHIVE_TAR_SOURCE = 'http://localhost:{0}/{1}'.format(9999, ARCHIVE_NAME)
ARCHIVE_LOCAL_TAR_SOURCE = 'file://{0}'.format(os.path.join(FILES, 'file', 'base', ARCHIVE_NAME))
UNTAR_FILE = os.path.join(ARCHIVE_DIR, 'custom/README')
ARCHIVE_TAR_HASH = 'md5=7643861ac07c30fe7d2310e9f25ca514'
ARCHIVE_TAR_BAD_HASH = 'md5=d41d8cd98f00b204e9800998ecf8427e'
class ArchiveTest(ModuleCase, SaltReturnAssertsMixin):
@ -178,3 +183,52 @@ class ArchiveTest(ModuleCase, SaltReturnAssertsMixin):
self.assertSaltTrueReturn(ret)
self._check_extracted(UNTAR_FILE)
def test_local_archive_extracted(self):
'''
test archive.extracted with local file
'''
ret = self.run_state('archive.extracted', name=ARCHIVE_DIR,
source=ARCHIVE_LOCAL_TAR_SOURCE, archive_format='tar')
log.debug('ret = %s', ret)
self.assertSaltTrueReturn(ret)
self._check_extracted(UNTAR_FILE)
def test_local_archive_extracted_skip_verify(self):
'''
test archive.extracted with local file, bad hash and skip_verify
'''
ret = self.run_state('archive.extracted', name=ARCHIVE_DIR,
source=ARCHIVE_LOCAL_TAR_SOURCE, archive_format='tar',
source_hash=ARCHIVE_TAR_BAD_HASH, skip_verify=True)
log.debug('ret = %s', ret)
self.assertSaltTrueReturn(ret)
self._check_extracted(UNTAR_FILE)
def test_local_archive_extracted_with_source_hash(self):
'''
test archive.extracted with local file and valid hash
'''
ret = self.run_state('archive.extracted', name=ARCHIVE_DIR,
source=ARCHIVE_LOCAL_TAR_SOURCE, archive_format='tar',
source_hash=ARCHIVE_TAR_HASH)
log.debug('ret = %s', ret)
self.assertSaltTrueReturn(ret)
self._check_extracted(UNTAR_FILE)
def test_local_archive_extracted_with_bad_source_hash(self):
'''
test archive.extracted with local file and bad hash
'''
ret = self.run_state('archive.extracted', name=ARCHIVE_DIR,
source=ARCHIVE_LOCAL_TAR_SOURCE, archive_format='tar',
source_hash=ARCHIVE_TAR_BAD_HASH)
log.debug('ret = %s', ret)
self.assertSaltFalseReturn(ret)

View file

@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
# Import python libs
from __future__ import absolute_import
import logging
from salt.transport import MessageClientPool
# Import Salt Testing libs
from tests.support.unit import TestCase
log = logging.getLogger(__name__)
class MessageClientPoolTest(TestCase):
class MockClass(object):
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def test_init(self):
opts = {'sock_pool_size': 10}
args = (0,)
kwargs = {'kwarg': 1}
message_client_pool = MessageClientPool(self.MockClass, opts, args=args, kwargs=kwargs)
self.assertEqual(opts['sock_pool_size'], len(message_client_pool.message_clients))
for message_client in message_client_pool.message_clients:
self.assertEqual(message_client.args, args)
self.assertEqual(message_client.kwargs, kwargs)
def test_init_without_config(self):
opts = {}
args = (0,)
kwargs = {'kwarg': 1}
message_client_pool = MessageClientPool(self.MockClass, opts, args=args, kwargs=kwargs)
# The size of pool is set as 1 by the MessageClientPool init method.
self.assertEqual(1, len(message_client_pool.message_clients))
for message_client in message_client_pool.message_clients:
self.assertEqual(message_client.args, args)
self.assertEqual(message_client.kwargs, kwargs)
def test_init_less_than_one(self):
opts = {'sock_pool_size': -1}
args = (0,)
kwargs = {'kwarg': 1}
message_client_pool = MessageClientPool(self.MockClass, opts, args=args, kwargs=kwargs)
# The size of pool is set as 1 by the MessageClientPool init method.
self.assertEqual(1, len(message_client_pool.message_clients))
for message_client in message_client_pool.message_clients:
self.assertEqual(message_client.args, args)
self.assertEqual(message_client.kwargs, kwargs)

View file

@ -9,7 +9,8 @@ import threading
import tornado.gen
import tornado.ioloop
from tornado.testing import AsyncTestCase
import tornado.concurrent
from tornado.testing import AsyncTestCase, gen_test
import salt.config
import salt.ext.six as six
@ -17,11 +18,14 @@ import salt.utils
import salt.transport.server
import salt.transport.client
import salt.exceptions
from salt.ext.six.moves import range
from salt.transport.tcp import SaltMessageClientPool
# Import Salt Testing libs
from tests.support.unit import TestCase, skipIf
from tests.support.helpers import get_unused_localhost_port, flaky
from tests.support.mixins import AdaptedConfigurationTestCaseMixin
from tests.support.mock import MagicMock, patch
from tests.unit.transport.mixins import PubChannelMixin, ReqChannelMixin
@ -234,3 +238,74 @@ class AsyncPubChannelTest(BaseTCPPubCase, PubChannelMixin):
'''
Tests around the publish system
'''
class SaltMessageClientPoolTest(AsyncTestCase):
def setUp(self):
super(SaltMessageClientPoolTest, self).setUp()
sock_pool_size = 5
with patch('salt.transport.tcp.SaltMessageClient.__init__', MagicMock(return_value=None)):
self.message_client_pool = SaltMessageClientPool({'sock_pool_size': sock_pool_size},
args=({}, '', 0))
self.original_message_clients = self.message_client_pool.message_clients
self.message_client_pool.message_clients = [MagicMock() for _ in range(sock_pool_size)]
def tearDown(self):
with patch('salt.transport.tcp.SaltMessageClient.close', MagicMock(return_value=None)):
del self.original_message_clients
super(SaltMessageClientPoolTest, self).tearDown()
def test_send(self):
for message_client_mock in self.message_client_pool.message_clients:
message_client_mock.send_queue = [0, 0, 0]
message_client_mock.send.return_value = []
self.assertEqual([], self.message_client_pool.send())
self.message_client_pool.message_clients[2].send_queue = [0]
self.message_client_pool.message_clients[2].send.return_value = [1]
self.assertEqual([1], self.message_client_pool.send())
def test_write_to_stream(self):
for message_client_mock in self.message_client_pool.message_clients:
message_client_mock.send_queue = [0, 0, 0]
message_client_mock._stream.write.return_value = []
self.assertEqual([], self.message_client_pool.write_to_stream(''))
self.message_client_pool.message_clients[2].send_queue = [0]
self.message_client_pool.message_clients[2]._stream.write.return_value = [1]
self.assertEqual([1], self.message_client_pool.write_to_stream(''))
def test_close(self):
self.message_client_pool.close()
self.assertEqual([], self.message_client_pool.message_clients)
def test_on_recv(self):
for message_client_mock in self.message_client_pool.message_clients:
message_client_mock.on_recv.return_value = None
self.message_client_pool.on_recv()
for message_client_mock in self.message_client_pool.message_clients:
self.assertTrue(message_client_mock.on_recv.called)
def test_connect_all(self):
@gen_test
def test_connect(self):
yield self.message_client_pool.connect()
for message_client_mock in self.message_client_pool.message_clients:
future = tornado.concurrent.Future()
future.set_result('foo')
message_client_mock.connect.return_value = future
self.assertIsNone(test_connect(self))
def test_connect_partial(self):
@gen_test(timeout=0.1)
def test_connect(self):
yield self.message_client_pool.connect()
for idx, message_client_mock in enumerate(self.message_client_pool.message_clients):
future = tornado.concurrent.Future()
if idx % 2 == 0:
future.set_result('foo')
message_client_mock.connect.return_value = future
with self.assertRaises(tornado.ioloop.TimeoutError):
test_connect(self)

View file

@ -30,12 +30,15 @@ import salt.utils
import salt.transport.server
import salt.transport.client
import salt.exceptions
from salt.ext.six.moves import range
from salt.transport.zeromq import AsyncReqMessageClientPool
# Import test support libs
from tests.support.paths import TMP_CONF_DIR
from tests.support.unit import TestCase, skipIf
from tests.support.helpers import flaky, get_unused_localhost_port
from tests.support.mixins import AdaptedConfigurationTestCaseMixin
from tests.support.mock import MagicMock, patch
from tests.unit.transport.mixins import PubChannelMixin, ReqChannelMixin
ON_SUSE = False
@ -271,3 +274,34 @@ class AsyncPubChannelTest(BaseZMQPubCase, PubChannelMixin):
'''
def get_new_ioloop(self):
return zmq.eventloop.ioloop.ZMQIOLoop()
class AsyncReqMessageClientPoolTest(TestCase):
def setUp(self):
super(AsyncReqMessageClientPoolTest, self).setUp()
sock_pool_size = 5
with patch('salt.transport.zeromq.AsyncReqMessageClient.__init__', MagicMock(return_value=None)):
self.message_client_pool = AsyncReqMessageClientPool({'sock_pool_size': sock_pool_size},
args=({}, ''))
self.original_message_clients = self.message_client_pool.message_clients
self.message_client_pool.message_clients = [MagicMock() for _ in range(sock_pool_size)]
def tearDown(self):
with patch('salt.transport.zeromq.AsyncReqMessageClient.destroy', MagicMock(return_value=None)):
del self.original_message_clients
super(AsyncReqMessageClientPoolTest, self).tearDown()
def test_send(self):
for message_client_mock in self.message_client_pool.message_clients:
message_client_mock.send_queue = [0, 0, 0]
message_client_mock.send.return_value = []
self.assertEqual([], self.message_client_pool.send())
self.message_client_pool.message_clients[2].send_queue = [0]
self.message_client_pool.message_clients[2].send.return_value = [1]
self.assertEqual([1], self.message_client_pool.send())
def test_destroy(self):
self.message_client_pool.destroy()
self.assertEqual([], self.message_client_pool.message_clients)