Merge pull request #26333 from gtmanfred/develop

Add some redis sentinel and cluster support
This commit is contained in:
Erik Johnson 2015-08-15 00:31:19 -05:00
commit c34c6b992f
3 changed files with 203 additions and 1 deletions

View file

@ -0,0 +1,94 @@
# -*- coding: utf-8 -*-
'''
An engine that reads messages from the redis sentinel pubsub and sends reactor
events based on the channels they are subscribed to.
.. versionadded: Boron
:configuration:
Example configuration
engines:
- redis_sentinel:
hosts:
matching: 'board*'
port: 26379
interface: eth2
channels:
- '+switch-master'
- '+odown'
- '-odown'
:depends: redis
'''
from __future__ import absolute_import
import redis
import salt.client
from salt.ext import six
from salt.ext.six.moves import zip
try:
import redis
HAS_REDIS = True
except ImportError:
HAS_REDIS = False
import logging
log = logging.getLogger(__name__)
log.debug('LOAD REDIS_SENTINEL')
def __virtual__():
return HAS_REDIS
class Listener(object):
def __init__(self, host=None, port=None, channels=None, tag=None):
if host is None:
host = 'localhost'
if port is None:
port = 26379
if channels is None:
channels = ['*']
if tag is None:
tag = 'salt/engine/redis_sentinel'
super(Listener, self).__init__()
self.tag = tag
self.redis = redis.StrictRedis(host=host, port=port)
self.pubsub = self.redis.pubsub()
self.pubsub.psubscribe(channels)
self.fire_master = salt.utils.event.get_master_event(__opts__, __opts__['sock_dir']).fire_event
def work(self, item):
ret = {'channel': item['channel']}
if isinstance(item['data'], six.integer_types):
ret['code'] = item['data']
elif item['channel'] == '+switch-master':
ret.update(dict(list(zip(
('master', 'old_host', 'old_port', 'new_host', 'new_port'), item['data'].split(' ')
))))
elif item['channel'] in ('+odown', '-odown'):
ret.update(dict(list(zip(
('master', 'host', 'port'), item['data'].split(' ')[1:]
))))
else:
ret = {
'channel': item['channel'],
'data': item['data'],
}
self.fire_master(ret, '{0}/{1}'.format(self.tag, item['channel']))
def run(self):
log.debug('Start Listener')
for item in self.pubsub.listen():
log.debug('Item: \n{0}'.format(item))
self.work(item)
def start(hosts, channels, tag=None):
if tag is None:
tag = 'salt/engine/redis_sentinel'
local = salt.client.LocalClient()
ips = local.cmd(hosts['matching'], 'network.ip_addrs', [hosts['interface']]).values()
client = Listener(host=ips.pop()[0], port=hosts['port'], channels=channels, tag=tag)
client.run()

View file

@ -15,8 +15,9 @@ Module to provide redis functionality to Salt
redis.password: None
'''
# Import Pytho libs
# Import Python libs
from __future__ import absolute_import
from salt.ext.six.moves import zip
# Import third party libs
try:
@ -54,6 +55,20 @@ def _connect(host=None, port=None, db=None, password=None):
return redis.StrictRedis(host, port, db, password)
def _sconnect(host=None, port=None, password=None):
'''
Returns an instance of the redis client
'''
if host is None:
host = __salt__['config.option']('redis_sentinel.host', 'localhost')
if port is None:
port = __salt__['config.option']('redis_sentinel.port', 26379)
if password is None:
password = __salt__['config.option']('redis_sentinel.password')
return redis.StrictRedis(host, port, password=password)
def bgrewriteaof(host=None, port=None, db=None, password=None):
'''
Asynchronously rewrite the append-only file
@ -488,3 +503,38 @@ def zrange(key, start, stop, host=None, port=None, db=None, password=None):
'''
server = _connect(host, port, db, password)
return server.zrange(key, start, stop)
def sentinel_get_master_ip(master, host=None, port=None, password=None):
'''
Get ip for sentinel master
.. versionadded: Boron
CLI Example:
.. code-block:: bash
salt '*' redis.sentinel_get_master_ip 'mymaster'
'''
server = _sconnect(host, port, password)
ret = server.sentinel_get_master_addr_by_name(master)
return dict(list(zip(('master_host', 'master_port'), ret)))
def get_master_ip(host=None, port=None, password=None):
'''
Get host information about slave
.. versionadded: Boron
CLI Example:
.. code-block:: bash
salt '*' redis.get_master_ip
'''
server = _connect(host, port, password)
info = server.info()
ret = (info.get('master_host', ''), info.get('master_port', ''))
return dict(list(zip(('master_host', 'master_port'), ret)))

View file

@ -28,6 +28,8 @@ overridden in states using the following arguments: ``host``, ``post``, ``db``,
- db: 0
- password: somuchkittycat
'''
from __future__ import absolute_import
import copy
__virtualname__ = 'redis'
@ -113,3 +115,59 @@ def absent(name, keys=None, **connection_args):
ret['comment'] = 'Key deleted'
ret['changes']['deleted'] = [name]
return ret
def slaveof(name, sentinel_host=None, sentinel_port=None, sentinel_password=None, **connection_args):
'''
Set this redis instance as a slave.
.. versionadded: Boron
name
Master to make this a slave of
sentinel_host
Ip of the sentinel to check for the master
sentinel_port
Port of the sentinel to check for the master
'''
ret = {'name': name,
'changes': {},
'result': False,
'comment': 'Failed to setup slave'}
kwargs = copy.copy(connection_args)
sentinel_master = __salt__['redis.sentinel_get_master_ip'](name, sentinel_host, sentinel_port, sentinel_password)
if sentinel_master['master_host'] in __salt__['network.ip_addrs']():
ret['result'] = True
ret['comment'] = 'Minion is the master: {0}'.format(name)
return ret
first_master = __salt__['redis.get_master_ip'](**connection_args)
if first_master == sentinel_master:
ret['result'] = True
ret['comment'] = 'Minion already slave of master: {0}'.format(name)
return ret
if __opts__['test'] is True:
ret['comment'] = 'Minion will be made a slave of {0}: {1}'.format(name, sentinel_master['host'])
ret['result'] = None
return ret
kwargs.update(**sentinel_master)
__salt__['redis.slaveof'](**kwargs)
current_master = __salt__['redis.get_master_ip'](**connection_args)
if current_master != sentinel_master:
return ret
ret['result'] = True
ret['changes'] = {
'old': first_master,
'new': current_master,
}
ret['comment'] = 'Minion successfully connected to master: {0}'.format(name)
return ret