Skip to content

Commit c837114

Browse files
rsotn-mapr authored and mgorbov committed
Spark OJAI JAVA: load to RDD, save from RDD implementation (apache#195)
* MapR [SPARK-124] Loading to JavaRDD implemented
* MapR [SPARK-124] MapRDBJavaSparkContext constructor changed
* MapR [SPARK-124] implemented RDD[Row] saving
1 parent 2a8a6c1 commit c837114

File tree

8 files changed

+147
-238
lines changed

8 files changed

+147
-238
lines changed

external/maprdb/src/main/scala/com/mapr/db/spark/api/java/MapRDBJavaContext.java

Lines changed: 0 additions & 43 deletions
This file was deleted.
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package com.mapr.db.spark.api.java
2+
3+
import com.mapr.db.spark.RDD.api.java.MapRDBJavaRDD
4+
import com.mapr.db.spark.RDD.{PairedDocumentRDDFunctions, RDDTYPE}
5+
import com.mapr.db.spark.impl.OJAIDocument
6+
import com.mapr.db.spark.utils.{MapRDBUtils, MapRSpark}
7+
import com.mapr.db.spark.writers.{OJAIKey, OJAIValue}
8+
import org.apache.hadoop.conf.Configuration
9+
import org.apache.spark.SparkContext
10+
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
11+
import org.apache.spark.rdd.RDD
12+
import org.apache.spark.sql.Row
13+
import org.ojai.DocumentConstants
14+
15+
/**
  * Java-facing facade for loading MapR-DB tables into RDDs and for saving RDDs to MapR-DB tables.
  *
  * Telescoping overloads are used instead of Scala default arguments so that every variant
  * remains directly callable from Java code.
  *
  * @param sparkContext the Spark context handed to the `MapRSpark` builder when loading tables
  */
class MapRDBJavaSparkContext(val sparkContext: SparkContext) {

  /**
    * Auxiliary constructor that unwraps a [[JavaSparkContext]] into its Scala `SparkContext`.
    *
    * @param javaSparkContext the Java Spark context to unwrap
    */
  def this(javaSparkContext: JavaSparkContext) =
    this(JavaSparkContext.toSparkContext(javaSparkContext))

  /**
    * Loads the given table as an RDD of `OJAIDocument`.
    *
    * A fresh Hadoop `Configuration` is passed to the builder on every call.
    *
    * @param tableName name of the table handed to the `MapRSpark` builder
    * @return the loaded RDD wrapped in a `MapRDBJavaRDD`
    */
  def loadFromMapRDB(tableName: String): MapRDBJavaRDD[OJAIDocument] = {
    val rdd = MapRSpark.builder()
      .sparkContext(sparkContext)
      .configuration(new Configuration)
      .setTable(tableName)
      .build()
      .toJavaRDD(classOf[OJAIDocument])

    MapRDBJavaRDD(rdd)
  }

  /**
    * Loads the given table as an RDD whose elements are of the supplied class.
    *
    * @param tableName name of the table handed to the `MapRSpark` builder
    * @param clazz     runtime class of the elements; used to derive the `ClassTag` for `D`
    * @tparam D element type of the resulting RDD
    * @return the loaded RDD wrapped in a `MapRDBJavaRDD`
    * @throws IllegalArgumentException if `clazz` is null
    */
  def loadFromMapRDB[D <: java.lang.Object](tableName: String,
                                            clazz: Class[D]): MapRDBJavaRDD[D] = {
    // Fail fast: ClassTag(null) would not throw here, it would only fail later and obscurely.
    require(clazz != null, "Class can not be null")

    import scala.reflect.ClassTag
    implicit val ct: ClassTag[D] = ClassTag(clazz)
    implicit val rddType: RDDTYPE[D] = RDDTYPE.overrideJavaDefaultType[D]
    val rdd = MapRSpark.builder()
      .sparkContext(sparkContext)
      .configuration(new Configuration)
      .setTable(tableName)
      .build()
      .toJavaRDD(clazz)

    MapRDBJavaRDD(rdd)
  }

  // Brings the implicit RDD enrichments that provide `saveToMapRDB` into scope for the
  // methods below. Kept at this position so the load methods above are not affected.
  import com.mapr.db.spark._

  /**
    * Saves the given RDD to a MapR-DB table.
    *
    * All arguments after `javaRDD` are forwarded unchanged to the `saveToMapRDB` enrichment
    * imported from `com.mapr.db.spark`.
    *
    * @param javaRDD     the RDD to save; must not be null
    * @param tableName   destination table name
    * @param createTable forwarded as-is to the underlying save call
    * @param bulkInsert  forwarded as-is to the underlying save call
    * @param idField     forwarded as-is to the underlying save call
    * @tparam D element type of the RDD
    * @throws IllegalArgumentException if `javaRDD` is null
    */
  def saveToMapRDB[D](javaRDD: JavaRDD[D],
                      tableName: String,
                      createTable: Boolean,
                      bulkInsert: Boolean,
                      idField: String): Unit = {
    require(javaRDD != null, "RDD can not be null")

    // NOTE(review): this cast is unchecked (type erasure) and does not convert any element; it
    // only changes the static type, presumably to steer implicit resolution of the
    // `saveToMapRDB` enrichment. Confirm against the implicits in `com.mapr.db.spark`.
    javaRDD.rdd
      .asInstanceOf[RDD[OJAIValue[D]]]
      .saveToMapRDB(tableName, createTable, bulkInsert, idField)
  }

  /** Same as the five-argument overload with `idField = DocumentConstants.ID_KEY`. */
  def saveToMapRDB[D](javaRDD: JavaRDD[D],
                      tableName: String,
                      createTable: Boolean,
                      bulkInsert: Boolean): Unit = {
    this.saveToMapRDB(javaRDD, tableName, createTable, bulkInsert, DocumentConstants.ID_KEY)
  }

  /**
    * Same as the five-argument overload with `bulkInsert = false` and
    * `idField = DocumentConstants.ID_KEY`.
    */
  def saveToMapRDB[D](javaRDD: JavaRDD[D], tableName: String, createTable: Boolean): Unit = {
    this.saveToMapRDB(javaRDD, tableName, createTable, bulkInsert = false,
      DocumentConstants.ID_KEY)
  }

  /**
    * Same as the five-argument overload with `createTable = false`, `bulkInsert = false` and
    * `idField = DocumentConstants.ID_KEY`.
    */
  def saveToMapRDB[D](javaRDD: JavaRDD[D], tableName: String): Unit = {
    this.saveToMapRDB(javaRDD, tableName, createTable = false, bulkInsert = false,
      DocumentConstants.ID_KEY)
  }

  /**
    * Saves an RDD of Spark SQL `Row`s to a MapR-DB table.
    *
    * All arguments after `javaRDD` are forwarded unchanged to the `saveToMapRDB` enrichment
    * imported from `com.mapr.db.spark`.
    *
    * @param javaRDD     the row RDD to save; must not be null
    * @param tableName   destination table name
    * @param createTable forwarded as-is to the underlying save call
    * @param bulkInsert  forwarded as-is to the underlying save call
    * @param idField     forwarded as-is to the underlying save call
    * @throws IllegalArgumentException if `javaRDD` is null
    */
  def saveRowRDDToMapRDB(javaRDD: JavaRDD[Row],
                         tableName: String,
                         createTable: Boolean,
                         bulkInsert: Boolean,
                         idField: String): Unit = {
    require(javaRDD != null, "RDD can not be null")

    // `javaRDD.rdd` is already statically an RDD[Row]; no cast is needed.
    javaRDD.rdd.saveToMapRDB(tableName, createTable, bulkInsert, idField)
  }

  /** Same as the five-argument overload with `idField = DocumentConstants.ID_KEY`. */
  def saveRowRDDToMapRDB(javaRDD: JavaRDD[Row],
                         tableName: String,
                         createTable: Boolean,
                         bulkInsert: Boolean): Unit = {
    this.saveRowRDDToMapRDB(javaRDD, tableName, createTable, bulkInsert,
      DocumentConstants.ID_KEY)
  }

  /**
    * Same as the five-argument overload with `bulkInsert = false` and
    * `idField = DocumentConstants.ID_KEY`.
    */
  def saveRowRDDToMapRDB(javaRDD: JavaRDD[Row],
                         tableName: String,
                         createTable: Boolean): Unit = {
    this.saveRowRDDToMapRDB(javaRDD, tableName, createTable, bulkInsert = false,
      DocumentConstants.ID_KEY)
  }

  /**
    * Same as the five-argument overload with `createTable = false`, `bulkInsert = false` and
    * `idField = DocumentConstants.ID_KEY`.
    */
  def saveRowRDDToMapRDB(javaRDD: JavaRDD[Row], tableName: String): Unit = {
    this.saveRowRDDToMapRDB(javaRDD, tableName, createTable = false, bulkInsert = false,
      DocumentConstants.ID_KEY)
  }

  /**
    * Saves a key/value RDD to a MapR-DB table through `PairedDocumentRDDFunctions`.
    *
    * The class arguments are used to build the `ClassTag`s from which the implicit
    * `OJAIKey` / `OJAIValue` instances are obtained.
    *
    * @param javaPairRDD the pair RDD to save; must not be null
    * @param keyClazz    runtime class of the keys; must not be null
    * @param valueClazz  runtime class of the values; must not be null
    * @param tableName   destination table name
    * @param createTable forwarded as-is to the underlying save call
    * @param bulkInsert  forwarded as-is to the underlying save call
    * @tparam K key type
    * @tparam V value type
    * @throws IllegalArgumentException if `javaPairRDD`, `keyClazz` or `valueClazz` is null
    */
  def saveToMapRDB[K, V <: AnyRef](javaPairRDD: JavaPairRDD[K, V],
                                   keyClazz: Class[K],
                                   valueClazz: Class[V],
                                   tableName: String,
                                   createTable: Boolean,
                                   bulkInsert: Boolean): Unit = {
    require(javaPairRDD != null, "RDD can not be null")
    require(keyClazz != null, "Key class can not be null")
    require(valueClazz != null, "Value class can not be null")

    import scala.reflect.ClassTag
    implicit val vct: ClassTag[V] = ClassTag(valueClazz)
    implicit val v: OJAIValue[V] = OJAIValue.overrideDefault[V]

    // The key ClassTag must be in implicit scope before the OJAIKey is looked up.
    implicit val ct: ClassTag[K] = ClassTag(keyClazz)
    implicit val f: OJAIKey[K] = MapRDBUtils.getOjaiKey[K]()

    PairedDocumentRDDFunctions(javaPairRDD.rdd).saveToMapRDB(tableName, createTable, bulkInsert)
  }

  /** Same as the six-argument pair overload with `bulkInsert = false`. */
  def saveToMapRDB[K, V <: AnyRef](javaPairRDD: JavaPairRDD[K, V],
                                   keyClazz: Class[K],
                                   valueClazz: Class[V],
                                   tableName: String,
                                   createTable: Boolean): Unit = {
    this.saveToMapRDB(javaPairRDD, keyClazz, valueClazz, tableName, createTable,
      bulkInsert = false)
  }

  /**
    * Same as the six-argument pair overload with `createTable = false` and
    * `bulkInsert = false`.
    */
  def saveToMapRDB[K, V <: AnyRef](javaPairRDD: JavaPairRDD[K, V],
                                   keyClazz: Class[K],
                                   valueClazz: Class[V],
                                   tableName: String): Unit = {
    this.saveToMapRDB(javaPairRDD, keyClazz, valueClazz, tableName, createTable = false,
      bulkInsert = false)
  }

}

external/maprdb/src/main/scala/com/mapr/db/spark/api/java/PairedRDDBeanJavaFunctions.java

Lines changed: 0 additions & 37 deletions
This file was deleted.

external/maprdb/src/main/scala/com/mapr/db/spark/api/java/PairedRDDJavaFunctions.java

Lines changed: 0 additions & 38 deletions
This file was deleted.

external/maprdb/src/main/scala/com/mapr/db/spark/api/java/RDDBeanJavaFunctions.java

Lines changed: 0 additions & 36 deletions
This file was deleted.

external/maprdb/src/main/scala/com/mapr/db/spark/api/java/RDDJavaFunctions.java

Lines changed: 0 additions & 38 deletions
This file was deleted.

external/maprdb/src/main/scala/com/mapr/db/spark/api/java/SparkContextJavaFunctions.scala

Lines changed: 0 additions & 46 deletions
This file was deleted.

0 commit comments

Comments
 (0)