A Work Queue using MongoDB
In this scenario, the work that needs to be done takes a long time, so when a request comes in, it is stored in a work queue and processed later by a group of workers. There can be many writers to the queue and many workers reading from it. The point is to ensure that no work item is picked up by two workers at the same time.
There are many open-source libraries out there that do this. You should seriously consider integrating one of those into your program before implementing what I describe here. However, if you are careful about a few crucial points, this is very easy to embed into your MongoDB application.
Assumptions
The code here uses the mgo MongoDB driver, but the same ideas can be applied to the official driver as well. This implementation needs one additional field in your documents and one additional index. I’ve never tried this with a sharded collection; in general, it is best to avoid sharding if you’re worried about concurrency issues.
This implementation assumes there are multiple writers and readers for the work queue. The point of the implementation is to ensure that all readers can acquire work items, and no work item can be acquired by two readers. There is no communication between the readers.
How does this work?
There are no restrictions on how work items are added to the queue.
When a worker needs to get a work item from the queue, it has to do this using an atomic operation, otherwise there is a chance that two workers get the same item. All MongoDB update operations are atomic at the document level, so we can run an update operation to acquire a work item. But we can’t simply use a `state` field and write `acquired` there, because if multiple workers update the same document one after the other, there is no way to figure out which one really acquired the work.
Instead, each worker competing for the same work generates a random number (a UUID) and executes an update operation that sets an empty field on a document to that random number. Since updates are atomic, we know that if a document carries a worker’s random number, then at the time the update took place the random field was empty, and after the update no other worker can overwrite it because that field is no longer empty. After the update operation, the workers execute a find operation using their own random numbers, and only one of them will find a match. That is the worker that acquired the work.
Setup
For this to work, the read mode must be primary; otherwise, reads may not see what was just written:
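With mgo, the read mode can be set on the session when it is created; the server address below is a placeholder:

```go
package main

import "gopkg.in/mgo.v2"

func main() {
	// The address is a placeholder; use your own server.
	session, err := mgo.Dial("localhost")
	if err != nil {
		panic(err)
	}
	defer session.Close()

	// In Primary mode all reads and writes go to the primary, so the
	// find that follows the acquire update sees that update.
	session.SetMode(mgo.Primary, true)
}
```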
We need to add a new field to the work item:
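In mgo this is just another struct field. The `Item` type and its other fields here are illustrative; only the `rnd` field is required by this scheme:

```go
package queue

import "gopkg.in/mgo.v2/bson"

// Item is a hypothetical work item document.
type Item struct {
	ID    bson.ObjectId `bson:"_id,omitempty"`
	State string        `bson:"state"`
	// Rnd holds the worker's random number once the item is
	// acquired; it is unset while the item waits in the queue.
	Rnd string `bson:"rnd,omitempty"`
}
```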
This field will keep a random number.
We need a unique index on that random number field; without an index, the find operations below will get slow as the collection grows:
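A sketch using mgo’s `EnsureIndex`. The index is declared sparse here under the assumption that items waiting in the queue simply have no `rnd` field; otherwise the unique constraint would reject the second queued item:

```go
package queue

import "gopkg.in/mgo.v2"

// EnsureRndIndex creates a unique, sparse index on the rnd field.
// Sparse: documents without a rnd field (items still in the queue)
// are not indexed, so they cannot violate the unique constraint.
func EnsureRndIndex(c *mgo.Collection) error {
	return c.EnsureIndex(mgo.Index{
		Key:    []string{"rnd"},
		Unique: true,
		Sparse: true,
	})
}
```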
Operation
When a worker is ready to acquire new work, it generates a random number `num` (a UUID), and executes the following update:
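A sketch of the acquire step with mgo. The selector assumes that unacquired items have no `rnd` field and carry a `state` of `"queued"`; both are assumptions, so adapt them to your own schema. `mgo.ErrNotFound` from `Update` means there was nothing to claim:

```go
package queue

import (
	"crypto/rand"
	"encoding/hex"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// NewNum returns a random hex string to use as the worker's number.
// Any UUID library would do equally well.
func NewNum() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// Acquire tries to claim one queued work item by stamping it with
// num. Update modifies a single matching document, so at most one
// item is claimed per call.
func Acquire(c *mgo.Collection, num string) error {
	return c.Update(
		// Selector: nobody has claimed this item yet; add your own
		// criteria (priority, deadlines, ...) here.
		bson.M{"rnd": bson.M{"$exists": false}, "state": "queued"},
		// Stamp the item and mark it in progress in one atomic step.
		bson.M{"$set": bson.M{"rnd": num, "state": "inprogress"}},
	)
}
```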
This will find one work item whose `rnd` field is empty, along with any other criteria you need (such as priority, state, etc.), and update its `rnd` field to the random number you just picked. The update can also set other fields (such as setting the state to `in progress`).
The next step is to find out whether the previous operation picked an item:
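A sketch of that check, with a minimal hypothetical `Item` type. A `nil, nil` result means this worker claimed nothing, either because the queue was empty or because another worker’s update won:

```go
package queue

import (
	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// Item is a minimal, hypothetical work item document.
type Item struct {
	ID    bson.ObjectId `bson:"_id"`
	State string        `bson:"state"`
	Rnd   string        `bson:"rnd"`
}

// Claimed returns the item stamped with this worker's number, or
// nil if the update claimed nothing for us.
func Claimed(c *mgo.Collection, num string) (*Item, error) {
	var item Item
	err := c.Find(bson.M{"rnd": num}).One(&item)
	if err == mgo.ErrNotFound {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return &item, nil
}
```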
If this query finds an item, it is acquired, and other operations can be performed on it without worrying about interference from other workers.
Improvements
This scheme is simple and effective enough that it can be used without any additions. However, these types of systems are also prone to job failures, so it makes sense to keep track of how long a piece of work remains acquired, and if it is still not completed after some deadline, to retry it somehow. The workers can periodically checkpoint their state in a persistent store so a retry can resume operation, or you can retry the work from the beginning. Either way, you have to make sure you don’t end up with two different result sets.
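One way to sketch the retry part, assuming workers also set a hypothetical `acquiredAt` timestamp when they claim an item (that field is an assumption, not part of the base scheme):

```go
package queue

import (
	"time"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// RequeueStale puts items that have been acquired for longer than
// timeout back into the queue, clearing the claim so any worker can
// pick them up again. It returns how many items were requeued.
func RequeueStale(c *mgo.Collection, timeout time.Duration) (int, error) {
	info, err := c.UpdateAll(
		bson.M{
			"state":      "inprogress",
			"acquiredAt": bson.M{"$lt": time.Now().Add(-timeout)},
		},
		bson.M{
			"$set":   bson.M{"state": "queued"},
			"$unset": bson.M{"rnd": "", "acquiredAt": ""},
		},
	)
	if err != nil {
		return 0, err
	}
	return info.Updated, nil
}
```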
It might be worthwhile to implement a prioritization scheme. In that case, you may need to run multiple update statements, trying each priority level in turn.
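For example, with a hypothetical string `priority` field on the work items, the worker can simply run the acquire update once per level, highest first:

```go
package queue

import (
	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// AcquireByPriority runs the acquire update once per priority level,
// highest first, and stops at the first claim. The priority values
// and field name are illustrative.
func AcquireByPriority(c *mgo.Collection, num string) error {
	for _, prio := range []string{"high", "normal", "low"} {
		err := c.Update(
			bson.M{
				"rnd":      bson.M{"$exists": false},
				"state":    "queued",
				"priority": prio,
			},
			bson.M{"$set": bson.M{"rnd": num, "state": "inprogress"}},
		)
		if err != mgo.ErrNotFound {
			return err // nil on success, or a real error
		}
	}
	return mgo.ErrNotFound // nothing queued at any priority
}
```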