Merge pull request #47374 from DSRCorporation/bugs/tornado50-2018.3

tornado50 merge forward for 2018.3
This commit is contained in:
Daniel Wallace 2018-04-29 11:29:11 -05:00 committed by GitHub
commit 3400f829c4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 76 additions and 91 deletions

View file

@ -1,4 +1,4 @@
-r base.txt
-r base-py2.txt
mock>=2.0.0
apache-libcloud>=0.14.0
@ -6,7 +6,7 @@ boto>=2.32.1
boto3>=1.2.1
moto>=0.3.6
SaltPyLint>=v2017.3.6
pytest
pytest>=3.5.0
git+https://github.com/eisensheng/pytest-catchlog.git@develop#egg=Pytest-catchlog
git+https://github.com/saltstack/pytest-salt.git@master#egg=pytest-salt
testinfra>=1.7.0

View file

@ -1,4 +1,4 @@
-r base.txt
-r base-py3.txt
mock>=2.0.0
apache-libcloud>=0.14.0
@ -11,7 +11,7 @@ moto>=0.3.6
# prevent it from being successfully installed (at least on Python 3.4).
httpretty
SaltPyLint>=v2017.2.29
pytest
pytest>=3.5.0
git+https://github.com/saltstack/pytest-salt.git@master#egg=pytest-salt
git+https://github.com/eisensheng/pytest-catchlog.git@develop#egg=Pytest-catchlog
testinfra>=1.7.0

View file

@ -1,3 +1,3 @@
pytest
pytest>=3.5.0
pytest-helpers-namespace
pytest-tempdir

View file

@ -97,14 +97,15 @@ class IRCClient(object):
self.allow_nicks = allow_nicks
self.disable_query = disable_query
self.io_loop = tornado.ioloop.IOLoop(make_current=False)
self.io_loop.make_current()
self._connect()
def _connect(self):
_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
if self.ssl is True:
self._stream = tornado.iostream.SSLIOStream(_sock, ssl_options={'cert_reqs': ssl.CERT_NONE}, io_loop=self.io_loop)
self._stream = tornado.iostream.SSLIOStream(_sock, ssl_options={'cert_reqs': ssl.CERT_NONE})
else:
self._stream = tornado.iostream.IOStream(_sock, io_loop=self.io_loop)
self._stream = tornado.iostream.IOStream(_sock)
self._stream.set_close_callback(self.on_closed)
self._stream.connect((self.host, self.port), self.on_connect)

View file

@ -81,6 +81,7 @@ def start(address=None, port=5000, ssl_crt=None, ssl_key=None):
if all([ssl_crt, ssl_key]):
ssl_options = {"certfile": ssl_crt, "keyfile": ssl_key}
io_loop = tornado.ioloop.IOLoop(make_current=False)
http_server = tornado.httpserver.HTTPServer(application, ssl_options=ssl_options, io_loop=io_loop)
io_loop.make_current()
http_server = tornado.httpserver.HTTPServer(application, ssl_options=ssl_options)
http_server.listen(port, address=address)
io_loop.start()

View file

