@JunQiu
2018-10-14T20:52:25.000000Z
字数 4556
阅读 2953
mongodb
summary_2018/10
1、涉及协调器和节点多次通信,通信时间较长。
2、因此整个事务的持续时间也变长,对资源的占有时间也会变长,会产生较长的等待。
## 假设一个场景
从账户A转钱到账户B,在关系型数据库中,我们可以在一个事务中完成,但在小于4.0的mongo中,我们需要使用2PC完成。
## 使用2PC来完成这个事务
// 初始化A/B两个账户
db.accounts.insert(
[
{ _id: "A", balance: 1000, pendingTransactions: [] },
{ _id: "B", balance: 1000, pendingTransactions: [] }
]
)
// 初始化一个协调者来协调所有参与的文档,在mongo中我们使用一个transactions集合,当需要完成一个事务时,插入一条数据,包含如下字段:
source 和 destination 字段,与 accounts 集合里的 _id 字段相关联的。
value 字段,指定影响 source 账户和 destination 账户 balance 的传输量,
state 字段,反应传输的当前状态。state 字段可以具有值 initial , pending , applied , done , canceling 和 canceled 。
lastModified 字段,反映了最后修改的日期。
db.transactions.insert(
{ _id: 1, source: "A", destination: "B", value: 100, state: "initial", lastModified: new Date() }
)
// 从协调器中取出需要完成的事务,即状态为initial
var t = db.transactions.findOne( { state: "initial" } )
// 处理取出来的事务
1、Update transaction state to pending
db.transactions.update(
{ _id: t._id, state: "initial" },
{
$set: { state: "pending" },
$currentDate: { lastModified: true }
}
)
2、Apply the transaction to both accounts.(并没有对资源保持持续占有,只保证不会重复操作,若要持续占有可以判定:pendingTransactions: []时)
db.accounts.update(
{ _id: t.source, pendingTransactions: { $ne: t._id } },
{ $inc: { balance: -t.value }, $push: { pendingTransactions: t._id } }
)
db.accounts.update(
{ _id: t.destination, pendingTransactions: { $ne: t._id } },
{ $inc: { balance: t.value }, $push: { pendingTransactions: t._id } }
)
3、Update transaction state to applied
db.transactions.update(
{ _id: t._id, state: "pending" },
{
$set: { state: "applied" },
$currentDate: { lastModified: true }
}
)
4、Update both accounts’ list of pending transactions.
db.accounts.update(
{ _id: t.source, pendingTransactions: t._id },
{ $pull: { pendingTransactions: t._id } }
)
db.accounts.update(
{ _id: t.destination, pendingTransactions: t._id },
{ $pull: { pendingTransactions: t._id } }
)
// 在协调器中标记事务完成
db.transactions.update(
{ _id: t._id, state: "applied" },
{
$set: { state: "done" },
$currentDate: { lastModified: true }
}
)
## Applied 状态的事务
这种情况,最好不要回滚事务,而是创建一个相同的事务来抵消该事务
## Pending 状态的事务
1、Update transaction state to canceling.
2、Undo the transaction on both accounts.
3、Update transaction state to canceled.
Tips:基本和进行事务的方式相同,进行2PC
## 其它注意点
1、在2PC的过程中,每一步都需要判定是否成功,根据具体情况对事务进行回滚/重试
2、如何保证在整个过程中,保证对资源的持续占有,其实上述过程中并没有保证对资源的持续占有,只保证了避免重复操作。
// 看一下一个其它人写的例子
//app.js
(async function() {
// 连接DB
const { MongoClient } = require('mongodb');
const uri = 'mongodb://localhost:1301/dbfour';
const client = await MongoClient.connect(uri, { useNewUrlParser: true });
const db = client.db();
await db.dropDatabase();
console.log('(1) 首先 删库 dbfour, then 跑路\n')
// 插入两个账户并充值一些金额
await db.collection('Account').insertMany([
{ name: 'A', balance: 50 },
{ name: 'B', balance: 10 }
]);
console.log('(2) 执行 insertMany, A 充值 50, B 充值 10\n')
await transfer('A', 'B', 10); // 成功
console.log('(3) 然后 A 给 B 转账 10\n')
try {
// 余额不足 转账失败
console.log('(4) A 再次转账给 B 50\n')
await transfer('A', 'B', 50);
} catch (error) {
//error.message; // "Insufficient funds: 40"
console.log(error.message)
console.log('\n(5) A 余额不够啊,所以这次转账操作不成功')
}
// 转账逻辑
async function transfer(from, to, amount) {
const session = client.startSession();
session.startTransaction();
try {
const opts = { session, returnOriginal: false };
const A = await db.collection('Account').
findOneAndUpdate({ name: from }, { $inc: { balance: -amount } }, opts).
then(res => res.value);
if (A.balance < 0) {
// 如果 A 的余额不足,转账失败 中止事务
// `session.abortTransaction()` 会撤销上面的 `findOneAndUpdate()` 操作
throw new Error('Insufficient funds: ' + (A.balance + amount));
}
const B = await db.collection('Account').
findOneAndUpdate({ name: to }, { $inc: { balance: amount } }, opts).
then(res => res.value);
await session.commitTransaction();
session.endSession();
return { from: A, to: B };
} catch (error) {
// 如果错误发生,中止全部事务并回退到修改之前
await session.abortTransaction();
session.endSession();
throw error; //使其调用者 catch error
}
}
})()
Tips:When using the drivers, you must pass the session to each operation in the transaction.