分类 Python 下的文章

非UI版,直接弹出选择界面

import tkinter as tk
from tkinter import filedialog, StringVar


def check_filename():
    root = tk.Tk()
    root.withdraw()
    filename = filedialog.askopenfilename()
    return filename.split('/')[-1] if len(filename) else ''

print(check_filename())

UI版,通过按钮点击

import tkinter as tk
from tkinter import filedialog, StringVar

def check_filename_ui():
    root = tk.Tk()
    root.title('请选择文件')
    path = StringVar()
    filename = ['']

    def check():
        temp = filedialog.askopenfilename()
        filename[0] = temp.split('/')[-1] if len(temp) else ''
        path.set(filename[0])

    tk.Label(root, text="目标路径:").pack()
    tk.Entry(root, textvariable=path).pack(fill=tk.BOTH, expand=tk.YES)
    tk.Button(root, text="选择文件", command=check).pack()
    tk.Button(text="确认并关闭", command=root.destroy).pack()
    root.mainloop()
    return filename[0]

print(check_filename_ui())

可视化拖拽布局助手

http://xfxuezhang.cn/web/tkinter-helper/


简介

通过小黄鸟抓包Fa米家App,发现没有什么校验,用模拟请求直接可以重发。。。

  • 账号验证是通过token字段;
  • 设备标识用deviceId;
  • 等等...
    基本上必须的几个参数就:

    {
      "blackbox": "tdfpeyxxxx",
      "device_id": "2f35xxxx",
      "fmversion": "3.0.2",
      "os": "android",
      "token": "eyneWxxxx",
      "useragent": "okhttp/4.7.2"
    }

代码

这就好办了,可以做一个自动Fa米粒签到(可以换商品),或者其他好玩的功能。
提供几个粗糙的函数:

import requests

class Fmapp:
    def __init__(self) -> None:
        self.base_headers = {
            'Host': 'fmapp.chinafamilymart.com.cn',
            'blackBox': '',
            'token': '',
            'deviceId': '',
            'User-Agent': 'okhttp/4.7.2',
            'Content-Type': 'application/json',
            'loginChannel': 'app',
            'channel': '333',
            'fmVersion': '3.0.2',
            'os': 'android',
        }
 
    def check_in(self):
        '''
        签到
        '''
        url = 'https://fmapp.chinafamilymart.com.cn/api/app/market/member/signin/sign'
        headers = self.base_headers.copy()
        res = requests.post(url=url, headers=headers).json()
        print(res)
 
 
    def verify_code(self, mobile, distinctId):
        '''
        请求发送短信验证码
        '''
        url = 'https://fmapp.chinafamilymart.com.cn/api/app/member/verifyCode'
        headers = self.base_headers.copy()
        data = {
            "mobile": mobile,
            "firstSend": True,
            "distinctId": distinctId,
            "newVersion": True
        }
        res = requests.post(url=url, json=data, headers=headers).json()
        print(res)
        if res['code'] == '200':
            return res['data']
        return None
 
    def login(self, mobile, code, distinctId):
        '''
        短信验证码登录
        '''
        url = 'https://fmapp.chinafamilymart.com.cn/api/app/login'
        headers = self.base_headers.copy()
        data = {
            "mobile": mobile,
            "verifyCode": code,
            "openId": "",
            "openChannelCd": "1",
            "grantTypeCd": "1",
            "distinctId": distinctId,
            "newVersion": True,
            "unionId": "",
            "jpushId": "120c83f760da1764565"
        }
        res = requests.post(url=url, json=data, headers=headers).json()
        print(res)
        if res['res'] == '200':
            return res['data']['token']
        return None
 
 
    def member_info(self):
        '''
        获取用户详情
        '''
        url = 'https://fmapp.chinafamilymart.com.cn/api/app/member/info'
        headers = self.base_headers.copy()
        res = requests.post(url=url, headers=headers).json()
        print(res)
        if res['code'] == '200':
            return True
        return False
 
    def mili_detail(self):
        '''
        获取Fa米粒详情
        '''
        url = 'https://fmapp.chinafamilymart.com.cn/api/app/member/v2/mili/detail'
        headers = self.base_headers.copy()
        data = {"pageNo":1, "pageSize":10}
        res = requests.post(url=url, json=data, headers=headers).json()
        print(res)
        total = -1
        if res['code'] == '200':
            total = res['data']['total']
        return total
 
 
    def process(self):
        # 需要抓包补全
        blackBox = 'tdfp'
        # 需要抓包补全
        deviceId = '2f356'
        # 需要抓包补全
        distinctId = "cb19df02c32d2079"
        # 可用手机验证码登录获取,或者手动抓包补全
        token = 'eyJhbG'
        self.base_headers['token'] = token
        self.base_headers['blackBox'] = blackBox
        self.base_headers['deviceId'] = deviceId
 
        # # 手机号
        # mobile = ""
        # # 收到的验证码
        # code = self.verify_code(mobile, distinctId)
        # if not code:
        #     return
        # token = self.login(mobile, code, distinctId)
        # if not token:
        #     return
        # self.base_headers['token'] = token
         
        if self.member_info():
            self.check_in()
            self.mili_detail()

