mirror of
https://github.com/saltstack/salt.git
synced 2025-04-16 09:40:20 +00:00
Some more cleanup. Using collections.deque to gather up the messages from Slack before they are handled.
This commit is contained in:
parent
46358c645b
commit
b393a7a6a8
4 changed files with 126 additions and 76 deletions
1
changelog/57842.fixed
Normal file
1
changelog/57842.fixed
Normal file
|
@ -0,0 +1 @@
|
|||
Updating Slack engine to use slack_bolt library.
|
|
@ -146,6 +146,7 @@ must be quoted, or else PyYAML will fail to load the configuration.
|
|||
"""
|
||||
|
||||
import ast
|
||||
import collections
|
||||
import datetime
|
||||
import itertools
|
||||
import logging
|
||||
|
@ -166,8 +167,8 @@ import salt.utils.slack
|
|||
import salt.utils.yaml
|
||||
|
||||
try:
|
||||
from slack_bolt import App
|
||||
from slack_bolt.adapter.socket_mode import SocketModeHandler
|
||||
import slack_bolt
|
||||
import slack_bolt.adapter.socket_mode
|
||||
|
||||
HAS_SLACKBOLT = True
|
||||
except ImportError:
|
||||
|
@ -180,19 +181,34 @@ __virtualname__ = "slack"
|
|||
|
||||
def __virtual__():
|
||||
if not HAS_SLACKBOLT:
|
||||
return (False, "The 'slackclient' Python module could not be loaded")
|
||||
return (False, "The 'slack_bolt' Python module could not be loaded")
|
||||
return __virtualname__
|
||||
|
||||
|
||||
class SlackClient:
|
||||
def __init__(self, app_token, bot_token):
|
||||
def __init__(self, app_token, bot_token, trigger_string):
|
||||
self.master_minion = salt.minion.MasterMinion(__opts__)
|
||||
|
||||
self.app = App(token=bot_token)
|
||||
self.handler = SocketModeHandler(self.app, app_token)
|
||||
self.app = slack_bolt.App(token=bot_token)
|
||||
self.handler = slack_bolt.adapter.socket_mode.SocketModeHandler(
|
||||
self.app, app_token
|
||||
)
|
||||
self.handler.connect()
|
||||
|
||||
self.app.message(re.compile("(^!.*)"))(self.message_trigger)
|
||||
self.app_token = app_token
|
||||
self.bot_token = bot_token
|
||||
|
||||
self.msg_queue = collections.deque()
|
||||
|
||||
trigger_pattern = "(^{}.*)".format(trigger_string)
|
||||
|
||||
# Register message_trigger when we see messages that start
|
||||
# with the trigger string
|
||||
self.app.message(re.compile(trigger_pattern))(self.message_trigger)
|
||||
|
||||
def message_trigger(self, message):
|
||||
# Add the received message to the queue
|
||||
self.msg_queue.append(message)
|
||||
|
||||
def get_slack_users(self, token):
|
||||
"""
|
||||
|
@ -547,13 +563,12 @@ class SlackClient:
|
|||
return data
|
||||
|
||||
for sleeps in (5, 10, 30, 60):
|
||||
if self.slack_connect:
|
||||
if self.handler:
|
||||
break
|
||||
else:
|
||||
# see https://api.slack.com/docs/rate-limits
|
||||
log.warning(
|
||||
"Slack connection is invalid. Server: %s, sleeping %s",
|
||||
self.sc.server,
|
||||
"Slack connection is invalid, sleeping %s",
|
||||
sleeps,
|
||||
)
|
||||
time.sleep(
|
||||
|
@ -562,51 +577,51 @@ class SlackClient:
|
|||
else:
|
||||
raise UserWarning(
|
||||
"Connection to slack is still invalid, giving up: {}".format(
|
||||
self.slack_connect
|
||||
self.handler
|
||||
)
|
||||
) # Boom!
|
||||
while True:
|
||||
msg = self.sc.rtm_read()
|
||||
for m_data in msg:
|
||||
while self.msg_queue:
|
||||
msg = self.msg_queue.popleft()
|
||||
try:
|
||||
msg_text = self.message_text(m_data)
|
||||
msg_text = self.message_text(msg)
|
||||
except (ValueError, TypeError) as msg_err:
|
||||
log.debug(
|
||||
"Got an error from trying to get the message text %s", msg_err
|
||||
)
|
||||
yield {"message_data": m_data} # Not a message type from the API?
|
||||
yield {"message_data": msg} # Not a message type from the API?
|
||||
continue
|
||||
|
||||
# Find the channel object from the channel name
|
||||
channel = self.sc.server.channels.find(m_data["channel"])
|
||||
data = just_data(m_data)
|
||||
channel = msg["channel"]
|
||||
data = just_data(msg)
|
||||
if msg_text.startswith(trigger_string):
|
||||
loaded_groups = self.get_config_groups(groups, groups_pillar_name)
|
||||
if not data.get("user_name"):
|
||||
log.error(
|
||||
"The user %s can not be looked up via slack. What has"
|
||||
" happened here?",
|
||||
m_data.get("user"),
|
||||
msg.get("user"),
|
||||
)
|
||||
channel.send_message(
|
||||
"The user {} can not be looked up via slack. Not"
|
||||
" running {}".format(data["user_id"], msg_text)
|
||||
)
|
||||
yield {"message_data": m_data}
|
||||
yield {"message_data": msg}
|
||||
continue
|
||||
(allowed, target, cmdline) = self.control_message_target(
|
||||
data["user_name"], msg_text, loaded_groups, trigger_string
|
||||
)
|
||||
log.debug("Got target: %s, cmdline: %s", target, cmdline)
|
||||
if allowed:
|
||||
yield {
|
||||
"message_data": m_data,
|
||||
"channel": m_data["channel"],
|
||||
ret = {
|
||||
"message_data": msg,
|
||||
"channel": msg["channel"],
|
||||
"user": data["user_id"],
|
||||
"user_name": data["user_name"],
|
||||
"cmdline": cmdline,
|
||||
"target": target,
|
||||
}
|
||||
yield ret
|
||||
continue
|
||||
else:
|
||||
channel.send_message(
|
||||
|
@ -780,39 +795,42 @@ class SlackClient:
|
|||
# Drain the slack messages, up to 10 messages at a clip
|
||||
count = 0
|
||||
for msg in message_generator:
|
||||
# The message_generator yields dicts. Leave this loop
|
||||
# on a dict that looks like {'done': True} or when we've done it
|
||||
# 10 times without taking a break.
|
||||
log.trace("Got a message from the generator: %s", msg.keys())
|
||||
if count > 10:
|
||||
log.warning(
|
||||
"Breaking in getting messages because count is exceeded"
|
||||
)
|
||||
break
|
||||
if not msg:
|
||||
count += 1
|
||||
log.warning("Skipping an empty message.")
|
||||
continue # This one is a dud, get the next message
|
||||
if msg.get("done"):
|
||||
log.trace("msg is done")
|
||||
break
|
||||
if fire_all:
|
||||
log.debug("Firing message to the bus with tag: %s", tag)
|
||||
log.debug("%s %s", tag, msg)
|
||||
self.fire("{}/{}".format(tag, msg["message_data"].get("type")), msg)
|
||||
if control and (len(msg) > 1) and msg.get("cmdline"):
|
||||
channel = self.sc.server.channels.find(msg["channel"])
|
||||
jid = self.run_command_async(msg)
|
||||
log.debug("Submitted a job and got jid: %s", jid)
|
||||
outstanding[
|
||||
jid
|
||||
] = msg # record so we can return messages to the caller
|
||||
channel.send_message(
|
||||
"@{}'s job is submitted as salt jid {}".format(
|
||||
if msg:
|
||||
# The message_generator yields dicts. Leave this loop
|
||||
# on a dict that looks like {'done': True} or when we've done it
|
||||
# 10 times without taking a break.
|
||||
log.trace("Got a message from the generator: %s", msg.keys())
|
||||
if count > 10:
|
||||
log.warning(
|
||||
"Breaking in getting messages because count is exceeded"
|
||||
)
|
||||
break
|
||||
if not msg:
|
||||
count += 1
|
||||
log.warning("Skipping an empty message.")
|
||||
continue # This one is a dud, get the next message
|
||||
if msg.get("done"):
|
||||
log.trace("msg is done")
|
||||
break
|
||||
if fire_all:
|
||||
log.debug("Firing message to the bus with tag: %s", tag)
|
||||
log.debug("%s %s", tag, msg)
|
||||
self.fire(
|
||||
"{}/{}".format(tag, msg["message_data"].get("type")), msg
|
||||
)
|
||||
if control and (len(msg) > 1) and msg.get("cmdline"):
|
||||
jid = self.run_command_async(msg)
|
||||
log.debug("Submitted a job and got jid: %s", jid)
|
||||
outstanding[
|
||||
jid
|
||||
] = msg # record so we can return messages to the caller
|
||||
text_msg = "@{}'s job is submitted as salt jid {}".format(
|
||||
msg["user_name"], jid
|
||||
)
|
||||
)
|
||||
count += 1
|
||||
self.app.client.chat_postMessage(
|
||||
channel=msg["channel"], text=text_msg
|
||||
)
|
||||
count += 1
|
||||
start_time = time.time()
|
||||
job_status = self.get_jobs_from_runner(
|
||||
outstanding.keys()
|
||||
|
@ -829,7 +847,7 @@ class SlackClient:
|
|||
log.debug("ret to send back is %s", result)
|
||||
# formatting function?
|
||||
this_job = outstanding[jid]
|
||||
channel = self.sc.server.channels.find(this_job["channel"])
|
||||
channel = this_job["channel"]
|
||||
return_text = self.format_return_text(result, function)
|
||||
return_prefix = (
|
||||
"@{}'s job `{}` (id: {}) (target: {}) returned".format(
|
||||
|
@ -839,19 +857,19 @@ class SlackClient:
|
|||
this_job["target"],
|
||||
)
|
||||
)
|
||||
channel.send_message(return_prefix)
|
||||
self.app.client.chat_postMessage(
|
||||
channel=channel, text=return_prefix
|
||||
)
|
||||
ts = time.time()
|
||||
st = datetime.datetime.fromtimestamp(ts).strftime("%Y%m%d%H%M%S%f")
|
||||
filename = "salt-results-{}.yaml".format(st)
|
||||
r = self.sc.api_call(
|
||||
"files.upload",
|
||||
channels=channel.id,
|
||||
resp = self.app.client.files_upload(
|
||||
channels=channel,
|
||||
filename=filename,
|
||||
content=return_text,
|
||||
)
|
||||
# Handle unicode return
|
||||
log.debug("Got back %s via the slack client", r)
|
||||
resp = salt.utils.yaml.safe_load(salt.utils.json.dumps(r))
|
||||
log.debug("Got back %s via the slack client", resp)
|
||||
if "ok" in resp and resp["ok"] is False:
|
||||
this_job["channel"].send_message(
|
||||
"Error: {}".format(resp["error"])
|
||||
|
@ -919,7 +937,8 @@ class SlackClient:
|
|||
|
||||
|
||||
def start(
|
||||
token,
|
||||
app_token,
|
||||
bot_token,
|
||||
control=False,
|
||||
trigger="!",
|
||||
groups=None,
|
||||
|
@ -931,18 +950,18 @@ def start(
|
|||
Listen to slack events and forward them to salt, new version
|
||||
"""
|
||||
|
||||
if (not token) or (not token.startswith("xoxb")):
|
||||
if (not bot_token) or (not bot_token.startswith("xoxb")):
|
||||
time.sleep(2) # don't respawn too quickly
|
||||
log.error("Slack bot token not found, bailing...")
|
||||
raise UserWarning("Slack Engine bot token not configured")
|
||||
|
||||
app_token = "xapp-1-A047F7H80DC-4245337892359-e26770884d0e159372cdeb768fa44ca62523f144c7082d4d56d76127e3619456"
|
||||
bot_token = "xoxb-2848035968-4245469590103-ZE5uptNNYhffMiM8rND5iX01"
|
||||
try:
|
||||
client = SlackClient(app_token=app_token, bot_token=bot_token)
|
||||
# message_generator = client.generate_triggered_messages(
|
||||
# token, trigger, groups, groups_pillar_name
|
||||
# )
|
||||
# client.run_commands_from_slack_async(message_generator, fire_all, tag, control)
|
||||
client = SlackClient(
|
||||
app_token=app_token, bot_token=bot_token, trigger_string=trigger
|
||||
)
|
||||
message_generator = client.generate_triggered_messages(
|
||||
bot_token, trigger, groups, groups_pillar_name
|
||||
)
|
||||
client.run_commands_from_slack_async(message_generator, fire_all, tag, control)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
raise Exception("{}".format(traceback.format_exc()))
|
||||
|
|
|
@ -46,7 +46,7 @@ def query(
|
|||
ret = {"message": "", "res": True}
|
||||
|
||||
slack_functions = {
|
||||
"rooms": {"request": "channels.list", "response": "channels"},
|
||||
"rooms": {"request": "conversations.list", "response": "channels"},
|
||||
"users": {"request": "users.list", "response": "members"},
|
||||
"message": {"request": "chat.postMessage", "response": "channel"},
|
||||
}
|
||||
|
|
|
@ -9,11 +9,33 @@ from tests.support.mock import MagicMock, patch
|
|||
|
||||
pytestmark = [
|
||||
pytest.mark.skipif(
|
||||
slack.HAS_SLACKCLIENT is False, reason="The SlackClient is not installed"
|
||||
slack.HAS_SLACKBOLT is False, reason="The slack_bolt is not installed"
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
class MockSlackBoltSocketMode:
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
|
||||
def connect(self, *args, **kwargs):
|
||||
return True
|
||||
|
||||
|
||||
class MockSlackBoltApp:
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
|
||||
self.client = None
|
||||
self.logger = None
|
||||
self.proxy = None
|
||||
|
||||
def message(self, *args, **kwargs):
|
||||
return MagicMock(return_value=True)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def configure_loader_modules():
|
||||
return {slack: {}}
|
||||
|
@ -22,12 +44,20 @@ def configure_loader_modules():
|
|||
@pytest.fixture
|
||||
def slack_client():
|
||||
mock_opts = salt.config.DEFAULT_MINION_OPTS.copy()
|
||||
token = "xoxb-xxxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
app_token = "xapp-x-xxxxxxxxxxx-xxxxxxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
bot_token = "xoxb-xxxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxxxx"
|
||||
trigger = "!"
|
||||
|
||||
with patch.dict(slack.__opts__, mock_opts):
|
||||
with patch("slackclient.SlackClient.rtm_connect", MagicMock(return_value=True)):
|
||||
slack_client = slack.SlackClient(token)
|
||||
yield slack_client
|
||||
with patch(
|
||||
"slack_bolt.App", MagicMock(autospec=True, return_value=MockSlackBoltApp())
|
||||
):
|
||||
with patch(
|
||||
"slack_bolt.adapter.socket_mode.SocketModeHandler",
|
||||
MagicMock(autospec=True, return_value=MockSlackBoltSocketMode()),
|
||||
):
|
||||
slack_client = slack.SlackClient(app_token, bot_token, trigger)
|
||||
yield slack_client
|
||||
|
||||
|
||||
def test_control_message_target(slack_client):
|
||||
|
|
Loading…
Add table
Reference in a new issue