[PY3] Add unicode_literals to transports

This commit is contained in:
Erik Johnson 2017-12-28 17:09:30 -06:00
parent fc0046bf96
commit 3f85b60077
No known key found for this signature in database
GPG key ID: 5E5583C437808F3F
14 changed files with 104 additions and 110 deletions

View file

@ -2,7 +2,7 @@
'''
Encapsulate the different transports available to Salt.
'''
from __future__ import absolute_import
from __future__ import absolute_import, print_function, unicode_literals
import logging
# Import third party libs
@ -57,8 +57,10 @@ class MessageClientPool(object):
def __init__(self, tgt, opts, args=None, kwargs=None):
sock_pool_size = opts['sock_pool_size'] if 'sock_pool_size' in opts else 1
if sock_pool_size < 1:
log.warn('sock_pool_size is not correctly set, \
the option should be greater than 0 but, {0}'.format(sock_pool_size))
log.warning(
'sock_pool_size is not correctly set, the option should be '
'greater than 0 but is instead %s', sock_pool_size
)
sock_pool_size = 1
if args is None:

View file

@ -6,7 +6,7 @@ This includes client side transport, for the ReqServer and the Publisher
'''
# Import Python Libs
from __future__ import absolute_import
from __future__ import absolute_import, print_function, unicode_literals
import logging
# Import Salt Libs
@ -156,7 +156,7 @@ class AsyncPubChannel(AsyncChannel):
# switch on available ttypes
if ttype == 'detect':
opts['detect_mode'] = True
log.info('Transport is set to detect; using {0}'.format(ttype))
log.info('Transport is set to detect; using %s', ttype)
if ttype == 'zeromq':
import salt.transport.zeromq
return salt.transport.zeromq.AsyncZeroMQPubChannel(opts, **kwargs)

View file

@ -3,7 +3,7 @@
Helper functions for transport components to handle message framing
'''
# Import python libs
from __future__ import absolute_import
from __future__ import absolute_import, print_function, unicode_literals
import msgpack
from salt.ext import six

View file

@ -4,7 +4,7 @@ IPC transport classes
'''
# Import Python libs
from __future__ import absolute_import
from __future__ import absolute_import, print_function, unicode_literals
import logging
import socket
import weakref
@ -119,7 +119,7 @@ class IPCServer(object):
Blocks until socket is established
'''
# Start up the ioloop
log.trace('IPCServer: binding to socket: {0}'.format(self.socket_path))
log.trace('IPCServer: binding to socket: %s', self.socket_path)
if isinstance(self.socket_path, int):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@ -177,25 +177,24 @@ class IPCServer(object):
body = framed_msg['body']
self.io_loop.spawn_callback(self.payload_handler, body, write_callback(stream, framed_msg['head']))
except tornado.iostream.StreamClosedError:
log.trace('Client disconnected '
'from IPC {0}'.format(self.socket_path))
log.trace('Client disconnected from IPC %s', self.socket_path)
break
except socket.error as exc:
# On occasion an exception will occur with
# an error code of 0, it's a spurious exception.
if exc.errno == 0:
log.trace('Exception occured with error number 0, '
'spurious exception: {0}'.format(exc))
'spurious exception: %s', exc)
else:
log.error('Exception occurred while '
'handling stream: {0}'.format(exc))
'handling stream: %s', exc)
except Exception as exc:
log.error('Exception occurred while '
'handling stream: {0}'.format(exc))
'handling stream: %s', exc)
def handle_connection(self, connection, address):
log.trace('IPCServer: Handling connection '
'to address: {0}'.format(address))
'to address: %s', address)
try:
stream = IOStream(
connection,
@ -203,7 +202,7 @@ class IPCServer(object):
)
self.io_loop.spawn_callback(self.handle_stream, stream)
except Exception as exc:
log.error('IPC streaming error: {0}'.format(exc))
log.error('IPC streaming error: %s', exc)
def close(self):
'''
@ -248,17 +247,17 @@ class IPCClient(object):
loop_instance_map = IPCClient.instance_map[io_loop]
# FIXME
key = str(socket_path)
key = six.text_type(socket_path)
client = loop_instance_map.get(key)
if client is None:
log.debug('Initializing new IPCClient for path: {0}'.format(key))
log.debug('Initializing new IPCClient for path: %s', key)
client = object.__new__(cls)
# FIXME
client.__singleton_init__(io_loop=io_loop, socket_path=socket_path)
loop_instance_map[key] = client
else:
log.debug('Re-using IPCClient for {0}'.format(key))
log.debug('Re-using IPCClient for %s', key)
return client
def __singleton_init__(self, socket_path, io_loop=None):
@ -336,7 +335,7 @@ class IPCClient(object):
)
try:
log.trace('IPCClient: Connecting to socket: {0}'.format(self.socket_path))
log.trace('IPCClient: Connecting to socket: %s', self.socket_path)
yield self.stream.connect(sock_addr)
self._connecting_future.set_result(True)
break
@ -374,7 +373,7 @@ class IPCClient(object):
# count of the entry has not yet gone to zero.
if self.io_loop in IPCClient.instance_map:
loop_instance_map = IPCClient.instance_map[self.io_loop]
key = str(self.socket_path)
key = six.text_type(self.socket_path)
if key in loop_instance_map:
del loop_instance_map[key]
@ -500,7 +499,7 @@ class IPCMessagePublisher(object):
Blocks until socket is established
'''
# Start up the ioloop
log.trace('IPCMessagePublisher: binding to socket: {0}'.format(self.socket_path))
log.trace('IPCMessagePublisher: binding to socket: %s', self.socket_path)
if isinstance(self.socket_path, int):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@ -523,10 +522,10 @@ class IPCMessagePublisher(object):
try:
yield stream.write(pack)
except tornado.iostream.StreamClosedError:
log.trace('Client disconnected from IPC {0}'.format(self.socket_path))
log.trace('Client disconnected from IPC %s', self.socket_path)
self.streams.discard(stream)
except Exception as exc:
log.error('Exception occurred while handling stream: {0}'.format(exc))
log.error('Exception occurred while handling stream: %s', exc)
if not stream.closed():
stream.close()
self.streams.discard(stream)
@ -544,10 +543,10 @@ class IPCMessagePublisher(object):
self.io_loop.spawn_callback(self._write, stream, pack)
def handle_connection(self, connection, address):
log.trace('IPCServer: Handling connection to address: {0}'.format(address))
log.trace('IPCServer: Handling connection to address: %s', address)
try:
if self.opts['ipc_write_buffer'] > 0:
log.trace('Setting IPC connection write buffer: {0}'.format((self.opts['ipc_write_buffer'])))
log.trace('Setting IPC connection write buffer: %s', (self.opts['ipc_write_buffer']))
stream = IOStream(
connection,
io_loop=self.io_loop,
@ -560,7 +559,7 @@ class IPCMessagePublisher(object):
)
self.streams.add(stream)
except Exception as exc:
log.error('IPC streaming error: {0}'.format(exc))
log.error('IPC streaming error: %s', exc)
def close(self):
'''
@ -664,11 +663,11 @@ class IPCMessageSubscriber(IPCClient):
# Keep 'self._read_stream_future' alive.
ret = None
except tornado.iostream.StreamClosedError as exc:
log.trace('Subscriber disconnected from IPC {0}'.format(self.socket_path))
log.trace('Subscriber disconnected from IPC %s', self.socket_path)
self._read_stream_future = None
exc_to_raise = exc
except Exception as exc:
log.error('Exception occurred in Subscriber while handling stream: {0}'.format(exc))
log.error('Exception occurred in Subscriber while handling stream: %s', exc)
self._read_stream_future = None
exc_to_raise = exc
@ -716,10 +715,10 @@ class IPCMessageSubscriber(IPCClient):
body = framed_msg['body']
self.io_loop.spawn_callback(callback, body)
except tornado.iostream.StreamClosedError:
log.trace('Subscriber disconnected from IPC {0}'.format(self.socket_path))
log.trace('Subscriber disconnected from IPC %s', self.socket_path)
break
except Exception as exc:
log.error('Exception occurred while Subscriber handling stream: {0}'.format(exc))
log.error('Exception occurred while Subscriber handling stream: %s', exc)
@tornado.gen.coroutine
def read_async(self, callback):
@ -732,10 +731,10 @@ class IPCMessageSubscriber(IPCClient):
try:
yield self.connect(timeout=5)
except tornado.iostream.StreamClosedError:
log.trace('Subscriber closed stream on IPC {0} before connect'.format(self.socket_path))
log.trace('Subscriber closed stream on IPC %s before connect', self.socket_path)
yield tornado.gen.sleep(1)
except Exception as exc:
log.error('Exception occurred while Subscriber connecting: {0}'.format(exc))
log.error('Exception occurred while Subscriber connecting: %s', exc)
yield tornado.gen.sleep(1)
yield self._read_async(callback)

View file

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# Import Python Libs
from __future__ import absolute_import, print_function
from __future__ import absolute_import, print_function, unicode_literals
import logging
# Import Salt Libs
@ -23,7 +23,7 @@ class LocalChannel(ReqChannel):
def send(self, load, tries=3, timeout=60, raw=False):
if self.tries == 0:
log.debug('LocalChannel load: {0}').format(load)
log.debug('LocalChannel load: %s', load)
#data = json.loads(load)
#{'path': 'apt-cacher-ng/map.jinja', 'saltenv': 'base', 'cmd': '_serve_file', 'loc': 0}
#f = open(data['path'])

View file

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# Import Python Libs
from __future__ import absolute_import
from __future__ import absolute_import, print_function, unicode_literals
import multiprocessing
import ctypes
import logging
@ -51,7 +51,7 @@ class AESPubClientMixin(object):
@tornado.gen.coroutine
def _decode_payload(self, payload):
# we need to decrypt it
log.trace('Decoding payload: {0}'.format(payload))
log.trace('Decoding payload: %s', payload)
if payload['enc'] == 'aes':
self._verify_master_signature(payload)
try:
@ -172,12 +172,10 @@ class AESReqServerMixin(object):
'''
if not salt.utils.verify.valid_id(self.opts, load['id']):
log.info(
'Authentication request from invalid id {id}'.format(**load)
)
log.info('Authentication request from invalid id %s', load['id'])
return {'enc': 'clear',
'load': {'ret': False}}
log.info('Authentication request from {id}'.format(**load))
log.info('Authentication request from %s', load['id'])
# 0 is default which should be 'unlimited'
if self.opts['max_minions'] > 0:
@ -231,8 +229,8 @@ class AESReqServerMixin(object):
pass
elif os.path.isfile(pubfn_rejected):
# The key has been rejected, don't place it in pending
log.info('Public key rejected for {0}. Key is present in '
'rejection key dir.'.format(load['id']))
log.info('Public key rejected for %s. Key is present in '
'rejection key dir.', load['id'])
eload = {'result': False,
'id': load['id'],
'pub': load['pub']}
@ -245,9 +243,9 @@ class AESReqServerMixin(object):
with salt.utils.files.fopen(pubfn, 'r') as pubfn_handle:
if pubfn_handle.read().strip() != load['pub'].strip():
log.error(
'Authentication attempt from {id} failed, the public '
'Authentication attempt from %s failed, the public '
'keys did not match. This may be an attempt to compromise '
'the Salt cluster.'.format(**load)
'the Salt cluster.', load['id']
)
# put denied minion key into minions_denied
with salt.utils.files.fopen(pubfn_denied, 'w+') as fp_:
@ -264,9 +262,7 @@ class AESReqServerMixin(object):
# The key has not been accepted, this is a new minion
if os.path.isdir(pubfn_pend):
# The key path is a directory, error out
log.info(
'New public key {id} is a directory'.format(**load)
)
log.info('New public key %s is a directory', load['id'])
eload = {'result': False,
'id': load['id'],
'pub': load['pub']}
@ -276,14 +272,12 @@ class AESReqServerMixin(object):
if auto_reject:
key_path = pubfn_rejected
log.info('New public key for {id} rejected via autoreject_file'
.format(**load))
log.info('New public key for %s rejected via autoreject_file', load['id'])
key_act = 'reject'
key_result = False
elif not auto_sign:
key_path = pubfn_pend
log.info('New public key for {id} placed in pending'
.format(**load))
log.info('New public key for %s placed in pending', load['id'])
key_act = 'pend'
key_result = True
else:
@ -314,8 +308,8 @@ class AESReqServerMixin(object):
shutil.move(pubfn_pend, pubfn_rejected)
except (IOError, OSError):
pass
log.info('Pending public key for {id} rejected via '
'autoreject_file'.format(**load))
log.info('Pending public key for %s rejected via '
'autoreject_file', load['id'])
ret = {'enc': 'clear',
'load': {'ret': False}}
eload = {'result': False,
@ -333,10 +327,9 @@ class AESReqServerMixin(object):
with salt.utils.files.fopen(pubfn_pend, 'r') as pubfn_handle:
if pubfn_handle.read() != load['pub']:
log.error(
'Authentication attempt from {id} failed, the public '
'Authentication attempt from %s failed, the public '
'key in pending did not match. This may be an '
'attempt to compromise the Salt cluster.'
.format(**load)
'attempt to compromise the Salt cluster.', load['id']
)
# put denied minion key into minions_denied
with salt.utils.files.fopen(pubfn_denied, 'w+') as fp_:
@ -350,9 +343,9 @@ class AESReqServerMixin(object):
'load': {'ret': False}}
else:
log.info(
'Authentication failed from host {id}, the key is in '
'Authentication failed from host %s, the key is in '
'pending and needs to be accepted with salt-key '
'-a {id}'.format(**load)
'-a %s', load['id'], load['id']
)
eload = {'result': True,
'act': 'pend',
@ -369,10 +362,9 @@ class AESReqServerMixin(object):
with salt.utils.files.fopen(pubfn_pend, 'r') as pubfn_handle:
if pubfn_handle.read() != load['pub']:
log.error(
'Authentication attempt from {id} failed, the public '
'Authentication attempt from %s failed, the public '
'keys in pending did not match. This may be an '
'attempt to compromise the Salt cluster.'
.format(**load)
'attempt to compromise the Salt cluster.', load['id']
)
# put denied minion key into minions_denied
with salt.utils.files.fopen(pubfn_denied, 'w+') as fp_:
@ -396,7 +388,7 @@ class AESReqServerMixin(object):
return {'enc': 'clear',
'load': {'ret': False}}
log.info('Authentication accepted from {id}'.format(**load))
log.info('Authentication accepted from %s', load['id'])
# only write to disk if you are adding the file, and in open mode,
# which implies we accept any key from a minion.
if not os.path.isfile(pubfn) and not self.opts['open_mode']:
@ -424,7 +416,7 @@ class AESReqServerMixin(object):
with salt.utils.files.fopen(pubfn) as f:
pub = RSA.importKey(f.read())
except (ValueError, IndexError, TypeError) as err:
log.error('Corrupt public key "{0}": {1}'.format(pubfn, err))
log.error('Corrupt public key "%s": %s', pubfn, err)
return {'enc': 'clear',
'load': {'ret': False}}

View file

@ -3,7 +3,7 @@
RAET transport classes
'''
from __future__ import absolute_import
from __future__ import absolute_import, print_function, unicode_literals
import time
# Import Salt Libs
@ -77,7 +77,7 @@ class RAETReqChannel(ReqChannel):
self.stack = jobber_stack
else:
self.stack = jobber_stack = self._setup_stack(ryn=self.ryn)
log.debug("RAETReqChannel Using Jobber Stack at = {0}\n".format(self.stack.ha))
log.debug("RAETReqChannel Using Jobber Stack at = %s\n", self.stack.ha)
def _setup_stack(self, ryn='manor'):
'''
@ -117,7 +117,7 @@ class RAETReqChannel(ReqChannel):
name=ryn,
lanename=lanename,
dirpath=self.opts['sock_dir']))
log.debug("Created Channel Jobber Stack {0}\n".format(stack.name))
log.debug("Created Channel Jobber Stack %s\n", stack.name)
return stack
def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout=60):

View file

@ -6,7 +6,7 @@ This includes server side transport, for the ReqServer and the Publisher
'''
# Import Python Libs
from __future__ import absolute_import
from __future__ import absolute_import, print_function, unicode_literals
class ReqServerChannel(object):

View file

@ -7,7 +7,7 @@ Wire protocol: "len(payload) msgpack({'head': SOMEHEADER, 'body': SOMEBODY})"
'''
# Import Python Libs
from __future__ import absolute_import
from __future__ import absolute_import, print_function, unicode_literals
import logging
import msgpack
import socket
@ -224,7 +224,7 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
key = cls.__key(opts, **kwargs)
obj = loop_instance_map.get(key)
if obj is None:
log.debug('Initializing new AsyncTCPReqChannel for {0}'.format(key))
log.debug('Initializing new AsyncTCPReqChannel for %s', key)
# we need to make a local variable for this, as we are going to store
# it in a WeakValueDictionary-- which will remove the item if no one
# references it-- this forces a reference while we return to the caller
@ -232,7 +232,7 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
obj.__singleton_init__(opts, **kwargs)
loop_instance_map[key] = obj
else:
log.debug('Re-using AsyncTCPReqChannel for {0}'.format(key))
log.debug('Re-using AsyncTCPReqChannel for %s', key)
return obj
@classmethod
@ -475,9 +475,7 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran
except salt.exceptions.SaltReqTimeoutError:
log.info('fire_master failed: master could not be contacted. Request timed out.')
except Exception:
log.info('fire_master failed: {0}'.format(
traceback.format_exc())
)
log.info('fire_master failed: %s', traceback.format_exc())
else:
self._reconnected = True
@ -512,7 +510,7 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran
except KeyboardInterrupt:
raise
except Exception as exc:
if '-|RETRY|-' not in str(exc):
if '-|RETRY|-' not in six.text_type(exc):
raise SaltClientError('Unable to sign_in to master: {0}'.format(exc)) # TODO: better error message
def on_recv(self, callback):
@ -667,7 +665,7 @@ class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.tra
req_opts['tgt'],
), header=header))
else:
log.error('Unknown req_fun {0}'.format(req_fun))
log.error('Unknown req_fun %s', req_fun)
# always attempt to return an error to the minion
stream.write('Server-side exception handling payload')
stream.close()
@ -680,7 +678,7 @@ class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.tra
log.error('Connection was unexpectedly closed', exc_info=True)
except Exception as exc: # pylint: disable=broad-except
# Absorb any other exceptions
log.error('Unexpected exception occurred: {0}'.format(exc), exc_info=True)
log.error('Unexpected exception occurred: %s', exc, exc_info=True)
raise tornado.gen.Return()
@ -701,7 +699,7 @@ class SaltMessageServer(tornado.tcpserver.TCPServer, object):
'''
Handle incoming streams and add messages to the incoming queue
'''
log.trace('Req client {0} connected'.format(address))
log.trace('Req client %s connected', address)
self.clients.append((stream, address))
unpacker = msgpack.Unpacker()
try:
@ -717,10 +715,10 @@ class SaltMessageServer(tornado.tcpserver.TCPServer, object):
self.io_loop.spawn_callback(self.message_handler, stream, header, framed_msg['body'])
except tornado.iostream.StreamClosedError:
log.trace('req client disconnected {0}'.format(address))
log.trace('req client disconnected %s', address)
self.clients.remove((stream, address))
except Exception as e:
log.trace('other master-side exception: {0}'.format(e))
log.trace('other master-side exception: %s', e)
self.clients.remove((stream, address))
stream.close()
@ -989,9 +987,9 @@ class SaltMessageClient(object):
if self._on_recv is not None:
self.io_loop.spawn_callback(self._on_recv, header, body)
else:
log.error('Got response for message_id {0} that we are not tracking'.format(message_id))
log.error('Got response for message_id %s that we are not tracking', message_id)
except tornado.iostream.StreamClosedError as e:
log.debug('tcp stream to {0}:{1} closed, unable to recv'.format(self.host, self.port))
log.debug('tcp stream to %s:%s closed, unable to recv', self.host, self.port)
for future in six.itervalues(self.send_future_map):
future.set_exception(e)
self.send_future_map = {}
@ -1266,7 +1264,7 @@ class PubServer(tornado.tcpserver.TCPServer, object):
client.id_ = load['id']
self._add_client_present(client)
except tornado.iostream.StreamClosedError as e:
log.debug('tcp stream to {0} closed, unable to recv'.format(client.address))
log.debug('tcp stream to %s closed, unable to recv', client.address)
client.close()
self._remove_client_present(client)
self.clients.discard(client)
@ -1276,7 +1274,7 @@ class PubServer(tornado.tcpserver.TCPServer, object):
continue
def handle_stream(self, stream, address):
log.trace('Subscriber at {0} connected'.format(address))
log.trace('Subscriber at %s connected', address)
client = Subscriber(stream, address)
self.clients.add(client)
self.io_loop.spawn_callback(self._stream_read, client)
@ -1284,7 +1282,7 @@ class PubServer(tornado.tcpserver.TCPServer, object):
# TODO: ACK the publish through IPC
@tornado.gen.coroutine
def publish_payload(self, package, _):
log.debug('TCP PubServer sending payload: {0}'.format(package))
log.debug('TCP PubServer sending payload: %s', package)
payload = salt.transport.frame.frame_msg(package['payload'])
to_remove = []
@ -1305,7 +1303,7 @@ class PubServer(tornado.tcpserver.TCPServer, object):
except tornado.iostream.StreamClosedError:
to_remove.append(client)
else:
log.debug('Publish target {0} not connected'.format(topic))
log.debug('Publish target %s not connected', topic)
else:
for client in self.clients:
try:
@ -1315,7 +1313,7 @@ class PubServer(tornado.tcpserver.TCPServer, object):
except tornado.iostream.StreamClosedError:
to_remove.append(client)
for client in to_remove:
log.debug('Subscriber at {0} has disconnected from publisher'.format(client.address))
log.debug('Subscriber at %s has disconnected from publisher', client.address)
client.close()
self._remove_client_present(client)
self.clients.discard(client)
@ -1378,7 +1376,7 @@ class TCPPubServerChannel(salt.transport.server.PubServerChannel):
)
# Securely create socket
log.info('Starting the Salt Puller on {0}'.format(pull_uri))
log.info('Starting the Salt Puller on %s', pull_uri)
old_umask = os.umask(0o177)
try:
pull_sock.start()

