前言
最近在参与一个可穿戴设备的项目,本人负责前后端的开发工作
收到了一个需求,项目要模拟以下场景:共 30
个设备,每个设备以 100HZ
的频率发送数据,要求将接受到的数据读入数据库,并且不能有太大延迟(这应该是 3kQPS 吧,我查到的资料里有说上千 QPS 就算高并发,也有说不算的,那我就记作中高并发好了)
模拟发送的程序已经有前辈写好了,用 c 实现了一个,开 30 个线程,每个线程以 100HZ 发布数据
数据的收发使用的是 MQTT 协议,broker 用了 mosquitto
失败的尝试
先用 paho-mqtt
搓了一个简单的订阅,直接将消息数据写入数据库,下面是主要逻辑,敏感数据已经隐去
# peewee_model 中引入了 peewee 并定义了数据表模型
from peewee_model import *
from paho.mqtt import client as mqtt_client
from random import randint
import json
def connect_to_broker(broker: str, client_id: str, port=1883, username=None, password=None):
def on_connect(client, userdata, flags, return_code):
if return_code == 0:
print("connection success")
else:
print("connection fail, rc: %d" % return_code)
client = mqtt_client.Client(client_id, protocol=mqtt_client.MQTTv311)
if username and password:
client.username_pw_set(username=username, password=password)
client.on_connect = on_connect
client.connect(broker, port, 60)
return client
def subscribe(client: mqtt_client, topic: str) -> int:
def on_message(client, userdata, msg):
global num
num += 1
data = json.loads(msg.payload)
# 这里直接将接收到的数据写入数据库
my_data = MyData(
id = data['id'],
a**x = data['a**x'],
...
)
my_data.save()
print(num)
try:
client.subscribe(topic)
client.on_message = on_message_1
print(f"subscribe it: {topic}")
return 1
except:
print("subscribe fail")
return 0
num = 0
datas = []
client = connect_to_broker(
"192.168.***.***",
f'python-{randint(0, 10**5)}',
username="*****",
password="*****"
)
subscribe(client, "data/time")
client.loop_forever()
结果是延迟巨大,发送 40000 条数据的时候,接收端才记录了 3000 条数据,并且由于使用的是 Qos0
,丢数据的情况很严重
是哪个地方造成了这么巨大的消耗呢,经过测试,仅订阅数据 和 订阅数据+json解析 都不会造成明显延迟,一旦加上数据库操作延迟就迅速上升
想到这种业务是属于==IO 密集型==的,能否用多线程来优化呢,于是将写入操作用多线程实现:
from queue import Queue
import time
myqueue = Queue()
def db_write():
while True:
data = myqueue.get()
# 这里的 pool 是使用 peewee 定义的连接池
with pool.connection_context():
my_data = MyData(
id = data['id'],
a**x = data['a**x'],
...
)
my_data.save()
time.sleep(0.01)
for i in range(10):
t = threading.Thread(target=db_write, name=f"db_write_{i}")
t.start()
print(f"create db_write_{i}")
主线程则作为生产者线程,将接收到的数据直接扔进任务队列中
def on_message(client, userdata, msg):
data = json.loads(msg.payload)
myqueue.put(data)
跑起来之后发现,虽然有所改进,但是延迟依然无法接受,在发送 40000 条数据的时候,接收端记录了 6000 条数据
使用 多线程+批量提交 成功优化
在查到的资料中我发现,很多优化的方案都会涉及到 减少 commit 次数 的做法。目前每条数据都会触发一次 commit,由于提交事务的消耗,导致延迟一直居高不下。所以我打算尝试使用批量提交的方式来提速
查到 peewee 的 insert_many
函数能够进行批量提交,于是将消费者线程的逻辑改为:
def db_write():
while True:
data = myqueue.get()
with pool.connection_context():
sensor_data = SensorData.insert_many(data).execute()
time.sleep(0.01)
生产者线程则按照每10条数据一组进行打包,放入任务队列
def processing_data(data):
data = json.loads(data)
return {
"id": data['id'],
"a**x": data['a**x'],
...
}
def on_message(client, userdata, msg):
global num, datas
num += 1
datas.append(processing_data(msg.payload))
if num % 10 == 0:
myqueue.put(datas)
datas = []
在开启 10 条消费者线程、线程池最大数量为 8 的情况下,发送的数据数和写入数据库的数量几乎同步
感觉重点在于批量提交,多线程算是锦上添花,不过单线程+批量提交的代码没有测试过,不能下定论
学习笔记
借着此次记录把笔记整理一下
peewee | 开源 ORM 框架
安装
pip install peewee
连接数据库
使用原始 API 创建单个连接
这里只展示 Sqlite 和 MySQL 的连接方式,其余数据库同理
from peewee import *
sqlite_db = SqliteDatabase('database.db')
mysql_db = MySQLDatabase('my_database')
sqlite_db.connect()
mysql_db.connect()
使用扩展支持连接池
这里只展示 MySQL 的连接池创建方式,其余数据库同理
from peewee import *
from playhouse.pool import PooledMySQLDatabas
db = PooledMySQLDatabase(
max_connections=8,
stale_timeout=300,
timeout=0,
host="127.0.0.1",
port="3306",
user="user",
password="password",
database="mydatabase",
)
补充:playhouse
是 peewee 自带的用于放置扩展的命名空间,不是什么依赖项
所以说如果你的项目中引入了 peewee,在构造 requirement.txt
的时候不要写 playhouse。做项目的时候就有成员非要我在 requirement.txt
里声明对 playhouse 的依赖,让我流汗了
使用扩展支持数据库URL
from peewee import *
from playhouse.db_url import connect
sqlite_db = connect('sqlite:///my_database.db')
mysql_db = connect('mysql://user:passwd@ip:port/my_db')
表模型定义
class BaseModel(Model):
class Meta:
database = db
class Table1(BaseModel):
id = IntegerField(primary_key=True)
age = IntegerField(null=True)
name = CharField(max_length=255, null=True)
isvip = BooleanField(null=True)
startTime = DateTimeField(null=True)
class Meta:
table_name = 'table1'
class Table2(BaseModel):
id = AutoField(primary_key=True)
t1_id = ForeignKeyField(
Table1,
Table1.id,
db_column="t1_id",
constraint_name="t1_id"
)
a = CharField(max_length=5, null=True)
b = FloatField(null=True)
Timestamp = DateTimeField(null=True)
class Meta:
table_name = 'table2'
当不指定 primary_key
的时候,peewee 会自动创建一个 id 字段作为主键
上面这段代码涵盖了几种常用的字段类型
- 整型 IntegerField
- 文本型 CharField
- 日期型 DateTimeField
- 外键 ForeignKeyField
- 浮点型 FloatField
- 布尔型 BooleanField
一些巧思:自动生成关联表名和表模型的哈希map
使用 __init_subclass__
方法使继承 BaseModel
的模型类自动记录自身
models : list = []
"""存放了所有 Peewee Model 的列表"""
class BaseModel(Model):
global models
def __init_subclass__(cls) -> None:
models.append(cls)
return super().__init_subclass__()
class Meta:
database = pool
class MyTable(BaseModel):
id = CharField(max_length=20, primary_key=True)
class Meta:
table_name = 'test_table'
tables : dict = {}
for model in models:
tables[model._meta.table_name] = model
print(tables)
常用数据库操作
创建表、删除表
# 创建表
db.create_tables([Table1, Table2, ...])
# 删除表
db.drop_tables([Table1, Table2, ...])
插入数据、更新数据
# 插入数据
# 如果不指定主键的话, 下面这句话会创建一行新数据, 并自动赋予主键
s1 = Table1(column1='value1', column2=123, column3=True)
s1.save()
# 使用 insert 方法强制插入数据
Table1.insert(id=10, column1='value1', column2=123, column3=True).execute()
# 使用 xxx 批量插入数据
# 更新数据
# 如果指定了主键, 下面这句话会更新指定行的数据
# 如果指定主键不存在, 则不会更新任何数据
s2 = Table1(id=10, column1='value1', column2=123, column3=True)
s2.save()
查询数据
data = (WatchData
.select(WatchData.ID, WatchID.Department)
.where(WatchData.ID == id)
.order_by(WatchData.Timestamp.desc())
.limit(100))
data.get()
# 使用 model_to_dict 来将查询结果转换为字典形式
注意在涉及外键的查询中,model_to_dict
的输出和等效的纯 SQL
查询的输出稍有不同(在做项目的时候踩坑了)
连接池上下文
使用连接池对象的 connection_context
方法创建上下文
with pool.connection_context():
# 在这里进行数据库操作
使用SQL内置函数
需要使用 peewee 自带的 fn 对象
多线程
已经写过笔记了,就不再重复记录,地址: Python并行开发学习笔记