万字长文保姆级教你制作自己的多功能QQ机器人

songxf
2022-09-02 / 1 评论 / 410 阅读 / 正在检测是否收录...
温馨提示:
本文最后更新于2022年11月12日,已超过22天没有更新,若内容或图片失效,请留言反馈。

[TOC]

转载请注明出处:小锋学长生活大爆炸(https://xfxuezhang.blog.csdn.net/)

前言

QQ、微信是我们平常使用最多的通讯工具,网上也有很多通过软件去控制QQ/微信的开源工具,通过这些工具,我们可以实现许多有意思的效果,而不仅仅局限于消息聊天。
自从微信网页版被官方禁用后,微信的软件工具几乎已经失效了,现有的一些是通过hook微信本身来实现,这种很容易被官方检测并封号。另一些是通过注册企业号来控制,但不直观且功能受限。
这里我们借助相对更开放的QQ来制作我们的机器人,对比几款工具后,最终选择了mirai


先放一张整体结构图:

功能清单

网上现有开源的机器人大多只是实现了类似“自动推送天气、接入图灵机器人自动聊天”等等,大多属于自娱自乐,没有发挥最大用途。
因此,我们的QQ机器人(暂且取名为“小锋仔”)是根据日常所需而制,包含常用功能且设计得易于扩展。
目前包含的功能有:

  • 类似QMsg酱的消息通知
  • QQ群消息转发
  • 翻译查询
  • 照片上传
  • 实况天气
  • 实时热搜
  • 控制树莓派舵机
  • 控制树莓派屏显
  • ... ...

将来可能包含的功能有:

  • 接入控制ESP32(实现智能家居控制)

接下来详细介绍如何自己搭建一个这样的QQ机器人。篇幅较长保姆级详细,建议收藏后慢慢看。

免费领取轻量应用服务器

首先为了能运行mirai,且随时随地能连接,我们需要有一个具备公网IP的服务器。这里使用腾讯云的免费服务器
如果你已有服务器了,可以跳过本节,当然也可以看一看下面介绍的性价比超高的服务器。

对比云服务器 CVM轻量应用服务器 更聚焦于中小企业、开发者、云计算入门者、学生等用户群体。详细的对比可以看:轻量应用服务器 与云服务器 CVM 对比-产品简介-文档中心-腾讯云
因此,对于个人学习与使用而言,轻量服务器更便宜、更实用,且性能不输。如果是想要进阶的童靴,可以上手CVM、ECS服务器。
对于还不想买的童靴,可以免费领取腾讯云提供的1个月服务器试用套餐。直接上领取步骤:

  1. 进入官网领取:云产品免费试用;需要选购的进:轻量应用服务器专场;不清楚怎么领取的可以看教程:腾讯云产品免费试用教程
  2. 领取完成后,由于后面需要用到端口,因此这里我们提前开放2个端口:8888和9966

image.png
image.png
这里腾讯云可能有个小特点。如果发现在控制台防火墙放行后,还是无法访问。需要再在服务器里放行一下端口。这里先写着,大家可以在后面一节中连接上了服务器,再回过来这里输入指令。

sudo apt install firewalld -y
sudo firewall-cmd --list-all
sudo firewall-cmd --permanent --zone=public --add-port=8888/tcp && sudo firewall-cmd --reload
sudo firewall-cmd --permanent --zone=public --add-port=9966/tcp && sudo firewall-cmd --reload
sudo systemctl start firewalld.service

SSH连接服务器

服务器初始化完成后,就可以通过SSH去连接了。这里我们可以直接使用powershell来连接,其他SSH软件我强推mobaxterm!!安装包也已经准备好了:MobaXterm.exe

  1. 搜索打开powershell:

image.png

  1. 输入以下命令连接SSH:
ssh 用户名@<公网ip>
  1. 或者使用MobaXterm软件:

image.png

  1. 先更新一下软件库:
sudo apt upgrade -y
sudo apt autoremove -y
  1. 一般不建议使用管理员账户,因此我们要自己新建一个账户:
sudo adduser sxf

image.png
然后将账户加入sudoers组:

sudo apt install vim
sudo vim /etc/sudoers

image.png
然后退出软件,重新用新建的账号登录即可。
至此,服务器环境就搭建完成了。

常见Ubuntu软件安装与问题修复

这篇博客里记录了很多我在使用过程中,常用软件的安装,非常详细且经过亲测,时不时也会更新内容,大家可以收藏以备下次使用。
Ubuntu20.04 + VirtualBox相关_小锋学长生活大爆炸的博客-CSDN博客
image.png

搭建mirai环境

接下来就要在服务器上搭建QQ机器人(mirai)基础环境。搭建完成后,我们就可以远程跟机器人进行交互。
官方mirai的github仓库:GitHub - mamoe/mirai: 高效率 QQ 机器人支持库
由于github是国外的,而官方已经不再支持gitee的维护,因此如果大家无法访问上面的连接,可以用我帮大家下载下来的安装包:
其他的一些文档:https://docs.mirai.mamoe.net/
官方论坛:https://mirai.mamoe.net/
下面开始正式安装:

  1. 先SSH连接上服务器,建议不要用root用户登录。
  2. 下载安装包mcl-installer-a02f711-linux-amd64:
mkdir qqbot
cd qqbot
wget http://xfxuezhang.cn/web/share/QQBot/mcl-installer-a02f711-linux-amd64
sudo chmod +x mcl-installer-a02f711-linux-amd64

此时需要输入密码(在上面选购并装完服务器后会显示,当时要求记下的)。

./mcl-installer-a02f711-linux-amd64

此时进入安装流程,弹出的几个选项都直接回车选默认即可。
image.png

  1. 安装完成后,还需要安装mirai-api-http。在当前页面下,继续输入:
./mcl --update-package net.mamoe:mirai-api-http --channel stable-v2 --type plugin
  1. 编辑_config/net.mamoe.mirai-api-http/setting.yml_配置文件 (没有则自行创建)
## 配置文件中的值,全为默认值

## 启用的 adapter, 内置有 http, ws, reverse-ws, webhook
adapters:
  - http
  - ws

## 是否开启认证流程, 若为 true 则建立连接时需要验证 verifyKey
## 建议公网连接时开启
enableVerify: true
verifyKey: 1234567890

## 开启一些调式信息
debug: false

## 是否开启单 session 模式, 若为 true,则自动创建 session 绑定 console 中登录的 bot
## 开启后,接口中任何 sessionKey 不需要传递参数
## 若 console 中有多个 bot 登录,则行为未定义
## 确保 console 中只有一个 bot 登陆时启用
singleMode: false

## 历史消息的缓存大小
## 同时,也是 http adapter 的消息队列容量
cacheSize: 4096

## adapter 的单独配置,键名与 adapters 项配置相同
adapterSettings:
  ## 详情看 http adapter 使用说明 配置
  http:
    # 0.0.0.0是允许远程访问,localhost只能同机器访问
    host: 0.0.0.0
    port: 8888
    cors: ["*"]
    unreadQueueMaxSize: 100
  
  ## 详情看 websocket adapter 使用说明 配置
  ws:
    host: localhost
    port: 8080
    reservedSyncId: -1
  1. 启动mirai即可:
./mcl

首次启动会自动下载jar包。等待启动完成后,输入"?",可以查看所有支持的mcl命令。
image.png

  1. 使用以下命令即可登录QQ号:
/login <qq> [password] 

如果想要启动mcl后自动登录QQ号,可以用:

/autoLogin add <account> <password> 

也可以设置不同的设备登录。

/autoLogin setConfig <account> protocol ANDROID_PAD

它对应的配置文件其实就在:config/Console/AutoLogin.yml
image.png

  1. 现在QQ风控很严了,第一次登录很有可能遇到“需要滑动验证码”的。建议申请小号使用,以免发生不测。并且首次使用时在QQ“账号安全设置”中关闭“安全登录检查”、“陌生设备登录保护”。如果遇到验证码,可以尝试:

    1. 将Captcha link通过另一个QQ,发给待登录的mirai-QQ,手机登录mirai-QQ并点击链接,手动完成滑块验证,然后回到mobaxterm输入回车;

image.png

  1. 如果不行,就参考这个链接的方法:https://github.com/project-mirai/mirai-login-solver-selenium
  2. 还不行,再参考这个链接的方法:https://docs.mirai.mamoe.net/mirai-login-solver-selenium/
  3. 还有一个小技巧可以尝试。在手机端先通过手机号登录QQ,如果没问题,再通过手机号在mirai上登录。手机建议先登录上mirai-QQ,有时可能会弹窗提示“是否允许陌生设备登录”等等,要手动点确认的。
  4. 另外,最新申请的QQ号,一般可以成功登录mirai。
  5. 如果以上都不行,目前的终极方案是使用miraiAndroidMiraiAndroid
  • 在手机上的MiraiAndroid登录QQ后导出device.json
  • 将cache目录下的3个文件account.secrets、servers.json、session.bin也复制出来

image.png

  • 接下来点击左上角, 再点击“工具”。选择你机器人的账号, 选择 导出 DEVICE.JSON 将其导出。

image.png

  • 再次回到服务器端,进入 “bots/<你的QQ号>” 下面, 将导出的 device.json 复制放入。对应的cache文件夹也复制放入。

image.png

  • 再次执行 ./mcl 启动 mirai-console 看看效果。
  • 若仍有问题,欢迎加入文末Q群交流。
  1. 至此,mcl就已经能正常接收QQ消息了。而我们的实现代码对mcl的控制,就是通过mirai-api-http插件来实现的。根据上面第4步配置的_setting.yml_文件,再参考官方API文档HttpAdapter文档,即可实现互联互通。(讲起来比较麻烦,no bibi,后面直接show me the code)。

image.png

Python控制mirai篇

当服务器成功运行了mirai后,我们就可以在本地进行Python脚本的编写了。由于最新的mirai-api-http变更过接口规范,因此网上某些一两年前的代码已经失效了。本教程对应的mirai-api-http使用的是最新的2.x版本。
接下来的操作,都默认已经完成“启动mcl并login了QQ号”
在上面setting.yml中,有两个配置项值得注意,他是我们脚本可以控制的密钥:

verifyKey: 1234567890
http: port: 8888

debug输出封装

简单封装下。直接用print也是可以的。

class Logger:
    def __init__(self, level='debug'):
        self.level = level

    def DebugLog(self, *args):
        if self.level == 'debug':
            print(*args)

    def TraceLog(self, *args):
        if self.level == 'trace':
            print(*args)

    def setDebugLevel(self, level):
        self.level = level.lower()

交互授权

在交互前,脚本需要先向mirai获取一个verifyKey,之后在每个请求时候,都需要带上这个key,也叫session。其中,参数auth_key对应了上面setting.yml里的verifyKey。

auth_key = '1234567890'

def verifySession(self, auth_key):
    """每个Session只能绑定一个Bot,但一个Bot可有多个Session。
        session Key在未进行校验的情况下,一定时间后将会被自动释放"""
    data = {"verifyKey": auth_key}
    url = self.addr+'verify'
    res = requests.post(url, data=json.dumps(data)).json()
    logger.DebugLog(res)
    if res['code'] == 0:
        return res['session']
    return None

绑定bot

使用此方法校验并激活你的Session,同时将Session与一个已登录的Bot绑定。

qq = '121215'             # mirai登录的那个QQ
session = 'grge8484'     # 上面verifySession函数的返回值

def bindSession(self, session, qq):
    """校验并激活Session,同时将Session与一个已登录的Bot绑定"""
    data = {"sessionKey": session, "qq": qq}
    url = self.addr + 'bind'
    res = requests.post(url, data=json.dumps(data)).json()
    logger.DebugLog(res)
    if res['code'] == 0:
        self.session = session
        return True
    return False

释放bot

使用此方式释放session及其相关资源(Bot不会被释放)

def releaseSession(self, session, qq):
    """不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
        否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
    data = {"sessionKey": session, "qq": qq}
    url = self.addr + 'release'
    res = requests.post(url, data=json.dumps(data)).json()
    logger.DebugLog(res)
    if res['code'] == 0:
        return True
    return False

未读消息的数量

获取当前有多少条未读消息。

def getMessageCount(self, session):
    url = self.addr + 'countMessage?sessionKey='+session
    res = requests.get(url).json()
    if res['code'] == 0:
        return res['data']
    return 0

获取最新的消息

获取消息后会从队列中移除。

def fetchLatestMessage(self, session):
    url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
    res = requests.get(url).json()
    if res['code'] == 0:
        return res['data']
    return None

解析消息内容

简单实现了部分消息类型的解析,会有消息丢失,请根据使用需求自行调整。

data = 'xxx'  # 可以是上面getMsgFromGroup函数的返回值

def parseGroupMsg(self, data):
    res = []
    if data is None:
        return res
    for item in data:
        if item['type'] == 'GroupMessage':
            type = item['messageChain'][-1]['type']
            if type == 'Image':
                text = item['messageChain'][-1]['url']
            elif type == 'Plain':
                text = item['messageChain'][-1]['text']
            elif type == 'Face':
                text = item['messageChain'][-1]['faceId']
            else:
                logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
                continue
                name = item['sender']['memberName']
                group_id = str(item['sender']['group']['id'])
                group_name = item['sender']['group']['name']
                res.append({'text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name})
                return res

向好友发送消息

向指定好友发送消息。

def sendFriendMessage(self, session, qq, msg):
    msg_list = msg.split(r'\n')
    msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]

    data = {
        "sessionKey": session,
        "target": qq,
        "messageChain": msg_chain
    }
    url = self.addr + 'sendFriendMessage'
    try:
        res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 发送失败")
            return 0
        if res['code'] == 0:
            return res['messageId']
        return 0

向群发送消息

也只是简单实现。

def sendMsgToGroup(self, session, group, msg):
    text = msg['text']
    type = msg['type']
    name = msg['name']
    group_id = msg['groupId']
    group_name = msg['groupName']
    content1 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n{}".format(
        name, group_id, group_name, text)
    content2 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n".format(
        name, group_id, group_name)
    logger.DebugLog(">> 消息类型:" + type)
    if type == 'Plain':
        message = [{"type": type, "text": content1}]
    elif type == 'Image':
        message = [
            {"type": 'Plain', "text": content2},
            {"type": type, "url": text}]
    elif type == 'Face':
        message = [{"type": 'Plain', "text": content2},
                   {"type": type, "faceId": text}]
    else:
        logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
        return 0
    data = {
        "sessionKey": session,
        "group": group,
        "messageChain": message
    }
    logger.DebugLog(">> 消息内容:" + str(data))
    url = self.addr + 'sendGroupMessage'
    try:
        res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 转发失败")
            return 0
        logger.DebugLog(">> 请求返回:" + str(res))
        if res['code'] == 0:
            return res['messageId']
        return 0

向群发送富文本消息

跟上面的差不多,消息类型变了一下,从而支持类似HTML形式的消息发送。

def sendPlainTextToGroup(self, session, group, msg):
    msg_list = msg.split(r'\n')
    msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
    data = {
        "sessionKey": session,
        "group": group,
        "messageChain": msg_chain
    }
    url = self.addr + 'sendGroupMessage'
    try:
        res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 转发失败")
            return 0
        logger.DebugLog(">> 请求返回:" + str(res))
        if res['code'] == 0:
            return res['messageId']
        return 0

以上就是几个简单、常用的函数。基于这些函数,就已经可以实现蛮多有趣的功能了。

Q群消息转发

这部分可以直接参考之前的博客:Q群消息转发例程。其实也就是把上面的函数整合一下,放一个完整版:

import requests
from time import sleep

class Logger:
    def __init__(self, level='debug'):
        self.level = level

    def DebugLog(self, *args):
        if self.level == 'debug':
            print(*args)

    def TraceLog(self, *args):
        if self.level == 'trace':
            print(*args)

    def setDebugLevel(self, level):
        self.level = level.lower()

logger = Logger()
class QQBot:
    def __init__(self):
        self.addr = 'http://43.143.12.250:8888/'
        self.session = None

    def verifySession(self, auth_key):
        """每个Session只能绑定一个Bot,但一个Bot可有多个Session。
        session Key在未进行校验的情况下,一定时间后将会被自动释放"""
        data = {"verifyKey": auth_key}
        url = self.addr+'verify'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            return res['session']
        return None

    def bindSession(self, session, qq):
        """校验并激活Session,同时将Session与一个已登录的Bot绑定"""
        data = {"sessionKey": session, "qq": qq}
        url = self.addr + 'bind'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            self.session = session
            return True
        return False

    def releaseSession(self, session, qq):
        """不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
        否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
        data = {"sessionKey": session, "qq": qq}
        url = self.addr + 'release'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            return True
        return False

    def fetchLatestMessage(self, session):
        url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
        res = requests.get(url).json()
        if res['code'] == 0:
            return res['data']
        return None

    def parseGroupMsg(self, data):
        res = []
        if data is None:
            return res
        for item in data:
            if item['type'] == 'GroupMessage':
                type = item['messageChain'][-1]['type']
                if type == 'Image':
                    text = item['messageChain'][-1]['url']
                elif type == 'Plain':
                    text = item['messageChain'][-1]['text']
                elif type == 'Face':
                    text = item['messageChain'][-1]['faceId']
                else:
                    logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
                    continue
                name = item['sender']['memberName']
                group_id = str(item['sender']['group']['id'])
                group_name = item['sender']['group']['name']
                res.append({'text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name})
        return res

    def getMessageCount(self, session):
        url = self.addr + 'countMessage?sessionKey='+session
        res = requests.get(url).json()
        if res['code'] == 0:
            return res['data']
        return 0

    def sendPlainTextToGroup(self, session, group, msg):
        msg_list = msg.split(r'\n')
        msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
        data = {
          "sessionKey": session,
          "group": group,
          "messageChain": msg_chain
        }
        url = self.addr + 'sendGroupMessage'
        try:
            res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 转发失败")
            return 0
        logger.DebugLog(">> 请求返回:" + str(res))
        if res['code'] == 0:
            return res['messageId']
        return 0

    def sendMsgToGroup(self, session, group, msg):
        text = msg['text']
        type = msg['type']
        name = msg['name']
        group_id = msg['groupId']
        group_name = msg['groupName']
        content1 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n{}".format(
            name, group_id, group_name, text)
        content2 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n".format(
            name, group_id, group_name)
        logger.DebugLog(">> 消息类型:" + type)
        if type == 'Plain':
            message = [{"type": type, "text": content1}]
        elif type == 'Image':
            message = [
                {"type": 'Plain', "text": content2},
                {"type": type, "url": text}]
        elif type == 'Face':
            message = [{"type": 'Plain', "text": content2},
                       {"type": type, "faceId": text}]
        else:
            logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
            return 0
        data = {
                "sessionKey": session,
                "group": group,
                "messageChain": message
                }
        logger.DebugLog(">> 消息内容:" + str(data))
        url = self.addr + 'sendGroupMessage'
        try:
            res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 转发失败")
            return 0
        logger.DebugLog(">> 请求返回:" + str(res))
        if res['code'] == 0:
            return res['messageId']
        return 0

    def sendMsgToAllGroups(self, session, receive_groups, send_groups, msg_data):
        # 对每条消息进行检查
        for msg in msg_data:
            group_id = msg['groupId']
            # 接收的消息群正确(目前只支持 消息类型)
            if group_id in receive_groups:
                # 依次将消息转发到目标群
                for g in send_groups:
                    logger.DebugLog(">> 当前群:"+g)
                    if g == group_id:
                        logger.DebugLog(">> 跳过此群")
                        continue
                    res = self.sendMsgToGroup(session, g, msg)
                    if res != 0:
                        logger.TraceLog(">> 转发成功!{}".format(g))

    def sendFriendMessage(self, session, qq, msg):
        msg_list = msg.split(r'\n')
        msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]

        data = {
          "sessionKey": session,
          "target": qq,
          "messageChain": msg_chain
        }
        url = self.addr + 'sendFriendMessage'
        try:
            res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 发送失败")
            return 0
        if res['code'] == 0:
            return res['messageId']
        return 0

