SlashBot/SlashBot.py

121 lines
3.7 KiB
Python
Raw Normal View History

2020-10-26 02:11:53 +08:00
import os
2020-10-26 16:08:50 +08:00
import re
from typing import List, Dict, Union
import requests
2020-10-26 16:08:50 +08:00
from telegram.ext import Updater, MessageHandler, filters
TELEGRAM = 777000
GROUP = 1087968824
2020-10-26 02:11:53 +08:00
Filters = filters.Filters
2020-11-29 19:11:19 +08:00
parser = re.compile(r'^\/((?:[^  \\]|\\.)+)([  ]*)(.*)$')
escaping = ('\\ ', '\\ ')
2020-10-26 02:11:53 +08:00
# Docker env
if os.environ.get('TOKEN') and os.environ['TOKEN'] != 'X':
Token = os.environ['TOKEN']
else:
raise Exception('no token')
2020-10-26 16:08:50 +08:00
# Find someone's full name by their username
def find_name_by_username(username: str) -> str:
r = requests.get(f'https://t.me/{username.replace("@", "")}')
return re.search('(?<=<meta property="og:title" content=").*(?=")', r.text, re.IGNORECASE).group(0)
def get_user(msg):
if msg['from']['id'] == TELEGRAM:
return {'first_name': msg['forward_from_chat']['title'], 'id': msg['forward_from_chat']['id']}
elif msg['from']['id'] == GROUP:
return {'first_name': msg['chat']['title'], 'id': msg['chat']['id']}
else:
return msg['from']
def get_users(msg):
msg_from = msg
if 'reply_to_message' in msg.keys():
msg_rpl = msg['reply_to_message']
else:
msg_rpl = msg_from.copy()
from_user, rpl_user = get_user(msg_from), get_user(msg_rpl)
2021-08-05 00:18:42 +08:00
# Not replying to anything
if rpl_user == from_user:
2021-08-05 00:18:42 +08:00
# Detect if the message contains a mention. If it has, use the mentioned user.
entities: List[Dict[str, Union[str, int]]] = msg['entities']
mentions = [e for e in entities if e['type'] == 'mention']
if mentions:
# Find username
offset = mentions[0]['offset']
length = mentions[0]['length']
text = msg['text']
username = text[offset : offset + length]
rpl_user = {'first_name': find_name_by_username(username), 'username': username}
# Remove mention from message text
msg['text'] = text[:offset] + text[offset + length:]
2021-08-05 00:18:42 +08:00
else:
rpl_user = {'first_name': '自己', 'id': rpl_user['id']}
return from_user, rpl_user
# Create mention string from user
def mention(user: Dict[str, str]) -> str:
# Combine name
last = user.get('last_name', '')
first = user['first_name']
name = first + (f' {last}' if last else '')
# Create user reference link
username = user.get('username', '')
uid = user.get('id', '')
link = f'tg://resolve?domain={username}' if username else f'tg://user?id={uid}'
return f"[{name}]({link})"
2020-10-26 02:11:53 +08:00
2020-10-26 16:08:50 +08:00
def get_text(mention_from, mention_rpl, command):
2020-11-29 19:35:03 +08:00
parsed = list(parser.search(delUsername.sub('', command)).groups())
2020-11-29 19:11:19 +08:00
for escape in escaping:
parsed[0] = parsed[0].replace(escape, escape[1:])
2020-11-06 02:55:01 +08:00
if parsed[0] == 'me':
return f"{mention_from}{bool(parsed[1])*' '}{parsed[2]}"
2020-11-07 22:54:07 +08:00
elif parsed[0] == 'you':
return f"{mention_rpl}{bool(parsed[1])*' '}{parsed[2]}"
2020-11-06 02:55:01 +08:00
elif parsed[2]:
2020-10-26 16:08:50 +08:00
return f"{mention_from} {parsed[0]} {mention_rpl} {parsed[2]}"
else:
return f"{mention_from} {parsed[0]}{mention_rpl}"
2020-10-26 02:11:53 +08:00
def reply(update, context):
2020-11-06 02:55:01 +08:00
print(update.to_dict())
2020-10-26 02:11:53 +08:00
msg = update.to_dict()['message']
from_user, rpl_user = get_users(msg)
2020-10-26 15:36:12 +08:00
# Escape markdown
command = msg['text']
command = command.replace("_", "\\_").replace("*", "\\*").replace("[", "\\[").replace("`", "\\`")
2020-10-26 15:36:12 +08:00
text = get_text(mention(from_user), mention(rpl_user), command)
print(text, end='\n\n')
2020-10-26 02:11:53 +08:00
2020-10-26 15:36:12 +08:00
update.effective_message.reply_text(text, parse_mode='Markdown')
2020-10-26 02:11:53 +08:00
2021-08-05 11:47:04 +08:00
if __name__ == '__main__':
updater = Updater(token=Token, use_context=True)
delUsername = re.compile('@' + updater.bot.username, re.I)
dp = updater.dispatcher
dp.add_handler(MessageHandler(Filters.regex(parser), reply))
updater.start_polling()
updater.idle()