Photo by note thanun on Unsplash

How Intuit Debug Consumer Lags in Apache Beam

Antonio Si
8 min readApr 8, 2021

--

At Intuit, we have been using Apache Beam with FlinkRunner as a real time data processing pipelines platform for our data pipelines. Recently, we encountered an issue where our Kafka consumer lag was steadily increasing over time.

Being new to Beam, there have been very few blogs or documentations describing the internals of Beam and it took us quite some time to understand what was causing the lag. This blog shares our knowledge we learned through debugging this issue. We will also point to the source code for those who are interested in looking at the code for more details.

At the time we were solving this issue, we used Apache Beam v2.23.0, with FlinkRunner using Flink 1.9. We understand it is not the latest version. However, we believe the information described in this blog is still useful for those who are interested in understanding how the internals of Apache Beam works.

Pipeline Definition

Our pipeline is a stateless pipeline. It consumes data from 15 Kafka topics, applies several transformations, and output data to 1 single Kafka topic. A single KafkaIO.read() operator is used to manage all 15 Kafka input topics. Each topic has a different traffic rate, ranging from a high number of messages per second (TPS) to a very low TPS. Different topics also have different numbers of partitions. Higher TPS topics have a higher number of partitions, as high as 180 partitions, while low TPS topics have a lower number of partitions, as low as 12 partitions.

Our pipeline is running in a kubernetes (k8) cluster configured with 8 task managers and 1 job manager. The parallelism of our pipeline is set to 8 and the number of task slots per task manager is set to 1. The maxParallelism is set to 180. Each task manager is configured with 2 CPU cores, with 8 GB memory.

Our pipeline injects metrics to wavefront when it consumes messages from input topics and when it produces messages to the output topic.

The Problem

One of the metrics that we monitor is the consumer lag which is the number of messages that have not been processed for each Kafka partition. When monitoring our metrics in wavefront, we immediately observe that the consumer lag metric of our topics increases over time as shown in the following diagram:

We further observe that higher TPS topics will have a higher consumer lag over time when compared to lower TPS topics.

How do we Debug

Our approach to this issue was to enable jmx in all our task managers and connect jvisualvm to the jmx port of one of the task managers. We took a few thread dumps and found out a thread named Legacy Source Thread is the main thread that runs the pipeline. The thread involves a class called UnboundedSourceWrapper. Using these stack traces as a starting point, we investigated the Beam source code more closely and tried to step through the code with a remote debugger.

What have we Learned

UnboundedSourceWrapper and KafkaUnboundedReader are the two main classes involved in FlinkRunner and KafkaIO. The number of readers instantiated depends on a few parameters including maxParallelism, parallelism, and total number of partitions over all topics. According to the source code, the number of readers instantiated is calculated by

⌈min(Σ(no of partitions), max(maxParallelism, parallelism))/parallelism⌉

Therefore, in our configuration, there are a total of ⌈min(Σ(no of partitions), max(180, 8))/8⌉ = 23 readers instantiated as the total number of partitions across all our 15 topics is higher than 180. We also confirmed that from jvisualvm as indicated below:

Each KafkaUnboundedReader is assigned a certain number of topic partitions. Beam internally, creates

min(Σ(no of partitions), max(maxParallelism, parallelism))

buckets and assigns the topic partitions to each bucket in a round robin fashion.

To illustrate, consider a scale down environment in which we have 3 topics. The first topic has 9 partitions; the second topic has 6 partitions, and the third topic has 9 partitions, resulting in a total of 24 partitions. With a maxParallelism of 20 and a parallelism of 10, Beam internally, creates min(Σ(no of partitions), max(maxParallelism, parallelism)) = 20 buckets and assigns the topic partitions to each bucket as shown in the following diagram.

Each task manager will pick up the readers whose bucketId%taskId = 0.

Each KafkaUnboundedReader spawns a thread to poll records belonging to the assigned topic partitions from Kafka(code). So, in our case, it spawns a total of 23 threads as shown in jvisualvm:

KafkaUnboundedReader and the kafa record polling thread communicate via a single entry synchronous queue (code). When the polling thread obtains records from Kafka, it deposits the records to the queue, and waits for the reader to pick it up. Once the reader has picked up the records, the queue will become empty and the polling thread will move on to poll another batch of records from Kafka.

The described relationship between UnboundedSourceWrapper, KafkaUnboundedReader, and the polling thread is shown in the following diagram:

