Seperate saltnado_websockets from the rest of the implementation.

This commit is contained in:
Thomas Jackson 2014-06-20 15:10:56 -07:00
parent 099187c9af
commit c721d9d664
5 changed files with 437 additions and 382 deletions

View file

@ -4,4 +4,6 @@ rest_tornado
.. automodule:: salt.netapi.rest_tornado.saltnado
.. automodule:: salt.netapi.rest_tornado.saltnado_websockets
.. ............................................................................

View file

@ -37,6 +37,9 @@ def start():
mod_opts = __opts__.get(__virtualname__, {})
if mod_opts.get('websockets', False):
from . import saltnado_websockets
if 'num_processes' not in mod_opts:
mod_opts['num_processes'] = 1
@ -46,7 +49,7 @@ def start():
formatted_events_pattern = r"/formatted_events/{0}".format(token_pattern)
logger.debug("All events URL pattern is {0}".format(all_events_pattern))
application = tornado.web.Application([
paths = [
(r"/", saltnado.SaltAPIHandler),
(r"/login", saltnado.SaltAuthHandler),
(r"/minions/(.*)", saltnado.MinionSaltAPIHandler),
@ -56,14 +59,21 @@ def start():
(r"/run", saltnado.RunSaltAPIHandler),
(r"/events", saltnado.EventsSaltAPIHandler),
(r"/hook(/.*)?", saltnado.WebhookSaltAPIHandler),
# Matches /all_events/[0-9A-Fa-f]{n}
# Where n is the length of hexdigest
# for the current hashing algorithm.
# This algorithm is specified in the
# salt master config file.
(all_events_pattern, saltnado.AllEventsHandler),
(formatted_events_pattern, saltnado.FormattedEventsHandler),
], debug=mod_opts.get('debug', False))
]
# if you have enabled websockets, add them!
if mod_opts.get('websockets', False):
paths += [
# Matches /all_events/[0-9A-Fa-f]{n}
# Where n is the length of hexdigest
# for the current hashing algorithm.
# This algorithm is specified in the
# salt master config file.
(all_events_pattern, saltnado_websockets.AllEventsHandler),
(formatted_events_pattern, saltnado_websockets.FormattedEventsHandler),
]
application = tornado.web.Application(paths, debug=mod_opts.get('debug', False))
application.opts = __opts__
application.mod_opts = mod_opts

View file

@ -1,6 +1,7 @@
# encoding: utf-8
import json
import logging
import threading
import salt.netapi
@ -201,7 +202,6 @@ class SaltInfo(object):
'''
Process events and publish data
'''
import threading
logger.debug('In process {0}'.format(threading.current_thread()))
logger.debug(salt_data['tag'])
logger.debug(salt_data)

View file

