Skip to content

Conversation

@uncleGen
Copy link
Contributor

@uncleGen uncleGen commented Mar 2, 2017

What changes were proposed in this pull request?

This pr adds a special streaming sample operator to support sample. It adds a new evolving operator reservoir, and introduce an new logical plan ReservoirSample and two physical plan StreamingReservoirSampleExec and ReservoirSampleExec.

The following cases are supported:

  • batch table reservoir sampling
  • stream table reservoir sampling with/without aggregation and watermark in Update/Complete output mode

Not supported cases:

  • reservoir sampling in Append output mode (No meaning)

Followups:

  • move reservoir into sample operator

How was this patch tested?

add new unit tests.

@srowen
Copy link
Member

srowen commented Mar 2, 2017

Why does this need to be in Spark?

@SparkQA
Copy link

SparkQA commented Mar 2, 2017

Test build #73778 has finished for PR 17141 at commit 288c124.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@uncleGen
Copy link
Contributor Author

uncleGen commented Mar 3, 2017

@srowen There are some unsupported operator for Structured Streaming. You can view here: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala#L184
This pr adds support for sample operator, more exactly reservoir operator. But next step, I will try to combine reservoir into sample

@uncleGen
Copy link
Contributor Author

uncleGen commented Mar 3, 2017

cc @zsxwing and @tdas

private val enc = Encoders.STRING.asInstanceOf[ExpressionEncoder[String]]
private val NUM_RECORDS_IN_PARTITION = enc.toRow("NUM_RECORDS_IN_PARTITION")
.asInstanceOf[UnsafeRow]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NUM_RECORDS_IN_PARTITION calculate the total number of records in current partiton, and update at the end of sample.

UnsafeProjection.create(withSumFieldTypes).apply(InternalRow.fromSeq(
new JoinedRow(kv._2, numRecordsTillNow)
.toSeq(withSumFieldTypes)))
}), {})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, we transfer the row to (row, numRecordsTillNow), and numRecordsTillNow is used to calculate the weight of item.

.map(update => {
UnsafeProjection.create(withSumFieldTypes).apply(InternalRow.fromSeq(
new JoinedRow(update.value, numRecordsTillNow)
.toSeq(withSumFieldTypes)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

.apply(InternalRow.fromSeq(row.toSeq(fieldTypes)))
).iterator
})
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here, we do once global weight reservoir sampling.

store.put(replacementIdx, r.asInstanceOf[UnsafeRow])
}
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In partiton, we just need to do once normal (without weight) reservoir sampling.

.groupBy("value").count()
assert(df.count() == 3, "")
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new unit test needs to be improved.

@uncleGen
Copy link
Contributor Author

uncleGen commented Mar 7, 2017

ping @tdas and @zsxwing

@SparkQA
Copy link

SparkQA commented Mar 9, 2017

Test build #74238 has finished for PR 17141 at commit c4008cd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public static class LongWrapper
  • public static class IntWrapper
  • case class ResolveInlineTables(conf: CatalystConf) extends Rule[LogicalPlan]
  • case class CostBasedJoinReorder(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper
  • case class JoinPlan(itemIds: Set[Int], plan: LogicalPlan, joinConds: Set[Expression], cost: Cost)
  • case class Cost(rows: BigInt, size: BigInt)
  • abstract class RepartitionOperation extends UnaryNode
  • case class FlatMapGroupsWithState(
  • class CSVOptions(
  • class UnivocityParser(
  • trait WatermarkSupport extends UnaryExecNode
  • case class FlatMapGroupsWithStateExec(

@SparkQA
Copy link

SparkQA commented Mar 20, 2017

Test build #74841 has finished for PR 17141 at commit 1ddb82e.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Mar 20, 2017

Test build #74843 has finished for PR 17141 at commit 02d44aa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ReservoirSampleExec(reservoirSize: Int, child: SparkPlan) extends UnaryExecNode

@jiangxb1987
Copy link
Contributor

Is anyone still working on this or this could be closed? @uncleGen @zsxwing

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants