diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py index 3045ae4db1e..1d9a4689e9e 100644 --- a/salt/transport/zeromq.py +++ b/salt/transport/zeromq.py @@ -532,9 +532,10 @@ class AsyncZeroMQPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.t payload = self.serial.loads(messages[0]) # 2 includes a header which says who should do it elif messages_len == 2: - if (self.opts.get('__role') != 'syndic' and messages[0] not in ('broadcast', self.hexid)) or \ - (self.opts.get('__role') == 'syndic' and messages[0] not in ('broadcast', 'syndic')): - log.debug('Publish received for not this minion: %s', messages[0]) + message_target = salt.utils.stringutils.to_str(messages[0]) + if (self.opts.get('__role') != 'syndic' and message_target not in ('broadcast', self.hexid)) or \ + (self.opts.get('__role') == 'syndic' and message_target not in ('broadcast', 'syndic')): + log.debug('Publish received for not this minion: %s', message_target) raise tornado.gen.Return(None) payload = self.serial.loads(messages[1]) else: @@ -916,7 +917,7 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel): log.trace('Sending filtered data over publisher %s', pub_uri) # zmq filters are substring match, hash the topic # to avoid collisions - htopic = salt.utils.stringutils.to_bytes(hashlib.sha1(topic).hexdigest()) + htopic = salt.utils.stringutils.to_bytes(hashlib.sha1(salt.utils.stringutils.to_bytes(topic)).hexdigest()) pub_sock.send(htopic, flags=zmq.SNDMORE) pub_sock.send(payload) log.trace('Filtered data has been sent') diff --git a/tests/unit/transport/test_zeromq.py b/tests/unit/transport/test_zeromq.py index 2ed77c8860c..89ce04e9a13 100644 --- a/tests/unit/transport/test_zeromq.py +++ b/tests/unit/transport/test_zeromq.py @@ -436,7 +436,7 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin): del self.process_manager @staticmethod - def _gather_results(opts, pub_uri, results, timeout=120): + def _gather_results(opts, pub_uri, results, timeout=120, messages=None): ''' Gather results until then number of seconds specified by timeout passes without reveiving a message @@ -455,6 +455,10 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin): except zmq.ZMQError: time.sleep(.01) else: + if messages: + if messages != 1: + messages -= 1 + continue payload = crypticle.loads(serial.loads(payload)['load']) if 'stop' in payload: break @@ -493,6 +497,94 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin): server_channel.pub_close() assert len(results) == send_num, (len(results), set(expect).difference(results)) + def test_zeromq_zeromq_filtering_decode_message_no_match(self): + ''' + test AsyncZeroMQPubChannel _decode_messages when + zmq_filtering enabled and minion does not match + ''' + message = [b'4f26aeafdb2367620a393c973eddbe8f8b846eb', + b'\x82\xa3enc\xa3aes\xa4load\xda\x00`\xeeR\xcf' + b'\x0eaI#V\x17if\xcf\xae\x05\xa7\xb3bN\xf7\xb2\xe2' + b'\xd0sF\xd1\xd4\xecB\xe8\xaf"/*ml\x80Q3\xdb\xaexg' + b'\x8e\x8a\x8c\xd3l\x03\\,J\xa7\x01i\xd1:]\xe3\x8d' + b'\xf4\x03\x88K\x84\n`\xe8\x9a\xad\xad\xc6\x8ea\x15>' + b'\x92m\x9e\xc7aM\x11?\x18;\xbd\x04c\x07\x85\x99\xa3\xea[\x00D'] + + opts = dict(self.master_config, ipc_mode='ipc', + pub_hwm=0, zmq_filtering=True, recon_randomize=False, + recon_default=1, recon_max=2, master_ip='127.0.0.1', + acceptance_wait_time=5, acceptance_wait_time_max=5) + opts['master_uri'] = 'tcp://{interface}:{publish_port}'.format(**opts) + + server_channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts) + with patch('salt.crypt.AsyncAuth.crypticle', + MagicMock(return_value={'tgt_type': 'glob', 'tgt': '*', + 'jid': 1})) as mock_test: + res = server_channel._decode_messages(message) + assert res.result() is None + + def test_zeromq_zeromq_filtering_decode_message(self): + ''' + test AsyncZeroMQPubChannel _decode_messages + when zmq_filtered enabled + ''' + message = [b'4f26aeafdb2367620a393c973eddbe8f8b846ebd', + b'\x82\xa3enc\xa3aes\xa4load\xda\x00`\xeeR\xcf' + b'\x0eaI#V\x17if\xcf\xae\x05\xa7\xb3bN\xf7\xb2\xe2' + b'\xd0sF\xd1\xd4\xecB\xe8\xaf"/*ml\x80Q3\xdb\xaexg' + b'\x8e\x8a\x8c\xd3l\x03\\,J\xa7\x01i\xd1:]\xe3\x8d' + b'\xf4\x03\x88K\x84\n`\xe8\x9a\xad\xad\xc6\x8ea\x15>' + b'\x92m\x9e\xc7aM\x11?\x18;\xbd\x04c\x07\x85\x99\xa3\xea[\x00D'] + + opts = dict(self.master_config, ipc_mode='ipc', + pub_hwm=0, zmq_filtering=True, recon_randomize=False, + recon_default=1, recon_max=2, master_ip='127.0.0.1', + acceptance_wait_time=5, acceptance_wait_time_max=5) + opts['master_uri'] = 'tcp://{interface}:{publish_port}'.format(**opts) + + server_channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts) + with patch('salt.crypt.AsyncAuth.crypticle', + MagicMock(return_value={'tgt_type': 'glob', 'tgt': '*', + 'jid': 1})) as mock_test: + res = server_channel._decode_messages(message) + + assert res.result()['enc'] == 'aes' + + @skipIf(salt.utils.platform.is_windows(), 'Skip on Windows OS') + def test_zeromq_filtering(self): + ''' + Test sending messags to publisher using UDP + with zeromq_filtering enabled + ''' + opts = dict(self.master_config, ipc_mode='ipc', + pub_hwm=0, zmq_filtering=True, acceptance_wait_time=5) + server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts) + server_channel.pre_fork(self.process_manager, kwargs={ + 'log_queue': salt.log.setup.get_multiprocessing_logging_queue() + }) + pub_uri = 'tcp://{interface}:{publish_port}'.format(**server_channel.opts) + send_num = 1 + expect = [] + results = [] + gather = threading.Thread(target=self._gather_results, + args=(self.minion_config, pub_uri, results,), + kwargs={'messages': 2}) + gather.start() + # Allow time for server channel to start, especially on windows + time.sleep(2) + expect.append(send_num) + load = {'tgt_type': 'glob', 'tgt': '*', 'jid': send_num} + with patch('salt.utils.minions.CkMinions.check_minions', + MagicMock(return_value={'minions': ['minion'], 'missing': [], + 'ssh_minions': False})): + server_channel.publish(load) + server_channel.publish( + {'tgt_type': 'glob', 'tgt': '*', 'stop': True} + ) + gather.join() + server_channel.pub_close() + assert len(results) == send_num, (len(results), set(expect).difference(results)) + def test_publish_to_pubserv_tcp(self): ''' Test sending 10K messags to ZeroMQPubServerChannel using TCP transport