1. [SPARK-23623][SS] Avoid concurrent use of cached consumers in (commit: 6937571ab8818a62ec2457a373eb3f6f618985e1) (details)
Commit 6937571ab8818a62ec2457a373eb3f6f618985e1 by zsxwing
[SPARK-23623][SS] Avoid concurrent use of cached consumers in
CachedKafkaConsumer (branch-2.3)
This is a backport of #20767 to branch 2.3
## What changes were proposed in this pull request? CacheKafkaConsumer
in the project `kafka-0-10-sql` is designed to maintain a pool of
KafkaConsumers that can be reused. However, it was built with the
assumption there will be only one task using trying to read the same
Kafka TopicPartition at the same time. Hence, the cache was keyed by the
TopicPartition a consumer is supposed to read. And any cases where this
assumption may not be true, we have SparkPlan flag to disable the use of
a cache. So it was up to the planner to correctly identify when it was
not safe to use the cache and set the flag accordingly.
Fundamentally, this is the wrong way to approach the problem. It is HARD
for a high-level planner to reason about the low-level execution model,
whether there will be multiple tasks in the same query trying to read
the same partition. Case in point, 2.3.0 introduced stream-stream joins,
and you can build a streaming self-join query on Kafka. It's pretty
non-trivial to figure out how this leads to two tasks reading the same
partition twice, possibly concurrently. And due to the non-triviality,
it is hard to figure this out in the planner and set the flag to avoid
the cache / consumer pool. And this can inadvertently lead to
ConcurrentModificationException ,or worse, silent reading of incorrect
Here is a better way to design this. The planner shouldnt have to
understand these low-level optimizations. Rather the consumer pool
should be smart enough avoid concurrent use of a cached consumer.
Currently, it tries to do so but incorrectly (the flag inuse is not
checked when returning a cached consumer, see
If there is another request for the same partition as a currently in-use
consumer, the pool should automatically return a fresh consumer that
should be closed when the task is done. Then the planner does not have
to have a flag to avoid reuses.
This PR is a step towards that goal. It does the following.
- There are effectively two kinds of consumer that may be generated
- Cached consumer - this should be returned to the pool at task end
- Non-cached consumer - this should be closed at task end
- A trait called KafkaConsumer is introduced to hide this difference
from the users of the consumer so that the client code does not have to
reason about whether to stop and release. They simply called `val
consumer = KafkaConsumer.acquire` and then `consumer.release()`.
- If there is request for a consumer that is in-use, then a new consumer
is generated.
- If there is a concurrent attempt of the same task, then a new consumer
is generated, and the existing cached consumer is marked for close upon
- In addition, I renamed the classes because CachedKafkaConsumer is a
misnomer given that what it returns may or may not be cached.
This PR does not remove the planner flag to avoid reuse to make this
patch safe enough for merging in branch-2.3. This can be done later in
## How was this patch tested? A new stress test that verifies it is safe
to concurrently get consumers for the same partition from the consumer
Author: Tathagata Das <>
Closes #20848 from tdas/SPARK-23623-2.3.
(commit: 6937571ab8818a62ec2457a373eb3f6f618985e1)
The file was removedexternal/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
The file was addedexternal/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala
The file was addedexternal/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
The file was modifiedexternal/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala (diff)
The file was modifiedexternal/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala (diff)
The file was removedexternal/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumerSuite.scala