package org.apache.spark.sql.execution.datasources

import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration

import org.apache.spark.{Partition => RDDPartition, TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{InputFileNameHolder, RDD}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.vectorized.ColumnarBatch
+import org.apache.spark.util.ThreadUtils

/**
 * A single file that should be read, along with partition column values that
@@ -50,12 +53,28 @@ case class PartitionedFile(
 */
case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends RDDPartition

+object FileScanRDD {
+  // Shared daemon thread pool used to open and buffer upcoming files asynchronously.
+  private val ioExecutionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("FileScanRDD", 16))
+}
+
class FileScanRDD(
    @transient private val sparkSession: SparkSession,
    readFunction: (PartitionedFile) => Iterator[InternalRow],
    @transient val filePartitions: Seq[FilePartition])
  extends RDD[InternalRow](sparkSession.sparkContext, Nil) {

+  /**
+   * To get better interleaving of CPU and IO, this RDD creates a future to prepare the next
+   * file while the current one is being processed. `currentIterator` reads the current file
+   * and `nextFile` is the future that will initialize the next file to be read. That includes
+   * work such as opening connections to the file and any initial buffering. The expectation
+   * is that `currentIterator` is CPU intensive while `nextFile` is IO intensive.
+   */
+  val isAsyncIOEnabled = sparkSession.sessionState.conf.filesAsyncIO
+
+  case class NextFile(file: PartitionedFile, iter: Iterator[Object])
+
  override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
    val iterator = new Iterator[Object] with AutoCloseable {
      private val inputMetrics = context.taskMetrics().inputMetrics
@@ -88,6 +107,9 @@ class FileScanRDD(
      private[this] var currentFile: PartitionedFile = null
      private[this] var currentIterator: Iterator[Object] = null

+      private[this] var nextFile: Future[NextFile] =
+        if (isAsyncIOEnabled) prepareNextFile() else null
+
      def hasNext = (currentIterator != null && currentIterator.hasNext) || nextIterator()
      def next() = {
        val nextElement = currentIterator.next()
@@ -107,29 +129,45 @@ class FileScanRDD(
      /** Advances to the next file. Returns true if a new non-empty iterator is available. */
      private def nextIterator(): Boolean = {
        updateBytesReadWithFileSize()
-        if (files.hasNext) {
-          currentFile = files.next()
-          logInfo(s"Reading File $currentFile")
-          InputFileNameHolder.setInputFileName(currentFile.filePath)
-
-          try {
-            currentIterator = readFunction(currentFile)
-          } catch {
-            case e: java.io.FileNotFoundException =>
-              throw new java.io.FileNotFoundException(
-                e.getMessage + "\n" +
-                  "It is possible the underlying files have been updated. " +
-                  "You can explicitly invalidate the cache in Spark by " +
-                  "running 'REFRESH TABLE tableName' command in SQL or " +
-                  "by recreating the Dataset/DataFrame involved."
-              )
+        if (isAsyncIOEnabled) {
+          if (nextFile != null) {
+            // Wait for the async task to complete
+            val file = ThreadUtils.awaitResult(nextFile, Duration.Inf)
+            InputFileNameHolder.setInputFileName(file.file.filePath)
+            currentIterator = file.iter
+            // Asynchronously start the next file.
+            nextFile = prepareNextFile()
+            hasNext
+          } else {
+            currentFile = null
+            InputFileNameHolder.unsetInputFileName()
+            false
          }
-
-          hasNext
        } else {
-          currentFile = null
-          InputFileNameHolder.unsetInputFileName()
-          false
+          if (files.hasNext) {
+            currentFile = files.next()
+            logInfo(s"Reading File $currentFile")
+            InputFileNameHolder.setInputFileName(currentFile.filePath)
+
+            try {
+              currentIterator = readFunction(currentFile)
+            } catch {
+              case e: java.io.FileNotFoundException =>
+                throw new java.io.FileNotFoundException(
+                  e.getMessage + "\n" +
+                    "It is possible the underlying files have been updated. " +
+                    "You can explicitly invalidate the cache in Spark by " +
+                    "running 'REFRESH TABLE tableName' command in SQL or " +
+                    "by recreating the Dataset/DataFrame involved."
+                )
+            }
+
+            hasNext
+          } else {
+            currentFile = null
+            InputFileNameHolder.unsetInputFileName()
+            false
+          }
        }
      }

@@ -138,6 +176,33 @@ class FileScanRDD(
        updateBytesReadWithFileSize()
        InputFileNameHolder.unsetInputFileName()
      }
+
+      // Kicks off reading the next file on the shared IO pool; returns null when no files remain.
+      def prepareNextFile(): Future[NextFile] = {
+        if (files.hasNext) {
+          Future {
+            val nextFile = files.next()
+            val nextFileIter =
+              try {
+                readFunction(nextFile)
+              } catch {
+                case e: java.io.FileNotFoundException =>
+                  throw new java.io.FileNotFoundException(
+                    e.getMessage + "\n" +
+                      "It is possible the underlying files have been updated. " +
+                      "You can explicitly invalidate the cache in Spark by " +
+                      "running 'REFRESH TABLE tableName' command in SQL or " +
+                      "by recreating the Dataset/DataFrame involved."
+                  )
+              }
+
+            // Read something from the file to trigger some initial IO.
+            nextFileIter.hasNext
+            NextFile(nextFile, nextFileIter)
+          }(FileScanRDD.ioExecutionContext)
+        } else {
+          null
+        }
+      }
    }

    // Register an on-task-completion callback to close the input stream.
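
Note: the control flow added above reduces to a simple prefetch loop. The task awaits the future for the file that was opened in the background, immediately schedules the next file on the dedicated IO pool, and then does the CPU-heavy row processing while that IO runs. The standalone sketch below illustrates the same pattern outside of Spark; `PrefetchSketch`, `Chunk`, `openChunk`, and the pool size are made-up names chosen for illustration, not part of this patch or of Spark's API.

import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object PrefetchSketch {
  // Dedicated pool for IO so prefetching never competes with the CPU-bound work below
  // (analogous to FileScanRDD.ioExecutionContext in the patch).
  private val ioPool = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

  final case class Chunk(name: String, data: Iterator[Int])

  // Stands in for `readFunction`: pretend that opening a chunk involves slow IO.
  def openChunk(name: String): Chunk = {
    Thread.sleep(100) // simulated connection setup / initial buffering
    Chunk(name, Iterator.range(0, 1000))
  }

  def main(args: Array[String]): Unit = {
    val names = Iterator("part-0", "part-1", "part-2")

    // Mirrors prepareNextFile(): schedule the IO for the next chunk, or null if none remain.
    def prefetchNext(): Future[Chunk] =
      if (names.hasNext) {
        val name = names.next()
        Future(openChunk(name))(ioPool)
      } else {
        null
      }

    // Start IO for the first chunk before any processing happens.
    var next: Future[Chunk] = prefetchNext()
    var total = 0L

    while (next != null) {
      // Block until the prefetched chunk is ready, then immediately schedule the one after it.
      val current = Await.result(next, Duration.Inf)
      next = prefetchNext()

      // CPU-intensive work on `current` overlaps with the IO running inside `next`.
      total += current.data.map(_.toLong).sum
    }

    println(s"total = $total")
    ioPool.shutdown()
  }
}

Using a separate, bounded daemon pool for the IO futures, as the patch does with ThreadUtils.newDaemonCachedThreadPool("FileScanRDD", 16), keeps prefetching off the task's CPU thread and bounds how many files are being opened concurrently across tasks in an executor, while each task keeps at most one file in flight ahead of the one it is processing.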