[关闭]
@JunQiu 2018-10-14T20:52:25.000000Z 字数 4556 阅读 2953

mongodb多文档事务:两段提交(<4.0)/4.0多文档事务

mongodb summary_2018/10


1、概述

2、建模实现多文档原子性(<4.0)

3、两阶段提交实现多文档事务(<4.0)

  1. 1、涉及协调器和节点多次通信,通信时间较长。
  2. 2、因此整个事务的持续时间也变长,对资源的占有时间也会变长,会产生较长的等待。

  1. ## 假设一个场景
  2. 从账户A转钱到账户B,在关系型数据库中,我们可以在一个事务中完成,但在小于4.0mongo中,我们需要使用2PC完成。
  3. ## 使用2PC来完成这个事务
  4. // 初始化A/B两个账户
  5. db.accounts.insert(
  6. [
  7. { _id: "A", balance: 1000, pendingTransactions: [] },
  8. { _id: "B", balance: 1000, pendingTransactions: [] }
  9. ]
  10. )
  11. // 初始化一个协调者来协调所有参与的文档,在mongo中我们使用一个transactions集合,当需要完成一个事务时,插入一条数据,包含如下字段:
  12. source destination 字段,与 accounts 集合里的 _id 字段相关联的。
  13. value 字段,指定影响 source 账户和 destination 账户 balance 的传输量,
  14. state 字段,反应传输的当前状态。state 字段可以具有值 initial pending applied done canceling canceled
  15. lastModified 字段,反映了最后修改的日期。
  16. db.transactions.insert(
  17. { _id: 1, source: "A", destination: "B", value: 100, state: "initial", lastModified: new Date() }
  18. )
  19. // 从协调器中取出需要完成的事务,即状态为initial
  20. var t = db.transactions.findOne( { state: "initial" } )
  21. // 处理取出来的事务
  22. 1Update transaction state to pending
  23. db.transactions.update(
  24. { _id: t._id, state: "initial" },
  25. {
  26. $set: { state: "pending" },
  27. $currentDate: { lastModified: true }
  28. }
  29. )
  30. 2Apply the transaction to both accounts.(并没有对资源保持持续占有,只保证不会重复操作,若要持续占有可以判定:pendingTransactions: []时)
  31. db.accounts.update(
  32. { _id: t.source, pendingTransactions: { $ne: t._id } },
  33. { $inc: { balance: -t.value }, $push: { pendingTransactions: t._id } }
  34. )
  35. db.accounts.update(
  36. { _id: t.destination, pendingTransactions: { $ne: t._id } },
  37. { $inc: { balance: t.value }, $push: { pendingTransactions: t._id } }
  38. )
  39. 3Update transaction state to applied
  40. db.transactions.update(
  41. { _id: t._id, state: "pending" },
  42. {
  43. $set: { state: "applied" },
  44. $currentDate: { lastModified: true }
  45. }
  46. )
  47. 4Update both accounts list of pending transactions.
  48. db.accounts.update(
  49. { _id: t.source, pendingTransactions: t._id },
  50. { $pull: { pendingTransactions: t._id } }
  51. )
  52. db.accounts.update(
  53. { _id: t.destination, pendingTransactions: t._id },
  54. { $pull: { pendingTransactions: t._id } }
  55. )
  56. // 在协调器中标记事务完成
  57. db.transactions.update(
  58. { _id: t._id, state: "applied" },
  59. {
  60. $set: { state: "done" },
  61. $currentDate: { lastModified: true }
  62. }
  63. )
  1. ## Applied 状态的事务
  2. 这种情况,最好不要回滚事务,而是创建一个相同的事务来抵消该事务
  3. ## Pending 状态的事务
  4. 1Update transaction state to canceling.
  5. 2Undo the transaction on both accounts.
  6. 3Update transaction state to canceled.
  7. Tips:基本和进行事务的方式相同,进行2PC
  8. ## 其它注意点
  9. 1、在2PC的过程中,每一步都需要判定是否成功,根据具体情况对事务进行回滚/重试
  10. 2、如何保证在整个过程中,保证对资源的持续占有,其实上述过程中并没有保证对资源的持续占有,只保证了避免重复操作。

4、4.0多文档事务

  1. // 看一下一个其它人写的例子
  2. //app.js
  3. (async function() {
  4. // 连接DB
  5. const { MongoClient } = require('mongodb');
  6. const uri = 'mongodb://localhost:1301/dbfour';
  7. const client = await MongoClient.connect(uri, { useNewUrlParser: true });
  8. const db = client.db();
  9. await db.dropDatabase();
  10. console.log('(1) 首先 删库 dbfour, then 跑路\n')
  11. // 插入两个账户并充值一些金额
  12. await db.collection('Account').insertMany([
  13. { name: 'A', balance: 50 },
  14. { name: 'B', balance: 10 }
  15. ]);
  16. console.log('(2) 执行 insertMany, A 充值 50, B 充值 10\n')
  17. await transfer('A', 'B', 10); // 成功
  18. console.log('(3) 然后 A 给 B 转账 10\n')
  19. try {
  20. // 余额不足 转账失败
  21. console.log('(4) A 再次转账给 B 50\n')
  22. await transfer('A', 'B', 50);
  23. } catch (error) {
  24. //error.message; // "Insufficient funds: 40"
  25. console.log(error.message)
  26. console.log('\n(5) A 余额不够啊,所以这次转账操作不成功')
  27. }
  28. // 转账逻辑
  29. async function transfer(from, to, amount) {
  30. const session = client.startSession();
  31. session.startTransaction();
  32. try {
  33. const opts = { session, returnOriginal: false };
  34. const A = await db.collection('Account').
  35. findOneAndUpdate({ name: from }, { $inc: { balance: -amount } }, opts).
  36. then(res => res.value);
  37. if (A.balance < 0) {
  38. // 如果 A 的余额不足,转账失败 中止事务
  39. // `session.abortTransaction()` 会撤销上面的 `findOneAndUpdate()` 操作
  40. throw new Error('Insufficient funds: ' + (A.balance + amount));
  41. }
  42. const B = await db.collection('Account').
  43. findOneAndUpdate({ name: to }, { $inc: { balance: amount } }, opts).
  44. then(res => res.value);
  45. await session.commitTransaction();
  46. session.endSession();
  47. return { from: A, to: B };
  48. } catch (error) {
  49. // 如果错误发生,中止全部事务并回退到修改之前
  50. await session.abortTransaction();
  51. session.endSession();
  52. throw error; //使其调用者 catch error
  53. }
  54. }
  55. })()
  56. Tips:When using the drivers, you must pass the session to each operation in the transaction.

5、参考文档

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注