@@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.kafka010.KafkaSource._
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -19,8 +19,7 @@ package org.apache.spark.sql.kafka010
 
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.sql.execution.streaming.SerializedOffset
-import org.apache.spark.sql.sources.v2.reader.Offset
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
 
 /**
  * An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and
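Note for context on the class this hunk touches: KafkaSourceOffset is essentially a map from TopicPartition to the next offset to consume, rendered as JSON. A minimal sketch of that shape (class and field names here are illustrative, not copied from this PR; the real class delegates its JSON rendering elsewhere):

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.execution.streaming.Offset

// Sketch: one entry per subscribed partition; json must be deterministic,
// since Offset equality and the offset log both compare the JSON string.
case class KafkaOffsetSketch(partitionToOffsets: Map[TopicPartition, Long]) extends Offset {
  override val json: String = partitionToOffsets.toSeq
    .sortBy { case (tp, _) => (tp.topic, tp.partition) } // stable order => stable JSON
    .map { case (tp, off) => s""""${tp.topic}-${tp.partition}":$off""" }
    .mkString("{", ",", "}")
}
```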
@@ -38,7 +38,6 @@ import org.apache.spark.sql.ForeachWriter
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
@@ -23,7 +23,7 @@
  * restart checkpoints. Sources should provide an Offset implementation which they can use to
  * reconstruct the stream position where the offset was taken.
  */
-public abstract class Offset {
+public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset {
   /**
    * A JSON-serialized representation of an Offset that is
    * used for saving offsets to the offset log.
@@ -41,8 +41,8 @@ public abstract class Offset {
    */
   @Override
   public boolean equals(Object obj) {
-    if (obj instanceof Offset) {
-      return this.json().equals(((Offset) obj).json());
+    if (obj instanceof org.apache.spark.sql.execution.streaming.Offset) {
+      return this.json().equals(((org.apache.spark.sql.execution.streaming.Offset) obj).json());
     } else {
       return false;
     }
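Note: the widened instanceof above is what keeps mixed comparisons symmetric. Since the v2 class now extends the internal one and equality is defined on the JSON string, a v1 offset and a v2 offset that render the same JSON compare equal in both directions. A small sketch (values illustrative):

```scala
import org.apache.spark.sql.execution.streaming.LongOffset

val v1 = LongOffset(5L) // json renders as "5"
// Anonymous v2 offset with the same JSON rendering:
val v2 = new org.apache.spark.sql.sources.v2.reader.Offset {
  override def json(): String = "5"
}
// Both directions hold because both equals() implementations accept any
// org.apache.spark.sql.execution.streaming.Offset and compare json().
assert(v1 == v2)
assert(v2 == v1)
```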
@@ -27,7 +27,6 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -22,8 +22,6 @@ import scala.util.control.Exception._
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
-import org.apache.spark.sql.sources.v2.reader.Offset
-
 /**
  * Offset for the [[FileStreamSource]].
  *
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import org.apache.spark.sql.sources.v2.reader.Offset
-
 /**
  * A simple offset for sources that produce a single linear stream of data.
 */
@@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.{Clock, Utils}
 
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming;
+
+/**
+ * This is an internal, deprecated interface. New source implementations should use the
+ * org.apache.spark.sql.sources.v2.reader.Offset class, which is the one that will be supported
+ * in the long term.
+ *
+ * This class will be removed in a future release.
+ */
+public abstract class Offset {
+  /**
+   * A JSON-serialized representation of an Offset that is
+   * used for saving offsets to the offset log.
+   * Note: We assume that equivalent/equal offsets serialize to
+   * identical JSON strings.
+   *
+   * @return JSON string encoding
+   */
+  public abstract String json();
+
+  /**
+   * Equality based on JSON string representation. We leverage the
+   * JSON representation for normalization between the Offset's
+   * in memory and on disk representations.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof Offset) {
+      return this.json().equals(((Offset) obj).json());
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return this.json().hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return this.json();
+  }
+}
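Note: per the deprecation comment in this new file, new sources should implement the v2 class; because the v2 class now extends this internal one (see the hunk above), such an offset remains usable everywhere the engine still expects the v1 type. A hypothetical minimal implementation, following the same pattern LongOffset uses:

```scala
import org.apache.spark.sql.sources.v2.reader.Offset

// Hypothetical single-counter offset written against the v2 API.
// The JSON form is the whole contract: equals, hashCode, and toString
// all derive from it in the base class.
case class CounterOffset(n: Long) extends Offset {
  override val json: String = n.toString
}
```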
@@ -23,7 +23,6 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.internal.SQLConf.{SHUFFLE_PARTITIONS, STATE_STORE_PROVIDER_CLASS}
-import org.apache.spark.sql.sources.v2.reader.Offset
 
 /**
  * An ordered collection of offsets, used to track the progress of processing data from one or more
@@ -24,7 +24,6 @@ import java.nio.charset.StandardCharsets._
 import scala.io.{Source => IOSource}
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.sources.v2.reader.Offset
 
 /**
  * This class is used to log offsets to persistent files in HDFS.
@@ -30,9 +30,10 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousRateStreamReader
+import org.apache.spark.sql.execution.streaming.sources.RateStreamV2Reader
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
 import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, MicroBatchReader, Offset}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, MicroBatchReader}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{ManualClock, SystemClock}
 
@@ -20,10 +20,10 @@ package org.apache.spark.sql.execution.streaming
 import org.json4s.DefaultFormats
 import org.json4s.jackson.Serialization
 
-import org.apache.spark.sql.sources.v2.reader.Offset
+import org.apache.spark.sql.sources.v2
 
 case class RateStreamOffset(partitionToValueAndRunTimeMs: Map[Int, (Long, Long)])
-  extends Offset {
+  extends v2.reader.Offset {
   implicit val defaultFormats: DefaultFormats = DefaultFormats
   override val json = Serialization.write(partitionToValueAndRunTimeMs)
 }
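Note, while reviewing the split: the offset body is just the per-partition (value, wall-clock ms) map, and `json` delegates to json4s. A quick sketch of what a reader gets back; the exact JSON shape is an assumption here, since it depends on json4s' tuple encoding:

```scala
import org.apache.spark.sql.execution.streaming.RateStreamOffset

val off = RateStreamOffset(Map(0 -> (100L, 1500000000000L)))
// json4s renders Tuple2 fields as _1/_2, e.g. {"0":{"_1":100,"_2":1500000000000}}
println(off.json)
```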
@@ -17,9 +17,6 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import org.apache.spark.sql.sources.v2.reader.Offset
-
-
 /**
  * Used when loading a JSON serialized offset from external storage.
  * We are currently not responsible for converting JSON serialized
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -36,7 +36,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.streaming
 
 import scala.collection.{immutable, GenTraversableOnce}
 
-import org.apache.spark.sql.sources.v2.reader.Offset
-
 /**
  * A helper class that looks like a Map[Source, Offset].
 */
@@ -25,7 +25,8 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.{RateSourceProvider, RateStreamOffset}
+import org.apache.spark.sql.execution.streaming.sources.RateStreamSourceV2
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options}
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
@@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, Statistics}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -31,7 +31,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.streaming
+package org.apache.spark.sql.execution.streaming.sources
 
 import java.util.Optional
 
@@ -27,6 +27,7 @@ import org.json4s.jackson.Serialization
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.RateStreamOffset
 import org.apache.spark.sql.sources.v2.DataSourceV2Options
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
@@ -59,7 +60,9 @@ class RateStreamV2Reader(options: DataSourceV2Options)
   private var start: RateStreamOffset = _
   private var end: RateStreamOffset = _
 
-  override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {
+  override def setOffsetRange(
+      start: Optional[Offset],
+      end: Optional[Offset]): Unit = {
     this.start = start.orElse(
       RateStreamSourceV2.createInitialOffset(numPartitions, creationTimeMs))
       .asInstanceOf[RateStreamOffset]
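Note: the reformatted signature is pure style, but the body pattern it wraps deserves a word. An absent `start` means "first batch", so the reader substitutes a freshly created initial offset before downcasting to its own offset type. The same pattern in isolation (hypothetical helper, not part of this PR):

```scala
import java.util.Optional

import org.apache.spark.sql.execution.streaming.RateStreamOffset
import org.apache.spark.sql.sources.v2.reader.Offset

// The engine only ever hands a reader offsets that this reader produced,
// so the downcast after the orElse fallback is safe by construction.
def resolveStart(start: Optional[Offset], initial: RateStreamOffset): RateStreamOffset =
  start.orElse(initial).asInstanceOf[RateStreamOffset]
```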
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.streaming
+package org.apache.spark.sql.execution.streaming.sources
 
 import javax.annotation.concurrent.GuardedBy
 
@@ -26,6 +26,7 @@ import scala.util.control.NonFatal
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
+import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.sources.v2.{ContinuousWriteSupport, DataSourceV2, DataSourceV2Options, MicroBatchWriteSupport}
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.streaming.OutputMode
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.sources._
 import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
 
 class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.execution.streaming.sources.{RateStreamBatchTask, RateStreamSourceV2, RateStreamV2Reader}
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2Options, MicroBatchReadSupport}
 import org.apache.spark.sql.streaming.StreamTest
 
@@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, SeenFilesMap}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.ExistsThrowsExceptionFileSystem._
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.SharedSQLContext
@@ -18,8 +18,7 @@
 
 package org.apache.spark.sql.streaming
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.execution.streaming.{LongOffset, SerializedOffset}
-import org.apache.spark.sql.sources.v2.reader.Offset
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, SerializedOffset}
 
 trait OffsetSuite extends SparkFunSuite {
   /** Creates test to check all the comparisons of offsets given a `one` that is less than `two`. */
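Note: with the import consolidated, the helper described by the comment above still reads naturally. A hedged sketch of the pattern as it would sit inside the trait (the exact assertion set is an assumption, not copied from this PR):

```scala
import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}

// Registers one test exercising the comparisons for a pair `one` < `two`,
// including JSON-based equality against a SerializedOffset round-trip.
def compare(one: Offset, two: Offset): Unit = {
  test(s"comparison $one <=> $two") {
    assert(one == one)
    assert(two == two)
    assert(one != two)
    assert(two != one)
    assert(one == SerializedOffset(one.json))
  }
}
```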
@@ -39,7 +39,6 @@ import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreCon
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.StreamSourceProvider
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
@@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.StateStore
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.StreamingQueryListener._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.StateStore
 import org.apache.spark.sql.expressions.scalalang.typed
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.OutputMode._
 import org.apache.spark.sql.streaming.util.{MockSourceProvider, StreamManualClock}
 import org.apache.spark.sql.types.StructType
@@ -33,7 +33,6 @@ import org.apache.spark.scheduler._
 import org.apache.spark.sql.{Encoder, SparkSession}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.StreamingQueryListener._
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.util.JsonProtocol
@@ -33,12 +33,10 @@ import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.ManualClock
 
-
 class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging with MockitoSugar {
 
   import AwaitTerminationTester._
@@ -32,7 +32,6 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.{ProcessingTime => DeprecatedProcessingTime, _}
 import org.apache.spark.sql.streaming.Trigger._
 import org.apache.spark.sql.types._
@@ -20,9 +20,8 @@ package org.apache.spark.sql.streaming.util
 import java.util.concurrent.CountDownLatch
 
 import org.apache.spark.sql.{SQLContext, _}
-import org.apache.spark.sql.execution.streaming.{LongOffset, Sink, Source}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source}
 import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType, TimestampType}