@ -2495,13 +2495,15 @@ class Minion(MinionBase):
if beacons and self.connected:
self._fire_master(events=beacons)
new_periodic_callbacks['beacons'] = tornado.ioloop.PeriodicCallback(handle_beacons, loop_interval * 1000, io_loop=self.io_loop)
new_periodic_callbacks['beacons'] = tornado.ioloop.PeriodicCallback(
handle_beacons, loop_interval * 1000)
if before_connect:
# Make sure there is a chance for one iteration to occur before connect
handle_beacons()
if 'cleanup' not in self.periodic_callbacks:
new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(self._fallback_cleanups, loop_interval * 1000, io_loop=self.io_loop)
new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(
self._fallback_cleanups, loop_interval * 1000)
# start all the other callbacks
for periodic_cb in six.itervalues(new_periodic_callbacks):
@ -2554,14 +2556,15 @@ class Minion(MinionBase):
# TODO: actually listen to the return and change period
def handle_schedule():
self.process_schedule(self, loop_interval)
new_periodic_callbacks['schedule'] = tornado.ioloop.PeriodicCallback(handle_schedule, 1000, io_loop=self.io_loop)
new_periodic_callbacks['schedule'] = tornado.ioloop.PeriodicCallback(handle_schedule, 1000)
if before_connect:
# Make sure there is a chance for one iteration to occur before connect
handle_schedule()
if 'cleanup' not in self.periodic_callbacks:
new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(self._fallback_cleanups, loop_interval * 1000, io_loop=self.io_loop)
new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(
self._fallback_cleanups, loop_interval * 1000)
# start all the other callbacks
for periodic_cb in six.itervalues(new_periodic_callbacks):
@ -2618,7 +2621,7 @@ class Minion(MinionBase):
self._fire_master('ping', 'minion_ping', sync=False, timeout_handler=ping_timeout_handler)
except Exception:
log.warning('Attempt to ping master failed.', exc_on_loglevel=logging.DEBUG)
self.periodic_callbacks['ping'] = tornado.ioloop.PeriodicCallback(ping_master, ping_interval * 1000, io_loop=self.io_loop)
self.periodic_callbacks['ping'] = tornado.ioloop.PeriodicCallback(ping_master, ping_interval * 1000)
self.periodic_callbacks['ping'].start()
# add handler to subscriber
@ -3087,7 +3090,7 @@ class SyndicManager(MinionBase):
# forward events every syndic_event_forward_timeout
self.forward_events = tornado.ioloop.PeriodicCallback(self._forward_events,
self.opts['syndic_event_forward_timeout'] * 1000,
io_loop=self.io_loop)
)
self.forward_events.start()
# Make sure to gracefully handle SIGUSR1

View file

@ -128,6 +128,6 @@ def start():
raise SystemExit(1)
try:
tornado.ioloop.IOLoop.instance().start()
tornado.ioloop.IOLoop.current().start()
except KeyboardInterrupt:
raise SystemExit(0)

View file

@ -202,14 +202,12 @@ import tornado.ioloop
import tornado.web
import tornado.gen
from tornado.concurrent import Future
from zmq.eventloop import ioloop
from salt.ext import six
# pylint: enable=import-error
# instantiate the zmq IOLoop (specialized poller)
ioloop.install()
import salt.utils
salt.utils.zeromq.install_zmq()
# salt imports
import salt.ext.six as six
import salt.netapi
import salt.utils.args
import salt.utils.event

View file

@ -130,11 +130,11 @@ class IPCServer(object):
else:
self.sock = tornado.netutil.bind_unix_socket(self.socket_path)
tornado.netutil.add_accept_handler(
self.sock,
self.handle_connection,
io_loop=self.io_loop,
)
with salt.utils.async.current_ioloop(self.io_loop):
tornado.netutil.add_accept_handler(
self.sock,
self.handle_connection,
)
self._started = True
@tornado.gen.coroutine
@ -196,10 +196,10 @@ class IPCServer(object):
log.trace('IPCServer: Handling connection '
'to address: %s', address)
try:
stream = IOStream(
connection,
io_loop=self.io_loop,
)
with salt.utils.async.current_ioloop(self.io_loop):
stream = IOStream(
connection,
)
self.io_loop.spawn_callback(self.handle_stream, stream)
except Exception as exc:
log.error('IPC streaming error: %s', exc)
@ -329,10 +329,10 @@ class IPCClient(object):
break
if self.stream is None:
self.stream = IOStream(
socket.socket(sock_type, socket.SOCK_STREAM),
io_loop=self.io_loop,
)
with salt.utils.async.current_ioloop(self.io_loop):
self.stream = IOStream(
socket.socket(sock_type, socket.SOCK_STREAM),
)
try:
log.trace('IPCClient: Connecting to socket: %s', self.socket_path)
@ -510,11 +510,11 @@ class IPCMessagePublisher(object):
else:
self.sock = tornado.netutil.bind_unix_socket(self.socket_path)
tornado.netutil.add_accept_handler(
self.sock,
self.handle_connection,
io_loop=self.io_loop,
)
with salt.utils.async.current_ioloop(self.io_loop):
tornado.netutil.add_accept_handler(
self.sock,
self.handle_connection,
)
self._started = True
@tornado.gen.coroutine
@ -545,17 +545,14 @@ class IPCMessagePublisher(object):
def handle_connection(self, connection, address):
log.trace('IPCServer: Handling connection to address: %s', address)
try:
kwargs = {}
if self.opts['ipc_write_buffer'] > 0:
kwargs['max_write_buffer_size'] = self.opts['ipc_write_buffer']
log.trace('Setting IPC connection write buffer: %s', (self.opts['ipc_write_buffer']))
with salt.utils.async.current_ioloop(self.io_loop):
stream = IOStream(
connection,
io_loop=self.io_loop,
max_write_buffer_size=self.opts['ipc_write_buffer']
)
else:
stream = IOStream(
connection,
io_loop=self.io_loop
**kwargs
)
self.streams.add(stream)

