mirror of
https://github.com/saltstack/salt.git
synced 2025-04-16 17:50:20 +00:00
Add zmq_filter unit tests
This commit is contained in:
parent
805c5eeb08
commit
784786d004
1 changed files with 95 additions and 1 deletions
|
@ -436,7 +436,7 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
|
|||
del self.process_manager
|
||||
|
||||
@staticmethod
|
||||
def _gather_results(opts, pub_uri, results, timeout=120):
|
||||
def _gather_results(opts, pub_uri, results, timeout=20, messages=None):
|
||||
'''
|
||||
Gather results until then number of seconds specified by timeout passes
|
||||
without reveiving a message
|
||||
|
@ -455,6 +455,10 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
|
|||
except zmq.ZMQError:
|
||||
time.sleep(.01)
|
||||
else:
|
||||
if messages:
|
||||
if messages != 1:
|
||||
messages -= 1
|
||||
continue
|
||||
payload = crypticle.loads(serial.loads(payload)['load'])
|
||||
if 'stop' in payload:
|
||||
break
|
||||
|
@ -493,6 +497,96 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
|
|||
server_channel.pub_close()
|
||||
assert len(results) == send_num, (len(results), set(expect).difference(results))
|
||||
|
||||
|
||||
def test_zeromq_zeromq_filtering_decode_message_no_match(self):
|
||||
'''
|
||||
test AsyncZeroMQPubChannel _decode_messages when
|
||||
zmq_filtering enabled and minion does not match
|
||||
'''
|
||||
message = [b'4f26aeafdb2367620a393c973eddbe8f8b846eb',
|
||||
b'\x82\xa3enc\xa3aes\xa4load\xda\x00`\xeeR\xcf'
|
||||
b'\x0eaI#V\x17if\xcf\xae\x05\xa7\xb3bN\xf7\xb2\xe2'
|
||||
b'\xd0sF\xd1\xd4\xecB\xe8\xaf"/*ml\x80Q3\xdb\xaexg'
|
||||
b'\x8e\x8a\x8c\xd3l\x03\\,J\xa7\x01i\xd1:]\xe3\x8d'
|
||||
b'\xf4\x03\x88K\x84\n`\xe8\x9a\xad\xad\xc6\x8ea\x15>'
|
||||
b'\x92m\x9e\xc7aM\x11?\x18;\xbd\x04c\x07\x85\x99\xa3\xea[\x00D']
|
||||
|
||||
opts = dict(self.master_config, ipc_mode='ipc',
|
||||
pub_hwm=0, zmq_filtering=True, recon_randomize=False,
|
||||
recon_default=1, recon_max=2, master_ip='127.0.0.1',
|
||||
acceptance_wait_time=5, acceptance_wait_time_max=5)
|
||||
opts['master_uri'] = 'tcp://{interface}:{publish_port}'.format(**opts)
|
||||
|
||||
server_channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts)
|
||||
with patch('salt.crypt.AsyncAuth.crypticle',
|
||||
MagicMock(return_value={'tgt_type': 'glob', 'tgt': '*',
|
||||
'jid': 1})) as mock_test:
|
||||
res = server_channel._decode_messages(message)
|
||||
assert res.result() is None
|
||||
|
||||
def test_zeromq_zeromq_filtering_decode_message(self):
|
||||
'''
|
||||
test AsyncZeroMQPubChannel _decode_messages
|
||||
when zmq_filtered enabled
|
||||
'''
|
||||
message = [b'4f26aeafdb2367620a393c973eddbe8f8b846ebd',
|
||||
b'\x82\xa3enc\xa3aes\xa4load\xda\x00`\xeeR\xcf'
|
||||
b'\x0eaI#V\x17if\xcf\xae\x05\xa7\xb3bN\xf7\xb2\xe2'
|
||||
b'\xd0sF\xd1\xd4\xecB\xe8\xaf"/*ml\x80Q3\xdb\xaexg'
|
||||
b'\x8e\x8a\x8c\xd3l\x03\\,J\xa7\x01i\xd1:]\xe3\x8d'
|
||||
b'\xf4\x03\x88K\x84\n`\xe8\x9a\xad\xad\xc6\x8ea\x15>'
|
||||
b'\x92m\x9e\xc7aM\x11?\x18;\xbd\x04c\x07\x85\x99\xa3\xea[\x00D']
|
||||
|
||||
opts = dict(self.master_config, ipc_mode='ipc',
|
||||
pub_hwm=0, zmq_filtering=True, recon_randomize=False,
|
||||
recon_default=1, recon_max=2, master_ip='127.0.0.1',
|
||||
acceptance_wait_time=5, acceptance_wait_time_max=5)
|
||||
opts['master_uri'] = 'tcp://{interface}:{publish_port}'.format(**opts)
|
||||
|
||||
server_channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts)
|
||||
with patch('salt.crypt.AsyncAuth.crypticle',
|
||||
MagicMock(return_value={'tgt_type': 'glob', 'tgt': '*',
|
||||
'jid': 1})) as mock_test:
|
||||
res = server_channel._decode_messages(message)
|
||||
|
||||
assert res.result()['enc'] == 'aes'
|
||||
|
||||
@skipIf(salt.utils.platform.is_windows(), 'Skip on Windows OS')
|
||||
def test_zeromq_filtering(self):
|
||||
'''
|
||||
Test sending messags to publisher using UDP
|
||||
with zeromq_filtering enabled
|
||||
'''
|
||||
opts = dict(self.master_config, ipc_mode='ipc',
|
||||
pub_hwm=0, zmq_filtering=True, acceptance_wait_time=5)
|
||||
server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
|
||||
server_channel.pre_fork(self.process_manager, kwargs={
|
||||
'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
|
||||
})
|
||||
pub_uri = 'tcp://{interface}:{publish_port}'.format(**server_channel.opts)
|
||||
send_num = 1
|
||||
expect = []
|
||||
results = []
|
||||
gather = threading.Thread(target=self._gather_results,
|
||||
args=(self.minion_config, pub_uri, results,),
|
||||
kwargs={'messages': 2})
|
||||
gather.start()
|
||||
# Allow time for server channel to start, especially on windows
|
||||
time.sleep(2)
|
||||
expect.append(send_num)
|
||||
load = {'tgt_type': 'glob', 'tgt': '*', 'jid': send_num}
|
||||
with patch('salt.utils.minions.CkMinions.check_minions',
|
||||
MagicMock(return_value={'minions': ['minion'], 'missing': [],
|
||||
'ssh_minions': False})):
|
||||
server_channel.publish(load)
|
||||
server_channel.publish(
|
||||
{'tgt_type': 'glob', 'tgt': '*', 'stop': True}
|
||||
)
|
||||
gather.join()
|
||||
server_channel.pub_close()
|
||||
assert len(results) == send_num, (len(results), set(expect).difference(results))
|
||||
|
||||
|
||||
def test_publish_to_pubserv_tcp(self):
|
||||
'''
|
||||
Test sending 10K messags to ZeroMQPubServerChannel using TCP transport
|
||||
|
|
Loading…
Add table
Reference in a new issue