def qqTransfer():
    with open('conf.json', 'r+', encoding="utf-8") as f:
        content = f.read()
    conf = json.loads(content)

    auth_key = conf['auth_key']
    bind_qq = conf['bind_qq']
    sleep_time = conf['sleep_time']
    debug_level = conf['debug_level']

    receive_groups = conf['receive_groups']
    send_groups = conf['send_groups']

    logger.setDebugLevel(debug_level)

    session = bot.verifySession(auth_key)
    logger.DebugLog(">> session: "+session)
    bot.bindSession(session, bind_qq)
    while True:
        cnt = bot.getMessageCount(session)
        if cnt:
            logger.DebugLog('>> 有消息了 => {}'.format(cnt))
            logger.DebugLog('获取消息内容')
            data = bot.fetchLatestMessage(session)
            if len(data) == 0:
                logger.DebugLog('消息为空')
                continue
            logger.DebugLog(data)
            logger.DebugLog('解析消息内容')
            data = bot.parseGroupMsg(data)
            logger.DebugLog(data)
            logger.DebugLog('转发消息内容')
            bot.sendMsgToAllGroups(session, receive_groups, send_groups, data)
        # else:
        #     logger.DebugLog('空闲')
        sleep(sleep_time)
    bot.releaseSession(session, bind_qq)

其中,conf.json内容为:

{
  "auth_key": "1234567890",
  "bind_qq":  "123456",                                                     # mirai登录的QQ (复制时记得删我)
  "sleep_time": 1,
  "receive_groups": ["913182235", "977307922"],  # 要接受消息的群 (复制时记得删我)
  "send_groups": ["913182235", "977307922"],         # 要发送消息的群 (复制时记得删我)
  "debug_level": "debug"
}

下面,我们就先从类似QMsg酱的消息通知开始。

类似QMsg酱的消息通知

设计目标:通过调用指定的URL,小锋仔机器人就会给指定的好友发送指定的消息。
关于QMsg酱的使用教程可以看:免费的QQ微信消息推送机器人
前面我们特地开放了9966端口,因此可以使用Flask来监听这个端口。
本着越简单越好的原则,我们把“发给好友还是群”、“目标好友或群的号”、“发送的内容”三部分都拼接到URL上,因此有:

http://43.143.12.250:9966/QQ/send/friend?target=123&msg=hello
http://43.143.12.250:9966/QQ/send/group?target=123&msg=hello

因此,代码可以写成:

from flask import Flask, request

app = Flask(__name__)

@app.route('/QQ/send/friend', methods=['GET'])
def qqListenMsgToFriend():
    # 类似于Qmsg的功能
    # flask做得接收HTTP请求转为QQ消息
    qq = request.args.get('target', None)
    msg = request.args.get('msg', None)
    bot.sendFriendMessage(bot.session, qq, msg)
    return 'Hello World! Friend!'

@app.route('/QQ/send/group', methods=['GET'])
def qqListenMsgToGroup():
    # 类似于Qmsg的功能
    # flask做得接收HTTP请求转为QQ消息
    qq = request.args.get('target', None)
    msg = request.args.get('msg', None)
    bot.sendPlainTextToGroup(bot.session, qq, msg)
    return 'Hello World! Group!'

if __name__ == '__main__':
    app.run(port='9966', host='0.0.0.0')

由于Flask和小锋仔QQBot都要阻塞运行,因此稍微变动一下,让小锋仔以子线程的形式运行即可。

if __name__ == '__main__':
    t = threading.Thread(target=qqTransfer)
    t.setDaemon(True)
    t.start()

    app.run(port='9966', host='0.0.0.0')

测试一下:

http://localhost:9966/QQ/send/friend?target=1061700625&msg=hello

image.png
如果我们把这个脚本放到服务器上去运行,那么链接就变成了:

http://43.143.12.250:9966/QQ/send/friend?target=1061700625&msg=hello

当然,能发消息的前提是“先加好友”或“加群”啦。

多功能切换的实现设计

上面我们进行了简单地尝鲜。
1、从这部分开始,我们涉及的功能比较杂,为了能更好的区分功能,需要设计一个简单的交互协议。

  • 我们发送的内容可以分为:功能选择消息详情
  • 为了区分他俩,可以在选择功能时添加指定前缀,如“CMD+翻译”;
  • 小锋仔接收到后,进入翻译模式准备;
  • 发送指令详情时,就不加前缀。而小锋仔则将收到的消息进行翻译,再把结果返回。

根据以上内容,小锋仔需要记录的状态信息至少有:

class StatusStore:
    def __init__(self, from_qq:int=None, is_cmd:bool=False, func_name:str=None, need_second:bool=False, msg:str=None) -> None:
        self.from_qq = from_qq          # 发送者的QQ号
        self.is_cmd = is_cmd            # 是否是指令(选择功能)
        self.func_name = func_name      # 选择的功能的名称
        self.need_second = need_second  # 是否需要经过两步:先发cmd指令,再发详细内容
        self.msg = msg                  # 本次发送的消息内容
    
    def detail(self):
        return self.__dict__

2、并且我们设置,只有从指定QQ发过来消息,才能响应。因此在接收到消息时,需要判断对方的信息。对于好友类型的消息,mirai返回格式如消息类型说明

{
  "type": "FriendMessage",
  "sender": {
    "id": 123,
    "nickname": "",
    "remark": ""
  },
  "messageChain": [] // 数组,内容为下文消息类型
}

因此,我们可以从"type"和 "sender:id"入手判断。
3、我们暂时考虑只有一个主QQ能发送指令的情况。
4、定义一个类来专门管理不同功能的函数,例如:

class MultiFunction:
    """多功能函数集合"""
    def __init__(self) -> None:
        pass

    @staticmethod
    def translate(original:str, convert:str='zh2en') -> str:
        return '假装是翻译结果' 
    
    @staticmethod
    def uploadImage(image_path:str) -> str:
        return '假装是上传结果' 

    @staticmethod
    def weather(city:str) -> str:
        return '假装是天气结果' 
    
    @staticmethod
    def hotNews(status_store:StatusStore) -> str:
        return '假装是热搜结果' 


# 多功能函数的映射
# function: 功能对应函数名
# need_second: 是否需要经过两步:先发cmd指令,再发详细内容
# desc: 需要经过两步时,第一次返回的提示语
function_map = {
    '翻译': {'function': MultiFunction.translate, 'need_second': True, 'desc': '请输入您要翻译的内容~'}, 
    '天气': {'function': MultiFunction.weather, 'need_second': True, 'desc': '请问是哪座城市的天气呢?'}, 
    '热搜': {'function': MultiFunction.hotNews, 'need_second': False}
}

def choiceFunction(store_obj:StatusStore):
    res = ''
    if function_map.get(store_obj.func_name):
        res = function_map.get(store_obj.func_name)['function'](store_obj.msg)
    return res 

5、大致实现流程的想法是:

对应代码实现:

def analyzeFriendMsg(self, data):
    if data is None or data['type'] != 'FriendMessage':
        return None, None, None
    sender_id = data['sender']['id']
    msg_type = data['messageChain'][-1]['type']
    if msg_type == 'Plain':
        msg_text = data['messageChain'][-1]['text']
    elif msg_type == 'Image':
        msg_text = data['messageChain'][-1]['url']
    else:
        msg_text = ''
        return sender_id, msg_type, msg_text

最终的框架就是:

def xiaofengzai():
    auth_key = '1234567890'     # settings.yml中的verifyKey
    bind_qq = '3126229950'      # mirai登录的QQ
    target_qq = '1061700625'    # 我们自己用的主QQ
    target_qq = int(target_qq)  # 接收到的消息里,QQ是int类型的
    sleep_time = 1              # 轮询间隔
    status_store = {}

    session = bot.verifySession(auth_key)
    logger.DebugLog(">> session: "+session)
    bot.bindSession(session, bind_qq)
    while True:
        cnt = bot.getMessageCount(session)
        if not cnt:
            sleep(sleep_time)
            continue
        logger.DebugLog('>> 有消息了 => {}'.format(cnt))
        logger.DebugLog('获取消息内容')
        data = bot.fetchLatestMessage(session)
        if len(data) == 0:
            logger.DebugLog('消息为空')
            sleep(sleep_time)
            continue
        logger.DebugLog(data)
        logger.DebugLog('解析消息内容')

        sender_id, msg_type, msg_text = bot.analyzeFriendMsg(data[0])
        if not sender_id or sender_id != target_qq:
            sleep(sleep_time)
            continue

        if msg_text.strip().lower().startswith('cmd'):
            _, func_name = msg_text.strip().split('\n')[0].split()
            func_name = func_name.strip()
            store_obj = StatusStore(from_qq=sender_id, is_cmd=True, func_name=func_name)
            # 不需要发两次,直接调用函数返回结果即可
            func_info = function_map.get(func_name)
            if not func_info:
                res = '指令[{}]暂不支持'.format(func_name)
            elif func_info.get('need_second'):
                res = '收到你的指令:{}\n{}'.format(func_name, func_info.get('desc') or '已进入对应状态, 请继续发送详细内容')
                # 添加或更新记录
                status_store[sender_id] = store_obj
            else:
                res = '请求结果为:\n' + str(choiceFunction(store_obj))
                status_store.pop(sender_id, '')
        else:
            res = '请先发送指令哦...'
            store_obj = status_store.get(sender_id)
            if store_obj and store_obj.is_cmd:
                store_obj.msg = msg_text
                res = '请求结果为:\n' + str(choiceFunction(store_obj))
                status_store.pop(sender_id, '')
        
        bot.sendFriendMessage(session, qq=sender_id, msg=res)

