Merge pull request #2 from hykilpikonna/main

[PR] Add mention feature
This commit is contained in:
Rongrong 2021-08-05 18:30:05 +08:00 committed by GitHub
commit 8a2e7b5eb6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 84 additions and 17 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
.idea

View File

@ -1,5 +1,8 @@
import os import os
import re import re
from typing import List, Dict, Union
import requests
from telegram.ext import Updater, MessageHandler, filters from telegram.ext import Updater, MessageHandler, filters
TELEGRAM = 777000 TELEGRAM = 777000
@ -15,6 +18,12 @@ else:
raise Exception('no token') raise Exception('no token')
# Find someone's full name by their username
def find_name_by_username(username: str) -> str:
r = requests.get(f'https://t.me/{username.replace("@", "")}')
return re.search('(?<=<meta property="og:title" content=").*(?=")', r.text, re.IGNORECASE).group(0)
def get_user(msg): def get_user(msg):
if msg['from']['id'] == TELEGRAM: if msg['from']['id'] == TELEGRAM:
return {'first_name': msg['forward_from_chat']['title'], 'id': msg['forward_from_chat']['id']} return {'first_name': msg['forward_from_chat']['title'], 'id': msg['forward_from_chat']['id']}
@ -31,17 +40,45 @@ def get_users(msg):
else: else:
msg_rpl = msg_from.copy() msg_rpl = msg_from.copy()
from_user, rpl_user = get_user(msg_from), get_user(msg_rpl) from_user, rpl_user = get_user(msg_from), get_user(msg_rpl)
# Not replying to anything
if rpl_user == from_user: if rpl_user == from_user:
rpl_user = {'first_name': '自己', 'id': rpl_user['id']}
# Detect if the message contains a mention. If it has, use the mentioned user.
entities: List[Dict[str, Union[str, int]]] = msg['entities']
mentions = [e for e in entities if e['type'] == 'mention']
if mentions:
# Find username
offset = mentions[0]['offset']
length = mentions[0]['length']
text = msg['text']
username = text[offset : offset + length]
rpl_user = {'first_name': find_name_by_username(username), 'username': username}
# Remove mention from message text
msg['text'] = text[:offset] + text[offset + length:]
else:
rpl_user = {'first_name': '自己', 'id': rpl_user['id']}
return from_user, rpl_user return from_user, rpl_user
def mention(user): # Create mention string from user
space = ' ' def mention(user: Dict[str, str]) -> str:
if 'last_name' not in user:
user['last_name'] = '' # Combine name
space = '' last = user.get('last_name', '')
return f"[{user['first_name']}{space}{user['last_name']}](tg://user?id={user['id']})" first = user['first_name']
name = first + (f' {last}' if last else '')
# Create user reference link
username = user.get('username', '')
uid = user.get('id', '')
link = f'tg://resolve?domain={username}' if username else f'tg://user?id={uid}'
return f"[{name}]({link})"
def get_text(mention_from, mention_rpl, command): def get_text(mention_from, mention_rpl, command):
@ -61,21 +98,23 @@ def get_text(mention_from, mention_rpl, command):
def reply(update, context): def reply(update, context):
print(update.to_dict()) print(update.to_dict())
msg = update.to_dict()['message'] msg = update.to_dict()['message']
command = msg['text']
from_user, rpl_user = get_users(msg) from_user, rpl_user = get_users(msg)
mention_from, mention_rpl = mention(from_user), mention(rpl_user) # Escape markdown
command = msg['text']
command = command.replace("_", "\\_").replace("*", "\\*").replace("[", "\\[").replace("`", "\\`")
text = get_text(mention_from, mention_rpl, command) text = get_text(mention(from_user), mention(rpl_user), command)
print(text, end='\n\n') print(text, end='\n\n')
update.effective_message.reply_text(text, parse_mode='Markdown') update.effective_message.reply_text(text, parse_mode='Markdown')
updater = Updater(token=Token, use_context=True) if __name__ == '__main__':
delUsername = re.compile('@' + updater.bot.username, re.I) updater = Updater(token=Token, use_context=True)
dp = updater.dispatcher delUsername = re.compile('@' + updater.bot.username, re.I)
dp.add_handler(MessageHandler(Filters.regex(parser), reply)) dp = updater.dispatcher
dp.add_handler(MessageHandler(Filters.regex(parser), reply))
updater.start_polling() updater.start_polling()
updater.idle() updater.idle()

25
mtp_test.py Normal file
View File

@ -0,0 +1,25 @@
import asyncio
import socks
from telethon import TelegramClient, functions
# Configuration
proxy = (socks.SOCKS5, 'localhost', 7890)
api_id = 7000000
api_hash = '506828842c80ea408b94d1**********'
bot = TelegramClient('Bot', api_id, api_hash, proxy=proxy).start(bot_token='16679*****:***hCZ-*******RNRx9BYd-Hb**********')
async def main():
# Find user by username
user = await bot(functions.contacts.ResolveUsernameRequest(username='hykilpikonna'))
print(user)
print()
user = user.users[0]
name = user.first_name + (' ' + user.last_name if user.last_name else '')
print(name)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

View File

@ -1 +1,2 @@
python-telegram-bot python-telegram-bot
requests