mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Fix vent assertion race condition
This commit is contained in:
parent
4859e6c9f0
commit
423f6f96b7
2 changed files with 26 additions and 10 deletions
|
@ -35,4 +35,4 @@ class ReactorTest(ModuleCase, SaltMinionEventAssertsMixin):
|
|||
|
||||
e.fire_event({'a': 'b'}, '/test_event')
|
||||
|
||||
self.assertMinionEventReceived({'a': 'b'})
|
||||
self.assertMinionEventReceived({'a': 'b'}, timeout=30)
|
||||
|
|
|
@ -660,6 +660,14 @@ def _fetch_events(q):
|
|||
sock_dir=a_config.get_config('minion')['sock_dir'],
|
||||
opts=a_config.get_config('minion'),
|
||||
)
|
||||
|
||||
# Wait for event bus to be connected
|
||||
while not event.connect_pull(30):
|
||||
time.sleep(1)
|
||||
|
||||
# Notify parent process that the event bus is connected
|
||||
q.put('CONNECTED')
|
||||
|
||||
while True:
|
||||
try:
|
||||
events = event.get_event(full=False)
|
||||
|
@ -682,6 +690,11 @@ class SaltMinionEventAssertsMixin(object):
|
|||
target=_fetch_events, args=(cls.q,)
|
||||
)
|
||||
cls.fetch_proc.start()
|
||||
# Wait for the event bus to be connected
|
||||
msg = cls.q.get(block=True)
|
||||
if msg != 'CONNECTED':
|
||||
# Just in case something very bad happens
|
||||
raise RuntimeError('Unexpected message in test\'s event queue')
|
||||
return object.__new__(cls)
|
||||
|
||||
def __exit__(self, *args, **kwargs):
|
||||
|
@ -691,19 +704,22 @@ class SaltMinionEventAssertsMixin(object):
|
|||
#TODO
|
||||
raise salt.exceptions.NotImplemented('assertMinionEventFired() not implemented')
|
||||
|
||||
def assertMinionEventReceived(self, desired_event):
|
||||
queue_wait = 5 # 2.5s
|
||||
while self.q.empty():
|
||||
time.sleep(0.5) # Wait for events to be pushed into the queue
|
||||
queue_wait -= 1
|
||||
if queue_wait <= 0:
|
||||
raise AssertionError('Queue wait timer expired')
|
||||
while not self.q.empty(): # This is not thread-safe and may be inaccurate
|
||||
event = self.q.get()
|
||||
def assertMinionEventReceived(self, desired_event, timeout=5, sleep_time=0.5):
|
||||
start = time.time()
|
||||
while True:
|
||||
try:
|
||||
event = self.q.get(False)
|
||||
except Empty:
|
||||
time.sleep(sleep_time)
|
||||
if time.time() - start >= timeout:
|
||||
break
|
||||
continue
|
||||
if isinstance(event, dict):
|
||||
event.pop('_stamp')
|
||||
if desired_event == event:
|
||||
self.fetch_proc.terminate()
|
||||
return True
|
||||
if time.time() - start >= timeout:
|
||||
break
|
||||
self.fetch_proc.terminate()
|
||||
raise AssertionError('Event {0} was not received by minion'.format(desired_event))
|
||||
|
|
Loading…
Add table
Reference in a new issue