View file

@ -775,10 +775,9 @@ class TCPClientKeepAlive(tornado.tcpclient.TCPClient):
'''
Override _create_stream() in TCPClient to enable keep alive support.
'''
def __init__(self, opts, resolver=None, io_loop=None):
def __init__(self, opts, resolver=None):
self.opts = opts
super(TCPClientKeepAlive, self).__init__(
resolver=resolver, io_loop=io_loop)
super(TCPClientKeepAlive, self).__init__(resolver=resolver)
def _create_stream(self, max_buffer_size, af, addr, **kwargs): # pylint: disable=unused-argument
'''
@ -794,7 +793,6 @@ class TCPClientKeepAlive(tornado.tcpclient.TCPClient):
_set_tcp_keepalive(sock, self.opts)
stream = tornado.iostream.IOStream(
sock,
io_loop=self.io_loop,
max_buffer_size=max_buffer_size)
return stream.connect(addr)
@ -856,8 +854,8 @@ class SaltMessageClient(object):
self.io_loop = io_loop or tornado.ioloop.IOLoop.current()
self._tcp_client = TCPClientKeepAlive(
opts, io_loop=self.io_loop, resolver=resolver)
with salt.utils.async.current_ioloop(self.io_loop):
self._tcp_client = TCPClientKeepAlive(opts, resolver=resolver)
self._mid = 1
self._max_messages = int((1 << 31) - 2) # number of IDs before we wrap
@ -946,18 +944,17 @@ class SaltMessageClient(object):
if self._closing:
break
try:
if (self.source_ip or self.source_port) and tornado.version_info >= (4, 5):
### source_ip and source_port are supported only in Tornado >= 4.5
# See http://www.tornadoweb.org/en/stable/releases/v4.5.0.html
# Otherwise will just ignore these args
self._stream = yield self._tcp_client.connect(self.host,
self.port,
ssl_options=self.opts.get('ssl'),
source_ip=self.source_ip,
source_port=self.source_port)
else:
if self.source_ip or self.source_port:
kwargs = {}
if self.source_ip or self.source_port:
if tornado.version_info >= (4, 5):
### source_ip and source_port are supported only in Tornado >= 4.5
# See http://www.tornadoweb.org/en/stable/releases/v4.5.0.html
# Otherwise will just ignore these args
kwargs = {'source_ip': self.source_ip,
'source_port': self.source_port}
else:
log.warning('If you need a certain source IP/port, consider upgrading Tornado >= 4.5')
with salt.utils.async.current_ioloop(self.io_loop):
self._stream = yield self._tcp_client.connect(self.host,
self.port,
ssl_options=self.opts.get('ssl'))
@ -1163,7 +1160,8 @@ class PubServer(tornado.tcpserver.TCPServer, object):
TCP publisher
'''
def __init__(self, opts, io_loop=None):
super(PubServer, self).__init__(io_loop=io_loop, ssl_options=opts.get('ssl'))
super(PubServer, self).__init__(ssl_options=opts.get('ssl'))
self.io_loop = io_loop
self.opts = opts
self._closing = False
self.clients = set()

View file

