mirror of
https://github.com/saltstack/salt.git
synced 2025-04-17 10:10:20 +00:00
Syndic fix-up
- Update _handle_decode_payload methods to use async/await. - Syndic closes request channels before creating new ones.
This commit is contained in:
parent
2240c08406
commit
11a06ce0da
1 changed files with 13 additions and 8 deletions
|
@ -1728,8 +1728,7 @@ class Minion(MinionBase):
|
|||
# pylint: enable=unexpected-keyword-arg
|
||||
return True
|
||||
|
||||
@tornado.gen.coroutine
|
||||
def _handle_decoded_payload(self, data):
|
||||
async def _handle_decoded_payload(self, data):
|
||||
"""
|
||||
Override this method if you wish to handle the decoded data
|
||||
differently.
|
||||
|
@ -1786,7 +1785,7 @@ class Minion(MinionBase):
|
|||
" waiting...",
|
||||
data["jid"],
|
||||
)
|
||||
yield tornado.gen.sleep(10)
|
||||
await asyncio.sleep(10)
|
||||
process_count = len(salt.utils.minion.running(self.opts))
|
||||
|
||||
# We stash an instance references to allow for the socket
|
||||
|
@ -3323,8 +3322,9 @@ class Syndic(Minion):
|
|||
self.jids = {}
|
||||
self.raw_events = []
|
||||
self.pub_future = None
|
||||
self.async_req_channel = None
|
||||
|
||||
def _handle_decoded_payload(self, data):
|
||||
async def _handle_decoded_payload(self, data):
|
||||
"""
|
||||
Override this method if you wish to handle the decoded data
|
||||
differently.
|
||||
|
@ -3366,6 +3366,7 @@ class Syndic(Minion):
|
|||
data["jid"],
|
||||
data["to"],
|
||||
io_loop=self.io_loop,
|
||||
listen=False,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
@ -3417,6 +3418,11 @@ class Syndic(Minion):
|
|||
management of the event bus assuming that these are handled outside
|
||||
the tune_in sequence
|
||||
"""
|
||||
if self.req_channel:
|
||||
self.req_channel.close()
|
||||
if self.async_req_channel:
|
||||
self.async_req_channel.close()
|
||||
|
||||
# Instantiate the local client
|
||||
self.local = salt.client.get_local_client(
|
||||
self.opts["_minion_conf_file"], io_loop=self.io_loop
|
||||
|
@ -3427,10 +3433,10 @@ class Syndic(Minion):
|
|||
self.req_channel = salt.channel.client.ReqChannel.factory(self.opts)
|
||||
self.async_req_channel = salt.channel.client.AsyncReqChannel.factory(self.opts)
|
||||
|
||||
def _process_cmd_socket(self, payload):
|
||||
async def _process_cmd_socket(self, payload):
|
||||
if payload is not None and payload["enc"] == "aes":
|
||||
log.trace("Handling payload")
|
||||
self._handle_decoded_payload(payload["load"])
|
||||
await self._handle_decoded_payload(payload["load"])
|
||||
# If it's not AES, and thus has not been verified, we do nothing.
|
||||
# In the future, we could add support for some clearfuncs, but
|
||||
# the syndic currently has no need.
|
||||
|
@ -3918,8 +3924,7 @@ class ProxyMinion(Minion):
|
|||
mp_call = _metaproxy_call(self.opts, "handle_payload")
|
||||
return mp_call(self, payload)
|
||||
|
||||
@tornado.gen.coroutine
|
||||
def _handle_decoded_payload(self, data):
|
||||
async def _handle_decoded_payload(self, data):
|
||||
mp_call = _metaproxy_call(self.opts, "handle_decoded_payload")
|
||||
return mp_call(self, data)
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue