
Conversation

@koeninger
Contributor

…ed KafkaRDD methods. Possible fix for [SPARK-7122], but probably a worthwhile optimization regardless.
@srowen
Member

srowen commented Jun 4, 2015

Jenkins, add to whitelist

@srowen
Member

srowen commented Jun 4, 2015

ok to test

@srowen
Member

srowen commented Jun 4, 2015

At a glance this makes sense to me. Let's see what tests say.

@SparkQA

SparkQA commented Jun 4, 2015

Test build #34181 has finished for PR 6632 at commit c3768c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

I thought about it, and I have a little concern about this. What if someone creates a KafkaRDD with wrong offset ranges, which do not exist? In the current state, count will fail, which is the correct thing to do. However, with this patch, it will return a count which is technically incorrect, rather than fail. It may be a good idea to validate the limits of the offset ranges by actually querying Kafka, to verify that they exist before returning the count. And do it just once.
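For context, the optimization under discussion answers count() from offset-range metadata alone, which is also why an invalid range would silently produce a wrong count. A minimal sketch of the idea (class and method names here are illustrative stand-ins, not the actual Spark API):

```scala
// Minimal stand-in for Spark's offset-range metadata (illustrative, not
// the real org.apache.spark.streaming.kafka.OffsetRange class).
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset
}

object CountFromRanges {
  // count() can be computed from metadata alone, without reading messages;
  // a nonexistent range therefore yields a number instead of a failure
  // unless the ranges are validated against Kafka first.
  def count(ranges: Seq[OffsetRange]): Long = ranges.map(_.count).sum

  def main(args: Array[String]): Unit = {
    val ranges = Seq(OffsetRange("t", 0, 0L, 100L), OffsetRange("t", 1, 50L, 75L))
    println(count(ranges)) // 125
  }
}
```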

Contributor Author

Now checking offset ranges in the createRdd method
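A hedged sketch of what such a check at creation time can look like: each requested range is compared against the earliest/latest offsets fetched once from Kafka (the fetch itself is elided, and all names here are illustrative, not the actual KafkaUtils code):

```scala
// Illustrative range check at RDD-creation time; `earliest`/`latest`
// stand in for leader offsets fetched once from Kafka.
case class Requested(from: Long, until: Long)

object RangeCheck {
  def validate(r: Requested, earliest: Long, latest: Long): Either[String, Requested] =
    if (r.from > r.until || r.from < earliest || r.until > latest)
      Left(s"offset range ${r.from}..${r.until} not within $earliest..$latest")
    else
      Right(r)

  def main(args: Array[String]): Unit = {
    println(validate(Requested(10L, 20L), 0L, 100L))  // valid: Right(...)
    println(validate(Requested(10L, 200L), 0L, 100L)) // out of bounds: Left(...)
  }
}
```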

@tdas
Contributor

tdas commented Jun 4, 2015

Can you make a JIRA for this and add it to the title of the PR (see other PRs' formatting)? The titles of the JIRA and PR could also be a little more obvious: KafkaRDD optimizations for count() and take()

@tdas
Contributor

tdas commented Jun 4, 2015

Other than that this looks very promising.

Contributor

nonEmptyPartitions

@koeninger koeninger changed the title [Streaming][Kafka] Take advantage of offset range info for size-relat… [Streaming][Kafka][SPARK-8127] KafkaRDD optimize count() take() isEmpty() Jun 5, 2015
@SparkQA

SparkQA commented Jun 5, 2015

Test build #34323 has finished for PR 6632 at commit 8974b9e.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 6, 2015

Test build #34348 has finished for PR 6632 at commit 253031d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koeninger
Contributor Author

@tdas is there anything else you feel needs to be done on this?

Contributor

@koeninger I know this is probably a good Scala way, but this is kinda hard to read with the nesting. Could you take the for-yield and put it in a separate variable, and then check for errors?
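A sketch of the refactor being asked for: bind the for-yield to its own variable, then do the error check as a separate step (the names and the validation logic here are illustrative, not the actual code under review):

```scala
object ForYieldRefactor {
  // Each range either validates to a size or produces an error message.
  def check(from: Long, until: Long): Either[String, Long] =
    if (until >= from) Right(until - from) else Left(s"invalid range $from..$until")

  def totalSize(offsets: Seq[(Long, Long)]): Either[String, Long] = {
    // Step 1: the for-yield, pulled out into a named value so it reads flat.
    val checked: Seq[Either[String, Long]] =
      for ((from, until) <- offsets) yield check(from, until)
    // Step 2: the error check, separated from the comprehension.
    checked.collectFirst { case Left(err) => err } match {
      case Some(err) => Left(err)
      case None      => Right(checked.collect { case Right(n) => n }.sum)
    }
  }

  def main(args: Array[String]): Unit = {
    println(totalSize(Seq((0L, 5L), (3L, 10L)))) // Right(12)
    println(totalSize(Seq((5L, 0L))))            // Left(invalid range 5..0)
  }
}
```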

@tdas
Contributor

tdas commented Jun 19, 2015

I think it's almost good to go. A few minor points.

Contributor

nit: extra space

Contributor

There is no check of whether isEmpty is successful.

@SparkQA

SparkQA commented Jun 19, 2015

Test build #35310 has finished for PR 6632 at commit f68bd32.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • // class ParentClass(parentField: Int)
    • // class ChildClass(childField: Int) extends ParentClass(1)
    • // If the class type corresponding to current slot has writeObject() defined,
    • // then its not obvious which fields of the class will be serialized as the writeObject()
    • abstract class GeneratedClass
    • case class Bin(child: Expression)
    • case class Md5(child: Expression)

Contributor

What does this check? Shouldn't it check that rdd.take(1) === "the" // whatever is expected

Contributor Author

It's asserting that the item taken from the rdd is a member of the set of messages sent.

On Fri, Jun 19, 2015 at 4:07 PM, Tathagata Das [email protected]
wrote:

In
external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
#6632 (comment):

@@ -68,6 +68,21 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {

     val received = rdd.map(_._2).collect.toSet
     assert(received === messages)
+    // size-related method optimizations return sane results
+    assert(rdd.count === messages.size)
+    assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
+    assert(! rdd.isEmpty)
+    assert(rdd.take(1).size === 1)
+    assert(messages(rdd.take(1).head._2))


Contributor

Shouldn't the test be stronger, so that it checks that take returns the expected message from the right offset and not just any of the messages? Basically, if there is a bug in the code where take(1) returns the last message in the offset range rather than the first message, it won't be caught.
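The stronger assertion being suggested can be sketched like this, with a plain collection standing in for the RDD (the messages and names are illustrative, not the actual KafkaRDDSuite data):

```scala
object TakeOrderCheck {
  def main(args: Array[String]): Unit = {
    val sent = Seq("the", "quick", "brown", "fox") // produced in offset order
    val taken = sent.take(1)                       // stand-in for rdd.take(1)
    // A membership check alone would also pass if take(1) returned "fox";
    // comparing against the first sent message catches that ordering bug.
    assert(taken.size == 1)
    assert(taken.head == sent.head)
    println(taken.head) // the
  }
}
```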

@tdas
Contributor

tdas commented Jun 19, 2015

Just a couple more comments on the tests.

@SparkQA

SparkQA commented Jun 19, 2015

Test build #35329 has finished for PR 6632 at commit 5a05d0f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • // class ParentClass(parentField: Int)
    • // class ChildClass(childField: Int) extends ParentClass(1)
    • // If the class type corresponding to current slot has writeObject() defined,
    • // then its not obvious which fields of the class will be serialized as the writeObject()
    • abstract class GeneratedClass
    • case class Bin(child: Expression)
    • case class Md5(child: Expression)

@SparkQA

SparkQA commented Jun 20, 2015

Test build #35331 has finished for PR 6632 at commit 321340d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • // class ParentClass(parentField: Int)
    • // class ChildClass(childField: Int) extends ParentClass(1)
    • // If the class type corresponding to current slot has writeObject() defined,
    • // then its not obvious which fields of the class will be serialized as the writeObject()
    • abstract class GeneratedClass
    • case class Bin(child: Expression)
    • case class Md5(child: Expression)

@tdas
Contributor

tdas commented Jun 20, 2015

Merging this to master, thanks a lot.

@tdas
Contributor

tdas commented Jun 20, 2015

Wait, oh, the title, please fix order :/

@koeninger koeninger changed the title [Streaming][Kafka][SPARK-8127] KafkaRDD optimize count() take() isEmpty() [SPARK-8127][Streaming][Kafka] KafkaRDD optimize count() take() isEmpty() Jun 20, 2015
@koeninger
Contributor Author

fixed title

@tdas
Contributor

tdas commented Jun 20, 2015

Merging to master.

@asfgit asfgit closed this in 1b6fe9b Jun 20, 2015
@tdas
Contributor

tdas commented Jun 24, 2015

I forgot to say, thanks Cody! :)

@koeninger
Contributor Author

Cheers :)

