[SPARK-24987][SS] - Fix Kafka consumer leak when no new offsets for
## What changes were proposed in this pull request?
This small fix adds a `consumer.release()` call to `KafkaSourceRDD` in
the case where we've retrieved offsets from Kafka, but the `fromOffset`
is equal to the `lastOffset`, meaning there is no new data to read for a
particular topic partition. Up until now, we'd just return an empty
iterator without closing the consumer which would cause a FD leak.
If accepted, this pull request should be merged into master as well.
## How was this patch tested?
Haven't ran any specific tests, would love help on how to test methods
running inside `RDD.compute`.
Author: Yuval Itzchakov <>
Closes #21997 from YuvalItzchakov/master.