看一下效果:
image.png
至此,骨架有了,接下来开始填充功能了。

翻译查询

根据上面的骨架可知,我们只需要实现MultiFunction类下的translate函数即可。如果想快速测试函数效果,可以使用以下代码,而不用先启动mirai:

res = choiceFunction(StatusStore(func_name='翻译', msg='你好'))
print(res)

领取腾讯免费翻译API

要做翻译,最方便的就是调用API了(没错,调包侠!)。
这里使用腾讯的翻译API,可以免费领取:领取腾讯翻译API。点进链接后,往下拖到“云产品体验”专区,选择“人工智能”,下面有“机器翻译”。他的调用量是每月更新,非常的良心了。
image.png
点击“立即体验”,进入控制台界面,虽然上面显示的是“开通付费版”,但不用担心,他是有免费额度的,更何况你账户里又没充余额,哈哈哈。
image.png
支持很多类型的翻译,这次我们先选文本翻译机器翻译 文本翻译-API 文档-文档中心-腾讯云
image.png
我们用SDK的方式,免去了自己封装复杂的加密步骤:

pip install --upgrade tencentcloud-sdk-python

然后去获取密钥API密钥管理,记下APPID、SecretId、SecretKey
image.png

机器人接入翻译功能

小锋仔bot结合翻译功能,直接上代码:

import json
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.tmt.v20180321 import tmt_client, models

def translate(original:str, convert:str='en'):
    secretId = 'xxx'  # 从API控制台获取
    secretKey = 'xxx' # 从API控制台获取
    AppId = 12123     # 从API控制台获取
    try:
        cred = credential.Credential(secretId, secretKey)
        client = tmt_client.TmtClient(cred, "ap-guangzhou")
        req = models.TextTranslateRequest()
        params = {
            "SourceText": original,
            "Source": "auto",
            "Target": convert,
            "ProjectId": AppId
        }
        req.from_json_string(json.dumps(params))
        resp = client.TextTranslate(req)
        # print(resp.to_json_string())
        return resp.TargetText
    except TencentCloudSDKException as err:
        print(err)
        return ''

使用测试效果:

print(choiceFunction(StatusStore(func_name='翻译', msg='你好')))

# 输出:
{"TargetText": "Hello", "Source": "zh", "Target": "en", "RequestId": "a1b17f47-751e-44cd-89a5-6a22e9f2c444"}
Hello

image.png

实时天气

领取免费的和风天气API

天气部分,我们是用免费的和风天气API:实时天气 - API
首先也要进行登录并获取KEY,这个步骤官网讲的很详细,图文并茂的,这边就不多写了,大家可以跳转过去(注意我们选的是Web API):创建应用和KEY - RESOURCE

机器人接入天气功能

同样的,直接上代码:

def weather(city:str) -> str:
    url_api_weather = 'https://devapi.qweather.com/v7/weather/now?'
    url_api_geo = 'https://geoapi.qweather.com/v2/city/lookup?'
    weather_key = 'xxxxx'  # 和风天气控制台的key

    # 实况天气
    def getCityId(city_kw):
        url_v2 = url_api_geo + 'location=' + city_kw + '&key=' + weather_key
        city = requests.get(url_v2).json()['location'][0]
        return city['id']

    city_name = '广州'
    city_id = getCityId(city_name)
    url = url_api_weather + 'location=' + city_id + '&key=' + weather_key
    res = requests.get(url).json()
    text = "<天气信息获取失败>"
    if res['code'] == '200' or res['code'] == 200:
        text = '实时天气:\n 亲爱的 小主, 您所在的地区为 {},\n 现在的天气是 {},\n 气温 {}°, 湿度 {}%,\n 体感气温为 {}°,\n 风向 {}, 风速 {}km/h'.format(
            city_name, res['now']['text'], res['now']['temp'], res['now']['humidity'], res['now']['feelsLike'], res['now']['windDir'], res['now']['windSpeed']) 
    return text 

测试效果:
image.png

实时热搜

领取免费的天行热搜API

这部分用的是天行数据,免费会员每天赠送100次调用额度:今日头条新闻API接口 - 天行数据TianAPI。先注册账号,然后点击“申请接口”即可。
image.png
注意,首次注册需要在控制台完成“实名认证”和“邮箱验证”(马上通过,不需要等待审核)。
image.png
对于密钥Key,是在“控制台-数据管理-我的密钥KEY”中。
image.png

机器人接入热搜功能

同样的,直接上代码:

def hotNews(status_store:StatusStore) -> str:
    tianxing_key = 'e05966abe0b054686c9f6b7d60e59a8d'
    def common(data):
        url = data + '?key={}'.format(tianxing_key)
        res = requests.get(url).json()
        return res['newslist']
    res = common('http://api.tianapi.com/topnews/index')
    tops = []
    index = 1
    for item in res:
        tops.append(str(index) + '. ' + item['title'])
        index += 1
        return '\n'.join(tops[0:10])

测试效果:
image.png

照片上传

有时候我们想保存一些照片,但又不想放手机里,那我们可以做个“通过把照片发给小锋仔机器人,让小锋仔再上传到服务器或者COS上”的功能。

领取腾讯对象存储COS

还是这个链接:云产品体验 - 腾讯云,在“云产品体验-基础-对象存储COS”下面。对象存储不止可以用来存文件,这里我们只用来存图片。
image.png

  1. 领取完成后,进入控制台,创建存储桶:

image.png

  1. 配置存储桶信息,访问权限设置为“公有读私有写”,这样别人就能看到了,便于分享图片:

image.png
image.png
image.png

  1. 创建完成后,就可以通过Python APi去控制上传了。不过需要先安装下SDK:
pip install -U cos-python-sdk-v5
  1. 上传图片部分,我们现在随便拿一张图测试:
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
import os

def uploadImage(image_path:str) -> str:
    bucket_id = 'image-1253093297'  # 存储桶的名称
    secret_id = 'xxx'
    secret_key = 'xxx'
    region = 'ap-guangzhou'  # 存储桶的地区
    token = None              
    scheme = 'https'          
    config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme)
    client = CosS3Client(config)
    # 本地文件形式上传
    # response = client.upload_file(
    #     Bucket=bucket_id,
    #     LocalFilePath=image_path,
    #     Key=image_path.split(os.sep)[-1],
    #     PartSize=1,
    #     MAXThread=10,
    #     EnableMD5=False
    # )

    # 网络文件形式上传
    file_keyname = image_path.split('/')[-2] + '.jpg'
    stream = requests.get(image_path)
    response = client.put_object(
        Bucket=bucket_id,
        Body=stream,
        Key=file_keyname
    )
    print(response['ETag'])
    img_url = 'https://{}.cos.{}.myqcloud.com/{}'.format(bucket_id, region, file_keyname)
    return '上传成功, ETag: {}\nURL: {}'.format(response['ETag'], img_url)
  1. 上传完成后,在控制台就可以看到了。

image.png

  1. 更多cos操作可看官方文档:对象存储 快速入门-SDK 文档-文档中心-腾讯云

机器人接入图片上传功能

通过mirai文档可知,图片消息格式为:

