Work queue with at most n workers

I have an API where people can submit a request to get a report on a rather large set of data items. It is not feasible to run the report while the API request is being handled. Because of this, I decided to implement two APIs: the first to submit the request, and the second to retrieve (partial) results, while the report runs as a separate process.

I implemented this so that the report runner is a separate process that polls a work queue. The report submission API simply adds a report request to the queue and returns a report id to the caller. The report runner polls the work queue, picks a report, runs it, and stores the results in temporary storage, so other APIs can check the status of the report and get the results. This scheme also allows me to run many report runners if necessary. If there is a significant number of report requests, the report runner will be fairly busy; if not, there should be a way to wake it up instead of waiting for its next poll of the work queue, so the report runner also provides a ping API that causes it to check the queue if it is idle.
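
As a rough sketch, the submission side only needs to assign an id, push the request onto the queue, and return the id. The names below (submitReport, ReportRequest, newReportID, queue, and the Work fields) are placeholders I'm assuming for illustration, not part of the actual implementation:

// submitReport is a sketch of what the submission API does: it assigns
// an id, stores the request on the shared work queue, and returns the
// id so the caller can later check the status and fetch results.
func submitReport(req ReportRequest) string {
    id := newReportID() // assumed helper, e.g. a UUID
    queue.Push(&Work{ReportID: id, Request: req})
    return id
}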

Code

The complete implementation is below. This is the part that orchestrates polling the work queue and spinning up new goroutines as necessary. There is a limit on the number of concurrent goroutines, specified by configuration, so a single report runner will not pick up all the work and choke itself later. This runs in a container, so the polling loop is cancelable. It should also handle signals so it can stop gracefully in case of a container shutdown. The full implementation is as follows:

go func() {

    // This block deals with signals. When this is running in a
    // container, we get SIGTERM when the container is terminating.
    // This is used to stop the polling loop gracefully so
    // running tasks can record state information
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go func() {
        <-sig
        cancel()
    }()

    // Number of active workers
    var nWorkers int32 
    // wg will be used to wait for all workers
    // when we're shutting down
    wg := sync.WaitGroup{}
    // done channel will be used to signal worker
    // termination
    done := make(chan struct{})

    // This is the polling loop
    for {

        // Poll for work. If we already have 
        // maxWorkers running, pass. Otherwise
        // get next work item
        var work *Work
        if atomic.LoadInt32(&nWorkers) >= maxWorkers {
            work = nil
        } else {
            work = getNextWork()
        }
        if work == nil {
            // Start waiting
            select {
            case <-ctx.Done():
                // Shutting down
                // Wait for all workers to shut down
                wg.Wait()
                return

            case <-time.After(pollDuration):
                // Timeout, poll again
                break

            case <-done:
                // A goroutine completed processing
                // poll
                break

            case <-pingChannel:
                // Received a ping. Poll
                break
            }
        } else {
            // There is a new work item
            atomic.AddInt32(&nWorkers, 1)
            // Register the worker before starting it, so a concurrent
            // wg.Wait() during shutdown cannot miss it
            wg.Add(1)
            // Start the worker process
            go func() {
                defer func() {
                    atomic.AddInt32(&nWorkers, -1)
                    wg.Done()
                    // Signal that a worker is completed
                    done <- struct{}{}
                }()
                processWork(ctx, work)
            }()

        }
    }
}()
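
For reference, the loop above relies on a few identifiers defined elsewhere. Something along these lines is assumed; the names come from the snippet, but the types, values, and bodies here are only illustrative:

// Work describes one queued report request (fields are illustrative).
type Work struct {
    ReportID string
    Request  ReportRequest
}

var (
    maxWorkers   int32 = 4                      // read from configuration
    pollDuration       = 30 * time.Second       // how long to wait between polls
    pingChannel        = make(chan struct{}, 1) // wakes up an idle polling loop
)

// getNextWork returns the next queued report request, or nil if the queue is empty.
func getNextWork() *Work { /* ... */ return nil }

// processWork runs one report, stores its results in temporary storage,
// and honors ctx so it can stop gracefully on shutdown.
func processWork(ctx context.Context, work *Work) { /* ... */ }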

Signals

This code runs in a container. When a container receives the terminate signal, it has 10 seconds to terminate. I’d like to pass this signal to all the workers, so they can store their current state, and later another worker can pick the report up and continue from where it left off. The following snippet does that:

sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
    <-sig
    cancel()
}()

Here, we’re listening for the SIGTERM and SIGINT signals in a separate goroutine, and canceling a context once a signal is received. This context is passed to all workers, so they can poll the context’s Done() channel and terminate if the context is canceled.
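
Inside a worker, honoring this contract means checking the context between units of work and persisting partial state when it is canceled. Here is a minimal sketch of what processWork could look like; the chunking of the report (work.Chunks) and the saveCheckpoint, runChunk, and markComplete helpers are assumptions on my part:

func processWork(ctx context.Context, work *Work) {
    for _, chunk := range work.Chunks {
        select {
        case <-ctx.Done():
            // Shutting down: record where we stopped so another
            // report runner can pick this up and continue later
            saveCheckpoint(work, chunk)
            return
        default:
        }
        runChunk(work, chunk)
    }
    markComplete(work)
}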

Polling

If enough workers are already running, this leaves work nil. Here, an atomic load is sufficient instead of a lock/unlock, but we have to be careful to read and write nWorkers using atomic functions only. nWorkers is decremented when a worker goroutine ends.

for {
    var work *Work
    if atomic.LoadInt32(&nWorkers) >= maxWorkers {
        work = nil
    } else {
        work = getNextWork()
    }

If there is no new work, or if there are already enough workers, we have to wait. The following select waits on four channels. If ctx.Done() is closed, it means we received a SIGTERM. Since all workers share the same context, the workers terminate as well, so here we wait for them to exit.

The timer channel is there so the polling loop waits for pollDuration before polling the work queue again.

The <-done channel is selected whenever a worker goroutine ends. That means a worker slot has freed up, so we can poll for more work.

The <-pingChannel is used to wake up a waiting polling loop. If there is no work, the polling loop waits for a timeout. If new work comes in during that time, a ping allows immediate scheduling of the new work.

We need both pingChannel and the timed wait, because not all clients are guaranteed to ping the work queue.

select {
case <-ctx.Done():
    // Shutting down
    // Wait for all workers to shut down
    wg.Wait()
    return

case <-time.After(pollDuration):
    // Timeout, poll again
    break

case <-done:
    // A goroutine completed processing
    // poll
    break

case <-pingChannel:
    // Received a ping. Poll
    break
}
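
On the other side, the ping API only needs to nudge a waiting polling loop. A lost ping is harmless, because the loop wakes up on the next timeout anyway, so a non-blocking send is enough. A sketch of what such a handler could look like (the handler itself is my assumption; only pingChannel comes from the code above):

// handlePing wakes the polling loop if it is waiting in the select above.
// The non-blocking send simply drops the ping when the loop is busy or a
// wake-up is already pending.
func handlePing(w http.ResponseWriter, r *http.Request) {
    select {
    case pingChannel <- struct{}{}:
    default:
    }
    w.WriteHeader(http.StatusNoContent)
}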

The actual work is done here:

atomic.AddInt32(&nWorkers, 1)
// Register the worker before starting it, so a concurrent
// wg.Wait() during shutdown cannot miss it
wg.Add(1)
// Start the worker process
go func() {
    defer func() {
        atomic.AddInt32(&nWorkers, -1)
        wg.Done()
        // Signal that a worker is completed
        done <- struct{}{}
    }()
    processWork(ctx, work)
}()

On work completion, this decrements nWorkers and also releases the polling loop through the done channel. This works because, if there are already maxWorkers workers running, the polling loop will not fetch new work, and the select will read from the done channel.
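
One detail to be aware of: wg.Done() runs before the send on done, so during shutdown wg.Wait() can return and the loop can exit while a worker is still about to send on done, leaving that goroutine blocked. Since the only purpose of the send is to make the loop poll again, one possible variant is to guard it with the context, so the worker gives up when nobody is listening anymore:

defer func() {
    atomic.AddInt32(&nWorkers, -1)
    wg.Done()
    // Wake the polling loop if it is still running; give up if we
    // are shutting down and the loop has already stopped reading
    select {
    case done <- struct{}{}:
    case <-ctx.Done():
    }
}()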