View file

@ -4,7 +4,7 @@ Zeromq transport classes
'''
# Import Python Libs
from __future__ import absolute_import
from __future__ import absolute_import, print_function, unicode_literals
import os
import sys
import copy
@ -125,16 +125,16 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
key = cls.__key(opts, **kwargs)
obj = loop_instance_map.get(key)
if obj is None:
log.debug('Initializing new AsyncZeroMQReqChannel for {0}'.format(key))
log.debug('Initializing new AsyncZeroMQReqChannel for %s', key)
# we need to make a local variable for this, as we are going to store
# it in a WeakValueDictionary-- which will remove the item if no one
# references it-- this forces a reference while we return to the caller
obj = object.__new__(cls)
obj.__singleton_init__(opts, **kwargs)
loop_instance_map[key] = obj
log.trace('Inserted key into loop_instance_map id {0} for key {1} and process {2}'.format(id(loop_instance_map), key, os.getpid()))
log.trace('Inserted key into loop_instance_map id %s for key %s and process %s', id(loop_instance_map), key, os.getpid())
else:
log.debug('Re-using AsyncZeroMQReqChannel for {0}'.format(key))
log.debug('Re-using AsyncZeroMQReqChannel for %s', key)
return obj
def __deepcopy__(self, memo):
@ -377,18 +377,20 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
self.opts['recon_default'] + self.opts['recon_max']
)
log.debug("Generated random reconnect delay between '{0}ms' and '{1}ms' ({2})".format(
log.debug(
"Generated random reconnect delay between '%sms' and '%sms' (%s)",
self.opts['recon_default'],
self.opts['recon_default'] + self.opts['recon_max'],
recon_delay)
recon_delay
)
log.debug("Setting zmq_reconnect_ivl to '{0}ms'".format(recon_delay))
log.debug("Setting zmq_reconnect_ivl to '%sms'", recon_delay)
self._socket.setsockopt(zmq.RECONNECT_IVL, recon_delay)
if hasattr(zmq, 'RECONNECT_IVL_MAX'):
log.debug("Setting zmq_reconnect_ivl_max to '{0}ms'".format(
self.opts['recon_default'] + self.opts['recon_max'])
log.debug(
"Setting zmq_reconnect_ivl_max to '%sms'",
self.opts['recon_default'] + self.opts['recon_max']
)
self._socket.setsockopt(
@ -452,7 +454,7 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t
# 2 includes a header which says who should do it
elif messages_len == 2:
if messages[0] not in ('broadcast', self.hexid):
log.debug('Publish received for not this minion: {0}'.format(messages[0]))
log.debug('Publish received for not this minion: %s', messages[0])
raise tornado.gen.Return(None)
payload = self.serial.loads(messages[1])
else:
@ -607,7 +609,7 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
self.w_uri = 'ipc://{0}'.format(
os.path.join(self.opts['sock_dir'], 'workers.ipc')
)
log.info('Worker binding to socket {0}'.format(self.w_uri))
log.info('Worker binding to socket %s', self.w_uri)
self._socket.connect(self.w_uri)
salt.transport.mixins.auth.AESReqServerMixin.post_fork(self, payload_handler, io_loop)
@ -644,7 +646,7 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
# TODO helper functions to normalize payload?
if not isinstance(payload, dict) or not isinstance(payload.get('load'), dict):
log.error('payload and load must be a dict. Payload was: {0} and load was {1}'.format(payload, payload.get('load')))
log.error('payload and load must be a dict. Payload was: %s and load was %s', payload, payload.get('load'))
stream.send(self.serial.dumps('payload and load must be a dict'))
raise tornado.gen.Return()
@ -687,7 +689,7 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.
req_opts['tgt'],
)))
else:
log.error('Unknown req_fun {0}'.format(req_fun))
log.error('Unknown req_fun %s', req_fun)
# always attempt to return an error to the minion
stream.send('Server-side exception handling payload')
raise tornado.gen.Return()
@ -790,11 +792,11 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
salt.utils.zeromq.check_ipc_path_max_len(pull_uri)
# Start the minion command publisher
log.info('Starting the Salt Publisher on {0}'.format(pub_uri))
log.info('Starting the Salt Publisher on %s', pub_uri)
pub_sock.bind(pub_uri)
# Securely create socket
log.info('Starting the Salt Puller on {0}'.format(pull_uri))
log.info('Starting the Salt Puller on %s', pull_uri)
old_umask = os.umask(0o177)
try:
pull_sock.bind(pull_uri)
@ -893,7 +895,7 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
tgt_type=load['tgt_type'])
match_ids = _res['minions']
log.debug("Publish Side Match: {0}".format(match_ids))
log.debug("Publish Side Match: %s", match_ids)
# Send list of minions through so zmq can target them
int_payload['topic_lst'] = match_ids
@ -1057,7 +1059,7 @@ class AsyncReqMessageClient(object):
del self.send_timeout_map[message]
if future.attempts < future.tries:
future.attempts += 1
log.debug('SaltReqTimeoutError, retrying. ({0}/{1})'.format(future.attempts, future.tries))
log.debug('SaltReqTimeoutError, retrying. (%s/%s)', future.attempts, future.tries)
self.send(
message,
timeout=future.timeout,
@ -1146,7 +1148,7 @@ class ZeroMQSocketMonitor(object):
def monitor_callback(self, msg):
evt = zmq.utils.monitor.parse_monitor_message(msg)
evt['description'] = self.event_map[evt['event']]
log.debug("ZeroMQ event: {0}".format(evt))
log.debug("ZeroMQ event: %s", evt)
if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
self.stop()

View file

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# Import Python libs
from __future__ import absolute_import
from __future__ import absolute_import, print_function, unicode_literals
# Import Salt Libs
import salt.transport.client

View file

@ -4,7 +4,7 @@
'''
# Import python libs
from __future__ import absolute_import
from __future__ import absolute_import, print_function, unicode_literals
import os
import errno
import socket
@ -21,6 +21,7 @@ import salt.transport.server
import salt.transport.client
import salt.utils.platform
from salt.ext import six
from salt.ext.six.moves import range
# Import Salt Testing libs
@ -125,7 +126,7 @@ class IPCMessageClient(BaseIPCReqCase):
self.assertEqual(self.payloads[:-1], msgs)
def test_very_big_message(self):
long_str = ''.join([str(num) for num in range(10**5)])
long_str = ''.join([six.text_type(num) for num in range(10**5)])
msg = {'long_str': long_str, 'stop': True}
self.channel.send(msg)
self.wait()

View file

@ -4,7 +4,7 @@
'''
# Import python libs
from __future__ import absolute_import
from __future__ import absolute_import, print_function, unicode_literals
import threading
import tornado.gen

View file

@ -4,7 +4,7 @@
'''
# Import python libs
from __future__ import absolute_import
from __future__ import absolute_import, print_function, unicode_literals
import os
import time
import threading