How we built a queue on top of Kafka

Read the article

At Monzo we have been using Kafka as a message queue for several years. A message oriented queue allows engineers to build systems that process and acknowledge messages independently. Kafka's underlying architecture of a distributed log presents several challenges to providing a message oriented interface as consumption proceeds in message order, message-by-message through the log. To address these challenges we implemented a rich client library and set of abstractions that sit on top of Kafka. This includes deadlettering to skip un-processable messages and concurrent processing of messages rather than strictly in log order to increase single process throughput. At the time of writing there are over 400 services using this library to subscribe to upwards of 2,500 Kafka topics. Given our success in operating Kafka in this model, and recent community interest in implementing Queues for Kafka, we felt the time was right to share our approach and experiences.

Kafka’s durability motivated us to choose it as part of our stack

Asynchronous messaging has been a key component of Monzo’s microservices architecture since the very beginning. We came out of the gates using NSQ. In many ways it served us well, with one standout exception — its lack of message durability. This is the only fact standing between us and full at-least-once delivery. We saw Kafka as a solution to the durability story, but wanted to try and preserve the convenient message oriented semantics from NSQ that worked so well for us. Yet, turning a distributed log into a message oriented queue system isn't a problem we wanted every engineer to solve. Enter our client library.

Not needing ordering let us improve queue-style usability substantially

Our client library unlocked our ability to leverage Kafka’s durability while minimising the sharp edges that result from using Kafka as a queue. Two sharp edges stand out in particular. The first is the inability to process messages independently so one message can’t block another, or in other words — out of order. The second is achieving high throughput with a small number of partitions. Ordering again makes this difficult as you can’t leverage concurrency within a partition. In practice, Monzo has very few use cases where ordering actually matters. This means we could afford to remove this requirement, enabling us to bring a queue to life on top of Kafka.

Deadlettering is how we prevent poison pills from stalling the queue

The first version of this library let engineers consume events in a pure Kafka fashion. Within a partition you consume a single message at a time, in order. You commit offsets as you make your way through the ordered log. That was it. 

As anyone who has used Kafka in anger will know, it’s only a matter of time before a partition contains a poison pill. A poison pill is an event that your code is unable to process, stalling all work for that partition until a fix is shipped. If you need strict ordering, this is the only way it can work. In that world, that downside may be worth it.

An example topic, balance_updated, with a red arrow indicating a message can't be processed and so the offset can be committed.

If you can’t process event 2, you can’t skip it to go to process message 3

Indefinite pauses in consumption without engineer intervention is not acceptable to Monzo 99% of the time and we’re willing to break ordering to ensure we don’t end up in this situation. This is where we introduced the concept of a deadletter topic into our library. If a message fails to be processed after maxAttempts times, the library moves it onto the deadletter topic. There we can alert a human to look at it and primary consumer can continue down the log none-the-wiser. Since Kafka lets multiple consumer groups consume the same topic, we decided to use a unique deadletter topic for each unique consumer group and topic combination. Deadlettering looks something like this:

The failing message being moved to a new topic balance_updated__deadletter__service.account, allowing the offset to be committed.

We try to process an event at offset 2 maxAttempts times, we then move it to the deadletter and can carry on to event 3. (service.account is the name of the consumer group, and is included in the deadletter topic name)

Great, we can continue to make progress on the parent topic, balance_updated. We can notify a human about a deadletter event. How does the human replay the event once they’ve fixed their bug? We already have a consumer with all the required logic attached to the parent topic - we could just publish the message there again. We briefly considered this but realised that then you’d re-deliver the message to all consumer groups, even ones that didn’t deadletter the message. The way we resolved this is by also introducing a retry topic for each consumer group (like we did for the deadletter topic) and a Command Line Interface (CLI) to take this action. It looks something like this:

A CLI backend copies the message to the retry topic and commits the offset on the deadletter topic.

The CLI backend publishes the event onto the retry topic, which is actively consumed by service.account and commits an offset on the deadletter to track the event as processed.

If we fail to process an event maxAttempts on the retry topic the event goes back to the deadletter topic again. But what if they want to drain the deadletter instead and drop the event because we never want to process it due to a terminal error? The CLI is equally able to do this. It commits offsets against the deadletter topic without doing anything with the event itself. 

The second sharp edge that we experienced at Monzo when using Kafka as a queue is topic sizing and scaling. Let's look at this one next.

Processing a partition concurrently lets us scale within partitions

