本篇文档提供了一个使用二阶段提交将数据写入多个文档的方法来处理多文档更新或“多文档事务”。在此基础上,你可以扩展实现类似数据回滚的功能。
在MongoDB数据库中,作用于单个document的操作总是原子性的;但是,涉及到多个document的操作,也就是我们常说的“多文档事务”,是非原子性的。 由于document可以设计的非常复杂并且能包含多个“内嵌”document,因此单文档原子性对很多实际场景提供了必要的支持。(译者注:比如你要批量更新某批商品的出厂日期,可以将这些商品信息放在同一个document中做内嵌。但是我几乎没有使用过这种方法,会有很多额外的问题,比如频繁操作会导致document move。)
尽管单文档原子操作能满足不少需求,但是在很多场景下仍然需要多文档事务的支持。当执行一个由几个顺序操作组成的事务时,可能会出现某些问题,例如:
对于需要多文档事务的场景,你可以在应用中实现二阶段提交来提供支持。二阶段提交可以保证数据的一致性,如果发生错误,事务前的状态是可恢复的。在事务执行过程中,无论发生什么情况都可以还原到数据和状态的准备阶段。
注意
因为在MongoDB中只有单文档操作是原子性的,二阶段提交只能提供类似事务的语义。在二阶段提交或回滚进行中,应用程序可以返回任意步骤点的中间数据。
概述
考虑这样一个场景,你想从账户A转账给账户B。在关系型数据库系统中,你可以在单个多语句事务中先减少账户A的资金然后为账户B增加资金。在MongoDB中,你可以模拟实现一个二阶段提交得到同样的结果。
本节中的所有示例使用下面两个集合:
1. 集合accounts保存账户信息。
2. 集合transactions保存转账事务信息。
初始化源账户和目标账户
将账户A和账户B的信息写入到集合accounts。
db.accounts.insert(
[
{ _id: "A", balance: 1000, pendingTransactions: [] },
{ _id: "B", balance: 1000, pendingTransactions: [] }
]
)
上面的语句返回一个BulkWriteResult()对象,包含了本次操作的状态信息。如果成功写入,BulkWriteResult()对象中的 nInserted的值为2。(译者注:在2.6版本后写操作都会返回WriteResult对象,批量写会返回BulkWriteResult,具体请见相关章节)
初始化转帐数据
将每笔转账信息写入到transactions表,转账数据包含以下字段:
将账户A向账户B转账100的操作信息初始化到transactions集合, state字段值为"initial", lastModified字段值设为当前时间:
db.transactions.insert(
{ _id: 1, source: "A", destination: "B", value: 100, state: "initial", lastModified: new Date() }
)
上面的语句返回一个WriteResult()对象,包含了本次操作的状态信息,如果写入成功, WriteResult()对象的 nInserted值为1。
使用二阶段提交转账
? 获取transaction集合的数据
从transactions集合查找一条state字段值为initial的数据。当前transactions集合中只有一条数据,也就是说我们在上文 初始化转账数据 这个步骤只写入了一条数据。如果集合中有另外的数据,下面的查询会返回任意state字段为initial的数据,除非你附加一些别的查询条件。
var t = db.transactions.findOne( { state: "initial" } )
在 mongo shell中定义变量t来打印返回的内容。上边的语句会得到如下输出:
{ "_id" : 1, "source" : "A", "destination" : "B", "value" : 100, "state" : "initial", "lastModified" : ISODate("2014-07-11T20:39:26.345Z") }
? 将transaction数据的state字段设为pending
将transaction数据的state字段从initial设为pending,并用 $currentDate 操作将lastModified字段设为当前时间。
db.transactions.update(
{ _id: t._id, state: "initial" },
{
$set: { state: "pending" },
$currentDate: { lastModified: true }
}
)
这个更新操作会返回一个WriteResult()对象,包含本次更新操作的状态信息,如果更新成功,nMatched 和 nModified 显示为1。
在这个更新语句中state: "initial" 条件确保没有其它线程更新过本条数据。如果nMatched和 nModified为0,回到第一步重新获取一条数据然后继续按步骤进行。
? 对账户进行转账
如果账户不包含transaction信息,用 update()方法更新帐户信息, 在更新条件中带有pendingTransactions: {$ne: t._id },这是为了避免重复同一次转账。
同时更新balance字段和pendingTransactions字段来实现转账。
更新源账户信息,为balance字段减去transaction 数据的value 值,并将transaction 的_id写入到pendingTransactions字段的数组中。
db.accounts.update(
{ _id: t.source, pendingTransactions: { $ne: t._id } },
{ $inc: { balance: -t.value }, $push: { pendingTransactions: t._id } }
)
操作成功后,方法会返回WriteResult() 对象, nMatched 和nModified值为1。
更新目标账户信息,为balance字段加上transaction 数据的value 值,并将transaction 的_id写入到pendingTransactions字段的数组中。
db.accounts.update(
{ _id: t.destination, pendingTransactions: { $ne: t._id } },
{ $inc: { balance: t.value }, $push: { pendingTransactions: t._id } }
)
操作成功后,方法会返回 WriteResult() 对象, nMatched 和nModified 值为1。
? 将transaction数据的state设为applied
用下面的update()操作将transaction数据的state 值设为applied operation to set the transaction’s state to applied,并更新lastModified字段值为当前时间:
db.transactions.update(
{ _id: t._id, state: "pending" },
{
$set: { state: "applied" },
$currentDate: { lastModified: true }
}
)
操作成功后,方法会返回 WriteResult() 对象, nMatched 和nModified 值为1。
? 将transaction 数据的_id值从两个账户的pendingTransactions字段中移除
从两个账户中的pendingTransactions 字段中移除state值为applied的 transaction数据的 _id值。
更新源账户
db.accounts.update(
{ _id: t.source, pendingTransactions: t._id },
{ $pull: { pendingTransactions: t._id } }
)
操作成功后,方法会返回 WriteResult() 对象, nMatched 和nModified 值为1。
更新目标账户
db.accounts.update(
{ _id: t.destination, pendingTransactions: t._id },
{ $pull: { pendingTransactions: t._id } }
)
操作成功后,方法会返回 WriteResult() 对象, nMatched 和nModified 值为1。
? 更新transaction数据的 state值为done.
将transaction 数据的state设为 done ,更新lastModified为当前时间,这也标志着本次事务的结束。
db.transactions.update(
{ _id: t._id, state: "applied" },
{
$set: { state: "done" },
$currentDate: { lastModified: true }
}
)
操作成功后,方法会返回 WriteResult() 对象, nMatched 和nModified 值为1。
其实最重要的部分不是上面示例中比较顺的场景,重要的当事务未成功完成时有没有可能从各种各样失败情况中恢复。这部分会概括各种可能出现的失败场景,并教你一些步骤,如何从这些事件中恢复。
恢复操作
二阶段提交模式允许应用程序有序的运行一些操作来恢复事务并达到一致性状态。在应用启动时运行恢复程序,可能是个定期执行的程序,用来捕获任何未完成的事务。
在一致性问题上对于时间的需求取决于应用间隔多长时间为每个事务进行恢复。
接下来举例的恢复程序根据lastModified字段做为指标来决定pending状态的事务是否需要进行恢复; 再具体点,如果pending 或 applied 状态的事务在30分钟内未更新过,恢复程序会认为这些事务需要进行恢复。你可以用不同的条件来决定事务是否需要恢复。
pending状态的事务
要恢复发生在上文举例的“将transaction数据的state设为pending.” 步骤之后,但发生在 “将transaction数据的 state设为applied.“步骤之前的错误,先从transactions集合中获取一条pending状态的数据:
var dateThreshold = new Date();
dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);
var t = db.transactions.findOne( { state: "pending", lastModified: { $lt: dateThreshold } } );
然后从上文的 “对账户进行转账“步骤开始继续执行
applied状态的事务
要恢复发生在上文举例的 “将transaction数据的state设为applied”步骤之后,但发生在 but before “将transaction数据的state设为done.“步骤之前的错误,先从transactions集合中获取一条applied状态的数据:
var dateThreshold = new Date();
dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);
var t = db.transactions.findOne( { state: "applied", lastModified: { $lt: dateThreshold } } );
然后从上文的“U将transactions数据的_id值从两个账户信息的pendingTransactions字段中删除.“步骤开始继续执行
回滚操作
在一些情况下,你可能需要“回滚”或取消事务;举例来说,比如应用程序主观的需要去“取消”事务,或者事务中的某个账户不存在,或者说在事务进行中账户不复存在了。
applied状态的事务
在 “将transaction数据state设为applied.”步骤之后,你最好不要回滚事务了。取而代之的方式应该是,完成这个事务,然后新启一个事务,将上个事务的源账户和目标账户调换一下,再做一次转账。
在 “将transaction数据 state设为pending.”步骤之后,在 “将 transaction数据 state设为applied.”步骤之前,你可以根据下面的流程来回滚事务:
? 将transaction数据state设为canceling
将transaction的 state 从pending 设为canceling。
db.transactions.update(
{ _id: t._id, state: "pending" },
{
$set: { state: "canceling" },
$currentDate: { lastModified: true }
}
)
操作成功后,方法会返回 WriteResult() 对象, nMatched 和nModified 值为1。
? 在两个账户上撤消事务
对两个账户做反向操作来撤消事务,在update条件的中加上pendingTransactions: t._id来筛选满足条件的数据。
更新目标账户信息,在balance 字段上减去transaction 数据的value 值,并将transaction 数据的 _id 从pendingTransactions 数组中移除。
db.accounts.update(
{ _id: t.destination, pendingTransactions: t._id },
{
$inc: { balance: -t.value },
$pull: { pendingTransactions: t._id }
}
)
操作成功后,方法会返回 WriteResult() 对象, nMatched 和nModified 值为1。 如果转账事务在之前没有发生在该账户上,那么上面的更新操作匹配不到数据, nMatched and nModified 值会是0。
更新源账户信息,在balance字段上加上transaction数据的 value值,并将transaction 数据的 _id 从pendingTransactions 数组中移除。
db.accounts.update(
{ _id: t.source, pendingTransactions: t._id },
{
$inc: { balance: t.value},
$pull: { pendingTransactions: t._id }
}
)
操作成功后,方法会返回 WriteResult() 对象, nMatched 和nModified 值为1。如果转账事务在之前没有发生在该账户上,那么上面的更新操作匹配不到数据, nMatched and nModified 值会是0。
? 将transaction数据state设为canceled
将transaction 数据的state 从canceling 设为cancelled来完成最终的回滚 。
db.transactions.update(
{ _id: t._id, state: "canceling" },
{
$set: { state: "cancelled" },
$currentDate: { lastModified: true }
}
)
操作成功后,方法会返回 WriteResult() 对象, nMatched 和nModified 值为1。
事务的存在,从某种程度上说,是为了便于多个应用并发的创建和执行操作,而不会引发数据不一定和数据冲突。在我们的程序中,更新或获取transaction集合的数据时,更新条件中都会包含state 字段条件,这能防止多应用冲突的申请transaction 数据。
例如,App1和App2同时获取了某条相同的state为initial的transaction 数据。在App2开工前,App1执行了完整的事务,当App2试图执行步骤 “将transaction数据state设为pending.”时,由于更新条件中包含有state: "initial"语句,更新操作匹配不到数据,nMatched 和nModified值会是0。这会让App2返回到第一步去获取另一条transaction数据重新开始事务流程。
当多个应用运行时,最关键的是在任意时刻只能有唯一一个应用能操作一条给定的transaction 数据。同样的,除了在更新条件中包含预期的事务状态之外,你还可以为transaction 数据创建一个标记来鉴别正在操作该transaction数据的应用。用findAndModify()方法原子性的修改并返回transaction数据:
t = db.transactions.findAndModify(
{
query: { state: "initial", application: { $exists: false } },
update:
{
$set: { state: "pending", application: "App1" },
$currentDate: { lastModified: true }
},
new: true
}
)
修改之前例子中的事务操作,可以确保只有匹配上application字段标识的应用才能操作相应的transaction数据。
如果App1在事务执行过程中失败了,你可以用恢复程序进行恢复,但是在恢复之前,应用程序必须确定它们“拥有”相应的transaction数据。例如要找到并继续执行一个pending状态的事务, 使用类似下面的查询:
var dateThreshold = new Date();
dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);
db.transactions.find(
{
application: "App1",
state: "pending",
lastModified: { $lt: dateThreshold }
}
)
本文中的账户事务例子故意体现的很简单。比如,我们假设总能够对账户做回滚操作,并且账户余额是负数。
生产环境中的实现可能会更复杂一些,例如真实场景下账户需要的信息还包括当前余额、待转出、待转入。
对于所有的事务来说,在你部署时需要设置一个合适的写模式。(译者注:我觉得涉及事务的地方最好还是使用安全写比较靠谱)
原文地址:http://blog.csdn.net/zhangming1013/article/details/46679377