Skip to content

Commit 0630d29

Browse files
committed
Merge pull request #21 from marmbrus/streaming-infra
Initial draft of Streaming Dataframe infrastructure
2 parents 9012582 + f928595 commit 0630d29

File tree

15 files changed

+1137
-31
lines changed

15 files changed

+1137
-31
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.streaming
19+
20+
import org.apache.spark.sql.DataFrame
21+
22+
/**
 * Carries one batch of data through a streaming query execution, together with an indication
 * of how far the stream has progressed.
 *
 * @param end  the [[Offset]] indicating progress in the stream for this batch
 * @param data the data contained in this batch
 */
class Batch(
    val end: Offset,
    val data: DataFrame)
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.streaming
19+
20+
/**
 * An ordered collection of offsets, used to track the progress of processing data from one or more
 * [[Source]]s that are present in a streaming query. This is similar to a simplified,
 * single-instance vector clock that must progress linearly forward.
 *
 * @param offsets one optional [[Offset]] per position; a `None` entry is ordered before any
 *                defined entry at the same position
 */
case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
  /**
   * Returns a negative integer, zero, or a positive integer as this object is less than, equal to,
   * or greater than the specified object.
   *
   * Components are compared position by position. Positions that compare as equal do not
   * influence the result, so a composite in which only some components have moved is still
   * ordered relative to the other one. Two composites whose components are all equal (including
   * two empty composites) compare as 0.
   *
   * @param other the offset to compare against
   * @return -1, 0 or 1
   * @throws IllegalArgumentException if `other` is not a [[CompositeOffset]] of the same size, or
   *                                  if some components are ahead of `other` while others are
   *                                  behind it (non-linear histories)
   */
  override def compareTo(other: Offset): Int = other match {
    case otherComposite: CompositeOffset if otherComposite.offsets.size == offsets.size =>
      val comparisons = offsets.zip(otherComposite.offsets).map {
        case (Some(a), Some(b)) => a compareTo b
        case (None, None) => 0
        case (None, _) => -1
        case (_, None) => 1
      }
      // Equal components carry no ordering information, so only the non-zero signs decide the
      // outcome. Normalising with signum makes e.g. -5 and -1 count as the same direction.
      val nonZeroSigns = comparisons.map(c => math.signum(c)).filter(_ != 0).distinct
      nonZeroSigns match {
        // Nothing differs (or there are no components at all): the offsets are equal.
        case Seq() => 0
        // Every differing component points in the same direction.
        case Seq(direction) => direction
        // Some components are ahead and some are behind: there is no linear order.
        case _ =>
          throw new IllegalArgumentException(
            s"Invalid comparison between non-linear histories: $this <=> $other")
      }
    case _ =>
      throw new IllegalArgumentException(s"Cannot compare $this <=> $other")
  }
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.streaming
19+
20+
/**
 * An [[Offset]] backed by a single `Long`, intended for sources that produce a single linear
 * stream of data.
 *
 * @param offset the numeric value of this offset
 */
case class LongOffset(offset: Long) extends Offset {

  /**
   * Orders this offset against another [[LongOffset]] by numeric value. Any other kind of
   * [[Offset]] is rejected with an `IllegalArgumentException`.
   */
  override def compareTo(other: Offset): Int = other match {
    case that: LongOffset =>
      java.lang.Long.compare(offset, that.offset)
    case _ =>
      throw new IllegalArgumentException(s"Invalid comparison of $getClass with ${other.getClass}")
  }

  /** Returns a new offset whose value is this one's plus `increment`. */
  def +(increment: Long): LongOffset = LongOffset(offset + increment)

  /** Returns a new offset whose value is this one's minus `decrement`. */
  def -(decrement: Long): LongOffset = LongOffset(offset - decrement)
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.streaming
19+
20+
/**
 * An offset is a monotonically increasing metric used to track progress in the computation of a
 * stream. Every [[Offset]] must be comparable; the relational operators below are all derived
 * from [[compareTo]].
 */
trait Offset extends Serializable {

  /**
   * Returns a negative integer, zero, or a positive integer as this object is less than, equal to,
   * or greater than the specified object.
   */
  def compareTo(other: Offset): Int

  /** True when this offset orders strictly before `other`. */
  def <(other: Offset): Boolean = compareTo(other) < 0

  /** True when this offset orders before `other` or compares equal to it. */
  def <=(other: Offset): Boolean = compareTo(other) <= 0

  /** True when [[compareTo]] reports the two offsets as equal. */
  def ==(other: Offset): Boolean = compareTo(other) == 0

  /** True when this offset orders after `other` or compares equal to it. */
  def >=(other: Offset): Boolean = compareTo(other) >= 0

  /** True when this offset orders strictly after `other`. */
  def >(other: Offset): Boolean = compareTo(other) > 0
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.streaming
19+
20+
import org.apache.spark.rdd.RDD
21+
import org.apache.spark.sql.catalyst.InternalRow
22+
import org.apache.spark.sql.{DataFrame, SQLContext}
23+
24+
/**
 * An interface for systems that can collect the results of a streaming query.
 *
 * Whenever a query produces new data, a [[Sink]] must be able to transactionally collect that
 * data and update the [[StreamProgress]]. After a failure the sink will be recreated, and it must
 * then be able to return the [[StreamProgress]] for all of the data that has been made durable.
 * This contract is what allows Spark to process data with exactly-once semantics, even when a
 * failure requires the computation to be restarted.
 */
trait Sink {
  /**
   * Returns the [[Offset]] for all data that is currently present in the sink, if any. Spark
   * calls this function when restarting a stream in order to determine the point in the streamed
   * input data from which computation should be resumed.
   */
  def currentProgress: Option[Offset]

  /**
   * Accepts a new batch of data together with an indication of how far in the input data the
   * computation has progressed. When computation restarts after a failure, it is important
   * that a [[Sink]] returns the same [[Offset]] as the most recent batch of data that has been
   * persisted durably. Note that this does not necessarily have to be the [[Offset]] for the
   * most recent batch of data that was given to the sink. For example, it is valid to buffer
   * data before persisting, as long as the [[Offset]] is stored transactionally as data is
   * eventually persisted.
   */
  def addBatch(batch: Batch): Unit
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.streaming
19+
20+
import org.apache.spark.rdd.RDD
21+
import org.apache.spark.sql.SQLContext
22+
import org.apache.spark.sql.catalyst.InternalRow
23+
import org.apache.spark.sql.types.StructType
24+
25+
/**
 * A source of continually arriving data for a streaming query. Every [[Source]] must have a
 * monotonically increasing notion of progress that can be represented as an [[Offset]]. Spark
 * regularly queries each [[Source]] to find out whether more data is available.
 */
trait Source {

  /** Returns the schema of the data from this source. */
  def schema: StructType

  /**
   * Returns the next batch of data that is available after `start`, if any is available.
   *
   * @param start the offset after which data is requested, if any
   * @return the next [[Batch]], or `None` when no further data is available
   */
  def getNextBatch(start: Option[Offset]): Option[Batch]
}

0 commit comments

Comments
 (0)