2020-10-26 02:11:53 +08:00
|
|
|
|
import html
import os
import re
from typing import Tuple, Optional, Callable, Union, Dict

import requests
import telegram
from telegram.ext import Updater, MessageHandler, filters
|
|
|
|
|
|
2020-10-26 02:11:53 +08:00
|
|
|
|
# Short alias for the `Filters` attribute of the imported `telegram.ext.filters`
# module, so filter factories can be written as `Filters.<name>(...)`.
Filters = filters.Filters
|
2021-12-13 04:54:02 +08:00
|
|
|
|
# Command pattern. Group 1: the leading slash or backslash, optionally followed
# by an underscore. Group 2: the first word, in which a backslash escapes the
# following character. Group 3: everything after the optional spaces/tabs.
parser = re.compile(r'^([\\/]_?)((?:[^ \t\\]|\\.)+)[ \t]*(.*)$')

# Backslash-escaped whitespace sequences; the leading backslash is dropped
# when a command word is unescaped.
# NOTE(review): the first two entries are identical as written -- confirm
# whether one of them was meant to be a different whitespace character.
ESCAPING = ('\\ ', '\\ ', '\\\t')


def htmlEscape(s: str) -> str:
    """Escape the characters that are special in HTML text content.

    The ampersand is replaced first so that the entities produced for ``<``
    and ``>`` are not escaped a second time.

    Args:
        s: Raw text.

    Returns:
        ``s`` with ``&``, ``<`` and ``>`` replaced by their HTML entities.
    """
    return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")


# An @-mention: '@', one ASCII letter, then at least four more word characters.
# Group 1 is the name without the '@'.
mentionParser = re.compile(r'@([a-zA-Z]\w{4,})')

# Placeholder; rebound under `__main__` to the `.sub` method of a compiled
# '@<bot username>' pattern once the bot's username is known.
delUsername: Optional[Callable] = None
|
2020-10-26 02:11:53 +08:00
|
|
|
|
|
|
|
|
|
# Runtime configuration comes from environment variables (Docker env).
# TOKEN is mandatory: importing the module fails without it.
Token = os.getenv('TOKEN')
if not Token:
    raise Exception('no token')

# PROXY is optional. An empty string means "no proxy"; in that case the
# mapping handed to `requests` is None.
telegram_proxy = os.getenv('PROXY', '')
requests_proxies = None
if telegram_proxy:
    requests_proxies = {'all': telegram_proxy}
|
2020-11-10 01:46:55 +08:00
|
|
|
|
|
2021-08-05 00:18:42 +08:00
|
|
|
|
|
2021-12-13 04:54:02 +08:00
|
|
|
|
class User:
    """A Telegram user or chat that can be mentioned in a reply.

    An instance is identified by a numeric id, a username, or both. When only
    a username is supplied, the display name is looked up from the public
    ``https://t.me/<username>`` page.

    Attributes are plain instance fields: ``uid``, ``username`` and ``name``
    (``name`` holds raw, unescaped text or ``None``).
    """

    # Equality below is "same uid OR same username", which cannot be paired
    # with a consistent hash, so instances are explicitly unhashable.
    __hash__ = None

    def __init__(self, uid: Optional[int] = None, username: Optional[str] = None, name: Optional[str] = None):
        """Create a user from an id/name pair and/or a username.

        Args:
            uid: Numeric id.
            username: Username without the leading ``@``.
            name: Display name.

        Raises:
            ValueError: If neither a username nor both ``uid`` and ``name``
                are given.
        """
        if not (uid and name) and not username:
            raise ValueError('invalid user')
        self.name = name
        self.uid = uid
        self.username = username
        if not self.name and self.username:
            self.__get_user_by_username()

    def __get_user_by_username(self):
        """Fill ``self.name`` from the ``og:title`` of the user's t.me page.

        ``self.name`` is set to ``None`` when the page has no ``og:title`` or
        ``<title>``, or when the two are equal (the case the original author
        marked as "user does not exist").

        Raises:
            requests.RequestException: On network failure or timeout.
        """
        # A timeout keeps a stalled connection from blocking the handler forever.
        r = requests.get(f'https://t.me/{self.username}', proxies=requests_proxies, timeout=10)
        og_title = re.search(r'<meta property="og:title" content="([^"]*)"', r.text, re.IGNORECASE)
        page_title = re.search(r'<title>(.*?)</title>', r.text, re.IGNORECASE)
        if og_title is None or page_title is None or og_title.group(1) == page_title.group(1):
            self.name = None
            return
        # The page carries HTML-escaped text; store it raw so that `mention`
        # can escape every name uniformly.
        self.name = html.unescape(og_title.group(1))

    def mention(self, mention_self: bool = False, pure: bool = False) -> str:
        """Return text that refers to this user.

        Args:
            mention_self: Use the fixed label "自己" instead of the name.
            pure: Return only the raw label, without HTML markup.

        Returns:
            ``@username`` when no display name is known; otherwise the raw
            label (``pure=True``) or an HTML anchor around the escaped label.
        """
        if not self.name:
            return f'@{self.username}'

        name = self.name if not mention_self else "自己"
        if pure:
            return name

        # Link by username when there is no usable positive id; otherwise
        # link by numeric id.
        if self.username and (not self.uid or self.uid < 0):
            mention_deep_link = f'tg://resolve?domain={self.username}'
        else:
            mention_deep_link = f'tg://user?id={self.uid}'
        # Names are user-controlled text: escape them before embedding in HTML.
        return f'<a href ="{mention_deep_link}">{html.escape(name, quote=False)}</a>'

    def __eq__(self, other):
        """Two users are equal when they share a non-empty uid or username."""
        if not isinstance(other, User):
            return NotImplemented
        same_uid = bool(self.uid or other.uid) and self.uid == other.uid
        same_username = bool(self.username or other.username) and self.username == other.username
        return same_uid or same_username
|
2020-11-10 01:46:55 +08:00
|
|
|
|
|
|
|
|
|
|
2021-12-13 04:54:02 +08:00
|
|
|
|
def get_user(msg: telegram.Message) -> User:
    """Build a `User` for the sender of ``msg``.

    The message's ``sender_chat`` is used when it is set; otherwise its
    ``from_user``. The display name is the sender's ``full_name``, falling
    back to ``title`` when that is empty.
    """
    sender = msg.sender_chat
    if not sender:
        sender = msg.from_user
    display_name = sender.full_name or sender.title
    return User(uid=sender.id, username=sender.username, name=display_name)
|
2021-08-05 00:27:25 +08:00
|
|
|
|
|
|
|
|
|
|
2021-12-13 04:54:02 +08:00
|
|
|
|
def get_users(msg: telegram.Message) -> Tuple[User, User]:
    """Return ``(sender, target)`` for ``msg``.

    The target is the sender of the message being replied to; when ``msg`` is
    not a reply, the target is resolved from ``msg`` itself.
    """
    replied_to = msg.reply_to_message or msg
    return get_user(msg), get_user(replied_to)
|
2020-10-26 02:11:53 +08:00
|
|
|
|
|
|
|
|
|
|
2021-12-28 22:41:32 +08:00
|
|
|
|
def parse_command(match: re.Match) -> Dict[str, Union[str, bool]]:
    """Turn a match of the command pattern into a command dictionary.

    Args:
        match: A match whose first three groups are the command marker, the
            first word and the remaining text.

    Returns:
        A dict with keys ``'predicate'`` (first word with the bot's own
        @-mention removed and escaped whitespace unescaped, HTML-escaped),
        ``'complement'`` (remaining text, HTML-escaped) and ``'swap'``
        (True when the marker is anything other than a bare ``/``).
    """
    parsed = match.groups()
    predicate = parsed[1]

    # `delUsername` stays None until the bot is started as a script; skip the
    # stripping step in that case instead of calling None. It does not depend
    # on the loop below, so it runs once rather than once per escape sequence.
    if delUsername is not None:
        predicate = delUsername('', predicate)

    # Drop the backslash in front of each escaped whitespace character.
    for escape in ESCAPING:
        predicate = predicate.replace(escape, escape[1:])

    return {
        'predicate': htmlEscape(predicate),
        'complement': htmlEscape(parsed[2]),
        'swap': parsed[0] != '/',
    }
