[SPARK-19800][SS][WIP] Implement one kind of streaming sampling - reservoir sampling #17141
Why does this need to be in Spark?
Test build #73778 has finished for PR 17141 at commit
@srowen There are some unsupported
    private val enc = Encoders.STRING.asInstanceOf[ExpressionEncoder[String]]
    private val NUM_RECORDS_IN_PARTITION = enc.toRow("NUM_RECORDS_IN_PARTITION")
      .asInstanceOf[UnsafeRow]
NUM_RECORDS_IN_PARTITION tracks the total number of records in the current partition and is updated at the end of sampling.
    UnsafeProjection.create(withSumFieldTypes).apply(InternalRow.fromSeq(
      new JoinedRow(kv._2, numRecordsTillNow)
        .toSeq(withSumFieldTypes)))
    }), {})
Here, we transform the row into (row, numRecordsTillNow); numRecordsTillNow is used to calculate the weight of the item.
    .map(update => {
      UnsafeProjection.create(withSumFieldTypes).apply(InternalRow.fromSeq(
        new JoinedRow(update.value, numRecordsTillNow)
          .toSeq(withSumFieldTypes)))
Same as above.
    .apply(InternalRow.fromSeq(row.toSeq(fieldTypes)))
    ).iterator
    })
    }
Here, we do a single global weighted reservoir sampling pass.
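For readers unfamiliar with weighted reservoir sampling, here is a minimal standalone sketch of one common scheme (Efraimidis-Spirakis A-Res), where each item gets the key u^(1/w) and the k largest keys are kept. It is not taken from this patch: `WeightedReservoirSketch`, its `sample` method, and the choice of what the weight represents are all hypothetical names and assumptions for illustration only.

```scala
import scala.collection.mutable
import scala.util.Random

// Hypothetical helper, not part of the PR: weighted reservoir sampling (A-Res).
object WeightedReservoirSketch {
  /** Keeps the k items with the largest keys u^(1/weight), where u ~ Uniform[0, 1). */
  def sample[T](items: Iterator[(T, Double)], k: Int, rng: Random): Vector[T] = {
    require(k > 0, "reservoir size must be positive")
    // Reversed comparison so that the queue's head is the SMALLEST key kept so far.
    val smallestFirst = new Ordering[(Double, T)] {
      def compare(a: (Double, T), b: (Double, T)): Int =
        java.lang.Double.compare(b._1, a._1)
    }
    val heap = mutable.PriorityQueue.empty[(Double, T)](smallestFirst)
    items.foreach { case (item, weight) =>
      require(weight > 0.0, "weights must be positive")
      val key = math.pow(rng.nextDouble(), 1.0 / weight)
      if (heap.size < k) {
        heap.enqueue((key, item))
      } else if (key > heap.head._1) {
        // The new key beats the smallest kept key, so evict that one.
        heap.dequeue()
        heap.enqueue((key, item))
      }
    }
    heap.toVector.map(_._2) // order of the result is unspecified
  }
}
```

In a merge step, one could assume each per-partition candidate row carries a weight derived from that partition's record count, so that rows from larger partitions are more likely to survive; whether the patch computes weights this way is not confirmed by the visible diff.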
          store.put(replacementIdx, r.asInstanceOf[UnsafeRow])
        }
      }
    }
Within a partition, we only need a single normal (unweighted) reservoir sampling pass.
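As a point of reference, plain (unweighted) reservoir sampling over one partition can be sketched as below, using the classic Algorithm R. This is a standalone illustration rather than the code in this patch: `ReservoirSketch` and `sample` are hypothetical names, and an in-memory buffer stands in for the state store used in the diff. The sketch also returns the number of records seen, which plays a role similar to the per-partition record count mentioned earlier.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Hypothetical helper, not part of the PR: uniform reservoir sampling (Algorithm R).
object ReservoirSketch {
  /** Returns up to k uniformly sampled items plus the total number of items seen. */
  def sample[T](items: Iterator[T], k: Int, rng: Random): (Vector[T], Long) = {
    require(k > 0, "reservoir size must be positive")
    val reservoir = ArrayBuffer.empty[T]
    var seen = 0L
    items.foreach { item =>
      seen += 1
      if (reservoir.size < k) {
        // Fill phase: the first k items are always kept.
        reservoir += item
      } else {
        // Draw an index in [0, seen); the item replaces a slot only if the
        // index falls inside the reservoir, i.e. with probability k / seen.
        val replacementIdx = (rng.nextDouble() * seen).toLong
        if (replacementIdx < k) reservoir(replacementIdx.toInt) = item
      }
    }
    (reservoir.toVector, seen)
  }
}
```

A call such as `ReservoirSketch.sample((1 to 100).iterator, 10, new Random(42))` yields ten distinct values from the input together with the count 100.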
        .groupBy("value").count()
      assert(df.count() == 3, "")
      }
    }
The new unit test needs to be improved.
Test build #74238 has finished for PR 17141 at commit
Test build #74841 has finished for PR 17141 at commit
Test build #74843 has finished for PR 17141 at commit
What changes were proposed in this pull request?
This PR adds a special streaming sample operator to support sample. It adds a new evolving operator reservoir, and introduces a new logical plan ReservoirSample and two physical plans, StreamingReservoirSampleExec and ReservoirSampleExec.
The following cases are supported:
Not supported cases:
Followups:
reservoir into sample operator
How was this patch tested?
Added new unit tests.