UnboundedSourceWrapper loops through all the readers and retrieves the next batch of records from each reader (code). However, if the next batch of records is not ready when the reader attempts to pick up, the reader will wait for 10 ms before it returns (code). For a topic with high volume of traffic, it will not be a problem because the next batch of records will be ready to be picked up. However, for a topic with slow traffic, the execution thread will wait for 10 ms and then discover that it does not have any records and returns.

In our pipeline, there are topics with very low TPS. That means processing of the high TPS topics will be blocked for 10 ms every time readers managing low TPS topics are inspected, thus causing the consumer lags for high TPS topic(s) to increase over time.

Workaround #1: set maxParallelism to -1

We have tried two workarounds to address this issue. In our first approach, we do not set the maxParallelism value. By default, maxParallelism is set to -1. This will in effect, create only 1 KafkaUnboundedReader as shown below:

In this case, all the 15 topics are managed by this reader and only 1 polling thread is responsible for polling records from any of these topics.

Since only 1 consumer is polling data from 15 topics and partitions during the pipeline execution, whenever this reader is inspected, the next batch of records will always be ready since there are topics with high TPS. Therefore, the consumer lags of the topics remain relatively constant. This is shown in the following diagram, depicting the consumer lag over time when we do not set the maxParallelism:

Workaround #2: use Flatten.pCollection()

Another attempt to improve consumer lag is to use multiple KafkaIO.read(), each managing 1 kafka topic. This will result in multiple PCollections, one per usage of KafkaIO.read(). We then use the flattening api to merge all the PCollections. This is illustrated in the following sample code snippet:

PCollectionList.of(pCollList)
.apply("flatten pCollList", Flatten.pCollections());

For each KafkaIO.read(), Beam creates 1 Legacy Source Thread. Each of these threads will have a KafkaUnboundedReader and a thread polling data from Kafka. This is shown below:

Here, each thread is responsible for polling data from its managed topic. Therefore, even though the thread handling low TPS topic will have to wait if the records are not ready, it will not block the thread handling high TPS topic, resulting in relatively constant consumer lag in this case as well:

One may notice that the consumer lag is not as stable as one obtained using -1 for maxParallelism, but when compared with the base case obtained when maxParallelism was set to 180, it is still comparatively more constant.

Recommendations

Based on our observations, setting maxParallelism to -1 would result in a more stable consumer lag. However, since we are using FlinkRunner for our pipeline, maxParallelism has a different meaning internally to Flink. Flink uses maxParallelism as a limit on the maximum number of task managers (since we set the number of task slots per manager to 1) the job can scale up to. Once it is set, you cannot scale up the number of task managers beyond the maxParallelism value. Setting maxParallelism to -1 would be equivalent to not setting maxParallelism at all according to Flink and Flink would use a default 128 for maxParallelism internally. That would mean you cannot scale your job beyond 128 task managers.

If 128 is enough for your job, setting maxParallelism to -1 may be a good alternative. However, if your job may need to scale beyond 128, you may need a higher value of maxParallelism and use Flatten.pCollection().

Conclusion

In this blog, we went over what we consider as the 2 central classes for FlinkRunner and KafkaIO of Beam and how it may affect pipeline performance. Beam is a powerful framework and what we have learned is only the beginning. We hope to share more knowledge as we learn more in this journey. We would also appreciate more technical blogs of this sort from the community for the collective benefit.

𝚂𝚙𝚎𝚌𝚒𝚊𝚕 𝚝𝚑𝚊𝚗𝚔𝚜 𝚝𝚘 𝚖𝚢 𝚌𝚘𝚕𝚕𝚎𝚊𝚐𝚞𝚎𝚜 𝚊𝚝 𝙸𝚗𝚝𝚞𝚒𝚝 𝚏𝚘𝚛 𝚜𝚞𝚐𝚐𝚎𝚜𝚝𝚒𝚘𝚗𝚜 𝚊𝚗𝚍 𝚖𝚞𝚕𝚝𝚒𝚙𝚕𝚎 𝚛𝚘𝚞𝚗𝚍𝚜 𝚘𝚏 𝚎𝚍𝚒𝚝𝚒𝚗𝚐 𝚘𝚗 𝚎𝚊𝚛𝚕𝚢 𝚍𝚛𝚊𝚏𝚝 𝚘𝚏 𝚝𝚑𝚒𝚜 𝚊𝚛𝚝𝚒𝚌𝚕𝚎.

--

--