前言

最近在参与一个可穿戴设备的项目,本人负责前后端的开发工作

收到了一个需求,项目要模拟以下场景:共 30 个设备,每个设备以 100HZ 的频率发送数据,要求将接受到的数据读入数据库,并且不能有太大延迟(这应该是 3kQPS 吧,我查到的资料里有说上千 QPS 就算高并发,也有说不算的,那我就记作中高并发好了)

模拟发送的程序已经有前辈写好了,用 c 实现了一个,开 30 个线程,每个线程以 100HZ 发布数据

数据的收发使用的是 MQTT 协议,broker 用了 mosquitto

失败的尝试

先用 paho-mqtt 搓了一个简单的订阅,直接将消息数据写入数据库,下面是主要逻辑,敏感数据已经隐去

# peewee_model 中引入了 peewee 并定义了数据表模型
from peewee_model import *
from paho.mqtt import client as mqtt_client
from random import randint
import json


def connect_to_broker(broker: str, client_id: str, port=1883, username=None, password=None):
    def on_connect(client, userdata, flags, return_code):
        if return_code == 0:
            print("connection success")
        else:
            print("connection fail, rc: %d" % return_code)
    client = mqtt_client.Client(client_id, protocol=mqtt_client.MQTTv311)
    if username and password:
        client.username_pw_set(username=username, password=password)
    client.on_connect = on_connect
    client.connect(broker, port, 60)
    return client


def subscribe(client: mqtt_client, topic: str) -> int:
    def on_message(client, userdata, msg):
        global num
        num += 1
        data = json.loads(msg.payload)
        # 这里直接将接收到的数据写入数据库
        my_data = MyData(
            id = data['id'],
            a**x = data['a**x'],
            ...
        )
        my_data.save()
        print(num)
    try:
        client.subscribe(topic)
        client.on_message = on_message_1
        print(f"subscribe it: {topic}")
        return 1
    except:
        print("subscribe fail")
        return 0

num = 0
datas = []
client = connect_to_broker(
    "192.168.***.***",
    f'python-{randint(0, 10**5)}',
    username="*****",
    password="*****"
    )
subscribe(client, "data/time")
client.loop_forever()

结果是延迟巨大,发送 40000 条数据的时候,接收端才记录了 3000 条数据,并且由于使用的是 Qos0,丢数据的情况很严重

是哪个地方造成了这么巨大的消耗呢,经过测试,仅订阅数据订阅数据+json解析 都不会造成明显延迟,一旦加上数据库操作延迟就迅速上升

想到这种业务是属于==IO 密集型==的,能否用多线程来优化呢,于是将写入操作用多线程实现:

from queue import Queue
import time

myqueue = Queue()

def db_write():
    while True:
        data = myqueue.get()
        # 这里的 pool 是使用 peewee 定义的连接池
        with pool.connection_context():
            my_data = MyData(
                id = data['id'],
                a**x = data['a**x'],
                ...
            )
            my_data.save()
        time.sleep(0.01)

for i in range(10):
    t = threading.Thread(target=db_write, name=f"db_write_{i}")
    t.start()
    print(f"create db_write_{i}")

主线程则作为生产者线程,将接收到的数据直接扔进任务队列中

def on_message(client, userdata, msg):
    data = json.loads(msg.payload)
    myqueue.put(data)

跑起来之后发现,虽然有所改进,但是延迟依然无法接受,在发送 40000 条数据的时候,接收端记录了 6000 条数据

使用 多线程+批量提交 成功优化

在查到的资料中我发现,很多优化的方案都会涉及到 减少 commit 次数 的做法。目前每条数据都会触发一次 commit,由于提交事务的消耗,导致延迟一直居高不下。所以我打算尝试使用批量提交的方式来提速

查到 peewee 的 insert_many 函数能够进行批量提交,于是将消费者线程的逻辑改为:

def db_write():
    while True:
        data = myqueue.get()
        with pool.connection_context():
            sensor_data = SensorData.insert_many(data).execute()
        time.sleep(0.01)

生产者线程则按照每10条数据一组进行打包,放入任务队列

def processing_data(data):
    data = json.loads(data)
    return {
        "id": data['id'],
        "a**x": data['a**x'],
        ...
    }
def on_message(client, userdata, msg):
    global num, datas
    num += 1
    datas.append(processing_data(msg.payload))
    if num % 10 == 0:
        myqueue.put(datas)
        datas = []

在开启 10 条消费者线程、线程池最大数量为 8 的情况下,发送的数据数和写入数据库的数量几乎同步

感觉重点在于批量提交,多线程算是锦上添花,不过单线程+批量提交的代码没有测试过,不能下定论

