
Commit b41a39e

Davies Liu authored and mateiz committed
[SPARK-4186] add binaryFiles and binaryRecords in Python
add binaryFiles() and binaryRecords() in Python

```
binaryFiles(self, path, minPartitions=None):

    :: Developer API ::

    Read a directory of binary files from HDFS, a local file system
    (available on all nodes), or any Hadoop-supported file system URI
    as a byte array. Each file is read as a single record and returned
    in a key-value pair, where the key is the path of each file, the
    value is the content of each file.

    Note: Small files are preferred, large file is also allowable, but
    may cause bad performance.

binaryRecords(self, path, recordLength):

    Load data from a flat binary file, assuming each record is a set of
    numbers with the specified numerical format (see ByteBuffer), and the
    number of bytes per record is constant.

    :param path: Directory to the input data files
    :param recordLength: The length at which to split the records
```

Author: Davies Liu <[email protected]>

Closes #3078 from davies/binary and squashes the following commits:

cd0bdbd [Davies Liu] Merge branch 'master' of github.com:apache/spark into binary
3aa349b [Davies Liu] add experimental notes
24e84b6 [Davies Liu] Merge branch 'master' of github.com:apache/spark into binary
5ceaa8a [Davies Liu] Merge branch 'master' of github.com:apache/spark into binary
1900085 [Davies Liu] bugfix
bb22442 [Davies Liu] add binaryFiles and binaryRecords in Python
1 parent 5f27ae1 commit b41a39e
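
For context, a minimal PySpark usage sketch of the two methods this commit adds; the application name, input paths, and record length below are hypothetical, not part of the commit:

```python
from pyspark import SparkContext

sc = SparkContext(appName="binary-input-example")  # hypothetical app name

# binaryFiles: one (file path, whole-file contents as a byte string) pair per file.
blobs = sc.binaryFiles("hdfs:///data/blobs")        # hypothetical directory
sizes = blobs.mapValues(len).collect()

# binaryRecords: split a flat file into fixed-length byte strings (4 bytes each here).
records = sc.binaryRecords("hdfs:///data/records", 4)  # hypothetical directory
print(records.count())
```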

File tree: 5 files changed, +90 -22 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 4 additions & 0 deletions
@@ -560,6 +560,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
 
 
   /**
+   * :: Experimental ::
+   *
    * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
    * (useful for binary data)
    *
@@ -602,6 +604,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   }
 
   /**
+   * :: Experimental ::
+   *
    * Load data from a flat binary file, assuming the length of each record is constant.
    *
    * @param path Directory to the input data files

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 7 additions & 5 deletions
@@ -21,18 +21,14 @@ import java.io.Closeable
 import java.util
 import java.util.{Map => JMap}
 
-import java.io.DataInputStream
-
-import org.apache.hadoop.io.{BytesWritable, LongWritable}
-import org.apache.spark.input.{PortableDataStream, FixedLengthBinaryInputFormat}
-
 import scala.collection.JavaConversions
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 import com.google.common.base.Optional
 import org.apache.hadoop.conf.Configuration
+import org.apache.spark.input.PortableDataStream
 import org.apache.hadoop.mapred.{InputFormat, JobConf}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
@@ -286,6 +282,8 @@ class JavaSparkContext(val sc: SparkContext)
     new JavaPairRDD(sc.binaryFiles(path, minPartitions))
 
   /**
+   * :: Experimental ::
+   *
    * Read a directory of binary files from HDFS, a local file system (available on all nodes),
    * or any Hadoop-supported file system URI as a byte array. Each file is read as a single
    * record and returned in a key-value pair, where the key is the path of each file,
@@ -312,15 +310,19 @@ class JavaSparkContext(val sc: SparkContext)
    *
    * @note Small files are preferred; very large files but may cause bad performance.
    */
+  @Experimental
   def binaryFiles(path: String): JavaPairRDD[String, PortableDataStream] =
     new JavaPairRDD(sc.binaryFiles(path, defaultMinPartitions))
 
   /**
+   * :: Experimental ::
+   *
    * Load data from a flat binary file, assuming the length of each record is constant.
    *
    * @param path Directory to the input data files
    * @return An RDD of data with values, represented as byte arrays
    */
+  @Experimental
   def binaryRecords(path: String, recordLength: Int): JavaRDD[Array[Byte]] = {
     new JavaRDD(sc.binaryRecords(path, recordLength))
   }

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 29 additions & 16 deletions
@@ -21,6 +21,8 @@ import java.io._
 import java.net._
 import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
+import org.apache.spark.input.PortableDataStream
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.language.existentials
@@ -395,22 +397,33 @@ private[spark] object PythonRDD extends Logging {
         newIter.asInstanceOf[Iterator[String]].foreach { str =>
           writeUTF(str, dataOut)
         }
-      case pair: Tuple2[_, _] =>
-        pair._1 match {
-          case bytePair: Array[Byte] =>
-            newIter.asInstanceOf[Iterator[Tuple2[Array[Byte], Array[Byte]]]].foreach { pair =>
-              dataOut.writeInt(pair._1.length)
-              dataOut.write(pair._1)
-              dataOut.writeInt(pair._2.length)
-              dataOut.write(pair._2)
-            }
-          case stringPair: String =>
-            newIter.asInstanceOf[Iterator[Tuple2[String, String]]].foreach { pair =>
-              writeUTF(pair._1, dataOut)
-              writeUTF(pair._2, dataOut)
-            }
-          case other =>
-            throw new SparkException("Unexpected Tuple2 element type " + pair._1.getClass)
+      case stream: PortableDataStream =>
+        newIter.asInstanceOf[Iterator[PortableDataStream]].foreach { stream =>
+          val bytes = stream.toArray()
+          dataOut.writeInt(bytes.length)
+          dataOut.write(bytes)
+        }
+      case (key: String, stream: PortableDataStream) =>
+        newIter.asInstanceOf[Iterator[(String, PortableDataStream)]].foreach {
+          case (key, stream) =>
+            writeUTF(key, dataOut)
+            val bytes = stream.toArray()
+            dataOut.writeInt(bytes.length)
+            dataOut.write(bytes)
+        }
+      case (key: String, value: String) =>
+        newIter.asInstanceOf[Iterator[(String, String)]].foreach {
+          case (key, value) =>
+            writeUTF(key, dataOut)
+            writeUTF(value, dataOut)
+        }
+      case (key: Array[Byte], value: Array[Byte]) =>
+        newIter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach {
+          case (key, value) =>
+            dataOut.writeInt(key.length)
+            dataOut.write(key)
+            dataOut.writeInt(value.length)
+            dataOut.write(value)
         }
       case other =>
         throw new SparkException("Unexpected element type " + first.getClass)
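
To make the wire format above concrete, here is a rough Python-side sketch of how a reader could consume the (String key, PortableDataStream value) records that writeIteratorToStream emits: each field is a 4-byte big-endian length prefix followed by the payload, first the UTF-8 path and then the raw file bytes. This is only an illustration of the framing; the helper names are invented, and in PySpark itself the decoding is done by the serializers wired up in context.py.

```python
import struct

def _read_chunk(stream):
    # Illustrative helper: read a 4-byte big-endian length prefix
    # (the format produced by java.io.DataOutputStream.writeInt),
    # then read that many payload bytes.
    header = stream.read(4)
    if len(header) < 4:
        raise EOFError("stream ended mid-record")
    (length,) = struct.unpack("!i", header)
    return stream.read(length)

def read_path_bytes_pair(stream):
    # One record as written for (String, PortableDataStream) pairs:
    # a length-prefixed UTF-8 path followed by length-prefixed file contents.
    path = _read_chunk(stream).decode("utf-8")
    contents = _read_chunk(stream)
    return path, contents
```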

python/pyspark/context.py

Lines changed: 31 additions & 1 deletion
@@ -29,7 +29,7 @@
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
-    PairDeserializer, CompressedSerializer, AutoBatchedSerializer
+    PairDeserializer, CompressedSerializer, AutoBatchedSerializer, NoOpSerializer
 from pyspark.storagelevel import StorageLevel
 from pyspark.rdd import RDD
 from pyspark.traceback_utils import CallSite, first_spark_call
@@ -388,6 +388,36 @@ def wholeTextFiles(self, path, minPartitions=None, use_unicode=True):
         return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
                    PairDeserializer(UTF8Deserializer(use_unicode), UTF8Deserializer(use_unicode)))
 
+    def binaryFiles(self, path, minPartitions=None):
+        """
+        :: Experimental ::
+
+        Read a directory of binary files from HDFS, a local file system
+        (available on all nodes), or any Hadoop-supported file system URI
+        as a byte array. Each file is read as a single record and returned
+        in a key-value pair, where the key is the path of each file, the
+        value is the content of each file.
+
+        Note: Small files are preferred, large file is also allowable, but
+        may cause bad performance.
+        """
+        minPartitions = minPartitions or self.defaultMinPartitions
+        return RDD(self._jsc.binaryFiles(path, minPartitions), self,
+                   PairDeserializer(UTF8Deserializer(), NoOpSerializer()))
+
+    def binaryRecords(self, path, recordLength):
+        """
+        :: Experimental ::
+
+        Load data from a flat binary file, assuming each record is a set of numbers
+        with the specified numerical format (see ByteBuffer), and the number of
+        bytes per record is constant.
+
+        :param path: Directory to the input data files
+        :param recordLength: The length at which to split the records
+        """
+        return RDD(self._jsc.binaryRecords(path, recordLength), self, NoOpSerializer())
+
     def _dictToJavaMap(self, d):
         jm = self._jvm.java.util.HashMap()
         if not d:
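
As a usage note for binaryRecords, each fixed-length record comes back as a raw byte string, so decoding the numerical format is left to the caller. A minimal sketch, assuming a hypothetical input file whose records are two little-endian 32-bit floats each:

```python
import struct

# recordLength must match the on-disk layout: 2 floats * 4 bytes = 8 bytes per record.
raw = sc.binaryRecords("/tmp/pairs-of-floats.bin", 8)     # hypothetical input file
points = raw.map(lambda rec: struct.unpack("<ff", rec))   # -> RDD of (float, float) tuples
```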

python/pyspark/tests.py

Lines changed: 19 additions & 0 deletions
@@ -1110,6 +1110,25 @@ def test_converters(self):
                (u'\x03', [2.0])]
         self.assertEqual(maps, em)
 
+    def test_binary_files(self):
+        path = os.path.join(self.tempdir.name, "binaryfiles")
+        os.mkdir(path)
+        data = "short binary data"
+        with open(os.path.join(path, "part-0000"), 'w') as f:
+            f.write(data)
+        [(p, d)] = self.sc.binaryFiles(path).collect()
+        self.assertTrue(p.endswith("part-0000"))
+        self.assertEqual(d, data)
+
+    def test_binary_records(self):
+        path = os.path.join(self.tempdir.name, "binaryrecords")
+        os.mkdir(path)
+        with open(os.path.join(path, "part-0000"), 'w') as f:
+            for i in range(100):
+                f.write('%04d' % i)
+        result = self.sc.binaryRecords(path, 4).map(int).collect()
+        self.assertEqual(range(100), result)
+
 
 class OutputFormatTests(ReusedPySparkTestCase):
 