[{'type': 'FriendMessage', 'messageChain': [{'type': 'Source', 'id': 55312, 'time': 1662048857}, {'type': 'Image', 'imageId': '{DCAD8B29-D606-B354-117D-F39479C14FE3}.jpg', 'url': 'http://c2cpicdw.qpic.cn/offpic_new/1061700625//1061700625-141936558-DCAD8B29D606B354117DF39479C14FE3/0?term=2', 'path': None, 'base64': None}], 'sender': {'id': 1061700625, 'nickname': '热心市民', 'remark': '热心市民'}}]

因此只需要拿到里面的URL就行,而我们的analyzeFriendMsg函数就已经提取了URl了,因此啥都不用多改!!(结构好,就是方便呀~)
直接测试:
image.png

自行添加小功能函数总结

通过上面几个小功能,不难发现我们的程序在功能上很方便扩展,总结一下,就2步

  1. MultiFunction类中添加功能函数的实现,入参尽量为字符串型,返回也为字符串型
  2. function_map中添加函数信息;

下面提供几个好玩的接口,给大家留个作业,自己集成到机器人中去:

# 疫情信息
def getYiQing():
    url = 'https://c.m.163.com/ug/api/wuhan/app/data/list-total?t={}'.format(329091037164)
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/96.0.4664.110 Safari/537.36 '
    }
    res = requests.get(url, headers=headers).json()
    total = res['data']['chinaTotal']['total']
    today = res['data']['chinaTotal']['today']
    a = 99
    symbol_today = '+' if today['confirm'] >= 0 else ''
    symbol_total = '+' if today['storeConfirm'] >= 0 else ''
    symbol_input = '+' if today['input'] >= 0 else ''
    confirmTotal = '累计确诊:{},较昨日:{}{}'.format(total['confirm'], symbol_today, today['confirm'])
    confirmToday = '现有确诊:{},较昨日:{}{}'.format(total['confirm'] - total['dead'] - total['heal'], symbol_total, today['storeConfirm'])
    inputs = '境外输入:{},较昨日:{}{}'.format(total['input'], symbol_input, today['input'])
    return inputs + '\n' + confirmToday + '\n' + confirmTotal

# 历史上的今天
def getHistoryToday():
    url = 'https://api.oick.cn/lishi/api.php'
    res = requests.get(url).json()
    historyToday = []
    for item in res['result']:
        historyToday.append(item['date'] + ', ' + item['title'])
    return '\n'.join(random.choices(historyToday, k=3))

# 一言
def dailysentence():
    url = 'https://res.abeim.cn/api-text_yiyan'
    res = requests.get(url).json()
    return res['content']

# 天行api
def common(data):
    tianxing_key = ''  # 天行key
    url = data + '?key={}'.format(tianxing_key)
    res = requests.get(url).json()
    return res['newslist']

# 天行api - 小窍门
def dailyTips():
    res = common('http://api.tianapi.com/qiaomen/index')
    tipsArray = res[0]
    return tipsArray['content']

# 天行api - 健康小知识
def healthTips():
    res = common('http://api.tianapi.com/healthtip/index')
    tipsArray = res[0]
    return tipsArray['content']

如果以后功能越来越多,我们很容易记不住关键词是啥,因此,稍稍变动一下,让我们可以知道功能清单。在xiaofengzai函数这个位置添加一段代码:

if msg_text.strip() == '功能清单':
    res = '目前支持的关键词有:\n' + '\n'.join(function_map.keys())

image.png
image.png

控制树莓派舵机与屏显

这部分摘自我前面的博客:

与树莓派的主要交互,这里主要有两种方式:

  • 树莓派上也运行mirai。通过设置不同的protocol,是可以实现同时在线的。
  • 通过MQTT通信。这个比较好用,是个物联网协议,广泛适用于IoT场景,推荐。

我的另一个大型项目“基于树莓派的智能魔镜”,它里面树莓派与手机的通信,就是通过MQTT实现的。很贴心的,B站还有配套的视频教程,欢迎来踩,哈哈哈~小锋学长生活大爆炸的个人空间


腾讯云服务器搭建MQTT环境

树莓派由于不在身边,因此这部分暂时先略过,大家可以通过上面几篇博客自学一下,他们也都是使用到了mirai的。这里讲一下MQTT的安装,也可以参考安装EMQX MQTT

  1. SSH进入我们的服务器后,输入指令:
sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
sudo apt-get update
sudo apt-get install mosquitto
sudo apt-get install mosquitto-clients
sudo apt clean
  1. 然后去腾讯云控制台开放下1883端口。
  2. 再在防火墙软件上也放行下1883端口
sudo firewall-cmd --permanent --zone=public --add-port=1883/tcp && sudo firewall-cmd --reload
sudo systemctl start firewalld.service
  1. 通过Python调用MQTT的示例可以参考:Python MQTT
  2. 想了解学习MQTT概念的可以参考:MQTT V3.1协议规范
  3. 更多MQTT使用示例可以参考:

    1. Ubuntu18和Raspbian搭建LAMP环境+部署图片上传网页+安装Mosquitto
    2. 纯JavaScript实现的MQTT智能门锁
    3. Qt搭建MQTT环境

待实现功能

接入控制ESP32(实现智能家居控制)

ESP32是一块可以链接WIFI的嵌入式开发板,支持MQTT协议。这样一来,只要通过跟我们的机器人互相订阅Topic,在通过设计一套通信协议,就可以实现远程交互了。进一步地,给ESP32接入外设,就可以很容易的实现一个智能家居,而我们则可以通过QQ机器人来实现对智能家居的控制。

完整代码整理