学习笔记

借着此次记录把笔记整理一下

peewee | 开源 ORM 框架

peewee 中文文档 peewee github地址

安装

pip install peewee

连接数据库

使用原始 API 创建单个连接

这里只展示 Sqlite 和 MySQL 的连接方式,其余数据库同理

from peewee import *

sqlite_db = SqliteDatabase('database.db')
mysql_db = MySQLDatabase('my_database')

sqlite_db.connect()
mysql_db.connect()

使用扩展支持连接池

这里只展示 MySQL 的连接池创建方式,其余数据库同理

from peewee import *
from playhouse.pool import PooledMySQLDatabas

db = PooledMySQLDatabase(
    max_connections=8,
    stale_timeout=300,
    timeout=0,
    host="127.0.0.1",
    port="3306",
    user="user",
    password="password",
    database="mydatabase",
)

补充:playhouse 是 peewee 自带的用于放置扩展的命名空间,不是什么依赖项

img

所以说如果你的项目中引入了 peewee,在构造 requirement.txt 的时候不要写 playhouse。做项目的时候就有成员非要我在 requirement.txt 里声明对 playhouse 的依赖,让我流汗了

使用扩展支持数据库URL

from peewee import *
from playhouse.db_url import connect

sqlite_db = connect('sqlite:///my_database.db')
mysql_db = connect('mysql://user:passwd@ip:port/my_db')

表模型定义

class BaseModel(Model):
    class Meta:
        database = db

class Table1(BaseModel):
    id = IntegerField(primary_key=True)
    age = IntegerField(null=True)
    name = CharField(max_length=255, null=True)
    isvip = BooleanField(null=True)
    startTime = DateTimeField(null=True)
    class Meta:
        table_name = 'table1'

class Table2(BaseModel):
    id = AutoField(primary_key=True)
    t1_id = ForeignKeyField(
        Table1,
        Table1.id,
        db_column="t1_id",
        constraint_name="t1_id"
        )
    a = CharField(max_length=5, null=True)
    b = FloatField(null=True)
    Timestamp = DateTimeField(null=True)
    class Meta:
        table_name = 'table2'

当不指定 primary_key 的时候,peewee 会自动创建一个 id 字段作为主键

上面这段代码涵盖了几种常用的字段类型

  • 整型 IntegerField
  • 文本型 CharField
  • 日期型 DateTimeField
  • 外键 ForeignKeyField
  • 浮点型 FloatField
  • 布尔型 BooleanField

一些巧思:自动生成关联表名和表模型的哈希map

使用 __init_subclass__ 方法使继承 BaseModel 的模型类自动记录自身

models : list = []
"""存放了所有 Peewee Model 的列表"""
class BaseModel(Model):
    global models
    def __init_subclass__(cls) -> None:
        models.append(cls)
        return super().__init_subclass__()
    class Meta:
        database = pool

class MyTable(BaseModel):
    id = CharField(max_length=20, primary_key=True)
    class Meta:
        table_name = 'test_table'

tables : dict = {}
for model in models:
    tables[model._meta.table_name] = model

print(tables)

常用数据库操作

创建表、删除表

# 创建表
db.create_tables([Table1, Table2, ...])
# 删除表
db.drop_tables([Table1, Table2, ...])

插入数据、更新数据

# 插入数据
# 如果不指定主键的话, 下面这句话会创建一行新数据, 并自动赋予主键
s1 = Table1(column1='value1', column2=123, column3=True)
s1.save()
# 使用 insert 方法强制插入数据
Table1.insert(id=10, column1='value1', column2=123, column3=True).execute()
# 使用 xxx 批量插入数据


# 更新数据
# 如果指定了主键, 下面这句话会更新指定行的数据
# 如果指定主键不存在, 则不会更新任何数据
s2 = Table1(id=10, column1='value1', column2=123, column3=True)
s2.save()

查询数据

data = (WatchData
        .select(WatchData.ID, WatchID.Department)
        .where(WatchData.ID == id)
        .order_by(WatchData.Timestamp.desc())
        .limit(100))
data.get()

# 使用 model_to_dict 来将查询结果转换为字典形式

注意在涉及外键的查询中,model_to_dict 的输出和等效的纯 SQL 查询的输出稍有不同(在做项目的时候踩坑了)

连接池上下文

使用连接池对象的 connection_context 方法创建上下文

with pool.connection_context():
    # 在这里进行数据库操作

使用SQL内置函数

需要使用 peewee 自带的 fn 对象


多线程

已经写过笔记了,就不再重复记录,地址: Python并行开发学习笔记