Add tests, fix filtering, free text queries

This commit is contained in:
Twangboy 2022-08-25 14:09:46 -06:00 committed by Megan Wilhite
parent 8660718fa7
commit a9425ffbdf
8 changed files with 246 additions and 733 deletions

1
changelog/54713.added Normal file
View file

@ -0,0 +1 @@
Added Windows Event Viewer support

View file

@ -510,7 +510,7 @@ execution modules
win_dism
win_dns_client
win_dsc
win_event_viewer
win_event
win_file
win_firewall
win_groupadd

View file

@ -0,0 +1,6 @@
======================
salt.modules.win_event
======================
.. automodule:: salt.modules.win_event
:members:

View file

@ -1,6 +0,0 @@
=============================
salt.modules.win_event_viewer
=============================
.. automodule:: salt.modules.win_event_viewer
:members:

View file

@ -3,30 +3,28 @@ A module for working with the Windows Event log system.
"""
# https://docs.microsoft.com/en-us/windows/win32/eventlog/event-logging
from __future__ import absolute_import
# Import Python libs
import collections
import logging
import xmltodict
# Import Salt Libs
import salt.utils.platform
import salt.utils.stringutils
from salt.exceptions import CommandExecutionError
# Import Third Party Libs
try:
import pywintypes
import win32evtlog
import win32evtlogutil
import winerror
import pywintypes
IMPORT_STATUS = True
except ImportError:
IMPORT_STATUS = False
# keys of all the parts of a Event supported by the API
EVENT_PARTS = ("closingRecordNumber",
EVENT_PARTS = (
"closingRecordNumber",
"computerName",
"data",
"eventCategory",
@ -58,14 +56,17 @@ EVENT_TYPES = {
}
# keys time
TIME_PARTS = ("year",
TIME_PARTS = (
"year",
"month",
"day",
"hour",
"minute",
"second",
)
TimeTuple = collections.namedtuple("TimeTuple", "year, month, day, hour, minute, second")
TimeTuple = collections.namedtuple(
"TimeTuple", "year, month, day, hour, minute, second"
)
log = logging.getLogger(__name__)
@ -78,7 +79,7 @@ def __virtual__():
"""
if not salt.utils.platform.is_windows():
return False, "win_event: Most be on Windows"
return False, "win_event: Must be on Windows"
if not IMPORT_STATUS:
return False, "win_event: Missing PyWin32"
return __virtualname__
@ -93,8 +94,11 @@ def _to_bytes(data, encoding="utf-8", encode_keys=False):
to.
Args:
data (object): The string object to encode
encoding(str): The encoding type
encode_keys(bool): If false key strings will not be turned into bytes
Returns:
@ -135,13 +139,16 @@ def _raw_time(time):
Will make a pywintypes.datetime into a TimeTuple.
Args:
time (ob): A datetime object
Returns:
TimeTuple: A TimeTuple
"""
return TimeTuple._make((time.year, time.month, time.day, time.hour, time.minute, time.second))
return TimeTuple(
time.year, time.month, time.day, time.hour, time.minute, time.second
)
def _make_event_dict(event):
@ -149,6 +156,7 @@ def _make_event_dict(event):
Will make a PyEventLogRecord into a dictionary
Args:
event (PyEventLogRecord): An event to convert to a dictionary
Returns:
@ -158,7 +166,9 @@ def _make_event_dict(event):
event_dict = {}
for event_part in EVENT_PARTS:
# get object value and add it to the event dict
event_dict[event_part] = getattr(event, event_part[0].upper() + event_part[1:], None)
event_dict[event_part] = getattr(
event, event_part[0].upper() + event_part[1:], None
)
# format items
event_dict["eventID"] = winerror.HRESULT_CODE(event_dict["eventID"])
@ -175,6 +185,7 @@ def _get_handle(log_name):
Will try to open a PyHANDLE to the Event System
Args:
log_name (str): The name of the log to open
Returns:
@ -185,9 +196,9 @@ def _get_handle(log_name):
# "log close" can fail if this is not done
try:
return win32evtlog.OpenEventLog(None, log_name)
except pywintypes.error:
except pywintypes.error as exc:
raise FileNotFoundError(
"{0} log can not be found or access was denied!".format(log_name)
"Failed to open log: {}\nError: {}".format(log_name, exc.strerror)
)
@ -196,6 +207,7 @@ def _close_handle(handle):
Will close the handle to the event log
Args:
handle (PyHANDLE): The handle to the event log to close
"""
@ -208,6 +220,7 @@ def _event_generator(log_name):
Get all log events one by one. Events are not ordered
Args:
log_name(str): The name of the log to retrieve
Yields:
@ -230,11 +243,12 @@ def _event_generator(log_name):
_close_handle(handle)
def _event_generator_sorted(log_name):
def _event_generator_with_time(log_name):
"""
Sorts the results of the event generator
Args:
log_name (str): The name of the log to retrieve
Yields:
@ -254,33 +268,48 @@ def _event_generator_sorted(log_name):
def _event_generator_filter(log_name, all_requirements=True, **kwargs):
"""
Will find events that meet the requirements in the filter
Will find events that meet the requirements in the filter. Can be any item
in the return for the event.
Args:
log_name (str): The name of the log to retrieve
all_requirements (bool): Should the results match all requirements.
``True`` matches all requirements. ``False`` matches any
requirement.
Kwargs:
Can be any item in the return for the event. Common kwargs are:
eventID (int): The event ID number
eventType (int): The event type number. Valid options and their
corresponding meaning are:
- 0 : Success
- 1 : Error
- 2 : Warning
- 4 : Information
- 8 : Audit Success
- 10 : Audit Failure
year (int): The year
month (int): The month
day (int): The day of the month
hour (int): The hour
minute (int): The minute
second (int): The second
eventCategory (int): The event category number
sid (sid): The SID of the user that created the event
sourceName (str): The name of the event source
Yields:
@ -288,7 +317,7 @@ def _event_generator_filter(log_name, all_requirements=True, **kwargs):
CLI Example:
.. code-block::python
.. code-block:: python
# Return all events from the Security log with an ID of 1100
_event_generator_filter("Security", eventID=1100)
@ -300,7 +329,7 @@ def _event_generator_filter(log_name, all_requirements=True, **kwargs):
_event_generator_filter("System", eventType=1, sourceName="Service Control Manager", data="netprofm")
"""
for event, info in _event_generator_sorted(log_name):
for event, info in _event_generator_with_time(log_name):
if all_requirements:
# all keys need to match each other
for key in kwargs:
@ -317,7 +346,7 @@ def _event_generator_filter(log_name, all_requirements=True, **kwargs):
log.trace(
"utf-8: Does %s == %s",
repr(kwargs[key]),
repr(info[key].decode("utf-8"))
repr(info[key].decode("utf-8")),
)
if kwargs[key] != info[key].decode("utf-8"):
# try utf-16 and strip null bytes
@ -325,9 +354,11 @@ def _event_generator_filter(log_name, all_requirements=True, **kwargs):
log.trace(
"utf-16: Does %s == %s",
repr(kwargs[key]),
repr(info[key].decode("utf-16").strip("\x00"))
repr(info[key].decode("utf-16").strip("\x00")),
)
if kwargs[key] != info[key].decode("utf-16").strip("\x00"):
if kwargs[key] != info[key].decode("utf-16").strip(
"\x00"
):
break
except UnicodeDecodeError:
log.trace("Failed to decode (utf-16): %s", info[key])
@ -355,7 +386,7 @@ def _event_generator_filter(log_name, all_requirements=True, **kwargs):
log.trace(
"utf-8: Does %s == %s",
repr(kwargs[key]),
repr(info[key].decode("utf-8"))
repr(info[key].decode("utf-8")),
)
if kwargs[key] == info[key].decode("utf-8"):
yield info
@ -366,7 +397,7 @@ def _event_generator_filter(log_name, all_requirements=True, **kwargs):
log.trace(
"utf-16: Does %s == %s",
repr(kwargs[key]),
repr(info[key].decode("utf-16").strip("\x00"))
repr(info[key].decode("utf-16").strip("\x00")),
)
if kwargs[key] == info[key].decode("utf-16").strip("\x00"):
yield info
@ -388,6 +419,7 @@ def get(log_name):
``Applications`` log, can take a long time.
Args:
log_name(str): The name of the log to retrieve.
Returns
@ -395,7 +427,7 @@ def get(log_name):
CLI Example:
.. code-block::bash
.. code-block:: bash
salt '*' win_event.get Application
"""
@ -420,11 +452,16 @@ def query(log_name, query_text=None, records=20, latest=True, raw=False):
put spaces between comparison operators. For example: ``this >= that``.
Args:
log_name (str): The name of the log to query
query_text (str): The filter to apply to the log
records (int): The number of records to return
latest (bool): ``True`` will return the newest events. ``False`` will
return the oldest events. Default is ``True``
raw (bool): ``True`` will return the raw xml results. ``False`` will
return the xml converted to a dictionary. Default is ``False``
@ -433,7 +470,7 @@ def query(log_name, query_text=None, records=20, latest=True, raw=False):
CLI Example:
.. code-block::bash
.. code-block:: bash
# Return the 20 most recent events from the Application log with an event ID of 22
salt '*' win_event.query Application "*[System[(EventID=22)]]"
@ -467,12 +504,7 @@ def query(log_name, query_text=None, records=20, latest=True, raw=False):
if not latest:
direction = win32evtlog.EvtQueryForwardDirection
results = win32evtlog.EvtQuery(
log_name,
direction,
query_text,
None
)
results = win32evtlog.EvtQuery(log_name, direction, query_text, None)
event_list = []
for evt in win32evtlog.EvtNext(results, records):
@ -485,68 +517,52 @@ def query(log_name, query_text=None, records=20, latest=True, raw=False):
return event_list
def get_sorted(log_name):
"""
Make a list of events sorted by date.
.. warning::
Running this command on a log with thousands of events, such as the
``Applications`` log, can take a long time.
Args:
log_name (str): The name of the log to retrieve
Returns:
dict: A dictionary of events
CLI Example:
.. code-block::bash
# This command can take a long time
salt "*" win_event.get_sorted Application
"""
event_info = {event_part: collections.defaultdict(list) for event_part in EVENT_PARTS + TIME_PARTS}
for event, info in _event_generator_sorted(log_name):
for part in info:
event_info[part][info.get(part)].append(event)
return event_info
def get_filtered(log_name, all_requirements=True, **kwargs):
"""
Will find events that match the fields and values specified in the kwargs.
Kwargs can be any item in the return for the event.
.. warning::
Running this command on a log with thousands of events, such as the
``Applications`` log, can take a long time.
Args:
log_name (str): The name of the log to retrieve
all_requirements (bool): ``True`` matches all requirements. ``False``
matches any requirement. Default is ``True``
Kwargs:
Can be any item in the return for the event. Common kwargs are:
eventID (int): The event ID number
eventType (int): The event type number. Valid options and their
corresponding meaning are:
- 0 : Success
- 1 : Error
- 2 : Warning
- 4 : Information
- 8 : Audit Success
- 10 : Audit Failure
year (int): The year
month (int): The month
day (int): The day of the month
hour (int): The hour
minute (int): The minute
second (int): The second
eventCategory (int): The event category number
sid (sid): The SID of the user that created the event
sourceName (str): The name of the event source
Returns:
@ -554,7 +570,7 @@ def get_filtered(log_name, all_requirements=True, **kwargs):
CLI Example:
.. code-block::bash
.. code-block:: bash
# Return all events from the Security log with an ID of 1100
salt "*" win_event.get_filtered Security eventID=1100
@ -581,7 +597,7 @@ def get_log_names():
CLI Example:
.. code-block::bash
.. code-block:: bash
salt "*" win_event.get_log_names
"""
@ -605,18 +621,26 @@ def add(
Adds an event to the application event log.
Args:
log_name (str): The name of the application or source
event_id (int): The event ID
event_category (int): The event category
event_type (str): The event category. Must be one of:
- Success
- Error
- Warning
- Information
- AuditSuccess
- AuditFailure
event_strings (list): A list of strings
event_data (bytes): Event data. Strings will be converted to bytes
event_sid (sid): The SID for the event
Raises:
@ -681,7 +705,7 @@ def add(
)
def clear_log(log_name):
def clear(log_name, backup=None):
"""
Clears the specified event log.
@ -689,17 +713,20 @@ def clear_log(log_name):
A clear log event will be added to the log after it is cleared.
Args:
log_name (str): The name of the log to clear
backup (str): Path to backup file
CLI Example:
.. code-block::bash
.. code-block:: bash
salt "*" win_event.clear_log Application
salt "*" win_event.clear Application
"""
handle = _get_handle(log_name)
win32evtlog.ClearEventLog(handle, log_name)
win32evtlog.ClearEventLog(handle, backup)
_close_handle(handle)
@ -708,6 +735,7 @@ def count(log_name):
Gets the number of events in the specified.
Args:
log_name (str): The name of the log
Returns:
@ -715,7 +743,7 @@ def count(log_name):
CLI Example:
.. code-block::bash
.. code-block:: bash
salt "*" win_event.count Application
"""

View file

@ -1,15 +0,0 @@
import pytest
pytestmark = [
pytest.mark.windows_whitelisted,
]
@pytest.fixture(scope="module")
def win_event(modules):
return modules.win_event
def test_get(win_event):
events = win_event.get()
assert events == {}

View file

@ -1,43 +1,52 @@
import datetime
import pytest
import salt.modules.win_event as win_event
pytestmark = [
pytest.mark.windows_whitelisted,
pytest.mark.skip_unless_on_windows,
pytest.mark.destructive_test,
]
@pytest.fixture(scope="function")
def application_events():
# This deletes the contents of the Application event log
win_event.clear("Application")
win_event.add("Application", 2011, event_type="Information")
win_event.add("Application", 2011, event_type="Information")
win_event.add("Application", 2011, event_type="Information")
win_event.add("Application", 2011, event_type="Information")
win_event.add("Application", 2020, event_type="Warning")
win_event.add("Application", 2020, event_type="Warning")
yield
# This deletes the contents of the Application event log
win_event.clear("Application")
def test__to_bytes_utf8():
data = {'key1': 'item1',
'key2': [1, 2, 'item2'],
'key3': 45,
45: str}
data = {"key1": "item1", "key2": [1, 2, "item2"], "key3": 45, 45: str}
new_data = win_event._to_bytes(data, 'utf-8', False)
new_data = win_event._to_bytes(data, "utf-8", False)
assert 'key1' in new_data
assert "key1" in new_data
assert new_data['key1'] == 'item1'.encode('utf-8')
assert new_data['key2'][2] == 'item2'.encode('utf-8')
assert new_data["key1"] == b"item1"
assert new_data["key2"][2] == b"item2"
def test__to_bytes_cp1252():
data = {'key1': 'item1',
'key2': [1, 2, 'item2'],
'key3': 45,
45: str}
data = {"key1": "item1", "key2": [1, 2, "item2"], "key3": 45, 45: str}
new_data = win_event._to_bytes(data, 'CP1252', True)
new_data = win_event._to_bytes(data, "CP1252", True)
assert b'key1' in new_data
assert b'key2' in new_data
assert b'key3' in new_data
assert b"key1" in new_data
assert b"key2" in new_data
assert b"key3" in new_data
assert new_data['key1'.encode('CP1252')] == 'item1'.encode('CP1252')
assert new_data['key2'.encode('CP1252')][2] == 'item2'.encode('CP1252')
assert new_data["key1".encode("CP1252")] == "item1".encode("CP1252")
assert new_data["key2".encode("CP1252")][2] == "item2".encode("CP1252")
def test__raw_time():
@ -45,34 +54,109 @@ def test__raw_time():
assert raw_time == (2019, 7, 2, 10, 8, 19)
def test_count():
@pytest.mark.destructive_test
def test_count(application_events):
"""
Test win_event.count
"""
ret = win_event.count("System")
assert ret > 0
ret = win_event.count("Application")
assert ret == 6
def test_security():
ret = win_event.get("Security")
print(len(ret))
assert len(ret) > 0
@pytest.mark.destructive_test
def test_get(application_events):
ret = win_event.get("Application")
assert len(ret) == 6
def test_windows_powershell():
ret = win_event.get("Windows PowerShell")
print(len(ret))
assert len(ret) > 0
@pytest.mark.destructive_test
def test_query(application_events):
ret = win_event.query("Application")
assert len(ret) == 6
def test_operational():
ret = win_event.get("Operational")
print(len(ret))
assert len(ret) > 0
@pytest.mark.destructive_test
def test_query_records(application_events):
ret = win_event.query("Application", records=3)
for item in ret:
assert isinstance(item, dict)
assert len(ret) == 3
def test_query():
@pytest.mark.destructive_test
def test_query_raw(application_events):
ret = win_event.query("Application", raw=True)
for item in ret:
assert isinstance(item, str)
assert len(ret) == 6
ret = win_event.query("Microsoft-Windows-TerminalServices-LocalSessionManager/Operational", 22)
print(len(ret))
assert len(ret) > 0
@pytest.mark.destructive_test
def test_query_level(application_events):
ret = win_event.query("Application", "*[System[(Level=3)]]")
assert len(ret) == 2
@pytest.mark.destructive_test
def test_query_level_eventid(application_events):
ret = win_event.query(
"Application", "*[System[(Level=4 or Level=0) and (EventID=2011)]]"
)
assert len(ret) == 4
@pytest.mark.destructive_test
def test_query_last_hour(application_events):
ret = win_event.query(
"Application", "*[System[TimeCreated[timediff(@SystemTime) <= 3600000]]]"
)
assert len(ret) == 6
@pytest.mark.destructive_test
def test_get_filtered(application_events):
ret = win_event.get_filtered("Application")
assert len(ret) == 6
@pytest.mark.destructive_test
def test_get_filtered_event_id(application_events):
ret = win_event.get_filtered("Application", eventID=2011)
assert len(ret) == 4
@pytest.mark.destructive_test
def test_get_filtered_event_type(application_events):
ret = win_event.get_filtered("Application", eventType=2)
assert len(ret) == 2
@pytest.mark.destructive_test
def test_get_filtered_year(application_events):
year = datetime.datetime.now().year
ret = win_event.get_filtered("Application", year=year)
assert len(ret) == 6
@pytest.mark.destructive_test
def test_get_filtered_year_none(application_events):
year = 1999
ret = win_event.get_filtered("Application", year=year)
assert len(ret) == 0
@pytest.mark.destructive_test
def test_clear(application_events):
assert win_event.count("Application") == 6
win_event.clear("Application")
assert win_event.count("Application") == 0
@pytest.mark.destructive_test
def test_clear_backup(application_events, tmp_path):
assert win_event.count("Application") == 6
backup_log = tmp_path / "test.bak"
assert not backup_log.exists()
win_event.clear("Application", str(backup_log))
assert backup_log.exists()
assert win_event.count("Application") == 0

View file

@ -1,585 +0,0 @@
# -*- coding: utf-8 -*-
'''
test of win_event_viewer
'''
from __future__ import absolute_import
# Import Salt Libs
import salt.utils.platform
import salt.modules.win_event_viewer as win_event_viewer
# Import Salt Testing Libs
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.unit import TestCase, skipIf
from tests.support.helpers import destructiveTest
from tests.support.mock import (
patch,
NO_MOCK,
NO_MOCK_REASON
)
# Import Third Party Libs
try:
import win32evtlog
import win32evtlogutil
import pywintypes
except ImportError:
pass
MAX_EVENT_LOOK_UP = 2000
class MockTime(object):
def __init__(self, year=2000, month=1, day=1, hour=1, minute=1, second=1):
self.year = year
self.month = month
self.day = day
self.hour = hour
self.minute = minute
self.second = second
class MockEvent(object):
def __init__(self,
closing_record_number=0,
computer_name='PC',
data=bytes(),
event_category=117,
event_id=101,
event_type=4,
record_number=0,
reserved_data=4,
reserved_flags=0,
sid=None,
source_name=0,
string_inserts=("cat", "m"),
time_generated=None,
time_written=None):
self.ClosingRecordNumber = closing_record_number
self.ComputerName = computer_name
self.Data = data
self.EventCategory = event_category
self.EventID = event_id
self.EventType = event_type
self.RecordNumber = record_number
# 'reserved' is a build in function
self.Reserved = reserved_data
self.ReservedFlags = reserved_flags
self.Sid = sid
self.SourceName = source_name
self.StringInserts = string_inserts
if time_generated is None:
time_generated = MockTime()
self.TimeGenerated = time_generated
if time_written is None:
time_written = MockTime()
self.TimeWritten = time_written
MOCK_EVENT_1 = MockEvent()
MOCK_EVENT_2 = MockEvent(event_category=404, time_generated=MockTime(2019, 4, 6, 2, 3, 12))
MOCK_EVENT_3 = MockEvent(reserved_data=2, string_inserts=("fail...", "error..."))
MOCK_EVENT_4 = MockEvent(event_category=301, event_id=9)
MOCK_EVENT_5 = MockEvent(event_category=300, event_id=5)
MOCK_EVENT_6 = MockEvent(computer_name='sky',
time_generated=MockTime(1997, 8, 29, 2, 14, 0),
string_inserts=("'I am a live!'", "launching..."))
EVENTS = (
MOCK_EVENT_1,
MOCK_EVENT_2,
MOCK_EVENT_3,
MOCK_EVENT_4,
MOCK_EVENT_5,
MOCK_EVENT_6,
)
class MockHandler(object):
def __init__(self, events=None):
if events is None:
events = []
self.events = events
self.generator = self.get_generator()
def __len__(self):
return len(self.events)
def get_generator(self):
for event in self.events:
yield [event]
def read(self):
try:
return next(self.generator)
except StopIteration:
return []
def mock_read_event_log(handler, flag, start):
assert isinstance(flag, int) and flag == abs(flag)
assert isinstance(start, int) and start == abs(start)
return handler.read()
def mock_get_number_of_event_log_records(handler):
return len(handler)
@skipIf(not salt.utils.platform.is_windows(), "Windows is required")
class WinEventViewerSimpleTestCase(TestCase, LoaderModuleMockMixin):
'''
Test cases for salt.states.win_iis
'''
def setup_loader_modules(self):
return {win_event_viewer: {}}
def test__str_to_bytes(self):
data = {'key1': 'item1',
'key2': [1, 2, 'item2'],
'key3': 45,
45: str}
new_data = win_event_viewer._change_str_to_bytes(data, 'utf-8', False)
self.assertTrue('key1' in new_data)
self.assertEqual(new_data['key1'], 'item1'.encode('utf-8'))
self.assertEqual(new_data['key2'][2], 'item2'.encode('utf-8'))
def test_2__str_to_bytes(self):
data = {'key1': 'item1',
'key2': [1, 2, 'item2'],
'key3': 45,
45: str}
new_data = win_event_viewer._change_str_to_bytes(data, 'CP1252', True)
self.assertTrue('key1'.encode('CP1252') in new_data)
self.assertTrue('key2'.encode('CP1252') in new_data)
self.assertTrue('key3'.encode('CP1252') in new_data)
self.assertEqual(new_data['key1'.encode('CP1252')], 'item1'.encode('CP1252'))
self.assertEqual(new_data['key2'.encode('CP1252')][2], 'item2'.encode('CP1252'))
def test__get_raw_time(self):
mock_time = MockTime(2019, 7, 2, 10, 8, 19)
raw_time = win_event_viewer._get_raw_time(mock_time)
self.assertEqual(raw_time, (2019, 7, 2, 10, 8, 19))
@skipIf(NO_MOCK, NO_MOCK_REASON)
@skipIf(not salt.utils.platform.is_windows(), "Windows is required")
class WinEventViewerMockTestCase(TestCase, LoaderModuleMockMixin):
'''
Test cases for salt.states.win_iis
'''
def setup_loader_modules(self):
return {win_event_viewer: {}}
def test__get_event_handler(self):
with patch.object(win32evtlog, 'OpenEventLog', return_value=MockHandler()) as open_event_log:
ret = win_event_viewer._get_event_handler("System", None)
open_event_log.called_once_with("System", None)
self.assertIsInstance(ret, MockHandler)
def test_fail__get_event_handler(self):
with patch.object(win32evtlog, 'OpenEventLog', side_effect=pywintypes.error()) as open_event_log:
open_event_log.called_once_with("System", None)
self.assertRaises(FileNotFoundError, win_event_viewer._get_event_handler, "System")
@staticmethod
def test__close_event_handler():
with patch.object(win32evtlog, 'CloseEventLog', return_value=None) as close_event_log:
handler = MockHandler()
win_event_viewer._close_event_handler(MockHandler())
close_event_log.called_once_with(handler)
def test_get_event_generator(self):
handler = MockHandler(EVENTS)
with patch.object(win_event_viewer, "_get_event_handler", return_value=handler) as _get_event_handler:
with patch.object(win_event_viewer, "_close_event_handler", return_value=None) as _close_event_handler:
with patch.object(win32evtlog, 'ReadEventLog', wraps=mock_read_event_log) as ReadEventLog:
with patch.object(win32evtlog,
'GetNumberOfEventLogRecords',
wraps=mock_get_number_of_event_log_records) as GetNumberOfEventLogRecords:
ret = tuple(win_event_viewer.get_event_generator("System", target_computer=None, raw=False))
self.assertIsInstance(ret, tuple)
self.assertTrue(all([isinstance(event, dict)for event in ret]))
_get_event_handler.assert_called_once_with('System', None)
_close_event_handler.assert_called_once_with(handler)
self.assertEqual(ReadEventLog.call_count, len(handler))
self.assertEqual(GetNumberOfEventLogRecords.call_count, len(handler) + 1)
def test_raw_get_event_generator(self):
handler = MockHandler(EVENTS)
with patch.object(win_event_viewer, "_get_event_handler", return_value=handler) as _get_event_handler:
with patch.object(win_event_viewer, "_close_event_handler", return_value=None) as _close_event_handler:
with patch.object(win32evtlog, 'ReadEventLog', wraps=mock_read_event_log) as ReadEventLog:
with patch.object(win32evtlog,
'GetNumberOfEventLogRecords',
wraps=mock_get_number_of_event_log_records) as GetNumberOfEventLogRecords:
ret = tuple(win_event_viewer.get_event_generator("System", target_computer=None, raw=True))
self.assertIsInstance(ret, tuple)
self.assertTrue(all([isinstance(event, MockEvent) for event in ret]))
_get_event_handler.assert_called_once_with('System', None)
_close_event_handler.assert_called_once_with(handler)
self.assertEqual(ReadEventLog.call_count, len(handler))
self.assertEqual(GetNumberOfEventLogRecords.call_count, len(handler) + 1)
def test_get_events(self):
handler = MockHandler(EVENTS)
with patch.object(win_event_viewer, "_get_event_handler", return_value=handler) as _get_event_handler:
with patch.object(win_event_viewer, "_close_event_handler", return_value=None) as _close_event_handler:
with patch.object(win32evtlog, 'ReadEventLog', wraps=mock_read_event_log) as ReadEventLog:
with patch.object(win32evtlog,
'GetNumberOfEventLogRecords',
wraps=mock_get_number_of_event_log_records) as GetNumberOfEventLogRecords:
ret = win_event_viewer.get_events("System", target_computer=None, raw=False)
self.assertIsInstance(ret, tuple)
self.assertTrue(all([isinstance(event, dict)for event in ret]))
_get_event_handler.assert_called_once_with('System', None)
_close_event_handler.assert_called_once_with(handler)
self.assertEqual(ReadEventLog.call_count, len(handler))
self.assertEqual(GetNumberOfEventLogRecords.call_count, len(handler) + 1)
def test_raw_get_events(self):
handler = MockHandler(EVENTS)
with patch.object(win_event_viewer, "_get_event_handler", return_value=handler) as _get_event_handler:
with patch.object(win_event_viewer, "_close_event_handler", return_value=None) as _close_event_handler:
with patch.object(win32evtlog, 'ReadEventLog', wraps=mock_read_event_log) as ReadEventLog:
with patch.object(win32evtlog,
'GetNumberOfEventLogRecords',
wraps=mock_get_number_of_event_log_records) as GetNumberOfEventLogRecords:
ret = win_event_viewer.get_events("System", target_computer=None, raw=True)
self.assertIsInstance(ret, tuple)
self.assertTrue(all([isinstance(event, MockEvent) for event in ret]))
_get_event_handler.assert_called_once_with('System', None)
_close_event_handler.assert_called_once_with(handler)
self.assertEqual(ReadEventLog.call_count, len(handler))
self.assertEqual(GetNumberOfEventLogRecords.call_count, len(handler) + 1)
def test_get_event_sorted_by_info_generator(self):
handler = MockHandler(EVENTS)
with patch.object(win_event_viewer, "_get_event_handler", return_value=handler) as _get_event_handler:
with patch.object(win_event_viewer, "_close_event_handler", return_value=None) as _close_event_handler:
with patch.object(win32evtlog, 'ReadEventLog', wraps=mock_read_event_log) as ReadEventLog:
with patch.object(win32evtlog,
'GetNumberOfEventLogRecords',
wraps=mock_get_number_of_event_log_records) as GetNumberOfEventLogRecords:
ret = tuple(win_event_viewer.get_event_sorted_by_info_generator("System", target_computer=None))
self.assertIsInstance(ret, tuple)
self.assertTrue(all([isinstance(event_info, tuple) for event_info in ret]))
_get_event_handler.assert_called_once_with('System', None)
_close_event_handler.assert_called_once_with(handler)
self.assertEqual(ReadEventLog.call_count, len(handler))
self.assertEqual(GetNumberOfEventLogRecords.call_count, len(handler) + 1)
self.assertEqual(ret[1][1]['eventCategory'], 404)
self.assertEqual(ret[2][1]['stringInserts'], (b'fail...', b'error...'))
self.assertEqual(ret[4][1]['eventID'], 5)
self.assertEqual(ret[5][1]['computerName'], b'sky')
self.assertEqual(ret[5][1]['timeGenerated'], (1997, 8, 29, 2, 14, 0))
def test_get_events_sorted_by_info(self):
handler = MockHandler(EVENTS)
with patch.object(win_event_viewer, "_get_event_handler", return_value=handler) as _get_event_handler:
with patch.object(win_event_viewer, "_close_event_handler", return_value=None) as _close_event_handler:
with patch.object(win32evtlog, 'ReadEventLog', wraps=mock_read_event_log) as ReadEventLog:
with patch.object(win32evtlog,
'GetNumberOfEventLogRecords',
wraps=mock_get_number_of_event_log_records) as GetNumberOfEventLogRecords:
ret = win_event_viewer.get_events_sorted_by_info("System", target_computer=None)
self.assertIsInstance(ret, dict)
_get_event_handler.assert_called_once_with('System', None)
_close_event_handler.assert_called_once_with(handler)
self.assertEqual(ReadEventLog.call_count, len(handler))
self.assertEqual(GetNumberOfEventLogRecords.call_count, len(handler) + 1)
self.assertEqual(ret['eventID'][5], [{'closingRecordNumber': 0,
'computerName': b'PC',
'data': b'',
'eventCategory': 300,
'eventID': 5,
'eventType': 4,
'recordNumber': 0,
'reserved': 4,
'reservedFlags': 0,
'sid': None,
'sourceName': 0,
'stringInserts': (b'cat', b'm'),
'timeGenerated': (2000, 1, 1, 1, 1, 1),
'timeWritten': (2000, 1, 1, 1, 1, 1)}])
self.assertEqual(ret['computerName'][b'sky'],
[{'closingRecordNumber': 0,
'computerName': b'sky',
'data': b'',
'eventCategory': 117,
'eventID': 101,
'eventType': 4,
'recordNumber': 0,
'reserved': 4,
'reservedFlags': 0,
'sid': None,
'sourceName': 0,
'stringInserts': (b"'I am a live!'", b'launching...'),
'timeGenerated': (1997, 8, 29, 2, 14, 0),
'timeWritten': (2000, 1, 1, 1, 1, 1)}])
def test_get_event_filter_generator(self):
handler = MockHandler(EVENTS)
with patch.object(win_event_viewer, "_get_event_handler", return_value=handler) as _get_event_handler:
with patch.object(win_event_viewer, "_close_event_handler", return_value=None) as _close_event_handler:
with patch.object(win32evtlog, 'ReadEventLog', wraps=mock_read_event_log) as ReadEventLog:
with patch.object(win32evtlog,
'GetNumberOfEventLogRecords',
wraps=mock_get_number_of_event_log_records) as GetNumberOfEventLogRecords:
ret = tuple(win_event_viewer.get_event_filter_generator("System",
target_computer=None,
all_requirements=True,
eventID=101,
sid=None))
self.assertIsInstance(ret, tuple)
_get_event_handler.assert_called_once_with('System', None)
_close_event_handler.assert_called_once_with(handler)
self.assertEqual(ReadEventLog.call_count, len(handler))
self.assertEqual(GetNumberOfEventLogRecords.call_count, len(handler) + 1)
self.assertEqual(len(ret), 4)
def test_all_get_event_filter_generator(self):
handler = MockHandler(EVENTS)
with patch.object(win_event_viewer, "_get_event_handler", return_value=handler) as _get_event_handler:
with patch.object(win_event_viewer, "_close_event_handler", return_value=None) as _close_event_handler:
with patch.object(win32evtlog, 'ReadEventLog', wraps=mock_read_event_log) as ReadEventLog:
with patch.object(win32evtlog,
'GetNumberOfEventLogRecords',
wraps=mock_get_number_of_event_log_records) as GetNumberOfEventLogRecords:
ret = tuple(win_event_viewer.get_event_filter_generator("System",
target_computer=None,
all_requirements=False,
eventID=101,
sid=None))
self.assertIsInstance(ret, tuple)
_get_event_handler.assert_called_once_with('System', None)
_close_event_handler.assert_called_once_with(handler)
self.assertEqual(ReadEventLog.call_count, len(handler))
self.assertEqual(GetNumberOfEventLogRecords.call_count, len(handler) + 1)
self.assertEqual(len(ret), 6)
def test_get_events_filter(self):
handler = MockHandler(EVENTS)
with patch.object(win_event_viewer, "_get_event_handler", return_value=handler) as _get_event_handler:
with patch.object(win_event_viewer, "_close_event_handler", return_value=None) as _close_event_handler:
with patch.object(win32evtlog, 'ReadEventLog', wraps=mock_read_event_log) as ReadEventLog:
with patch.object(win32evtlog,
'GetNumberOfEventLogRecords',
wraps=mock_get_number_of_event_log_records) as GetNumberOfEventLogRecords:
ret = tuple(win_event_viewer.get_events_filter("System",
target_computer=None,
all_requirements=True,
eventID=101,
sid=None))
self.assertIsInstance(ret, tuple)
_get_event_handler.assert_called_once_with('System', None)
_close_event_handler.assert_called_once_with(handler)
self.assertEqual(ReadEventLog.call_count, len(handler))
self.assertEqual(GetNumberOfEventLogRecords.call_count, len(handler) + 1)
self.assertEqual(len(ret), 4)
def test_all_get_events_filter(self):
handler = MockHandler(EVENTS)
with patch.object(win_event_viewer, "_get_event_handler", return_value=handler) as _get_event_handler:
with patch.object(win_event_viewer, "_close_event_handler", return_value=None) as _close_event_handler:
with patch.object(win32evtlog, 'ReadEventLog', wraps=mock_read_event_log) as ReadEventLog:
with patch.object(win32evtlog,
'GetNumberOfEventLogRecords',
wraps=mock_get_number_of_event_log_records) as GetNumberOfEventLogRecords:
ret = tuple(win_event_viewer.get_events_filter("System",
target_computer=None,
all_requirements=False,
eventID=101,
sid=None))
self.assertIsInstance(ret, tuple)
_get_event_handler.assert_called_once_with('System', None)
_close_event_handler.assert_called_once_with(handler)
self.assertEqual(ReadEventLog.call_count, len(handler))
self.assertEqual(GetNumberOfEventLogRecords.call_count, len(handler) + 1)
self.assertEqual(len(ret), 6)
@staticmethod
def test_clear_log():
with patch.object(win32evtlogutil, 'ReportEvent', return_value=None) as ReportEvent:
win_event_viewer.log_event('salt', 117, eventType=1)
ReportEvent.assert_called_once_with('salt', 117, eventType=1)
@staticmethod
def test_log_event():
handler = MockHandler(EVENTS)
with patch.object(win_event_viewer, "_get_event_handler", return_value=handler) as _get_event_handler:
with patch.object(win_event_viewer, "_close_event_handler", return_value=None) as _close_event_handler:
with patch.object(win32evtlog, 'ClearEventLog', return_value=None) as ClearEventLog:
win_event_viewer.clear_log('System', target_computer=None)
_get_event_handler.assert_called_once_with('System', None)
_close_event_handler.assert_called_once_with(handler)
ClearEventLog.assert_called_once_with(handler, 'System')
@staticmethod
def test_get_number_of_events():
handler = MockHandler(EVENTS)
with patch.object(win_event_viewer, "_get_event_handler", return_value=handler) as _get_event_handler:
with patch.object(win_event_viewer, "_close_event_handler", return_value=None) as _close_event_handler:
with patch.object(win32evtlog, 'GetNumberOfEventLogRecords',
return_value=None) as GetNumberOfEventLogRecords:
win_event_viewer.get_number_of_events('System', target_computer=None)
_get_event_handler.assert_called_once_with('System', None)
_close_event_handler.assert_called_once_with(handler)
GetNumberOfEventLogRecords.assert_called_once_with(handler)
@destructiveTest
@skipIf(not salt.utils.platform.is_windows(), "Windows is required")
class WinEventViewerDestructiveTestCase(TestCase, LoaderModuleMockMixin):
'''
Test cases for salt.states.win_iis
All test are destructive because their actions will add events to the security log.
Even if the method is only reading a log or getting the size of a log.
DONT!!!!! add a "clear log" test for security reasons!!!!!
'''
def setup_loader_modules(self):
return {win_event_viewer: {}}
def test_get_event_generator(self):
for number, event in enumerate(win_event_viewer.get_event_generator("System",
target_computer=None,
raw=False)):
self.assertIsInstance(event, dict)
for event_part in win_event_viewer.EVENT_PARTS:
self.assertTrue(event_part in event)
if number == MAX_EVENT_LOOK_UP:
break
def test_raw_get_event_generator(self):
# TODO: type(event) does not work as of 7/3/2019.
# its a upstream problem and python will crash if you call it
# when this upstream problem is no more add a self.assertIsInstance pls
for number, event in enumerate(win_event_viewer.get_event_generator("System",
target_computer=None,
raw=True)):
for event_part in win_event_viewer.EVENT_PARTS:
self.assertTrue(hasattr(event, event_part[0].upper() + event_part[1:]))
if number == MAX_EVENT_LOOK_UP:
break
def test_get_event_sorted_by_info_generator(self):
for number, ret in enumerate(win_event_viewer.get_event_sorted_by_info_generator("Application",
target_computer=None)):
event, event_info = ret[0], ret[1]
for event_part in win_event_viewer.EVENT_PARTS:
self.assertEqual(event[event_part], event_info[event_part])
for event_part in win_event_viewer.TIME_PARTS:
self.assertTrue(event_part in event_info)
if number == MAX_EVENT_LOOK_UP:
break
def test_get_event_filter_generator(self):
for number, event in enumerate(win_event_viewer.get_event_filter_generator("System",
target_computer=None,
all_requirements=True,
hour=3,
eventID=37)):
self.assertEqual(event['timeGenerated'][3], 3)
self.assertEqual(event['eventID'], 37)
if number == MAX_EVENT_LOOK_UP:
break
def test_all_get_event_filter_generator(self):
for number, event in enumerate(win_event_viewer.get_event_filter_generator("System",
target_computer=None,
all_requirements=False,
hour=3,
eventID=37)):
self.assertTrue(event['timeGenerated'][3] == 3 or event['eventID'] == 37)
if number == MAX_EVENT_LOOK_UP:
break
@staticmethod
def test_log_event():
'''
Does not check for event because
* the log can be slow to update
* the log can be cleared at anytime
* I dont want to add a flaky test
'''
win_event_viewer.log_event('salt_test', event_id=117)
def test_get_number_of_events(self):
event_count = win_event_viewer.get_number_of_events("Application", target_computer=None)
self.assertIsInstance(event_count, int)
self.assertEqual(event_count, abs(event_count))