diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index cedb239541c7..0a7b929dafea 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -273,10 +273,7 @@ private void initDataReader( this.dataColumn = new VectorizedRleValuesReader(); this.isCurrentPageDictionaryEncoded = true; } else { - if (dataEncoding != Encoding.PLAIN) { - throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding); - } - this.dataColumn = new VectorizedPlainValuesReader(); + this.dataColumn = getValuesReader(dataEncoding); this.isCurrentPageDictionaryEncoded = false; } @@ -287,6 +284,20 @@ private void initDataReader( } } + private ValuesReader getValuesReader(Encoding encoding) { + switch (encoding) { + case PLAIN: + return new VectorizedPlainValuesReader(); + case DELTA_BYTE_ARRAY: + return new VectorizedDeltaByteArrayReader(); + case DELTA_BINARY_PACKED: + return new VectorizedDeltaBinaryPackedReader(); + default: + throw new UnsupportedOperationException("Unsupported encoding: " + encoding); + } + } + + private int readPageV1(DataPageV1 page) throws IOException { if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) { throw new UnsupportedOperationException("Unsupported encoding: " + page.getDlEncoding()); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java new file mode 100644 index 000000000000..62fb5f8c96bb --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bitpacking.BytePackerForLong;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.spark.sql.catalyst.util.RebaseDateTime;
+import org.apache.spark.sql.execution.datasources.DataSourceUtils;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+
+/**
+ * An implementation of the Parquet DELTA_BINARY_PACKED decoder that supports the vectorized
+ * interface. DELTA_BINARY_PACKED is a delta encoding for integer and long types that stores the
+ * deltas between consecutive values; the deltas are themselves bit-packed. It is similar to RLE,
+ * but more effective when the values in the encoded column vary widely.
+ *
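+ * For illustration (a simplified sketch of the layout, not byte-exact): the values 7, 5, 3, 1, 2
+ * are stored as the first value 7 followed by the deltas -2, -2, -2, 1; each block records its
+ * minimum delta (-2, written as a zig-zag varint) and bit-packs the non-negative offsets
+ * 0, 0, 0, 3 from that minimum, so a miniblock typically needs only a few bits per value.
+ *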
+ * DELTA_BINARY_PACKED is the default encoding for integer and long columns in Parquet V2. + *
+ * Supported Types: INT32, INT64 + *
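+ * Typical use (a sketch of the calling convention rather than a full example): the column reader
+ * calls initFromPage(valueCount, in) once per data page, then readIntegers(total, vector, rowId)
+ * or readLongs(total, vector, rowId) to decode values into a WritableColumnVector; the matching
+ * skipIntegers/skipLongs methods advance the decoder without writing any output.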
+ * + * @see + * Parquet format encodings: DELTA_BINARY_PACKED + */ +public class VectorizedDeltaBinaryPackedReader extends VectorizedReaderBase { + + // header data + private int blockSizeInValues; + private int miniBlockNumInABlock; + private int totalValueCount; + private long firstValue; + + private int miniBlockSizeInValues; + + // values read by the caller + private int valuesRead = 0; + + // variables to keep state of the current block and miniblock + private long lastValueRead; // needed to compute the next value + private long minDeltaInCurrentBlock; // needed to compute the next value + // currentMiniBlock keeps track of the mini block within the current block that + // we read and decoded most recently. Only used as an index into + // bitWidths array + private int currentMiniBlock = 0; + private int[] bitWidths; // bit widths for each miniBlock in the current block + private int remainingInBlock = 0; // values in current block still to be read + private int remainingInMiniBlock = 0; // values in current mini block still to be read + private long[] unpackedValuesBuffer; + + private ByteBufferInputStream in; + + // temporary buffers used by readByte, readShort, readInteger, and readLong + byte byteVal; + short shortVal; + int intVal; + long longVal; + + @Override + public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException { + Preconditions.checkArgument(valueCount >= 1, + "Page must have at least one value, but it has " + valueCount); + this.in = in; + // Read the header + this.blockSizeInValues = BytesUtils.readUnsignedVarInt(in); + this.miniBlockNumInABlock = BytesUtils.readUnsignedVarInt(in); + double miniSize = (double) blockSizeInValues / miniBlockNumInABlock; + Preconditions.checkArgument(miniSize % 8 == 0, + "miniBlockSize must be multiple of 8, but it's " + miniSize); + this.miniBlockSizeInValues = (int) miniSize; + this.totalValueCount = BytesUtils.readUnsignedVarInt(in); + this.bitWidths = new int[miniBlockNumInABlock]; + this.unpackedValuesBuffer = new long[miniBlockSizeInValues]; + // read the first value + firstValue = BytesUtils.readZigZagVarLong(in); + } + + @Override + public byte readByte() { + readValues(1, null, 0, (w, r, v) -> byteVal = (byte) v); + return byteVal; + } + + @Override + public short readShort() { + readValues(1, null, 0, (w, r, v) -> shortVal = (short) v); + return shortVal; + } + + @Override + public int readInteger() { + readValues(1, null, 0, (w, r, v) -> intVal = (int) v); + return intVal; + } + + @Override + public long readLong() { + readValues(1, null, 0, (w, r, v) -> longVal = v); + return longVal; + } + + @Override + public void readBytes(int total, WritableColumnVector c, int rowId) { + readValues(total, c, rowId, (w, r, v) -> w.putByte(r, (byte) v)); + } + + @Override + public void readShorts(int total, WritableColumnVector c, int rowId) { + readValues(total, c, rowId, (w, r, v) -> w.putShort(r, (short) v)); + } + + @Override + public void readIntegers(int total, WritableColumnVector c, int rowId) { + readValues(total, c, rowId, (w, r, v) -> w.putInt(r, (int) v)); + } + + // Based on VectorizedPlainValuesReader.readIntegersWithRebase + @Override + public final void readIntegersWithRebase( + int total, WritableColumnVector c, int rowId, boolean failIfRebase) { + readValues(total, c, rowId, (w, r, v) -> { + if (v < RebaseDateTime.lastSwitchJulianDay()) { + if (failIfRebase) { + throw DataSourceUtils.newRebaseExceptionInRead("Parquet"); + } else { + w.putInt(r, RebaseDateTime.rebaseJulianToGregorianDays((int) 
v)); + } + } else { + w.putInt(r, (int) v); + } + }); + } + + @Override + public void readUnsignedIntegers(int total, WritableColumnVector c, int rowId) { + readValues(total, c, rowId, (w, r, v) -> { + w.putLong(r, Integer.toUnsignedLong((int) v)); + }); + } + + @Override + public void readUnsignedLongs(int total, WritableColumnVector c, int rowId) { + readValues(total, c, rowId, (w, r, v) -> { + w.putByteArray(r, new BigInteger(Long.toUnsignedString(v)).toByteArray()); + }); + } + + @Override + public void readLongs(int total, WritableColumnVector c, int rowId) { + readValues(total, c, rowId, WritableColumnVector::putLong); + } + + @Override + public final void readLongsWithRebase( + int total, WritableColumnVector c, int rowId, boolean failIfRebase, String timeZone) { + readValues(total, c, rowId, (w, r, v) -> { + if (v < RebaseDateTime.lastSwitchJulianTs()) { + if (failIfRebase) { + throw DataSourceUtils.newRebaseExceptionInRead("Parquet"); + } else { + w.putLong(r, RebaseDateTime.rebaseJulianToGregorianMicros(timeZone, v)); + } + } else { + w.putLong(r, v); + } + }); + } + + @Override + public void skipBytes(int total) { + skipValues(total); + } + + @Override + public void skipShorts(int total) { + skipValues(total); + } + + @Override + public void skipIntegers(int total) { + skipValues(total); + } + + @Override + public void skipLongs(int total) { + skipValues(total); + } + + private void readValues(int total, WritableColumnVector c, int rowId, + IntegerOutputWriter outputWriter) { + if (valuesRead + total > totalValueCount) { + throw new ParquetDecodingException( + "No more values to read. Total values read: " + valuesRead + ", total count: " + + totalValueCount + ", trying to read " + total + " more."); + } + int remaining = total; + // First value + if (valuesRead == 0) { + outputWriter.write(c, rowId, firstValue); + lastValueRead = firstValue; + rowId++; + remaining--; + } + while (remaining > 0) { + int n; + try { + n = loadMiniBlockToOutput(remaining, c, rowId, outputWriter); + } catch (IOException e) { + throw new ParquetDecodingException("Error reading mini block.", e); + } + rowId += n; + remaining -= n; + } + valuesRead = total - remaining; + } + + + /** + * Read from a mini block. Read at most 'remaining' values into output. 
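+ * Each output value is reconstructed as the previously read value plus the block's minimum delta
+ * plus the bit-packed offset for the current position (see unpackMiniBlock), which mirrors how the
+ * writer encoded the deltas.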
+ *
+ * @return the number of values read into output
+ */
+  private int loadMiniBlockToOutput(int remaining, WritableColumnVector c, int rowId,
+      IntegerOutputWriter outputWriter) throws IOException {
+
+    // new block; read the block header
+    if (remainingInBlock == 0) {
+      readBlockHeader();
+    }
+
+    // new miniblock, unpack the miniblock
+    if (remainingInMiniBlock == 0) {
+      unpackMiniBlock();
+    }
+
+    // read values from miniblock
+    int valuesRead = 0;
+    for (int i = miniBlockSizeInValues - remainingInMiniBlock;
+        i < miniBlockSizeInValues && valuesRead < remaining; i++) {
+      // calculate values from deltas unpacked for current block
+      long outValue = lastValueRead + minDeltaInCurrentBlock + unpackedValuesBuffer[i];
+      lastValueRead = outValue;
+      outputWriter.write(c, rowId + valuesRead, outValue);
+      remainingInBlock--;
+      remainingInMiniBlock--;
+      valuesRead++;
+    }
+
+    return valuesRead;
+  }
+
+  private void readBlockHeader() {
+    try {
+      minDeltaInCurrentBlock = BytesUtils.readZigZagVarLong(in);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Can not read min delta in current block", e);
+    }
+    readBitWidthsForMiniBlocks();
+    remainingInBlock = blockSizeInValues;
+    currentMiniBlock = 0;
+    remainingInMiniBlock = 0;
+  }
+
+  /**
+   * A mini block has a size of 8*n values; unpack 8 values at a time.
+   *
+   * see org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader#unpackMiniBlock
+   */
+  private void unpackMiniBlock() throws IOException {
+    Arrays.fill(this.unpackedValuesBuffer, 0);
+    BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(
+        bitWidths[currentMiniBlock]);
+    for (int j = 0; j < miniBlockSizeInValues; j += 8) {
+      ByteBuffer buffer = in.slice(packer.getBitWidth());
+      packer.unpack8Values(buffer, buffer.position(), unpackedValuesBuffer, j);
+    }
+    remainingInMiniBlock = miniBlockSizeInValues;
+    currentMiniBlock++;
+  }
+
+  // From org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader
+  private void readBitWidthsForMiniBlocks() {
+    for (int i = 0; i < miniBlockNumInABlock; i++) {
+      try {
+        bitWidths[i] = BytesUtils.readIntLittleEndianOnOneByte(in);
+      } catch (IOException e) {
+        throw new ParquetDecodingException("Can not decode bitwidth in block header", e);
+      }
+    }
+  }
+
+  private void skipValues(int total) {
+    // Read the values but don't write them out (the writer output method is a no-op)
+    readValues(total, null, -1, (w, r, v) -> {});
+  }
+
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
new file mode 100644
index 000000000000..72b760d426ea
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.parquet; + +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader; +import org.apache.parquet.io.api.Binary; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the vectorized interface. + */ +public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase { + private final DeltaByteArrayReader deltaByteArrayReader = new DeltaByteArrayReader(); + + @Override + public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException { + deltaByteArrayReader.initFromPage(valueCount, in); + } + + @Override + public Binary readBinary(int len) { + return deltaByteArrayReader.readBytes(); + } + + @Override + public void readBinary(int total, WritableColumnVector c, int rowId) { + for (int i = 0; i < total; i++) { + Binary binary = deltaByteArrayReader.readBytes(); + ByteBuffer buffer = binary.toByteBuffer(); + if (buffer.hasArray()) { + c.putByteArray(rowId + i, buffer.array(), buffer.arrayOffset() + buffer.position(), + binary.length()); + } else { + byte[] bytes = new byte[binary.length()]; + buffer.get(bytes); + c.putByteArray(rowId + i, bytes); + } + } + } + + @Override + public void skipBinary(int total) { + for (int i = 0; i < total; i++) { + deltaByteArrayReader.skip(); + } + } + +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java new file mode 100644 index 000000000000..b6715f1e7a07 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.parquet; + +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.io.api.Binary; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; + +/** + * Base class for implementations of VectorizedValuesReader. 
Mainly to avoid duplication + * of methods that are not supported by concrete implementations + */ +public class VectorizedReaderBase extends ValuesReader implements VectorizedValuesReader { + + @Override + public void skip() { + throw new UnsupportedOperationException(); + } + + @Override + public byte readByte() { + throw new UnsupportedOperationException(); + } + + @Override + public short readShort() { + throw new UnsupportedOperationException(); + } + + @Override + public Binary readBinary(int len) { + throw new UnsupportedOperationException(); + } + + @Override + public void readBooleans(int total, WritableColumnVector c, int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public void readBytes(int total, WritableColumnVector c, int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public void readShorts(int total, WritableColumnVector c, int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public void readIntegers(int total, WritableColumnVector c, int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public void readIntegersWithRebase(int total, WritableColumnVector c, int rowId, + boolean failIfRebase) { + throw new UnsupportedOperationException(); + } + + @Override + public void readUnsignedIntegers(int total, WritableColumnVector c, int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public void readUnsignedLongs(int total, WritableColumnVector c, int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public void readLongs(int total, WritableColumnVector c, int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public void readLongsWithRebase(int total, WritableColumnVector c, int rowId, + boolean failIfRebase, String timeZone) { + throw new UnsupportedOperationException(); + } + + @Override + public void readFloats(int total, WritableColumnVector c, int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public void readDoubles(int total, WritableColumnVector c, int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public void readBinary(int total, WritableColumnVector c, int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public void skipBooleans(int total) { + throw new UnsupportedOperationException(); + } + + @Override + public void skipBytes(int total) { + throw new UnsupportedOperationException(); + } + + @Override + public void skipShorts(int total) { + throw new UnsupportedOperationException(); + } + + @Override + public void skipIntegers(int total) { + throw new UnsupportedOperationException(); + } + + @Override + public void skipLongs(int total) { + throw new UnsupportedOperationException(); + } + + @Override + public void skipFloats(int total) { + throw new UnsupportedOperationException(); + } + + @Override + public void skipDoubles(int total) { + throw new UnsupportedOperationException(); + } + + @Override + public void skipBinary(int total) { + throw new UnsupportedOperationException(); + } + + @Override + public void skipFixedLenByteArray(int total, int len) { + throw new UnsupportedOperationException(); + } + +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java index 92161f1f59cc..7ddece068e09 100644 --- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java @@ -68,4 +68,22 @@ void readLongsWithRebase( void skipDoubles(int total); void skipBinary(int total); void skipFixedLenByteArray(int total, int len); + + /** + * A functional interface to write integer values to columnar output + */ + @FunctionalInterface + interface IntegerOutputWriter { + + /** + * A functional interface that writes a long value to a specified row in an output column + * vector + * + * @param outputColumnVector the vector to write to + * @param rowId the row to write to + * @param val value to write + */ + void write(WritableColumnVector outputColumnVector, int rowId, long val); + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala index 0e9e9a706027..31cee48c1787 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala @@ -21,6 +21,9 @@ import java.io.File import scala.collection.JavaConverters._ import scala.util.Random +import org.apache.parquet.column.ParquetProperties +import org.apache.parquet.hadoop.ParquetOutputFormat + import org.apache.spark.SparkConf import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession} @@ -76,6 +79,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark { saveAsCsvTable(testDf, dir.getCanonicalPath + "/csv") saveAsJsonTable(testDf, dir.getCanonicalPath + "/json") saveAsParquetTable(testDf, dir.getCanonicalPath + "/parquet") + saveAsParquetV2Table(testDf, dir.getCanonicalPath + "/parquetV2") saveAsOrcTable(testDf, dir.getCanonicalPath + "/orc") } @@ -94,6 +98,14 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark { spark.read.parquet(dir).createOrReplaceTempView("parquetTable") } + private def saveAsParquetV2Table(df: DataFrameWriter[Row], dir: String): Unit = { + withSQLConf(ParquetOutputFormat.WRITER_VERSION -> + ParquetProperties.WriterVersion.PARQUET_2_0.toString) { + df.mode("overwrite").option("compression", "snappy").parquet(dir) + spark.read.parquet(dir).createOrReplaceTempView("parquetV2Table") + } + } + private def saveAsOrcTable(df: DataFrameWriter[Row], dir: String): Unit = { df.mode("overwrite").option("compression", "snappy").orc(dir) spark.read.orc(dir).createOrReplaceTempView("orcTable") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaEncodingSuite.scala new file mode 100644 index 000000000000..844fa543145f --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaEncodingSuite.scala @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.parquet + +import java.io.IOException +import java.nio.ByteBuffer +import java.util.Random + +import org.apache.parquet.bytes.{ByteBufferInputStream, DirectByteBufferAllocator} +import org.apache.parquet.column.values.ValuesWriter +import org.apache.parquet.column.values.delta.{DeltaBinaryPackingValuesWriterForInteger, DeltaBinaryPackingValuesWriterForLong} +import org.apache.parquet.io.ParquetDecodingException + +import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector} +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{IntegerType, IntegralType, LongType} + +/** + * Read tests for vectorized Delta binary packed reader. + * Translated from + * org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForIntegerTest + * org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLongTest + */ +abstract class ParquetDeltaEncodingSuite[T] extends ParquetCompatibilityTest + with SharedSparkSession { + protected var blockSize = 128 + protected var miniBlockNum = 4 + protected var reader: VectorizedDeltaBinaryPackedReader = _ + protected var writableColumnVector: WritableColumnVector = _ + protected var writer: ValuesWriter = _ + protected var random: Random = _ + + protected def getSparkSqlType: IntegralType + + protected def writeData(data: Array[T]): Unit + + protected def writeData(data: Array[T], length: Int): Unit + + protected def initValuesWriter (): Unit + + protected def allocDataArray(size: Int): Array[T] + + protected def getNextRandom: T + + protected def getTypeMinValue: T + + protected def getTypeMaxValue: T + + protected def readData(total: Int, columnVector : WritableColumnVector, rowId: Int): Unit + + protected def skip(total: Int): Unit + + protected def readDataFromVector(columnVector : WritableColumnVector, rowId: Int): T + + protected def estimatedSize(length: Int) : Double + + protected def setValue(arr: Array[T], index: Int, value: Int): Unit + protected def compareValues(expected: Int, actual: T) : Boolean + + + protected override def beforeEach(): Unit = { + random = new Random(0) + initValuesWriter() + super.beforeAll() + } + + test("read when data is aligned with block") { + val data = allocDataArray(5 * blockSize) + for (i <- 0 until blockSize * 5) { + data(i) = getNextRandom + } + shouldWriteAndRead(data) + } + + test("read when block is not fully written") { + val data = allocDataArray(blockSize - 3) + for (i <- data.indices) { + data(i) = getNextRandom + } + shouldWriteAndRead(data) + } + + test("read when mini block is not fully written") { + val miniBlockSize = blockSize / miniBlockNum + val data = allocDataArray(miniBlockSize - 3) + for (i <- data.indices) { + data(i) = getNextRandom + } + shouldWriteAndRead(data) + } + + test("read with negative deltas") { + val data = allocDataArray(blockSize) + for (i <- data.indices) { + setValue(data, i, 10 - (i * 32 - random.nextInt(6))) + } + shouldWriteAndRead(data) + } + + test("read when deltas are same") { + val data = allocDataArray(2 * blockSize) + for 
(i <- 0 until blockSize) { + setValue(data, i, i * 32) + } + for (i <- blockSize until 2 * blockSize) { + setValue(data, i, 0) + } + shouldWriteAndRead(data) + } + + test("read when values are same") { + val data = allocDataArray(2 * blockSize) + for (i <- 0 until blockSize) { + setValue(data, i, 3) + } + for (i <- blockSize until 2 * blockSize) { + setValue(data, i, 0) + } + shouldWriteAndRead(data) + } + + test("read when delta is 0 for each block") { + val data = allocDataArray(5 * blockSize + 1) + for (i <- data.indices) { + setValue(data, i, (i - 1) / blockSize) + } + shouldWriteAndRead(data) + } + + test("read when data is not aligned with block") { + val data = allocDataArray(5 * blockSize + 3) + for (i <- data.indices) { + setValue(data, i, random.nextInt(20) - 10) + } + shouldWriteAndRead(data) + } + + test("read max min value") { + val data = allocDataArray(10) + for (i <- data.indices) { + if (i % 2 == 0) data(i) = getTypeMinValue + else data(i) = getTypeMaxValue + } + shouldWriteAndRead(data) + } + + test("throw exception when read more than written") { + val data = allocDataArray(5 * blockSize + 1) + for (i <- data.indices) { + setValue(data, i, i * 32) + } + shouldWriteAndRead(data) + try readData(1, writableColumnVector, data.length) + catch { + case e: ParquetDecodingException => + // No more values to read. Total values read: 641, total count: 641, trying to read 1 more. + assert(e.getMessage.startsWith("No more values to read.")) + } + } + + test("skip()") { + val data = allocDataArray(5 * blockSize + 1) + for (i <- data.indices) { + setValue(data, i, i * 32) + } + writeData(data) + reader = new VectorizedDeltaBinaryPackedReader + reader.initFromPage(100, writer.getBytes.toInputStream) + writableColumnVector = new OnHeapColumnVector(data.length, getSparkSqlType) + for (i <- data.indices) { + if (i % 3 == 0) { + skip(1) + } else { + readData(1, writableColumnVector, i) + assert(compareValues(i * 32, readDataFromVector(writableColumnVector, i))) + } + } + } + + test("SkipN()") { + val data = allocDataArray(5 * blockSize + 1) + for (i <- data.indices) { + setValue(data, i, i * 32) + } + writeData(data) + reader = new VectorizedDeltaBinaryPackedReader + reader.initFromPage(100, writer.getBytes.toInputStream) + writableColumnVector = new OnHeapColumnVector(data.length, getSparkSqlType) + var skipCount = 0 + var i = 0 + while (i < data.length) { + skipCount = (data.length - i) / 2 + readData(1, writableColumnVector, i) + assert(compareValues(i * 32, readDataFromVector(writableColumnVector, i))) + skip(skipCount) + + i += skipCount + 1 + } + } + + test("random data test") { + val maxSize = 1000 + val data = allocDataArray(maxSize) + for (round <- 0 until 100000) { + val size = random.nextInt(maxSize) + for (i <- 0 until size) { + data(i) = getNextRandom + } + shouldReadAndWrite(data, size) + writer.reset() + } + } + + @throws[IOException] + private def shouldWriteAndRead(data: Array[T]): Unit = { + shouldReadAndWrite(data, data.length) + } + + private def shouldReadAndWrite(data: Array[T], length: Int): Unit = { + writeData(data, length) + reader = new VectorizedDeltaBinaryPackedReader + val page = writer.getBytes.toByteArray + + assert(estimatedSize(length) >= page.length) + writableColumnVector = new OnHeapColumnVector(data.length, getSparkSqlType) + reader.initFromPage(100, ByteBufferInputStream.wrap(ByteBuffer.wrap(page))) + readData(length, writableColumnVector, 0) + for (i <- 0 until length) { + assert(data(i) == readDataFromVector(writableColumnVector, i)) + } + } + +} + 
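+// The estimatedSize implementations below compute an upper bound on the encoded page size that
+// shouldReadAndWrite asserts against: block header varints, worst-case bit-packed data rounded up
+// to whole miniblocks, one bit-width byte per miniblock, and one zig-zag varint min delta per
+// flushed block.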
+class ParquetDeltaEncodingInteger extends ParquetDeltaEncodingSuite[Int] { + + override protected def getSparkSqlType: IntegralType = IntegerType + override protected def writeData(data: Array[Int]): Unit = writeData(data, data.length) + + override protected def writeData(data: Array[Int], length: Int): Unit = { + for (i <- 0 until length) { + writer.writeInteger(data(i)) + } + } + + override protected def initValuesWriter (): Unit = { + writer = new DeltaBinaryPackingValuesWriterForInteger( + blockSize, + miniBlockNum, + 100, + 200, + new DirectByteBufferAllocator()) + } + + override protected def allocDataArray(size: Int): Array[Int] = new Array[Int](size) + + override protected def getNextRandom: Int = random.nextInt + override protected def getTypeMinValue: Int = Int.MinValue + override protected def getTypeMaxValue: Int = Int.MaxValue + + override protected def readData(total: Int, columnVector : WritableColumnVector, rowId: Int): Unit + = reader.readIntegers(total, columnVector, rowId) + + override protected def skip(total: Int): Unit = reader.skipIntegers(total) + + override protected def readDataFromVector(columnVector: WritableColumnVector, rowId: Int): Int = + columnVector.getInt(rowId) + + override protected def estimatedSize(length: Int) : Double = { + val miniBlockSize = blockSize / miniBlockNum + val miniBlockFlushed = Math.ceil((length.toDouble - 1) / miniBlockSize) + val blockFlushed = Math.ceil((length.toDouble - 1) / blockSize) + 4 * 5 /* blockHeader */ + + 4 * miniBlockFlushed * miniBlockSize /* data(aligned to miniBlock) */ + + blockFlushed * miniBlockNum /* bitWidth of mini blocks */ + + (5.0 * blockFlushed) /* min delta for each block */ + } + + override protected def setValue(arr: Array[Int], index: Int, value: Int): Unit = + arr(index) = value + + override protected def compareValues(expected: Int, actual: Int) : Boolean = + expected == actual + +} + +class ParquetDeltaEncodingLong extends ParquetDeltaEncodingSuite[Long] { + + override protected def getSparkSqlType: IntegralType = LongType + override protected def writeData(data: Array[Long]): Unit = writeData(data, data.length) + + override protected def writeData(data: Array[Long], length: Int): Unit = { + for (i <- 0 until length) { + writer.writeLong(data(i)) + } + } + + override protected def initValuesWriter (): Unit = { + writer = new DeltaBinaryPackingValuesWriterForLong( + blockSize, + miniBlockNum, + 100, + 200, + new DirectByteBufferAllocator()) + } + + override protected def allocDataArray(size: Int): Array[Long] = new Array[Long](size) + + override protected def getNextRandom: Long = random.nextLong + override protected def getTypeMinValue: Long = Long.MinValue + override protected def getTypeMaxValue: Long = Long.MaxValue + + override protected def readData(total: Int, columnVector: WritableColumnVector, rowId: Int): Unit + = reader.readLongs(total, columnVector, rowId) + + override protected def skip(total: Int): Unit = reader.skipLongs(total) + + override protected def readDataFromVector(columnVector: WritableColumnVector, rowId: Int): Long = + columnVector.getLong(rowId) + + override protected def estimatedSize(length: Int) : Double = { + val miniBlockSize = blockSize / miniBlockNum + val miniBlockFlushed = Math.ceil((length.toDouble - 1) / miniBlockSize) + val blockFlushed = Math.ceil((length.toDouble - 1) / blockSize) + 3 * 5 + 1 * 10 /* blockHeader + 3 * int + 1 * long */ + + 8 * miniBlockFlushed * miniBlockSize /* data(aligned to miniBlock) */ + + blockFlushed * miniBlockNum /* bitWidth of mini 
blocks */ + + (10.0 * blockFlushed) /* min delta for each block */ + } + + override protected def setValue(arr: Array[Long], index: Int, value: Int): Unit = { + arr(index) = value.toLong + } + + override protected def compareValues(expected: Int, actual: Long) : Boolean = { + expected.toLong == actual + } + +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala index 79b8c9e2c571..f545e8851770 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala @@ -16,12 +16,19 @@ */ package org.apache.spark.sql.execution.datasources.parquet +import java.math.BigDecimal +import java.sql.{Date, Timestamp} import java.time.{Duration, Period} import scala.collection.JavaConverters._ +import org.apache.hadoop.fs.Path +import org.apache.parquet.column.{Encoding, ParquetProperties} import org.apache.parquet.hadoop.ParquetOutputFormat +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession // TODO: this needs a lot more testing but it's currently not easy to test with the parquet @@ -125,4 +132,54 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess } } } + + test("parquet v2 pages - delta encoding") { + val extraOptions = Map[String, String]( + ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString, + ParquetOutputFormat.ENABLE_DICTIONARY -> "false" + ) + + val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions) + withSQLConf( + SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true", + ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL") { + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/test.parquet" + + val data = (1 to 3).map { i => + ( i, i.toLong, i.toShort, Array[Byte](i.toByte), s"test_${i}", + DateTimeUtils.fromJavaDate(Date.valueOf(s"2021-11-0" + i)), + DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(s"2020-11-01 12:00:0" + i)), + Period.of(1, i, 0), Duration.ofMillis(i * 100), + new BigDecimal(java.lang.Long.toUnsignedString(i*100000)) + ) + } + + spark.createDataFrame(data) + .write.options(extraOptions).mode("overwrite").parquet(path) + + val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head + val columnChunkMetadataList = blockMetadata.getColumns.asScala + + // Verify that indeed delta encoding is used for each column + assert(columnChunkMetadataList.length === 10) + assert(columnChunkMetadataList(0).getEncodings.contains(Encoding.DELTA_BINARY_PACKED)) + assert(columnChunkMetadataList(1).getEncodings.contains(Encoding.DELTA_BINARY_PACKED)) + assert(columnChunkMetadataList(2).getEncodings.contains(Encoding.DELTA_BINARY_PACKED)) + // Both fixed-length byte array and variable-length byte array (also called BINARY) + // are use DELTA_BYTE_ARRAY for encoding + assert(columnChunkMetadataList(3).getEncodings.contains(Encoding.DELTA_BYTE_ARRAY)) + assert(columnChunkMetadataList(4).getEncodings.contains(Encoding.DELTA_BYTE_ARRAY)) + + assert(columnChunkMetadataList(5).getEncodings.contains(Encoding.DELTA_BINARY_PACKED)) + assert(columnChunkMetadataList(6).getEncodings.contains(Encoding.DELTA_BINARY_PACKED)) + 
assert(columnChunkMetadataList(7).getEncodings.contains(Encoding.DELTA_BINARY_PACKED)) + assert(columnChunkMetadataList(8).getEncodings.contains(Encoding.DELTA_BINARY_PACKED)) + assert(columnChunkMetadataList(9).getEncodings.contains(Encoding.DELTA_BYTE_ARRAY)) + + val actual = spark.read.parquet(path).collect() + assert(actual.sortBy(_.getInt(0)) === data.map(Row.fromTuple)); + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala index d9683fa47ebe..49251af54193 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala @@ -134,33 +134,35 @@ abstract class ParquetRebaseDatetimeSuite tsOutputType: String = "TIMESTAMP_MICROS", inWriteConf: String = SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key, inReadConf: String = SQLConf.PARQUET_REBASE_MODE_IN_READ.key): Unit = { - withTempPaths(2) { paths => - paths.foreach(_.delete()) + withAllParquetWriters { + withTempPaths(2) { paths => + paths.foreach(_.delete()) val oldPath = getResourceParquetFilePath("test-data/" + fileName) val path3_x = paths(0).getCanonicalPath val path3_x_rebase = paths(1).getCanonicalPath - val df = Seq.tabulate(N)(rowFunc).toDF("dict", "plain") - .select($"dict".cast(catalystType), $"plain".cast(catalystType)) - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsOutputType) { + val df = Seq.tabulate(N)(rowFunc).toDF("dict", "plain") + .select($"dict".cast(catalystType), $"plain".cast(catalystType)) + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsOutputType) { checkDefaultLegacyRead(oldPath) - withSQLConf(inWriteConf -> CORRECTED.toString) { + withSQLConf(inWriteConf -> CORRECTED.toString) { df.write.mode("overwrite").parquet(path3_x) - } - withSQLConf(inWriteConf -> LEGACY.toString) { + } + withSQLConf(inWriteConf -> LEGACY.toString) { df.write.parquet(path3_x_rebase) + } } - } - // For Parquet files written by Spark 3.0, we know the writer info and don't need the - // config to guide the rebase behavior. - runInMode(inReadConf, Seq(LEGACY)) { options => - checkAnswer( + // For Parquet files written by Spark 3.0, we know the writer info and don't need the + // config to guide the rebase behavior. 
+ runInMode(inReadConf, Seq(LEGACY)) { options => + checkAnswer( spark.read.format("parquet").options(options).load(oldPath, path3_x, path3_x_rebase), - (0 until N).flatMap { i => - val (dictS, plainS) = rowFunc(i) - Seq.tabulate(3) { _ => - Row(toJavaType(dictS), toJavaType(plainS)) - } - }) + (0 until N).flatMap { i => + val (dictS, plainS) = rowFunc(i) + Seq.tabulate(3) { _ => + Row(toJavaType(dictS), toJavaType(plainS)) + } + }) + } } } } @@ -219,60 +221,62 @@ abstract class ParquetRebaseDatetimeSuite test("SPARK-31159, SPARK-37705: rebasing timestamps in write") { val N = 8 Seq(false, true).foreach { dictionaryEncoding => - Seq( - ( - "TIMESTAMP_MILLIS", - "1001-01-01 01:02:03.123", - "1001-01-07 01:09:05.123", - SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key, - SQLConf.PARQUET_REBASE_MODE_IN_READ.key), - ( - "TIMESTAMP_MICROS", - "1001-01-01 01:02:03.123456", - "1001-01-07 01:09:05.123456", - SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key, - SQLConf.PARQUET_REBASE_MODE_IN_READ.key), - ( - "INT96", - "1001-01-01 01:02:03.123456", - "1001-01-07 01:09:05.123456", - SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key, - SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ.key - ) - ).foreach { case (outType, tsStr, nonRebased, inWriteConf, inReadConf) => + withAllParquetWriters { + Seq( + ( + "TIMESTAMP_MILLIS", + "1001-01-01 01:02:03.123", + "1001-01-07 01:09:05.123", + SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key, + SQLConf.PARQUET_REBASE_MODE_IN_READ.key), + ( + "TIMESTAMP_MICROS", + "1001-01-01 01:02:03.123456", + "1001-01-07 01:09:05.123456", + SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key, + SQLConf.PARQUET_REBASE_MODE_IN_READ.key), + ( + "INT96", + "1001-01-01 01:02:03.123456", + "1001-01-07 01:09:05.123456", + SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key, + SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ.key + ) + ).foreach { case (outType, tsStr, nonRebased, inWriteConf, inReadConf) => // Ignore the default JVM time zone and use the session time zone instead of it in rebasing. DateTimeTestUtils.withDefaultTimeZone(DateTimeTestUtils.JST) { withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> DateTimeTestUtils.LA.getId) { - withClue(s"output type $outType") { - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outType) { - withTempPath { dir => - val path = dir.getAbsolutePath - withSQLConf(inWriteConf -> LEGACY.toString) { - Seq.tabulate(N)(_ => tsStr).toDF("tsS") - .select($"tsS".cast("timestamp").as("ts")) - .repartition(1) - .write - .option("parquet.enable.dictionary", dictionaryEncoding) - .parquet(path) - } + withClue(s"output type $outType") { + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outType) { + withTempPath { dir => + val path = dir.getAbsolutePath + withSQLConf(inWriteConf -> LEGACY.toString) { + Seq.tabulate(N)(_ => tsStr).toDF("tsS") + .select($"tsS".cast("timestamp").as("ts")) + .repartition(1) + .write + .option("parquet.enable.dictionary", dictionaryEncoding) + .parquet(path) + } - withAllParquetReaders { + withAllParquetReaders { // The file metadata indicates if it needs rebase or not, so we can always get // the correct result regardless of the "rebase mode" config. 
- runInMode(inReadConf, Seq(LEGACY, CORRECTED, EXCEPTION)) { options => - checkAnswer( + runInMode(inReadConf, Seq(LEGACY, CORRECTED, EXCEPTION)) { options => + checkAnswer( spark.read.options(options).parquet(path).select($"ts".cast("string")), Seq.tabulate(N)(_ => Row(tsStr))) - } + } - // Force to not rebase to prove the written datetime values are rebased - // and we will get wrong result if we don't rebase while reading. - withSQLConf("spark.test.forceNoRebase" -> "true") { - checkAnswer( + // Force to not rebase to prove the written datetime values are rebased + // and we will get wrong result if we don't rebase while reading. + withSQLConf("spark.test.forceNoRebase" -> "true") { + checkAnswer( spark.read.parquet(path).select($"ts".cast("string")), Seq.tabulate(N)(_ => Row(nonRebased))) } } + } } } } @@ -285,34 +289,36 @@ abstract class ParquetRebaseDatetimeSuite test("SPARK-31159: rebasing dates in write") { val N = 8 Seq(false, true).foreach { dictionaryEncoding => - withTempPath { dir => - val path = dir.getAbsolutePath - withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { - Seq.tabulate(N)(_ => "1001-01-01").toDF("dateS") - .select($"dateS".cast("date").as("date")) - .repartition(1) - .write - .option("parquet.enable.dictionary", dictionaryEncoding) - .parquet(path) - } - - withAllParquetReaders { - // The file metadata indicates if it needs rebase or not, so we can always get the - // correct result regardless of the "rebase mode" config. - runInMode( - SQLConf.PARQUET_REBASE_MODE_IN_READ.key, - Seq(LEGACY, CORRECTED, EXCEPTION)) { options => - checkAnswer( - spark.read.options(options).parquet(path), - Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01")))) + withAllParquetWriters { + withTempPath { dir => + val path = dir.getAbsolutePath + withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { + Seq.tabulate(N)(_ => "1001-01-01").toDF("dateS") + .select($"dateS".cast("date").as("date")) + .repartition(1) + .write + .option("parquet.enable.dictionary", dictionaryEncoding) + .parquet(path) } - // Force to not rebase to prove the written datetime values are rebased and we will get - // wrong result if we don't rebase while reading. - withSQLConf("spark.test.forceNoRebase" -> "true") { - checkAnswer( - spark.read.parquet(path), - Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-07")))) + withAllParquetReaders { + // The file metadata indicates if it needs rebase or not, so we can always get the + // correct result regardless of the "rebase mode" config. + runInMode( + SQLConf.PARQUET_REBASE_MODE_IN_READ.key, + Seq(LEGACY, CORRECTED, EXCEPTION)) { options => + checkAnswer( + spark.read.options(options).parquet(path), + Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01")))) + } + + // Force to not rebase to prove the written datetime values are rebased and we will get + // wrong result if we don't rebase while reading. 
+ withSQLConf("spark.test.forceNoRebase" -> "true") { + checkAnswer( + spark.read.parquet(path), + Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-07")))) + } } } } @@ -338,18 +344,23 @@ abstract class ParquetRebaseDatetimeSuite } } } - withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { - withTempPath { dir => - checkMetadataKey(dir, exists = true) + withAllParquetWriters { + withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { + withTempPath { dir => + checkMetadataKey(dir, exists = true) + } } - } - withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) { - withTempPath { dir => - checkMetadataKey(dir, exists = false) + withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) { + withTempPath { dir => + checkMetadataKey(dir, exists = false) + } + } + withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> EXCEPTION.toString) { + withTempPath { dir => intercept[SparkException] { + checkMetadataKey(dir, exists = false) + } + } } - } - withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> EXCEPTION.toString) { - withTempPath { dir => intercept[SparkException] { checkMetadataKey(dir, exists = false) } } } } @@ -361,28 +372,33 @@ abstract class ParquetRebaseDatetimeSuite .write .parquet(dir.getAbsolutePath) } - withSQLConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { - withTempPath { dir => - saveTs(dir) + withAllParquetWriters { + withSQLConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { + withTempPath { dir => + saveTs(dir) assert(getMetaData(dir)(SPARK_LEGACY_INT96_METADATA_KEY) === "") assert(getMetaData(dir)(SPARK_TIMEZONE_METADATA_KEY) === SQLConf.get.sessionLocalTimeZone) + } } - } - withSQLConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) { - withTempPath { dir => - saveTs(dir) + withSQLConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) { + withTempPath { dir => + saveTs(dir) assert(getMetaData(dir).get(SPARK_LEGACY_INT96_METADATA_KEY).isEmpty) assert(getMetaData(dir).get(SPARK_TIMEZONE_METADATA_KEY).isEmpty) + } } - } - withSQLConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> EXCEPTION.toString) { - withTempPath { dir => intercept[SparkException] { saveTs(dir) } } - } - withSQLConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> EXCEPTION.toString) { - withTempPath { dir => - saveTs(dir, "2020-10-22 01:02:03") + withSQLConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> EXCEPTION.toString) { + withTempPath { dir => intercept[SparkException] { + saveTs(dir) + } + } + } + withSQLConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> EXCEPTION.toString) { + withTempPath { dir => + saveTs(dir, "2020-10-22 01:02:03") assert(getMetaData(dir).get(SPARK_LEGACY_INT96_METADATA_KEY).isEmpty) assert(getMetaData(dir).get(SPARK_TIMEZONE_METADATA_KEY).isEmpty) + } } } } @@ -400,25 +416,27 @@ abstract class ParquetRebaseDatetimeSuite assert(errMsg.contains("You may get a different result due to the upgrading")) } } - withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> EXCEPTION.toString) { - Seq(TIMESTAMP_MICROS, TIMESTAMP_MILLIS).foreach { tsType => - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsType.toString) { - checkTsWrite() + withAllParquetWriters { + withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> EXCEPTION.toString) { + Seq(TIMESTAMP_MICROS, TIMESTAMP_MILLIS).foreach { tsType => + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsType.toString) { + checkTsWrite() + } 
} - } - withTempPath { dir => - val df = Seq(java.sql.Date.valueOf("1001-01-01")).toDF("dt") - val e = intercept[SparkException] { - df.write.parquet(dir.getCanonicalPath) + withTempPath { dir => + val df = Seq(java.sql.Date.valueOf("1001-01-01")).toDF("dt") + val e = intercept[SparkException] { + df.write.parquet(dir.getCanonicalPath) + } + val errMsg = e.getCause.getCause.getCause.asInstanceOf[SparkUpgradeException].getMessage + assert(errMsg.contains("You may get a different result due to the upgrading")) } - val errMsg = e.getCause.getCause.getCause.asInstanceOf[SparkUpgradeException].getMessage - assert(errMsg.contains("You may get a different result due to the upgrading")) } - } - withSQLConf( - SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> EXCEPTION.toString, - SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> INT96.toString) { - checkTsWrite() + withSQLConf( + SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> EXCEPTION.toString, + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> INT96.toString) { + checkTsWrite() + } } def checkRead(fileName: String): Unit = { @@ -428,14 +446,16 @@ abstract class ParquetRebaseDatetimeSuite val errMsg = e.getCause.asInstanceOf[SparkUpgradeException].getMessage assert(errMsg.contains("You may get a different result due to the upgrading")) } - withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_READ.key -> EXCEPTION.toString) { - Seq( - "before_1582_date_v2_4_5.snappy.parquet", - "before_1582_timestamp_micros_v2_4_5.snappy.parquet", - "before_1582_timestamp_millis_v2_4_5.snappy.parquet").foreach(checkRead) - } - withSQLConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ.key -> EXCEPTION.toString) { - checkRead("before_1582_timestamp_int96_dict_v2_4_5.snappy.parquet") + withAllParquetWriters { + withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_READ.key -> EXCEPTION.toString) { + Seq( + "before_1582_date_v2_4_5.snappy.parquet", + "before_1582_timestamp_micros_v2_4_5.snappy.parquet", + "before_1582_timestamp_millis_v2_4_5.snappy.parquet").foreach(checkRead) + } + withSQLConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ.key -> EXCEPTION.toString) { + checkRead("before_1582_timestamp_int96_dict_v2_4_5.snappy.parquet") + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala index 7a4a382f7f5c..47723166213d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala @@ -26,8 +26,9 @@ import scala.reflect.runtime.universe.TypeTag import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.parquet.HadoopReadOptions +import org.apache.parquet.column.ParquetProperties import org.apache.parquet.format.converter.ParquetMetadataConverter -import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter} +import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter, ParquetOutputFormat} import org.apache.parquet.hadoop.metadata.{BlockMetaData, FileMetaData, ParquetMetadata} import org.apache.parquet.hadoop.util.HadoopInputFile import org.apache.parquet.schema.MessageType @@ -168,6 +169,15 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest { withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true")(code) } + def withAllParquetWriters(code: => Unit): Unit = { + // Parquet version 1 + 
withSQLConf(ParquetOutputFormat.WRITER_VERSION -> + ParquetProperties.WriterVersion.PARQUET_1_0.toString)(code) + // Parquet version 2 + withSQLConf(ParquetOutputFormat.WRITER_VERSION -> + ParquetProperties.WriterVersion.PARQUET_2_0.toString)(code) + } + def getMetaData(dir: java.io.File): Map[String, String] = { val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0) val conf = new Configuration()
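A minimal usage sketch for the new withAllParquetWriters helper (hypothetical test body; withTempPath, withAllParquetReaders, and checkAnswer are the existing Spark test utilities, not part of this patch). It round-trips a long column and a string column under both the PARQUET_1_0 and PARQUET_2_0 writer versions, so the V2 run exercises the new DELTA_BINARY_PACKED and DELTA_BYTE_ARRAY vectorized readers:

  test("round-trip under both Parquet writer versions") {
    withAllParquetWriters {
      withAllParquetReaders {
        withTempPath { dir =>
          // With dictionary encoding off and the V2 writer, the long column is written with
          // DELTA_BINARY_PACKED and the string column with DELTA_BYTE_ARRAY.
          val df = spark.range(10).selectExpr("id", "cast(id as string) as s")
          df.write.mode("overwrite").option("parquet.enable.dictionary", "false")
            .parquet(dir.getCanonicalPath)
          checkAnswer(spark.read.parquet(dir.getCanonicalPath), df)
        }
      }
    }
  }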