mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00

* Check only ssh-rsa encyption for set_known_host * Windows test fix * Fix pre-commit * add CentOS Stream to _OS_FAMILY_MAP, fix #59161 * added changelog and test * fix syntax * Use centosstream 8 for testing * Use ? for matching spaces Technically this isn't *quite* right as 'CentOSyStream' would also match, but it's pretty reasonable: - OS grains shouldn't ever be that kind of close - This test is only swapping out spaces, and only for the os grain. That would mean there would have to be two OSes with grains that only differ by one having a space where another one has any other character. - This test really isn't even about matching grains, we're just using compound matching and that's a reasonable one to use. * Add centos stream when detecting package manager name * Fix pre-commit * Remove tests for fedora 32/33 EOL * Remove tests for fedora 32/33 EOL * Remove tests for fedora 33 EOL * Use centosstream 8 for testing * Use ? for matching spaces Technically this isn't *quite* right as 'CentOSyStream' would also match, but it's pretty reasonable: - OS grains shouldn't ever be that kind of close - This test is only swapping out spaces, and only for the os grain. That would mean there would have to be two OSes with grains that only differ by one having a space where another one has any other character. - This test really isn't even about matching grains, we're just using compound matching and that's a reasonable one to use. * 3002.9: Fix pre-commit * 3003.5 Fix pre-commit * [3002.9] Replace use of 'sl' with 'paper' for Arch tests, due to 'sl' having key issues * Remove mojave testing * Remove mojave and high sierra testing * Remove mojave testing * [3002.9] Fix cloud vultr size issue * Update package name to aspnetcore-runtime-6.0 for redhat 8 pkg tests * Update package name to aspnetcore-runtime-6.0 for redhat 8 pkg tests * change amazon linux AMI * Migrate `unit.modules.test_gpg` to PyTest * Don't leave any `gpg-agent`'s running behind Signed-off-by: Pedro Algarvio <palgarvio@vmware.com> * Start a background process to generate entropy. Some tests have failed because of not enough entropy which then makes the test timeout. Signed-off-by: Pedro Algarvio <palgarvio@vmware.com> * A different approach at generating entropy Signed-off-by: Pedro Algarvio <palgarvio@vmware.com> * Turn entropy generation into a helper Signed-off-by: Pedro Algarvio <palgarvio@vmware.com> * change amazon linux AMI * change amazon linux AMI * [3004.2] Fix cloud vultr size issue * Fix cloud requirements * Skip pam tests on windows * Update ami to try to get the tests running * Update amis to try to get the tests running * Fixing test_publish_to_pubserv_ipc_tcp, moving the call to socket.socket into the while loop. * Add static requirements for 3.8 and 3.9 on Windows * Fix requirements Signed-off-by: Pedro Algarvio <palgarvio@vmware.com> * The whole CI process is already slower than GH Actions, no caches. * Pre-commit must not run with ``PIP_EXTRA_INDEX_URL`` set. * Lint fixes Signed-off-by: Pedro Algarvio <palgarvio@vmware.com> * Compile cloud requirements * Run add requirements files for 3.8 and 3.9 * Fix docs and cloud requirements * [3003.5] Fix cloud vultr size issue * Windows test fix * Skip test if docker not running * [3003.5] Fix pre-commit * Update Markup and contextfunction imports for jinja versions >=3.1. * update bootstrap to 2022.03.15 * update bootstrap to 2022.03.15 * skipping tests/pytests/integration/modules/test_virt.py on 3002.x and 3003.x branches. * Windows test fix * Skip PAM tests on Windows Windows has no ctypes with the PAM bits, so we should go ahead and skip on Windows. * Skip PAM auth tests on Windows Windows lacks the correct bits, so... * Fix pre-commit * Skipping tests since they're also skipped on the master branch Fixes #403 Signed-off-by: Pedro Algarvio <palgarvio@vmware.com> * Skip test that only runs because the patch binary is now available. The feature though, was only added in 3004. Fixes #404 Signed-off-by: Pedro Algarvio <palgarvio@vmware.com> * Skip test which is only supposed to run in Linux Fixes https://github.com/saltstack/salt-priv/issues/405 Signed-off-by: Pedro Algarvio <palgarvio@vmware.com> * GPG tests do not work on windows yet * Fix tests * Fix pre-commit * skip tests.integration.modules.test_mac_brew_pkg.BrewModuleTest.test_list_upgrades and tests.integration.modules.test_state.StateModuleTest.test_get_file_from_env_in_top_match on Mac OS. * skip tests.integration.modules.test_mac_brew_pkg.BrewModuleTest.test_list_upgrades and tests.integration.modules.test_state.StateModuleTest.test_get_file_from_env_in_top_match on Mac OS. * Removing skip, moving it to different PR. * Skipping tests on 3002.9. * test fix * Do not run patch tests on 3003.5. Feature not added till 3004 * skipping tests/pytests/integration/modules/test_virt.py on 3002.x and 3003.x branches. * Fix pre-commit * [3004.2] Update freebsd ami * Bump the git version for freebsd CI tests * removing versions that are no longer available from the tests.pytests.scenarios.compat.test_with_versions tests. * Skip tests on windows when NOT using static requirements Signed-off-by: Pedro Algarvio <palgarvio@vmware.com> * removing versions that are no longer available from the tests.pytests.scenarios.compat.test_with_versions tests. * test_issue_36469_tcp causes a fatal python error when run on Mac OS, so skipping. * Fix tests * Fix pre-commit * Do not run patch tests on 3003.5. Feature not added till 3004 * Skip archive tar tests on windows * [3002.9] Skip archive tar tests on windows * GPG tests do not work on windows yet * Skip test which is only supposed to run in Linux Fixes https://github.com/saltstack/salt-priv/issues/405 Signed-off-by: Pedro Algarvio <palgarvio@vmware.com> * Skip test that only runs because the patch binary is now available. The feature though, was only added in 3004. Fixes #404 Signed-off-by: Pedro Algarvio <palgarvio@vmware.com> * Skipping tests since they're also skipped on the master branch Fixes #403 Signed-off-by: Pedro Algarvio <palgarvio@vmware.com> * Fix pre-commit * Fix pre-commit * Fix pre-commit * Fix pre-commit * retry sdb.get if it returns None None is an entirely valid return - see EtcdClient.get in salt/utils/etcd_util.py * drop py2/six * fix etcd sdb.set as well * Fix etdcd-sdb test failure If docker container is up and running, but etcd isn't responding yet it's possible that we get some failing tests. This should wait a reasonable amount of time for things to come up. Or just skip the test. * Fix etdcd-sdb test failure If docker container is up and running, but etcd isn't responding yet it's possible that we get some failing tests. This should wait a reasonable amount of time for things to come up. Or just skip the test. * Skip the tests from unit/transport/test_zero.py that are hanging on Mac. * skip tests in tests/pytests/unit/states/test_archive.py for 3002.9 * 3002.9 Skipping CA permissions tests on Windows, similar to 3003.5 and 3004.2 * change skipif to skip * Rollback Windows AMIs to use Python 3.7 * Rollback AMI's to Python 3.7... fix tests * Fix failing test_archive tests * Build using pyenv * Add symlinks to openssl and rpath * Add shasum for zeromq 4.3.4 * Fix docs on scripts * Build zeromq earlier, fix symlinks * Bring 61446 to 3004.1 branch * Add changelog and tests * Fix schedule test flakiness * Retry with new port if in use * fixing failing tests, ensuring that the correct path is used. * fixing failing tests, ensuring that the correct path is used. * fixing failing tests, ensuring that the correct path is used. * Re-enable tiamat-pip on windows Signed-off-by: Pedro Algarvio <palgarvio@vmware.com> * Bump duration time for windwos for test_retry_option_success * Skip test cauing hangs * go go pylint disable * more pre-commit * oh lint * so many weird hook failures * Add unit tests for PAM auth CVE We could add functional tests if it's important enough, but this is the narrowest place to test. * Fix PAM auth CVE Credit to @ysf Previously we weren't checking the result of PAM_ACCT_MGMT. * pylint disable * rewrite hook changes * Skip PAM auth tests on Windows Since Windows ends out lacking the correct bits, no need to run tests there. * pre-commit fixes * docs 3004.2 release * Fix bug in tcp transport * Fix the test_zeromq_filtering test * skip test_npm_install_url_referenced_package on centos 7 and 8. * Swapping CentOS Linux-8 for CentOS Stream-8 * Update build scripts to use pyenv * Fix tests on MacOS * Fix bug in tcp transport * Fix test failures * Update release notes and man pages for 3003.5 * Add 3002.9 changelog, release notes, man pages * Update doc/topics/releases/3002.9.rst Co-authored-by: Megan Wilhite <mwilhite@vmware.com> * Fix requirements * Fix imports * Test fixup * Fix merge warts * fix merge wart in changelog * Fix merge warts in tests Co-authored-by: krionbsd <krion@FreeBSD.org> Co-authored-by: Megan Wilhite <megan.wilhite@gmail.com> Co-authored-by: Alexander Kriventsov <akriventsov@nic.ru> Co-authored-by: Megan Wilhite <mwilhite@vmware.com> Co-authored-by: Wayne Werner <wwerner@vmware.com> Co-authored-by: Gareth J. Greenaway <gareth@saltstack.com> Co-authored-by: David Murphy < dmurphy@saltstack.com> Co-authored-by: Twangboy <leesh@vmware.com> Co-authored-by: MKLeb <calebb@vmware.com> Co-authored-by: Pedro Algarvio <pedro@algarvio.me> Co-authored-by: Pedro Algarvio <palgarvio@vmware.com> Co-authored-by: Thomas Phipps <tphipps@vmware.com> Co-authored-by: Frode Gundersen <frogunder@gmail.com> Co-authored-by: Alyssa Rock <alyssa.rock@gmail.com> Co-authored-by: Alyssa Rock <43180546+barbaricyawps@users.noreply.github.com>
601 lines
20 KiB
Python
601 lines
20 KiB
Python
"""
|
|
Encapsulate the different transports available to Salt.
|
|
|
|
This includes client side transport, for the ReqServer and the Publisher
|
|
"""
|
|
|
|
|
|
import logging
|
|
import os
|
|
import time
|
|
import uuid
|
|
|
|
import salt.crypt
|
|
import salt.exceptions
|
|
import salt.ext.tornado.gen
|
|
import salt.ext.tornado.ioloop
|
|
import salt.payload
|
|
import salt.transport.frame
|
|
import salt.utils.event
|
|
import salt.utils.files
|
|
import salt.utils.minions
|
|
import salt.utils.stringutils
|
|
import salt.utils.verify
|
|
from salt.utils.asynchronous import SyncWrapper
|
|
|
|
try:
|
|
from M2Crypto import RSA
|
|
|
|
HAS_M2 = True
|
|
except ImportError:
|
|
HAS_M2 = False
|
|
try:
|
|
from Cryptodome.Cipher import PKCS1_OAEP
|
|
except ImportError:
|
|
try:
|
|
from Crypto.Cipher import PKCS1_OAEP # nosec
|
|
except ImportError:
|
|
pass
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class ReqChannel:
|
|
"""
|
|
Factory class to create a sychronous communication channels to the master's
|
|
ReqServer. ReqChannels use transports to connect to the ReqServer.
|
|
"""
|
|
|
|
@staticmethod
|
|
def factory(opts, **kwargs):
|
|
return SyncWrapper(
|
|
AsyncReqChannel.factory,
|
|
(opts,),
|
|
kwargs,
|
|
loop_kwarg="io_loop",
|
|
)
|
|
|
|
|
|
class PushChannel:
|
|
"""
|
|
Factory class to create Sync channel for push side of push/pull IPC
|
|
"""
|
|
|
|
@staticmethod
|
|
def factory(opts, **kwargs):
|
|
return SyncWrapper(
|
|
AsyncPushChannel.factory,
|
|
(opts,),
|
|
kwargs,
|
|
loop_kwarg="io_loop",
|
|
)
|
|
|
|
|
|
class PullChannel:
|
|
"""
|
|
Factory class to create Sync channel for pull side of push/pull IPC
|
|
"""
|
|
|
|
@staticmethod
|
|
def factory(opts, **kwargs):
|
|
return SyncWrapper(
|
|
AsyncPullChannel.factory,
|
|
(opts,),
|
|
kwargs,
|
|
loop_kwarg="io_loop",
|
|
)
|
|
|
|
|
|
class AsyncReqChannel:
|
|
"""
|
|
Factory class to create a asynchronous communication channels to the
|
|
master's ReqServer. ReqChannels connect to the master's ReqServerChannel.
|
|
"""
|
|
|
|
async_methods = [
|
|
"crypted_transfer_decode_dictentry",
|
|
"_crypted_transfer",
|
|
"_uncrypted_transfer",
|
|
"send",
|
|
"connect",
|
|
]
|
|
close_methods = [
|
|
"close",
|
|
]
|
|
|
|
@classmethod
|
|
def factory(cls, opts, **kwargs):
|
|
|
|
# Default to ZeroMQ for now
|
|
ttype = "zeromq"
|
|
# determine the ttype
|
|
if "transport" in opts:
|
|
ttype = opts["transport"]
|
|
elif "transport" in opts.get("pillar", {}).get("master", {}):
|
|
ttype = opts["pillar"]["master"]["transport"]
|
|
|
|
if "master_uri" not in opts and "master_uri" in kwargs:
|
|
opts["master_uri"] = kwargs["master_uri"]
|
|
io_loop = kwargs.get("io_loop")
|
|
if io_loop is None:
|
|
io_loop = salt.ext.tornado.ioloop.IOLoop.current()
|
|
|
|
crypt = kwargs.get("crypt", "aes")
|
|
if crypt != "clear":
|
|
# we don't need to worry about auth as a kwarg, since its a singleton
|
|
auth = salt.crypt.AsyncAuth(opts, io_loop=io_loop)
|
|
else:
|
|
auth = None
|
|
|
|
transport = salt.transport.request_client(opts, io_loop=io_loop)
|
|
return cls(opts, transport, auth)
|
|
|
|
def __init__(self, opts, transport, auth, **kwargs):
|
|
self.opts = dict(opts)
|
|
self.transport = transport
|
|
self.auth = auth
|
|
self.master_pubkey_path = None
|
|
if self.auth:
|
|
self.master_pubkey_path = os.path.join(self.opts["pki_dir"], self.auth.mpub)
|
|
self._closing = False
|
|
|
|
@property
|
|
def crypt(self):
|
|
if self.auth:
|
|
return "aes"
|
|
return "clear"
|
|
|
|
@property
|
|
def ttype(self):
|
|
return self.transport.ttype
|
|
|
|
def _package_load(self, load):
|
|
return {
|
|
"enc": self.crypt,
|
|
"load": load,
|
|
"version": 2,
|
|
}
|
|
|
|
@salt.ext.tornado.gen.coroutine
|
|
def crypted_transfer_decode_dictentry(
|
|
self,
|
|
load,
|
|
dictkey=None,
|
|
timeout=60,
|
|
):
|
|
nonce = uuid.uuid4().hex
|
|
load["nonce"] = nonce
|
|
if not self.auth.authenticated:
|
|
yield self.auth.authenticate()
|
|
ret = yield self.transport.send(
|
|
self._package_load(self.auth.crypticle.dumps(load)),
|
|
timeout=timeout,
|
|
)
|
|
key = self.auth.get_keys()
|
|
if "key" not in ret:
|
|
# Reauth in the case our key is deleted on the master side.
|
|
yield self.auth.authenticate()
|
|
ret = yield self.transport.send(
|
|
self._package_load(self.auth.crypticle.dumps(load)),
|
|
timeout=timeout,
|
|
)
|
|
if HAS_M2:
|
|
aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
|
|
else:
|
|
cipher = PKCS1_OAEP.new(key)
|
|
aes = cipher.decrypt(ret["key"])
|
|
|
|
# Decrypt using the public key.
|
|
pcrypt = salt.crypt.Crypticle(self.opts, aes)
|
|
signed_msg = pcrypt.loads(ret[dictkey])
|
|
|
|
# Validate the master's signature.
|
|
if not self.verify_signature(signed_msg["data"], signed_msg["sig"]):
|
|
raise salt.crypt.AuthenticationError(
|
|
"Pillar payload signature failed to validate."
|
|
)
|
|
|
|
# Make sure the signed key matches the key we used to decrypt the data.
|
|
data = salt.payload.loads(signed_msg["data"])
|
|
if data["key"] != ret["key"]:
|
|
raise salt.crypt.AuthenticationError("Key verification failed.")
|
|
|
|
# Validate the nonce.
|
|
if data["nonce"] != nonce:
|
|
raise salt.crypt.AuthenticationError("Pillar nonce verification failed.")
|
|
raise salt.ext.tornado.gen.Return(data["pillar"])
|
|
|
|
def verify_signature(self, data, sig):
|
|
return salt.crypt.verify_signature(self.master_pubkey_path, data, sig)
|
|
|
|
@salt.ext.tornado.gen.coroutine
|
|
def _crypted_transfer(self, load, timeout=60, raw=False):
|
|
"""
|
|
Send a load across the wire, with encryption
|
|
|
|
In case of authentication errors, try to renegotiate authentication
|
|
and retry the method.
|
|
|
|
Indeed, we can fail too early in case of a master restart during a
|
|
minion state execution call
|
|
|
|
:param dict load: A load to send across the wire
|
|
:param int timeout: The number of seconds on a response before failing
|
|
"""
|
|
nonce = uuid.uuid4().hex
|
|
if load and isinstance(load, dict):
|
|
load["nonce"] = nonce
|
|
|
|
@salt.ext.tornado.gen.coroutine
|
|
def _do_transfer():
|
|
# Yield control to the caller. When send() completes, resume by populating data with the Future.result
|
|
data = yield self.transport.send(
|
|
self._package_load(self.auth.crypticle.dumps(load)),
|
|
timeout=timeout,
|
|
)
|
|
# we may not have always data
|
|
# as for example for saltcall ret submission, this is a blind
|
|
# communication, we do not subscribe to return events, we just
|
|
# upload the results to the master
|
|
if data:
|
|
data = self.auth.crypticle.loads(data, raw, nonce=nonce)
|
|
if not raw or self.ttype == "tcp": # XXX Why is this needed for tcp
|
|
data = salt.transport.frame.decode_embedded_strs(data)
|
|
raise salt.ext.tornado.gen.Return(data)
|
|
|
|
if not self.auth.authenticated:
|
|
# Return control back to the caller, resume when authentication succeeds
|
|
yield self.auth.authenticate()
|
|
try:
|
|
# We did not get data back the first time. Retry.
|
|
ret = yield _do_transfer()
|
|
except salt.crypt.AuthenticationError:
|
|
# If auth error, return control back to the caller, continue when authentication succeeds
|
|
yield self.auth.authenticate()
|
|
ret = yield _do_transfer()
|
|
raise salt.ext.tornado.gen.Return(ret)
|
|
|
|
@salt.ext.tornado.gen.coroutine
|
|
def _uncrypted_transfer(self, load, timeout=60):
|
|
"""
|
|
Send a load across the wire in cleartext
|
|
|
|
:param dict load: A load to send across the wire
|
|
:param int timeout: The number of seconds on a response before failing
|
|
"""
|
|
ret = yield self.transport.send(
|
|
self._package_load(load),
|
|
timeout=timeout,
|
|
)
|
|
|
|
raise salt.ext.tornado.gen.Return(ret)
|
|
|
|
@salt.ext.tornado.gen.coroutine
|
|
def connect(self):
|
|
yield self.transport.connect()
|
|
|
|
@salt.ext.tornado.gen.coroutine
|
|
def send(self, load, tries=3, timeout=60, raw=False):
|
|
"""
|
|
Send a request, return a future which will complete when we send the message
|
|
|
|
:param dict load: A load to send across the wire
|
|
:param int tries: The number of times to make before failure
|
|
:param int timeout: The number of seconds on a response before failing
|
|
"""
|
|
_try = 1
|
|
while True:
|
|
try:
|
|
if self.crypt == "clear":
|
|
log.trace("ReqChannel send clear load=%r", load)
|
|
ret = yield self._uncrypted_transfer(load, timeout=timeout)
|
|
else:
|
|
log.trace("ReqChannel send crypt load=%r", load)
|
|
ret = yield self._crypted_transfer(load, timeout=timeout, raw=raw)
|
|
break
|
|
except Exception as exc: # pylint: disable=broad-except
|
|
log.error("Failed to send msg %r", exc)
|
|
if _try >= tries:
|
|
raise
|
|
else:
|
|
_try += 1
|
|
continue
|
|
raise salt.ext.tornado.gen.Return(ret)
|
|
|
|
def close(self):
|
|
"""
|
|
Since the message_client creates sockets and assigns them to the IOLoop we have to
|
|
specifically destroy them, since we aren't the only ones with references to the FDs
|
|
"""
|
|
if self._closing:
|
|
return
|
|
log.debug("Closing %s instance", self.__class__.__name__)
|
|
self._closing = True
|
|
self.transport.close()
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, *args):
|
|
self.close()
|
|
|
|
|
|
class AsyncPubChannel:
|
|
"""
|
|
Factory class to create subscription channels to the master's Publisher
|
|
"""
|
|
|
|
async_methods = [
|
|
"connect",
|
|
"_decode_messages",
|
|
]
|
|
close_methods = [
|
|
"close",
|
|
]
|
|
|
|
@classmethod
|
|
def factory(cls, opts, **kwargs):
|
|
# Default to ZeroMQ for now
|
|
ttype = "zeromq"
|
|
|
|
# determine the ttype
|
|
if "transport" in opts:
|
|
ttype = opts["transport"]
|
|
elif "transport" in opts.get("pillar", {}).get("master", {}):
|
|
ttype = opts["pillar"]["master"]["transport"]
|
|
|
|
if "master_uri" not in opts and "master_uri" in kwargs:
|
|
opts["master_uri"] = kwargs["master_uri"]
|
|
|
|
# switch on available ttypes
|
|
if ttype == "detect":
|
|
opts["detect_mode"] = True
|
|
log.info("Transport is set to detect; using %s", ttype)
|
|
|
|
io_loop = kwargs.get("io_loop")
|
|
if io_loop is None:
|
|
io_loop = salt.ext.tornado.ioloop.IOLoop.current()
|
|
|
|
auth = salt.crypt.AsyncAuth(opts, io_loop=io_loop)
|
|
transport = salt.transport.publish_client(opts, io_loop)
|
|
return cls(opts, transport, auth, io_loop)
|
|
|
|
def __init__(self, opts, transport, auth, io_loop=None):
|
|
self.opts = opts
|
|
self.io_loop = io_loop
|
|
self.auth = auth
|
|
self.token = self.auth.gen_token(b"salt")
|
|
self.transport = transport
|
|
self._closing = False
|
|
self._reconnected = False
|
|
self.event = salt.utils.event.get_event("minion", opts=self.opts, listen=False)
|
|
self.master_pubkey_path = os.path.join(self.opts["pki_dir"], self.auth.mpub)
|
|
|
|
@property
|
|
def crypt(self):
|
|
return "aes" if self.auth else "clear"
|
|
|
|
@salt.ext.tornado.gen.coroutine
|
|
def connect(self):
|
|
"""
|
|
Return a future which completes when connected to the remote publisher
|
|
"""
|
|
try:
|
|
if not self.auth.authenticated:
|
|
log.error("WTF %r %r", self.auth.authenticated, self.auth.authenticate)
|
|
yield self.auth.authenticate()
|
|
# if this is changed from the default, we assume it was intentional
|
|
if int(self.opts.get("publish_port", 4506)) != 4506:
|
|
publish_port = self.opts.get("publish_port")
|
|
# else take the relayed publish_port master reports
|
|
else:
|
|
publish_port = self.auth.creds["publish_port"]
|
|
# TODO: The zeromq transport does not use connect_callback and
|
|
# disconnect_callback.
|
|
yield self.transport.connect(
|
|
publish_port, self.connect_callback, self.disconnect_callback
|
|
)
|
|
# TODO: better exception handling...
|
|
except KeyboardInterrupt: # pylint: disable=try-except-raise
|
|
raise
|
|
except Exception as exc: # pylint: disable=broad-except
|
|
if "-|RETRY|-" not in str(exc):
|
|
raise salt.exceptions.SaltClientError(
|
|
"Unable to sign_in to master: {}".format(exc)
|
|
) # TODO: better error message
|
|
|
|
def close(self):
|
|
"""
|
|
Close the channel
|
|
"""
|
|
self.transport.close()
|
|
if self.event is not None:
|
|
self.event.destroy()
|
|
self.event = None
|
|
|
|
def on_recv(self, callback=None):
|
|
"""
|
|
When jobs are received pass them (decoded) to callback
|
|
"""
|
|
if callback is None:
|
|
return self.transport.on_recv(None)
|
|
|
|
@salt.ext.tornado.gen.coroutine
|
|
def wrap_callback(messages):
|
|
payload = yield self.transport._decode_messages(messages)
|
|
decoded = yield self._decode_payload(payload)
|
|
log.debug("PubChannel received: %r", decoded)
|
|
if decoded is not None:
|
|
callback(decoded)
|
|
|
|
return self.transport.on_recv(wrap_callback)
|
|
|
|
def _package_load(self, load):
|
|
return {
|
|
"enc": self.crypt,
|
|
"load": load,
|
|
"version": 2,
|
|
}
|
|
|
|
@salt.ext.tornado.gen.coroutine
|
|
def send_id(self, tok, force_auth):
|
|
"""
|
|
Send the minion id to the master so that the master may better
|
|
track the connection state of the minion.
|
|
In case of authentication errors, try to renegotiate authentication
|
|
and retry the method.
|
|
"""
|
|
load = {"id": self.opts["id"], "tok": tok}
|
|
|
|
@salt.ext.tornado.gen.coroutine
|
|
def _do_transfer():
|
|
msg = self._package_load(self.auth.crypticle.dumps(load))
|
|
package = salt.transport.frame.frame_msg(msg, header=None)
|
|
yield self.transport.send(package)
|
|
|
|
raise salt.ext.tornado.gen.Return(True)
|
|
|
|
if force_auth or not self.auth.authenticated:
|
|
count = 0
|
|
while (
|
|
count <= self.opts["tcp_authentication_retries"]
|
|
or self.opts["tcp_authentication_retries"] < 0
|
|
):
|
|
try:
|
|
yield self.auth.authenticate()
|
|
break
|
|
except salt.exceptions.SaltClientError as exc:
|
|
log.debug(exc)
|
|
count += 1
|
|
try:
|
|
ret = yield _do_transfer()
|
|
raise salt.ext.tornado.gen.Return(ret)
|
|
except salt.crypt.AuthenticationError:
|
|
yield self.auth.authenticate()
|
|
ret = yield _do_transfer()
|
|
raise salt.ext.tornado.gen.Return(ret)
|
|
|
|
@salt.ext.tornado.gen.coroutine
|
|
def connect_callback(self, result):
|
|
if self._closing:
|
|
return
|
|
try:
|
|
# Force re-auth on reconnect since the master
|
|
# may have been restarted
|
|
yield self.send_id(self.token, self._reconnected)
|
|
self.connected = True
|
|
self.event.fire_event({"master": self.opts["master"]}, "__master_connected")
|
|
if self._reconnected:
|
|
# On reconnects, fire a master event to notify that the minion is
|
|
# available.
|
|
if self.opts.get("__role") == "syndic":
|
|
data = "Syndic {} started at {}".format(
|
|
self.opts["id"], time.asctime()
|
|
)
|
|
tag = salt.utils.event.tagify([self.opts["id"], "start"], "syndic")
|
|
else:
|
|
data = "Minion {} started at {}".format(
|
|
self.opts["id"], time.asctime()
|
|
)
|
|
tag = salt.utils.event.tagify([self.opts["id"], "start"], "minion")
|
|
load = {
|
|
"id": self.opts["id"],
|
|
"cmd": "_minion_event",
|
|
"pretag": None,
|
|
"tok": self.token,
|
|
"data": data,
|
|
"tag": tag,
|
|
}
|
|
req_channel = AsyncReqChannel.factory(self.opts)
|
|
try:
|
|
yield req_channel.send(load, timeout=60)
|
|
except salt.exceptions.SaltReqTimeoutError:
|
|
log.info(
|
|
"fire_master failed: master could not be contacted. Request timed"
|
|
" out."
|
|
)
|
|
except Exception: # pylint: disable=broad-except
|
|
log.info("fire_master failed", exc_info=True)
|
|
finally:
|
|
# SyncWrapper will call either close() or destroy(), whichever is available
|
|
del req_channel
|
|
else:
|
|
self._reconnected = True
|
|
except Exception as exc: # pylint: disable=broad-except
|
|
log.error(
|
|
"Caught exception in PubChannel connect callback %r", exc, exc_info=True
|
|
)
|
|
|
|
def disconnect_callback(self):
|
|
if self._closing:
|
|
return
|
|
self.connected = False
|
|
self.event.fire_event({"master": self.opts["master"]}, "__master_disconnected")
|
|
|
|
def _verify_master_signature(self, payload):
|
|
if self.opts.get("sign_pub_messages"):
|
|
if not payload.get("sig", False):
|
|
raise salt.crypt.AuthenticationError(
|
|
"Message signing is enabled but the payload has no signature."
|
|
)
|
|
|
|
# Verify that the signature is valid
|
|
if not salt.crypt.verify_signature(
|
|
self.master_pubkey_path, payload["load"], payload.get("sig")
|
|
):
|
|
raise salt.crypt.AuthenticationError(
|
|
"Message signature failed to validate."
|
|
)
|
|
|
|
@salt.ext.tornado.gen.coroutine
|
|
def _decode_payload(self, payload):
|
|
# we need to decrypt it
|
|
log.trace("Decoding payload: %s", payload)
|
|
if payload["enc"] == "aes":
|
|
self._verify_master_signature(payload)
|
|
try:
|
|
payload["load"] = self.auth.crypticle.loads(payload["load"])
|
|
except salt.crypt.AuthenticationError:
|
|
yield self.auth.authenticate()
|
|
payload["load"] = self.auth.crypticle.loads(payload["load"])
|
|
|
|
raise salt.ext.tornado.gen.Return(payload)
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, *args):
|
|
self.close()
|
|
|
|
|
|
class AsyncPushChannel:
|
|
"""
|
|
Factory class to create IPC Push channels
|
|
"""
|
|
|
|
@staticmethod
|
|
def factory(opts, **kwargs):
|
|
"""
|
|
If we have additional IPC transports other than UxD and TCP, add them here
|
|
"""
|
|
# FIXME for now, just UXD
|
|
# Obviously, this makes the factory approach pointless, but we'll extend later
|
|
import salt.transport.ipc
|
|
|
|
return salt.transport.ipc.IPCMessageClient(opts, **kwargs)
|
|
|
|
|
|
class AsyncPullChannel:
|
|
"""
|
|
Factory class to create IPC pull channels
|
|
"""
|
|
|
|
@staticmethod
|
|
def factory(opts, **kwargs):
|
|
"""
|
|
If we have additional IPC transports other than UXD and TCP, add them here
|
|
"""
|
|
import salt.transport.ipc
|
|
|
|
return salt.transport.ipc.IPCMessageServer(opts, **kwargs)
|