|
|
|
|
|
|
2021-08-09 16:18:44 +08:00
|
|
|
|
|
2021-12-13 04:54:02 +08:00
|
|
|
|
def get_text(user_from: 'User', user_rpl: 'User', command: dict) -> str:
    """Compose the reply sentence for a parsed command.

    Args:
        user_from: The acting user.
        user_rpl: The targeted user; when equal to ``user_from`` it is
            rendered with the "self" label.
        command: Dict with ``'predicate'`` and ``'complement'`` strings.

    Returns:
        The sentence (mentions rendered by ``User.mention``), ending in ``!``
        when its last visible character is ASCII and in the fullwidth ``！``
        otherwise.
    """
    rpl_self = user_from == user_rpl
    mention_from = user_from.mention()
    mention_rpl = user_rpl.mention(mention_self=rpl_self)
    predicate, complement = command['predicate'], command['complement']
    separator = ' ' if complement else ''

    # `tail` is the visible text the sentence ends with; its last character
    # decides which exclamation mark is appended.
    if predicate == 'me':
        ret = f"{mention_from}{separator}{complement}"
        tail = complement or user_from.mention(pure=True)
    elif predicate == 'you':
        ret = f"{mention_rpl}{separator}{complement}"
        tail = complement or user_rpl.mention(mention_self=rpl_self, pure=True)
    elif complement:
        ret = f"{mention_from} {predicate} {mention_rpl} {complement}"
        tail = complement
    else:
        ret = f"{mention_from} {predicate} 了 {mention_rpl}"
        # Use the plain label, not the HTML anchor: the anchor always ends in
        # '>', which would make every such sentence look ASCII-terminated.
        tail = user_rpl.mention(mention_self=rpl_self, pure=True)

    halfwidth_mark = tail[-1].isascii()
    # NOTE(review): the fullwidth mark for non-ASCII endings is inferred from
    # the `halfwidth_mark` name -- confirm it matches the intended output.
    ret += '!' if halfwidth_mark else '！'

    return ret
|
2020-10-26 16:08:50 +08:00
|
|
|
|
|
|
|
|
|
|
2021-12-13 04:54:02 +08:00
|
|
|
|
def reply(update: telegram.Update, context: telegram.ext.CallbackContext):
    """Handle a command message and answer it with the composed sentence.

    The sender and target are resolved from the message and the message it
    replies to. When both are the same user, the first @-mention found in the
    predicate (or, failing that, in the complement) becomes the target and is
    removed from the command text. For swap commands with distinct users, the
    two roles are exchanged before the sentence is built.

    Args:
        update: Incoming update; its effective message is the command.
        context: Callback context whose ``match`` is the command-pattern match.
    """
    print(update.to_dict())
    msg = update.effective_message
    from_user, rpl_user = get_users(msg)
    command = parse_command(context.match)

    if from_user == rpl_user:
        mention_match = mentionParser.search(command['predicate'])
        if mention_match:
            # Take the name from the match that was located and sliced here.
            # Searching the raw message text again would return the bot's own
            # "@botname" suffix first for messages like "/cmd@botname @target".
            rpl_user = User(username=mention_match.group(1))
            command['predicate'] = command['predicate'][:mention_match.start()]
        else:
            mention_match = mentionParser.search(command['complement'])
            if mention_match:
                rpl_user = User(username=mention_match.group(1))
                complement = command['complement']
                # Cut the mention out of the complement and trim what is left.
                complement = complement[:mention_match.start()] + complement[mention_match.end():]
                command['complement'] = complement.strip()

    if command['swap'] and (not from_user == rpl_user):
        from_user, rpl_user = rpl_user, from_user

    text = get_text(from_user, rpl_user, command)
    print(text, end='\n\n')

    # U+200E (LEFT-TO-RIGHT MARK) is prepended to the HTML reply.
    msg.reply_text('\u200e' + text, parse_mode='HTML')
|
2020-10-26 02:11:53 +08:00
|
|
|
|
|
|
|
|
|
|
2021-08-05 11:47:04 +08:00
|
|
|
|
if __name__ == '__main__':
    # Script entry point: create the updater, register the command handler
    # and poll until the process is stopped.
    updater = Updater(token=Token, use_context=True, request_kwargs={'proxy_url': telegram_proxy})

    # Now that the bot's username is available, replace the module-level
    # placeholder with a case-insensitive "@<bot username>" substitution.
    bot_mention = re.compile('@' + updater.bot.username, re.I)
    delUsername = bot_mention.sub

    # Every message matching the command pattern is routed to `reply`.
    command_handler = MessageHandler(Filters.regex(parser), reply)
    updater.dispatcher.add_handler(command_handler)

    updater.start_polling()
    updater.idle()
|