Getting the number of partitions for a topic right is a classic Kafka problem. We didn’t want this to be a golden question, where getting it wrong can be extremely painful. In particular, having too few partitions can unintentionally result in a standing backlog that you can burn through with patience or by skipping messages. Neither is great. If you could consume multiple messages concurrently within a partition and change that level of concurrency to alter throughput instead — then getting the partition count right is a less consequential decision. For this reason we set off to introduce concurrent consumption within individual partitions.

The majority of Monzo is written in Go, so spinning up goroutines to consume multiple events concurrently was easy on the surface. The complexity lies in committing offsets. Consider the following scenario:

The committed offset on balance_updated is at the lowest in progress message.

We have three concurrent goroutines processing events 2, 4 and 5. 3 has already been successfully processed. We can’t commit the offset for 3 until 2 has been successfully processed. If we did that and the service restarted, we’d see offset 3 as committed. We’d never process 2 again and effectively skip it forever. To address this issue we built an UnorderedOffsetManager, an abstraction that keeps track of all inflight events and the committable watermark. It handles committing the offset as all messages below it are successfully processed.

Introducing this concept of unordered concurrency does introduce some new failures to think about. What if we can’t process event 2 in this case?

If the committed offset can't progress, the difference between the highest in progress message and the committed offset increases.

Theoretically the other two goroutines could run away from the committed offset, 2, forever. This means that if the consumer were to restart you’d start again from 2 and be forced to reprocess all the other events - so much wasted, duplicate consumption. We addressed this in two ways. One, we only let you use this unordered concurrent consumer if you are happy to have a deadletter topic as well. This way we can have high confidence that the offset will move after maxAttempts retries. The only way it wouldn’t is if the consumer has a genuine deadlock - but the odds that this impacts just one event seemed so low we didn’t design for this. Two, we introduced the concept of a “max runaway.” This is an artificial limit on how much work can be in progress. If the difference between the lowest committed offset and the next offset a goroutine wants to pick up exceeds that value, we stop picking up new work and wait for a higher offset to be committed. You can think of this as a cap on how much duplicate work you’re willing to tolerate in the face of restarts or failure.

As you can see, this introduces some complexity but it does mean that we can effectively scale horizontally within a partition by adding goroutines. Of course there remains a limit on the size of the consumer itself, eventually it can’t spin up more goroutines without needing vertical scaling itself. In addition, there is still a point at which the only way to really increase throughput is by adding partitions. We’ve made this less painful, but not painless.

This results in two big wins for Monzo. The first is that we generally find consumers needing fewer partitions to get the throughput they want - a win for operating the cluster. The second and arguably bigger win is the fact that now you have a lot more agency as an engineer to scale up a consumer in the face of a standing backlog. Imagine you got your partition count wrong or had a spike in message publishing - you can just turn up the maximum concurrency. Over time the library could start doing this dynamically but today we let engineers control the concurrency and max runaway if the defaults don’t suffice.

Hopefully you see a queue emerging from the combination of all this functionality. You may very rightly be thinking —  if I was using this, how do I even decide what behavior I want? There’s a whole menu of options.

We generate a typed, choose-your-own-adventure fluent interface for defining consumers that grows with Monzo and the library

This is probably easier to show than describe in words. To setup the most powerful consumer described in this blog post you’d create a subscription like this:

subscription := balanceproto.BalanceUpdatedTopic.

If you remove WithDeadletter the WithUnorderedConcurrency option is no longer available and you get a compile error. Any invalid combination in options results in a compile error. If you drop both the deadletter and unordered concurrency option you have a pure Kafka subscription that consumes events in an ordered, blocking fashion.

We say this interface will grow with us because of the extensibility this design offers. Additional options can be added in a way that still only allows for valid combinations to compile. We build it up incrementally so it’s super clear to engineers exactly what they’re getting. Though we won’t cover it in this blog post, we already support more interesting options with even more in the works. Two quick teasers. One, we already support the library taking distributed locks over partitions before starting consumption to guarantee exclusive access even across Kafka consumer group generations. Two, we’re working on library support to take local locks on partition keys within an unordered concurrent subscription so you can get ordering within partition keys back while processing different keys concurrently! 

We’ve been using this for years and continue to invest into it

This library that we now think of as a queue on top of Kafka has grown over the years, with new, more sophisticated functionality being added over time. The core idea of sacrificing ordering to enable deadlettering and unordered concurrency has worked really well for us. Monzo’s engineering philosophy has always been to be opinionated so we can make the road-most-travelled easy and delightful. This library is a core way in which we do that and we’re excited to see how it continues to evolve.

Come work with us!

If you're keen to learn more about engineering at Monzo and what we do, please check out our careers page or check out these jobs 👇

Staff Engineer

Senior Backend Engineer