[TOC]
转载请注明出处:小锋学长生活大爆炸(https://xfxuezhang.blog.csdn.net/)
前言
QQ、微信是我们平常使用最多的通讯工具,网上也有很多通过软件去控制QQ/微信的开源工具,通过这些工具,我们可以实现许多有意思的效果,而不仅仅局限于消息聊天。
自从微信网页版被官方禁用后,微信的软件工具几乎已经失效了,现有的一些是通过hook微信本身来实现,这种很容易被官方检测并封号。另一些是通过注册企业号来控制,但不直观且功能受限。
这里我们借助相对更开放的QQ来制作我们的机器人,对比几款工具后,最终选择了mirai。
先放一张整体结构图:
功能清单
网上现有开源的机器人大多只是实现了类似“自动推送天气、接入图灵机器人自动聊天”等等,大多属于自娱自乐,没有发挥最大用途。
因此,我们的QQ机器人(暂且取名为“小锋仔”)是根据日常所需而制,包含常用功能且设计得易于扩展。
目前包含的功能有:
- 类似QMsg酱的消息通知
- QQ群消息转发
- 翻译查询
- 照片上传
- 实况天气
- 实时热搜
- 控制树莓派舵机
- 控制树莓派屏显
- ... ...
将来可能包含的功能有:
- 接入控制ESP32(实现智能家居控制)
接下来详细介绍如何自己搭建一个这样的QQ机器人。篇幅较长且保姆级详细,建议收藏后慢慢看。
免费领取轻量应用服务器
首先为了能运行mirai,且随时随地能连接,我们需要有一个具备公网IP的服务器。这里使用腾讯云的免费服务器。
如果你已有服务器了,可以跳过本节,当然也可以看一看下面介绍的性价比超高的服务器。
限时特惠: 腾讯云服务器 1年92、3年只要388 !做个智能机器人啥的~
入口: https://url.cn/B4K9jWu0
对比云服务器 CVM,轻量应用服务器 更聚焦于中小企业、开发者、云计算入门者、学生等用户群体。详细的对比可以看:轻量应用服务器 与云服务器 CVM 对比-产品简介-文档中心-腾讯云。
因此,对于个人学习与使用而言,轻量服务器更便宜、更实用,且性能不输。如果是想要进阶的童靴,可以上手CVM、ECS服务器。
对于还不想买的童靴,可以免费领取腾讯云提供的1个月服务器试用套餐。直接上领取步骤:
- 进入官网领取:云产品免费试用;需要选购的进:轻量应用服务器专场;不清楚怎么领取的可以看教程:腾讯云产品免费试用教程
- 领取完成后,由于后面需要用到端口,因此这里我们提前开放2个端口:8888和9966
这里腾讯云可能有个小特点。如果发现在控制台防火墙放行后,还是无法访问。需要再在服务器里放行一下端口。这里先写着,大家可以在后面一节中连接上了服务器,再回过来这里输入指令。
sudo apt install firewalld -y
sudo firewall-cmd --list-all
sudo firewall-cmd --permanent --zone=public --add-port=8888/tcp && sudo firewall-cmd --reload
sudo firewall-cmd --permanent --zone=public --add-port=9966/tcp && sudo firewall-cmd --reload
sudo systemctl start firewalld.service
SSH连接服务器
服务器初始化完成后,就可以通过SSH去连接了。这里我们可以直接使用powershell来连接,其他SSH软件我强推mobaxterm!!安装包也已经准备好了:MobaXterm.exe
- 搜索打开powershell:
- 输入以下命令连接SSH:
ssh 用户名@<公网ip>
- 或者使用MobaXterm软件:
- 先更新一下软件库:
sudo apt upgrade -y
sudo apt autoremove -y
- 一般不建议使用管理员账户,因此我们要自己新建一个账户:
sudo adduser sxf
然后将账户加入sudoers组:
sudo apt install vim
sudo vim /etc/sudoers
然后退出软件,重新用新建的账号登录即可。
至此,服务器环境就搭建完成了。
常见Ubuntu软件安装与问题修复
这篇博客里记录了很多我在使用过程中,常用软件的安装,非常详细且经过亲测,时不时也会更新内容,大家可以收藏以备下次使用。
Ubuntu20.04 + VirtualBox相关_小锋学长生活大爆炸的博客-CSDN博客
搭建mirai环境
接下来就要在服务器上搭建QQ机器人(mirai)基础环境。搭建完成后,我们就可以远程跟机器人进行交互。
官方mirai的github仓库:GitHub - mamoe/mirai: 高效率 QQ 机器人支持库
由于github是国外的,而官方已经不再支持gitee的维护,因此如果大家无法访问上面的连接,可以用我帮大家下载下来的安装包:
其他的一些文档:https://docs.mirai.mamoe.net/
官方论坛:https://mirai.mamoe.net/
下面开始正式安装:
- 先SSH连接上服务器,建议不要用root用户登录。
- 下载安装包mcl-installer-a02f711-linux-amd64:
mkdir qqbot
cd qqbot
wget http://xfxuezhang.cn/web/share/QQBot/mcl-installer-a02f711-linux-amd64
sudo chmod +x mcl-installer-a02f711-linux-amd64
此时需要输入密码(在上面选购并装完服务器后会显示,当时要求记下的)。
./mcl-installer-a02f711-linux-amd64
此时进入安装流程,弹出的几个选项都直接回车选默认即可。
- 安装完成后,还需要安装mirai-api-http。在当前页面下,继续输入:
./mcl --update-package net.mamoe:mirai-api-http --channel stable-v2 --type plugin
- 编辑_config/net.mamoe.mirai-api-http/setting.yml_配置文件 (没有则自行创建)
## 配置文件中的值,全为默认值
## 启用的 adapter, 内置有 http, ws, reverse-ws, webhook
adapters:
- http
- ws
## 是否开启认证流程, 若为 true 则建立连接时需要验证 verifyKey
## 建议公网连接时开启
enableVerify: true
verifyKey: 1234567890
## 开启一些调式信息
debug: false
## 是否开启单 session 模式, 若为 true,则自动创建 session 绑定 console 中登录的 bot
## 开启后,接口中任何 sessionKey 不需要传递参数
## 若 console 中有多个 bot 登录,则行为未定义
## 确保 console 中只有一个 bot 登陆时启用
singleMode: false
## 历史消息的缓存大小
## 同时,也是 http adapter 的消息队列容量
cacheSize: 4096
## adapter 的单独配置,键名与 adapters 项配置相同
adapterSettings:
## 详情看 http adapter 使用说明 配置
http:
# 0.0.0.0是允许远程访问,localhost只能同机器访问
host: 0.0.0.0
port: 8888
cors: ["*"]
unreadQueueMaxSize: 100
## 详情看 websocket adapter 使用说明 配置
ws:
host: localhost
port: 8080
reservedSyncId: -1
- 启动mirai即可:
./mcl
首次启动会自动下载jar包。等待启动完成后,输入"?",可以查看所有支持的mcl命令。
- 使用以下命令即可登录QQ号:
/login [password]
如果想要启动mcl后自动登录QQ号,可以用:
/autoLogin add
也可以设置不同的设备登录。
/autoLogin setConfig protocol ANDROID_PAD
它对应的配置文件其实就在:config/Console/AutoLogin.yml
现在QQ风控很严了,第一次登录很有可能遇到“需要滑动验证码”的。建议申请小号使用,以免发生不测。并且首次使用时在QQ“账号安全设置”中关闭“安全登录检查”、“陌生设备登录保护”。如果遇到验证码,可以尝试:
- 将Captcha link通过另一个QQ,发给待登录的mirai-QQ,手机登录mirai-QQ并点击链接,手动完成滑块验证,然后回到mobaxterm输入回车;
- 如果不行,就参考这个链接的方法:https://github.com/project-mirai/mirai-login-solver-selenium;
- 还不行,再参考这个链接的方法:https://docs.mirai.mamoe.net/mirai-login-solver-selenium/;
- 还有一个小技巧可以尝试。在手机端先通过手机号登录QQ,如果没问题,再通过手机号在mirai上登录。手机建议先登录上mirai-QQ,有时可能会弹窗提示“是否允许陌生设备登录”等等,要手动点确认的。
- 另外,最新申请的QQ号,一般可以成功登录mirai。
- 如果以上都不行,目前的终极方案是使用miraiAndroid:MiraiAndroid:
- 在手机上的MiraiAndroid登录QQ后导出device.json
- 将cache目录下的3个文件account.secrets、servers.json、session.bin也复制出来
- 接下来点击左上角, 再点击“工具”。选择你机器人的账号, 选择 导出 DEVICE.JSON 将其导出。
- 再次回到服务器端,进入 “bots/<你的QQ号>” 下面, 将导出的 device.json 复制放入。对应的cache文件夹也复制放入。
- 再次执行 ./mcl 启动 mirai-console 看看效果。
- 若仍有问题,欢迎加入文末Q群交流。
- 至此,mcl就已经能正常接收QQ消息了。而我们的实现代码对mcl的控制,就是通过mirai-api-http插件来实现的。根据上面第4步配置的_setting.yml_文件,再参考官方API文档和HttpAdapter文档,即可实现互联互通。(讲起来比较麻烦,no bibi,后面直接show me the code)。
Python控制mirai篇
当服务器成功运行了mirai后,我们就可以在本地进行Python脚本的编写了。由于最新的mirai-api-http变更过接口规范,因此网上某些一两年前的代码已经失效了。本教程对应的mirai-api-http使用的是最新的2.x版本。
接下来的操作,都默认已经完成“启动mcl并login了QQ号”。
在上面setting.yml中,有两个配置项值得注意,他是我们脚本可以控制的密钥:
verifyKey: 1234567890
http: port: 8888
debug输出封装
简单封装下。直接用print也是可以的。
class Logger:
def __init__(self, level='debug'):
self.level = level
def DebugLog(self, *args):
if self.level == 'debug':
print(*args)
def TraceLog(self, *args):
if self.level == 'trace':
print(*args)
def setDebugLevel(self, level):
self.level = level.lower()
交互授权
在交互前,脚本需要先向mirai获取一个verifyKey,之后在每个请求时候,都需要带上这个key,也叫session。其中,参数auth_key对应了上面setting.yml里的verifyKey。
auth_key = '1234567890'
def verifySession(self, auth_key):
"""每个Session只能绑定一个Bot,但一个Bot可有多个Session。
session Key在未进行校验的情况下,一定时间后将会被自动释放"""
data = {"verifyKey": auth_key}
url = self.addr+'verify'
res = requests.post(url, data=json.dumps(data)).json()
logger.DebugLog(res)
if res['code'] == 0:
return res['session']
return None
绑定bot
使用此方法校验并激活你的Session,同时将Session与一个已登录的Bot绑定。
qq = '121215' # mirai登录的那个QQ
session = 'grge8484' # 上面verifySession函数的返回值
def bindSession(self, session, qq):
"""校验并激活Session,同时将Session与一个已登录的Bot绑定"""
data = {"sessionKey": session, "qq": qq}
url = self.addr + 'bind'
res = requests.post(url, data=json.dumps(data)).json()
logger.DebugLog(res)
if res['code'] == 0:
self.session = session
return True
return False
释放bot
使用此方式释放session及其相关资源(Bot不会被释放)
def releaseSession(self, session, qq):
"""不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
data = {"sessionKey": session, "qq": qq}
url = self.addr + 'release'
res = requests.post(url, data=json.dumps(data)).json()
logger.DebugLog(res)
if res['code'] == 0:
return True
return False
未读消息的数量
获取当前有多少条未读消息。
def getMessageCount(self, session):
url = self.addr + 'countMessage?sessionKey='+session
res = requests.get(url).json()
if res['code'] == 0:
return res['data']
return 0
获取最新的消息
获取消息后会从队列中移除。
def fetchLatestMessage(self, session):
url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
res = requests.get(url).json()
if res['code'] == 0:
return res['data']
return None
解析消息内容
简单实现了部分消息类型的解析,会有消息丢失,请根据使用需求自行调整。
data = 'xxx' # 可以是上面getMsgFromGroup函数的返回值
def parseGroupMsg(self, data):
res = []
if data is None:
return res
for item in data:
if item['type'] == 'GroupMessage':
type = item['messageChain'][-1]['type']
if type == 'Image':
text = item['messageChain'][-1]['url']
elif type == 'Plain':
text = item['messageChain'][-1]['text']
elif type == 'Face':
text = item['messageChain'][-1]['faceId']
else:
logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
continue
name = item['sender']['memberName']
group_id = str(item['sender']['group']['id'])
group_name = item['sender']['group']['name']
res.append({'text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name})
return res
向好友发送消息
向指定好友发送消息。
def sendFriendMessage(self, session, qq, msg):
msg_list = msg.split(r'\n')
msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
data = {
"sessionKey": session,
"target": qq,
"messageChain": msg_chain
}
url = self.addr + 'sendFriendMessage'
try:
res = requests.post(url, data=json.dumps(data)).json()
except:
logger.DebugLog(">> 发送失败")
return 0
if res['code'] == 0:
return res['messageId']
return 0
向群发送消息
也只是简单实现。
def sendMsgToGroup(self, session, group, msg):
text = msg['text']
type = msg['type']
name = msg['name']
group_id = msg['groupId']
group_name = msg['groupName']
content1 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n{}".format(
name, group_id, group_name, text)
content2 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n".format(
name, group_id, group_name)
logger.DebugLog(">> 消息类型:" + type)
if type == 'Plain':
message = [{"type": type, "text": content1}]
elif type == 'Image':
message = [
{"type": 'Plain', "text": content2},
{"type": type, "url": text}]
elif type == 'Face':
message = [{"type": 'Plain', "text": content2},
{"type": type, "faceId": text}]
else:
logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
return 0
data = {
"sessionKey": session,
"group": group,
"messageChain": message
}
logger.DebugLog(">> 消息内容:" + str(data))
url = self.addr + 'sendGroupMessage'
try:
res = requests.post(url, data=json.dumps(data)).json()
except:
logger.DebugLog(">> 转发失败")
return 0
logger.DebugLog(">> 请求返回:" + str(res))
if res['code'] == 0:
return res['messageId']
return 0
向群发送富文本消息
跟上面的差不多,消息类型变了一下,从而支持类似HTML形式的消息发送。
def sendPlainTextToGroup(self, session, group, msg):
msg_list = msg.split(r'\n')
msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
data = {
"sessionKey": session,
"group": group,
"messageChain": msg_chain
}
url = self.addr + 'sendGroupMessage'
try:
res = requests.post(url, data=json.dumps(data)).json()
except:
logger.DebugLog(">> 转发失败")
return 0
logger.DebugLog(">> 请求返回:" + str(res))
if res['code'] == 0:
return res['messageId']
return 0
以上就是几个简单、常用的函数。基于这些函数,就已经可以实现蛮多有趣的功能了。
Q群消息转发
这部分可以直接参考之前的博客:Q群消息转发例程。其实也就是把上面的函数整合一下,放一个完整版:
import requests
from time import sleep
class Logger:
def __init__(self, level='debug'):
self.level = level
def DebugLog(self, *args):
if self.level == 'debug':
print(*args)
def TraceLog(self, *args):
if self.level == 'trace':
print(*args)
def setDebugLevel(self, level):
self.level = level.lower()
logger = Logger()
class QQBot:
def __init__(self):
self.addr = 'http://43.143.12.250:8888/'
self.session = None
def verifySession(self, auth_key):
"""每个Session只能绑定一个Bot,但一个Bot可有多个Session。
session Key在未进行校验的情况下,一定时间后将会被自动释放"""
data = {"verifyKey": auth_key}
url = self.addr+'verify'
res = requests.post(url, data=json.dumps(data)).json()
logger.DebugLog(res)
if res['code'] == 0:
return res['session']
return None
def bindSession(self, session, qq):
"""校验并激活Session,同时将Session与一个已登录的Bot绑定"""
data = {"sessionKey": session, "qq": qq}
url = self.addr + 'bind'
res = requests.post(url, data=json.dumps(data)).json()
logger.DebugLog(res)
if res['code'] == 0:
self.session = session
return True
return False
def releaseSession(self, session, qq):
"""不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
data = {"sessionKey": session, "qq": qq}
url = self.addr + 'release'
res = requests.post(url, data=json.dumps(data)).json()
logger.DebugLog(res)
if res['code'] == 0:
return True
return False
def fetchLatestMessage(self, session):
url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
res = requests.get(url).json()
if res['code'] == 0:
return res['data']
return None
def parseGroupMsg(self, data):
res = []
if data is None:
return res
for item in data:
if item['type'] == 'GroupMessage':
type = item['messageChain'][-1]['type']
if type == 'Image':
text = item['messageChain'][-1]['url']
elif type == 'Plain':
text = item['messageChain'][-1]['text']
elif type == 'Face':
text = item['messageChain'][-1]['faceId']
else:
logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
continue
name = item['sender']['memberName']
group_id = str(item['sender']['group']['id'])
group_name = item['sender']['group']['name']
res.append({'text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name})
return res
def getMessageCount(self, session):
url = self.addr + 'countMessage?sessionKey='+session
res = requests.get(url).json()
if res['code'] == 0:
return res['data']
return 0
def sendPlainTextToGroup(self, session, group, msg):
msg_list = msg.split(r'\n')
msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
data = {
"sessionKey": session,
"group": group,
"messageChain": msg_chain
}
url = self.addr + 'sendGroupMessage'
try:
res = requests.post(url, data=json.dumps(data)).json()
except:
logger.DebugLog(">> 转发失败")
return 0
logger.DebugLog(">> 请求返回:" + str(res))
if res['code'] == 0:
return res['messageId']
return 0
def sendMsgToGroup(self, session, group, msg):
text = msg['text']
type = msg['type']
name = msg['name']
group_id = msg['groupId']
group_name = msg['groupName']
content1 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n{}".format(
name, group_id, group_name, text)
content2 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n".format(
name, group_id, group_name)
logger.DebugLog(">> 消息类型:" + type)
if type == 'Plain':
message = [{"type": type, "text": content1}]
elif type == 'Image':
message = [
{"type": 'Plain', "text": content2},
{"type": type, "url": text}]
elif type == 'Face':
message = [{"type": 'Plain', "text": content2},
{"type": type, "faceId": text}]
else:
logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
return 0
data = {
"sessionKey": session,
"group": group,
"messageChain": message
}
logger.DebugLog(">> 消息内容:" + str(data))
url = self.addr + 'sendGroupMessage'
try:
res = requests.post(url, data=json.dumps(data)).json()
except:
logger.DebugLog(">> 转发失败")
return 0
logger.DebugLog(">> 请求返回:" + str(res))
if res['code'] == 0:
return res['messageId']
return 0
def sendMsgToAllGroups(self, session, receive_groups, send_groups, msg_data):
# 对每条消息进行检查
for msg in msg_data:
group_id = msg['groupId']
# 接收的消息群正确(目前只支持 消息类型)
if group_id in receive_groups:
# 依次将消息转发到目标群
for g in send_groups:
logger.DebugLog(">> 当前群:"+g)
if g == group_id:
logger.DebugLog(">> 跳过此群")
continue
res = self.sendMsgToGroup(session, g, msg)
if res != 0:
logger.TraceLog(">> 转发成功!{}".format(g))
def sendFriendMessage(self, session, qq, msg):
msg_list = msg.split(r'\n')
msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
data = {
"sessionKey": session,
"target": qq,
"messageChain": msg_chain
}
url = self.addr + 'sendFriendMessage'
try:
res = requests.post(url, data=json.dumps(data)).json()
except:
logger.DebugLog(">> 发送失败")
return 0
if res['code'] == 0:
return res['messageId']
return 0
def qqTransfer():
with open('conf.json', 'r+', encoding="utf-8") as f:
content = f.read()
conf = json.loads(content)
auth_key = conf['auth_key']
bind_qq = conf['bind_qq']
sleep_time = conf['sleep_time']
debug_level = conf['debug_level']
receive_groups = conf['receive_groups']
send_groups = conf['send_groups']
logger.setDebugLevel(debug_level)
session = bot.verifySession(auth_key)
logger.DebugLog(">> session: "+session)
bot.bindSession(session, bind_qq)
while True:
cnt = bot.getMessageCount(session)
if cnt:
logger.DebugLog('>> 有消息了 => {}'.format(cnt))
logger.DebugLog('获取消息内容')
data = bot.fetchLatestMessage(session)
if len(data) == 0:
logger.DebugLog('消息为空')
continue
logger.DebugLog(data)
logger.DebugLog('解析消息内容')
data = bot.parseGroupMsg(data)
logger.DebugLog(data)
logger.DebugLog('转发消息内容')
bot.sendMsgToAllGroups(session, receive_groups, send_groups, data)
# else:
# logger.DebugLog('空闲')
sleep(sleep_time)
bot.releaseSession(session, bind_qq)
其中,conf.json内容为:
{
"auth_key": "1234567890",
"bind_qq": "123456", # mirai登录的QQ (复制时记得删我)
"sleep_time": 1,
"receive_groups": ["913182235", "977307922"], # 要接受消息的群 (复制时记得删我)
"send_groups": ["913182235", "977307922"], # 要发送消息的群 (复制时记得删我)
"debug_level": "debug"
}
下面,我们就先从类似QMsg酱的消息通知开始。
类似QMsg酱的消息通知
设计目标:通过调用指定的URL,小锋仔机器人就会给指定的好友发送指定的消息。
关于QMsg酱的使用教程可以看:免费的QQ微信消息推送机器人
前面我们特地开放了9966端口,因此可以使用Flask来监听这个端口。
本着越简单越好的原则,我们把“发给好友还是群”、“目标好友或群的号”、“发送的内容”三部分都拼接到URL上,因此有:
http://43.143.12.250:9966/QQ/send/friend?target=123&msg=hello
http://43.143.12.250:9966/QQ/send/group?target=123&msg=hello
因此,代码可以写成:
from flask import Flask, request
app = Flask(__name__)
@app.route('/QQ/send/friend', methods=['GET'])
def qqListenMsgToFriend():
# 类似于Qmsg的功能
# flask做得接收HTTP请求转为QQ消息
qq = request.args.get('target', None)
msg = request.args.get('msg', None)
bot.sendFriendMessage(bot.session, qq, msg)
return 'Hello World! Friend!'
@app.route('/QQ/send/group', methods=['GET'])
def qqListenMsgToGroup():
# 类似于Qmsg的功能
# flask做得接收HTTP请求转为QQ消息
qq = request.args.get('target', None)
msg = request.args.get('msg', None)
bot.sendPlainTextToGroup(bot.session, qq, msg)
return 'Hello World! Group!'
if __name__ == '__main__':
app.run(port='9966', host='0.0.0.0')
由于Flask和小锋仔QQBot都要阻塞运行,因此稍微变动一下,让小锋仔以子线程的形式运行即可。
if __name__ == '__main__':
t = threading.Thread(target=qqTransfer)
t.setDaemon(True)
t.start()
app.run(port='9966', host='0.0.0.0')
测试一下:
http://localhost:9966/QQ/send/friend?target=1061700625&msg=hello
如果我们把这个脚本放到服务器上去运行,那么链接就变成了:
http://43.143.12.250:9966/QQ/send/friend?target=1061700625&msg=hello
当然,能发消息的前提是“先加好友”或“加群”啦。
多功能切换的实现设计
上面我们进行了简单地尝鲜。
1、从这部分开始,我们涉及的功能比较杂,为了能更好的区分功能,需要设计一个简单的交互协议。
- 我们发送的内容可以分为:功能选择 与 消息详情;
- 为了区分他俩,可以在选择功能时添加指定前缀,如“CMD+翻译”;
- 小锋仔接收到后,进入翻译模式准备;
- 发送指令详情时,就不加前缀。而小锋仔则将收到的消息进行翻译,再把结果返回。
根据以上内容,小锋仔需要记录的状态信息至少有:
class StatusStore:
def __init__(self, from_qq:int=None, is_cmd:bool=False, func_name:str=None, need_second:bool=False, msg:str=None) -> None:
self.from_qq = from_qq # 发送者的QQ号
self.is_cmd = is_cmd # 是否是指令(选择功能)
self.func_name = func_name # 选择的功能的名称
self.need_second = need_second # 是否需要经过两步:先发cmd指令,再发详细内容
self.msg = msg # 本次发送的消息内容
def detail(self):
return self.__dict__
2、并且我们设置,只有从指定QQ发过来消息,才能响应。因此在接收到消息时,需要判断对方的信息。对于好友类型的消息,mirai返回格式如消息类型说明:
{
"type": "FriendMessage",
"sender": {
"id": 123,
"nickname": "",
"remark": ""
},
"messageChain": [] // 数组,内容为下文消息类型
}
因此,我们可以从"type"和 "sender:id"入手判断。
3、我们暂时考虑只有一个主QQ能发送指令的情况。
4、定义一个类来专门管理不同功能的函数,例如:
class MultiFunction:
"""多功能函数集合"""
def __init__(self) -> None:
pass
@staticmethod
def translate(original:str, convert:str='zh2en') -> str:
return '假装是翻译结果'
@staticmethod
def uploadImage(image_path:str) -> str:
return '假装是上传结果'
@staticmethod
def weather(city:str) -> str:
return '假装是天气结果'
@staticmethod
def hotNews(status_store:StatusStore) -> str:
return '假装是热搜结果'
# 多功能函数的映射
# function: 功能对应函数名
# need_second: 是否需要经过两步:先发cmd指令,再发详细内容
# desc: 需要经过两步时,第一次返回的提示语
function_map = {
'翻译': {'function': MultiFunction.translate, 'need_second': True, 'desc': '请输入您要翻译的内容~'},
'天气': {'function': MultiFunction.weather, 'need_second': True, 'desc': '请问是哪座城市的天气呢?'},
'热搜': {'function': MultiFunction.hotNews, 'need_second': False}
}
def choiceFunction(store_obj:StatusStore):
res = ''
if function_map.get(store_obj.func_name):
res = function_map.get(store_obj.func_name)['function'](store_obj.msg)
return res
5、大致实现流程的想法是:
对应代码实现:
def analyzeFriendMsg(self, data):
if data is None or data['type'] != 'FriendMessage':
return None, None, None
sender_id = data['sender']['id']
msg_type = data['messageChain'][-1]['type']
if msg_type == 'Plain':
msg_text = data['messageChain'][-1]['text']
elif msg_type == 'Image':
msg_text = data['messageChain'][-1]['url']
else:
msg_text = ''
return sender_id, msg_type, msg_text
最终的框架就是:
def xiaofengzai():
auth_key = '1234567890' # settings.yml中的verifyKey
bind_qq = '3126229950' # mirai登录的QQ
target_qq = '1061700625' # 我们自己用的主QQ
target_qq = int(target_qq) # 接收到的消息里,QQ是int类型的
sleep_time = 1 # 轮询间隔
status_store = {}
session = bot.verifySession(auth_key)
logger.DebugLog(">> session: "+session)
bot.bindSession(session, bind_qq)
while True:
cnt = bot.getMessageCount(session)
if not cnt:
sleep(sleep_time)
continue
logger.DebugLog('>> 有消息了 => {}'.format(cnt))
logger.DebugLog('获取消息内容')
data = bot.fetchLatestMessage(session)
if len(data) == 0:
logger.DebugLog('消息为空')
sleep(sleep_time)
continue
logger.DebugLog(data)
logger.DebugLog('解析消息内容')
sender_id, msg_type, msg_text = bot.analyzeFriendMsg(data[0])
if not sender_id or sender_id != target_qq:
sleep(sleep_time)
continue
if msg_text.strip().lower().startswith('cmd'):
_, func_name = msg_text.strip().split('\n')[0].split()
func_name = func_name.strip()
store_obj = StatusStore(from_qq=sender_id, is_cmd=True, func_name=func_name)
# 不需要发两次,直接调用函数返回结果即可
func_info = function_map.get(func_name)
if not func_info:
res = '指令[{}]暂不支持'.format(func_name)
elif func_info.get('need_second'):
res = '收到你的指令:{}\n{}'.format(func_name, func_info.get('desc') or '已进入对应状态, 请继续发送详细内容')
# 添加或更新记录
status_store[sender_id] = store_obj
else:
res = '请求结果为:\n' + str(choiceFunction(store_obj))
status_store.pop(sender_id, '')
else:
res = '请先发送指令哦...'
store_obj = status_store.get(sender_id)
if store_obj and store_obj.is_cmd:
store_obj.msg = msg_text
res = '请求结果为:\n' + str(choiceFunction(store_obj))
status_store.pop(sender_id, '')
bot.sendFriendMessage(session, qq=sender_id, msg=res)
看一下效果:
至此,骨架有了,接下来开始填充功能了。
翻译查询
根据上面的骨架可知,我们只需要实现MultiFunction类下的translate函数即可。如果想快速测试函数效果,可以使用以下代码,而不用先启动mirai:
res = choiceFunction(StatusStore(func_name='翻译', msg='你好'))
print(res)
领取腾讯免费翻译API
要做翻译,最方便的就是调用API了(没错,调包侠!)。
这里使用腾讯的翻译API,可以免费领取:领取腾讯翻译API。点进链接后,往下拖到“云产品体验”专区,选择“人工智能”,下面有“机器翻译”。他的调用量是每月更新,非常的良心了。
点击“立即体验”,进入控制台界面,虽然上面显示的是“开通付费版”,但不用担心,他是有免费额度的,更何况你账户里又没充余额,哈哈哈。
支持很多类型的翻译,这次我们先选文本翻译,机器翻译 文本翻译-API 文档-文档中心-腾讯云:
我们用SDK的方式,免去了自己封装复杂的加密步骤:
pip install --upgrade tencentcloud-sdk-python
然后去获取密钥API密钥管理,记下APPID、SecretId、SecretKey:
机器人接入翻译功能
小锋仔bot结合翻译功能,直接上代码:
import json
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.tmt.v20180321 import tmt_client, models
def translate(original:str, convert:str='en'):
secretId = 'xxx' # 从API控制台获取
secretKey = 'xxx' # 从API控制台获取
AppId = 12123 # 从API控制台获取
try:
cred = credential.Credential(secretId, secretKey)
client = tmt_client.TmtClient(cred, "ap-guangzhou")
req = models.TextTranslateRequest()
params = {
"SourceText": original,
"Source": "auto",
"Target": convert,
"ProjectId": AppId
}
req.from_json_string(json.dumps(params))
resp = client.TextTranslate(req)
# print(resp.to_json_string())
return resp.TargetText
except TencentCloudSDKException as err:
print(err)
return ''
使用测试效果:
print(choiceFunction(StatusStore(func_name='翻译', msg='你好')))
# 输出:
{"TargetText": "Hello", "Source": "zh", "Target": "en", "RequestId": "a1b17f47-751e-44cd-89a5-6a22e9f2c444"}
Hello
实时天气
领取免费的和风天气API
天气部分,我们是用免费的和风天气API:实时天气 - API。
首先也要进行登录并获取KEY,这个步骤官网讲的很详细,图文并茂的,这边就不多写了,大家可以跳转过去(注意我们选的是Web API):创建应用和KEY - RESOURCE。
机器人接入天气功能
同样的,直接上代码:
def weather(city:str) -> str:
url_api_weather = 'https://devapi.qweather.com/v7/weather/now?'
url_api_geo = 'https://geoapi.qweather.com/v2/city/lookup?'
weather_key = 'xxxxx' # 和风天气控制台的key
# 实况天气
def getCityId(city_kw):
url_v2 = url_api_geo + 'location=' + city_kw + '&key=' + weather_key
city = requests.get(url_v2).json()['location'][0]
return city['id']
city_name = '广州'
city_id = getCityId(city_name)
url = url_api_weather + 'location=' + city_id + '&key=' + weather_key
res = requests.get(url).json()
text = "<天气信息获取失败>"
if res['code'] == '200' or res['code'] == 200:
text = '实时天气:\n 亲爱的 小主, 您所在的地区为 {},\n 现在的天气是 {},\n 气温 {}°, 湿度 {}%,\n 体感气温为 {}°,\n 风向 {}, 风速 {}km/h'.format(
city_name, res['now']['text'], res['now']['temp'], res['now']['humidity'], res['now']['feelsLike'], res['now']['windDir'], res['now']['windSpeed'])
return text
实时热搜
领取免费的天行热搜API
这部分用的是天行数据,免费会员每天赠送100次调用额度:今日头条新闻API接口 - 天行数据TianAPI。先注册账号,然后点击“申请接口”即可。
注意,首次注册需要在控制台完成“实名认证”和“邮箱验证”(马上通过,不需要等待审核)。
对于密钥Key,是在“控制台-数据管理-我的密钥KEY”中。
机器人接入热搜功能
同样的,直接上代码:
def hotNews(status_store:StatusStore) -> str:
tianxing_key = 'e05966abe0b054686c9f6b7d60e59a8d'
def common(data):
url = data + '?key={}'.format(tianxing_key)
res = requests.get(url).json()
return res['newslist']
res = common('http://api.tianapi.com/topnews/index')
tops = []
index = 1
for item in res:
tops.append(str(index) + '. ' + item['title'])
index += 1
return '\n'.join(tops[0:10])
照片上传
有时候我们想保存一些照片,但又不想放手机里,那我们可以做个“通过把照片发给小锋仔机器人,让小锋仔再上传到服务器或者COS上”的功能。
领取腾讯对象存储COS
还是这个链接:云产品体验 - 腾讯云,在“云产品体验-基础-对象存储COS”下面。对象存储不止可以用来存文件,这里我们只用来存图片。
- 领取完成后,进入控制台,创建存储桶:
- 配置存储桶信息,访问权限设置为“公有读私有写”,这样别人就能看到了,便于分享图片:
- 创建完成后,就可以通过Python APi去控制上传了。不过需要先安装下SDK:
pip install -U cos-python-sdk-v5
- 上传图片部分,我们现在随便拿一张图测试:
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
import os
def uploadImage(image_path:str) -> str:
bucket_id = 'image-1253093297' # 存储桶的名称
secret_id = 'xxx'
secret_key = 'xxx'
region = 'ap-guangzhou' # 存储桶的地区
token = None
scheme = 'https'
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme)
client = CosS3Client(config)
# 本地文件形式上传
# response = client.upload_file(
# Bucket=bucket_id,
# LocalFilePath=image_path,
# Key=image_path.split(os.sep)[-1],
# PartSize=1,
# MAXThread=10,
# EnableMD5=False
# )
# 网络文件形式上传
file_keyname = image_path.split('/')[-2] + '.jpg'
stream = requests.get(image_path)
response = client.put_object(
Bucket=bucket_id,
Body=stream,
Key=file_keyname
)
print(response['ETag'])
img_url = 'https://{}.cos.{}.myqcloud.com/{}'.format(bucket_id, region, file_keyname)
return '上传成功, ETag: {}\nURL: {}'.format(response['ETag'], img_url)
- 上传完成后,在控制台就可以看到了。
- 更多cos操作可看官方文档:对象存储 快速入门-SDK 文档-文档中心-腾讯云
机器人接入图片上传功能
通过mirai文档可知,图片消息格式为:
[{'type': 'FriendMessage', 'messageChain': [{'type': 'Source', 'id': 55312, 'time': 1662048857}, {'type': 'Image', 'imageId': '{DCAD8B29-D606-B354-117D-F39479C14FE3}.jpg', 'url': 'http://c2cpicdw.qpic.cn/offpic_new/1061700625//1061700625-141936558-DCAD8B29D606B354117DF39479C14FE3/0?term=2', 'path': None, 'base64': None}], 'sender': {'id': 1061700625, 'nickname': '热心市民', 'remark': '热心市民'}}]
因此只需要拿到里面的URL就行,而我们的analyzeFriendMsg函数就已经提取了URl了,因此啥都不用多改!!(结构好,就是方便呀~)
直接测试:
自行添加小功能函数总结
通过上面几个小功能,不难发现我们的程序在功能上很方便扩展,总结一下,就2步:
- 在MultiFunction类中添加功能函数的实现,入参尽量为字符串型,返回也为字符串型;
- 在function_map中添加函数信息;
下面提供几个好玩的接口,给大家留个作业,自己集成到机器人中去:
# 疫情信息
def getYiQing():
url = 'https://c.m.163.com/ug/api/wuhan/app/data/list-total?t={}'.format(329091037164)
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/96.0.4664.110 Safari/537.36 '
}
res = requests.get(url, headers=headers).json()
total = res['data']['chinaTotal']['total']
today = res['data']['chinaTotal']['today']
a = 99
symbol_today = '+' if today['confirm'] >= 0 else ''
symbol_total = '+' if today['storeConfirm'] >= 0 else ''
symbol_input = '+' if today['input'] >= 0 else ''
confirmTotal = '累计确诊:{},较昨日:{}{}'.format(total['confirm'], symbol_today, today['confirm'])
confirmToday = '现有确诊:{},较昨日:{}{}'.format(total['confirm'] - total['dead'] - total['heal'], symbol_total, today['storeConfirm'])
inputs = '境外输入:{},较昨日:{}{}'.format(total['input'], symbol_input, today['input'])
return inputs + '\n' + confirmToday + '\n' + confirmTotal
# 历史上的今天
def getHistoryToday():
url = 'https://api.oick.cn/lishi/api.php'
res = requests.get(url).json()
historyToday = []
for item in res['result']:
historyToday.append(item['date'] + ', ' + item['title'])
return '\n'.join(random.choices(historyToday, k=3))
# 一言
def dailysentence():
url = 'https://res.abeim.cn/api-text_yiyan'
res = requests.get(url).json()
return res['content']
# 天行api
def common(data):
tianxing_key = '' # 天行key
url = data + '?key={}'.format(tianxing_key)
res = requests.get(url).json()
return res['newslist']
# 天行api - 小窍门
def dailyTips():
res = common('http://api.tianapi.com/qiaomen/index')
tipsArray = res[0]
return tipsArray['content']
# 天行api - 健康小知识
def healthTips():
res = common('http://api.tianapi.com/healthtip/index')
tipsArray = res[0]
return tipsArray['content']
如果以后功能越来越多,我们很容易记不住关键词是啥,因此,稍稍变动一下,让我们可以知道功能清单。在xiaofengzai函数这个位置添加一段代码:
if msg_text.strip() == '功能清单':
res = '目前支持的关键词有:\n' + '\n'.join(function_map.keys())
控制树莓派舵机与屏显
这部分摘自我前面的博客:
与树莓派的主要交互,这里主要有两种方式:
- 树莓派上也运行mirai。通过设置不同的protocol,是可以实现同时在线的。
- 通过MQTT通信。这个比较好用,是个物联网协议,广泛适用于IoT场景,推荐。
我的另一个大型项目“基于树莓派的智能魔镜”,它里面树莓派与手机的通信,就是通过MQTT实现的。很贴心的,B站还有配套的视频教程,欢迎来踩,哈哈哈~小锋学长生活大爆炸的个人空间。
腾讯云服务器搭建MQTT环境
树莓派由于不在身边,因此这部分暂时先略过,大家可以通过上面几篇博客自学一下,他们也都是使用到了mirai的。这里讲一下MQTT的安装,也可以参考安装EMQX MQTT。
- SSH进入我们的服务器后,输入指令:
sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
sudo apt-get update
sudo apt-get install mosquitto
sudo apt-get install mosquitto-clients
sudo apt clean
- 然后去腾讯云控制台开放下1883端口。
- 再在防火墙软件上也放行下1883端口
sudo firewall-cmd --permanent --zone=public --add-port=1883/tcp && sudo firewall-cmd --reload
sudo systemctl start firewalld.service
- 通过Python调用MQTT的示例可以参考:Python MQTT。
- 想了解学习MQTT概念的可以参考:MQTT V3.1协议规范。
更多MQTT使用示例可以参考:
待实现功能
接入控制ESP32(实现智能家居控制)
ESP32是一块可以链接WIFI的嵌入式开发板,支持MQTT协议。这样一来,只要通过跟我们的机器人互相订阅Topic,在通过设计一套通信协议,就可以实现远程交互了。进一步地,给ESP32接入外设,就可以很容易的实现一个智能家居,而我们则可以通过QQ机器人来实现对智能家居的控制。
完整代码整理
为了方便,我们把所有需要修改的变量,都统一提取到了最前面。大家在复制过程中,务必记得都填上自己的!!
![Q073C@O[}BS4ON(A]H$C_YX.png](https://cdn.nlark.com/yuque/0/2022/png/21876370/1662051010396-4111cadd-c0f7-444e-88bf-f5ab77abd9c1.png#clientId=u1cf12aa3-c8f3-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=547&id=u1448e26e&margin=%5Bobject%20Object%5D&name=Q073C%40O%5B%7DBS4ON%28A%5DH%24C_YX.png&originHeight=903&originWidth=1013&originalType=binary&ratio=1&rotation=0&showTitle=false&size=46750&status=done&style=none&taskId=ud48045de-328e-4049-870b-8561bc1feb4&title=&width=614)
最后,贴上完整代码。由于水平有限,写的可能不是很好。也欢迎大家DIY魔改成自己的。如果有问题,欢迎加入文末Q群一起交流~~~
import json
import os
import requests
from flask import Flask, request
from time import sleep
import threading
import json
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.tmt.v20180321 import tmt_client, models
import requests
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
# ---------------------------- 变量定义区 ---------------------------- #
# 自己运行mirai的服务器IP和mirai-api-http的监听端口
mirai_server_url = 'http://43.143.12.250:8888/'
# settings.yml中的verifyKey
auth_key = '1234567890'
# mirai登录的QQ
bind_qq = 'xxx'
# 我们自己用的主QQ。接收到的消息里,QQ是int类型的
target_qq = 123123
# 腾讯COS存储桶的名称
tencent_cos_bucket_id = 'xxx'
# 腾讯COS存储桶的地区
tencent_cos_region = 'ap-guangzhou'
# 腾讯控制台的SecretId
tencent_secret_id = 'xxx'
# 腾讯控制台的SecretKey
tencent_secret_key = 'xxx'
# 腾讯机器翻译的appid
tencent_translate_AppId = xxx
# 和风天气API的key
weather_key = 'xxx'
# 天行API的key
tianxing_key = 'xxx'
# ------------------------------------------------------------------ #
class Logger:
def __init__(self, level='debug'):
self.level = level
def DebugLog(self, *args):
if self.level == 'debug':
print(*args)
def TraceLog(self, *args):
if self.level == 'trace':
print(*args)
def setDebugLevel(self, level):
self.level = level.lower()
class QQBot:
def __init__(self):
self.addr = mirai_server_url
self.session = None
def verifySession(self, auth_key):
"""每个Session只能绑定一个Bot,但一个Bot可有多个Session。
session Key在未进行校验的情况下,一定时间后将会被自动释放"""
data = {"verifyKey": auth_key}
url = self.addr+'verify'
res = requests.post(url, data=json.dumps(data)).json()
logger.DebugLog(res)
if res['code'] == 0:
return res['session']
return None
def bindSession(self, session, qq):
"""校验并激活Session,同时将Session与一个已登录的Bot绑定"""
data = {"sessionKey": session, "qq": qq}
url = self.addr + 'bind'
res = requests.post(url, data=json.dumps(data)).json()
logger.DebugLog(res)
if res['code'] == 0:
self.session = session
return True
return False
def releaseSession(self, session, qq):
"""不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
data = {"sessionKey": session, "qq": qq}
url = self.addr + 'release'
res = requests.post(url, data=json.dumps(data)).json()
logger.DebugLog(res)
if res['code'] == 0:
return True
return False
def fetchLatestMessage(self, session):
url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
res = requests.get(url).json()
if res['code'] == 0:
return res['data']
return None
def parseGroupMsg(self, data):
res = []
if data is None:
return res
for item in data:
if item['type'] == 'GroupMessage':
type = item['messageChain'][-1]['type']
if type == 'Image':
text = item['messageChain'][-1]['url']
elif type == 'Plain':
text = item['messageChain'][-1]['text']
elif type == 'Face':
text = item['messageChain'][-1]['faceId']
else:
logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
continue
name = item['sender']['memberName']
group_id = str(item['sender']['group']['id'])
group_name = item['sender']['group']['name']
res.append({'text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name})
return res
def getMessageCount(self, session):
url = self.addr + 'countMessage?sessionKey='+session
res = requests.get(url).json()
if res['code'] == 0:
return res['data']
return 0
def peekMessage(self, session):
url = self.addr + 'peekMessage?sessionKey='+session
res = requests.get(url).json()
if res['code'] == 0:
return res['data']
return 0
def sendPlainTextToGroup(self, session, group, msg):
msg_list = msg.split(r'\n')
msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
data = {
"sessionKey": session,
"group": group,
"messageChain": msg_chain
}
url = self.addr + 'sendGroupMessage'
try:
res = requests.post(url, data=json.dumps(data)).json()
except:
logger.DebugLog(">> 转发失败")
return 0
logger.DebugLog(">> 请求返回:" + str(res))
if res['code'] == 0:
return res['messageId']
return 0
def sendMsgToGroup(self, session, group, msg):
text = msg['text']
type = msg['type']
name = msg['name']
group_id = msg['groupId']
group_name = msg['groupName']
content1 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n{}".format(
name, group_id, group_name, text)
content2 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n".format(
name, group_id, group_name)
logger.DebugLog(">> 消息类型:" + type)
if type == 'Plain':
message = [{"type": type, "text": content1}]
elif type == 'Image':
message = [
{"type": 'Plain', "text": content2},
{"type": type, "url": text}]
elif type == 'Face':
message = [{"type": 'Plain', "text": content2},
{"type": type, "faceId": text}]
else:
logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
return 0
data = {
"sessionKey": session,
"group": group,
"messageChain": message
}
logger.DebugLog(">> 消息内容:" + str(data))
url = self.addr + 'sendGroupMessage'
try:
res = requests.post(url, data=json.dumps(data)).json()
except:
logger.DebugLog(">> 转发失败")
return 0
logger.DebugLog(">> 请求返回:" + str(res))
if res['code'] == 0:
return res['messageId']
return 0
def sendMsgToAllGroups(self, session, receive_groups, send_groups, msg_data):
# 对每条消息进行检查
for msg in msg_data:
group_id = msg['groupId']
# 接收的消息群正确(目前只支持 消息类型)
if group_id in receive_groups:
# 依次将消息转发到目标群
for g in send_groups:
logger.DebugLog(">> 当前群:"+g)
if g == group_id:
logger.DebugLog(">> 跳过此群")
continue
res = self.sendMsgToGroup(session, g, msg)
if res != 0:
logger.TraceLog(">> 转发成功!{}".format(g))
def sendFriendMessage(self, session, qq, msg):
msg_list = msg.split(r'\n')
msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
data = {
"sessionKey": session,
"target": qq,
"messageChain": msg_chain
}
url = self.addr + 'sendFriendMessage'
try:
res = requests.post(url, data=json.dumps(data)).json()
except:
logger.DebugLog(">> 发送失败")
return 0
if res['code'] == 0:
return res['messageId']
return 0
def analyzeFriendMsg(self, data):
if data is None or data['type'] != 'FriendMessage':
return None, None, None
sender_id = data['sender']['id']
msg_type = data['messageChain'][-1]['type']
if msg_type == 'Plain':
msg_text = data['messageChain'][-1]['text']
elif msg_type == 'Image':
msg_text = data['messageChain'][-1]['url']
else:
msg_text = ''
return sender_id, msg_type, msg_text
logger = Logger()
bot = QQBot()
app = Flask(__name__)
def qqTransfer():
with open('conf.json', 'r+', encoding="utf-8") as f:
content = f.read()
conf = json.loads(content)
auth_key = conf['auth_key']
bind_qq = conf['bind_qq']
sleep_time = conf['sleep_time']
debug_level = conf['debug_level']
receive_groups = conf['receive_groups']
send_groups = conf['send_groups']
logger.setDebugLevel(debug_level)
session = bot.verifySession(auth_key)
logger.DebugLog(">> session: "+session)
bot.bindSession(session, bind_qq)
while True:
cnt = bot.getMessageCount(session)
if cnt:
logger.DebugLog('>> 有消息了 => {}'.format(cnt))
logger.DebugLog('获取消息内容')
data = bot.fetchLatestMessage(session)
if len(data) == 0:
logger.DebugLog('消息为空')
continue
logger.DebugLog(data)
logger.DebugLog('解析消息内容')
data = bot.parseGroupMsg(data)
logger.DebugLog(data)
logger.DebugLog('转发消息内容')
bot.sendMsgToAllGroups(session, receive_groups, send_groups, data)
# else:
# logger.DebugLog('空闲')
sleep(sleep_time)
bot.releaseSession(session, bind_qq)
class StatusStore:
def __init__(self, from_qq:int=None, is_cmd:bool=False, func_name:str=None, need_second:bool=False, msg:str=None) -> None:
self.from_qq = from_qq # 发送者的QQ号
self.is_cmd = is_cmd # 是否是指令(选择功能)
self.func_name = func_name # 选择的功能的名称
self.need_second = need_second # 是否需要经过两步:先发cmd指令,再发详细内容
self.msg = msg # 本次发送的消息内容
def detail(self):
return self.__dict__
class MultiFunction:
"""多功能函数集合"""
def __init__(self) -> None:
pass
@staticmethod
def translate(original:str, convert:str='en'):
try:
cred = credential.Credential(tencent_secret_id, tencent_secret_key)
client = tmt_client.TmtClient(cred, "ap-guangzhou")
req = models.TextTranslateRequest()
params = {
"SourceText": original,
"Source": "auto",
"Target": convert,
"ProjectId": tencent_translate_AppId
}
req.from_json_string(json.dumps(params))
resp = client.TextTranslate(req)
# print(resp.to_json_string())
return resp.TargetText
except TencentCloudSDKException as err:
print(err)
return ''
@staticmethod
def uploadImage(image_path:str) -> str:
token = None
scheme = 'https'
config = CosConfig(Region=tencent_cos_region, SecretId=tencent_secret_id, SecretKey=tencent_secret_key, Token=token, Scheme=scheme)
client = CosS3Client(config)
# 本地文件形式上传
# response = client.upload_file(
# Bucket=bucket_id,
# LocalFilePath=image_path,
# Key=image_path.split(os.sep)[-1],
# PartSize=1,
# MAXThread=10,
# EnableMD5=False
# )
# 网络文件形式上传
file_keyname = image_path.split('/')[-2] + '.jpg'
stream = requests.get(image_path)
response = client.put_object(
Bucket=tencent_cos_bucket_id,
Body=stream,
Key=file_keyname
)
print(response['ETag'])
img_url = 'https://{}.cos.{}.myqcloud.com/{}'.format(tencent_cos_bucket_id, tencent_cos_region, file_keyname)
return '上传成功, ETag: {}\nURL: {}'.format(response['ETag'], img_url)
@staticmethod
def weather(city_name:str='广州') -> str:
url_api_weather = 'https://devapi.qweather.com/v7/weather/now?'
url_api_geo = 'https://geoapi.qweather.com/v2/city/lookup?'
# 实况天气
def getCityId(city_kw):
url_v2 = url_api_geo + 'location=' + city_kw + '&key=' + weather_key
city = requests.get(url_v2).json()['location'][0]
return city['id']
city_id = getCityId(city_name)
url = url_api_weather + 'location=' + city_id + '&key=' + weather_key
res = requests.get(url).json()
text = "<天气信息获取失败>"
print(res)
if res['code'] == '200' or res['code'] == 200:
text = '实时天气:\n 亲爱的 小主, 您所在的地区为 {},\n 现在的天气是 {},\n 气温 {}°, 湿度 {}%,\n 体感气温为 {}°,\n 风向 {}, 风速 {}km/h'.format(
city_name, res['now']['text'], res['now']['temp'], res['now']['humidity'], res['now']['feelsLike'], res['now']['windDir'], res['now']['windSpeed'])
return text
@staticmethod
def hotNews(status_store:StatusStore) -> str:
def common(data):
url = data + '?key={}'.format(tianxing_key)
res = requests.get(url).json()
return res['newslist']
res = common('http://api.tianapi.com/topnews/index')
tops = []
index = 1
for item in res:
tops.append(str(index) + '. ' + item['title'])
index += 1
return '\n'.join(tops[0:10])
# 多功能函数的映射
# function: 功能对应函数名
# need_second: 是否需要经过两步:先发cmd指令,再发详细内容
# desc: 需要经过两步时,第一次返回的提示语
function_map = {
'翻译': {'function': MultiFunction.translate, 'need_second': True, 'desc': '请输入您要翻译的内容~'},
'天气': {'function': MultiFunction.weather, 'need_second': True, 'desc': '请问是哪座城市的天气呢?'},
'热搜': {'function': MultiFunction.hotNews, 'need_second': False},
'上传图片': {'function': MultiFunction.uploadImage, 'need_second': True, 'desc': '请发送图片过来吧~'},
}
def choiceFunction(store_obj:StatusStore):
res = ''
if function_map.get(store_obj.func_name):
res = function_map.get(store_obj.func_name)['function'](store_obj.msg)
return res
def xiaofengzai():
sleep_time = 1 # 轮询间隔
status_store = {}
session = bot.verifySession(auth_key)
logger.DebugLog(">> session: "+session)
bot.bindSession(session, bind_qq)
while True:
cnt = bot.getMessageCount(session)
if not cnt:
sleep(sleep_time)
continue
logger.DebugLog('>> 有消息了 => {}'.format(cnt))
logger.DebugLog('获取消息内容')
data = bot.fetchLatestMessage(session)
if len(data) == 0:
logger.DebugLog('消息为空')
sleep(sleep_time)
continue
logger.DebugLog(data)
logger.DebugLog('解析消息内容')
sender_id, msg_type, msg_text = bot.analyzeFriendMsg(data[0])
if not sender_id or sender_id != target_qq:
sleep(sleep_time)
continue
if msg_text.strip() == '功能清单':
res = '目前支持的关键词有:\n' + '\n'.join(function_map.keys())
elif msg_text.strip().lower().startswith('cmd'):
_, func_name = msg_text.strip().split('\n')[0].split()
func_name = func_name.strip()
store_obj = StatusStore(from_qq=sender_id, is_cmd=True, func_name=func_name)
# 不需要发两次,直接调用函数返回结果即可
func_info = function_map.get(func_name)
if not func_info:
res = '指令[{}]暂不支持'.format(func_name)
elif func_info.get('need_second'):
res = '收到你的指令:{}\n{}'.format(func_name, func_info.get('desc') or '已进入对应状态, 请继续发送详细内容')
# 添加或更新记录
status_store[sender_id] = store_obj
else:
res = '请求结果为:\n' + str(choiceFunction(store_obj))
status_store.pop(sender_id, '')
else:
res = '请先发送指令哦...'
store_obj = status_store.get(sender_id)
if store_obj and store_obj.is_cmd:
store_obj.msg = msg_text
res = '请求结果为:\n' + str(choiceFunction(store_obj))
status_store.pop(sender_id, '')
bot.sendFriendMessage(session, qq=sender_id, msg=res)
@app.route('/QQ/send', methods=['GET'])
def qqListenMsg():
# 类似于Qmsg的功能
# flask做得接收HTTP请求转为QQ消息
qq = request.args.get('target', None)
msg = request.args.get('msg', None)
bot.sendFriendMessage(bot.session, qq, msg)
return 'Hello World!'
@app.route('/QQ/send/friend', methods=['GET'])
def qqListenMsgToFriend():
# 类似于Qmsg的功能
# flask做得接收HTTP请求转为QQ消息
qq = request.args.get('target', None)
msg = request.args.get('msg', None)
bot.sendFriendMessage(bot.session, qq, msg)
return 'Hello World! Friend!'
@app.route('/QQ/send/group', methods=['GET'])
def qqListenMsgToGroup():
# 类似于Qmsg的功能
# flask做得接收HTTP请求转为QQ消息
qq = request.args.get('target', None)
msg = request.args.get('msg', None)
bot.sendPlainTextToGroup(bot.session, qq, msg)
return 'Hello World! Group!'
if __name__ == '__main__':
t = threading.Thread(target=xiaofengzai)
t.setDaemon(True)
t.start()
# t = threading.Thread(target=qqTransfer)
# t.setDaemon(True)
# t.start()
app.run(port='9966', host='0.0.0.0')
QQ群 ID:722072237