From c84958e9785c67d83922b17f62659444f3003156 Mon Sep 17 00:00:00 2001 From: rotems Date: Sat, 18 Jun 2016 10:52:08 +0300 Subject: [PATCH 1/2] [SPARK-12197] [SparkCore] Kryo & Avro - Support Schema Repo --- .../spark/serializer/KryoSerializer.scala | 16 ++- .../{ => avro}/GenericAvroSerializer.scala | 106 ++++++++++----- .../spark/serializer/avro/SchemaRepo.scala | 86 ++++++++++++ .../GenericAvroSerializerSuite.scala | 123 ++++++++++++++++-- docs/configuration.md | 10 ++ 5 files changed, 295 insertions(+), 46 deletions(-) rename core/src/main/scala/org/apache/spark/serializer/{ => avro}/GenericAvroSerializer.scala (70%) create mode 100644 core/src/main/scala/org/apache/spark/serializer/avro/SchemaRepo.scala diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index bc9fd50c2cd2b..af12a38b71070 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -36,6 +36,7 @@ import org.apache.spark._ import org.apache.spark.api.python.PythonBroadcast import org.apache.spark.network.util.ByteUnit import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus} +import org.apache.spark.serializer.avro.{GenericAvroSerializer, EmptySchemaRepo, SchemaRepo} import org.apache.spark.storage._ import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils} import org.apache.spark.util.collection.CompactBuffer @@ -108,8 +109,9 @@ class KryoSerializer(conf: SparkConf) kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer()) kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer()) - kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas)) - kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas)) + val schemaRepo = SchemaRepo(conf).getOrElse(EmptySchemaRepo) + kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas, schemaRepo)) + kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas, schemaRepo)) try { // scalastyle:off classforname @@ -184,8 +186,8 @@ class KryoSerializer(conf: SparkConf) private[spark] class KryoSerializationStream( - serInstance: KryoSerializerInstance, - outStream: OutputStream) extends SerializationStream { + serInstance: KryoSerializerInstance, + outStream: OutputStream) extends SerializationStream { private[this] var output: KryoOutput = new KryoOutput(outStream) private[this] var kryo: Kryo = serInstance.borrowKryo() @@ -217,8 +219,8 @@ class KryoSerializationStream( private[spark] class KryoDeserializationStream( - serInstance: KryoSerializerInstance, - inStream: InputStream) extends DeserializationStream { + serInstance: KryoSerializerInstance, + inStream: InputStream) extends DeserializationStream { private[this] var input: KryoInput = new KryoInput(inStream) private[this] var kryo: Kryo = serInstance.borrowKryo() @@ -450,7 +452,7 @@ private class JavaIterableWrapperSerializer } override def read(kryo: Kryo, in: KryoInput, clz: Class[java.lang.Iterable[_]]) - : java.lang.Iterable[_] = { + : java.lang.Iterable[_] = { kryo.readClassAndObject(in) match { case scalaIterable: Iterable[_] => scalaIterable.asJava case javaIterable: java.lang.Iterable[_] => javaIterable diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/avro/GenericAvroSerializer.scala similarity index 70% rename from core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala rename to core/src/main/scala/org/apache/spark/serializer/avro/GenericAvroSerializer.scala index 3d5b7105f0ca8..0ae97307049d7 100644 --- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/avro/GenericAvroSerializer.scala @@ -15,22 +15,22 @@ * limitations under the License. */ -package org.apache.spark.serializer +package org.apache.spark.serializer.avro import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import java.nio.ByteBuffer -import scala.collection.mutable - -import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer} import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} -import org.apache.avro.{Schema, SchemaNormalization} +import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer} +import org.apache.avro.Schema.Parser import org.apache.avro.generic.{GenericData, GenericRecord} import org.apache.avro.io._ +import org.apache.avro.{Schema, SchemaNormalization} import org.apache.commons.io.IOUtils - -import org.apache.spark.{SparkEnv, SparkException} import org.apache.spark.io.CompressionCodec +import org.apache.spark.{SparkEnv, SparkException} + +import scala.collection.mutable /** * Custom serializer used for generic Avro records. If the user registers the schemas @@ -42,7 +42,8 @@ import org.apache.spark.io.CompressionCodec * string representation of the Avro schema, used to decrease the amount of data * that needs to be serialized. */ -private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) +private[serializer] class GenericAvroSerializer(schemas: Map[Long, String], + schemaRepo: SchemaRepo = EmptySchemaRepo) extends KSerializer[GenericRecord] { /** Used to reduce the amount of effort to compress the schema */ @@ -96,6 +97,40 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) def serializeDatum[R <: GenericRecord](datum: R, output: KryoOutput): Unit = { val encoder = EncoderFactory.get.binaryEncoder(output, null) val schema = datum.getSchema + + serializeSchema(datum, schema, output) + + writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema)) + .asInstanceOf[DatumWriter[R]] + .write(datum, encoder) + encoder.flush() + } + + /** + * Deserializes generic records into their in-memory form. There is internal + * state to keep a cache of already seen schemas and datum readers. + */ + def deserializeDatum(input: KryoInput): GenericRecord = { + val schema: Schema = deserializeSchema(input) + + val decoder = DecoderFactory.get.directBinaryDecoder(input, null) + readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema)) + .asInstanceOf[DatumReader[GenericRecord]] + .read(null, decoder) + } + + + /** + * Serializes the schema. + * Step 1: Calculate the schema's fingerprint using Avro's SchemaNormalization mechanism. + * Step 2: Use the fingerprint to look up the schema among the pre-registered schemas; if found, serialize the fingerprint, otherwise go to step 3. + * Step 3: Use the SchemaRepo to extract the schemaId of the record; if found, serialize the schemaId in place of the fingerprint, otherwise go to step 4. + * Step 4: Serialize the entire schema, preceded by an indicator that the full schema follows.
+ * @param datum - datum to extract the id from + * @param schema - schema to serialize + * @param output - kryo output + */ + private def serializeSchema[R <: GenericRecord](datum: R, schema: Schema, output: KryoOutput) = { val fingerprint = fingerprintCache.getOrElseUpdate(schema, { SchemaNormalization.parsingFingerprint64(schema) }) @@ -104,34 +139,48 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) output.writeBoolean(true) output.writeLong(fingerprint) case None => - output.writeBoolean(false) - val compressedSchema = compress(schema) - output.writeInt(compressedSchema.length) - output.writeBytes(compressedSchema) + schemaRepo.extractSchemaId(datum) match { + case Some(schemaId) if schemaRepo.contains(schemaId) => + output.writeBoolean(true) + output.writeLong(schemaId) + case _ => + output.writeBoolean(false) + val compressedSchema = compress(schema) + output.writeInt(compressedSchema.length) + output.writeBytes(compressedSchema) + } } - - writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema)) - .asInstanceOf[DatumWriter[R]] - .write(datum, encoder) - encoder.flush() } + /** - * Deserializes generic records into their in-memory form. There is internal - * state to keep a cache of already seen schemas and datum readers. + * Deserializes the schema. + * If the fingerprint-indicator boolean is false: + * Step 1: Deserialize the schema itself from its compressed bytes. + * If the fingerprint-indicator boolean is true: + * Step 1: Search for the schema in the explicitly registered schemas using the fingerprint. If the schema was not found, move to step 2. + * Step 2: Search in the schema repository using the fingerprint. If the schema is still not found, throw an exception. + * @param input KryoInput + * @return the deserialized schema */ - def deserializeDatum(input: KryoInput): GenericRecord = { + private def deserializeSchema(input: KryoInput): Schema = { val schema = { if (input.readBoolean()) { val fingerprint = input.readLong() schemaCache.getOrElseUpdate(fingerprint, { schemas.get(fingerprint) match { - case Some(s) => new Schema.Parser().parse(s) + case Some(s) => new Parser().parse(s) case None => - throw new SparkException( - "Error reading attempting to read avro data -- encountered an unknown " + - s"fingerprint: $fingerprint, not sure what schema to use. This could happen " + - "if you registered additional schemas after starting your spark context.") + schemaRepo.getSchema(fingerprint) match { + case Some(repoSchema) => repoSchema + case None => + throw new SparkException( + s"""Error while attempting to read avro data -- + |encountered an unknown fingerprint: $fingerprint, not sure what schema to use.
+ |This could happen if you registered additional schemas after starting your + |spark context.""".stripMargin) + } + } }) } else { @@ -139,10 +188,7 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) decompress(ByteBuffer.wrap(input.readBytes(length))) } } - val decoder = DecoderFactory.get.directBinaryDecoder(input, null) - readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema)) - .asInstanceOf[DatumReader[GenericRecord]] - .read(null, decoder) + schema } override def write(kryo: Kryo, output: KryoOutput, datum: GenericRecord): Unit = @@ -150,4 +196,4 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) override def read(kryo: Kryo, input: KryoInput, datumClass: Class[GenericRecord]): GenericRecord = deserializeDatum(input) -} +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/serializer/avro/SchemaRepo.scala b/core/src/main/scala/org/apache/spark/serializer/avro/SchemaRepo.scala new file mode 100644 index 0000000000000..ba9bd06597651 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/serializer/avro/SchemaRepo.scala @@ -0,0 +1,86 @@ +package org.apache.spark.serializer.avro + +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.spark.{Logging, SparkConf} + +/** + * Created by rotems on 12/6/15. + */ +/** + * A schema repository for avro records. + * This repo assumes that it is possible to extract the schemaId of a record from the record itself. + * @param config sparkConf for configuration purposes + */ +abstract class SchemaRepo(config: SparkConf) { + + /** + * Receive from repo an avro schema as string by its ID + * @param schemaId - the schemaId + * @return schema if found, none otherwise + */ + def getRawSchema(schemaId : Long) : Option[String] + + /** + * Extract schemaId from record. + * @param record current avro record + * @return schemaId if managed to extract, none otherwise + */ + def extractSchemaId(record: GenericRecord) : Option[Long] + + /** + * Checks whether the schema repository contains the following schemaId + * @param schemaId - the schemaId + * @return true if found in repo, false otherwise. + */ + def contains(schemaId: Long) : Boolean + + /** + * Get schema from repo using schemaId as Schema type + * @param schemaId - the schemaId + * @return schema if found, none otherwise + */ + def getSchema(schemaId : Long) : Option[Schema] = { + getRawSchema(schemaId) match { + case Some(s) => Some(new Schema.Parser().parse(s)) + case None => None + + } + } + +} + +object SchemaRepo extends Logging { + val SCHEMA_REPO_KEY = "spark.kryo.avro.schema.repo" + + /** + * Create a schemaRepo using SparkConf + * @param conf - spark conf used to configure the repo. + * @return the initiated SchemaRepo or None if anything goes wrong + */ + def apply(conf: SparkConf) : Option[SchemaRepo]= { + try { + conf.getOption(SCHEMA_REPO_KEY) match { + case Some(clazz) => Some(Class.forName(clazz).getConstructor(classOf[SparkConf]) + .newInstance(conf).asInstanceOf[SchemaRepo]) + case None => None + } + } catch { + case t: Throwable => + log.error(s"Failed to build schema repo. ", t) + None + } + } +} + +/** + * A dummy empty schema repository. 
+ */ +object EmptySchemaRepo extends SchemaRepo(null) { + + override def getRawSchema(schemaId: Long): Option[String] = None + + override def extractSchemaId(record: GenericRecord): Option[Long] = None + + override def contains(schemaId: Long): Boolean = false +} \ No newline at end of file diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala index 3734f1cb408fe..684ac99bc9384 100644 --- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala @@ -17,24 +17,21 @@ package org.apache.spark.serializer +import org.apache.spark.serializer.GenericAvroSerializerSuite._ import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import java.nio.ByteBuffer -import com.esotericsoftware.kryo.io.{Input, Output} -import org.apache.avro.{Schema, SchemaBuilder} +import com.esotericsoftware.kryo.io.{Output, Input} +import org.apache.avro.generic.GenericRecord +import org.apache.avro.{SchemaBuilder, Schema} import org.apache.avro.generic.GenericData.Record +import org.apache.spark.serializer.avro.{GenericAvroSerializer, SchemaRepo} -import org.apache.spark.{SharedSparkContext, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkFunSuite, SharedSparkContext} class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - val schema : Schema = SchemaBuilder - .record("testRecord").fields() - .requiredString("data") - .endRecord() - val record = new Record(schema) - record.put("data", "test data") test("schema compression and decompression") { val genericSer = new GenericAvroSerializer(conf.getAvroSchema) @@ -81,4 +78,112 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { assert(compressedSchema.eq(genericSer.compress(schema))) assert(decompressedSchema.eq(genericSer.decompress(ByteBuffer.wrap(compressedSchema)))) } + + test("found in schema repository") { + val schemaRepo = new TestSchemaRepo(conf) + val genericSerFingerWithRepo = new GenericAvroSerializer(conf.getAvroSchema, schemaRepo) + + val outputStream = new ByteArrayOutputStream() + val output = new Output(outputStream) + genericSerFingerWithRepo.serializeDatum(record, output) + + output.flush() + output.close() + + val input = new Input(new ByteArrayInputStream(outputStream.toByteArray)) + assert(genericSerFingerWithRepo.deserializeDatum(input) === record) + + } + + test("extracted schemaId which is missing from schema repository") { + val schemaRepo = new TestSchemaRepo(conf) + val genericSerFingerWithRepo = new GenericAvroSerializer(conf.getAvroSchema, schemaRepo) + + val outputStream = new ByteArrayOutputStream() + val output = new Output(outputStream) + genericSerFingerWithRepo.serializeDatum(record2, output) + + output.flush() + output.close() + + val input = new Input(new ByteArrayInputStream(outputStream.toByteArray)) + assert(genericSerFingerWithRepo.deserializeDatum(input) === record2) + } + + test("no schemaId extracted from record") { + val schemaRepo = new TestSchemaRepo(conf) + val genericSerFingerWithRepo = new GenericAvroSerializer(conf.getAvroSchema, schemaRepo) + + val outputStream = new ByteArrayOutputStream() + val output = new Output(outputStream) + genericSerFingerWithRepo.serializeDatum(record3, output) + + output.flush() + output.close() + + val input = new 
Input(new ByteArrayInputStream(outputStream.toByteArray)) + assert(genericSerFingerWithRepo.deserializeDatum(input) === record3) + } + + test("registered schemas takes precedence over schema repository") { + conf.registerAvroSchemas(schema) + val schemaRepo = new TestSchemaRepo(conf) + val genericSerFingerWithRepo = new GenericAvroSerializer(conf.getAvroSchema, schemaRepo) + + val outputStream = new ByteArrayOutputStream() + val output = new Output(outputStream) + genericSerFingerWithRepo.serializeDatum(record2, output) + + output.flush() + output.close() + + val input = new Input(new ByteArrayInputStream(outputStream.toByteArray)) + assert(genericSerFingerWithRepo.deserializeDatum(input) === record2) + } + + class TestSchemaRepo(conf: SparkConf) extends SchemaRepo(conf) { + val repo = Map[Long,String](1L -> schema.toString) + /** + * Receive from repo an avro schema as string by its ID + * @param schemaId - the schemaId + * @return schema if found, none otherwise + */ + override def getRawSchema(schemaId: Long): Option[String] = { + repo.get(schemaId) + } + + /** + * Extract schemaId from record. + * @param r current avro record + * @return schemaId if managed to extract, none otherwise + */ + override def extractSchemaId(r: GenericRecord): Option[Long] = { + if(r equals record) Some(1L) + else if(r equals record2) Some(2L) + else None + } + + /** + * Checks whether the schema repository contains the following schemaId + * @param schemaId - the schemaId + * @return true if found in repo, false otherwise. + */ + override def contains(schemaId: Long): Boolean = repo.contains(schemaId) + } } + +object GenericAvroSerializerSuite { + val schema : Schema = SchemaBuilder + .record("testRecord").fields() + .requiredString("data") + .endRecord() + val record = new Record(schema) + record.put("data", "test data") + + val record2 = new Record(schema) + record2.put("data", "test data2") + + val record3 = new Record(schema) + record3.put("data", "test data3") + +} \ No newline at end of file diff --git a/docs/configuration.md b/docs/configuration.md index 7d743d572b582..ca2595557b382 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -665,6 +665,16 @@ Apart from these, the following properties are also available, and may be useful See the tuning guide for more details. + +spark.kryo.avro.schema.repo + (none) + + If you use Kryo serialization and Avro's GenericRecords, give a schema repository class implementation to improve shuffle performance. This + property is useful if you need to serialize GenericRecord and/or GenericData objects and don't know their schema ahead of time. + Otherwise SparkConf.registerAvroSchemas is simpler. 
It should be + set to the fully qualified name of a class that extends org.apache.spark.serializer.avro.SchemaRepo and provides a constructor accepting a SparkConf, since the repository is instantiated reflectively from this configuration. + + spark.kryoserializer.buffer.max 64m From b9e6e19ee07af7e48a6e57e438ee964fac84065b Mon Sep 17 00:00:00 2001 From: rotems Date: Sat, 23 Jul 2016 10:56:51 +0300 Subject: [PATCH 2/2] Changing import --- .../scala/org/apache/spark/serializer/avro/SchemaRepo.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/avro/SchemaRepo.scala b/core/src/main/scala/org/apache/spark/serializer/avro/SchemaRepo.scala index ba9bd06597651..5b2336ad1c7d9 100644 --- a/core/src/main/scala/org/apache/spark/serializer/avro/SchemaRepo.scala +++ b/core/src/main/scala/org/apache/spark/serializer/avro/SchemaRepo.scala @@ -2,7 +2,8 @@ package org.apache.spark.serializer.avro import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord -import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging /** * Created by rotems on 12/6/15.
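
For illustration, a repository plugged in through spark.kryo.avro.schema.repo might look like the minimal sketch below, similar in spirit to the TestSchemaRepo used in GenericAvroSerializerSuite. The package, class name, and the assumption that every record carries a numeric "schemaId" field are hypothetical; a production implementation would more likely resolve schemas from an external schema-registry service than from a hard-coded map.

package com.example.avro

import org.apache.avro.generic.GenericRecord
import org.apache.spark.SparkConf
import org.apache.spark.serializer.avro.SchemaRepo

// Hypothetical repository backed by a hard-coded map of schemaId -> Avro schema JSON.
// The SparkConf constructor parameter is required because the repository is
// instantiated reflectively with the SparkConf as its only constructor argument.
class InMemorySchemaRepo(conf: SparkConf) extends SchemaRepo(conf) {

  private val knownSchemas: Map[Long, String] = Map(
    1L -> """{"type":"record","name":"Event","fields":[{"name":"schemaId","type":"long"},{"name":"payload","type":"string"}]}"""
  )

  // Return the raw schema JSON for the given id, if this repository knows it.
  override def getRawSchema(schemaId: Long): Option[String] = knownSchemas.get(schemaId)

  // Derive the schema id from the record itself; this sketch assumes the record
  // exposes a numeric "schemaId" field and returns None when it does not.
  override def extractSchemaId(record: GenericRecord): Option[Long] =
    Option(record.get("schemaId")).map(_.toString.toLong)

  override def contains(schemaId: Long): Boolean = knownSchemas.contains(schemaId)
}

The repository is then enabled together with Kryo serialization, for example:

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.avro.schema.repo", "com.example.avro.InMemorySchemaRepo")

With this in place, a record whose schema is not pre-registered via registerAvroSchemas is written with only a small schemaId tag instead of the full compressed schema, which is where the shuffle-size saving comes from; if the id cannot be extracted or is unknown to the repository, the serializer falls back to writing the compressed schema as before.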