mirror of
https://github.com/Rongronggg9/SlashBot.git
synced 2025-02-06 17:23:28 +08:00
rewrite
This commit is contained in:
parent
0516899540
commit
393ba738d3
2
.github/workflows/publish-docker-image.yml
vendored
2
.github/workflows/publish-docker-image.yml
vendored
@ -21,5 +21,5 @@ jobs:
|
|||||||
uses: docker/build-push-action@v2
|
uses: docker/build-push-action@v2
|
||||||
with:
|
with:
|
||||||
push: true
|
push: true
|
||||||
platforms: linux/amd64,linux/arm64
|
platforms: linux/amd64,linux/386,linux/arm64
|
||||||
tags: ${{ secrets.DOCKER_USERNAME }}/slashbot:latest
|
tags: ${{ secrets.DOCKER_USERNAME }}/slashbot:latest
|
||||||
|
@ -4,6 +4,6 @@ WORKDIR /app
|
|||||||
|
|
||||||
COPY . /app
|
COPY . /app
|
||||||
|
|
||||||
RUN pip install --trusted-host pypi.python.org -r /app/requirements.txt
|
RUN pip install --no-cache-dir --trusted-host pypi.python.org -r /app/requirements.txt
|
||||||
|
|
||||||
CMD ["python", "-u", "SlashBot.py"]
|
CMD ["python", "-u", "SlashBot.py"]
|
13
README.md
13
README.md
@ -1,17 +1,18 @@
|
|||||||
# SlashBot
|
# SlashBot
|
||||||
|
|
||||||
[![Docker Cloud Automated build](https://img.shields.io/docker/cloud/automated/rongronggg9/slashbot)](https://hub.docker.com/r/rongronggg9/slashbot)
|
[![Docker Cloud Automated build](https://img.shields.io/docker/cloud/automated/rongronggg9/slashbot)](https://hub.docker.com/r/rongronggg9/slashbot)
|
||||||
[![Docker Cloud Build Status](https://img.shields.io/docker/cloud/build/rongronggg9/slashbot)](https://hub.docker.com/r/rongronggg9/slashbot)
|
[![Docker Cloud Build Status](https://img.shields.io/docker/cloud/build/rongronggg9/slashbot)](https://hub.docker.com/r/rongronggg9/slashbot)
|
||||||
[![Docker Pulls](https://img.shields.io/docker/pulls/rongronggg9/slashbot)](https://hub.docker.com/r/rongronggg9/slashbot)
|
[![Docker Pulls](https://img.shields.io/docker/pulls/rongronggg9/slashbot)](https://hub.docker.com/r/rongronggg9/slashbot)
|
||||||
[![GitHub stars](https://img.shields.io/github/stars/Rongronggg9/SlashBot?style=social)](https://github.com/Rongronggg9/SlashBot)
|
[![GitHub stars](https://img.shields.io/github/stars/Rongronggg9/SlashBot?style=social)](https://github.com/Rongronggg9/SlashBot)
|
||||||
|
|
||||||
**[@RongSlashBot](https://t.me/RongSlashBot)**
|
**[@RongSlashBot](https://t.me/RongSlashBot)**
|
||||||
**Privacy Mode on,接收不到非指令消息,保证隐私安全性。**
|
**Privacy Mode on,接收不到非指令消息,保证隐私安全性。**
|
||||||
|
|
||||||
由于 Telegram 限制,如您的群组已有其他 bot ,请使用 Privacy Mode off 的 [@RongSlashRBot](https://t.me/RongSlashRBot) 。
|
由于 Telegram 限制,如您的群组已有其他 bot ,请使用 Privacy Mode off 的 [@RongSlashRBot](https://t.me/RongSlashRBot) 。
|
||||||
请注意,Privacy Mode off 意味着 bot 在技术上可以收到所有消息,本 bot 已经设计为不处理及不记录任何无关消息,如您疑虑安全性,
|
请注意,Privacy Mode off 意味着 bot 在技术上可以收到所有消息,本 bot 已经设计为不处理及不记录任何无关消息,如您疑虑安全性,请自行搭建或到末尾查看更多解决方案。
|
||||||
请自行搭建或到末尾查看更多解决方案。
|
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
docker create \
|
docker create \
|
||||||
--name [container name] \
|
--name [container name] \
|
||||||
@ -19,6 +20,7 @@ docker create \
|
|||||||
-e TOKEN=[bot token] \
|
-e TOKEN=[bot token] \
|
||||||
rongronggg9/slashbot
|
rongronggg9/slashbot
|
||||||
```
|
```
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
docker start [container name]
|
docker start [container name]
|
||||||
```
|
```
|
||||||
@ -26,12 +28,14 @@ docker start [container name]
|
|||||||
![](resources/example.jpg)
|
![](resources/example.jpg)
|
||||||
|
|
||||||
## 薛定谔的 Telegram Bot API
|
## 薛定谔的 Telegram Bot API
|
||||||
|
|
||||||
若您的群组存在大于一个 bot (含本 bot),本 bot 可能时常无法正常工作。
|
若您的群组存在大于一个 bot (含本 bot),本 bot 可能时常无法正常工作。
|
||||||
|
|
||||||
症状:
|
症状:
|
||||||
[@RongSlashBot](https://t.me/RongSlashBot) 突然接收不到任何指令消息,因而也无法回复。
|
[@RongSlashBot](https://t.me/RongSlashBot) 突然接收不到任何指令消息,因而也无法回复。
|
||||||
|
|
||||||
触发条件 (<u>需全部满足</u>):
|
触发条件 (<u>需全部满足</u>):
|
||||||
|
|
||||||
1. 群组内存在大于一个 bot
|
1. 群组内存在大于一个 bot
|
||||||
2. 该 bot 未被设置为管理员
|
2. 该 bot 未被设置为管理员
|
||||||
3. 该 bot Privacy Mode on
|
3. 该 bot Privacy Mode on
|
||||||
@ -39,7 +43,8 @@ docker start [container name]
|
|||||||
|
|
||||||
总体上,管理员或 Privacy Mode off 的 bot 几乎一定会收到消息,其余 bot 可能收到也可能收不到消息。
|
总体上,管理员或 Privacy Mode off 的 bot 几乎一定会收到消息,其余 bot 可能收到也可能收不到消息。
|
||||||
|
|
||||||
**规避方法 (<u>满足任一即可</u>):**
|
**规避方法 (<u>满足任一即可</u>):**
|
||||||
|
|
||||||
1. **仅保留本 bot**
|
1. **仅保留本 bot**
|
||||||
2. **将本 bot 添加为管理员(给予任一权限均可。需注意<u>该操作意味着对于本群,该 bot Privacy Mode off</u>)**
|
2. **将本 bot 添加为管理员(给予任一权限均可。需注意<u>该操作意味着对于本群,该 bot Privacy Mode off</u>)**
|
||||||
3. **换用 Privacy Mode off 的 [@RongSlashRBot](https://t.me/RongSlashRBot) (如您疑虑安全性,请自行搭建)**
|
3. **换用 Privacy Mode off 的 [@RongSlashRBot](https://t.me/RongSlashRBot) (如您疑虑安全性,请自行搭建)**
|
||||||
|
156
SlashBot.py
156
SlashBot.py
@ -1,106 +1,87 @@
|
|||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
from typing import List, Dict, Union, Tuple
|
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
|
import telegram
|
||||||
|
from typing import Tuple, Optional, Callable
|
||||||
from telegram.ext import Updater, MessageHandler, filters
|
from telegram.ext import Updater, MessageHandler, filters
|
||||||
|
|
||||||
TELEGRAM = 777000
|
|
||||||
GROUP = 1087968824
|
|
||||||
Filters = filters.Filters
|
Filters = filters.Filters
|
||||||
parser = re.compile(r'^([\\/]_?)((?:[^ \\]|\\.)+)[ ]*(.*)$')
|
parser = re.compile(r'^([\\/]_?)((?:[^ \t\\]|\\.)+)[ \t]*(.*)$')
|
||||||
escaping = ('\\ ', '\\ ')
|
ESCAPING = ('\\ ', '\\ ', '\\\t')
|
||||||
markdownEscape = lambda s: s.replace("_", "\\_").replace("*", "\\*").replace("[", "\\[").replace("`", "\\`")
|
htmlEscape = lambda s: s.replace("<", "<").replace(">", ">").replace("&", "&")
|
||||||
|
mentionParser = re.compile(r'@([a-zA-Z]\w{4,})$')
|
||||||
|
delUsername: Optional[Callable] = None # placeholder
|
||||||
|
|
||||||
# Docker env
|
# Docker env
|
||||||
Token = os.environ.get('TOKEN')
|
Token = os.environ.get('TOKEN')
|
||||||
if not Token:
|
if not Token:
|
||||||
raise Exception('no token')
|
raise Exception('no token')
|
||||||
|
|
||||||
if os.environ.get('PROXY'):
|
telegram_proxy = os.environ.get('PROXY', '')
|
||||||
telegram_proxy = os.environ['PROXY']
|
requests_proxies = {'all': telegram_proxy} if telegram_proxy else None
|
||||||
requests_proxies = {'all': os.environ['PROXY']}
|
|
||||||
else:
|
|
||||||
telegram_proxy = ''
|
|
||||||
requests_proxies = None
|
|
||||||
|
|
||||||
|
|
||||||
# Find someone's full name by their username
|
class User:
|
||||||
def find_name_by_username(username: str) -> str:
|
def __init__(self, uid: Optional[int] = None, username: Optional[str] = None, name: Optional[str] = None):
|
||||||
r = requests.get(f'https://t.me/{username}', proxies=requests_proxies)
|
if not (uid and name) and not username:
|
||||||
return re.search('(?<=<meta property="og:title" content=").*(?=")', r.text, re.IGNORECASE).group(0)
|
raise ValueError('invalid user')
|
||||||
|
self.name = name
|
||||||
|
self.uid = uid
|
||||||
|
self.username = username
|
||||||
|
if not self.name and self.username:
|
||||||
|
self.__get_user_by_username()
|
||||||
|
|
||||||
|
def __get_user_by_username(self):
|
||||||
|
r = requests.get(f'https://t.me/{self.username}', proxies=requests_proxies)
|
||||||
|
self.name = re.search(r'(?<=<meta property="og:title" content=").*(?=")', r.text, re.IGNORECASE).group(0)
|
||||||
|
page_title = re.search(r'(?<=<title>).*(?=</title>)', r.text, re.IGNORECASE).group(0)
|
||||||
|
if page_title == self.name: # user does not exist
|
||||||
|
self.name = None
|
||||||
|
|
||||||
|
def mention(self, mention_self: bool = False) -> str:
|
||||||
|
if not self.name:
|
||||||
|
return f'@{self.username}'
|
||||||
|
|
||||||
|
mention_deep_link = (f'tg://resolve?domain={self.username}'
|
||||||
|
if (self.username and (not self.uid or self.uid < 0))
|
||||||
|
else f'tg://user?id={self.uid}')
|
||||||
|
return f'<a href ="{mention_deep_link}">{self.name if not mention_self else "自己"}</a>'
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
return (
|
||||||
|
type(self) == type(other)
|
||||||
|
and (
|
||||||
|
((self.uid or other.uid) and self.uid == other.uid) or
|
||||||
|
((self.username or other.username) and self.username == other.username)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_user(msg):
|
def get_user(msg: telegram.Message) -> User:
|
||||||
if msg['from']['id'] == TELEGRAM:
|
user = msg.sender_chat or msg.from_user
|
||||||
return {'first_name': msg['sender_chat']['title'], 'id': msg['sender_chat']['id'],
|
return User(name=user.full_name or user.title, uid=user.id, username=user.username)
|
||||||
'username': msg['sender_chat'].get('username')}
|
|
||||||
elif msg['from']['id'] == GROUP:
|
|
||||||
return {'first_name': msg['chat']['title'], 'id': msg['chat']['id'], 'username': msg['chat'].get('username')}
|
|
||||||
else:
|
|
||||||
return msg['from']
|
|
||||||
|
|
||||||
|
|
||||||
def get_users(msg: Dict) -> Tuple[Dict, Dict, bool, bool]:
|
def get_users(msg: telegram.Message) -> Tuple[User, User]:
|
||||||
msg_from = msg
|
msg_from = msg
|
||||||
if 'reply_to_message' in msg.keys():
|
msg_rpl = msg.reply_to_message or msg_from
|
||||||
msg_rpl = msg['reply_to_message']
|
|
||||||
else:
|
|
||||||
msg_rpl = msg_from.copy()
|
|
||||||
from_user, rpl_user = get_user(msg_from), get_user(msg_rpl)
|
from_user, rpl_user = get_user(msg_from), get_user(msg_rpl)
|
||||||
reply_self = rpl_user == from_user
|
return from_user, rpl_user
|
||||||
mentioned = False
|
|
||||||
|
|
||||||
# Not replying to anything
|
|
||||||
if reply_self:
|
|
||||||
|
|
||||||
# Detect if the message contains a mention. If it has, use the mentioned user.
|
|
||||||
entities: List[Dict[str, Union[str, int]]] = msg['entities']
|
|
||||||
mentions = [e for e in entities if e['type'] == 'mention']
|
|
||||||
if mentions:
|
|
||||||
mentioned = True
|
|
||||||
|
|
||||||
# Find username
|
|
||||||
offset = mentions[0]['offset']
|
|
||||||
length = mentions[0]['length']
|
|
||||||
text = msg['text']
|
|
||||||
username = text[offset: offset + length].replace("@", "")
|
|
||||||
rpl_user = {'first_name': find_name_by_username(username), 'username': username}
|
|
||||||
|
|
||||||
# Remove mention from message text
|
|
||||||
msg['text'] = text[:offset] + text[offset + length:]
|
|
||||||
|
|
||||||
else:
|
|
||||||
rpl_user = {'first_name': '自己', 'id': rpl_user['id']}
|
|
||||||
|
|
||||||
return from_user, rpl_user, reply_self, mentioned
|
|
||||||
|
|
||||||
|
|
||||||
# Create mention string from user
|
def parse_command(match: re.Match):
|
||||||
def mention(user: Dict[str, str]) -> str:
|
parsed = match.groups()
|
||||||
# Combine name
|
|
||||||
last = user.get('last_name', '')
|
|
||||||
first = user['first_name']
|
|
||||||
name = first + (f' {last}' if last else '')
|
|
||||||
|
|
||||||
# Create user reference link
|
|
||||||
username = user.get('username', '')
|
|
||||||
uid = user.get('id', -1)
|
|
||||||
link = f'tg://resolve?domain={username}' if (username and uid < 0) else f'tg://user?id={uid}'
|
|
||||||
|
|
||||||
return f"[{name}]({link})"
|
|
||||||
|
|
||||||
|
|
||||||
def parse_command(command):
|
|
||||||
parsed = list(parser.search(command).groups())
|
|
||||||
predicate = parsed[1]
|
predicate = parsed[1]
|
||||||
for escape in escaping:
|
for escape in ESCAPING:
|
||||||
|
predicate = delUsername('', predicate)
|
||||||
predicate = predicate.replace(escape, escape[1:])
|
predicate = predicate.replace(escape, escape[1:])
|
||||||
result = {'predicate': markdownEscape(predicate), 'complement': markdownEscape(parsed[2]), 'swap': parsed[0] != '/'}
|
result = {'predicate': htmlEscape(predicate), 'complement': htmlEscape(parsed[2]), 'swap': parsed[0] != '/'}
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
def get_text(mention_from, mention_rpl, command):
|
def get_text(user_from: User, user_rpl: User, command: dict):
|
||||||
|
mention_from = user_from.mention()
|
||||||
|
mention_rpl = user_rpl.mention(mention_self=user_from == user_rpl)
|
||||||
if command['predicate'] == 'me':
|
if command['predicate'] == 'me':
|
||||||
return f"{mention_from}{bool(command['complement']) * ' '}{command['complement']}!"
|
return f"{mention_from}{bool(command['complement']) * ' '}{command['complement']}!"
|
||||||
elif command['predicate'] == 'you':
|
elif command['predicate'] == 'you':
|
||||||
@ -111,24 +92,31 @@ def get_text(mention_from, mention_rpl, command):
|
|||||||
return f"{mention_from} {command['predicate']} 了 {mention_rpl}!"
|
return f"{mention_from} {command['predicate']} 了 {mention_rpl}!"
|
||||||
|
|
||||||
|
|
||||||
def reply(update, context):
|
def reply(update: telegram.Update, context: telegram.ext.CallbackContext):
|
||||||
print(update.to_dict())
|
print(update.to_dict())
|
||||||
msg = update.to_dict()['message']
|
msg = update.effective_message
|
||||||
from_user, rpl_user, reply_self, mentioned = get_users(msg)
|
from_user, rpl_user = get_users(msg)
|
||||||
|
command = parse_command(context.match)
|
||||||
|
|
||||||
command = parse_command(del_username.sub('', msg['text']))
|
if from_user == rpl_user:
|
||||||
if command['swap'] and (not reply_self or mentioned):
|
mention_match = mentionParser.search(command['predicate'])
|
||||||
|
if mention_match:
|
||||||
|
mention = mentionParser.search(msg.text).group(1)
|
||||||
|
rpl_user = User(username=mention)
|
||||||
|
command['predicate'] = command['predicate'][:mention_match.start()]
|
||||||
|
|
||||||
|
if command['swap'] and (not from_user == rpl_user):
|
||||||
(from_user, rpl_user) = (rpl_user, from_user)
|
(from_user, rpl_user) = (rpl_user, from_user)
|
||||||
|
|
||||||
text = get_text(mention(from_user), mention(rpl_user), command)
|
text = get_text(from_user, rpl_user, command)
|
||||||
print(text, end='\n\n')
|
print(text, end='\n\n')
|
||||||
|
|
||||||
update.effective_message.reply_text(text, parse_mode='Markdown')
|
update.effective_message.reply_text(text, parse_mode='HTML')
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
updater = Updater(token=Token, use_context=True, request_kwargs={'proxy_url': telegram_proxy})
|
updater = Updater(token=Token, use_context=True, request_kwargs={'proxy_url': telegram_proxy})
|
||||||
del_username = re.compile('@' + updater.bot.username, re.I)
|
delUsername = re.compile('@' + updater.bot.username, re.I).sub
|
||||||
dp = updater.dispatcher
|
dp = updater.dispatcher
|
||||||
dp.add_handler(MessageHandler(Filters.regex(parser), reply))
|
dp.add_handler(MessageHandler(Filters.regex(parser), reply))
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user