A Work Queue using MongoDB

In this scenario, the work that needs to be done takes a long time, so when a request arrives it is stored in a work queue and processed later by a group of workers. There can be many writers to the queue, and many workers reading from it. The point is to ensure that no work item is picked up by two workers at the same time.

There are plenty of open-source libraries out there that do this. You should seriously consider integrating one of them into your program before implementing what I describe here. However, if you are careful about a few crucial points, this is very easy to embed into your MongoDB application.

Assumptions

The code here uses the mgo MongoDB driver, but the same ideas can be applied to the official driver as well. This implementation needs one additional field in your documents and an index on that field. I’ve never tried this with a sharded collection; in general, it is best to avoid sharding if you’re worried about concurrency issues.

This implementation assumes there are multiple writers and readers for the work queue. The point of the implementation is to ensure that all readers can acquire work items, and no work item can be acquired by two readers. There is no communication between the readers.

How does this work?

There are no restrictions on how work items are added to the queue.

When a worker needs to get a work item from the queue, it has to do so using an atomic operation, otherwise there is a chance that two workers get the same item. All MongoDB update operations are atomic at the document level, so we can run an update operation to acquire a work item. But we can’t simply use a state field and write acquired into it, because if multiple workers update the same document one after the other, there is no way to figure out who really acquired the work.

Instead, each worker competing for the same work generates a random number (a UUID), and executes an update operation that sets an empty field on a document to that random number. Since updates are atomic, we know that if a document ends up with a worker’s random number, then at the time the update took place the field was empty, and after the update no other worker can overwrite it because the field is no longer empty. After the update operation, each worker executes a find operation using its own random number, and only one of them will find a match. That is the worker that acquired the work.
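
The random value does not have to be a formal UUID; anything unique enough that two workers will never pick the same value will do. Here is a minimal sketch using only the standard library (the newToken helper is my own illustration, not part of the original):

// newToken returns a random hex string for the worker to use as its claim value.
// Assumes: import "crypto/rand" and "encoding/hex".
func newToken() (string, error) {
  b := make([]byte, 16) // 128 bits of randomness
  if _, err := rand.Read(b); err != nil {
    return "", err
  }
  return hex.EncodeToString(b), nil
}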

Setup

For this to work, the read mode must be primary, otherwise reads may not see what was just written:

session.SetMode(mgo.Primary, true)
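
In context, a session might be set up like this (a sketch; the connection address, database, and collection names are placeholders, and it assumes imports of "log" and the mgo packages):

session, err := mgo.Dial("localhost") // adjust for your deployment
if err != nil {
  log.Fatal(err)
}
defer session.Close()

// Read from the primary so the find following the update sees our own write.
session.SetMode(mgo.Primary, true)

collection := session.DB("jobs").C("workitems") // placeholder names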

We need to add a new field to the work item:

type WorkItem struct {
  ...
  Rnd string `bson:"rnd"`
}

This field will keep a random number.
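
A fuller work item might look something like this (the fields other than Rnd are illustrative assumptions; use whatever your application actually needs):

type WorkItem struct {
  ID         bson.ObjectId `bson:"_id,omitempty"`
  State      string        `bson:"state"`    // e.g. "pending", "inprogress", "done"
  Priority   int           `bson:"priority"`
  Payload    string        `bson:"payload"`  // whatever describes the work
  AcquiredAt time.Time     `bson:"acquiredAt,omitempty"`
  Rnd        string        `bson:"rnd"`
}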

We need an index on that random number field, otherwise the update and find below may get slow. Note that the index must not be unique, because every pending item carries the same empty rnd value:

collection.EnsureIndex(mgo.Index{Key: []string{"rnd"}})
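
With that in place, writers enqueue work by inserting documents whose rnd field is the empty string, which is what marks an item as available. A sketch using the illustrative fields above:

// enqueue adds a new, unclaimed work item to the queue.
func enqueue(collection *mgo.Collection, payload string) error {
  return collection.Insert(&WorkItem{
    ID:       bson.NewObjectId(),
    State:    "pending",
    Priority: 0,
    Payload:  payload,
    Rnd:      "", // empty means the item is up for grabs
  })
}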

Operation

When a worker is ready to acquire new work, it generates a random number num (a UUID), and executes the following update:

collection.Update(bson.M{"rnd": "" /* plus any other selection criteria */},
  bson.M{"$set": bson.M{"rnd": num}})

This will find one work item whose rnd field is empty, along with whatever other criteria you need (such as priority, state, etc.), and set its rnd field to the random number you just picked. The update can also set other fields (such as setting the state to in progress).
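
For example, with the illustrative fields from the setup section, the update might select only pending items and also record when the item was acquired (the state names and the acquiredAt field are assumptions, not part of the original):

// Claim one pending item: select on rnd == "" plus any other criteria,
// and set the claim token, the new state, and the acquisition time in one atomic step.
err := collection.Update(
  bson.M{"rnd": "", "state": "pending"},
  bson.M{"$set": bson.M{
    "rnd":        num,
    "state":      "inprogress",
    "acquiredAt": time.Now(),
  }})

If the update returns mgo.ErrNotFound, it simply means there was nothing matching to claim at the moment.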

The next step is to find out whether the previous operation picked up an item:

if collection.Find(bson.M{"rnd": num}).One(&work) == nil {
  // Work acquired
}

If this query finds an item, it is acquired, and other operations can be performed on it without worrying about interference from other workers.
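
Putting the two steps together, a worker’s acquire routine might look like this. This is a sketch built on the assumed fields and the newToken helper from earlier, not a definitive implementation:

// acquire claims at most one work item and returns it, or nil if the queue is empty.
func acquire(collection *mgo.Collection) (*WorkItem, error) {
  num, err := newToken()
  if err != nil {
    return nil, err
  }

  // Step 1: atomically stamp one unclaimed item with our token.
  err = collection.Update(
    bson.M{"rnd": "", "state": "pending"},
    bson.M{"$set": bson.M{"rnd": num, "state": "inprogress", "acquiredAt": time.Now()}})
  if err == mgo.ErrNotFound {
    return nil, nil // nothing available right now
  }
  if err != nil {
    return nil, err
  }

  // Step 2: read back the item we stamped; only this worker knows num.
  var work WorkItem
  if err := collection.Find(bson.M{"rnd": num}).One(&work); err != nil {
    return nil, err
  }
  return &work, nil
}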

Improvements

This scheme is simple and effective enough that it can be used without any additions. However, these types of systems are also prone to job failures, so it makes sense to keep track of how long a piece of work has remained acquired, and if it is still not completed after some threshold, to retry it somehow. The workers can periodically checkpoint their state in a persistent store so a retry can resume where the failed attempt left off, or you can retry the work from the beginning. Either way, you have to make sure that you don’t end up with two different result sets.
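
One way to handle stalled work, building on the assumed state and acquiredAt fields, is a periodic sweep that releases items held longer than some timeout so another worker can pick them up again. A sketch, with an arbitrary timeout:

// requeueStale releases items that have been in progress longer than maxAge.
func requeueStale(collection *mgo.Collection, maxAge time.Duration) error {
  cutoff := time.Now().Add(-maxAge)
  _, err := collection.UpdateAll(
    bson.M{"state": "inprogress", "acquiredAt": bson.M{"$lt": cutoff}},
    bson.M{"$set": bson.M{"rnd": "", "state": "pending"}})
  return err
}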

It might be worthwhile to implement a prioritization scheme. In that case, you may need to run multiple update statements to try different priority levels.
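
For example, the worker could try each priority level in turn and stop at the first successful claim. This sketch assumes the illustrative priority field and takes the worker’s token num as an argument:

// acquireByPriority tries each priority level in order and stops at the first claim.
// It returns true if an item was stamped with num.
func acquireByPriority(collection *mgo.Collection, num string, priorities []int) (bool, error) {
  for _, p := range priorities {
    err := collection.Update(
      bson.M{"rnd": "", "state": "pending", "priority": p},
      bson.M{"$set": bson.M{"rnd": num, "state": "inprogress", "acquiredAt": time.Now()}})
    if err == nil {
      return true, nil // claimed an item at this priority
    }
    if err != mgo.ErrNotFound {
      return false, err
    }
    // nothing at this priority; try the next one
  }
  return false, nil
}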