Update cassandra returner for JPMC

This commit is contained in:
C. R. Oldham 2016-08-12 15:21:15 -06:00
parent 87e29188c0
commit 1954c1a3f3
2 changed files with 261 additions and 38 deletions

View file

@ -31,17 +31,63 @@ Cassandra Database Module
- 192.168.50.12
port: 9000
username: cas_admin
.. versionchanged:: Carbon
Added support for ``ssl_options`` and ``protocol_version``.
Example configuration with
`ssl options <http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.ssl_options>`_:
If ``ssl_options`` are present in cassandra config the cassandra_cql returner
will use SSL. SSL isn't used if ``ssl_options`` isn't specified.
.. code-block:: yaml
cassandra:
cluster:
- 192.168.50.10
- 192.168.50.11
- 192.168.50.12
port: 9000
username: cas_admin
ssl_options:
ca_certs: /etc/ssl/certs/ca-bundle.trust.crt
# SSL version should be one from the ssl module
# This is an optional parameter
ssl_version: PROTOCOL_TLSv1
Additionally you can also specify the ``protocol_version`` to
`use <http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.ssl_options>`_.
.. code-block:: yaml
cassandra:
cluster:
- 192.168.50.10
- 192.168.50.11
- 192.168.50.12
port: 9000
username: cas_admin
# defaults to 4, if not set
protocol_version: 3
'''
# Import Python Libs
from __future__ import absolute_import
import logging
import json
import ssl
# Import Salt Libs
from salt.exceptions import CommandExecutionError
from salt.ext import six
SSL_VERSION = 'ssl_version'
log = logging.getLogger(__name__)
__virtualname__ = 'cassandra_cql'
@ -51,8 +97,10 @@ try:
# pylint: disable=import-error,no-name-in-module
from cassandra.cluster import Cluster
from cassandra.cluster import NoHostAvailable
from cassandra.connection import ConnectionException, ConnectionShutdown
from cassandra.connection import ConnectionException, \
ConnectionShutdown, OperationTimedOut
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import DCAwareRoundRobinPolicy, WhiteListRoundRobinPolicy
from cassandra.query import dict_factory
# pylint: enable=import-error,no-name-in-module
HAS_DRIVER = True
@ -72,6 +120,10 @@ def __virtual__():
return (False, 'Cannot load cassandra_cql module: python driver not found')
def _async_log_errors(errors):
log.error('Cassandra_cql async call returned: {0}'.format(errors))
def _load_properties(property_name, config_option, set_default=False, default=None):
'''
Load properties for the cassandra module from config or pillar.
@ -83,7 +135,7 @@ def _load_properties(property_name, config_option, set_default=False, default=No
:param set_default: Should a default be set if not found in config.
:type set_default: bool
:param default: The default value to be set.
:type default: str
:type default: str or int
:return: The property fetched from the configuration or default.
:rtype: str or list of str
'''
@ -107,7 +159,39 @@ def _load_properties(property_name, config_option, set_default=False, default=No
return property_name
def _connect(contact_points=None, port=None, cql_user=None, cql_pass=None):
def _get_ssl_opts():
'''
Parse out ssl_options for Cassandra cluster connection.
Make sure that the ssl_version (if any specified) is valid.
'''
sslopts = __salt__['config.option']('cassandra').get('ssl_options', None)
ssl_opts = {}
if sslopts:
ssl_opts['ca_certs'] = sslopts['ca_certs']
if SSL_VERSION in sslopts:
if not sslopts[SSL_VERSION].startswith('PROTOCOL_'):
valid_opts = ', '.join(
[x for x in dir(ssl) if x.startswith('PROTOCOL_')]
)
raise CommandExecutionError('Invalid protocol_version '
'specified! '
'Please make sure '
'that the ssl protocol'
'version is one from the SSL'
'module. '
'Valid options are '
'{0}'.format(valid_opts))
else:
ssl_opts[SSL_VERSION] = \
getattr(ssl, sslopts[SSL_VERSION])
return ssl_opts
else:
return None
def _connect(contact_points=None, port=None, cql_user=None, cql_pass=None,
protocol_version=4):
'''
Connect to a Cassandra cluster.
@ -119,6 +203,8 @@ def _connect(contact_points=None, port=None, cql_user=None, cql_pass=None):
:type cql_pass: str
:param port: The Cassandra cluster port, defaults to None.
:type port: int
:param protocol_version: Cassandra protocol version to use.
:type port: int
:return: The session and cluster objects.
:rtype: cluster object, session object
'''
@ -140,19 +226,53 @@ def _connect(contact_points=None, port=None, cql_user=None, cql_pass=None):
and 'cassandra_cql_returner_session' in __context__):
return __context__['cassandra_cql_returner_cluster'], __context__['cassandra_cql_returner_session']
else:
contact_points = _load_properties(property_name=contact_points, config_option='cluster')
contact_points = contact_points if isinstance(contact_points, list) else contact_points.split(',')
port = _load_properties(property_name=port, config_option='port', set_default=True, default=9042)
cql_user = _load_properties(property_name=cql_user, config_option='username', set_default=True, default="cassandra")
cql_pass = _load_properties(property_name=cql_pass, config_option='password', set_default=True, default="cassandra")
log.debug('()()()()()()()()()()()()( proto version passed {0} ^^^'.format(protocol_version))
protocol_version = _load_properties(property_name=None, config_option='protocol_version',
set_default=True, default=4)
log.debug('^^^^^^^^^^^^^^^^^^^^^^ proto version retrieved {0} ^^^'.format(protocol_version))
try:
auth_provider = PlainTextAuthProvider(username=cql_user, password=cql_pass)
cluster = Cluster(contact_points, port=port, auth_provider=auth_provider)
session = cluster.connect()
ssl_opts = _get_ssl_opts()
if ssl_opts:
# lbp = WhiteListRoundRobinPolicy(contact_points)
cluster = Cluster(contact_points,
port=port,
# load_balancing_policy=lbp,
auth_provider=auth_provider,
ssl_options=ssl_opts,
protocol_version=protocol_version,
compression=True)
else:
# lbp = WhiteListRoundRobinPolicy(contact_points)
cluster = Cluster(contact_points, port=port,
# load_balancing_policy=lbp,
auth_provider=auth_provider,
protocol_version=protocol_version,
compression=True)
for recontimes in [1, 2, 3]:
try:
log.warning('Attempting connect')
session = cluster.connect()
log.warning('After connect')
break
except OperationTimedOut:
log.warning('Cassandra cluster.connect timed out, try {0}'.format(recontimes))
if recontimes >= 3:
raise
# TODO: Call cluster.shutdown() when the module is unloaded on shutdown.
__context__['cassandra_cql_returner_cluster'] = cluster
__context__['cassandra_cql_returner_session'] = session
__context__['cassandra_cql_prepared'] = {}
log.debug('Successfully connected to Cassandra cluster at {0}'.format(contact_points))
return cluster, session
except TypeError:
@ -216,6 +336,92 @@ def cql_query(query, contact_points=None, port=None, cql_user=None, cql_pass=Non
return ret
def cql_query_with_prepare(query, statement_name, statement_arguments, async=False,
callback_errors=None,
contact_points=None, port=None, cql_user=None, cql_pass=None):
'''
Run a query on a Cassandra cluster and return a dictionary.
This function should not be used asynchronously for SELECTs -- it will not
return anything and we don't currently have a mechanism for handling a future
that will return results.
:param query: The query to execute.
:type query: str
:param statement_name: Name to assign the prepared statement in the __context__ dictionary
:type statement_name: str
:param statement_arguments: Bind parameters for the SQL statement
:type statement_arguments: list[str]
:param async: Run this query in asynchronous mode
:type async: bool
:param callback_errors: Function to call after query runs if there is an error
:type callback_errors: Function callable
:param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
:type contact_points: str | list[str]
:param cql_user: The Cassandra user if authentication is turned on.
:type cql_user: str
:param cql_pass: The Cassandra user password if authentication is turned on.
:type cql_pass: str
:param port: The Cassandra cluster port, defaults to None.
:type port: int
:param params: The parameters for the query, optional.
:type params: str
:return: A dictionary from the return values of the query
:rtype: list[dict]
'''
try:
cluster, session = _connect(contact_points=contact_points, port=port,
cql_user=cql_user, cql_pass=cql_pass)
except CommandExecutionError:
log.critical('Could not get Cassandra cluster session.')
raise
except BaseException as e:
log.critical('Unexpected error while getting Cassandra cluster session: {0}'.format(str(e)))
raise
if statement_name not in __context__['cassandra_cql_prepared']:
try:
bound_statement = session.prepare(query)
__context__['cassandra_cql_prepared'][statement_name] = bound_statement
except BaseException as e:
log.critical('Unexpected error while preparing SQL statement: {0}'.format(str(e)))
raise
else:
bound_statement = __context__['cassandra_cql_prepared'][statement_name]
session.row_factory = dict_factory
ret = []
try:
if async:
future_results = session.execute_async(bound_statement.bind(statement_arguments))
# future_results.add_callbacks(_async_log_errors)
else:
results = session.execute(bound_statement.bind(statement_arguments))
except BaseException as e:
log.error('Failed to execute query: {0}\n reason: {1}'.format(query, str(e)))
msg = "ERROR: Cassandra query failed: {0} reason: {1}".format(query, str(e))
raise CommandExecutionError(msg)
if not async and results:
for result in results:
values = {}
for key, value in six.iteritems(result):
# Salt won't return dictionaries with odd types like uuid.UUID
if not isinstance(value, six.text_type):
# Must support Cassandra collection types.
# Namely, Cassandras set, list, and map collections.
if not isinstance(value, (set, list, dict)):
value = str(value)
values[key] = value
ret.append(values)
# If this was a synchronous call, then we either have a empty list
# because there was no return, or we have a return
# If this was an async call we only return the empty list
return ret
def version(contact_points=None, port=None, cql_user=None, cql_pass=None):
'''
Show the Cassandra version.

View file

@ -159,21 +159,24 @@ def returner(ret):
'''
query = '''INSERT INTO salt.salt_returns (
jid, minion_id, fun, alter_time, full_ret, return, success
) VALUES (
'{0}', '{1}', '{2}', '{3}', '{4}', '{5}', {6}
);'''.format(
ret['jid'],
ret['id'],
ret['fun'],
int(time.time() * 1000),
json.dumps(ret).replace("'", "''"),
json.dumps(ret['return']).replace("'", "''"),
ret.get('success', False)
)
) VALUES (?, ?, ?, ?, ?, ?, ?)'''
statement_arguments = []
statement_arguments.append('{0}'.format(ret['jid']))
statement_arguments.append('{0}'.format(ret['id']))
statement_arguments.append('{0}'.format(ret['fun']))
statement_arguments.append(int(time.time() * 1000))
statement_arguments.append('{0}'.format(json.dumps(ret).replace("'", "''")))
statement_arguments.append('{0}'.format(json.dumps(ret['return']).replace("'", "''")))
statement_arguments.append(ret.get('success', False))
# cassandra_cql.cql_query may raise a CommandExecutionError
try:
__salt__['cassandra_cql.cql_query'](query)
__salt__['cassandra_cql.cql_query_with_prepare'](query,
'returner_return',
tuple(statement_arguments),
async=True)
except CommandExecutionError:
log.critical('Could not insert into salt_returns with Cassandra returner.')
raise
@ -185,13 +188,19 @@ def returner(ret):
# The data in salt.minions will be used by get_fun and get_minions
query = '''INSERT INTO salt.minions (
minion_id, last_fun
) VALUES (
'{0}', '{1}'
);'''.format(ret['id'], ret['fun'])
) VALUES (?, ?)'''
statement_arguments = []
statement_arguments.append('{0}'.format(ret['id']))
statement_arguments.append('{0}'.format(ret['fun']))
# cassandra_cql.cql_query may raise a CommandExecutionError
try:
__salt__['cassandra_cql.cql_query'](query)
__salt__['cassandra_cql.cql_query_with_prepare'](query,
'returner_minion',
tuple(statement_arguments),
async=True)
except CommandExecutionError:
log.critical('Could not store minion ID with Cassandra returner.')
raise
@ -218,16 +227,19 @@ def event_return(events):
query = '''INSERT INTO salt.salt_events (
id, alter_time, data, master_id, tag
) VALUES (
{0}, {1}, '{2}', '{3}', '{4}'
);'''.format(str(uuid.uuid1()),
int(time.time() * 1000),
json.dumps(data).replace("'", "''"),
__opts__['id'],
tag)
?, ?, ?, ?, ?)
'''
statement_arguments = [str(uuid.uuid1()),
int(time.time() * 1000),
json.dumps(data).replace("'", "''"),
__opts__['id'],
tag]
# cassandra_cql.cql_query may raise a CommandExecutionError
try:
__salt__['cassandra_cql.cql_query'](query)
__salt__['cassandra_cql.cql_query_with_prepare'](query, 'salt_events',
statement_arguments,
async=True)
except CommandExecutionError:
log.critical('Could not store events with Cassandra returner.')
raise
@ -245,13 +257,18 @@ def save_load(jid, load, minions=None):
# json.dumps(load) must be escaped Cassandra style.
query = '''INSERT INTO salt.jids (
jid, load
) VALUES (
'{0}', '{1}'
);'''.format(jid, json.dumps(load).replace("'", "''"))
) VALUES (?, ?)'''
statement_arguments = [
jid,
json.dumps(load).replace("'", "''")
]
# cassandra_cql.cql_query may raise a CommandExecutionError
try:
__salt__['cassandra_cql.cql_query'](query)
__salt__['cassandra_cql.cql_query_with_prepare'](query, 'save_load',
statement_arguments,
async=True)
except CommandExecutionError:
log.critical('Could not save load in jids table.')
raise
@ -272,13 +289,13 @@ def get_load(jid):
'''
Return the load data that marks a specified jid
'''
query = '''SELECT load FROM salt.jids WHERE jid = '{0}';'''.format(jid)
query = '''SELECT load FROM salt.jids WHERE jid = ?;'''
ret = {}
# cassandra_cql.cql_query may raise a CommandExecutionError
try:
data = __salt__['cassandra_cql.cql_query'](query)
data = __salt__['cassandra_cql.cql_query_with_prepare'](query, 'get_load', [jid])
if data:
load = data[0].get('load')
if load:
@ -298,13 +315,13 @@ def get_jid(jid):
'''
Return the information returned when the specified job id was executed
'''
query = '''SELECT minion_id, full_ret FROM salt.salt_returns WHERE jid = '{0}';'''.format(jid)
query = '''SELECT minion_id, full_ret FROM salt.salt_returns WHERE jid = ?;'''
ret = {}
# cassandra_cql.cql_query may raise a CommandExecutionError
try:
data = __salt__['cassandra_cql.cql_query'](query)
data = __salt__['cassandra_cql.cql_query_with_prepare'](query, 'get_jid', [jid])
if data:
for row in data:
minion = row.get('minion_id')
@ -326,13 +343,13 @@ def get_fun(fun):
'''
Return a dict of the last function called for all minions
'''
query = '''SELECT minion_id, last_fun FROM salt.minions where last_fun = '{0}';'''.format(fun)
query = '''SELECT minion_id, last_fun FROM salt.minions where last_fun = ?;'''
ret = {}
# cassandra_cql.cql_query may raise a CommandExecutionError
try:
data = __salt__['cassandra_cql.cql_query'](query)
data = __salt__['cassandra_cql.cql_query'](query, 'get_fun', [fun])
if data:
for row in data:
minion = row.get('minion_id')