2020-10-26 02:11:53 +08:00
|
|
|
|
import os
|
2020-10-26 16:08:50 +08:00
|
|
|
|
import re
|
2021-08-04 22:54:49 +08:00
|
|
|
|
from typing import List, Dict, Union
|
|
|
|
|
|
|
|
|
|
import requests
|
2020-10-26 16:08:50 +08:00
|
|
|
|
from telegram.ext import Updater, MessageHandler, filters
|
|
|
|
|
|
2020-11-10 01:46:55 +08:00
|
|
|
|
# Sender id for which get_user() takes the author from msg['forward_from_chat']
# instead of msg['from'].
# NOTE(review): presumably Telegram's own service account, which shows up as
# the sender of channel posts forwarded into a group - confirm.
TELEGRAM = 777000
|
|
|
|
|
# Sender id for which get_user() takes the author from msg['chat'] (the chat's
# title and id) instead of msg['from'].
# NOTE(review): presumably the placeholder account used for anonymous group
# admins - confirm.
GROUP = 1087968824
|
2020-10-26 02:11:53 +08:00
|
|
|
|
# Short alias for the Filters class of telegram.ext.filters; used when the
# message handler is registered (Filters.regex).
Filters = filters.Filters
|
2021-08-06 14:00:01 +08:00
|
|
|
|
# Command pattern, used both by parse_command() and as the handler's filter:
#   group 1 - prefix: '/' or '\', optionally followed by '_'
#   group 2 - predicate: one or more items, each either a character that is
#             neither a space nor a backslash, or a backslash plus any character
#   group 3 - complement: whatever follows the optional run of spaces
parser = re.compile(r'^([\\/]_?)((?:[^ \\]|\\.)+)[ ]*(.*)$')
|
2020-11-29 19:11:19 +08:00
|
|
|
|
# Backslash sequences that parse_command() collapses inside the predicate by
# dropping the leading backslash.
# NOTE(review): both entries look identical here - presumably one of them was
# meant to be a different whitespace character; verify against the repository.
escaping = ('\\ ', '\\ ')
|
2021-08-06 14:00:01 +08:00
|
|
|
|
def markdownEscape(s):
    """Return *s* with ``_``, ``*``, ``[`` and a backtick each prefixed by a backslash."""
    for special in ('_', '*', '[', '`'):
        s = s.replace(special, '\\' + special)
    return s
|
2020-10-26 02:11:53 +08:00
|
|
|
|
|
|
|
|
|
# Docker env
# The bot token comes from the TOKEN environment variable. An unset or empty
# value, or the literal 'X', counts as "not configured" and aborts start-up.
if os.environ.get('TOKEN') in (None, '', 'X'):
    raise Exception('no token')
Token = os.environ['TOKEN']
|
|
|
|
|
|
2020-10-26 16:08:50 +08:00
|
|
|
|
|
2021-08-04 22:54:49 +08:00
|
|
|
|
# Find someone's full name by their username
def find_name_by_username(username: str) -> str:
    """Look up the display name shown on a user's public ``t.me`` page.

    Fetches ``https://t.me/<username>`` and reads the content of its
    ``og:title`` meta tag.

    Args:
        username: Telegram username; any ``@`` characters are removed first.

    Returns:
        The HTML-unescaped ``og:title`` content, or the bare username when the
        page carries no such tag.

    Raises:
        requests.RequestException: If the HTTP request fails or times out.
    """
    import html  # local import so the block does not rely on a file-level one

    handle = username.replace('@', '')
    # A timeout keeps a stalled connection from blocking the caller forever.
    r = requests.get(f'https://t.me/{handle}', timeout=10)
    # [^"]* stops at the attribute's closing quote; a greedy .* would run on
    # to the last quote on the line.
    found = re.search(r'<meta property="og:title" content="([^"]*)"', r.text, re.IGNORECASE)
    if found is None:
        # No title on the page: fall back to the handle rather than crash.
        return handle
    # Attribute values are HTML-escaped (&amp;, &quot;, ...); undo that.
    return html.unescape(found.group(1))
|
|
|
|
|
|
|
|
|
|
|
2020-11-10 01:46:55 +08:00
|
|
|
|
def get_user(msg):
    """Return the dict that represents the author of a message.

    Args:
        msg: Message dict with at least ``msg['from']['id']``.

    Returns:
        * sender id equals ``TELEGRAM``: ``{'first_name', 'id'}`` built from the
          title and id of ``msg['forward_from_chat']``; if that entry is
          missing, the plain sender dict.
        * sender id equals ``GROUP``: ``{'first_name', 'id'}`` built from the
          title and id of ``msg['chat']``.
        * otherwise: ``msg['from']`` itself (not a copy).
    """
    sender = msg['from']
    sender_id = sender['id']

    if sender_id == TELEGRAM:
        origin = msg.get('forward_from_chat')
        if origin is None:
            # Not a forward after all: report the sender instead of raising.
            return sender
        return {'first_name': origin['title'], 'id': origin['id']}

    if sender_id == GROUP:
        chat = msg['chat']
        return {'first_name': chat['title'], 'id': chat['id']}

    return sender
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_users(msg):
    """Work out who issued a command and whom it is aimed at.

    The target is the author of the replied-to message. When sender and target
    compare equal (no reply, or a reply to oneself), the first ``mention``
    entity of the message supplies the target instead; failing that, the
    sender is targeted under the display name '自己'.

    Args:
        msg: Message dict. It is modified in place when a mention is used as
            the target: the mention is cut out of ``msg['text']``.

    Returns:
        A ``(from_user, rpl_user, reply_self)`` tuple. ``reply_self`` is True
        when sender and replied-to author compared equal, and it stays True
        even if a mention then supplied ``rpl_user``.
    """
    # Without a reply, the message itself stands in as the replied-to one.
    msg_rpl = msg.get('reply_to_message', msg)
    from_user, rpl_user = get_user(msg), get_user(msg_rpl)
    reply_self = rpl_user == from_user

    # Not replying to anything
    if reply_self:

        # Detect if the message contains a mention. If it has, use the mentioned user.
        # A message may carry no entities at all, hence the fallback to [].
        entities: List[Dict[str, Union[str, int]]] = msg.get('entities') or []
        mentions = [e for e in entities if e['type'] == 'mention']
        if mentions:

            # Find username
            # Entity offset/length count UTF-16 code units, not Python str
            # indices, so slice the UTF-16 encoding (2 bytes per unit).
            raw = msg['text'].encode('utf-16-le')
            start = 2 * mentions[0]['offset']
            end = start + 2 * mentions[0]['length']
            username = raw[start:end].decode('utf-16-le')
            rpl_user = {'first_name': find_name_by_username(username), 'username': username}

            # Remove mention from message text
            msg['text'] = (raw[:start] + raw[end:]).decode('utf-16-le')

        else:
            rpl_user = {'first_name': '自己', 'id': rpl_user['id']}

    return from_user, rpl_user, reply_self
|
2020-11-10 01:46:55 +08:00
|
|
|
|
|
|
|
|
|
|
2021-08-05 00:27:25 +08:00
|
|
|
|
# Create mention string from user
def mention(user: Dict[str, str]) -> str:
    """Build a Markdown inline link that mentions *user*.

    Args:
        user: Dict with a required ``first_name`` and optional ``last_name``,
            ``username`` and ``id``.

    Returns:
        ``[<name>](<link>)`` where the name is the first name plus, if present,
        a space and the last name. The link is ``tg://resolve?domain=...`` when
        a username is available and ``tg://user?id=...`` otherwise.

    Raises:
        KeyError: If ``first_name`` is missing.
    """
    # Combine name
    # NOTE(review): the name is inserted unescaped; Markdown characters in a
    # display name (e.g. ']') may break the link - confirm how to handle.
    first = user['first_name']
    last = user.get('last_name', '')
    name = f'{first} {last}' if last else first

    # Create user reference link
    # A username cut out of a message's mention entity still carries its '@';
    # the link takes the bare handle.
    username = user.get('username', '').lstrip('@')
    if username:
        link = f'tg://resolve?domain={username}'
    else:
        uid = user.get('id', '')
        link = f'tg://user?id={uid}'

    return f"[{name}]({link})"
|
2020-10-26 02:11:53 +08:00
|
|
|
|
|
|
|
|
|
|
2021-08-06 14:00:01 +08:00
|
|
|
|
def parse_command(command):
    """Split a command string into its parts using the module-level ``parser``.

    Args:
        command: Command text; it must match ``parser``.

    Returns:
        A dict with:
          * ``predicate`` - group 2 with every sequence listed in ``escaping``
            replaced by itself minus the leading character, then passed
            through ``markdownEscape``;
          * ``complement`` - group 3 passed through ``markdownEscape``;
          * ``swap`` - True unless the prefix (group 1) is exactly ``'/'``.

    Raises:
        AttributeError: If *command* does not match ``parser``.
    """
    prefix, predicate, complement = parser.search(command).groups()

    # Collapse escaped sequences before Markdown-escaping the predicate.
    for sequence in escaping:
        predicate = predicate.replace(sequence, sequence[1:])

    return {
        'predicate': markdownEscape(predicate),
        'complement': markdownEscape(complement),
        'swap': prefix != '/',
    }
|
|
|
|
|
|
|
|
|
|
def get_text(mention_from, mention_rpl, command):
    """Compose the sentence the bot sends back.

    Args:
        mention_from: Mention string of the acting user.
        mention_rpl: Mention string of the targeted user.
        command: Dict with ``predicate`` and ``complement`` entries.

    Returns:
        * predicate ``'me'``:  ``"<from>[ <complement>]!"``
        * predicate ``'you'``: ``"<rpl>[ <complement>]!"``
        * with a complement:   ``"<from> <predicate> <rpl> <complement>!"``
        * otherwise:           ``"<from> <predicate> 了 <rpl>!"``
    """
    predicate = command['predicate']
    complement = command['complement']

    if predicate in ('me', 'you'):
        subject = mention_from if predicate == 'me' else mention_rpl
        # Only put a space after the subject when something follows it.
        gap = ' ' if complement else ''
        return f"{subject}{gap}{complement}!"

    if complement:
        return f"{mention_from} {predicate} {mention_rpl} {complement}!"

    return f"{mention_from} {predicate} 了 {mention_rpl}!"
|
2020-10-26 16:08:50 +08:00
|
|
|
|
|
|
|
|
|
|
2020-10-26 02:11:53 +08:00
|
|
|
|
def reply(update, context):
    """Handle a matching message: build the sentence and send it as a reply.

    Prints the raw update and the composed text to stdout, then replies to the
    effective message with ``parse_mode='Markdown'``.

    Relies on the module-level ``del_username`` pattern, which is only
    assigned when the file runs as a script.

    Args:
        update: The incoming update object (must offer ``to_dict()`` and
            ``effective_message``).
        context: Handler context; unused.
    """
    payload = update.to_dict()
    print(payload)

    # Updates without a 'message' entry cannot be processed here; skip them
    # instead of raising KeyError.
    # NOTE(review): presumably edited messages and channel posts - confirm.
    msg = payload.get('message')
    if msg is None:
        return

    # get_users() may cut a mention out of msg['text'], so it must run first.
    from_user, rpl_user, reply_self = get_users(msg)

    # Drop "@<bot username>" before parsing.
    command = parse_command(del_username.sub('', msg['text']))
    if command['swap'] and not reply_self:
        from_user, rpl_user = rpl_user, from_user

    text = get_text(mention(from_user), mention(rpl_user), command)
    print(text, end='\n\n')

    update.effective_message.reply_text(text, parse_mode='Markdown')
|
2020-10-26 02:11:53 +08:00
|
|
|
|
|
|
|
|
|
|
2021-08-05 11:47:04 +08:00
|
|
|
|
if __name__ == '__main__':
    updater = Updater(token=Token, use_context=True)

    # Pattern for "@<bot username>" (case-insensitive); reply() uses it to
    # strip that text from incoming messages.
    del_username = re.compile('@' + updater.bot.username, flags=re.IGNORECASE)

    # Every message whose text matches the command pattern goes to reply().
    dp = updater.dispatcher
    dp.add_handler(MessageHandler(filters=Filters.regex(parser), callback=reply))

    updater.start_polling()
    updater.idle()
|