Add onchange configuration for script engine

This commit is contained in:
jeanluc 2022-10-12 12:23:10 +02:00
parent 4e1bbec71d
commit 448b5f519f
No known key found for this signature in database
GPG key ID: 3EB52D4C754CD898
3 changed files with 116 additions and 7 deletions

1
changelog/62867.added Normal file
View file

@ -0,0 +1 @@
Added onchange configuration for script engine

View file

@ -10,13 +10,23 @@ Example Config
cmd: /some/script.py -a 1 -b 2
output: json
interval: 5
onchange: false
Script engine configs:
cmd: Script or command to execute
output: Any available saltstack deserializer
interval: How often in seconds to execute the command
cmd
Script or command to execute
output
Any available saltstack deserializer
interval
How often in seconds to execute the command
onchange
.. versionadded:: 3006
Only fire an event if the tag-specific output changes. Defaults to False.
"""
import logging
@ -53,7 +63,7 @@ def _get_serializer(output):
)
def start(cmd, output="json", interval=1):
def start(cmd, output="json", interval=1, onchange=False):
"""
Parse stdout of a command and generate an event
@ -80,6 +90,7 @@ def start(cmd, output="json", interval=1):
:param cmd: The command to execute
:param output: How to deserialize stdout of the script
:param interval: How often to execute the script
:param onchange: Only fire an event if the tag-specific output changes
"""
try:
cmd = shlex.split(cmd)
@ -96,7 +107,10 @@ def start(cmd, output="json", interval=1):
else:
fire_master = __salt__["event.send"]
while True:
if onchange:
events = {}
while _running():
try:
proc = subprocess.Popen(
@ -116,8 +130,12 @@ def start(cmd, output="json", interval=1):
data["id"] = __opts__["id"]
if tag:
if onchange and tag in events and events[tag] == data:
continue
log.info("script engine firing event with tag %s", tag)
fire_master(tag=tag, data=data)
if onchange:
events[tag] = data
log.debug("Closing script with pid %d", proc.pid)
proc.stdout.close()
@ -132,3 +150,8 @@ def start(cmd, output="json", interval=1):
proc.terminate()
time.sleep(interval)
# helper to test the start function
def _running():
return True

View file

@ -2,17 +2,20 @@
unit tests for the script engine
"""
import copy
import pytest
import salt.config
import salt.engines.script as script
from salt.exceptions import CommandExecutionError
from tests.support.mock import patch
from tests.support.mock import Mock, patch
@pytest.fixture
def configure_loader_modules():
opts = salt.config.DEFAULT_MASTER_OPTS
opts = copy.deepcopy(salt.config.DEFAULT_MASTER_OPTS)
opts["id"] = "test"
return {script: {"__opts__": opts}}
@ -45,3 +48,85 @@ def test__read_stdout_terminates_properly():
popen_mock.stdout.readline.return_value = b""
with pytest.raises(StopIteration):
next(script._read_stdout(popen_mock))
@pytest.fixture()
def serializer():
with patch("salt.engines.script._get_serializer", autospec=True) as get_serializer:
serializer = Mock()
get_serializer.return_value = serializer
serializer.deserialize.side_effect = lambda x: x
yield serializer
@pytest.fixture(params=[1])
def runs(request):
runs = Mock()
runs.side_effect = request.param * [True] + [False]
with patch("salt.engines.script._running", runs):
yield
@pytest.fixture()
def event_send():
event = Mock()
with patch("salt.utils.event.get_master_event") as get_master:
get_master.fire_event = event
with patch.dict(script.__salt__, {"event.send": event}):
yield event
@pytest.fixture()
def raw_event():
with patch("salt.engines.script._read_stdout") as stdout:
yield stdout
@pytest.fixture()
def proc():
with patch("salt.engines.script.subprocess.Popen") as popen:
proc = Mock()
proc.wait.return_value = False
proc.pid = 1337
popen.return_value = proc
yield
@pytest.fixture()
def sleep():
with patch("time.sleep"):
yield
@pytest.fixture()
def event():
return {"tag": "test", "data": {"foo": "bar", "id": "test"}}
@pytest.fixture()
def new_event():
return {"tag": "test", "data": {"foo": "baz", "id": "test"}}
@pytest.mark.usefixtures(
"proc", "serializer", "runs", "sleep", "event_send", "raw_event"
)
class TestStart:
def test_start(self, event, raw_event, event_send):
raw_event.return_value = [event]
script.start("cmd")
event_send.assert_called_once_with(tag=event["tag"], data=event["data"])
@pytest.mark.parametrize("runs", [10], indirect=True)
def test_start_onchange_no_change(self, event, raw_event, event_send):
raw_event.side_effect = 10 * [[event]]
script.start("cmd", onchange=True)
event_send.assert_called_once_with(tag=event["tag"], data=event["data"])
@pytest.mark.parametrize("runs", [8], indirect=True)
def test_start_onchange_with_change(self, event, new_event, raw_event, event_send):
raw_event.side_effect = 3 * [[event]] + 5 * [[new_event]]
script.start("cmd", onchange=True)
assert event_send.call_count == 2
event_send.assert_any_call(tag=event["tag"], data=event["data"])
event_send.assert_called_with(tag=new_event["tag"], data=new_event["data"])