数据库操作优化,提速 Upsert
反应式编程在客户端编程当中的应用相当广泛,而当前在服务端中的应用相对被提及较少。本篇将介绍如何在服务端编程中应用响应时编程来改进数据库操作的性能。
开篇就是结论
接续上一篇《数据库操作优化,从20秒到0.5秒》之后,这次,我们带来了关于利用反应式编程进行 upsert 优化的案例说明。建议读者可以先阅读一下前一篇,这样更容易理解本篇介绍的方法。
同样还是利用批量化的思路,将单个 upsert 操作批量进行合并。已达到减少数据库链接消耗从而大幅提升性能的目的。
业务场景
在最近的一篇文章《十万同时在线用户,需要多少内存?——Newbe.Claptrap框架水平扩展实验》中。我们通过激活多个常驻于内存当中的 Claptrap 来实现快速验证 JWT 正确性的目的。
但,当时有一个技术问题没有得到解决:
Newbe.Claptrap 框架设计了一个特性:当 Claptrap Deactive 时,可以选择将快照立即保存到数据库。因此,当尝试从集群中关闭一个节点时,如果节点上存在大量的 Claptrap ,那么将产生大量的数据库 upsert 操作。瞬间推高数据库消耗,甚至导致部分错误而保存失败。
一点点代码
有了前篇的IBatchOperator
,那么留给这篇的代码内容就非常少了。
首先,按照使用上一篇的 IBatchOperator 编写一个支持操作的 Repository,形如以下代码:
public class BatchUpsert : IUpsertRepository
{
private readonly IDatabase _database;
private readonly IBatchOperator<(int, int), int> _batchOperator;
public BatchUpsert(IDatabase database)
{
_database = database;
var options = new BatchOperatorOptions<(int, int), int>
{
BufferCount = 100,
BufferTime = TimeSpan.FromMilliseconds(50),
DoManyFunc = DoManyFunc
};
_batchOperator = new BatchOperator<(int, int), int>(options);
}
private Task<int> DoManyFunc(IEnumerable<(int, int)> arg)
{
return _database.UpsertMany(arg.ToDictionary(x => x.Item1, x => x.Item2));
}
public Task UpsertAsync(int key, int value)
{
return _batchOperator.CreateTask((key, value));
}
}
然后,只要实现对应数据库的 UpsertMany 方法,便可以很好地完成这项优化。
各种数据库的操作
结合 Newbe.Claptrap 现在项目的实际。目前,被支持的数据库分别有 SQLite、PostgreSQL、MySql 和 MongoDB。以下,分别对不同类型的数据库的批量 Upsert 操作进行说明。
由于在 Newbe.Claptrap 项目中的 Upsert 需求都是以主键作为对比键,因此以下也只讨论这种情况。
SQLite
根据官方文档,使用 INSERT OR REPLACE INTO
便可以实现主键冲突时替换数据的需求。
具体的语句格式形如以下:
INSERT OR REPLACE INTO TestTable (id, value)
VALUES
(@id0,@value0),
...
(@idn,@valuen);
因此只要直接拼接语句和参数调用即可。需要注意的是,SQLite 的可传入参数默认为 999,因此拼接的变量也不应大于该数量。
PostgreSQL
众所周知,PostgreSQL 在进行批量写入时,可以使用高效的COPY
语句来完成数据的高速导入,这远远快于INSERT
语句。但可惜的是COPY
并不能支持ON CONFLICT DO UPDATE
子句。因此,无法使用COPY
来完成 upsert 需求。
因此,我们还是回归使用INSERT
配合ON CONFLICT DO UPDATE
子句,以及unnest
函数来完成批量 upsert 的需求。
具体的语句格式形如以下:
INSERT INTO TestTable (id, value)
VALUES (unnest(@ids), unnest(@values))
ON CONFLICT ON CONSTRAINT TestTable_pkey
DO UPDATE SET value=excluded.value;
其中的 ids 和 values 分别为两个等长的数组对象,unnest 函数可以将数组对象转换为行数据的形式。
注意,可能会出现 ON CONFLICT DO UPDATE command cannot affect row a second time 错误。
因此如果尝试使用上述方案,需要在传入数据库之前,先在程序中去重一遍。而且,通常来说,在程序中进行一次去重可以减少向数据库中传入的数据,这本身也很有意义。
MySql
MySql 与 SQLite 类似,支持REPLACE
语法。具体语句形式如下:
REPLACE INTO TestTable (id, value)
VALUES
(@id0,@value0),
...
(@idn,@valuen);
MongoDB
MongoDB 原生支持 bulkWrite 的批量传输模式,也支持 replace 的 upsert 语法。因此操作非常简单。
那么这里展示一下 C# 操作方法:
private async Task SaveManyCoreMany(
IDbFactory dbFactory,
IEnumerable<StateEntity> entities)
{
var array = entities as StateEntity[] ?? entities.ToArray();
var items = array
.Select(x => new MongoStateEntity
{
claptrap_id = x.ClaptrapId,
claptrap_type_code = x.ClaptrapTypeCode,
version = x.Version,
state_data = x.StateData,
updated_time = x.UpdatedTime,
})
.ToArray();
var client = dbFactory.GetConnection(_connectionName);
var db = client.GetDatabase(_databaseName);
var collection = db.GetCollection<MongoStateEntity>(_stateCollectionName);
var upsertModels = items.Select(x =>
{
var filter = new ExpressionFilterDefinition<MongoStateEntity>(entity =>
entity.claptrap_id == x.claptrap_id && entity.claptrap_type_code == x.claptrap_type_code);
return new ReplaceOneModel<MongoStateEntity>(filter, x)
{
IsUpsert = true
};
});
await collection.BulkWriteAsync(upsertModels);
}
这是从 Newbe.Claptrap 项目业务场景中给出的代码,读者可以结合自身需求进行修改。
通用型解法
优化的本质是减少数据库链接的使用,尽可能在一个链接内完成更多的工作。因此如果特定的数据库不支持以上数据库类似的操作。那么还是存在一种通用型的解法:
- 以尽可能快地方式将数据写入一临时表
- 将临时表的数据已连表 update 的方式更新的目标表
- 删除临时表
性能测试
以 SQLite 为例,尝试对 12345 条数据进行 2 次 upsert 操作。