Add websocket support

This commit is contained in:
Aditya Kulkarni 2014-05-19 10:58:16 -04:00
parent 8a10a60da3
commit 64ed4a309b
4 changed files with 727 additions and 0 deletions

View file

@ -46,6 +46,7 @@ MOCK_MODULES = [
'cheroot.wsgi',
'cherrypy',
'cherrypy.lib',
'cherrypy.process',
'cherrypy.wsgiserver',
'cherrypy.wsgiserver.ssl_builtin',
@ -56,6 +57,11 @@ MOCK_MODULES = [
'tornado.ioloop',
'tornado.web',
'ws4py',
'multiprocessing',
'ws4py.server',
'ws4py.server.cherrypyserver',
'ws4py.websocket',
'yaml',
'zmq',

View file

@ -34,6 +34,12 @@ REST URI Reference
.. autoclass:: Events
:members: GET
.. autoclass:: WebsocketEndpoint
:members: GET
.. autoclass:: AllEvents
:members: GET
.. autoclass:: Webhook
:members: POST

View file

@ -5,6 +5,7 @@ A REST API for Salt
.. py:currentmodule:: saltapi.netapi.rest_cherrypy.app
:depends: - CherryPy Python module
:depends: - ws4py Python module
:configuration: All authentication is done through Salt's :ref:`external auth
<acl-eauth>` system. Be sure that it is enabled and the user you are
authenticating as has permissions for all the functions you will be
@ -214,6 +215,13 @@ import salt.utils.event
# Import salt-api libs
import saltapi
# Imports related to websocket
import time
import event_processor
from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
from ws4py.websocket import WebSocket
from multiprocessing import Process, Lock, Pipe
logger = logging.getLogger(__name__)
@ -1254,6 +1262,502 @@ class Events(object):
return listen()
class SynchronizingWebsocket(WebSocket):
'''
Class to handle requests sent to this websocket connection.
Each instance of this class represents a Salt websocket connection.
Waits to receive a ``ready`` message fom the client.
Calls send on it's end of the pipe to signal to the sender on receipt
of ``ready``.
This class also kicks off initial information probing jobs when clients
initially connect. These jobs help gather information about minions, jobs,
and documentation.
'''
def __init__(self, *args, **kwargs):
super(SynchronizingWebsocket, self).__init__(*args, **kwargs)
'''
This pipe needs to represent the parent end of a pipe.
Clients need to ensure that the pipe assigned to ``self.pipe`` is
the ``parent end`` of a
`pipe <https://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes>`_.
'''
self.pipe = None
'''
The token that we can use to make API calls.
There are times when we would like to kick off jobs,
examples include trying to obtain minions connected.
'''
self.token = None
'''
Options represent ``salt`` options defined in the configs.
'''
self.opts = None
def received_message(self, message):
'''
Checks if the client has sent a ready message.
A ready message causes ``send()`` to be called on the
``parent end`` of the pipe.
Clients need to ensure that the pipe assigned to ``self.pipe`` is
the ``parent end`` of a pipe.
This ensures completion of the underlying websocket connection
and can be used to synchronize parallel senders.
'''
if message.data == 'websocket client ready':
self.pipe.send(message)
client = saltapi.APIClient(self.opts)
client.run({
'fun': 'grains.items',
'tgt': '*',
'token': self.token,
'mode': 'client',
'async': 'local_async',
'client': 'local'
})
self.send('server received message', False)
class WebsocketEndpoint(object):
'''
Exposes formatted results from Salt's event bus.
The event bus on the Salt master exposes a large variety of things, notably
when executions are started on the master and also when minions ultimately
return their results. This URL provides a real-time window into a running
Salt infrastructure. Uses websocket as the transport mechanism.
Exposes GET method to return websocket connections.
All requests should include an auth token.
A way to obtain obtain authentication tokens is shown below.
.. code-block:: bash
% curl -si localhost:8000/login \\
-H "Accept: application/json" \\
-d username='salt' \\
-d password='salt' \\
-d eauth='pam'
Which results in the response
.. code-block:: json
{
"return": [{
"perms": [".*", "@runner", "@wheel"],
"start": 1400556492.277421,
"token": "d0ce6c1a37e99dcc0374392f272fe19c0090cca7",
"expire": 1400599692.277422,
"user": "salt",
"eauth": "pam"
}]
}
In this example the ``token`` returned is ``d0ce6c1a37e99dcc0374392f272fe19c0090cca7`` and can be included
in subsequent websocket requests (perhaps as part of the URL).
'''
exposed = True
_cp_config = dict(LowDataAdapter._cp_config, **{
'response.stream': True,
'tools.encode.encoding': 'utf-8',
# Auth handled manually below
'tools.salt_token.on': True,
'tools.salt_auth.on': False,
'tools.hypermedia_in.on': False,
'tools.hypermedia_out.on': False,
'tools.websocket.on': True,
'tools.websocket.handler_cls': SynchronizingWebsocket,
})
def __init__(self):
self.opts = cherrypy.config['saltopts']
self.auth = salt.auth.LoadAuth(self.opts)
def GET(self, token=None):
'''
Return a websocket connection to Salt
representing Salt's formatted "real time" event stream.
Makes use of Salt's ``presence
events`` to track minions connected. Presence events are OFF by default and
can be turned on using the ``presence_events`` and ``loop_interval`` options
in the Salt master :ref:`config file <configuration-salt-master>`.
Provides a convenient way for clients to make an HTTP
call and obtain a websocket connection.
.. http:get:: /formatted_events
**Example response**:
.. code-block:: http
Request URL:ws://localhost:8000/formatted_events/d0ce6c1a37e99dcc0374392f272fe19c0090cca7
Request Method:GET
Status Code:101 Switching Protocols
Host:localhost:8000
Origin:http://localhost:8000
Pragma:no-cache
Sec-WebSocket-Extensions:permessage-deflate; client_max_window_bits, x-webkit-deflate-frame
Sec-WebSocket-Key:Bdp7VlCtPvkieC3epOiIgA==
Sec-WebSocket-Version:13
Upgrade:websocket
Connection:Upgrade
Content-Type:text/plain;charset=utf-8
Date:Tue, 20 May 2014 02:03:08 GMT
Server:CherryPy/3.2.3
Upgrade:websocket
:status 401: could not authenticate using provided credentials
The event stream can be easily consumed via JavaScript:
.. code-block:: javascript
// Note, you must be authenticated!
// Get the Websocket connection to Salt
var source = new Websocket('ws://localhost:8000/formatted_events/d0ce6c1a37e99dcc0374392f272fe19c0090cca7');
// Get Salt's "real time" event stream.
source.onopen = function() { source.send('websocket client ready'); };
// Other handlers
source.onerror = function(e) { console.debug('error!', e); };
// e.data represents Salt's "real time" event data as serialized JSON.
source.onmessage = function(e) { console.debug(e.data); };
// Terminates websocket connection and Salt's "real time" event stream on the server.
source.close();
Or via Python, using the Python module
`websocket-client <https://pypi.python.org/pypi/websocket-client/>`_ for example.
.. code-block:: python
# Note, you must be authenticated!
from websocket import create_connection
# Get the Websocket connection to Salt
ws = create_connection('ws://localhost:8000/formatted_events/d0ce6c1a37e99dcc0374392f272fe19c0090cca7')
# Get Salt's "real time" event stream.
ws.send('websocket client ready')
# Simple listener to print results of Salt's "real time" event stream.
# Look at https://pypi.python.org/pypi/websocket-client/ for more examples.
while listening_to_events:
print ws.recv() # Salt's "real time" event data as serialized JSON.
# Terminates websocket connection and Salt's "real time" event stream on the server.
ws.close()
Above examples show how to establish a websocket connection to Salt and activating
real time updates from Salt's event stream by signaling ``websocket client ready``.
'''
# Pulling the session token from an URL param is a workaround for
# browsers not supporting CORS in the EventSource API.
if token:
orig_sesion, _ = cherrypy.session.cache.get(token, ({}, None))
salt_token = orig_sesion.get('token')
else:
salt_token = cherrypy.session.get('token')
# Manually verify the token
if not salt_token or not self.auth.get_tok(salt_token):
raise cherrypy.HTTPError(401) # unauthorized
# Release the session lock before starting the long-running response
cherrypy.session.release_lock()
'''
A handler is the server side end of the websocket connection.
Each request spawns a new instance of this handler
'''
handler = cherrypy.request.ws_handler
minions = {
'fun': 'grains.items',
'tgt': '*',
'expr_type': 'glob',
'mode': 'async',
'token': salt_token
}
def event_stream(handler, pipe):
pipe.recv() # blocks until send is called on the parent end of this pipe.
event = salt.utils.event.SaltEvent('master', self.opts['sock_dir'])
stream = event.iter_events(full=True)
SaltInfo = event_processor.SaltInfo(handler)
while True:
# data = client.get_event(wait=0.025, tag='salt/', full=True)
data = stream.next()
if data:
try: #work around try to decode catch unicode errors
SaltInfo.process(data, salt_token, self.opts)
# handler.send('data: {0}\n\n'.format(json.dumps(data)), False)
except UnicodeDecodeError as ex:
logger.error("Error: Salt event has non UTF-8 data:\n{0}".format(data))
time.sleep(0.1)
parent_pipe, child_pipe = Pipe()
handler.pipe = parent_pipe
handler.opts = self.opts
# Process to handle async push to a client.
# Each GET request causes a process to be kicked off.
proc = Process(target=event_stream, args=(handler,child_pipe))
proc.start()
class SynchronizingHandler(WebSocket):
'''
Class to handle requests sent to this websocket connection.
Each instance of this class represents a Salt websocket connection.
Waits to receive a ``ready`` message fom the client.
Calls send on it's end of the pipe to signal to the sender on receipt
of ``ready``.
This class also kicks off initial information probing jobs when clients
initially connect. These jobs help gather information about minions, jobs,
and documentation.
'''
def __init__(self, *args, **kwargs):
super(SynchronizingHandler, self).__init__(*args, **kwargs)
'''
This pipe needs to represent the parent end of a pipe.
Clients need to ensure that the pipe assigned to ``self.pipe`` is
the ``parent end`` of a
`pipe <https://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes>`_.
'''
self.pipe = None
'''
The token that we can use to make API calls.
There are times when we would like to kick off jobs,
examples include trying to obtain minions connected.
'''
self.token = None
def received_message(self, message):
'''
Checks if the client has sent a ready message.
A ready message causes ``send()`` to be called on the
``parent end`` of the pipe.
Clients need to ensure that the pipe assigned to ``self.pipe`` is
the ``parent end`` of a pipe.
This ensures completion of the underlying websocket connection
and can be used to synchronize parallel senders.
'''
if message.data == 'websocket client ready':
self.pipe.send(message)
self.send('server received message', False)
class AllEvents(object):
'''
Exposes ``all`` events from Salt's event bus on a websocket connection.
The event bus on the Salt master exposes a large variety of things, notably
when executions are started on the master and also when minions ultimately
return their results. This URL provides a real-time window into a running
Salt infrastructure. Uses websocket as the transport mechanism.
Exposes GET method to return websocket connections.
All requests should include an auth token.
A way to obtain obtain authentication tokens is shown below.
.. code-block:: bash
% curl -si localhost:8000/login \\
-H "Accept: application/json" \\
-d username='salt' \\
-d password='salt' \\
-d eauth='pam'
Which results in the response
.. code-block:: json
{
"return": [{
"perms": [".*", "@runner", "@wheel"],
"start": 1400556492.277421,
"token": "d0ce6c1a37e99dcc0374392f272fe19c0090cca7",
"expire": 1400599692.277422,
"user": "salt",
"eauth": "pam"
}]
}
In this example the ``token`` returned is ``d0ce6c1a37e99dcc0374392f272fe19c0090cca7`` and can be included
in subsequent websocket requests (perhaps as part of the URL).
'''
exposed = True
_cp_config = dict(LowDataAdapter._cp_config, **{
'response.stream': True,
'tools.encode.encoding': 'utf-8',
# Auth handled manually below
'tools.salt_token.on': True,
'tools.salt_auth.on': False,
'tools.hypermedia_in.on': False,
'tools.hypermedia_out.on': False,
'tools.websocket.on': True,
'tools.websocket.handler_cls': SynchronizingHandler,
})
def __init__(self):
self.opts = cherrypy.config['saltopts']
self.auth = salt.auth.LoadAuth(self.opts)
def GET(self, token=None):
'''
Return a websocket connection to Salt
representing Salt's "real time" event stream.
Provides a convenient way for clients to make an HTTP
call and obtain a websocket connection.
.. http:get:: /all_events
**Example response**:
.. code-block:: http
Request URL:ws://localhost:8000/all_events/d0ce6c1a37e99dcc0374392f272fe19c0090cca7
Request Method:GET
Status Code:101 Switching Protocols
Host:localhost:8000
Origin:http://localhost:8000
Pragma:no-cache
Sec-WebSocket-Extensions:permessage-deflate; client_max_window_bits, x-webkit-deflate-frame
Sec-WebSocket-Key:Bdp7VlCtPvkieC3epOiIgA==
Sec-WebSocket-Version:13
Upgrade:websocket
Connection:Upgrade
Content-Type:text/plain;charset=utf-8
Date:Tue, 20 May 2014 02:03:08 GMT
Server:CherryPy/3.2.3
Upgrade:websocket
:status 401: could not authenticate using provided credentials
The event stream can be easily consumed via JavaScript:
.. code-block:: javascript
// Note, you must be authenticated!
// Get the Websocket connection to Salt
var source = new Websocket('ws://localhost:8000/all_events/d0ce6c1a37e99dcc0374392f272fe19c0090cca7');
// Get Salt's "real time" event stream.
source.onopen = function() { source.send('websocket client ready'); };
// Other handlers
source.onerror = function(e) { console.debug('error!', e); };
// e.data represents Salt's "real time" event data as serialized JSON.
source.onmessage = function(e) { console.debug(e.data); };
// Terminates websocket connection and Salt's "real time" event stream on the server.
source.close();
Or via Python, using the Python module
`websocket-client <https://pypi.python.org/pypi/websocket-client/>`_ for example.
.. code-block:: python
# Note, you must be authenticated!
from websocket import create_connection
# Get the Websocket connection to Salt
ws = create_connection('ws://localhost:8000/all_events/d0ce6c1a37e99dcc0374392f272fe19c0090cca7')
# Get Salt's "real time" event stream.
ws.send('websocket client ready')
# Simple listener to print results of Salt's "real time" event stream.
# Look at https://pypi.python.org/pypi/websocket-client/ for more examples.
while listening_to_events:
print ws.recv() # Salt's "real time" event data as serialized JSON.
# Terminates websocket connection and Salt's "real time" event stream on the server.
ws.close()
Above examples show how to establish a websocket connection to Salt and activating
real time updates from Salt's event stream by signaling ``websocket client ready``.
'''
# Pulling the session token from an URL param is a workaround for
# browsers not supporting CORS in the EventSource API.
if token:
orig_sesion, _ = cherrypy.session.cache.get(token, ({}, None))
salt_token = orig_sesion.get('token')
else:
salt_token = cherrypy.session.get('token')
# Manually verify the token
if not salt_token or not self.auth.get_tok(salt_token):
raise cherrypy.HTTPError(401) # unauthorized
# Release the session lock before starting the long-running response
cherrypy.session.release_lock()
'''
A handler is the server side end of the websocket connection.
Each request spawns a new instance of this handler
'''
handler = cherrypy.request.ws_handler
def event_stream(handler, pipe):
pipe.recv() # blocks until send is called on the parent end of this pipe.
event = salt.utils.event.SaltEvent('master', self.opts['sock_dir'])
stream = event.iter_events(full=True)
while True:
data = stream.next()
if data:
try: #work around try to decode catch unicode errors
handler.send('data: {0}\n\n'.format(json.dumps(data)), False)
except UnicodeDecodeError as ex:
logger.error("Error: Salt event has non UTF-8 data:\n{0}".format(data))
time.sleep(0.1)
parent_pipe, child_pipe = Pipe()
handler.pipe = parent_pipe
# Process to handle async push to a client.
# Each GET request causes a process to be kicked off.
proc = Process(target=event_stream, args=(handler,child_pipe))
proc.start()
class Webhook(object):
'''
A generic web hook entry point that fires an event on Salt's event bus
@ -1452,6 +1956,8 @@ class API(object):
'jobs': Jobs,
'events': Events,
'stats': Stats,
'formatted_events': WebsocketEndpoint,
'all_events': AllEvents,
}
def __init__(self):
@ -1467,6 +1973,9 @@ class API(object):
if 'app' in self.apiopts:
setattr(self, self.apiopts.get('app_path', 'app').lstrip('/'), App())
cherrypy.tools.websocket = WebSocketTool()
WebSocketPlugin(cherrypy.engine).subscribe()
def get_conf(self):
'''
Combine the CherryPy configuration with the rest_cherrypy config values

View file

@ -0,0 +1,206 @@
import json
import logging
logger = logging.getLogger(__name__)
class SaltInfo:
'''
Class to handle processing and publishing of "real time" Salt upates.
'''
def __init__(self, handler):
'''
handler is expected to be the server side end of a websocket
connection.
'''
self.handler = handler
'''
These represent a "real time" view into Salt's jobs.
'''
self.jobs = {}
'''
This represents a "real time" view of minions connected to
Salt.
'''
self.minions = {}
def publish_minions(self):
'''
Publishes minions as a list of dicts.
'''
minions = []
for minion, minion_info in self.minions.iteritems():
curr_minion = {}
curr_minion.update(minion_info)
curr_minion.update({'id': minion})
minions.append(curr_minion)
ret = {'minions': minions}
self.handler.send(json.dumps(ret), False)
def publish(self, key, data):
'''
Publishes the data to the event stream.
'''
publish_data = {key: data}
self.handler.send(json.dumps(publish_data), False)
def process_minion_update(self, event_data):
'''
Associate grains data with a minion and publish minion update
'''
tag = event_data['tag']
event_info = event_data['data']
_, _, _, _, mid = tag.split('/')
if not self.minions.get(mid, None):
self.minions[mid] = {}
minion = self.minions[mid]
minion.update({'grains': event_info['return']})
self.publish_minions()
def process_ret_job_event(self, event_data):
'''
Process a /ret event returned by Salt for a particular minion.
These events contain the returned results from a particular execution.
'''
tag = event_data['tag']
event_info = event_data['data']
_, _, jid, _, mid = tag.split('/')
job = self.jobs.setdefault(jid, {})
minion = job.setdefault('minions', {}).setdefault(mid, {})
minion.update({'return': event_info['return']})
minion.update({'retcode': event_info['retcode']})
minion.update({'success': event_info['success']})
job_complete = all([minion['success'] for mid, minion
in job['minions'].iteritems()])
if job_complete:
job['state'] = 'complete'
self.publish('jobs', self.jobs)
def process_new_job_event(self, event_data):
'''
Creates a new job with properties from the event data
like jid, function, args, timestamp.
Also sets the initial state to started.
Minions that are participating in this job are also noted.
'''
job = None
tag = event_data['tag']
event_info = event_data['data']
minions = {}
for mid in event_info['minions']:
minions[mid] = {'success': False}
job = {
'jid': event_info['jid'],
'start_time': event_info['_stamp'],
'minions': minions, # is a dictionary keyed by mids
'fun': event_info['fun'],
'tgt': event_info['tgt'],
'tgt_type': event_info['tgt_type'],
'state': 'running',
}
self.jobs[event_info['jid']] = job
self.publish('jobs', self.jobs)
def process_key_event(self, event_data):
tag = event_data['tag']
event_info = event_data['data']
'''
Tag: salt/key
Data:
{'_stamp': '2014-05-20T22:45:04.345583',
'act': 'delete',
'id': 'compute.home',
'result': True}
'''
if event_info['act'] == 'delete':
self.minions.pop(event_info['id'], None)
elif event_info['act'] == 'accept':
self.minions.setdefault(event_info['id'], {})
self.publish_minions()
def process_presense_events(salt_data, token, opts):
'''
Check if any minions have connected or dropped.
Send a message to the client if they have.
'''
tag = event_data['tag']
event_info = event_data['data']
minions_detected = event_info['present']
curr_minions = self.minions.keys()
changed = False
# check if any connections were dropped
dropped_minions = set(curr_minions) - set(minions_detected)
for minion in dropped_minions:
changed = True
self.minions.pop(minion, None)
# check if any new connections were made
new_minions = set(minions_detected) - set(curr_minions)
tgt = ','.join(new_minions)
if tgt:
changed = True
client = saltapi.APIClient(opts)
client.run(
{
'fun': 'grains.items',
'tgt': tgt,
'expr_type': 'list',
'mode': 'client',
'client': 'local',
'async': 'local_async',
'token': token,
})
if changed:
self.publish_minions()
def process(self, salt_data, token, opts):
'''
Process events and publish data
'''
parts = salt_data['tag'].split('/')
if len(parts) < 2:
return
# TBD: Simplify these conditional expressions
if parts[1] == 'job':
if parts[3] == 'new':
self.process_new_job_event(salt_data)
if salt_data['data']['fun'] == 'grains.items':
self.minions = {}
elif parts[3] == 'ret':
self.process_ret_job_event(salt_data)
if salt_data['data']['fun'] == 'grains.items':
self.process_minion_update(salt_data)
if parts[1] == 'key':
self.process_key_event(salt_data)
if parts[1] == 'presense':
self.process_presense_events(salt_data, token, opts)