mongoose副本集事务实践

背景

最近因公司的CRM项目用的mongodb,浏览了所有旧代码,看到特别多多表原子操作的问题。借此机会就来看看mongodb4.0后出来的副本集事务的能力。

问题还原

多表操作场景

涉及异常回滚的原子性问题,

1
2
3
4
5
// 创建流水号(流水号表自增)

throw Error -> 异常中断

// 创建订单

问题:当异常中断时候,实际流水号表已经成功自增1,但是创建订单失败。当下一次进行操作的时候,实际流水号缺失了1位的订单流水号。

这里其实还隐藏着一个问题,高并发未加锁,会导致流水号异常。

这就是事务的原子性,实际应该当异常中断,启动事务回滚,回滚流水号的自增创建。

问题还原2

redis版本异常

1
2
3
4
5
6
7

// 订单入库
await ctx.model.Delivery.VirtualOrder.create(orders)

// 发送通知 -> 异常出现redis报错
const jobs = await ctx.app.noticeQueue.addBulk(datas)

问题:当异常redis中断导致发送消息失败,应该启动事务回滚

正确操作:启动事务,进行异常回滚,如下

app/extend/context.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
module.exports = {
async getSession(
opt = {
readPreference: { mode: 'primary' },
}
) {
const { mongoose } = this.app
const session = await mongoose.startSession(opt)
await session.startTransaction({
readConcern: { level: 'majority' },
writeConcern: { w: 'majority' },
})
return session
},
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

const { ctx } = this
const session = await this.ctx.getSession()
try {
// 订单入库
await ctx.model.Delivery.VirtualOrder.create(orders, { session })
// 发送通知 -> 异常出现redis报错
const jobs = await ctx.app.noticeQueue.addBulk(datas)
} catch (error) {
console.log('开始回滚')
await session.abortTransaction()
session.endSession()
console.log(error)
}

错误或不建议的操作:人工删除

1
2
3
4
5
6
7
8
9
10
11
try {
// 创建
// 创建
// 创建
// 创建
// 创建
// 创建
} catch{
// 人工删除创建
}

mongodb副本集环境搭建

bash 脚本

1
2
3
4
5
#!/bin/bash
# 生成 keyfile
mkdir ./keyfile
openssl rand -base64 745 > ./keyfile/mongoReplSet-keyfile
chmod 600 ./keyfile/mongoReplSet-keyfile

出现如下错误:

1
configsvr01    | {"t":{"$date":"2021-05-29T17:38:02.750+00:00"},"s":"I",  "c":"ACCESS",   "id":20254,   "ctx":"main","msg":"Read security file failed","attr":{"error":{"code":30,"codeName":"InvalidPath","errmsg":"error opening file: /data/mongo.key: bad file"}}}

解决办法: 变更mongoReplSet-keyfile 所属用户
chown 999 mongoReplSet-keyfile

启动 Docker

docker-compose -f docker-compose.yml up -d
docker-compose

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
version: '3.1'
services:
mongo1:
image: mongo
hostname: mongo1
container_name: mongo1
restart: always
ports:
- 27011:27017
volumes:
- ./keyfile:/data/keyfile
environment:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: 123123
command: mongod --auth --keyFile /data/keyfile/mongoReplSet-keyfile --bind_ip_all --replSet rs0

mongo2:
image: mongo
hostname: mongo2
container_name: mongo2
restart: always
ports:
- 27012:27017
volumes:
- ./keyfile:/data/keyfile
environment:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: 123123
command: mongod --auth --keyFile /data/keyfile/mongoReplSet-keyfile --bind_ip_all --replSet rs0

mongo3:
image: mongo
hostname: mongo3
container_name: mongo3
restart: always
ports:
- 27013:27017
volumes:
- ./keyfile:/data/keyfile
environment:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: 123123
command: mongod --auth --keyFile /data/keyfile/mongoReplSet-keyfile --bind_ip_all --replSet rs0
容器名 ip 备注
mongo1 10.8.99.44:27011 Primary(主, 读写)
mongo2 10.8.99.44:27012 Secondary1(从,读)
mongo3 10.8.99.44:27013 Secondary2(从, 读)

配置副本集

1
2
3
4
5
6
7
8
9
10
11
12
13
docker exec -it <container> mongo
mongo -u root -p 123123
use admin
rs.initiate(
{
_id : 'rs0',
members: [
{ _id : 0, host : "10.8.99.44:27011", priority:3 },
{ _id : 1, host : "10.8.99.44:27012", priority:3 },
{ _id : 2, host : "10.8.99.44:27013", priority:3 }
]
}
)

相关副本集命令

重置

1
2
3
4
5
6
7
8
9
10
rs.reconfig(
{
_id : 'rs0',
members: [
{ _id : 0, host : "10.8.99.44:27011", priority:3 },
{ _id : 1, host : "10.8.99.44:27012", priority:1 },
{ _id : 2, host : "10.8.99.44:27013", priority:1 }
]
}
)
1
rs.config()

强制修改副本集host

1
2
3
4
5
6
7
8
9
10
11
12
13
rs.reconfig(
{
_id : 'rs0',
members: [
{ _id : 0, host : "192.168.199.98:27011", priority:3 },
{ _id : 1, host : "192.168.199.98:27012", priority:1 },
{ _id : 2, host : "192.168.199.98:27013", priority:1 }
]
},
{
"force":true
}
)

修改优先级

必须在primary节点上执行此操作,副本集中通过设置priority的值来决定优先权的大小。这个值的范围是0–100,值越大,优先权越高. 如果值是0,那么不能成为primay。适用于做冷备。

1
2
3
PRIMARY> config=rs.conf()
PRIMARY>config.members[0].priority = 6
PRIMARY> rs.reconfig(config)
1
2
3
4
5
6
7
8
查看副本集状态
rs.status()
rs.isMaster()
查看副本集配置
rs.conf()
查看Secondary同步状态
rs.printReplicationInfo()
rs.printSecondaryReplicationInfo()

事务实践

如何通过mongoose连接副本集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
config.mongoose = {
url: 'mongodb://10.8.99.44:27011,10.8.99.44:27012,10.8.99.44:27013/log',
options: {
auth: {
user: 'admin',
password: '123123',
},
replicaSet: 'rs0', // 副本集
readPreference: 'secondaryPreferred', // 读哪个副本集 从哪个副本集读数据
readConcern: { level: 'majority' }, // 读的策略 解决脏读 决定这个节点上的数据哪些是可读的,类似于关系数据库的隔离级别 读取在大多数节点上提交完成的数据;
// write concern 当配置majority时,大多数节点为2,Primary感知到两个节点被写入完成时就代表成功写入
writeConcern: {
w: 'majority',
// j,该参数表示是否写操作要进行journal持久化之后才向用户确认;
// {j: true} 要求primary写操作进行了journal持久化之后才向用户确认;
// {j: false} 要求写操作已经在journal缓存中即可向用户确认;journal后续会将持久化到磁盘,默认是100ms;
// j: true,
j: true,
wtimeout: 1000,
},
keepAlive: true,
keepAliveInitialDelay: 300000,
useNewUrlParser: true,
useFindAndModify: false,
useCreateIndex: true,
useUnifiedTopology: true,
serverSelectionTimeoutMS: 30000,
socketTimeoutMS: 45000,
},
}

readPreference

默认情况下,读写都指定到副本集中的 Primary 节点。对于读多写少的情况我们可以使用读写分离来减轻 DB 的压力。MongoDB 驱动程序支持五种读取首选项(Read Preference) 模式。

Read Preference 描述
primary 默认模式。 所有操作都从当前副本集 primary 读取。
primaryPreferred 在大多数情况下,从 primary 读取,但如果不可用,则从 secondary 读取。
secondary 所有操作都从 secondary 中读取。
secondaryPreferred 在大多数情况下,从 secondary 读取,但如果没有 secondary 可用,则从 primary 读取。
nearest 无论成员的类型如何,操作都从具有最小网络延迟的副本集成员读取

mongoose事务回滚

app/extend/context.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
module.exports = {
async getSession(
opt = {
readPreference: { mode: 'primary' },
}
) {
const { mongoose } = this.app
const session = await mongoose.startSession(opt)
await session.startTransaction({
readConcern: { level: 'majority' },
writeConcern: { w: 'majority' },
})
return session
},
}

app/service/test.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
async transaction() {
const { ctx } = this
const session = await this.ctx.getSession()
const db = this.app.mongoose.connection
try {
const res1 = await ctx.model.VpnLog.create(
[
{
mac_address: 'test',
session: 'test',
source_ip: 'test',
destination_ip: 'test',
hub: 'test',
items_per_second: 111,
created_at: new Date(),
},
],
{ session }
)
throw new Error('出错') -> 出错,如其他程序的操作错误
const res2 = await ctx.model.VpnLog.create(
[
{
mac_address: 'test2',
session: 'test2',
source_ip: 'test2',
destination_ip: 'test2',
hub: 'test2',
items_per_second: 222,
created_at: new Date(),
},
],
{ session }
)
await session.commitTransaction()
session.endSession()
return {
res1,
res2,
}
} catch (error) {
console.log('开始回滚')
await session.abortTransaction()
session.endSession()
console.log(error)
}
}

可以看看res1 是不是回滚创建,在数据库中找不到了

相关链接

https://www.jianshu.com/p/8d7dea5c067b
https://www.zhangshengrong.com/p/Ap1ZeQ2PX0/