@ -55,13 +55,17 @@ import salt.log.setup
from salt.utils.odict import OrderedDict
# Define the pytest plugins we rely on
pytest_plugins = ['pytest_catchlog', 'tempdir', 'helpers_namespace'] # pylint: disable=invalid-name
pytest_plugins = ['tempdir', 'helpers_namespace'] # pylint: disable=invalid-name
# Define where not to collect tests from
collect_ignore = ['setup.py']
log = logging.getLogger('salt.testsuite')
# Reset logging root handlers
for handler in logging.root.handlers:
logging.root.removeHandler(handler)
def pytest_tempdir_basename():
'''
@ -197,25 +201,6 @@ def pytest_configure(config):
called after command line options have been parsed
and all plugins and initial conftest files been loaded.
'''
# Configure the console logger based on the catch_log settings.
# Most importantly, shutdown Salt's null, store and temporary logging queue handlers
catch_log = config.pluginmanager.getplugin('_catch_log')
cli_logging_handler = catch_log.log_cli_handler
# Add the pytest_catchlog CLI log handler to the logging root
logging.root.addHandler(cli_logging_handler)
cli_level = cli_logging_handler.level
cli_level = config._catchlog_log_cli_level
cli_format = cli_logging_handler.formatter._fmt
cli_date_format = cli_logging_handler.formatter.datefmt
# Setup the console logger which shuts down the null and the temporary queue handlers
salt.log.setup_console_logger(
log_level=salt.log.setup.LOG_VALUES_TO_LEVELS.get(cli_level, 'error'),
log_format=cli_format,
date_format=cli_date_format
)
# Disable the store logging queue handler
salt.log.setup.setup_extended_logging({'extension_modules': ''})
config.addinivalue_line('norecursedirs', os.path.join(CODE_DIR, 'templates'))
config.addinivalue_line(
'markers',

View file

@ -21,6 +21,7 @@ import logging
# Import salt libs
import salt.utils.event
import salt.utils.async
# Import 3rd-party libs
from tornado import gen
@ -69,11 +70,11 @@ class PyTestEngine(object):
self.sock.bind(('localhost', port))
# become a server socket
self.sock.listen(5)
netutil.add_accept_handler(
self.sock,
self.handle_connection,
io_loop=self.io_loop,
)
with salt.utils.async.current_ioloop(self.io_loop):
netutil.add_accept_handler(
self.sock,
self.handle_connection,
)
def handle_connection(self, connection, address):
log.warning('Accepted connection from %s. Role: %s', address, self.opts['__role'])

View file

@ -11,6 +11,7 @@ import os
# Import Salt Testing libs
from tests.support.unit import TestCase, skipIf
from tests.support.mock import NO_MOCK, NO_MOCK_REASON, patch, MagicMock
from tests.support.mixins import AdaptedConfigurationTestCaseMixin
from tests.support.helpers import skip_if_not_root
# Import salt libs
import salt.minion
@ -24,7 +25,7 @@ __opts__ = {}
@skipIf(NO_MOCK, NO_MOCK_REASON)
class MinionTestCase(TestCase):
class MinionTestCase(TestCase, AdaptedConfigurationTestCaseMixin):
def test_invalid_master_address(self):
with patch.dict(__opts__, {'ipv6': False, 'master': float('127.0'), 'master_port': '4555', 'retry_dns': False}):
self.assertRaises(SaltSystemExit, salt.minion.resolve_dns, __opts__)
@ -263,7 +264,7 @@ class MinionTestCase(TestCase):
patch('salt.minion.Minion.sync_connect_master', MagicMock(side_effect=RuntimeError('stop execution'))), \
patch('salt.utils.process.SignalHandlingMultiprocessingProcess.start', MagicMock(return_value=True)), \
patch('salt.utils.process.SignalHandlingMultiprocessingProcess.join', MagicMock(return_value=True)):
mock_opts = copy.copy(salt.config.DEFAULT_MINION_OPTS)
mock_opts = self.get_config('minion', from_scratch=True)
mock_opts['beacons_before_connect'] = True
minion = salt.minion.Minion(mock_opts, io_loop=tornado.ioloop.IOLoop())
try:
@ -287,7 +288,7 @@ class MinionTestCase(TestCase):
patch('salt.minion.Minion.sync_connect_master', MagicMock(side_effect=RuntimeError('stop execution'))), \
patch('salt.utils.process.SignalHandlingMultiprocessingProcess.start', MagicMock(return_value=True)), \
patch('salt.utils.process.SignalHandlingMultiprocessingProcess.join', MagicMock(return_value=True)):
mock_opts = copy.copy(salt.config.DEFAULT_MINION_OPTS)
mock_opts = self.get_config('minion', from_scratch=True)
mock_opts['scheduler_before_connect'] = True
minion = salt.minion.Minion(mock_opts, io_loop=tornado.ioloop.IOLoop())
try: