Merge pull request #62868 from lkubb/script-engine-onchange

Add onchange configuration for script engine
This commit is contained in:
Gareth J. Greenaway 2022-12-08 13:30:13 -08:00 committed by GitHub
commit cb820b6bf0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 179 additions and 6 deletions

1
changelog/62867.added Normal file
View file

@ -0,0 +1 @@
Added onchange configuration for script engine

View file

@ -10,13 +10,23 @@ Example Config
cmd: /some/script.py -a 1 -b 2
output: json
interval: 5
onchange: false
Script engine configs:
cmd: Script or command to execute
output: Any available saltstack deserializer
interval: How often in seconds to execute the command
cmd
Script or command to execute
output
Any available saltstack deserializer
interval
How often in seconds to execute the command
onchange
.. versionadded:: 3006.0
Only fire an event if the tag-specific output changes. Defaults to False.
"""
import logging
@ -53,7 +63,7 @@ def _get_serializer(output):
)
def start(cmd, output="json", interval=1):
def start(cmd, output="json", interval=1, onchange=False):
"""
Parse stdout of a command and generate an event
@ -80,6 +90,7 @@ def start(cmd, output="json", interval=1):
:param cmd: The command to execute
:param output: How to deserialize stdout of the script
:param interval: How often to execute the script
:param onchange: Only fire an event if the tag-specific output changes
"""
try:
cmd = shlex.split(cmd)
@ -96,8 +107,10 @@ def start(cmd, output="json", interval=1):
else:
fire_master = __salt__["event.send"]
while True:
if onchange:
events = {}
while True:
try:
proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
@ -116,8 +129,12 @@ def start(cmd, output="json", interval=1):
data["id"] = __opts__["id"]
if tag:
if onchange and tag in events and events[tag] == data:
continue
log.info("script engine firing event with tag %s", tag)
fire_master(tag=tag, data=data)
if onchange:
events[tag] = data
log.debug("Closing script with pid %d", proc.pid)
proc.stdout.close()

View file

@ -2,11 +2,18 @@
unit tests for the script engine
"""
import contextlib
import logging
import signal
import sys
import pytest
import salt.engines.script as script
from salt.exceptions import CommandExecutionError
from tests.support.mock import patch
from tests.support.mock import Mock, patch
log = logging.getLogger(__name__)
@pytest.fixture
@ -43,3 +50,151 @@ def test__read_stdout_terminates_properly():
popen_mock.stdout.readline.return_value = b""
with pytest.raises(StopIteration):
next(script._read_stdout(popen_mock))
@pytest.fixture()
def serializer():
with patch("salt.engines.script._get_serializer", autospec=True) as get_serializer:
serializer = Mock()
get_serializer.return_value = serializer
serializer.deserialize.side_effect = lambda x: x
yield serializer
@pytest.fixture()
def event_send():
event = Mock()
with patch("salt.utils.event.get_master_event", autospec=True) as get_master:
get_master.return_value.fire_event = event
with patch.dict(script.__salt__, {"event.send": event}):
yield event
@pytest.fixture()
def raw_event():
with patch("salt.engines.script._read_stdout", autospec=True) as stdout:
yield stdout
@pytest.fixture()
def proc():
with patch("subprocess.Popen", autospec=True) as popen:
proc = Mock()
proc.wait.return_value = False
proc.pid = 1337
popen.return_value = proc
yield
@pytest.fixture()
def event():
return {"tag": "test", "data": {"foo": "bar", "id": "test"}}
@pytest.fixture()
def new_tag():
return {"tag": "testnew", "data": {"foo": "bar", "id": "test"}}
@pytest.fixture()
def new_event():
return {"tag": "test", "data": {"foo": "baz", "id": "test"}}
@pytest.fixture
def timeout():
"""
This fixture was proposed by waynew to allow testing
an otherwise infinite loop.
Once https://github.com/saltstack/salt/pull/62910 is merged,
this can be migrated.
"""
if sys.platform.startswith("win"):
pytest.skip("SIGALRM is not available on Windows.")
def handler(num, frame):
raise TimeoutError()
@contextlib.contextmanager
def _timeout(t=1):
signal.signal(signal.SIGALRM, handler)
signal.alarm(t)
try:
yield _timeout
except TimeoutError:
pass
finally:
signal.alarm(0)
return _timeout
@pytest.mark.usefixtures("proc", "serializer", "event_send", "raw_event", "timeout")
class TestStart:
def test_start(self, event, raw_event, event_send, timeout):
raw_event.side_effect = ([event],)
try:
with timeout():
script.start("cmd", interval=1.5)
except StopIteration:
log.warning("Timeout failure")
event_send.assert_called_once_with(tag=event["tag"], data=event["data"])
def test_multiple(self, event, new_event, raw_event, event_send, timeout):
raw_event.side_effect = ([event, new_event],)
try:
with timeout():
script.start("cmd", interval=1.5)
except StopIteration:
log.warning("Timeout failure")
assert event_send.call_count == 2
event_send.assert_any_call(tag=event["tag"], data=event["data"])
event_send.assert_any_call(tag=new_event["tag"], data=new_event["data"])
def test_onchange_no_change_no_output(self, event, raw_event, event_send, timeout):
raw_event.side_effect = 110 * ([event],)
try:
with timeout():
script.start("cmd", onchange=True, interval=0.01)
except StopIteration:
log.warning("Timeout failure")
event_send.assert_called_once_with(tag=event["tag"], data=event["data"])
def test_start_onchange_no_change_multiple(
self, event, new_tag, raw_event, event_send, timeout
):
raw_event.side_effect = 110 * ([event, new_tag],)
try:
with timeout():
script.start("cmd", onchange=True, interval=0.01)
except StopIteration:
log.warning("Timeout failure")
assert event_send.call_count == 2
event_send.assert_any_call(tag=event["tag"], data=event["data"])
event_send.assert_any_call(tag=new_tag["tag"], data=new_tag["data"])
def test_start_onchange_with_change(
self, event, new_event, raw_event, event_send, timeout
):
raw_event.side_effect = 50 * [[event]] + 60 * [[new_event]]
try:
with timeout():
script.start("cmd", onchange=True, interval=0.01)
except StopIteration:
log.warning("Timeout failure")
assert event_send.call_count == 2
event_send.assert_any_call(tag=event["tag"], data=event["data"])
event_send.assert_called_with(tag=new_event["tag"], data=new_event["data"])
def test_start_onchange_new_tag(
self, event, new_tag, raw_event, event_send, timeout
):
raw_event.side_effect = 50 * [[event]] + 60 * [[new_tag]]
try:
with timeout():
script.start("cmd", onchange=True, interval=0.01)
except StopIteration:
log.warning("Timeout failure")
event_send.assert_any_call(tag=event["tag"], data=event["data"])
event_send.assert_called_with(tag=new_tag["tag"], data=new_tag["data"])