feat: multiple slashes -> random sticker

This commit is contained in:
Rongrong 2022-12-17 01:43:05 +08:00
parent 3b49ce35bd
commit 0624b6b847
No known key found for this signature in database
GPG Key ID: 1C2D45D45AB7FE94

View File

@ -6,21 +6,25 @@ import re
import requests import requests
import telegram import telegram
from loguru import logger as _logger from loguru import logger as _logger
from typing import Optional, Union, Any from typing import Optional, Union, Any, Callable
from telegram.ext import Updater, MessageHandler, filters, Dispatcher from telegram.ext import Updater, MessageHandler, filters, Dispatcher
from functools import partial from functools import partial
from threading import Thread from threading import Thread
from time import sleep from time import sleep
from itertools import product as _product from itertools import product as _product
from itertools import starmap
from random import Random, SystemRandom from random import Random, SystemRandom
from collections import deque, Counter from collections import deque, Counter
Filters = filters.Filters Filters = filters.Filters
parser = re.compile(r'^(?P<slash>[\\/]_?\$?)' parser = re.compile(
r'^(?P<slash>[\\/]_?\$?)'
r'(?P<predicate>([^\s\\]|\\.)*((?<=\S)\\)?)' r'(?P<predicate>([^\s\\]|\\.)*((?<=\S)\\)?)'
r'(\s+(?P<complement>.+))?$') r'(\s+(?P<complement>.+))?$'
ouenParser = re.compile(r'^(' )
ouenParser = re.compile(
r'^('
r'\\ .* /' r'\\ .* /'
r'|' r'|'
r' .* ' r' .* '
@ -30,8 +34,16 @@ ouenParser = re.compile(r'^('
r'(.*\s*){2,}}' r'(.*\s*){2,}}'
r'|' r'|'
r'[/\\]{2,}' r'[/\\]{2,}'
r')$') r')$'
pinParser = re.compile(r'^[\\/]_?pin$') )
pinParser = re.compile(
r'^[\\/]_?pin$'
)
randomStickerParser = re.compile(
r'^([\\/])'
r'(_?)'
r'(\1\2?)+$'
)
convertEscapes = partial(re.compile(r'\\(\s)').sub, r'\1') convertEscapes = partial(re.compile(r'\\(\s)').sub, r'\1')
htmlEscape = lambda s: s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;") htmlEscape = lambda s: s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
@ -43,8 +55,23 @@ PUNCTUATION_TAIL = '.,?!;:~(' \
'。,?!;:~(' '。,?!;:~('
class Vegetable: class RandomizerMeta(type):
_counter = Counter() def __init__(cls, name, bases, namespace):
super().__init__(name, bases, namespace)
cls._counter = Counter()
class Randomizer(metaclass=RandomizerMeta):
def __class_getitem__(cls, item: str) -> Any:
collection = getattr(cls, item)
if cls._counter[item] % len(collection) == 0:
random.shuffle(collection)
cls._counter[item] += 1
collection.rotate()
return collection[-1]
class Vegetable(Randomizer):
permission_denied = deque( permission_denied = deque(
product( product(
{'我太菜了', '我好菜', '我好菜啊', '我菜死了'}, {'我太菜了', '我好菜', '我好菜啊', '我菜死了'},
@ -68,13 +95,33 @@ class Vegetable:
) )
) )
def __class_getitem__(cls, item: str):
collection = getattr(cls, item) class Stickers(Randomizer):
if cls._counter[item] % len(collection) == 0: _cnstr = partial(telegram.Sticker,
random.shuffle(collection) width=512,
cls._counter[item] += 1 height=512,
collection.rotate() is_animated=False,
return collection[-1] is_video=False,
type=telegram.Sticker.REGULAR)
stickers = deque(
starmap(
_cnstr,
(
('CAACAgUAAxkBAAInuWOcpUaH6eiL7vc9bIw6GedK-DNzAALNAQAC3x9yGXsiKdzwYgnWLAQ', 'AgADzQEAAt8fchk'),
('CAACAgUAAxkBAAInu2OcpakDQaHnVWVo_80AAVgE7EcujAAC2gEAAt8fchkAAbkxR5lrlfUsBA', 'AgAD2gEAAt8fchk'),
('CAACAgUAAxkBAAInvWOcpcUcJDWODiIxUoSTs840QJaFAALbAQAC3x9yGcc6smv9nZmELAQ', 'AgAD2wEAAt8fchk'),
('CAACAgUAAxkBAAKNaWOcn1dZP5Ooe5wX8JrCKkK2qXGzAALnAQAC3x9yGfJozMIlJl_kLAQ', 'AgAD5wEAAt8fchk'),
('CAACAgUAAxkBAAInt2Ocor-N-gnaJTGR-RtyopIgI0l5AALrAQAC3x9yGWlN_RGM1AESLAQ', 'AgAD6wEAAt8fchk'),
('CAACAgUAAxkBAAInv2Ocph3PT5XiCArmoehOYzCn1sJRAALyAQAC3x9yGSLk1aHaA09kLAQ', 'AgAD8gEAAt8fchk'),
('CAACAgUAAxkBAAInwWOcplEELbRuIBkuy3Yyv6YCScxdAAISAgAC3x9yGaW3ftfZrg8eLAQ', 'AgADEgIAAt8fchk'),
('CAACAgUAAxkBAAInw2Ocpm8VgaPVTUhDPTGHWjnDjzsDAAIbAgAC3x9yGWCDLF4OLhaMLAQ', 'AgADGwIAAt8fchk'),
)
)
)
def __new__(cls) -> telegram.Sticker:
return cls.__class_getitem__('stickers')
_logger.remove() _logger.remove()
@ -85,6 +132,16 @@ _logger.add(sys.stderr,
"|<level>{message}</level>", "|<level>{message}</level>",
level="DEBUG") level="DEBUG")
def log(func: Callable):
def wrapper(update: telegram.Update, ctx: telegram.ext.CallbackContext):
logger = ctx.bot_data['logger']
logger.debug(str(update.to_dict()))
return func(update, ctx, logger)
return wrapper
try: try:
random = SystemRandom() random = SystemRandom()
except NotImplementedError: except NotImplementedError:
@ -238,10 +295,8 @@ def reply(update: telegram.Update, ctx: telegram.ext.CallbackContext):
update.effective_message.reply_text('\u200e' + text, parse_mode='HTML') update.effective_message.reply_text('\u200e' + text, parse_mode='HTML')
def repeat(update: telegram.Update, ctx: telegram.ext.CallbackContext): @log
logger = ctx.bot_data['logger'] def repeat(update: telegram.Update, ctx: telegram.ext.CallbackContext, logger: _logger):
logger.debug(str(update.to_dict()))
chat = update.effective_chat chat = update.effective_chat
msg = update.effective_message msg = update.effective_message
if chat.id < 0: if chat.id < 0:
@ -251,10 +306,8 @@ def repeat(update: telegram.Update, ctx: telegram.ext.CallbackContext):
msg.copy(chat.id) if chat.has_protected_content else msg.forward(chat.id) msg.copy(chat.id) if chat.has_protected_content else msg.forward(chat.id)
def pin(update: telegram.Update, ctx: telegram.ext.CallbackContext): @log
logger = ctx.bot_data['logger'] def pin(update: telegram.Update, _ctx: telegram.ext.CallbackContext, logger: _logger):
logger.debug(str(update.to_dict()))
msg = update.effective_message msg = update.effective_message
msg_to_pin = msg.reply_to_message msg_to_pin = msg.reply_to_message
if not msg_to_pin: if not msg_to_pin:
@ -273,9 +326,19 @@ def pin(update: telegram.Update, ctx: telegram.ext.CallbackContext):
logger.warning(vegetable) logger.warning(vegetable)
@log
def random_sticker(update: telegram.Update, _ctx: telegram.ext.CallbackContext, logger: _logger):
msg = update.effective_message
sticker = Stickers()
logger.info(sticker)
msg.reply_sticker(sticker)
def start(token: str): def start(token: str):
updater = Updater(token=token, use_context=True, request_kwargs={'proxy_url': TELEGRAM_PROXY}) updater = Updater(token=token, use_context=True, request_kwargs={'proxy_url': TELEGRAM_PROXY})
dp: Dispatcher = updater.dispatcher dp: Dispatcher = updater.dispatcher
dp.add_handler(MessageHandler(Filters.regex(randomStickerParser) & ~Filters.update.edited_message, random_sticker,
run_async=True))
dp.add_handler(MessageHandler(Filters.regex(pinParser) & ~Filters.update.edited_message, pin, run_async=True)) dp.add_handler(MessageHandler(Filters.regex(pinParser) & ~Filters.update.edited_message, pin, run_async=True))
dp.add_handler(MessageHandler(Filters.regex(ouenParser) & ~Filters.update.edited_message, repeat, run_async=True)) dp.add_handler(MessageHandler(Filters.regex(ouenParser) & ~Filters.update.edited_message, repeat, run_async=True))
dp.add_handler(MessageHandler(Filters.regex(parser) & ~Filters.update.edited_message, reply, run_async=True)) dp.add_handler(MessageHandler(Filters.regex(parser) & ~Filters.update.edited_message, reply, run_async=True))