mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Update cassandra returner for JPMC
This commit is contained in:
parent
87e29188c0
commit
1954c1a3f3
2 changed files with 261 additions and 38 deletions
|
@ -31,17 +31,63 @@ Cassandra Database Module
|
|||
- 192.168.50.12
|
||||
port: 9000
|
||||
username: cas_admin
|
||||
|
||||
.. versionchanged:: Carbon
|
||||
Added support for ``ssl_options`` and ``protocol_version``.
|
||||
|
||||
Example configuration with
|
||||
`ssl options <http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.ssl_options>`_:
|
||||
|
||||
If ``ssl_options`` are present in cassandra config the cassandra_cql returner
|
||||
will use SSL. SSL isn't used if ``ssl_options`` isn't specified.
|
||||
|
||||
.. code-block:: yaml
|
||||
|
||||
cassandra:
|
||||
cluster:
|
||||
- 192.168.50.10
|
||||
- 192.168.50.11
|
||||
- 192.168.50.12
|
||||
port: 9000
|
||||
username: cas_admin
|
||||
|
||||
ssl_options:
|
||||
ca_certs: /etc/ssl/certs/ca-bundle.trust.crt
|
||||
|
||||
# SSL version should be one from the ssl module
|
||||
# This is an optional parameter
|
||||
ssl_version: PROTOCOL_TLSv1
|
||||
|
||||
Additionally you can also specify the ``protocol_version`` to
|
||||
`use <http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.ssl_options>`_.
|
||||
|
||||
.. code-block:: yaml
|
||||
|
||||
cassandra:
|
||||
cluster:
|
||||
- 192.168.50.10
|
||||
- 192.168.50.11
|
||||
- 192.168.50.12
|
||||
port: 9000
|
||||
username: cas_admin
|
||||
|
||||
# defaults to 4, if not set
|
||||
protocol_version: 3
|
||||
|
||||
'''
|
||||
|
||||
# Import Python Libs
|
||||
from __future__ import absolute_import
|
||||
import logging
|
||||
import json
|
||||
import ssl
|
||||
|
||||
# Import Salt Libs
|
||||
from salt.exceptions import CommandExecutionError
|
||||
from salt.ext import six
|
||||
|
||||
SSL_VERSION = 'ssl_version'
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
__virtualname__ = 'cassandra_cql'
|
||||
|
@ -51,8 +97,10 @@ try:
|
|||
# pylint: disable=import-error,no-name-in-module
|
||||
from cassandra.cluster import Cluster
|
||||
from cassandra.cluster import NoHostAvailable
|
||||
from cassandra.connection import ConnectionException, ConnectionShutdown
|
||||
from cassandra.connection import ConnectionException, \
|
||||
ConnectionShutdown, OperationTimedOut
|
||||
from cassandra.auth import PlainTextAuthProvider
|
||||
from cassandra.policies import DCAwareRoundRobinPolicy, WhiteListRoundRobinPolicy
|
||||
from cassandra.query import dict_factory
|
||||
# pylint: enable=import-error,no-name-in-module
|
||||
HAS_DRIVER = True
|
||||
|
@ -72,6 +120,10 @@ def __virtual__():
|
|||
return (False, 'Cannot load cassandra_cql module: python driver not found')
|
||||
|
||||
|
||||
def _async_log_errors(errors):
|
||||
log.error('Cassandra_cql async call returned: {0}'.format(errors))
|
||||
|
||||
|
||||
def _load_properties(property_name, config_option, set_default=False, default=None):
|
||||
'''
|
||||
Load properties for the cassandra module from config or pillar.
|
||||
|
@ -83,7 +135,7 @@ def _load_properties(property_name, config_option, set_default=False, default=No
|
|||
:param set_default: Should a default be set if not found in config.
|
||||
:type set_default: bool
|
||||
:param default: The default value to be set.
|
||||
:type default: str
|
||||
:type default: str or int
|
||||
:return: The property fetched from the configuration or default.
|
||||
:rtype: str or list of str
|
||||
'''
|
||||
|
@ -107,7 +159,39 @@ def _load_properties(property_name, config_option, set_default=False, default=No
|
|||
return property_name
|
||||
|
||||
|
||||
def _connect(contact_points=None, port=None, cql_user=None, cql_pass=None):
|
||||
def _get_ssl_opts():
|
||||
'''
|
||||
Parse out ssl_options for Cassandra cluster connection.
|
||||
Make sure that the ssl_version (if any specified) is valid.
|
||||
'''
|
||||
sslopts = __salt__['config.option']('cassandra').get('ssl_options', None)
|
||||
ssl_opts = {}
|
||||
|
||||
if sslopts:
|
||||
ssl_opts['ca_certs'] = sslopts['ca_certs']
|
||||
if SSL_VERSION in sslopts:
|
||||
if not sslopts[SSL_VERSION].startswith('PROTOCOL_'):
|
||||
valid_opts = ', '.join(
|
||||
[x for x in dir(ssl) if x.startswith('PROTOCOL_')]
|
||||
)
|
||||
raise CommandExecutionError('Invalid protocol_version '
|
||||
'specified! '
|
||||
'Please make sure '
|
||||
'that the ssl protocol'
|
||||
'version is one from the SSL'
|
||||
'module. '
|
||||
'Valid options are '
|
||||
'{0}'.format(valid_opts))
|
||||
else:
|
||||
ssl_opts[SSL_VERSION] = \
|
||||
getattr(ssl, sslopts[SSL_VERSION])
|
||||
return ssl_opts
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def _connect(contact_points=None, port=None, cql_user=None, cql_pass=None,
|
||||
protocol_version=4):
|
||||
'''
|
||||
Connect to a Cassandra cluster.
|
||||
|
||||
|
@ -119,6 +203,8 @@ def _connect(contact_points=None, port=None, cql_user=None, cql_pass=None):
|
|||
:type cql_pass: str
|
||||
:param port: The Cassandra cluster port, defaults to None.
|
||||
:type port: int
|
||||
:param protocol_version: Cassandra protocol version to use.
|
||||
:type port: int
|
||||
:return: The session and cluster objects.
|
||||
:rtype: cluster object, session object
|
||||
'''
|
||||
|
@ -140,19 +226,53 @@ def _connect(contact_points=None, port=None, cql_user=None, cql_pass=None):
|
|||
and 'cassandra_cql_returner_session' in __context__):
|
||||
return __context__['cassandra_cql_returner_cluster'], __context__['cassandra_cql_returner_session']
|
||||
else:
|
||||
|
||||
contact_points = _load_properties(property_name=contact_points, config_option='cluster')
|
||||
contact_points = contact_points if isinstance(contact_points, list) else contact_points.split(',')
|
||||
port = _load_properties(property_name=port, config_option='port', set_default=True, default=9042)
|
||||
cql_user = _load_properties(property_name=cql_user, config_option='username', set_default=True, default="cassandra")
|
||||
cql_pass = _load_properties(property_name=cql_pass, config_option='password', set_default=True, default="cassandra")
|
||||
log.debug('()()()()()()()()()()()()( proto version passed {0} ^^^'.format(protocol_version))
|
||||
protocol_version = _load_properties(property_name=None, config_option='protocol_version',
|
||||
set_default=True, default=4)
|
||||
log.debug('^^^^^^^^^^^^^^^^^^^^^^ proto version retrieved {0} ^^^'.format(protocol_version))
|
||||
|
||||
|
||||
try:
|
||||
auth_provider = PlainTextAuthProvider(username=cql_user, password=cql_pass)
|
||||
cluster = Cluster(contact_points, port=port, auth_provider=auth_provider)
|
||||
session = cluster.connect()
|
||||
ssl_opts = _get_ssl_opts()
|
||||
if ssl_opts:
|
||||
# lbp = WhiteListRoundRobinPolicy(contact_points)
|
||||
cluster = Cluster(contact_points,
|
||||
port=port,
|
||||
# load_balancing_policy=lbp,
|
||||
auth_provider=auth_provider,
|
||||
ssl_options=ssl_opts,
|
||||
protocol_version=protocol_version,
|
||||
compression=True)
|
||||
else:
|
||||
# lbp = WhiteListRoundRobinPolicy(contact_points)
|
||||
cluster = Cluster(contact_points, port=port,
|
||||
# load_balancing_policy=lbp,
|
||||
auth_provider=auth_provider,
|
||||
protocol_version=protocol_version,
|
||||
compression=True)
|
||||
for recontimes in [1, 2, 3]:
|
||||
try:
|
||||
log.warning('Attempting connect')
|
||||
session = cluster.connect()
|
||||
log.warning('After connect')
|
||||
break
|
||||
except OperationTimedOut:
|
||||
log.warning('Cassandra cluster.connect timed out, try {0}'.format(recontimes))
|
||||
if recontimes >= 3:
|
||||
raise
|
||||
|
||||
# TODO: Call cluster.shutdown() when the module is unloaded on shutdown.
|
||||
__context__['cassandra_cql_returner_cluster'] = cluster
|
||||
__context__['cassandra_cql_returner_session'] = session
|
||||
__context__['cassandra_cql_prepared'] = {}
|
||||
|
||||
log.debug('Successfully connected to Cassandra cluster at {0}'.format(contact_points))
|
||||
return cluster, session
|
||||
except TypeError:
|
||||
|
@ -216,6 +336,92 @@ def cql_query(query, contact_points=None, port=None, cql_user=None, cql_pass=Non
|
|||
return ret
|
||||
|
||||
|
||||
def cql_query_with_prepare(query, statement_name, statement_arguments, async=False,
|
||||
callback_errors=None,
|
||||
contact_points=None, port=None, cql_user=None, cql_pass=None):
|
||||
'''
|
||||
Run a query on a Cassandra cluster and return a dictionary.
|
||||
|
||||
This function should not be used asynchronously for SELECTs -- it will not
|
||||
return anything and we don't currently have a mechanism for handling a future
|
||||
that will return results.
|
||||
|
||||
:param query: The query to execute.
|
||||
:type query: str
|
||||
:param statement_name: Name to assign the prepared statement in the __context__ dictionary
|
||||
:type statement_name: str
|
||||
:param statement_arguments: Bind parameters for the SQL statement
|
||||
:type statement_arguments: list[str]
|
||||
:param async: Run this query in asynchronous mode
|
||||
:type async: bool
|
||||
:param callback_errors: Function to call after query runs if there is an error
|
||||
:type callback_errors: Function callable
|
||||
:param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
|
||||
:type contact_points: str | list[str]
|
||||
:param cql_user: The Cassandra user if authentication is turned on.
|
||||
:type cql_user: str
|
||||
:param cql_pass: The Cassandra user password if authentication is turned on.
|
||||
:type cql_pass: str
|
||||
:param port: The Cassandra cluster port, defaults to None.
|
||||
:type port: int
|
||||
:param params: The parameters for the query, optional.
|
||||
:type params: str
|
||||
:return: A dictionary from the return values of the query
|
||||
:rtype: list[dict]
|
||||
'''
|
||||
try:
|
||||
cluster, session = _connect(contact_points=contact_points, port=port,
|
||||
cql_user=cql_user, cql_pass=cql_pass)
|
||||
except CommandExecutionError:
|
||||
log.critical('Could not get Cassandra cluster session.')
|
||||
raise
|
||||
except BaseException as e:
|
||||
log.critical('Unexpected error while getting Cassandra cluster session: {0}'.format(str(e)))
|
||||
raise
|
||||
|
||||
if statement_name not in __context__['cassandra_cql_prepared']:
|
||||
try:
|
||||
bound_statement = session.prepare(query)
|
||||
__context__['cassandra_cql_prepared'][statement_name] = bound_statement
|
||||
except BaseException as e:
|
||||
log.critical('Unexpected error while preparing SQL statement: {0}'.format(str(e)))
|
||||
raise
|
||||
else:
|
||||
bound_statement = __context__['cassandra_cql_prepared'][statement_name]
|
||||
|
||||
session.row_factory = dict_factory
|
||||
ret = []
|
||||
|
||||
try:
|
||||
if async:
|
||||
future_results = session.execute_async(bound_statement.bind(statement_arguments))
|
||||
# future_results.add_callbacks(_async_log_errors)
|
||||
else:
|
||||
results = session.execute(bound_statement.bind(statement_arguments))
|
||||
except BaseException as e:
|
||||
log.error('Failed to execute query: {0}\n reason: {1}'.format(query, str(e)))
|
||||
msg = "ERROR: Cassandra query failed: {0} reason: {1}".format(query, str(e))
|
||||
raise CommandExecutionError(msg)
|
||||
|
||||
if not async and results:
|
||||
for result in results:
|
||||
values = {}
|
||||
for key, value in six.iteritems(result):
|
||||
# Salt won't return dictionaries with odd types like uuid.UUID
|
||||
if not isinstance(value, six.text_type):
|
||||
# Must support Cassandra collection types.
|
||||
# Namely, Cassandras set, list, and map collections.
|
||||
if not isinstance(value, (set, list, dict)):
|
||||
value = str(value)
|
||||
values[key] = value
|
||||
ret.append(values)
|
||||
|
||||
# If this was a synchronous call, then we either have a empty list
|
||||
# because there was no return, or we have a return
|
||||
# If this was an async call we only return the empty list
|
||||
return ret
|
||||
|
||||
|
||||
def version(contact_points=None, port=None, cql_user=None, cql_pass=None):
|
||||
'''
|
||||
Show the Cassandra version.
|
||||
|
|
|
@ -159,21 +159,24 @@ def returner(ret):
|
|||
'''
|
||||
query = '''INSERT INTO salt.salt_returns (
|
||||
jid, minion_id, fun, alter_time, full_ret, return, success
|
||||
) VALUES (
|
||||
'{0}', '{1}', '{2}', '{3}', '{4}', '{5}', {6}
|
||||
);'''.format(
|
||||
ret['jid'],
|
||||
ret['id'],
|
||||
ret['fun'],
|
||||
int(time.time() * 1000),
|
||||
json.dumps(ret).replace("'", "''"),
|
||||
json.dumps(ret['return']).replace("'", "''"),
|
||||
ret.get('success', False)
|
||||
)
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?)'''
|
||||
|
||||
statement_arguments = []
|
||||
|
||||
statement_arguments.append('{0}'.format(ret['jid']))
|
||||
statement_arguments.append('{0}'.format(ret['id']))
|
||||
statement_arguments.append('{0}'.format(ret['fun']))
|
||||
statement_arguments.append(int(time.time() * 1000))
|
||||
statement_arguments.append('{0}'.format(json.dumps(ret).replace("'", "''")))
|
||||
statement_arguments.append('{0}'.format(json.dumps(ret['return']).replace("'", "''")))
|
||||
statement_arguments.append(ret.get('success', False))
|
||||
|
||||
# cassandra_cql.cql_query may raise a CommandExecutionError
|
||||
try:
|
||||
__salt__['cassandra_cql.cql_query'](query)
|
||||
__salt__['cassandra_cql.cql_query_with_prepare'](query,
|
||||
'returner_return',
|
||||
tuple(statement_arguments),
|
||||
async=True)
|
||||
except CommandExecutionError:
|
||||
log.critical('Could not insert into salt_returns with Cassandra returner.')
|
||||
raise
|
||||
|
@ -185,13 +188,19 @@ def returner(ret):
|
|||
# The data in salt.minions will be used by get_fun and get_minions
|
||||
query = '''INSERT INTO salt.minions (
|
||||
minion_id, last_fun
|
||||
) VALUES (
|
||||
'{0}', '{1}'
|
||||
);'''.format(ret['id'], ret['fun'])
|
||||
) VALUES (?, ?)'''
|
||||
|
||||
statement_arguments = []
|
||||
|
||||
statement_arguments.append('{0}'.format(ret['id']))
|
||||
statement_arguments.append('{0}'.format(ret['fun']))
|
||||
|
||||
# cassandra_cql.cql_query may raise a CommandExecutionError
|
||||
try:
|
||||
__salt__['cassandra_cql.cql_query'](query)
|
||||
__salt__['cassandra_cql.cql_query_with_prepare'](query,
|
||||
'returner_minion',
|
||||
tuple(statement_arguments),
|
||||
async=True)
|
||||
except CommandExecutionError:
|
||||
log.critical('Could not store minion ID with Cassandra returner.')
|
||||
raise
|
||||
|
@ -218,16 +227,19 @@ def event_return(events):
|
|||
query = '''INSERT INTO salt.salt_events (
|
||||
id, alter_time, data, master_id, tag
|
||||
) VALUES (
|
||||
{0}, {1}, '{2}', '{3}', '{4}'
|
||||
);'''.format(str(uuid.uuid1()),
|
||||
int(time.time() * 1000),
|
||||
json.dumps(data).replace("'", "''"),
|
||||
__opts__['id'],
|
||||
tag)
|
||||
?, ?, ?, ?, ?)
|
||||
'''
|
||||
statement_arguments = [str(uuid.uuid1()),
|
||||
int(time.time() * 1000),
|
||||
json.dumps(data).replace("'", "''"),
|
||||
__opts__['id'],
|
||||
tag]
|
||||
|
||||
# cassandra_cql.cql_query may raise a CommandExecutionError
|
||||
try:
|
||||
__salt__['cassandra_cql.cql_query'](query)
|
||||
__salt__['cassandra_cql.cql_query_with_prepare'](query, 'salt_events',
|
||||
statement_arguments,
|
||||
async=True)
|
||||
except CommandExecutionError:
|
||||
log.critical('Could not store events with Cassandra returner.')
|
||||
raise
|
||||
|
@ -245,13 +257,18 @@ def save_load(jid, load, minions=None):
|
|||
# json.dumps(load) must be escaped Cassandra style.
|
||||
query = '''INSERT INTO salt.jids (
|
||||
jid, load
|
||||
) VALUES (
|
||||
'{0}', '{1}'
|
||||
);'''.format(jid, json.dumps(load).replace("'", "''"))
|
||||
) VALUES (?, ?)'''
|
||||
|
||||
statement_arguments = [
|
||||
jid,
|
||||
json.dumps(load).replace("'", "''")
|
||||
]
|
||||
|
||||
# cassandra_cql.cql_query may raise a CommandExecutionError
|
||||
try:
|
||||
__salt__['cassandra_cql.cql_query'](query)
|
||||
__salt__['cassandra_cql.cql_query_with_prepare'](query, 'save_load',
|
||||
statement_arguments,
|
||||
async=True)
|
||||
except CommandExecutionError:
|
||||
log.critical('Could not save load in jids table.')
|
||||
raise
|
||||
|
@ -272,13 +289,13 @@ def get_load(jid):
|
|||
'''
|
||||
Return the load data that marks a specified jid
|
||||
'''
|
||||
query = '''SELECT load FROM salt.jids WHERE jid = '{0}';'''.format(jid)
|
||||
query = '''SELECT load FROM salt.jids WHERE jid = ?;'''
|
||||
|
||||
ret = {}
|
||||
|
||||
# cassandra_cql.cql_query may raise a CommandExecutionError
|
||||
try:
|
||||
data = __salt__['cassandra_cql.cql_query'](query)
|
||||
data = __salt__['cassandra_cql.cql_query_with_prepare'](query, 'get_load', [jid])
|
||||
if data:
|
||||
load = data[0].get('load')
|
||||
if load:
|
||||
|
@ -298,13 +315,13 @@ def get_jid(jid):
|
|||
'''
|
||||
Return the information returned when the specified job id was executed
|
||||
'''
|
||||
query = '''SELECT minion_id, full_ret FROM salt.salt_returns WHERE jid = '{0}';'''.format(jid)
|
||||
query = '''SELECT minion_id, full_ret FROM salt.salt_returns WHERE jid = ?;'''
|
||||
|
||||
ret = {}
|
||||
|
||||
# cassandra_cql.cql_query may raise a CommandExecutionError
|
||||
try:
|
||||
data = __salt__['cassandra_cql.cql_query'](query)
|
||||
data = __salt__['cassandra_cql.cql_query_with_prepare'](query, 'get_jid', [jid])
|
||||
if data:
|
||||
for row in data:
|
||||
minion = row.get('minion_id')
|
||||
|
@ -326,13 +343,13 @@ def get_fun(fun):
|
|||
'''
|
||||
Return a dict of the last function called for all minions
|
||||
'''
|
||||
query = '''SELECT minion_id, last_fun FROM salt.minions where last_fun = '{0}';'''.format(fun)
|
||||
query = '''SELECT minion_id, last_fun FROM salt.minions where last_fun = ?;'''
|
||||
|
||||
ret = {}
|
||||
|
||||
# cassandra_cql.cql_query may raise a CommandExecutionError
|
||||
try:
|
||||
data = __salt__['cassandra_cql.cql_query'](query)
|
||||
data = __salt__['cassandra_cql.cql_query'](query, 'get_fun', [fun])
|
||||
if data:
|
||||
for row in data:
|
||||
minion = row.get('minion_id')
|
||||
|
|
Loading…
Add table
Reference in a new issue