mirai QQ机器人最详细教程 [附Q群消息转发例程]
mirai QQ机器人最详细教程 [附Q群消息转发例程]
小锋学长生活大爆炸

mirai QQ机器人最详细教程 [附Q群消息转发例程]

hualala
2022-02-07 / 5 评论 / 477 阅读
温馨提示:
本文最后更新于2022年09月05日,已超过808天没有更新,若内容或图片失效,请留言反馈。

进一步非常详细的教程,篇幅较长,可收藏后慢慢看:

万字长文保姆级教你制作自己的多功能QQ机器人(http://xfxuezhang.cn/index.php/archives/418/)



(以下正文开始)

安装与使用 - 详细内容来这里:
https://xfxuezhang.blog.csdn.net/article/details/122793393


示例代码在这里:
文件布局

示例代码的配置文件,方便修改
conf.json

{
  "auth_key": "12345",
  "bind_qq":  "你的qq",
  "sleep_time": 1,
  "receive_groups": ["q群号1", "q群号2"],
  "send_groups": ["q群号1", "q群号2"],
  "debug_level": "debug"
}

示例代码真正内容
demo.py

import json
import os

import requests
from flask import Flask, request
from time import sleep
import threading

class Logger:
    def __init__(self, level='debug'):
        self.level = level

    def DebugLog(self, *args):
        if self.level == 'debug':
            print(*args)

    def TraceLog(self, *args):
        if self.level == 'trace':
            print(*args)

    def setDebugLevel(self, level):
        self.level = level.lower()


class QQBot:
    def __init__(self):
        self.addr = 'http://127.0.0.1:8888/'
        self.session = None

    def verifySession(self, auth_key):
        """每个Session只能绑定一个Bot,但一个Bot可有多个Session。
        session Key在未进行校验的情况下,一定时间后将会被自动释放"""
        data = {"verifyKey": auth_key}
        url = self.addr+'verify'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            return res['session']
        return None

    def bindSession(self, session, qq):
        """校验并激活Session,同时将Session与一个已登录的Bot绑定"""
        data = {"sessionKey": session, "qq": qq}
        url = self.addr + 'bind'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            self.session = session
            return True
        return False

    def releaseSession(self, session, qq):
        """不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
        否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
        data = {"sessionKey": session, "qq": qq}
        url = self.addr + 'release'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            return True
        return False

    def getMsgFromGroup(self, session):
        url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
        res = requests.get(url).json()
        if res['code'] == 0:
            return res['data']
        return None

    def parseGroupMsg(self, data):
        res = []
        if data is None:
            return res
        for item in data:
            if item['type'] == 'GroupMessage':
                type = item['messageChain'][-1]['type']
                if type == 'Image':
                    text = item['messageChain'][-1]['url']
                elif type == 'Plain':
                    text = item['messageChain'][-1]['text']
                elif type == 'Face':
                    text = item['messageChain'][-1]['faceId']
                else:
                    logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
                    continue
                name = item['sender']['memberName']
                group_id = str(item['sender']['group']['id'])
                group_name = item['sender']['group']['name']
                res.append({'text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name})
        return res

    def getMessageCount(self, session):
        url = self.addr + 'countMessage?sessionKey='+session
        res = requests.get(url).json()
        if res['code'] == 0:
            return res['data']
        return 0

    def sendMsgToGroup(self, session, group, msg):
        text = msg['text']
        type = msg['type']
        name = msg['name']
        group_id = msg['groupId']
        group_name = msg['groupName']
        content1 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n{}".format(
            name, group_id, group_name, text)
        content2 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n".format(
            name, group_id, group_name)
        logger.DebugLog(">> 消息类型:" + type)
        if type == 'Plain':
            message = [{"type": type, "text": content1}]
        elif type == 'Image':
            message = [
                {"type": 'Plain', "text": content2},
                {"type": type, "url": text}]
        elif type == 'Face':
            message = [{"type": 'Plain', "text": content2},
                       {"type": type, "faceId": text}]
        else:
            logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
            return 0
        data = {
                "sessionKey": session,
                "group": group,
                "messageChain": message
                }
        logger.DebugLog(">> 消息内容:" + str(data))
        url = self.addr + 'sendGroupMessage'
        try:
            res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 转发失败")
            return 0
        logger.DebugLog(">> 请求返回:" + str(res))
        if res['code'] == 0:
            return res['messageId']
        return 0

    def sendMsgToAllGroups(self, session, receive_groups, send_groups, msg_data):
        # 对每条消息进行检查
        for msg in msg_data:
            group_id = msg['groupId']
            # 接收的消息群正确(目前只支持 消息类型)
            if group_id in receive_groups:
                # 依次将消息转发到目标群
                for g in send_groups:
                    logger.DebugLog(">> 当前群:"+g)
                    if g == group_id:
                        logger.DebugLog(">> 跳过此群")
                        continue
                    res = self.sendMsgToGroup(session, g, msg)
                    if res != 0:
                        logger.TraceLog(">> 转发成功!{}".format(g))

    def sendFriendMessage(self, session, qq, msg):
        data = {
          "sessionKey": session,
          "target": qq,
          "messageChain": [
            { "type": "Plain", "text": msg },
          ]
        }
        url = self.addr + 'sendFriendMessage'
        try:
            res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 发送失败")
            return 0
        if res['code'] == 0:
            return res['messageId']
        return 0



logger = Logger()
bot = QQBot()
app = Flask(__name__)

def qqTransfer():
    with open('conf.json', 'r+', encoding="utf-8") as f:
        content = f.read()
    conf = json.loads(content)

    auth_key = conf['auth_key']
    bind_qq = conf['bind_qq']
    sleep_time = conf['sleep_time']
    debug_level = conf['debug_level']

    receive_groups = conf['receive_groups']
    send_groups = conf['send_groups']

    logger.setDebugLevel(debug_level)

    session = bot.verifySession(auth_key)
    logger.DebugLog(">> session: "+session)
    bot.bindSession(session, bind_qq)
    while True:
        cnt = bot.getMessageCount(session)
        if cnt:
            logger.DebugLog('>> 有消息了 => {}'.format(cnt))
            logger.DebugLog('获取消息内容')
            data = bot.getMsgFromGroup(session)
            if len(data) == 0:
                logger.DebugLog('消息为空')
                continue
            logger.DebugLog(data)
            logger.DebugLog('解析消息内容')
            data = bot.parseGroupMsg(data)
            logger.DebugLog(data)
            logger.DebugLog('转发消息内容')
            bot.sendMsgToAllGroups(session, receive_groups, send_groups, data)
        # else:
        #     logger.DebugLog('空闲')
        sleep(sleep_time)
    bot.releaseSession(session, bind_qq)


@app.route('/QQ/send', methods=['GET'])
def qqListenMsg():
    # 类似于Qmsg的功能
    # flask做得接收HTTP请求转为QQ消息
    qq = request.args.get('target', None)
    msg = request.args.get('msg', None)
    bot.sendFriendMessage(bot.session, qq, msg)
    return 'Hello World!'

if __name__ == '__main__':
    t = threading.Thread(target=qqTransfer)
    t.setDaemon(True)
    t.start()

    app.run(port='8080', host='0.0.0.0')

这个是插件的setting.yaml

## 配置文件中的值,全为默认值

## 启用的 adapter, 内置有 http, ws, reverse-ws, webhook
adapters:
  - http
  - ws

## 是否开启认证流程, 若为 true 则建立连接时需要验证 verifyKey
## 建议公网连接时开启
enableVerify: true
verifyKey: 12345

## 开启一些调式信息
debug: false

## 是否开启单 session 模式, 若为 true,则自动创建 session 绑定 console 中登录的 bot
## 开启后,接口中任何 sessionKey 不需要传递参数
## 若 console 中有多个 bot 登录,则行为未定义
## 确保 console 中只有一个 bot 登陆时启用
singleMode: false

## 历史消息的缓存大小
## 同时,也是 http adapter 的消息队列容量
cacheSize: 4096

## adapter 的单独配置,键名与 adapters 项配置相同
adapterSettings:
  ## 详情看 http adapter 使用说明 配置
  http:
    host: localhost
    port: 8888
    cors: ["*"]
  
  ## 详情看 websocket adapter 使用说明 配置
  ws:
    host: localhost
    port: 8888
    reservedSyncId: -1

gitee:
https://gitee.com/songxf1024/qqmsg-transfer

API文档可参考:
https://docs.mirai.mamoe.net/mirai-api-http/adapter/HttpAdapter.html

0

评论 (5)

取消
  1. 头像
    龍丞
    Windows 10 or 11 · Chrome . 中国广东省清远市电信

    怎么改成私聊消息转发???本人小白

    回复
    1. 头像
      songxf 作者
      Windows 10 or 11 · Chrome . 美国伊利诺伊芝加哥科进
      @ 龍丞

      sendFriendMessage,这个是发送到个人。监听消息,判断是group还是个人,然后发送

      回复
  2. 头像
    仙九玖玖
    Windows 10 or 11 · Chrome . 中国江西省南昌市移动

    画图

    回复
  3. 头像
    1
    Windows 10 or 11 · Chrome . 中国福建省泉州市电信

    画图

    回复
  4. 头像
    秃头
    Windows 10 or 11 · Chrome . 中国广东省广州市教育网

    太强了

    回复