mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Merge branch '2019.2.1' of https://github.com/tanlingyun2005/salt into 2019.2.1
This commit is contained in:
commit
65ee219318
2 changed files with 27 additions and 10 deletions
|
@ -35,4 +35,4 @@ class ReactorTest(ModuleCase, SaltMinionEventAssertsMixin):
|
|||
|
||||
e.fire_event({'a': 'b'}, '/test_event')
|
||||
|
||||
self.assertMinionEventReceived({'a': 'b'})
|
||||
self.assertMinionEventReceived({'a': 'b'}, timeout=30)
|
||||
|
|
|
@ -47,6 +47,7 @@ from salt._compat import ElementTree as etree
|
|||
# Import 3rd-party libs
|
||||
from salt.ext import six
|
||||
from salt.ext.six.moves import zip # pylint: disable=import-error,redefined-builtin
|
||||
from salt.ext.six.moves.queue import Empty # pylint: disable=import-error,no-name-in-module
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -660,6 +661,14 @@ def _fetch_events(q):
|
|||
sock_dir=a_config.get_config('minion')['sock_dir'],
|
||||
opts=a_config.get_config('minion'),
|
||||
)
|
||||
|
||||
# Wait for event bus to be connected
|
||||
while not event.connect_pull(30):
|
||||
time.sleep(1)
|
||||
|
||||
# Notify parent process that the event bus is connected
|
||||
q.put('CONNECTED')
|
||||
|
||||
while True:
|
||||
try:
|
||||
events = event.get_event(full=False)
|
||||
|
@ -682,6 +691,11 @@ class SaltMinionEventAssertsMixin(object):
|
|||
target=_fetch_events, args=(cls.q,)
|
||||
)
|
||||
cls.fetch_proc.start()
|
||||
# Wait for the event bus to be connected
|
||||
msg = cls.q.get(block=True)
|
||||
if msg != 'CONNECTED':
|
||||
# Just in case something very bad happens
|
||||
raise RuntimeError('Unexpected message in test\'s event queue')
|
||||
return object.__new__(cls)
|
||||
|
||||
def __exit__(self, *args, **kwargs):
|
||||
|
@ -691,19 +705,22 @@ class SaltMinionEventAssertsMixin(object):
|
|||
#TODO
|
||||
raise salt.exceptions.NotImplemented('assertMinionEventFired() not implemented')
|
||||
|
||||
def assertMinionEventReceived(self, desired_event):
|
||||
queue_wait = 5 # 2.5s
|
||||
while self.q.empty():
|
||||
time.sleep(0.5) # Wait for events to be pushed into the queue
|
||||
queue_wait -= 1
|
||||
if queue_wait <= 0:
|
||||
raise AssertionError('Queue wait timer expired')
|
||||
while not self.q.empty(): # This is not thread-safe and may be inaccurate
|
||||
event = self.q.get()
|
||||
def assertMinionEventReceived(self, desired_event, timeout=5, sleep_time=0.5):
|
||||
start = time.time()
|
||||
while True:
|
||||
try:
|
||||
event = self.q.get(False)
|
||||
except Empty:
|
||||
time.sleep(sleep_time)
|
||||
if time.time() - start >= timeout:
|
||||
break
|
||||
continue
|
||||
if isinstance(event, dict):
|
||||
event.pop('_stamp')
|
||||
if desired_event == event:
|
||||
self.fetch_proc.terminate()
|
||||
return True
|
||||
if time.time() - start >= timeout:
|
||||
break
|
||||
self.fetch_proc.terminate()
|
||||
raise AssertionError('Event {0} was not received by minion'.format(desired_event))
|
||||
|
|
Loading…
Add table
Reference in a new issue