Fmapp().process()

效果

签到:

个人资料:

米粒信息:

没有小黄鸟的童鞋看这里:https://sxf1024.lanzouv.com/iCbux09zoera
没有Fa米家的童鞋看这里:https://sxf1024.lanzouv.com/iCbKv09zoetc

福利

然后就可以在挂载服务器上每天自动运行了。
没有服务器的可以看看这个腾讯云的ECS,很便宜,一年只需65元,它不香吗?:
详情:https://curl.qcloud.com/fnG9lyjo


页面加载超时

{lamp/}

可使用:

driver.set_page_load_timeout()  # 设置页面加载超时
driver.set_script_timeout()  # 设置页面异步js执行超时

try:
    driver.get('https://baidu.com')
except Exception:
    driver.execute_script('window.stop()')

页面元素加载超时

# 隐式等待(implicit)
# 设置全局的查找页面元素的等待时间,在这个时间内没找到指定元素则抛出异常,只需设置一次
driver.implicitly_wait(time)

# 显示等待(explicit)
# 使用频率最高的获取页面元素超时设置,其原理是通过设置一个最大时间和一个周期时间,按照周期时间来检测是否出现等待元素,直到达到了最大等待时间。
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.common.by import By
WebDriverWait(driver, 3).until(EC.presence_of_element_located((By.ID, 'wrapper')))

常用显示等待条件:

WebDriverWait(driver,10).until(EC.title_is(u"百度一下,你就知道"))
# 判断title,返回布尔值

WebDriverWait(driver,10).until(EC.title_contains(u"百度一下"))
# 判断title,返回布尔值

WebDriverWait(driver,10).until(EC.presence_of_element_located((By.ID,'kw')))
# 判断某个元素是否被加到了dom树里,并不代表该元素一定可见,如果定位到就返回WebElement

WebDriverWait(driver,10).until(EC.visibility_of_element_located((By.ID,'su')))
# 判断某个元素是否被添加到了dom里并且可见,可见代表元素可显示且宽和高都大于0

WebDriverWait(driver,10).until(EC.visibility_of(driver.find_element(by=By.ID,value='kw')))
# 判断元素是否可见,如果可见就返回这个元素

WebDriverWait(driver,10).until(EC.presence_of_all_elements_located((By.CSS_SELECTOR,'.mnav')))
# 判断是否至少有1个元素存在于dom树中,如果定位到就返回列表

WebDriverWait(driver,10).until(EC.visibility_of_any_elements_located((By.CSS_SELECTOR,'.mnav')))
# 判断是否至少有一个元素在页面中可见,如果定位到就返回列表

WebDriverWait(driver,10).until(EC.text_to_be_present_in_element((By.XPATH,"//*[@id='u1']/a[8]"),u'设置'))
# 判断指定的元素中是否包含了预期的字符串,返回布尔值

WebDriverWait(driver,10).until(EC.text_to_be_present_in_element_value((By.CSS_SELECTOR,'#su'),u'百度一下'))
# 判断指定元素的属性值中是否包含了预期的字符串,返回布尔值

#WebDriverWait(driver,10).until(EC.frame_to_be_available_and_switch_to_it(locator))
#  判断该frame是否可以switch进去,如果可以的话,返回True并且switch进去,否则返回False注意这里并没有一个frame可以切换进去

WebDriverWait(driver,10).until(EC.invisibility_of_element_located((By.CSS_SELECTOR,'#swfEveryCookieWrap')))
# 判断某个元素在是否存在于dom或不可见,如果可见返回False,不可见返回这个元素注意#swfEveryCookieWrap在此页面中是一个隐藏的元素

WebDriverWait(driver,10).until(EC.element_to_be_clickable((By.XPATH,"//*[@id='u1']/a[8]"))).click()
# 判断某个元素中是否可见并且是enable的,代表可点击
driver.find_element_by_xpath("//*[@id='wrapper']/div[6]/a[1]").click()
WebDriverWait(driver,10).until(EC.element_to_be_clickable((By.XPATH,"//*[@id='wrapper']/div[6]/a[1]"))).click()

