
Conversation


@pnakhe pnakhe commented Dec 14, 2016

What changes were proposed in this pull request?

This pull request fixes SPARK-18779. When using the Kafka 0.10.1.0 client, messages are read from only one partition. The current spark-kafka 0.10 integration ships with Kafka 0.10.0.1, where messages are read from all partitions, but with the 0.10.1.0 client, messages are read from only one partition.

In the ConsumerStrategy class there is a pause on the consumer. We never resume the consumer, and that seems to be causing the issue. The KafkaConsumer implementation changed between 0.10.0.1 and 0.10.1.0, which exposed this issue. The solution is to resume the consumer before we find the position in the latestOffsets method of the DirectKafkaInputDStream class. The reason the issue is not seen in the current setup is that the pause/resume logic changed in the latest Kafka version. We don't have a resume matching the pause, and hence this fix is necessary.
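The pause-without-resume problem described above can be illustrated with a minimal sketch. This is plain Python with a hypothetical MockConsumer class, not the real kafka-clients API; it only models the semantics that a paused partition delivers no records until it is explicitly resumed:

```python
class MockConsumer:
    """Hypothetical stand-in for a Kafka consumer, modeling only
    pause/resume semantics (not the real kafka-clients API)."""

    def __init__(self, records_by_partition):
        # {partition number: list of records buffered for that partition}
        self.records = records_by_partition
        self.paused = set()

    def pause(self, partitions):
        self.paused.update(partitions)

    def resume(self, partitions):
        self.paused.difference_update(partitions)

    def poll(self):
        # Paused partitions are skipped entirely, mirroring how a
        # consumer that is paused and never resumed stops delivering
        # messages from those partitions.
        return {p: recs for p, recs in self.records.items()
                if p not in self.paused}


consumer = MockConsumer({0: ["a"], 1: ["b"], 2: ["c"]})
consumer.pause([0, 1, 2])       # all partitions paused, no resume
print(consumer.poll())          # nothing delivered while paused
consumer.resume([0, 1, 2])      # the proposed fix: resume before reading
print(consumer.poll())          # records flow from all partitions again
```

The sketch shows why the symptom depends on the client's internal pause/resume behavior: as long as the pause is balanced by a resume before offsets are fetched, all partitions are served.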

This patch fixes the issue.

How was this patch tested?

The spark-kafka test cases were run to verify that no regressions were introduced. I have checked that messages are read from all partitions with both the 0.10.0.1 and 0.10.1.0 Kafka clients.

I have tested this fix and it works fine.
@AmplabJenkins

Can one of the admins verify this patch?


zsxwing commented Dec 14, 2016

pause/resume logic is changed in the latest kafka version

Could you post the Kafka JIRA for this change? Just want to understand the issue.

@HyukjinKwon

(@pnakhe gentle ping, I am curious too)


pnakhe commented Feb 9, 2017

@HyukjinKwon Well, the issue was not with Spark after all. It was a regression in Kafka between 0.10.1.0 and 0.10.1.1. It's fixed as part of https://issues.apache.org/jira/browse/KAFKA-4547

I have updated the defect with the same information.

@HyukjinKwon

Aha, thanks for the details, then is this PR/JIRA closable maybe?

srowen added a commit to srowen/spark that referenced this pull request Mar 22, 2017
@srowen srowen mentioned this pull request Mar 22, 2017
@asfgit asfgit closed this in b70c03a Mar 23, 2017