Merge pull request #61150 from mymindstorm/splunk-event-returner

Add event return capability to Splunk returner
This commit is contained in:
David Hilton 2022-02-07 14:34:23 -07:00 committed by GitHub
commit a575896ef0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 87 additions and 69 deletions

1
changelog/50815.added Normal file
View file

@ -0,0 +1 @@
Added event return capability to Splunk returner

View file

@ -43,9 +43,45 @@ def __virtual__():
def returner(ret):
"""
Send a message to Splunk via the HTTP Event Collector
Send a message to Splunk via the HTTP Event Collector.
Requires the Splunk HTTP Event Collector running on port 8088.
This is available on Splunk Enterprise version 6.3 or higher.
"""
return _send_splunk(ret)
# Get Splunk Options
opts = _get_options()
log.info(
"Options: %s",
salt.utils.json.dumps(opts),
)
http_collector = _create_http_event_collector(opts)
payload = _prepare_splunk_payload(ret, opts)
http_collector.sendEvent(payload)
return True
def event_return(events):
"""
Return events to Splunk via the HTTP Event Collector.
Requires the Splunk HTTP Event Collector running on port 8088.
This is available on Splunk Enterprise version 6.3 or higher.
"""
# Get Splunk Options
opts = _get_options()
log.info(
"Options: %s",
salt.utils.json.dumps(opts),
)
http_collector = _create_http_event_collector(opts)
for event in events:
payload = _prepare_splunk_payload(event, opts)
http_collector.sendEvent(payload)
return True
def _get_options():
@ -70,40 +106,36 @@ def _get_options():
return splunk_opts
def _send_splunk(event, index_override=None, sourcetype_override=None):
def _create_http_event_collector(opts):
"""
Send the results to Splunk.
Requires the Splunk HTTP Event Collector running on port 8088.
This is available on Splunk Enterprise version 6.3 or higher.
Prepare a connection to the Splunk HTTP event collector.
"""
# Get Splunk Options
opts = _get_options()
log.info(
"Options: %s",
salt.utils.json.dumps(opts),
)
http_event_collector_key = opts["token"]
http_event_collector_host = opts["indexer"]
http_event_collector_verify_ssl = opts["verify_ssl"]
# Set up the collector
splunk_event = http_event_collector(
# Return the collector
return http_event_collector(
http_event_collector_key,
http_event_collector_host,
verify_ssl=http_event_collector_verify_ssl,
)
def _prepare_splunk_payload(event, opts):
"""
Prepare a payload for submission to the Splunk HTTP event collector.
"""
# Get Splunk Options
opts = _get_options()
# init the payload
payload = {}
# Set up the event metadata
if index_override is None:
payload.update({"index": opts["index"]})
else:
payload.update({"index": index_override})
if sourcetype_override is None:
payload.update({"sourcetype": opts["sourcetype"]})
else:
payload.update({"index": sourcetype_override})
payload.update({"index": opts["index"]})
payload.update({"sourcetype": opts["sourcetype"]})
# Add the event
payload.update({"event": event})
@ -111,14 +143,10 @@ def _send_splunk(event, index_override=None, sourcetype_override=None):
"Payload: %s",
salt.utils.json.dumps(payload),
)
# Fire it off
splunk_event.sendEvent(payload)
return True
return payload
# Thanks to George Starcher for the http_event_collector class (https://github.com/georgestarcher/)
class http_event_collector:
def __init__(
self,
@ -187,45 +215,3 @@ class http_event_collector:
if http_event_collector_debug:
log.debug(r.text)
log.debug(data)
def batchEvent(self, payload, eventtime=""):
# Method to store the event in a batch to flush later
# Fill in local hostname if not manually populated
if "host" not in payload:
payload.update({"host": self.host})
serialized_payload = salt.utils.json.dumps(payload)
payloadLength = len(serialized_payload)
if (self.currentByteLength + payloadLength) > self.maxByteLength:
self.flushBatch()
# Print debug info if flag set
if http_event_collector_debug:
log.debug("auto flushing")
else:
self.currentByteLength = self.currentByteLength + payloadLength
# If eventtime in epoch not passed as optional argument use current system time in epoch
if not eventtime:
eventtime = str(int(time.time()))
# Update time value on payload if need to use system time
data = {"time": eventtime}
data.update(payload)
self.batchEvents.append(serialized_payload)
def flushBatch(self):
# Method to flush the batch list of events
if len(self.batchEvents) > 0:
headers = {"Authorization": "Splunk " + self.token}
r = requests.post(
self.server_uri,
data=" ".join(self.batchEvents),
headers=headers,
verify=self.verify_ssl,
)
self.batchEvents = []
self.currentByteLength = 0

View file

@ -96,3 +96,34 @@ class SplunkReturnerTest(TestCase, LoaderModuleMockMixin):
requests_post.call_args_list[0][0][0]
== "https://the.splunk.domain:8088/services/collector/event"
)
def test_verify_event_returner(self):
payload = [{"some": "payload"}, {"another": "event"}]
ts = 1234565789
host = "TheHostName"
verify_ssl = True
requests_post = MagicMock()
with patch(
"salt.returners.splunk.time.time", MagicMock(return_value=ts)
), patch(
"salt.returners.splunk.socket.gethostname", MagicMock(return_value=host)
), patch(
"requests.post", requests_post
), patch.dict(
splunk.__opts__["splunk_http_forwarder"], verify_ssl=verify_ssl
):
splunk.event_return(payload)
for i in range(len(payload)):
assert (
json.loads(requests_post.call_args_list[0][1]["data"])["event"]
in payload
)
assert requests_post.call_args_list[0][1]["verify"] == verify_ssl
assert requests_post.call_args_list[0][1]["headers"] == {
"Authorization": "Splunk TheToken"
}
assert (
requests_post.call_args_list[0][0][0]
== "https://the.splunk.domain:8088/services/collector/event"
)