进一步非常详细的教程,篇幅较长,可收藏后慢慢看:
万字长文保姆级教你制作自己的多功能QQ机器人(http://xfxuezhang.cn/index.php/archives/418/)
(以下正文开始)
安装与使用 - 详细内容来这里:
https://xfxuezhang.blog.csdn.net/article/details/122793393
示例代码在这里:
文件布局
示例代码的配置文件,方便修改
conf.json
{
"auth_key": "12345",
"bind_qq": "你的qq",
"sleep_time": 1,
"receive_groups": ["q群号1", "q群号2"],
"send_groups": ["q群号1", "q群号2"],
"debug_level": "debug"
}
示例代码真正内容
demo.py
import json
import os
import requests
from flask import Flask, request
from time import sleep
import threading
class Logger:
def __init__(self, level='debug'):
self.level = level
def DebugLog(self, *args):
if self.level == 'debug':
print(*args)
def TraceLog(self, *args):
if self.level == 'trace':
print(*args)
def setDebugLevel(self, level):
self.level = level.lower()
class QQBot:
def __init__(self):
self.addr = 'http://127.0.0.1:8888/'
self.session = None
def verifySession(self, auth_key):
"""每个Session只能绑定一个Bot,但一个Bot可有多个Session。
session Key在未进行校验的情况下,一定时间后将会被自动释放"""
data = {"verifyKey": auth_key}
url = self.addr+'verify'
res = requests.post(url, data=json.dumps(data)).json()
logger.DebugLog(res)
if res['code'] == 0:
return res['session']
return None
def bindSession(self, session, qq):
"""校验并激活Session,同时将Session与一个已登录的Bot绑定"""
data = {"sessionKey": session, "qq": qq}
url = self.addr + 'bind'
res = requests.post(url, data=json.dumps(data)).json()
logger.DebugLog(res)
if res['code'] == 0:
self.session = session
return True
return False
def releaseSession(self, session, qq):
"""不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
data = {"sessionKey": session, "qq": qq}
url = self.addr + 'release'
res = requests.post(url, data=json.dumps(data)).json()
logger.DebugLog(res)
if res['code'] == 0:
return True
return False
def getMsgFromGroup(self, session):
url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
res = requests.get(url).json()
if res['code'] == 0:
return res['data']
return None
def parseGroupMsg(self, data):
res = []
if data is None:
return res
for item in data:
if item['type'] == 'GroupMessage':
type = item['messageChain'][-1]['type']
if type == 'Image':
text = item['messageChain'][-1]['url']
elif type == 'Plain':
text = item['messageChain'][-1]['text']
elif type == 'Face':
text = item['messageChain'][-1]['faceId']
else:
logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
continue
name = item['sender']['memberName']
group_id = str(item['sender']['group']['id'])
group_name = item['sender']['group']['name']
res.append({'text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name})
return res
def getMessageCount(self, session):
url = self.addr + 'countMessage?sessionKey='+session
res = requests.get(url).json()
if res['code'] == 0:
return res['data']
return 0
def sendMsgToGroup(self, session, group, msg):
text = msg['text']
type = msg['type']
name = msg['name']
group_id = msg['groupId']
group_name = msg['groupName']
content1 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n{}".format(
name, group_id, group_name, text)
content2 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n".format(
name, group_id, group_name)
logger.DebugLog(">> 消息类型:" + type)
if type == 'Plain':
message = [{"type": type, "text": content1}]
elif type == 'Image':
message = [
{"type": 'Plain', "text": content2},
{"type": type, "url": text}]
elif type == 'Face':
message = [{"type": 'Plain', "text": content2},
{"type": type, "faceId": text}]
else:
logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
return 0
data = {
"sessionKey": session,
"group": group,
"messageChain": message
}
logger.DebugLog(">> 消息内容:" + str(data))
url = self.addr + 'sendGroupMessage'
try:
res = requests.post(url, data=json.dumps(data)).json()
except:
logger.DebugLog(">> 转发失败")
return 0
logger.DebugLog(">> 请求返回:" + str(res))
if res['code'] == 0:
return res['messageId']
return 0
def sendMsgToAllGroups(self, session, receive_groups, send_groups, msg_data):
# 对每条消息进行检查
for msg in msg_data:
group_id = msg['groupId']
# 接收的消息群正确(目前只支持 消息类型)
if group_id in receive_groups:
# 依次将消息转发到目标群
for g in send_groups:
logger.DebugLog(">> 当前群:"+g)
if g == group_id:
logger.DebugLog(">> 跳过此群")
continue
res = self.sendMsgToGroup(session, g, msg)
if res != 0:
logger.TraceLog(">> 转发成功!{}".format(g))
def sendFriendMessage(self, session, qq, msg):
data = {
"sessionKey": session,
"target": qq,
"messageChain": [
{ "type": "Plain", "text": msg },
]
}
url = self.addr + 'sendFriendMessage'
try:
res = requests.post(url, data=json.dumps(data)).json()
except:
logger.DebugLog(">> 发送失败")
return 0
if res['code'] == 0:
return res['messageId']
return 0
logger = Logger()
bot = QQBot()
app = Flask(__name__)
def qqTransfer():
with open('conf.json', 'r+', encoding="utf-8") as f:
content = f.read()
conf = json.loads(content)
auth_key = conf['auth_key']
bind_qq = conf['bind_qq']
sleep_time = conf['sleep_time']
debug_level = conf['debug_level']
receive_groups = conf['receive_groups']
send_groups = conf['send_groups']
logger.setDebugLevel(debug_level)
session = bot.verifySession(auth_key)
logger.DebugLog(">> session: "+session)
bot.bindSession(session, bind_qq)
while True:
cnt = bot.getMessageCount(session)
if cnt:
logger.DebugLog('>> 有消息了 => {}'.format(cnt))
logger.DebugLog('获取消息内容')
data = bot.getMsgFromGroup(session)
if len(data) == 0:
logger.DebugLog('消息为空')
continue
logger.DebugLog(data)
logger.DebugLog('解析消息内容')
data = bot.parseGroupMsg(data)
logger.DebugLog(data)
logger.DebugLog('转发消息内容')
bot.sendMsgToAllGroups(session, receive_groups, send_groups, data)
# else:
# logger.DebugLog('空闲')
sleep(sleep_time)
bot.releaseSession(session, bind_qq)
@app.route('/QQ/send', methods=['GET'])
def qqListenMsg():
# 类似于Qmsg的功能
# flask做得接收HTTP请求转为QQ消息
qq = request.args.get('target', None)
msg = request.args.get('msg', None)
bot.sendFriendMessage(bot.session, qq, msg)
return 'Hello World!'
if __name__ == '__main__':
t = threading.Thread(target=qqTransfer)
t.setDaemon(True)
t.start()
app.run(port='8080', host='0.0.0.0')
这个是插件的setting.yaml
## 配置文件中的值,全为默认值
## 启用的 adapter, 内置有 http, ws, reverse-ws, webhook
adapters:
- http
- ws
## 是否开启认证流程, 若为 true 则建立连接时需要验证 verifyKey
## 建议公网连接时开启
enableVerify: true
verifyKey: 12345
## 开启一些调式信息
debug: false
## 是否开启单 session 模式, 若为 true,则自动创建 session 绑定 console 中登录的 bot
## 开启后,接口中任何 sessionKey 不需要传递参数
## 若 console 中有多个 bot 登录,则行为未定义
## 确保 console 中只有一个 bot 登陆时启用
singleMode: false
## 历史消息的缓存大小
## 同时,也是 http adapter 的消息队列容量
cacheSize: 4096
## adapter 的单独配置,键名与 adapters 项配置相同
adapterSettings:
## 详情看 http adapter 使用说明 配置
http:
host: localhost
port: 8888
cors: ["*"]
## 详情看 websocket adapter 使用说明 配置
ws:
host: localhost
port: 8888
reservedSyncId: -1
gitee:
https://gitee.com/songxf1024/qqmsg-transfer
API文档可参考:
https://docs.mirai.mamoe.net/mirai-api-http/adapter/HttpAdapter.html