Graceful shutdown of Pub/Sub consumers in Go


Introduction

Pub/Sub is commonly used to run tasks asynchronously via queued messages. These tasks are often long-running and involve multi-step, stateful I/O operations. As much as I'd wish that every Pub/Sub handler had atomic, transactional logic, the world is not ideal and sometimes we must design for chaos, especially in eventually consistent systems. In this post, we’ll explore a simple pattern for gracefully shutting down Pub/Sub consumers in Go.

TL;DR

Gracefully shutting down Pub/Sub consumers can be achieved with a simple approach:

  • Listen for termination signals (e.g., SIGINT, SIGTERM).
  • Upon receiving a signal, reject any new incoming messages.
  • Complete the processing of any in-progress messages, with a strict timeout.
  • Once the in-progress messages are processed, cancel the context and shut down the consumer.

Problem

A consumer can shut down for various reasons—Kubernetes upgrades, autoscaling restarts, manual throttling, redeployments, or even just Murphy's law. Abrupt shutdowns can be dangerous. I’ve encountered incidents where a consumer took a distributed lock before restarting during a redeployment, and the lock was never released. When consumers aren’t fully idempotent or transactional, these sudden shutdowns can leave asynchronous state machines stuck in an inconsistent state, sometimes for an extended period or even permanently.

To avoid this, let’s explore a recipe to make the system resilient to terminations and interruptions. While it's hard to handle cases like OOM failures where no termination signal is sent, we can definitely make consumers resilient to most shutdown scenarios.

Recipe

A good way to implement graceful shutdown for Pub/Sub consumers is to think of it like a restaurant. When a restaurant closes (say at 12PM), it stops accepting new customers but continues serving those already inside until they finish. Eventually, the restaurant shuts down completely. We can apply a similar strategy to Pub/Sub consumers:

  1. Listen for termination signals.
  2. Upon receiving a signal, reject any new incoming messages.
  3. Complete the processing of in-progress messages with a strict timeout.
  4. Once the in-progress messages are processed, cancel the context and shut down the consumer.

1. Listen for Termination Signals

In Go, you can listen for termination signals using channels. Channels are perfect for sending and receiving signals between goroutines, and since Google’s official Pub/Sub SDK processes messages in goroutines, they fit our use case well.

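    import (
        "os"
        "os/signal"
        "syscall"
    )

    // terminationChan is closed once a termination signal has been received.
    type terminationChan chan bool

    // CreateTerminationChannel returns a channel that is closed when the
    // process receives SIGINT or SIGTERM.
    func CreateTerminationChannel() terminationChan {
        terminationChannel := make(terminationChan)

        // Buffered so that a signal arriving early is not dropped.
        sigChan := make(chan os.Signal, 1)
        signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) // os.Interrupt is SIGINT

        go func() {
            <-sigChan
            // Close instead of sending a single value: a sent value is
            // consumed by only one receiver, a close is seen by all of them.
            close(terminationChannel)
        }()

        // Return the termination channel
        return terminationChannel
    }

Here, we’ve created a boolean termination channel that is closed when a termination signal arrives. Closing matters because more than one goroutine will wait on this channel: a value sent on a channel is delivered to a single receiver, while a close is observed by every receiver. Now that we have this channel, we can pass it to our Pub/Sub client to handle shutdowns gracefully.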
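Whether the termination channel carries a value or is closed makes a difference once several goroutines wait on it. The sketch below is a minimal, standalone illustration (the countObservers helper is hypothetical and not part of the consumer): it starts a few waiting goroutines, closes a channel once, and counts how many of them wake up.

```go
package main

import "sync"

// countObservers starts n goroutines that all block on the same channel,
// closes that channel once, and returns how many goroutines were released.
func countObservers(n int) int {
	done := make(chan struct{})

	var wg sync.WaitGroup
	var mu sync.Mutex
	observed := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // unblocks for every receiver once done is closed
			mu.Lock()
			observed++
			mu.Unlock()
		}()
	}

	close(done) // a single close acts as a broadcast
	wg.Wait()
	return observed
}
```

With a single send instead of close(done), only one of the goroutines would be released and the rest would block forever, which is why a close is the safer choice when both the message handler and the shutdown goroutine watch the same channel.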

2. Reject New Incoming Messages

Once a termination signal is received, it’s important to stop processing any new messages. This can be done in multiple ways, but I chose to simply wrap my subscriber handler in a decorator.

    func withGracefulShutdown(handler func(context.Context, *pubsub.Message), terminationChannel chan bool) func(context.Context, *pubsub.Message) {
        return func(ctx context.Context, message *pubsub.Message) {
            select {
            case <-terminationChannel:
                message.Nack() // Reject the message
            default:
                handler(ctx, message) // Process the message if no termination signal
            }
        }
    }

This function takes the message handler and the termination channel, then returns a decorated handler. If a termination signal has been received, new messages are immediately rejected using Nack(), which tells Pub/Sub to redeliver them so that another consumer can pick them up. This ensures no new messages start processing after the shutdown signal.
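To see the decorator's behaviour in isolation, here is a minimal sketch that runs without the Pub/Sub SDK. It assumes a hypothetical nacker interface and fakeMessage type standing in for *pubsub.Message, and a termination channel that is closed on shutdown; rejectAfterTermination and demoReject are illustrative names, not part of the original code.

```go
package main

import "context"

// nacker is a hypothetical stand-in for the one *pubsub.Message method
// the decorator needs.
type nacker interface {
	Nack()
}

// fakeMessage records whether it was nacked.
type fakeMessage struct {
	nacked bool
}

func (m *fakeMessage) Nack() { m.nacked = true }

// rejectAfterTermination mirrors withGracefulShutdown: once terminated is
// closed, messages are nacked instead of being handed to the handler.
func rejectAfterTermination(handler func(context.Context, nacker), terminated <-chan struct{}) func(context.Context, nacker) {
	return func(ctx context.Context, msg nacker) {
		select {
		case <-terminated:
			msg.Nack()
		default:
			handler(ctx, msg)
		}
	}
}

// demoReject delivers one message before and one after termination. It
// reports whether each reached the handler and whether the late one was nacked.
func demoReject() (handledBefore, handledAfter, nackedAfter bool) {
	terminated := make(chan struct{})
	handled := 0
	wrapped := rejectAfterTermination(func(context.Context, nacker) { handled++ }, terminated)

	wrapped(context.Background(), &fakeMessage{})
	handledBefore = handled == 1

	close(terminated)
	late := &fakeMessage{}
	wrapped(context.Background(), late)
	handledAfter = handled == 2

	return handledBefore, handledAfter, late.nacked
}
```

The first message reaches the handler; the second, delivered after the channel is closed, is nacked and never reaches it.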

3. Finish Ongoing Message Processing and Exit

When message processing starts just before a termination signal is received, it's important to give those messages time to finish so they aren't left in a half-processed state. We can use Go’s sync.WaitGroup for this. By incrementing a shared WaitGroup whenever a message starts processing and decrementing it once processing finishes, we can track how many messages are still in flight at any given time.

Upon receiving a termination signal, we wait for the waitgroup to reach zero before shutting down the consumer. The code would look something like this:

    func startConsumer(ctx context.Context, subscription *pubsub.Subscription, terminationChannel chan bool) {
        cancellableCtx, cancel := context.WithCancel(ctx)
        defer cancel()

        var handlerWaitGroup sync.WaitGroup

        go func() {
            <-terminationChannel
            logger.Info("Received termination signal")
            logger.Info("Waiting for ongoing message processing")
            handlerWaitGroup.Wait()
            logger.Info("All ongoing messages processed; shutting down subscriber")
            cancel()
        }()

        // Receive blocks until cancellableCtx is cancelled or it hits an error.
        err := subscription.Receive(cancellableCtx, withGracefulShutdown(func(messageCtx context.Context, message *pubsub.Message) {
            // Increment the waitgroup when processing starts and decrement it when the handler returns
            handlerWaitGroup.Add(1)
            defer handlerWaitGroup.Done()

            // Message processing logic
        }, terminationChannel))
        if err != nil {
            logger.Info("Subscriber stopped with error: " + err.Error())
        }
        logger.Info("Subscriber terminated")
    }

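This approach lets in-flight messages finish before the consumer shuts down. The Receive method returns once the context is cancelled, so the consumer exits cleanly. One caveat: a handler that passes the termination check just as the signal arrives may call Add after Wait has already returned, so there is a small window in which a message can start processing during shutdown.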
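If a handler that starts at the exact moment of shutdown is a concern, one option is to make "check for termination" and "register as in-flight" a single step. The following is a sketch under that assumption; inFlightTracker and its methods are hypothetical names, not part of the Pub/Sub SDK or of the code above.

```go
package main

import "sync"

// inFlightTracker is a hypothetical helper that counts running handlers
// and refuses to register new ones once draining has begun.
type inFlightTracker struct {
	mu       sync.Mutex
	draining bool
	wg       sync.WaitGroup
}

// tryStart registers a handler as in-flight. It returns false if draining
// has already begun, in which case the caller should reject the message.
func (t *inFlightTracker) tryStart() bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.draining {
		return false
	}
	t.wg.Add(1) // Add runs under the lock, so it always precedes drain's Wait
	return true
}

// finish marks a previously registered handler as done.
func (t *inFlightTracker) finish() { t.wg.Done() }

// drain stops new handlers from registering and blocks until the
// registered ones have finished.
func (t *inFlightTracker) drain() {
	t.mu.Lock()
	t.draining = true
	t.mu.Unlock()
	t.wg.Wait()
}
```

In a handler this would look like: call tryStart() first, Nack the message and return if it reports false, otherwise defer finish() and process the message. The shutdown goroutine then calls drain() in place of waiting on the WaitGroup directly.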

Conclusion

This approach works well when message handling is short-lived, typically 3-5 minutes or less, depending on how much delay you can tolerate during deployments or autoscaling. Depending on your requirements, there are additional steps you can take to further improve your graceful shutdown logic:

  • Timeout on Waitgroups: Add a timeout for how long you wait on waitgroups so that shutdown cannot block indefinitely. If a message is taking too long to process, you can force a shutdown after a certain period.
  • Rollback Logic for Long-Running Tasks: If your consumer handles long-running tasks, consider implementing rollback logic for stateful operations and distributed locks. When a termination signal is received, you can stop accepting new messages, roll back any in-progress operations, and abort the task to avoid leaving the system in an inconsistent state.
  • Checkpointing for Long-Running Tasks: By periodically saving the state of the task, you can resume processing after a restart or termination, reducing the need for full rollbacks. This approach can significantly improve resiliency for long-running and stateful workloads.
  • Handling Unexpected Kills (OOM/Crashes): For OOM kills or unexpected restarts, set up alerts for your dead letter queue (DLQ). This will help you quickly identify and address messages that failed to process due to abrupt terminations.
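The waitgroup timeout from the first bullet can be sketched as follows. sync.WaitGroup has no built-in timeout, so this version waits in a separate goroutine and races it against a timer; waitWithTimeout and demoWaitWithTimeout are hypothetical helper names, and the durations are placeholders to tune against whatever grace period your platform gives the process before killing it.

```go
package main

import (
	"sync"
	"time"
)

// waitWithTimeout waits for wg for at most timeout and reports whether
// the WaitGroup reached zero in time.
func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	finished := make(chan struct{})
	go func() {
		wg.Wait()
		close(finished)
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-finished:
		return true
	case <-timer.C:
		// The waiting goroutine stays parked until wg reaches zero;
		// that is acceptable here because the process is about to exit.
		return false
	}
}

// demoWaitWithTimeout returns the result for an idle WaitGroup and for
// one whose single handler never finishes.
func demoWaitWithTimeout() (idle, stuck bool) {
	var idleGroup sync.WaitGroup
	idle = waitWithTimeout(&idleGroup, time.Second)

	var stuckGroup sync.WaitGroup
	stuckGroup.Add(1) // simulates a handler that never calls Done
	stuck = waitWithTimeout(&stuckGroup, 20*time.Millisecond)

	return idle, stuck
}
```

In the shutdown goroutine, the plain Wait() call could be swapped for waitWithTimeout, logging a warning and cancelling the context anyway when it returns false.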

Rishichandra Wawhal, Engineering, Zamp