为了方便,我们把所有需要修改的变量,都统一提取到了最前面。大家在复制过程中,务必记得都填上自己的!!
![Q073C@O[}BS4ON(A]H$C_YX.png](https://cdn.nlark.com/yuque/0/2022/png/21876370/1662051010396-4111cadd-c0f7-444e-88bf-f5ab77abd9c1.png#clientId=u1cf12aa3-c8f3-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=547&id=u1448e26e&margin=%5Bobject%20Object%5D&name=Q073C%40O%5B%7DBS4ON%28A%5DH%24C_YX.png&originHeight=903&originWidth=1013&originalType=binary&ratio=1&rotation=0&showTitle=false&size=46750&status=done&style=none&taskId=ud48045de-328e-4049-870b-8561bc1feb4&title=&width=614)
最后,贴上完整代码。由于水平有限,写的可能不是很好。也欢迎大家DIY魔改成自己的。如果有问题,欢迎加入文末Q群一起交流~~~

import json
import os

import requests
from flask import Flask, request
from time import sleep
import threading
import json
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.tmt.v20180321 import tmt_client, models
import requests
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client


# ---------------------------- 变量定义区 ---------------------------- #
# 自己运行mirai的服务器IP和mirai-api-http的监听端口
mirai_server_url = 'http://43.143.12.250:8888/'
# settings.yml中的verifyKey
auth_key = '1234567890'     
# mirai登录的QQ
bind_qq = 'xxx'      
# 我们自己用的主QQ。接收到的消息里,QQ是int类型的
target_qq = 123123

# 腾讯COS存储桶的名称
tencent_cos_bucket_id = 'xxx'  
# 腾讯COS存储桶的地区
tencent_cos_region = 'ap-guangzhou'  
# 腾讯控制台的SecretId
tencent_secret_id = 'xxx'
# 腾讯控制台的SecretKey
tencent_secret_key = 'xxx'
# 腾讯机器翻译的appid
tencent_translate_AppId = xxx
# 和风天气API的key
weather_key = 'xxx'
# 天行API的key
tianxing_key = 'xxx'
# ------------------------------------------------------------------ #


class Logger:
    def __init__(self, level='debug'):
        self.level = level

    def DebugLog(self, *args):
        if self.level == 'debug':
            print(*args)

    def TraceLog(self, *args):
        if self.level == 'trace':
            print(*args)

    def setDebugLevel(self, level):
        self.level = level.lower()


class QQBot:
    def __init__(self):
        self.addr = mirai_server_url
        self.session = None

    def verifySession(self, auth_key):
        """每个Session只能绑定一个Bot,但一个Bot可有多个Session。
        session Key在未进行校验的情况下,一定时间后将会被自动释放"""
        data = {"verifyKey": auth_key}
        url = self.addr+'verify'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            return res['session']
        return None

    def bindSession(self, session, qq):
        """校验并激活Session,同时将Session与一个已登录的Bot绑定"""
        data = {"sessionKey": session, "qq": qq}
        url = self.addr + 'bind'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            self.session = session
            return True
        return False

    def releaseSession(self, session, qq):
        """不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
        否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
        data = {"sessionKey": session, "qq": qq}
        url = self.addr + 'release'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            return True
        return False

    def fetchLatestMessage(self, session):
        url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
        res = requests.get(url).json()
        if res['code'] == 0:
            return res['data']
        return None


    def parseGroupMsg(self, data):
        res = []
        if data is None:
            return res
        for item in data:
            if item['type'] == 'GroupMessage':
                type = item['messageChain'][-1]['type']
                if type == 'Image':
                    text = item['messageChain'][-1]['url']
                elif type == 'Plain':
                    text = item['messageChain'][-1]['text']
                elif type == 'Face':
                    text = item['messageChain'][-1]['faceId']
                else:
                    logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
                    continue
                name = item['sender']['memberName']
                group_id = str(item['sender']['group']['id'])
                group_name = item['sender']['group']['name']
                res.append({'text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name})
        return res

    def getMessageCount(self, session):
        url = self.addr + 'countMessage?sessionKey='+session
        res = requests.get(url).json()
        if res['code'] == 0:
            return res['data']
        return 0

    def peekMessage(self, session):
        url = self.addr + 'peekMessage?sessionKey='+session
        res = requests.get(url).json()
        if res['code'] == 0:
            return res['data']
        return 0

    def sendPlainTextToGroup(self, session, group, msg):
        msg_list = msg.split(r'\n')
        msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
        data = {
          "sessionKey": session,
          "group": group,
          "messageChain": msg_chain
        }
        url = self.addr + 'sendGroupMessage'
        try:
            res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 转发失败")
            return 0
        logger.DebugLog(">> 请求返回:" + str(res))
        if res['code'] == 0:
            return res['messageId']
        return 0

    def sendMsgToGroup(self, session, group, msg):
        text = msg['text']
        type = msg['type']
        name = msg['name']
        group_id = msg['groupId']
        group_name = msg['groupName']
        content1 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n{}".format(
            name, group_id, group_name, text)
        content2 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n".format(
            name, group_id, group_name)
        logger.DebugLog(">> 消息类型:" + type)
        if type == 'Plain':
            message = [{"type": type, "text": content1}]
        elif type == 'Image':
            message = [
                {"type": 'Plain', "text": content2},
                {"type": type, "url": text}]
        elif type == 'Face':
            message = [{"type": 'Plain', "text": content2},
                       {"type": type, "faceId": text}]
        else:
            logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
            return 0
        data = {
                "sessionKey": session,
                "group": group,
                "messageChain": message
                }
        logger.DebugLog(">> 消息内容:" + str(data))
        url = self.addr + 'sendGroupMessage'
        try:
            res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 转发失败")
            return 0
        logger.DebugLog(">> 请求返回:" + str(res))
        if res['code'] == 0:
            return res['messageId']
        return 0

    def sendMsgToAllGroups(self, session, receive_groups, send_groups, msg_data):
        # 对每条消息进行检查
        for msg in msg_data:
            group_id = msg['groupId']
            # 接收的消息群正确(目前只支持 消息类型)
            if group_id in receive_groups:
                # 依次将消息转发到目标群
                for g in send_groups:
                    logger.DebugLog(">> 当前群:"+g)
                    if g == group_id:
                        logger.DebugLog(">> 跳过此群")
                        continue
                    res = self.sendMsgToGroup(session, g, msg)
                    if res != 0:
                        logger.TraceLog(">> 转发成功!{}".format(g))

    def sendFriendMessage(self, session, qq, msg):
        msg_list = msg.split(r'\n')
        msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]

        data = {
          "sessionKey": session,
          "target": qq,
          "messageChain": msg_chain
        }
        url = self.addr + 'sendFriendMessage'
        try:
            res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 发送失败")
            return 0
        if res['code'] == 0:
            return res['messageId']
        return 0


    def analyzeFriendMsg(self, data):
        if data is None or data['type'] != 'FriendMessage':
            return None, None, None
        sender_id = data['sender']['id']
        msg_type = data['messageChain'][-1]['type']
        if msg_type == 'Plain':
            msg_text = data['messageChain'][-1]['text']
        elif msg_type == 'Image':
            msg_text = data['messageChain'][-1]['url']
        else:
            msg_text = ''
        return sender_id, msg_type, msg_text




logger = Logger()
bot = QQBot()
app = Flask(__name__)

def qqTransfer():
    with open('conf.json', 'r+', encoding="utf-8") as f:
        content = f.read()
    conf = json.loads(content)

    auth_key = conf['auth_key']
    bind_qq = conf['bind_qq']
    sleep_time = conf['sleep_time']
    debug_level = conf['debug_level']

    receive_groups = conf['receive_groups']
    send_groups = conf['send_groups']

    logger.setDebugLevel(debug_level)

    session = bot.verifySession(auth_key)
    logger.DebugLog(">> session: "+session)
    bot.bindSession(session, bind_qq)
    while True:
        cnt = bot.getMessageCount(session)
        if cnt:
            logger.DebugLog('>> 有消息了 => {}'.format(cnt))
            logger.DebugLog('获取消息内容')
            data = bot.fetchLatestMessage(session)
            if len(data) == 0:
                logger.DebugLog('消息为空')
                continue
            logger.DebugLog(data)
            logger.DebugLog('解析消息内容')
            data = bot.parseGroupMsg(data)
            logger.DebugLog(data)
            logger.DebugLog('转发消息内容')
            bot.sendMsgToAllGroups(session, receive_groups, send_groups, data)
        # else:
        #     logger.DebugLog('空闲')
        sleep(sleep_time)
    bot.releaseSession(session, bind_qq)





class StatusStore:
    def __init__(self, from_qq:int=None, is_cmd:bool=False, func_name:str=None, need_second:bool=False, msg:str=None) -> None:
        self.from_qq = from_qq          # 发送者的QQ号
        self.is_cmd = is_cmd            # 是否是指令(选择功能)
        self.func_name = func_name      # 选择的功能的名称
        self.need_second = need_second  # 是否需要经过两步:先发cmd指令,再发详细内容
        self.msg = msg                  # 本次发送的消息内容
    
    def detail(self):
        return self.__dict__

class MultiFunction:
    """多功能函数集合"""
    def __init__(self) -> None:
        pass

    @staticmethod
    def translate(original:str, convert:str='en'):
        try:
            cred = credential.Credential(tencent_secret_id, tencent_secret_key)
            client = tmt_client.TmtClient(cred, "ap-guangzhou")
            req = models.TextTranslateRequest()
            params = {
                "SourceText": original,
                "Source": "auto",
                "Target": convert,
                "ProjectId": tencent_translate_AppId
            }
            req.from_json_string(json.dumps(params))
            resp = client.TextTranslate(req)
            # print(resp.to_json_string())
            return resp.TargetText
        except TencentCloudSDKException as err:
            print(err)
        return ''
    
    @staticmethod
    def uploadImage(image_path:str) -> str:
        token = None              
        scheme = 'https'          
        config = CosConfig(Region=tencent_cos_region, SecretId=tencent_secret_id, SecretKey=tencent_secret_key, Token=token, Scheme=scheme)
        client = CosS3Client(config)
        # 本地文件形式上传
        # response = client.upload_file(
        #     Bucket=bucket_id,
        #     LocalFilePath=image_path,
        #     Key=image_path.split(os.sep)[-1],
        #     PartSize=1,
        #     MAXThread=10,
        #     EnableMD5=False
        # )

        # 网络文件形式上传
        file_keyname = image_path.split('/')[-2] + '.jpg'
        stream = requests.get(image_path)
        response = client.put_object(
            Bucket=tencent_cos_bucket_id,
            Body=stream,
            Key=file_keyname
        )
        print(response['ETag'])
        img_url = 'https://{}.cos.{}.myqcloud.com/{}'.format(tencent_cos_bucket_id, tencent_cos_region, file_keyname)
        return '上传成功, ETag: {}\nURL: {}'.format(response['ETag'], img_url)

    @staticmethod
    def weather(city_name:str='广州') -> str:
        url_api_weather = 'https://devapi.qweather.com/v7/weather/now?'
        url_api_geo = 'https://geoapi.qweather.com/v2/city/lookup?'
        
        # 实况天气
        def getCityId(city_kw):
            url_v2 = url_api_geo + 'location=' + city_kw + '&key=' + weather_key
            city = requests.get(url_v2).json()['location'][0]
            return city['id']

        city_id = getCityId(city_name)
        url = url_api_weather + 'location=' + city_id + '&key=' + weather_key
        res = requests.get(url).json()
        text = "<天气信息获取失败>"
        print(res)
        if res['code'] == '200' or res['code'] == 200:
            text = '实时天气:\n 亲爱的 小主, 您所在的地区为 {},\n 现在的天气是 {},\n 气温 {}°, 湿度 {}%,\n 体感气温为 {}°,\n 风向 {}, 风速 {}km/h'.format(
                city_name, res['now']['text'], res['now']['temp'], res['now']['humidity'], res['now']['feelsLike'], res['now']['windDir'], res['now']['windSpeed']) 
        return text 
    
    @staticmethod
    def hotNews(status_store:StatusStore) -> str:
        def common(data):
            url = data + '?key={}'.format(tianxing_key)
            res = requests.get(url).json()
            return res['newslist']
        res = common('http://api.tianapi.com/topnews/index')
        tops = []
        index = 1
        for item in res:
            tops.append(str(index) + '. ' + item['title'])
            index += 1
        return '\n'.join(tops[0:10])






# 多功能函数的映射
# function: 功能对应函数名
# need_second: 是否需要经过两步:先发cmd指令,再发详细内容
# desc: 需要经过两步时,第一次返回的提示语
function_map = {
    '翻译': {'function': MultiFunction.translate, 'need_second': True, 'desc': '请输入您要翻译的内容~'}, 
    '天气': {'function': MultiFunction.weather, 'need_second': True, 'desc': '请问是哪座城市的天气呢?'}, 
    '热搜': {'function': MultiFunction.hotNews, 'need_second': False},
    '上传图片': {'function': MultiFunction.uploadImage, 'need_second': True, 'desc': '请发送图片过来吧~'},
}
def choiceFunction(store_obj:StatusStore):
    res = ''
    if function_map.get(store_obj.func_name):
        res = function_map.get(store_obj.func_name)['function'](store_obj.msg)
    return res 






def xiaofengzai():
    sleep_time = 1              # 轮询间隔
    status_store = {}

    session = bot.verifySession(auth_key)
    logger.DebugLog(">> session: "+session)
    bot.bindSession(session, bind_qq)
    while True:
        cnt = bot.getMessageCount(session)
        if not cnt:
            sleep(sleep_time)
            continue
        logger.DebugLog('>> 有消息了 => {}'.format(cnt))
        logger.DebugLog('获取消息内容')
        data = bot.fetchLatestMessage(session)
        if len(data) == 0:
            logger.DebugLog('消息为空')
            sleep(sleep_time)
            continue
        logger.DebugLog(data)
        logger.DebugLog('解析消息内容')

        sender_id, msg_type, msg_text = bot.analyzeFriendMsg(data[0])
        if not sender_id or sender_id != target_qq:
            sleep(sleep_time)
            continue

        if msg_text.strip() == '功能清单':
            res = '目前支持的关键词有:\n' + '\n'.join(function_map.keys())
        elif msg_text.strip().lower().startswith('cmd'):
            _, func_name = msg_text.strip().split('\n')[0].split()
            func_name = func_name.strip()
            store_obj = StatusStore(from_qq=sender_id, is_cmd=True, func_name=func_name)
            # 不需要发两次,直接调用函数返回结果即可
            func_info = function_map.get(func_name)
            if not func_info:
                res = '指令[{}]暂不支持'.format(func_name)
            elif func_info.get('need_second'):
                res = '收到你的指令:{}\n{}'.format(func_name, func_info.get('desc') or '已进入对应状态, 请继续发送详细内容')
                # 添加或更新记录
                status_store[sender_id] = store_obj
            else:
                res = '请求结果为:\n' + str(choiceFunction(store_obj))
                status_store.pop(sender_id, '')
        else:
            res = '请先发送指令哦...'
            store_obj = status_store.get(sender_id)
            if store_obj and store_obj.is_cmd:
                store_obj.msg = msg_text
                res = '请求结果为:\n' + str(choiceFunction(store_obj))
                status_store.pop(sender_id, '')
        
        bot.sendFriendMessage(session, qq=sender_id, msg=res)

        


@app.route('/QQ/send', methods=['GET'])
def qqListenMsg():
    # 类似于Qmsg的功能
    # flask做得接收HTTP请求转为QQ消息
    qq = request.args.get('target', None)
    msg = request.args.get('msg', None)
    bot.sendFriendMessage(bot.session, qq, msg)
    return 'Hello World!'

@app.route('/QQ/send/friend', methods=['GET'])
def qqListenMsgToFriend():
    # 类似于Qmsg的功能
    # flask做得接收HTTP请求转为QQ消息
    qq = request.args.get('target', None)
    msg = request.args.get('msg', None)
    bot.sendFriendMessage(bot.session, qq, msg)
    return 'Hello World! Friend!'

@app.route('/QQ/send/group', methods=['GET'])
def qqListenMsgToGroup():
    # 类似于Qmsg的功能
    # flask做得接收HTTP请求转为QQ消息
    qq = request.args.get('target', None)
    msg = request.args.get('msg', None)
    bot.sendPlainTextToGroup(bot.session, qq, msg)
    return 'Hello World! Group!'


if __name__ == '__main__':
    t = threading.Thread(target=xiaofengzai)
    t.setDaemon(True)
    t.start()

    # t = threading.Thread(target=qqTransfer)
    # t.setDaemon(True)
    # t.start()

    app.run(port='9966', host='0.0.0.0')
0

评论 (1)

取消
  1. 头像
    songxf 作者
    Windows 10 · Google Chrome

    QQ群 ID:722072237

    回复