Work queue with at most n workers
I have an API where people can submit a request to get a report on a rather large set of data items. It is not feasible to run the report while the API request is being served. Because of this, I decided to implement two APIs: the first to submit the request, and the second to retrieve the (partial) results, while the report runs as a separate process.
I implemented this so that the report runner is a separate process that polls a work queue. The report submission API simply adds a report request to the queue and returns a report id to the caller. The report runner polls the work queue, picks a report, runs it, and stores the results in temporary storage, so other APIs can check the status of the report and get the results. This scheme also allows me to run many report runners if necessary. If there is a significant number of report requests, the report runner will be fairly busy, but if not, there should be a way to wake it up instead of waiting for the next time it polls the work queue, so the report runner also provides a ping API that causes it to check the queue if it is idle.
Code
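To keep the code concrete, here is a simplified sketch of the types involved. The snippets in this post together form one file; ReportRequest and WorkQueue are placeholder names, and the real queue is backed by persistent storage:

```go
package main

import (
	"context"
	"os"
	"os/signal"
	"sync/atomic"
	"syscall"
	"time"
)

// ReportRequest is one item of work: a submitted report request.
// The real request also carries the report parameters.
type ReportRequest struct {
	ID string // report id returned to the caller at submission time
}

// WorkQueue is the minimal queue interface the report runner needs:
// Poll returns the next request, or nil if the queue is empty.
type WorkQueue interface {
	Poll() *ReportRequest
}
```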
The part of the implementation that orchestrates the polling of the work queue and spins up new goroutines as necessary is below. There is a limit on the number of concurrent goroutines, specified by configuration, so a single report runner will not pick up all the work and choke itself later. This runs in a container, so the polling loop is cancelable; it should also handle signals so it can stop gracefully when the container shuts down.
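What follows is a minimal sketch of that loop rather than a drop-in implementation: maxWorkers comes from configuration, pingChannel is fed by the ping API, and runWork is the worker goroutine shown at the end of the post.

```go
func runLoop(ctx context.Context, queue WorkQueue, maxWorkers int32, pingChannel <-chan struct{}) {
	var nWorkers int32                        // running worker goroutines; atomic access only
	done := make(chan struct{}, 1)            // wake-up signal: a worker ended
	ticker := time.NewTicker(5 * time.Second) // polling interval
	defer ticker.Stop()

	for {
		var work *ReportRequest
		// Poll only when below the concurrency limit; otherwise work
		// stays nil and we fall through to the select below.
		if atomic.LoadInt32(&nWorkers) < maxWorkers {
			work = queue.Poll()
		}
		if work != nil {
			atomic.AddInt32(&nWorkers, 1)
			go runWork(ctx, work, &nWorkers, done)
			continue
		}

		// No new work, or too many workers: wait for a reason to poll again.
		select {
		case <-ctx.Done():
			// Terminating: the workers share ctx, so they are stopping
			// too. Wait for all of them to exit.
			for atomic.LoadInt32(&nWorkers) > 0 {
				<-done
			}
			return
		case <-ticker.C: // waited long enough; poll again
		case <-done: // a worker ended; there is capacity for more work
		case <-pingChannel: // a client pinged us; new work may be waiting
		}
	}
}
```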
Signals
This code runs in a container. When a container receives the termination signal, it has 10 seconds to terminate. I’d like to pass this signal to all the workers, so they can store their current state, and later another worker can pick it up and continue from where they left off. The following snippet does that:
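A minimal sketch of that wiring; newQueue and maxWorkersFromConfig are hypothetical stand-ins for application-specific setup:

```go
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Cancel the shared context when the container is being stopped.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
	go func() {
		<-sigs
		cancel()
	}()

	pingChannel := make(chan struct{}, 1)
	// newQueue and maxWorkersFromConfig stand in for the actual
	// wiring; the ping API would write to pingChannel.
	runLoop(ctx, newQueue(), maxWorkersFromConfig(), pingChannel)
}
```

On newer Go versions, signal.NotifyContext can replace the channel and goroutine above.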
Here, we’re listening for the SIGTERM and SIGINT signals in a separate goroutine, and canceling a context once a signal is received. This context is passed to all workers, so the workers can poll the context’s Done() channel and terminate if the context is canceled.
Polling
If there are already enough workers running, the check below leaves work nil. Here, an atomic load is sufficient instead of lock/unlock, but we have to be careful to read and write nWorkers using atomic functions only. nWorkers is decremented when a worker goroutine ends.
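This is the relevant part of the loop sketched above:

```go
var work *ReportRequest
// work stays nil unless we are below the worker limit.
if atomic.LoadInt32(&nWorkers) < maxWorkers {
	work = queue.Poll()
}
if work != nil {
	atomic.AddInt32(&nWorkers, 1)
	go runWork(ctx, work, &nWorkers, done)
	continue
}
```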
If there is no new work, or if there are already enough workers, we have to wait. The following select waits on four channels. If ctx.Done() is closed, it means we received a SIGTERM or SIGINT. Since all workers share the same context, the workers terminate as well, so here we wait for them to exit.
The timer channel is there for the polling loop to wait for a few seconds before polling the work queue.
The <-done channel is selected whenever a worker goroutine ends. That means we can poll for more work.
The <-pingChannel is used to wake up a waiting polling loop. If there is no work, the polling loop waits for a timeout. If new work comes in during this time, the ping allows immediate scheduling of the new work.
We need both pingChannel and the timed wait, because not all clients are guaranteed to ping the work queue.
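In the sketch, the wait looks like this:

```go
select {
case <-ctx.Done():
	// Signal received: the workers share ctx and are stopping too;
	// wait for the worker count to reach zero before exiting.
	for atomic.LoadInt32(&nWorkers) > 0 {
		<-done
	}
	return
case <-ticker.C: // timeout: poll the work queue again
case <-done: // a worker ended; we can poll for more work
case <-pingChannel: // woken up early: new work was just submitted
}
```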
The actual work is done here:
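A sketch of the worker goroutine; processNextChunk, saveCheckpoint, and storeResults are hypothetical stand-ins for the actual report logic:

```go
func runWork(ctx context.Context, work *ReportRequest, nWorkers *int32, done chan<- struct{}) {
	defer func() {
		atomic.AddInt32(nWorkers, -1)
		// Non-blocking send: done is only a wake-up signal, and the
		// polling loop re-checks nWorkers after every wake-up, so a
		// coalesced signal is never lost.
		select {
		case done <- struct{}{}:
		default:
		}
	}()

	for {
		select {
		case <-ctx.Done():
			// Terminating: store the current state so another report
			// runner can pick this report up and continue.
			saveCheckpoint(work) // hypothetical
			return
		default:
		}
		if finished := processNextChunk(work); finished { // hypothetical
			storeResults(work) // hypothetical: make results visible to the status API
			return
		}
	}
}
```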
On work completion, this decrements nWorkers and also releases the polling loop using the done channel. This works because, if the worker count is already at the limit, the polling loop will not get new work, and the select will read from the done channel.