Fix vent assertion race condition

This commit is contained in:
Daniel A. Wozniak 2019-04-19 16:16:01 +00:00
parent 4859e6c9f0
commit 423f6f96b7
No known key found for this signature in database
GPG key ID: 166B9D2C06C82D61
2 changed files with 26 additions and 10 deletions

View file

@ -35,4 +35,4 @@ class ReactorTest(ModuleCase, SaltMinionEventAssertsMixin):
e.fire_event({'a': 'b'}, '/test_event')
self.assertMinionEventReceived({'a': 'b'})
self.assertMinionEventReceived({'a': 'b'}, timeout=30)

View file

@ -660,6 +660,14 @@ def _fetch_events(q):
sock_dir=a_config.get_config('minion')['sock_dir'],
opts=a_config.get_config('minion'),
)
# Wait for event bus to be connected
while not event.connect_pull(30):
time.sleep(1)
# Notify parent process that the event bus is connected
q.put('CONNECTED')
while True:
try:
events = event.get_event(full=False)
@ -682,6 +690,11 @@ class SaltMinionEventAssertsMixin(object):
target=_fetch_events, args=(cls.q,)
)
cls.fetch_proc.start()
# Wait for the event bus to be connected
msg = cls.q.get(block=True)
if msg != 'CONNECTED':
# Just in case something very bad happens
raise RuntimeError('Unexpected message in test\'s event queue')
return object.__new__(cls)
def __exit__(self, *args, **kwargs):
@ -691,19 +704,22 @@ class SaltMinionEventAssertsMixin(object):
#TODO
raise salt.exceptions.NotImplemented('assertMinionEventFired() not implemented')
def assertMinionEventReceived(self, desired_event):
queue_wait = 5 # 2.5s
while self.q.empty():
time.sleep(0.5) # Wait for events to be pushed into the queue
queue_wait -= 1
if queue_wait <= 0:
raise AssertionError('Queue wait timer expired')
while not self.q.empty(): # This is not thread-safe and may be inaccurate
event = self.q.get()
def assertMinionEventReceived(self, desired_event, timeout=5, sleep_time=0.5):
start = time.time()
while True:
try:
event = self.q.get(False)
except Empty:
time.sleep(sleep_time)
if time.time() - start >= timeout:
break
continue
if isinstance(event, dict):
event.pop('_stamp')
if desired_event == event:
self.fetch_proc.terminate()
return True
if time.time() - start >= timeout:
break
self.fetch_proc.terminate()
raise AssertionError('Event {0} was not received by minion'.format(desired_event))