@ -7,271 +7,7 @@ A REST API for Salt
:depends: - tornado Python module
All Events
----------
Exposes ``all`` "real-time" events from Salt's event bus on a websocket connection.
It should be noted that "Real-time" here means these events are made available
to the server as soon as any salt related action (changes to minions, new jobs etc) happens.
Clients are however assumed to be able to tolerate any network transport related latencies.
Functionality provided by this endpoint is similar to the ``/events`` end point.
The event bus on the Salt master exposes a large variety of things, notably
when executions are started on the master and also when minions ultimately
return their results. This URL provides a real-time window into a running
Salt infrastructure. Uses websocket as the transport mechanism.
Exposes GET method to return websocket connections.
All requests should include an auth token.
A way to obtain obtain authentication tokens is shown below.
.. code-block:: bash
% curl -si localhost:8000/login \\
-H "Accept: application/json" \\
-d username='salt' \\
-d password='salt' \\
-d eauth='pam'
Which results in the response
.. code-block:: json
{
"return": [{
"perms": [".*", "@runner", "@wheel"],
"start": 1400556492.277421,
"token": "d0ce6c1a37e99dcc0374392f272fe19c0090cca7",
"expire": 1400599692.277422,
"user": "salt",
"eauth": "pam"
}]
}
In this example the ``token`` returned is ``d0ce6c1a37e99dcc0374392f272fe19c0090cca7`` and can be included
in subsequent websocket requests (as part of the URL).
The event stream can be easily consumed via JavaScript:
.. code-block:: javascript
// Note, you must be authenticated!
// Get the Websocket connection to Salt
var source = new Websocket('wss://localhost:8000/all_events/d0ce6c1a37e99dcc0374392f272fe19c0090cca7');
// Get Salt's "real time" event stream.
source.onopen = function() { source.send('websocket client ready'); };
// Other handlers
source.onerror = function(e) { console.debug('error!', e); };
// e.data represents Salt's "real time" event data as serialized JSON.
source.onmessage = function(e) { console.debug(e.data); };
// Terminates websocket connection and Salt's "real time" event stream on the server.
source.close();
Or via Python, using the Python module
`websocket-client <https://pypi.python.org/pypi/websocket-client/>`_ for example.
Or the tornado
`client <http://tornado.readthedocs.org/en/latest/websocket.html#client-side-support>`_.
.. code-block:: python
# Note, you must be authenticated!
from websocket import create_connection
# Get the Websocket connection to Salt
ws = create_connection('wss://localhost:8000/all_events/d0ce6c1a37e99dcc0374392f272fe19c0090cca7')
# Get Salt's "real time" event stream.
ws.send('websocket client ready')
# Simple listener to print results of Salt's "real time" event stream.
# Look at https://pypi.python.org/pypi/websocket-client/ for more examples.
while listening_to_events:
print ws.recv() # Salt's "real time" event data as serialized JSON.
# Terminates websocket connection and Salt's "real time" event stream on the server.
ws.close()
# Please refer to https://github.com/liris/websocket-client/issues/81 when using a self signed cert
Above examples show how to establish a websocket connection to Salt and activating
real time updates from Salt's event stream by signaling ``websocket client ready``.
Formatted Events
-----------------
Exposes ``formatted`` "real-time" events from Salt's event bus on a websocket connection.
It should be noted that "Real-time" here means these events are made available
to the server as soon as any salt related action (changes to minions, new jobs etc) happens.
Clients are however assumed to be able to tolerate any network transport related latencies.
Functionality provided by this endpoint is similar to the ``/events`` end point.
The event bus on the Salt master exposes a large variety of things, notably
when executions are started on the master and also when minions ultimately
return their results. This URL provides a real-time window into a running
Salt infrastructure. Uses websocket as the transport mechanism.
Formatted events parses the raw "real time" event stream and maintains
a current view of the following:
- minions
- jobs
A change to the minions (such as addition, removal of keys or connection drops)
or jobs is processed and clients are updated.
Since we use salt's presence events to track minions,
please enable ``presence_events``
and set a small value for the ``loop_interval``
in the salt master config file.
Exposes GET method to return websocket connections.
All requests should include an auth token.
A way to obtain obtain authentication tokens is shown below.
.. code-block:: bash
% curl -si localhost:8000/login \\
-H "Accept: application/json" \\
-d username='salt' \\
-d password='salt' \\
-d eauth='pam'
Which results in the response
.. code-block:: json
{
"return": [{
"perms": [".*", "@runner", "@wheel"],
"start": 1400556492.277421,
"token": "d0ce6c1a37e99dcc0374392f272fe19c0090cca7",
"expire": 1400599692.277422,
"user": "salt",
"eauth": "pam"
}]
}
In this example the ``token`` returned is ``d0ce6c1a37e99dcc0374392f272fe19c0090cca7`` and can be included
in subsequent websocket requests (as part of the URL).
The event stream can be easily consumed via JavaScript:
.. code-block:: javascript
// Note, you must be authenticated!
// Get the Websocket connection to Salt
var source = new Websocket('wss://localhost:8000/formatted_events/d0ce6c1a37e99dcc0374392f272fe19c0090cca7');
// Get Salt's "real time" event stream.
source.onopen = function() { source.send('websocket client ready'); };
// Other handlers
source.onerror = function(e) { console.debug('error!', e); };
// e.data represents Salt's "real time" event data as serialized JSON.
source.onmessage = function(e) { console.debug(e.data); };
// Terminates websocket connection and Salt's "real time" event stream on the server.
source.close();
Or via Python, using the Python module
`websocket-client <https://pypi.python.org/pypi/websocket-client/>`_ for example.
Or the tornado
`client <http://tornado.readthedocs.org/en/latest/websocket.html#client-side-support>`_.
.. code-block:: python
# Note, you must be authenticated!
from websocket import create_connection
# Get the Websocket connection to Salt
ws = create_connection('wss://localhost:8000/formatted_events/d0ce6c1a37e99dcc0374392f272fe19c0090cca7')
# Get Salt's "real time" event stream.
ws.send('websocket client ready')
# Simple listener to print results of Salt's "real time" event stream.
# Look at https://pypi.python.org/pypi/websocket-client/ for more examples.
while listening_to_events:
print ws.recv() # Salt's "real time" event data as serialized JSON.
# Terminates websocket connection and Salt's "real time" event stream on the server.
ws.close()
# Please refer to https://github.com/liris/websocket-client/issues/81 when using a self signed cert
Above examples show how to establish a websocket connection to Salt and activating
real time updates from Salt's event stream by signaling ``websocket client ready``.
Example responses
-----------------
``Minion information`` is a dictionary keyed by each connected minion's ``id`` (``mid``),
grains information for each minion is also included.
Minion information is sent in response to the following minion events:
- connection drops
- requires running ``manage.present`` periodically every ``loop_interval`` seconds
- minion addition
- minon removal
.. code-block:: python
# Not all grains are shown
data: {
"minions": {
"minion1": {
"id": "minion1",
"grains": {
"kernel": "Darwin",
"domain": "local",
"zmqversion": "4.0.3",
"kernelrelease": "13.2.0"
}
}
}
}
``Job information`` is also tracked and delivered.
Job information is also a dictionary
in which each job's information is keyed by salt's ``jid``.
.. code-block:: python
data: {
"jobs": {
"20140609153646699137": {
"tgt_type": "glob",
"jid": "20140609153646699137",
"tgt": "*",
"start_time": "2014-06-09T15:36:46.700315",
"state": "complete",
"fun": "test.ping",
"minions": {
"minion1": {
"return": true,
"retcode": 0,
"success": true
}
}
}
}
}
Setup
=====
In order to run rest_tornado with the salt-master
add the following to your salt master config file.
@ -320,9 +56,8 @@ import tornado.httpserver
import tornado.ioloop
import tornado.web
import tornado.gen
import tornado.websocket
from tornado.concurrent import Future
from . import event_processor
from collections import defaultdict
@ -358,7 +93,9 @@ logger = logging.getLogger()
class SaltClientsMixIn(object):
'''
MixIn class to container all of the salt clients that the API needs
'''
@property
def saltclients(self):
if not hasattr(self, '__saltclients'):
@ -398,6 +135,12 @@ class Any(Future):
class EventListener(object):
'''
Class responsible for listening to the salt master event bus and updating
futures. This is the core of what makes this async, this allows us to do
non-blocking work in the main processes and "wait" for an event to happen
'''
def __init__(self, mod_opts, opts):
self.mod_opts = mod_opts
self.opts = opts
@ -430,9 +173,11 @@ class EventListener(object):
if len(self.tag_map[tag]) == 0:
del self.tag_map[tag]
def get_event(self, request,
tag='',
callback=None):
def get_event(self,
request,
tag='',
callback=None,
):
'''
Get an event (async of course) return a future that will get it later
'''
@ -992,108 +737,6 @@ class EventsSaltAPIHandler(SaltAPIHandler):
self.finish()
class AllEventsHandler(tornado.websocket.WebSocketHandler):
'''
Server side websocket handler.
'''
def open(self, token):
'''
Return a websocket connection to Salt
representing Salt's "real time" event stream.
'''
logger.debug('In the websocket open method')
self.token = token
# close the connection, if not authenticated
if not self.application.auth.get_tok(token):
logger.debug('Refusing websocket connection, bad token!')
self.close()
return
self.connected = False
@tornado.gen.coroutine
def on_message(self, message):
"""Listens for a "websocket client ready" message.
Once that message is received an asynchronous job
is stated that yeilds messages to the client.
These messages make up salt's
"real time" event stream.
"""
logger.debug('Got websocket message {0}'.format(message))
if message == 'websocket client ready':
if self.connected:
# TBD: Add ability to run commands in this branch
logger.debug('Websocket already connected, returning')
return
self.connected = True
while True:
try:
event = yield self.application.event_listener.get_event(self)
self.write_message(u'data: {0}\n\n'.format(json.dumps(event)))
except Exception as err:
logger.info('Error! Ending server side websocket connection. Reason = {0}'.format(str(err)))
break
self.close()
else:
# TBD: Add logic to run salt commands here
pass
def on_close(self, *args, **kwargs):
'''Cleanup.
'''
logger.debug('In the websocket close method')
self.close()
class FormattedEventsHandler(AllEventsHandler):
@tornado.gen.coroutine
def on_message(self, message):
"""Listens for a "websocket client ready" message.
Once that message is received an asynchronous job
is stated that yeilds messages to the client.
These messages make up salt's
"real time" event stream.
"""
logger.debug('Got websocket message {0}'.format(message))
if message == 'websocket client ready':
if self.connected:
# TBD: Add ability to run commands in this branch
logger.debug('Websocket already connected, returning')
return
self.connected = True
evt_processor = event_processor.SaltInfo(self)
client = salt.netapi.NetapiClient(self.application.opts)
client.run({
'fun': 'grains.items',
'tgt': '*',
'token': self.token,
'mode': 'client',
'async': 'local_async',
'client': 'local'
})
while True:
try:
event = yield self.application.event_listener.get_event(self)
evt_processor.process(event, self.token, self.application.opts)
# self.write_message(u'data: {0}\n\n'.format(json.dumps(event)))
except Exception as err:
logger.debug('Error! Ending server side websocket connection. Reason = {0}'.format(str(err)))
break
self.close()
else:
# TBD: Add logic to run salt commands here
pass
class WebhookSaltAPIHandler(SaltAPIHandler):
'''
Handler for /run requests

View file

@ -0,0 +1,400 @@
# encoding: utf-8
'''
A Websockets add-on to saltnado
===================
.. py:currentmodule:: salt.netapi.rest_tornado.saltnado
:depends: - tornado Python module
In order to enable saltnado_webosockets you must add websockets: True to your
saltnado config block.
.. code-block:: yaml
rest_tornado:
# can be any port
port: 8000
ssl_crt: /etc/pki/api/certs/server.crt
# no need to specify ssl_key if cert and key
# are in one single file
ssl_key: /etc/pki/api/certs/server.key
debug: False
disable_ssl: False
websockets: True
All Events
----------
Exposes ``all`` "real-time" events from Salt's event bus on a websocket connection.
It should be noted that "Real-time" here means these events are made available
to the server as soon as any salt related action (changes to minions, new jobs etc) happens.
Clients are however assumed to be able to tolerate any network transport related latencies.
Functionality provided by this endpoint is similar to the ``/events`` end point.
The event bus on the Salt master exposes a large variety of things, notably
when executions are started on the master and also when minions ultimately
return their results. This URL provides a real-time window into a running
Salt infrastructure. Uses websocket as the transport mechanism.
Exposes GET method to return websocket connections.
All requests should include an auth token.
A way to obtain obtain authentication tokens is shown below.
.. code-block:: bash
% curl -si localhost:8000/login \\
-H "Accept: application/json" \\
-d username='salt' \\
-d password='salt' \\
-d eauth='pam'
Which results in the response
.. code-block:: json
{
"return": [{
"perms": [".*", "@runner", "@wheel"],
"start": 1400556492.277421,
"token": "d0ce6c1a37e99dcc0374392f272fe19c0090cca7",
"expire": 1400599692.277422,
"user": "salt",
"eauth": "pam"
}]
}
In this example the ``token`` returned is ``d0ce6c1a37e99dcc0374392f272fe19c0090cca7`` and can be included
in subsequent websocket requests (as part of the URL).
The event stream can be easily consumed via JavaScript:
.. code-block:: javascript
// Note, you must be authenticated!
// Get the Websocket connection to Salt
var source = new Websocket('wss://localhost:8000/all_events/d0ce6c1a37e99dcc0374392f272fe19c0090cca7');
// Get Salt's "real time" event stream.
source.onopen = function() { source.send('websocket client ready'); };
// Other handlers
source.onerror = function(e) { console.debug('error!', e); };
// e.data represents Salt's "real time" event data as serialized JSON.
source.onmessage = function(e) { console.debug(e.data); };
// Terminates websocket connection and Salt's "real time" event stream on the server.
source.close();
Or via Python, using the Python module
`websocket-client <https://pypi.python.org/pypi/websocket-client/>`_ for example.
Or the tornado
`client <http://tornado.readthedocs.org/en/latest/websocket.html#client-side-support>`_.
.. code-block:: python
# Note, you must be authenticated!
from websocket import create_connection
# Get the Websocket connection to Salt
ws = create_connection('wss://localhost:8000/all_events/d0ce6c1a37e99dcc0374392f272fe19c0090cca7')
# Get Salt's "real time" event stream.
ws.send('websocket client ready')
# Simple listener to print results of Salt's "real time" event stream.
# Look at https://pypi.python.org/pypi/websocket-client/ for more examples.
while listening_to_events:
print ws.recv() # Salt's "real time" event data as serialized JSON.
# Terminates websocket connection and Salt's "real time" event stream on the server.
ws.close()
# Please refer to https://github.com/liris/websocket-client/issues/81 when using a self signed cert
Above examples show how to establish a websocket connection to Salt and activating
real time updates from Salt's event stream by signaling ``websocket client ready``.
Formatted Events
-----------------
Exposes ``formatted`` "real-time" events from Salt's event bus on a websocket connection.
It should be noted that "Real-time" here means these events are made available
to the server as soon as any salt related action (changes to minions, new jobs etc) happens.
Clients are however assumed to be able to tolerate any network transport related latencies.
Functionality provided by this endpoint is similar to the ``/events`` end point.
The event bus on the Salt master exposes a large variety of things, notably
when executions are started on the master and also when minions ultimately
return their results. This URL provides a real-time window into a running
Salt infrastructure. Uses websocket as the transport mechanism.
Formatted events parses the raw "real time" event stream and maintains
a current view of the following:
- minions
- jobs
A change to the minions (such as addition, removal of keys or connection drops)
or jobs is processed and clients are updated.
Since we use salt's presence events to track minions,
please enable ``presence_events``
and set a small value for the ``loop_interval``
in the salt master config file.
Exposes GET method to return websocket connections.
All requests should include an auth token.
A way to obtain obtain authentication tokens is shown below.
.. code-block:: bash
% curl -si localhost:8000/login \\
-H "Accept: application/json" \\
-d username='salt' \\
-d password='salt' \\
-d eauth='pam'
Which results in the response
.. code-block:: json
{
"return": [{
"perms": [".*", "@runner", "@wheel"],
"start": 1400556492.277421,
"token": "d0ce6c1a37e99dcc0374392f272fe19c0090cca7",
"expire": 1400599692.277422,
"user": "salt",
"eauth": "pam"
}]
}
In this example the ``token`` returned is ``d0ce6c1a37e99dcc0374392f272fe19c0090cca7`` and can be included
in subsequent websocket requests (as part of the URL).
The event stream can be easily consumed via JavaScript:
.. code-block:: javascript
// Note, you must be authenticated!
// Get the Websocket connection to Salt
var source = new Websocket('wss://localhost:8000/formatted_events/d0ce6c1a37e99dcc0374392f272fe19c0090cca7');
// Get Salt's "real time" event stream.
source.onopen = function() { source.send('websocket client ready'); };
// Other handlers
source.onerror = function(e) { console.debug('error!', e); };
// e.data represents Salt's "real time" event data as serialized JSON.
source.onmessage = function(e) { console.debug(e.data); };
// Terminates websocket connection and Salt's "real time" event stream on the server.
source.close();
Or via Python, using the Python module
`websocket-client <https://pypi.python.org/pypi/websocket-client/>`_ for example.
Or the tornado
`client <http://tornado.readthedocs.org/en/latest/websocket.html#client-side-support>`_.
.. code-block:: python
# Note, you must be authenticated!
from websocket import create_connection
# Get the Websocket connection to Salt
ws = create_connection('wss://localhost:8000/formatted_events/d0ce6c1a37e99dcc0374392f272fe19c0090cca7')
# Get Salt's "real time" event stream.
ws.send('websocket client ready')
# Simple listener to print results of Salt's "real time" event stream.
# Look at https://pypi.python.org/pypi/websocket-client/ for more examples.
while listening_to_events:
print ws.recv() # Salt's "real time" event data as serialized JSON.
# Terminates websocket connection and Salt's "real time" event stream on the server.
ws.close()
# Please refer to https://github.com/liris/websocket-client/issues/81 when using a self signed cert
Above examples show how to establish a websocket connection to Salt and activating
real time updates from Salt's event stream by signaling ``websocket client ready``.
Example responses
-----------------
``Minion information`` is a dictionary keyed by each connected minion's ``id`` (``mid``),
grains information for each minion is also included.
Minion information is sent in response to the following minion events:
- connection drops
- requires running ``manage.present`` periodically every ``loop_interval`` seconds
- minion addition
- minon removal
.. code-block:: python
# Not all grains are shown
data: {
"minions": {
"minion1": {
"id": "minion1",
"grains": {
"kernel": "Darwin",
"domain": "local",
"zmqversion": "4.0.3",
"kernelrelease": "13.2.0"
}
}
}
}
``Job information`` is also tracked and delivered.
Job information is also a dictionary
in which each job's information is keyed by salt's ``jid``.
.. code-block:: python
data: {
"jobs": {
"20140609153646699137": {
"tgt_type": "glob",
"jid": "20140609153646699137",
"tgt": "*",
"start_time": "2014-06-09T15:36:46.700315",
"state": "complete",
"fun": "test.ping",
"minions": {
"minion1": {
"return": true,
"retcode": 0,
"success": true
}
}
}
}
}
Setup
=====
'''
import tornado.websocket
from . import event_processor
import tornado.gen
import logging
logger = logging.getLogger()
class AllEventsHandler(tornado.websocket.WebSocketHandler):
'''
Server side websocket handler.
'''
def open(self, token):
'''
Return a websocket connection to Salt
representing Salt's "real time" event stream.
'''
logger.debug('In the websocket open method')
self.token = token
# close the connection, if not authenticated
if not self.application.auth.get_tok(token):
logger.debug('Refusing websocket connection, bad token!')
self.close()
return
self.connected = False
@tornado.gen.coroutine
def on_message(self, message):
"""Listens for a "websocket client ready" message.
Once that message is received an asynchronous job
is stated that yeilds messages to the client.
These messages make up salt's
"real time" event stream.
"""
logger.debug('Got websocket message {0}'.format(message))
if message == 'websocket client ready':
if self.connected:
# TBD: Add ability to run commands in this branch
logger.debug('Websocket already connected, returning')
return
self.connected = True
while True:
try:
event = yield self.application.event_listener.get_event(self)
self.write_message(u'data: {0}\n\n'.format(json.dumps(event)))
except Exception as err:
logger.info('Error! Ending server side websocket connection. Reason = {0}'.format(str(err)))
break
self.close()
else:
# TBD: Add logic to run salt commands here
pass
def on_close(self, *args, **kwargs):
'''Cleanup.
'''
logger.debug('In the websocket close method')
self.close()
class FormattedEventsHandler(AllEventsHandler):
@tornado.gen.coroutine
def on_message(self, message):
"""Listens for a "websocket client ready" message.
Once that message is received an asynchronous job
is stated that yeilds messages to the client.
These messages make up salt's
"real time" event stream.
"""
logger.debug('Got websocket message {0}'.format(message))
if message == 'websocket client ready':
if self.connected:
# TBD: Add ability to run commands in this branch
logger.debug('Websocket already connected, returning')
return
self.connected = True
evt_processor = event_processor.SaltInfo(self)
client = salt.netapi.NetapiClient(self.application.opts)
client.run({
'fun': 'grains.items',
'tgt': '*',
'token': self.token,
'mode': 'client',
'async': 'local_async',
'client': 'local'
})
while True:
try:
event = yield self.application.event_listener.get_event(self)
evt_processor.process(event, self.token, self.application.opts)
# self.write_message(u'data: {0}\n\n'.format(json.dumps(event)))
except Exception as err:
logger.debug('Error! Ending server side websocket connection. Reason = {0}'.format(str(err)))
break
self.close()
else:
# TBD: Add logic to run salt commands here
pass