WebDriverWait(driver,10).until(EC.staleness_of(driver.find_element(By.ID,'su')))
# 等待某个元素从dom树中移除

WebDriverWait(driver,10).until(EC.element_to_be_selected(driver.find_element(By.XPATH,"//*[@id='nr']/option[1]")))
# 判断某个元素是否被选中了,一般用在下拉列表

WebDriverWait(driver,10).until(EC.element_selection_state_to_be(driver.find_element(By.XPATH,"//*[@id='nr']/option[1]"),True))
# 判断某个元素的选中状态是否符合预期

WebDriverWait(driver,10).until(EC.element_located_selection_state_to_be((By.XPATH,"//*[@id='nr']/option[1]"),True))
# 判断某个元素的选中状态是否符合预期
driver.find_element_by_xpath(".//*[@id='gxszButton']/a[1]").click()

instance = WebDriverWait(driver,10).until(EC.alert_is_present())
# 判断页面上是否存在alert,如果有就切换到alert并返回alert的内容
instance.accept()
# 关闭弹窗

进一步非常详细的教程,篇幅较长,可收藏后慢慢看:

万字长文保姆级教你制作自己的多功能QQ机器人(http://xfxuezhang.cn/index.php/archives/418/)



(以下正文开始)

安装与使用 - 详细内容来这里:
https://xfxuezhang.blog.csdn.net/article/details/122793393


示例代码在这里:
文件布局

示例代码的配置文件,方便修改
conf.json

{
  "auth_key": "12345",
  "bind_qq":  "你的qq",
  "sleep_time": 1,
  "receive_groups": ["q群号1", "q群号2"],
  "send_groups": ["q群号1", "q群号2"],
  "debug_level": "debug"
}

示例代码真正内容
demo.py

import json
import os

import requests
from flask import Flask, request
from time import sleep
import threading

class Logger:
    def __init__(self, level='debug'):
        self.level = level

    def DebugLog(self, *args):
        if self.level == 'debug':
            print(*args)

    def TraceLog(self, *args):
        if self.level == 'trace':
            print(*args)

    def setDebugLevel(self, level):
        self.level = level.lower()


class QQBot:
    def __init__(self):
        self.addr = 'http://127.0.0.1:8888/'
        self.session = None

    def verifySession(self, auth_key):
        """每个Session只能绑定一个Bot,但一个Bot可有多个Session。
        session Key在未进行校验的情况下,一定时间后将会被自动释放"""
        data = {"verifyKey": auth_key}
        url = self.addr+'verify'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            return res['session']
        return None

    def bindSession(self, session, qq):
        """校验并激活Session,同时将Session与一个已登录的Bot绑定"""
        data = {"sessionKey": session, "qq": qq}
        url = self.addr + 'bind'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            self.session = session
            return True
        return False

    def releaseSession(self, session, qq):
        """不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
        否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
        data = {"sessionKey": session, "qq": qq}
        url = self.addr + 'release'
        res = requests.post(url, data=json.dumps(data)).json()
        logger.DebugLog(res)
        if res['code'] == 0:
            return True
        return False

    def getMsgFromGroup(self, session):
        url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
        res = requests.get(url).json()
        if res['code'] == 0:
            return res['data']
        return None

    def parseGroupMsg(self, data):
        res = []
        if data is None:
            return res
        for item in data:
            if item['type'] == 'GroupMessage':
                type = item['messageChain'][-1]['type']
                if type == 'Image':
                    text = item['messageChain'][-1]['url']
                elif type == 'Plain':
                    text = item['messageChain'][-1]['text']
                elif type == 'Face':
                    text = item['messageChain'][-1]['faceId']
                else:
                    logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
                    continue
                name = item['sender']['memberName']
                group_id = str(item['sender']['group']['id'])
                group_name = item['sender']['group']['name']
                res.append({'text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name})
        return res

    def getMessageCount(self, session):
        url = self.addr + 'countMessage?sessionKey='+session
        res = requests.get(url).json()
        if res['code'] == 0:
            return res['data']
        return 0

    def sendMsgToGroup(self, session, group, msg):
        text = msg['text']
        type = msg['type']
        name = msg['name']
        group_id = msg['groupId']
        group_name = msg['groupName']
        content1 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n{}".format(
            name, group_id, group_name, text)
        content2 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n".format(
            name, group_id, group_name)
        logger.DebugLog(">> 消息类型:" + type)
        if type == 'Plain':
            message = [{"type": type, "text": content1}]
        elif type == 'Image':
            message = [
                {"type": 'Plain', "text": content2},
                {"type": type, "url": text}]
        elif type == 'Face':
            message = [{"type": 'Plain', "text": content2},
                       {"type": type, "faceId": text}]
        else:
            logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
            return 0
        data = {
                "sessionKey": session,
                "group": group,
                "messageChain": message
                }
        logger.DebugLog(">> 消息内容:" + str(data))
        url = self.addr + 'sendGroupMessage'
        try:
            res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 转发失败")
            return 0
        logger.DebugLog(">> 请求返回:" + str(res))
        if res['code'] == 0:
            return res['messageId']
        return 0

    def sendMsgToAllGroups(self, session, receive_groups, send_groups, msg_data):
        # 对每条消息进行检查
        for msg in msg_data:
            group_id = msg['groupId']
            # 接收的消息群正确(目前只支持 消息类型)
            if group_id in receive_groups:
                # 依次将消息转发到目标群
                for g in send_groups:
                    logger.DebugLog(">> 当前群:"+g)
                    if g == group_id:
                        logger.DebugLog(">> 跳过此群")
                        continue
                    res = self.sendMsgToGroup(session, g, msg)
                    if res != 0:
                        logger.TraceLog(">> 转发成功!{}".format(g))

    def sendFriendMessage(self, session, qq, msg):
        data = {
          "sessionKey": session,
          "target": qq,
          "messageChain": [
            { "type": "Plain", "text": msg },
          ]
        }
        url = self.addr + 'sendFriendMessage'
        try:
            res = requests.post(url, data=json.dumps(data)).json()
        except:
            logger.DebugLog(">> 发送失败")
            return 0
        if res['code'] == 0:
            return res['messageId']
        return 0



logger = Logger()
bot = QQBot()
app = Flask(__name__)

def qqTransfer():
    with open('conf.json', 'r+', encoding="utf-8") as f:
        content = f.read()
    conf = json.loads(content)

    auth_key = conf['auth_key']
    bind_qq = conf['bind_qq']
    sleep_time = conf['sleep_time']
    debug_level = conf['debug_level']

    receive_groups = conf['receive_groups']
    send_groups = conf['send_groups']

    logger.setDebugLevel(debug_level)

    session = bot.verifySession(auth_key)
    logger.DebugLog(">> session: "+session)
    bot.bindSession(session, bind_qq)
    while True:
        cnt = bot.getMessageCount(session)
        if cnt:
            logger.DebugLog('>> 有消息了 => {}'.format(cnt))
            logger.DebugLog('获取消息内容')
            data = bot.getMsgFromGroup(session)
            if len(data) == 0:
                logger.DebugLog('消息为空')
                continue
            logger.DebugLog(data)
            logger.DebugLog('解析消息内容')
            data = bot.parseGroupMsg(data)
            logger.DebugLog(data)
            logger.DebugLog('转发消息内容')
            bot.sendMsgToAllGroups(session, receive_groups, send_groups, data)
        # else:
        #     logger.DebugLog('空闲')
        sleep(sleep_time)
    bot.releaseSession(session, bind_qq)


@app.route('/QQ/send', methods=['GET'])
def qqListenMsg():
    # 类似于Qmsg的功能
    # flask做得接收HTTP请求转为QQ消息
    qq = request.args.get('target', None)
    msg = request.args.get('msg', None)
    bot.sendFriendMessage(bot.session, qq, msg)
    return 'Hello World!'

if __name__ == '__main__':
    t = threading.Thread(target=qqTransfer)
    t.setDaemon(True)
    t.start()

    app.run(port='8080', host='0.0.0.0')

这个是插件的setting.yaml

## 配置文件中的值,全为默认值

## 启用的 adapter, 内置有 http, ws, reverse-ws, webhook
adapters:
  - http
  - ws

## 是否开启认证流程, 若为 true 则建立连接时需要验证 verifyKey
## 建议公网连接时开启
enableVerify: true
verifyKey: 12345

## 开启一些调式信息
debug: false

## 是否开启单 session 模式, 若为 true,则自动创建 session 绑定 console 中登录的 bot
## 开启后,接口中任何 sessionKey 不需要传递参数
## 若 console 中有多个 bot 登录,则行为未定义
## 确保 console 中只有一个 bot 登陆时启用
singleMode: false

## 历史消息的缓存大小
## 同时,也是 http adapter 的消息队列容量
cacheSize: 4096

## adapter 的单独配置,键名与 adapters 项配置相同
adapterSettings:
  ## 详情看 http adapter 使用说明 配置
  http:
    host: localhost
    port: 8888
    cors: ["*"]
  
  ## 详情看 websocket adapter 使用说明 配置
  ws:
    host: localhost
    port: 8888
    reservedSyncId: -1

gitee:
https://gitee.com/songxf1024/qqmsg-transfer

API文档可参考:
https://docs.mirai.mamoe.net/mirai-api-http/adapter/HttpAdapter.html


直接在缓存中互读

import io
import numpy as np
import PIL
import matplotlib.pyplot as plt
 
#... fig = ...
#假设你有了一个绘图结果fig
 
#申请缓存
buffer_ = io.BytesIO()
fig.savefig(buffer_, format = "png")
buffer_.seek(0)
image = PIL.Image.open(buffer_)
#转换为numpy array
ar = np.asarray(image)
cv2.imshow("demo", ar)
#释放缓存
buffer_.close()

来源:https://blog.csdn.net/ngy321/article/details/80109088

OpenCV添加文字

cv2.putText(img, text, (40, 50), cv2.FONT_HERSHEY_PLAIN, 2.0, (0, 0, 255), 2)

格式互转

opencv默认BGR顺序,matplotlib和PIL默认RGB顺序。

# Image转cv2
cv2_img = cv2.cvtColor(numpy.asarray(Img_img),cv2.COLOR_RGB2BGR)

# cv2转Image
pil_img = Image.fromarray(cv2.cvtColor(cv_img,cv2.COLOR_BGR2RGB))

隐藏坐标轴文本刻度或刻度标签

# 隐藏包括轴标签的坐标轴
xaxis.set_visible(False)/yaxis.set_visible(False)
# 在 Matplotlib 中隐藏坐标轴
xaxis.set_ticks([])/yaxis.set_ticks([]) 
# 在 Matplotlib 中隐藏轴标签/文本
xaxis.set_ticklabels([])/yaxis.set_ticklabels([]) 
# 来隐藏 Matplotlib 中的坐标轴标签/文本
xticks(color='w')/yticks(color='w') 

来源:https://www.delftstack.com/zh/howto/matplotlib/how-to-hide-axis-text-ticks-and-or-tick-labels-in-matplotlib/

关闭1个fig

plt.close(fig)

保存时去除白边

# pad_inches=0:去除所有白边
# bbox_inches='tight':去除坐标轴占用的空间
plt.savefig(path, format='png', pad_inches=0, bbox_inches='tight')

调整图片间隙

fig.tight_layout()  # 调整整体空白
plt.subplots_adjust(wspace=0, hspace=0)  # 调整子图间距
plt.subplot_tool() # 提供了一种交互式方法来拖动 subplot_tool 中的条以更改子图的布局

取消边框

axes[i][j].spines['top'].set_visible(False)
axes[i][j].spines['right'].set_visible(False)
axes[i][j].spines['bottom'].set_visible(False)
axes[i][j].spines['left'].set_visible(False)
````

## 生成空白图

使用Numpy创建一张A4(2105×1487)纸

格式:h,w,c

img = np.zeros((2105,1487,3), np.uint8)

使用白色填充图片区域,默认为黑色

img.fill(255)


## 显示中文

plt.rcParams["font.sans-serif"]=["SimHei"] #设置字体
plt.rcParams["axes.unicode_minus"]=False #该语句解决图像中的“-”负号的乱码问题


## 显示双坐标

fig, axes = plt.subplots(nrows=3, ncols=3)

坐标1

axes0.set_title('aaa', fontsize=17)
axes0.set_xlim(0, len(all_algs)+1)
axes0.set_xticks(range(1, 10))
axes0.set_ylim(0, 105)
axes0.grid(True)

坐标2(子坐标)

ax2 = axes0.twinx()

具体用法跟上面一样

ax2.set_ylabel('Number')
ax2.set_ylim(0, 5)
ax2.bar(x, fine, color='b', alpha=0.4, width=0.4, label=alg)
ax2.legend(prop={'size': 11}, loc="upper right" , labelcolor='linecolor')


## 坐标显示中文

axes0.set_xticks([1, 2, 3], ['a', 'b', 'c'])


## 坐标轴倾斜

axes0.set_xticklabels(all_algs, rotation=15)


## 坐标轴显示分式
latex语法即可

axes.set_xlabel(r'aaa ($\frac{x}{y}$)', fontsize=16)