mirror of
https://github.com/Rongronggg9/SlashBot.git
synced 2025-02-06 17:23:28 +08:00
feat: multi-bot support; better logging
This commit is contained in:
parent
05d89f78ee
commit
f50954c30f
92
SlashBot.py
92
SlashBot.py
@ -1,10 +1,16 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
import sys
|
||||||
import re
|
import re
|
||||||
import requests
|
import requests
|
||||||
import telegram
|
import telegram
|
||||||
from typing import Tuple, Optional, Callable, Union, Dict
|
from loguru import logger as _logger
|
||||||
from telegram.ext import Updater, MessageHandler, filters
|
from typing import Optional, Union, Any
|
||||||
|
from telegram.ext import Updater, MessageHandler, filters, Dispatcher
|
||||||
from functools import partial
|
from functools import partial
|
||||||
|
from threading import Thread
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
Filters = filters.Filters
|
Filters = filters.Filters
|
||||||
parser = re.compile(r'^(?P<slash>[\\/]_?)'
|
parser = re.compile(r'^(?P<slash>[\\/]_?)'
|
||||||
@ -13,18 +19,27 @@ parser = re.compile(r'^(?P<slash>[\\/]_?)'
|
|||||||
convertEscapes = partial(re.compile(r'\\(\s)').sub, r'\1')
|
convertEscapes = partial(re.compile(r'\\(\s)').sub, r'\1')
|
||||||
htmlEscape = lambda s: s.replace("<", "<").replace(">", ">").replace("&", "&")
|
htmlEscape = lambda s: s.replace("<", "<").replace(">", ">").replace("&", "&")
|
||||||
mentionParser = re.compile(r'@([a-zA-Z]\w{4,})')
|
mentionParser = re.compile(r'@([a-zA-Z]\w{4,})')
|
||||||
delUsername: Optional[Callable] = None # placeholder
|
|
||||||
|
|
||||||
PUNCTUATION_TAIL = '.,?!;:~(' \
|
PUNCTUATION_TAIL = '.,?!;:~(' \
|
||||||
'。,?!;:~('
|
'。,?!;:~('
|
||||||
|
|
||||||
# Docker env
|
# Docker env
|
||||||
Token = os.environ.get('TOKEN')
|
TOKENS = re.compile(r'[^a-zA-Z\-_\d:]+').split(os.environ.get('TOKEN', ''))
|
||||||
if not Token:
|
if not TOKENS:
|
||||||
raise Exception('no token')
|
raise ValueError('no any valid token found')
|
||||||
|
|
||||||
telegram_proxy = os.environ.get('PROXY', '')
|
TELEGRAM_PROXY = os.environ.get('PROXY', '')
|
||||||
requests_proxies = {'all': telegram_proxy} if telegram_proxy else None
|
REQUEST_PROXIES = {'all': TELEGRAM_PROXY} if TELEGRAM_PROXY else None
|
||||||
|
|
||||||
|
_logger.remove()
|
||||||
|
_logger.add(sys.stderr,
|
||||||
|
format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green>"
|
||||||
|
"|<level>{level:^8}</level>"
|
||||||
|
"|<cyan>{extra[username]:^15}</cyan>"
|
||||||
|
"|<level>{message}</level>",
|
||||||
|
level="DEBUG")
|
||||||
|
|
||||||
|
_updaters: list[Updater] = []
|
||||||
|
|
||||||
|
|
||||||
class User:
|
class User:
|
||||||
@ -38,7 +53,7 @@ class User:
|
|||||||
self.__get_user_by_username()
|
self.__get_user_by_username()
|
||||||
|
|
||||||
def __get_user_by_username(self):
|
def __get_user_by_username(self):
|
||||||
r = requests.get(f'https://t.me/{self.username}', proxies=requests_proxies)
|
r = requests.get(f'https://t.me/{self.username}', proxies=REQUEST_PROXIES)
|
||||||
self.name = re.search(r'(?<=<meta property="og:title" content=").*(?=")', r.text, re.IGNORECASE).group(0)
|
self.name = re.search(r'(?<=<meta property="og:title" content=").*(?=")', r.text, re.IGNORECASE).group(0)
|
||||||
page_title = re.search(r'(?<=<title>).*(?=</title>)', r.text, re.IGNORECASE).group(0)
|
page_title = re.search(r'(?<=<title>).*(?=</title>)', r.text, re.IGNORECASE).group(0)
|
||||||
if page_title == self.name: # user does not exist
|
if page_title == self.name: # user does not exist
|
||||||
@ -69,20 +84,21 @@ def get_user(msg: telegram.Message) -> User:
|
|||||||
return User(name=user.full_name or user.title, uid=user.id, username=user.username)
|
return User(name=user.full_name or user.title, uid=user.id, username=user.username)
|
||||||
|
|
||||||
|
|
||||||
def get_users(msg: telegram.Message) -> Tuple[User, User]:
|
def get_users(msg: telegram.Message) -> tuple[User, User]:
|
||||||
msg_from = msg
|
msg_from = msg
|
||||||
msg_rpl = msg.reply_to_message or msg_from
|
msg_rpl = msg.reply_to_message or msg_from
|
||||||
from_user, rpl_user = get_user(msg_from), get_user(msg_rpl)
|
from_user, rpl_user = get_user(msg_from), get_user(msg_rpl)
|
||||||
return from_user, rpl_user
|
return from_user, rpl_user
|
||||||
|
|
||||||
|
|
||||||
def parse_command(match: re.Match) -> Dict[str, Union[str, bool]]:
|
def parse_command(ctx: telegram.ext.CallbackContext) -> dict[str, Union[str, bool]]:
|
||||||
|
match = ctx.match
|
||||||
parsed = match.groupdict()
|
parsed = match.groupdict()
|
||||||
predicate = parsed['predicate']
|
predicate = parsed['predicate']
|
||||||
omit_le = predicate.endswith('\\')
|
omit_le = predicate.endswith('\\')
|
||||||
predicate = predicate[:-1] if omit_le else predicate
|
predicate = predicate[:-1] if omit_le else predicate
|
||||||
predicate = convertEscapes(predicate)
|
predicate = convertEscapes(predicate)
|
||||||
predicate = delUsername(predicate)
|
predicate = ctx.bot_data['delUsername'](predicate)
|
||||||
result = {'predicate': htmlEscape(predicate),
|
result = {'predicate': htmlEscape(predicate),
|
||||||
'complement': htmlEscape(parsed['complement'] or ''),
|
'complement': htmlEscape(parsed['complement'] or ''),
|
||||||
'slash': parsed['slash'],
|
'slash': parsed['slash'],
|
||||||
@ -124,11 +140,12 @@ def get_text(user_from: User, user_rpl: User, command: dict):
|
|||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
|
||||||
def reply(update: telegram.Update, context: telegram.ext.CallbackContext):
|
def reply(update: telegram.Update, ctx: telegram.ext.CallbackContext):
|
||||||
print(update.to_dict())
|
logger = ctx.bot_data['logger']
|
||||||
|
logger.debug(str(update.to_dict()))
|
||||||
msg = update.effective_message
|
msg = update.effective_message
|
||||||
from_user, rpl_user = get_users(msg)
|
from_user, rpl_user = get_users(msg)
|
||||||
command = parse_command(context.match)
|
command = parse_command(ctx)
|
||||||
|
|
||||||
if from_user == rpl_user:
|
if from_user == rpl_user:
|
||||||
mention_match = mentionParser.search(command['predicate'])
|
mention_match = mentionParser.search(command['predicate'])
|
||||||
@ -149,16 +166,49 @@ def reply(update: telegram.Update, context: telegram.ext.CallbackContext):
|
|||||||
(from_user, rpl_user) = (rpl_user, from_user)
|
(from_user, rpl_user) = (rpl_user, from_user)
|
||||||
|
|
||||||
text = get_text(from_user, rpl_user, command)
|
text = get_text(from_user, rpl_user, command)
|
||||||
print(text, end='\n\n')
|
logger.info(text)
|
||||||
|
|
||||||
update.effective_message.reply_text('\u200e' + text, parse_mode='HTML')
|
update.effective_message.reply_text('\u200e' + text, parse_mode='HTML')
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
def start(token: str):
|
||||||
updater = Updater(token=Token, use_context=True, request_kwargs={'proxy_url': telegram_proxy})
|
updater = Updater(token=token, use_context=True, request_kwargs={'proxy_url': TELEGRAM_PROXY})
|
||||||
delUsername = partial(re.compile(r'@' + updater.bot.username, re.I).sub, '')
|
dp: Dispatcher = updater.dispatcher
|
||||||
dp = updater.dispatcher
|
|
||||||
dp.add_handler(MessageHandler(Filters.regex(parser), reply, run_async=True))
|
dp.add_handler(MessageHandler(Filters.regex(parser), reply, run_async=True))
|
||||||
|
username = f'@{updater.bot.username}'
|
||||||
|
logger = _logger.bind(username=username)
|
||||||
|
dp.bot_data['delUsername'] = partial(re.compile(username, re.I).sub, '')
|
||||||
|
dp.bot_data['logger'] = logger
|
||||||
|
|
||||||
updater.start_polling()
|
updater.start_polling()
|
||||||
updater.idle()
|
logger.info('Started')
|
||||||
|
|
||||||
|
_updaters.append(updater)
|
||||||
|
# updater.idle()
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
threads: list[Thread] = []
|
||||||
|
for token in TOKENS:
|
||||||
|
thread = Thread(target=start, args=(token,), daemon=True)
|
||||||
|
threads.append(thread)
|
||||||
|
thread.start()
|
||||||
|
for thread in threads:
|
||||||
|
thread.join()
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
sleep(1)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
threads_and_logger: list[tuple[Thread, Any]] = []
|
||||||
|
for updater in _updaters:
|
||||||
|
thread = Thread(target=updater.stop, daemon=True)
|
||||||
|
threads_and_logger.append((thread, updater.dispatcher.bot_data['logger']))
|
||||||
|
thread.start()
|
||||||
|
for thread, logger in threads_and_logger:
|
||||||
|
thread.join()
|
||||||
|
logger.info('Stopped')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
|
@ -1,2 +1,3 @@
|
|||||||
python-telegram-bot==13.11
|
python-telegram-bot==13.11
|
||||||
requests==2.27.1
|
requests==2.27.1
|
||||||
|
loguru==0.6.0
|
Loading…
Reference in New Issue
Block a user