diff --git a/flink-formats/flink-orc-nohive/pom.xml b/flink-formats/flink-orc-nohive/pom.xml index e574c46f94fec..f441709fd29f9 100644 --- a/flink-formats/flink-orc-nohive/pom.xml +++ b/flink-formats/flink-orc-nohive/pom.xml @@ -36,7 +36,7 @@ under the License. - + org.apache.flink @@ -45,6 +45,8 @@ under the License. provided + + org.apache.flink flink-orc_${scala.binary.version} @@ -57,6 +59,8 @@ under the License. + + org.apache.orc orc-core @@ -69,8 +73,9 @@ under the License. - - + + + org.apache.flink flink-orc_${scala.binary.version} diff --git a/flink-formats/flink-orc/pom.xml b/flink-formats/flink-orc/pom.xml index d8819b5a6750c..f902f3bbbad4b 100644 --- a/flink-formats/flink-orc/pom.xml +++ b/flink-formats/flink-orc/pom.xml @@ -36,32 +36,16 @@ under the License. - + org.apache.flink - flink-streaming-scala_${scala.binary.version} + flink-streaming-java_${scala.binary.version} ${project.version} provided - - - org.apache.flink - flink-table-api-java-bridge_${scala.binary.version} - ${project.version} - provided - true - - - - org.apache.flink - flink-table-planner_${scala.binary.version} - ${project.version} - provided - true - org.apache.flink @@ -70,6 +54,8 @@ under the License. provided + + org.apache.orc orc-core @@ -113,12 +99,7 @@ under the License. provided - - - - org.apache.flink - flink-test-utils-junit - + org.apache.flink diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcBatchReader.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcBatchReader.java deleted file mode 100644 index 9f4d6e46233f2..0000000000000 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcBatchReader.java +++ /dev/null @@ -1,1559 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.orc; - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.MapTypeInfo; -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.types.Row; - -import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; -import org.apache.orc.TypeDescription; - -import java.lang.reflect.Array; -import java.math.BigDecimal; -import java.nio.charset.StandardCharsets; -import java.sql.Date; -import java.sql.Timestamp; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TimeZone; -import java.util.function.DoubleFunction; -import java.util.function.Function; -import java.util.function.LongFunction; - -/** A class that provides utility methods for orc file reading. */ -class OrcBatchReader { - - private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000 - private static final TimeZone LOCAL_TZ = TimeZone.getDefault(); - - /** - * Converts an ORC schema to a Flink TypeInformation. - * - * @param schema The ORC schema. - * @return The TypeInformation that corresponds to the ORC schema. 
- */ - static TypeInformation schemaToTypeInfo(TypeDescription schema) { - switch (schema.getCategory()) { - case BOOLEAN: - return BasicTypeInfo.BOOLEAN_TYPE_INFO; - case BYTE: - return BasicTypeInfo.BYTE_TYPE_INFO; - case SHORT: - return BasicTypeInfo.SHORT_TYPE_INFO; - case INT: - return BasicTypeInfo.INT_TYPE_INFO; - case LONG: - return BasicTypeInfo.LONG_TYPE_INFO; - case FLOAT: - return BasicTypeInfo.FLOAT_TYPE_INFO; - case DOUBLE: - return BasicTypeInfo.DOUBLE_TYPE_INFO; - case DECIMAL: - return BasicTypeInfo.BIG_DEC_TYPE_INFO; - case STRING: - case CHAR: - case VARCHAR: - return BasicTypeInfo.STRING_TYPE_INFO; - case DATE: - return SqlTimeTypeInfo.DATE; - case TIMESTAMP: - return SqlTimeTypeInfo.TIMESTAMP; - case BINARY: - return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO; - case STRUCT: - List fieldSchemas = schema.getChildren(); - TypeInformation[] fieldTypes = new TypeInformation[fieldSchemas.size()]; - for (int i = 0; i < fieldSchemas.size(); i++) { - fieldTypes[i] = schemaToTypeInfo(fieldSchemas.get(i)); - } - String[] fieldNames = schema.getFieldNames().toArray(new String[] {}); - return new RowTypeInfo(fieldTypes, fieldNames); - case LIST: - TypeDescription elementSchema = schema.getChildren().get(0); - TypeInformation elementType = schemaToTypeInfo(elementSchema); - // arrays of primitive types are handled as object arrays to support null values - return ObjectArrayTypeInfo.getInfoFor(elementType); - case MAP: - TypeDescription keySchema = schema.getChildren().get(0); - TypeDescription valSchema = schema.getChildren().get(1); - TypeInformation keyType = schemaToTypeInfo(keySchema); - TypeInformation valType = schemaToTypeInfo(valSchema); - return new MapTypeInfo<>(keyType, valType); - case UNION: - throw new UnsupportedOperationException("UNION type is not supported yet."); - default: - throw new IllegalArgumentException("Unknown type " + schema); - } - } - - /** - * Fills an ORC batch into an array of Row. - * - * @param rows The batch of rows need to be filled. - * @param schema The schema of the ORC data. - * @param batch The ORC data. - * @param selectedFields The list of selected ORC fields. - * @return The number of rows that were filled. - */ - static int fillRows( - Row[] rows, TypeDescription schema, VectorizedRowBatch batch, int[] selectedFields) { - - int rowsToRead = Math.min((int) batch.count(), rows.length); - - List fieldTypes = schema.getChildren(); - // read each selected field - for (int fieldIdx = 0; fieldIdx < selectedFields.length; fieldIdx++) { - int orcIdx = selectedFields[fieldIdx]; - readField(rows, fieldIdx, fieldTypes.get(orcIdx), batch.cols[orcIdx], rowsToRead); - } - return rowsToRead; - } - - /** - * Reads a vector of data into an array of objects. - * - * @param vals The array that needs to be filled. - * @param fieldIdx If the vals array is an array of Row, the index of the field that needs to be - * filled. Otherwise a -1 must be passed and the data is directly filled into the array. - * @param schema The schema of the vector to read. - * @param vector The vector to read. - * @param childCount The number of vector entries to read. - */ - private static void readField( - Object[] vals, - int fieldIdx, - TypeDescription schema, - ColumnVector vector, - int childCount) { - - // check the type of the vector to decide how to read it. 
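For context on the schemaToTypeInfo converter deleted above: it maps each ORC category to the corresponding Flink type, turning structs into RowTypeInfo, lists into object arrays (so null elements stay representable), and maps into MapTypeInfo. The following is a minimal sketch of what that mapping yields for a made-up schema; the schema string, class name, and field names are illustrative and not taken from the original sources.

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

import org.apache.orc.TypeDescription;

public class SchemaMappingSketch {
    public static void main(String[] args) {
        // hypothetical ORC schema, used only for illustration
        TypeDescription orcSchema =
                TypeDescription.fromString("struct<name:string,age:int,scores:array<double>>");

        // the removed converter would produce the equivalent of this type
        TypeInformation<?> flinkType =
                new RowTypeInfo(
                        new TypeInformation<?>[] {
                            BasicTypeInfo.STRING_TYPE_INFO,
                            BasicTypeInfo.INT_TYPE_INFO,
                            // lists become object arrays so that null elements are supported
                            ObjectArrayTypeInfo.getInfoFor(BasicTypeInfo.DOUBLE_TYPE_INFO)
                        },
                        new String[] {"name", "age", "scores"});

        System.out.println(orcSchema + " maps to " + flinkType);
    }
}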
- switch (schema.getCategory()) { - case BOOLEAN: - if (vector.noNulls) { - readNonNullLongColumn( - vals, - fieldIdx, - (LongColumnVector) vector, - childCount, - OrcBatchReader::readBoolean); - } else { - readLongColumn( - vals, - fieldIdx, - (LongColumnVector) vector, - childCount, - OrcBatchReader::readBoolean); - } - break; - case BYTE: - if (vector.noNulls) { - readNonNullLongColumn( - vals, - fieldIdx, - (LongColumnVector) vector, - childCount, - OrcBatchReader::readByte); - } else { - readLongColumn( - vals, - fieldIdx, - (LongColumnVector) vector, - childCount, - OrcBatchReader::readByte); - } - break; - case SHORT: - if (vector.noNulls) { - readNonNullLongColumn( - vals, - fieldIdx, - (LongColumnVector) vector, - childCount, - OrcBatchReader::readShort); - } else { - readLongColumn( - vals, - fieldIdx, - (LongColumnVector) vector, - childCount, - OrcBatchReader::readShort); - } - break; - case INT: - if (vector.noNulls) { - readNonNullLongColumn( - vals, - fieldIdx, - (LongColumnVector) vector, - childCount, - OrcBatchReader::readInt); - } else { - readLongColumn( - vals, - fieldIdx, - (LongColumnVector) vector, - childCount, - OrcBatchReader::readInt); - } - break; - case LONG: - if (vector.noNulls) { - readNonNullLongColumn( - vals, - fieldIdx, - (LongColumnVector) vector, - childCount, - OrcBatchReader::readLong); - } else { - readLongColumn( - vals, - fieldIdx, - (LongColumnVector) vector, - childCount, - OrcBatchReader::readLong); - } - break; - case FLOAT: - if (vector.noNulls) { - readNonNullDoubleColumn( - vals, - fieldIdx, - (DoubleColumnVector) vector, - childCount, - OrcBatchReader::readFloat); - } else { - readDoubleColumn( - vals, - fieldIdx, - (DoubleColumnVector) vector, - childCount, - OrcBatchReader::readFloat); - } - break; - case DOUBLE: - if (vector.noNulls) { - readNonNullDoubleColumn( - vals, - fieldIdx, - (DoubleColumnVector) vector, - childCount, - OrcBatchReader::readDouble); - } else { - readDoubleColumn( - vals, - fieldIdx, - (DoubleColumnVector) vector, - childCount, - OrcBatchReader::readDouble); - } - break; - case CHAR: - case VARCHAR: - case STRING: - if (vector.noNulls) { - readNonNullBytesColumnAsString( - vals, fieldIdx, (BytesColumnVector) vector, childCount); - } else { - readBytesColumnAsString(vals, fieldIdx, (BytesColumnVector) vector, childCount); - } - break; - case DATE: - if (vector.noNulls) { - readNonNullLongColumnAsDate( - vals, fieldIdx, (LongColumnVector) vector, childCount); - } else { - readLongColumnAsDate(vals, fieldIdx, (LongColumnVector) vector, childCount); - } - break; - case TIMESTAMP: - if (vector.noNulls) { - readNonNullTimestampColumn( - vals, fieldIdx, (TimestampColumnVector) vector, childCount); - } else { - readTimestampColumn(vals, fieldIdx, (TimestampColumnVector) vector, childCount); - } - break; - case BINARY: - if (vector.noNulls) { - readNonNullBytesColumnAsBinary( - vals, fieldIdx, (BytesColumnVector) vector, childCount); - } else { - readBytesColumnAsBinary(vals, fieldIdx, (BytesColumnVector) vector, childCount); - } - break; - case DECIMAL: - if (vector.noNulls) { - readNonNullDecimalColumn( - vals, fieldIdx, (DecimalColumnVector) vector, childCount); - } else { - readDecimalColumn(vals, fieldIdx, (DecimalColumnVector) vector, childCount); - } - break; - case STRUCT: - if (vector.noNulls) { - readNonNullStructColumn( - vals, fieldIdx, (StructColumnVector) vector, schema, childCount); - } else { - readStructColumn( - vals, fieldIdx, (StructColumnVector) vector, schema, childCount); - } - break; - case LIST: - 
if (vector.noNulls) { - readNonNullListColumn( - vals, fieldIdx, (ListColumnVector) vector, schema, childCount); - } else { - readListColumn(vals, fieldIdx, (ListColumnVector) vector, schema, childCount); - } - break; - case MAP: - if (vector.noNulls) { - readNonNullMapColumn( - vals, fieldIdx, (MapColumnVector) vector, schema, childCount); - } else { - readMapColumn(vals, fieldIdx, (MapColumnVector) vector, schema, childCount); - } - break; - case UNION: - throw new UnsupportedOperationException("UNION type not supported yet"); - default: - throw new IllegalArgumentException("Unknown type " + schema); - } - } - - private static void readNonNullLongColumn( - Object[] vals, - int fieldIdx, - LongColumnVector vector, - int childCount, - LongFunction reader) { - - if (vector.isRepeating) { // fill complete column with first value - T repeatingValue = reader.apply(vector.vector[0]); - fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount); - } else { - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - vals[i] = reader.apply(vector.vector[i]); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - rows[i].setField(fieldIdx, reader.apply(vector.vector[i])); - } - } - } - } - - private static void readNonNullDoubleColumn( - Object[] vals, - int fieldIdx, - DoubleColumnVector vector, - int childCount, - DoubleFunction reader) { - - if (vector.isRepeating) { // fill complete column with first value - T repeatingValue = reader.apply(vector.vector[0]); - fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount); - } else { - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - vals[i] = reader.apply(vector.vector[i]); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - rows[i].setField(fieldIdx, reader.apply(vector.vector[i])); - } - } - } - } - - private static void readNonNullBytesColumnAsString( - Object[] vals, int fieldIdx, BytesColumnVector bytes, int childCount) { - if (bytes.isRepeating) { // fill complete column with first value - String repeatingValue = readString(bytes.vector[0], bytes.start[0], bytes.length[0]); - fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount); - } else { - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - vals[i] = readString(bytes.vector[i], bytes.start[i], bytes.length[i]); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - rows[i].setField( - fieldIdx, readString(bytes.vector[i], bytes.start[i], bytes.length[i])); - } - } - } - } - - private static void readNonNullBytesColumnAsBinary( - Object[] vals, int fieldIdx, BytesColumnVector bytes, int childCount) { - if (bytes.isRepeating) { // fill complete column with first value - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - // don't reuse repeating val to avoid object mutation - vals[i] = readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - // don't reuse repeating val to avoid object mutation - rows[i].setField( - fieldIdx, readBinary(bytes.vector[0], bytes.start[0], bytes.length[0])); - } - } - } else { - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - vals[i] = readBinary(bytes.vector[i], 
bytes.start[i], bytes.length[i]); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - rows[i].setField( - fieldIdx, readBinary(bytes.vector[i], bytes.start[i], bytes.length[i])); - } - } - } - } - - private static void readNonNullLongColumnAsDate( - Object[] vals, int fieldIdx, LongColumnVector vector, int childCount) { - - if (vector.isRepeating) { // fill complete column with first value - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - // do not reuse repeated value due to mutability of Date - vals[i] = readDate(vector.vector[0]); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - // do not reuse repeated value due to mutability of Date - rows[i].setField(fieldIdx, readDate(vector.vector[0])); - } - } - } else { - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - vals[i] = readDate(vector.vector[i]); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - rows[i].setField(fieldIdx, readDate(vector.vector[i])); - } - } - } - } - - private static void readNonNullTimestampColumn( - Object[] vals, int fieldIdx, TimestampColumnVector vector, int childCount) { - - if (vector.isRepeating) { // fill complete column with first value - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - // do not reuse value to prevent object mutation - vals[i] = readTimestamp(vector.time[0], vector.nanos[0]); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - // do not reuse value to prevent object mutation - rows[i].setField(fieldIdx, readTimestamp(vector.time[0], vector.nanos[0])); - } - } - } else { - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - vals[i] = readTimestamp(vector.time[i], vector.nanos[i]); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - rows[i].setField(fieldIdx, readTimestamp(vector.time[i], vector.nanos[i])); - } - } - } - } - - private static void readNonNullDecimalColumn( - Object[] vals, int fieldIdx, DecimalColumnVector vector, int childCount) { - - if (vector.isRepeating) { // fill complete column with first value - fillColumnWithRepeatingValue( - vals, fieldIdx, readBigDecimal(vector.vector[0]), childCount); - } else { - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - vals[i] = readBigDecimal(vector.vector[i]); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - rows[i].setField(fieldIdx, readBigDecimal(vector.vector[i])); - } - } - } - } - - private static void readNonNullStructColumn( - Object[] vals, - int fieldIdx, - StructColumnVector structVector, - TypeDescription schema, - int childCount) { - - List childrenTypes = schema.getChildren(); - - int numFields = childrenTypes.size(); - // create a batch of Rows to read the structs - Row[] structs = new Row[childCount]; - // TODO: possible improvement: reuse existing Row objects - for (int i = 0; i < childCount; i++) { - structs[i] = new Row(numFields); - } - - // read struct fields - // we don't have to handle isRepeating because ORC assumes that it is propagated into the - // children. 
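All of the column readers above special-case ORC's isRepeating flag: when a vector repeats, only slot 0 is valid and that single value (or null) is materialized for every row. The sketch below condenses that pattern into one helper, folding in the null handling that the nullable reader variants further below perform separately; the class and method names are illustrative, not the removed code itself.

import java.util.Arrays;

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

final class RepeatingColumnSketch {
    // Expand a possibly repeating long column into per-row boxed values.
    static void readLongs(Long[] target, LongColumnVector vector, int rowCount) {
        if (vector.isRepeating) {
            // slot 0 holds the value (or the null marker) for every row
            Long value = vector.isNull[0] ? null : Long.valueOf(vector.vector[0]);
            Arrays.fill(target, 0, rowCount, value);
        } else {
            for (int i = 0; i < rowCount; i++) {
                boolean isNull = !vector.noNulls && vector.isNull[i];
                target[i] = isNull ? null : Long.valueOf(vector.vector[i]);
            }
        }
    }
}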
- for (int i = 0; i < numFields; i++) { - readField(structs, i, childrenTypes.get(i), structVector.fields[i], childCount); - } - - if (fieldIdx == -1) { // set struct as an object - System.arraycopy(structs, 0, vals, 0, childCount); - } else { // set struct as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - rows[i].setField(fieldIdx, structs[i]); - } - } - } - - private static void readNonNullListColumn( - Object[] vals, - int fieldIdx, - ListColumnVector list, - TypeDescription schema, - int childCount) { - - TypeDescription fieldType = schema.getChildren().get(0); - // get class of list elements - Class classType = getClassForType(fieldType); - - if (list.isRepeating) { - - int offset = (int) list.offsets[0]; - int length = (int) list.lengths[0]; - // we only need to read until offset + length. - int entriesToRead = offset + length; - - // read children - Object[] children = (Object[]) Array.newInstance(classType, entriesToRead); - readField(children, -1, fieldType, list.child, entriesToRead); - - // get function to copy list - Function copyList = getCopyFunction(schema); - - // create first list that will be copied - Object[] first; - if (offset == 0) { - first = children; - } else { - first = (Object[]) Array.newInstance(classType, length); - System.arraycopy(children, offset, first, 0, length); - } - - // create copies of first list and set copies as result - for (int i = 0; i < childCount; i++) { - Object[] copy = (Object[]) copyList.apply(first); - if (fieldIdx == -1) { - vals[i] = copy; - } else { - ((Row) vals[i]).setField(fieldIdx, copy); - } - } - } else { - - // read children - Object[] children = (Object[]) Array.newInstance(classType, list.childCount); - readField(children, -1, fieldType, list.child, list.childCount); - - // fill lists with children - for (int i = 0; i < childCount; i++) { - int offset = (int) list.offsets[i]; - int length = (int) list.lengths[i]; - - Object[] temp = (Object[]) Array.newInstance(classType, length); - System.arraycopy(children, offset, temp, 0, length); - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - - private static void readNonNullMapColumn( - Object[] vals, - int fieldIdx, - MapColumnVector mapsVector, - TypeDescription schema, - int childCount) { - - List fieldType = schema.getChildren(); - TypeDescription keyType = fieldType.get(0); - TypeDescription valueType = fieldType.get(1); - - ColumnVector keys = mapsVector.keys; - ColumnVector values = mapsVector.values; - - if (mapsVector.isRepeating) { - // first map is repeated - - // get map copy function - Function copyMap = getCopyFunction(schema); - - // set all key and value entries except those of the first map to null - int offset = (int) mapsVector.offsets[0]; - int length = (int) mapsVector.lengths[0]; - // we only need to read until offset + length. 
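The list and map readers deleted here share one indexing scheme: ORC flattens all rows' entries into a single child vector, and row i owns the slice [offsets[i], offsets[i] + lengths[i]). A small standalone sketch of that slicing step, with illustrative names:

import java.util.Arrays;

final class ListSliceSketch {
    // Cut the flattened child entries back into one array per row.
    static Object[][] sliceLists(Object[] children, long[] offsets, long[] lengths, int rowCount) {
        Object[][] lists = new Object[rowCount][];
        for (int i = 0; i < rowCount; i++) {
            int offset = (int) offsets[i];
            int length = (int) lengths[i];
            lists[i] = Arrays.copyOfRange(children, offset, offset + length);
        }
        return lists;
    }
}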
- int entriesToRead = offset + length; - - Object[] keyRows = new Object[entriesToRead]; - Object[] valueRows = new Object[entriesToRead]; - - // read map keys and values - readField(keyRows, -1, keyType, keys, entriesToRead); - readField(valueRows, -1, valueType, values, entriesToRead); - - // create first map that will be copied - HashMap map = readHashMap(keyRows, valueRows, offset, length); - - // copy first map and set copy as result - for (int i = 0; i < childCount; i++) { - if (fieldIdx == -1) { - vals[i] = copyMap.apply(map); - } else { - ((Row) vals[i]).setField(fieldIdx, copyMap.apply(map)); - } - } - - } else { - - Object[] keyRows = new Object[mapsVector.childCount]; - Object[] valueRows = new Object[mapsVector.childCount]; - - // read map keys and values - readField(keyRows, -1, keyType, keys, keyRows.length); - readField(valueRows, -1, valueType, values, valueRows.length); - - long[] lengthVectorMap = mapsVector.lengths; - int offset = 0; - - for (int i = 0; i < childCount; i++) { - long numMapEntries = lengthVectorMap[i]; - HashMap map = readHashMap(keyRows, valueRows, offset, numMapEntries); - offset += numMapEntries; - - if (fieldIdx == -1) { - vals[i] = map; - } else { - ((Row) vals[i]).setField(fieldIdx, map); - } - } - } - } - - private static void readLongColumn( - Object[] vals, - int fieldIdx, - LongColumnVector vector, - int childCount, - LongFunction reader) { - - if (vector.isRepeating) { // fill complete column with first value - if (vector.isNull[0]) { - // fill vals with null values - fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount); - } else { - // read repeating non-null value by forwarding call. - readNonNullLongColumn(vals, fieldIdx, vector, childCount, reader); - } - } else { - boolean[] isNullVector = vector.isNull; - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - vals[i] = null; - } else { - vals[i] = reader.apply(vector.vector[i]); - } - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - rows[i].setField(fieldIdx, null); - } else { - rows[i].setField(fieldIdx, reader.apply(vector.vector[i])); - } - } - } - } - } - - private static void readDoubleColumn( - Object[] vals, - int fieldIdx, - DoubleColumnVector vector, - int childCount, - DoubleFunction reader) { - - if (vector.isRepeating) { // fill complete column with first value - if (vector.isNull[0]) { - // fill vals with null values - fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount); - } else { - // read repeating non-null value by forwarding call - readNonNullDoubleColumn(vals, fieldIdx, vector, childCount, reader); - } - } else { - boolean[] isNullVector = vector.isNull; - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - vals[i] = null; - } else { - vals[i] = reader.apply(vector.vector[i]); - } - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - rows[i].setField(fieldIdx, null); - } else { - rows[i].setField(fieldIdx, reader.apply(vector.vector[i])); - } - } - } - } - } - - private static void readBytesColumnAsString( - Object[] vals, int fieldIdx, BytesColumnVector bytes, int childCount) { - - if (bytes.isRepeating) { // fill complete column with first value - if (bytes.isNull[0]) { - // fill vals with null values - fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount); - } 
else { - // read repeating non-null value by forwarding call - readNonNullBytesColumnAsString(vals, fieldIdx, bytes, childCount); - } - } else { - boolean[] isNullVector = bytes.isNull; - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - vals[i] = null; - } else { - vals[i] = readString(bytes.vector[i], bytes.start[i], bytes.length[i]); - } - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - rows[i].setField(fieldIdx, null); - } else { - rows[i].setField( - fieldIdx, - readString(bytes.vector[i], bytes.start[i], bytes.length[i])); - } - } - } - } - } - - private static void readBytesColumnAsBinary( - Object[] vals, int fieldIdx, BytesColumnVector bytes, int childCount) { - - if (bytes.isRepeating) { // fill complete column with first value - if (bytes.isNull[0]) { - // fill vals with null values - fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount); - } else { - // read repeating non-null value by forwarding call - readNonNullBytesColumnAsBinary(vals, fieldIdx, bytes, childCount); - } - } else { - boolean[] isNullVector = bytes.isNull; - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - vals[i] = null; - } else { - vals[i] = readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]); - } - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - rows[i].setField(fieldIdx, null); - } else { - rows[i].setField( - fieldIdx, - readBinary(bytes.vector[i], bytes.start[i], bytes.length[i])); - } - } - } - } - } - - private static void readLongColumnAsDate( - Object[] vals, int fieldIdx, LongColumnVector vector, int childCount) { - - if (vector.isRepeating) { // fill complete column with first value - if (vector.isNull[0]) { - // fill vals with null values - fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount); - } else { - // read repeating non-null value by forwarding call - readNonNullLongColumnAsDate(vals, fieldIdx, vector, childCount); - } - } else { - boolean[] isNullVector = vector.isNull; - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - vals[i] = null; - } else { - vals[i] = readDate(vector.vector[i]); - } - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - rows[i].setField(fieldIdx, null); - } else { - rows[i].setField(fieldIdx, readDate(vector.vector[i])); - } - } - } - } - } - - private static void readTimestampColumn( - Object[] vals, int fieldIdx, TimestampColumnVector vector, int childCount) { - - if (vector.isRepeating) { // fill complete column with first value - if (vector.isNull[0]) { - // fill vals with null values - fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount); - } else { - // read repeating non-null value by forwarding call - readNonNullTimestampColumn(vals, fieldIdx, vector, childCount); - } - } else { - boolean[] isNullVector = vector.isNull; - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - vals[i] = null; - } else { - Timestamp ts = readTimestamp(vector.time[i], vector.nanos[i]); - vals[i] = ts; - } - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - 
rows[i].setField(fieldIdx, null); - } else { - Timestamp ts = readTimestamp(vector.time[i], vector.nanos[i]); - rows[i].setField(fieldIdx, ts); - } - } - } - } - } - - private static void readDecimalColumn( - Object[] vals, int fieldIdx, DecimalColumnVector vector, int childCount) { - - if (vector.isRepeating) { // fill complete column with first value - if (vector.isNull[0]) { - // fill vals with null values - fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount); - } else { - // read repeating non-null value by forwarding call - readNonNullDecimalColumn(vals, fieldIdx, vector, childCount); - } - } else { - boolean[] isNullVector = vector.isNull; - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - vals[i] = null; - } else { - vals[i] = readBigDecimal(vector.vector[i]); - } - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - rows[i].setField(fieldIdx, null); - } else { - rows[i].setField(fieldIdx, readBigDecimal(vector.vector[i])); - } - } - } - } - } - - private static void readStructColumn( - Object[] vals, - int fieldIdx, - StructColumnVector structVector, - TypeDescription schema, - int childCount) { - - List childrenTypes = schema.getChildren(); - - int numFields = childrenTypes.size(); - - // Early out if struct column is repeating and always null. - // This is the only repeating case we need to handle. - // ORC assumes that repeating values have been pushed to the children. - if (structVector.isRepeating && structVector.isNull[0]) { - if (fieldIdx < 0) { - for (int i = 0; i < childCount; i++) { - vals[i] = null; - } - } else { - for (int i = 0; i < childCount; i++) { - ((Row) vals[i]).setField(fieldIdx, null); - } - } - return; - } - - // create a batch of Rows to read the structs - Row[] structs = new Row[childCount]; - // TODO: possible improvement: reuse existing Row objects - for (int i = 0; i < childCount; i++) { - structs[i] = new Row(numFields); - } - - // read struct fields - for (int i = 0; i < numFields; i++) { - ColumnVector fieldVector = structVector.fields[i]; - if (!fieldVector.isRepeating) { - // Reduce fieldVector reads by setting all entries null where struct is null. - if (fieldVector.noNulls) { - // fieldVector had no nulls. Just use struct null information. - System.arraycopy( - structVector.isNull, - 0, - fieldVector.isNull, - 0, - structVector.isNull.length); - structVector.fields[i].noNulls = false; - } else { - // fieldVector had nulls. Merge field nulls with struct nulls. 
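To read the null-mask merge above in one piece: a child entry of a struct is treated as null whenever either the struct itself or the field is null at that position, which lets the child readers skip entries of null structs. This is a sketch of the same logic as a self-contained helper (the helper name is illustrative), not a drop-in replacement.

import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;

final class StructNullMergeSketch {
    // Mark a child field entry as null if either the struct or the field is null there.
    static void mergeStructNulls(boolean[] structIsNull, ColumnVector field) {
        if (field.noNulls) {
            // the field had no nulls of its own: adopt the struct's null mask directly
            System.arraycopy(structIsNull, 0, field.isNull, 0, structIsNull.length);
            field.noNulls = false;
        } else {
            // the field already has nulls: OR the two masks together
            for (int j = 0; j < structIsNull.length; j++) {
                field.isNull[j] = structIsNull[j] || field.isNull[j];
            }
        }
    }
}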
- for (int j = 0; j < structVector.isNull.length; j++) { - structVector.fields[i].isNull[j] = - structVector.isNull[j] || structVector.fields[i].isNull[j]; - } - } - } - readField(structs, i, childrenTypes.get(i), structVector.fields[i], childCount); - } - - boolean[] isNullVector = structVector.isNull; - - if (fieldIdx == -1) { // set struct as an object - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - vals[i] = null; - } else { - vals[i] = structs[i]; - } - } - } else { // set struct as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - rows[i].setField(fieldIdx, null); - } else { - rows[i].setField(fieldIdx, structs[i]); - } - } - } - } - - private static void readListColumn( - Object[] vals, - int fieldIdx, - ListColumnVector list, - TypeDescription schema, - int childCount) { - - TypeDescription fieldType = schema.getChildren().get(0); - // get class of list elements - Class classType = getClassForType(fieldType); - - if (list.isRepeating) { - // list values are repeating. we only need to read the first list and copy it. - - if (list.isNull[0]) { - // Even better. The first list is null and so are all lists are null - for (int i = 0; i < childCount; i++) { - if (fieldIdx == -1) { - vals[i] = null; - } else { - ((Row) vals[i]).setField(fieldIdx, null); - } - } - - } else { - // Get function to copy list - Function copyList = getCopyFunction(schema); - - int offset = (int) list.offsets[0]; - int length = (int) list.lengths[0]; - // we only need to read until offset + length. - int entriesToRead = offset + length; - - // read entries - Object[] children = (Object[]) Array.newInstance(classType, entriesToRead); - readField(children, -1, fieldType, list.child, entriesToRead); - - // create first list which will be copied - Object[] temp; - if (offset == 0) { - temp = children; - } else { - temp = (Object[]) Array.newInstance(classType, length); - System.arraycopy(children, offset, temp, 0, length); - } - - // copy repeated list and set copy as result - for (int i = 0; i < childCount; i++) { - Object[] copy = (Object[]) copyList.apply(temp); - if (fieldIdx == -1) { - vals[i] = copy; - } else { - ((Row) vals[i]).setField(fieldIdx, copy); - } - } - } - - } else { - if (!list.child.isRepeating) { - boolean[] childIsNull = new boolean[list.childCount]; - Arrays.fill(childIsNull, true); - // forward info of null lists into child vector - for (int i = 0; i < childCount; i++) { - // preserve isNull info of entries of non-null lists - if (!list.isNull[i]) { - int offset = (int) list.offsets[i]; - int length = (int) list.lengths[i]; - System.arraycopy(list.child.isNull, offset, childIsNull, offset, length); - } - } - // override isNull of children vector - list.child.isNull = childIsNull; - list.child.noNulls = false; - } - - // read children - Object[] children = (Object[]) Array.newInstance(classType, list.childCount); - readField(children, -1, fieldType, list.child, list.childCount); - - Object[] temp; - // fill lists with children - for (int i = 0; i < childCount; i++) { - - if (list.isNull[i]) { - temp = null; - } else { - int offset = (int) list.offsets[i]; - int length = (int) list.lengths[i]; - - temp = (Object[]) Array.newInstance(classType, length); - System.arraycopy(children, offset, temp, 0, length); - } - - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - - private static void readMapColumn( - Object[] vals, - int fieldIdx, - MapColumnVector 
map, - TypeDescription schema, - int childCount) { - - List fieldType = schema.getChildren(); - TypeDescription keyType = fieldType.get(0); - TypeDescription valueType = fieldType.get(1); - - ColumnVector keys = map.keys; - ColumnVector values = map.values; - - if (map.isRepeating) { - // map values are repeating. we only need to read the first map and copy it. - - if (map.isNull[0]) { - // Even better. The first map is null and so are all maps are null - for (int i = 0; i < childCount; i++) { - if (fieldIdx == -1) { - vals[i] = null; - } else { - ((Row) vals[i]).setField(fieldIdx, null); - } - } - - } else { - // Get function to copy map - Function copyMap = getCopyFunction(schema); - - int offset = (int) map.offsets[0]; - int length = (int) map.lengths[0]; - // we only need to read until offset + length. - int entriesToRead = offset + length; - - Object[] keyRows = new Object[entriesToRead]; - Object[] valueRows = new Object[entriesToRead]; - - // read map keys and values - readField(keyRows, -1, keyType, keys, entriesToRead); - readField(valueRows, -1, valueType, values, entriesToRead); - - // create first map which will be copied - HashMap temp = readHashMap(keyRows, valueRows, offset, length); - - // copy repeated map and set copy as result - for (int i = 0; i < childCount; i++) { - if (fieldIdx == -1) { - vals[i] = copyMap.apply(temp); - } else { - ((Row) vals[i]).setField(fieldIdx, copyMap.apply(temp)); - } - } - } - } else { - // ensure only keys and values that are referenced by non-null maps are set to non-null - - if (!keys.isRepeating) { - // propagate is null info of map into keys vector - boolean[] keyIsNull = new boolean[map.childCount]; - Arrays.fill(keyIsNull, true); - for (int i = 0; i < childCount; i++) { - // preserve isNull info for keys of non-null maps - if (!map.isNull[i]) { - int offset = (int) map.offsets[i]; - int length = (int) map.lengths[i]; - System.arraycopy(keys.isNull, offset, keyIsNull, offset, length); - } - } - // override isNull of keys vector - keys.isNull = keyIsNull; - keys.noNulls = false; - } - if (!values.isRepeating) { - // propagate is null info of map into values vector - boolean[] valIsNull = new boolean[map.childCount]; - Arrays.fill(valIsNull, true); - for (int i = 0; i < childCount; i++) { - // preserve isNull info for vals of non-null maps - if (!map.isNull[i]) { - int offset = (int) map.offsets[i]; - int length = (int) map.lengths[i]; - System.arraycopy(values.isNull, offset, valIsNull, offset, length); - } - } - // override isNull of values vector - values.isNull = valIsNull; - values.noNulls = false; - } - - Object[] keyRows = new Object[map.childCount]; - Object[] valueRows = new Object[map.childCount]; - - // read map keys and values - readField(keyRows, -1, keyType, keys, keyRows.length); - readField(valueRows, -1, valueType, values, valueRows.length); - - boolean[] isNullVector = map.isNull; - long[] lengths = map.lengths; - long[] offsets = map.offsets; - - if (fieldIdx == -1) { // set map as an object - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - vals[i] = null; - } else { - vals[i] = readHashMap(keyRows, valueRows, (int) offsets[i], lengths[i]); - } - } - } else { // set map as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - rows[i].setField(fieldIdx, null); - } else { - rows[i].setField( - fieldIdx, - readHashMap(keyRows, valueRows, (int) offsets[i], lengths[i])); - } - } - } - } - } - - /** - * Sets a repeating value to all objects or row 
fields of the passed vals array. - * - * @param vals The array of objects or Rows. - * @param fieldIdx If the objs array is an array of Row, the index of the field that needs to be - * filled. Otherwise a -1 must be passed and the data is directly filled into the array. - * @param repeatingValue The value that is set. - * @param childCount The number of times the value is set. - */ - private static void fillColumnWithRepeatingValue( - Object[] vals, int fieldIdx, Object repeatingValue, int childCount) { - - if (fieldIdx == -1) { - // set value as an object - Arrays.fill(vals, 0, childCount, repeatingValue); - } else { - // set value as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - rows[i].setField(fieldIdx, repeatingValue); - } - } - } - - private static Class getClassForType(TypeDescription schema) { - - // check the type of the vector to decide how to read it. - switch (schema.getCategory()) { - case BOOLEAN: - return Boolean.class; - case BYTE: - return Byte.class; - case SHORT: - return Short.class; - case INT: - return Integer.class; - case LONG: - return Long.class; - case FLOAT: - return Float.class; - case DOUBLE: - return Double.class; - case CHAR: - case VARCHAR: - case STRING: - return String.class; - case DATE: - return Date.class; - case TIMESTAMP: - return Timestamp.class; - case BINARY: - return byte[].class; - case DECIMAL: - return BigDecimal.class; - case STRUCT: - return Row.class; - case LIST: - Class childClass = getClassForType(schema.getChildren().get(0)); - return Array.newInstance(childClass, 0).getClass(); - case MAP: - return HashMap.class; - case UNION: - throw new UnsupportedOperationException("UNION type not supported yet"); - default: - throw new IllegalArgumentException("Unknown type " + schema); - } - } - - private static Boolean readBoolean(long l) { - return l != 0; - } - - private static Byte readByte(long l) { - return (byte) l; - } - - private static Short readShort(long l) { - return (short) l; - } - - private static Integer readInt(long l) { - return (int) l; - } - - private static Long readLong(long l) { - return l; - } - - private static Float readFloat(double d) { - return (float) d; - } - - private static Double readDouble(double d) { - return d; - } - - private static Date readDate(long l) { - // day to milliseconds - final long t = l * MILLIS_PER_DAY; - // adjust by local timezone - return new java.sql.Date(t - LOCAL_TZ.getOffset(t)); - } - - private static String readString(byte[] bytes, int start, int length) { - return new String(bytes, start, length, StandardCharsets.UTF_8); - } - - private static byte[] readBinary(byte[] src, int srcPos, int length) { - byte[] result = new byte[length]; - System.arraycopy(src, srcPos, result, 0, length); - return result; - } - - private static BigDecimal readBigDecimal(HiveDecimalWritable hiveDecimalWritable) { - HiveDecimal hiveDecimal = hiveDecimalWritable.getHiveDecimal(); - return hiveDecimal.bigDecimalValue(); - } - - private static Timestamp readTimestamp(long time, int nanos) { - Timestamp ts = new Timestamp(time); - ts.setNanos(nanos); - return ts; - } - - private static HashMap readHashMap( - Object[] keyRows, Object[] valueRows, int offset, long length) { - HashMap resultMap = new HashMap<>(); - for (int j = 0; j < length; j++) { - resultMap.put(keyRows[offset], valueRows[offset]); - offset++; - } - return resultMap; - } - - @SuppressWarnings("unchecked") - private static Function getCopyFunction(TypeDescription schema) { - // check the type of the vector 
to decide how to read it. - switch (schema.getCategory()) { - case BOOLEAN: - case BYTE: - case SHORT: - case INT: - case LONG: - case FLOAT: - case DOUBLE: - case CHAR: - case VARCHAR: - case STRING: - case DECIMAL: - return OrcBatchReader::returnImmutable; - case DATE: - return OrcBatchReader::copyDate; - case TIMESTAMP: - return OrcBatchReader::copyTimestamp; - case BINARY: - return OrcBatchReader::copyBinary; - case STRUCT: - List fieldTypes = schema.getChildren(); - Function[] copyFields = new Function[fieldTypes.size()]; - for (int i = 0; i < fieldTypes.size(); i++) { - copyFields[i] = getCopyFunction(fieldTypes.get(i)); - } - return new CopyStruct(copyFields); - case LIST: - TypeDescription entryType = schema.getChildren().get(0); - Function copyEntry = getCopyFunction(entryType); - Class entryClass = getClassForType(entryType); - return new CopyList(copyEntry, entryClass); - case MAP: - TypeDescription keyType = schema.getChildren().get(0); - TypeDescription valueType = schema.getChildren().get(1); - Function copyKey = getCopyFunction(keyType); - Function copyValue = getCopyFunction(valueType); - return new CopyMap(copyKey, copyValue); - case UNION: - throw new UnsupportedOperationException("UNION type not supported yet"); - default: - throw new IllegalArgumentException("Unknown type " + schema); - } - } - - private static Object returnImmutable(Object o) { - return o; - } - - private static Date copyDate(Object o) { - if (o == null) { - return null; - } else { - long date = ((Date) o).getTime(); - return new Date(date); - } - } - - private static Timestamp copyTimestamp(Object o) { - if (o == null) { - return null; - } else { - long millis = ((Timestamp) o).getTime(); - int nanos = ((Timestamp) o).getNanos(); - Timestamp copy = new Timestamp(millis); - copy.setNanos(nanos); - return copy; - } - } - - private static byte[] copyBinary(Object o) { - if (o == null) { - return null; - } else { - int length = ((byte[]) o).length; - return Arrays.copyOf((byte[]) o, length); - } - } - - private static class CopyStruct implements Function { - - private final Function[] copyFields; - - CopyStruct(Function[] copyFields) { - this.copyFields = copyFields; - } - - @Override - public Object apply(Object o) { - if (o == null) { - return null; - } else { - Row r = (Row) o; - Row copy = new Row(copyFields.length); - for (int i = 0; i < copyFields.length; i++) { - copy.setField(i, copyFields[i].apply(r.getField(i))); - } - return copy; - } - } - } - - private static class CopyList implements Function { - - private final Function copyEntry; - private final Class entryClass; - - CopyList(Function copyEntry, Class entryClass) { - this.copyEntry = copyEntry; - this.entryClass = entryClass; - } - - @Override - public Object apply(Object o) { - if (o == null) { - return null; - } else { - Object[] l = (Object[]) o; - Object[] copy = (Object[]) Array.newInstance(entryClass, l.length); - for (int i = 0; i < l.length; i++) { - copy[i] = copyEntry.apply(l[i]); - } - return copy; - } - } - } - - @SuppressWarnings("unchecked") - private static class CopyMap implements Function { - - private final Function copyKey; - private final Function copyValue; - - CopyMap(Function copyKey, Function copyValue) { - this.copyKey = copyKey; - this.copyValue = copyValue; - } - - @Override - public Object apply(Object o) { - if (o == null) { - return null; - } else { - Map m = (Map) o; - HashMap copy = new HashMap<>(m.size()); - - for (Map.Entry e : m.entrySet()) { - Object keyCopy = copyKey.apply(e.getKey()); - Object 
valueCopy = copyValue.apply(e.getValue()); - copy.put(keyCopy, valueCopy); - } - return copy; - } - } - } -} diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcInputFormat.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcInputFormat.java deleted file mode 100644 index 2c9cd04f8b707..0000000000000 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcInputFormat.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.orc; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.io.FileInputFormat; -import org.apache.flink.core.fs.Path; -import org.apache.flink.orc.OrcFilters.ColumnPredicate; -import org.apache.flink.orc.OrcFilters.Not; -import org.apache.flink.orc.OrcFilters.Or; -import org.apache.flink.orc.OrcFilters.Predicate; - -import org.apache.hadoop.conf.Configuration; -import org.apache.orc.TypeDescription; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.ArrayList; - -/** InputFormat to read ORC files. */ -public abstract class OrcInputFormat extends FileInputFormat { - - // the number of fields rows to read in a batch - protected int batchSize; - // the configuration to read with - protected Configuration conf; - // the schema of the ORC files to read - protected TypeDescription schema; - - // the fields of the ORC schema that the returned Rows are composed of. - protected int[] selectedFields; - - protected ArrayList conjunctPredicates = new ArrayList<>(); - - protected transient OrcSplitReader reader; - - /** - * Creates an OrcInputFormat. - * - * @param path The path to read ORC files from. - * @param orcSchema The schema of the ORC files as ORC TypeDescription. - * @param orcConfig The configuration to read the ORC files with. - * @param batchSize The number of Row objects to read in a batch. - */ - public OrcInputFormat( - Path path, TypeDescription orcSchema, Configuration orcConfig, int batchSize) { - super(path); - - // configure OrcInputFormat - this.schema = orcSchema; - this.conf = orcConfig; - this.batchSize = batchSize; - - // set default selection mask, i.e., all fields. - this.selectedFields = new int[this.schema.getChildren().size()]; - for (int i = 0; i < selectedFields.length; i++) { - this.selectedFields[i] = i; - } - } - - /** - * Selects the fields from the ORC schema that are returned by InputFormat. - * - * @param selectedFields The indices of the fields of the ORC schema that are returned by the - * InputFormat. - */ - public void selectFields(int... 
selectedFields) { - // set field mapping - this.selectedFields = selectedFields; - } - - /** - * Adds a filter predicate to reduce the number of rows to be returned by the input format. - * Multiple conjunctive predicates can be added by calling this method multiple times. - * - *
Note: Predicates can significantly reduce the amount of data that is read. However, the - * OrcInputFormat does not guarantee that all returned rows qualify the predicates. Moreover, - * predicates are only applied if the referenced field is among the selected fields. - * - * @param predicate The filter predicate. - */ - public void addPredicate(Predicate predicate) { - // validate - validatePredicate(predicate); - // add predicate - this.conjunctPredicates.add(predicate); - } - - private void validatePredicate(Predicate pred) { - if (pred instanceof ColumnPredicate) { - // check column name - String colName = ((ColumnPredicate) pred).columnName; - if (!this.schema.getFieldNames().contains(colName)) { - throw new IllegalArgumentException( - "Predicate cannot be applied. " - + "Column '" - + colName - + "' does not exist in ORC schema."); - } - } else if (pred instanceof Not) { - validatePredicate(((Not) pred).child()); - } else if (pred instanceof Or) { - for (Predicate p : ((Or) pred).children()) { - validatePredicate(p); - } - } - } - - @Override - public void close() throws IOException { - if (reader != null) { - this.reader.close(); - } - this.reader = null; - } - - @Override - public void closeInputFormat() throws IOException { - this.schema = null; - } - - @Override - public boolean reachedEnd() throws IOException { - return reader.reachedEnd(); - } - - @Override - public T nextRecord(T reuse) throws IOException { - return reader.nextRecord(reuse); - } - - @VisibleForTesting - OrcSplitReader getReader() { - return reader; - } - - // -------------------------------------------------------------------------------------------- - // Custom serialization methods - // -------------------------------------------------------------------------------------------- - - private void writeObject(ObjectOutputStream out) throws IOException { - out.writeInt(batchSize); - this.conf.write(out); - out.writeUTF(schema.toString()); - - out.writeInt(selectedFields.length); - for (int f : selectedFields) { - out.writeInt(f); - } - - out.writeInt(conjunctPredicates.size()); - for (Predicate p : conjunctPredicates) { - out.writeObject(p); - } - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - batchSize = in.readInt(); - Configuration configuration = new Configuration(); - configuration.readFields(in); - - if (this.conf == null) { - this.conf = configuration; - } - this.schema = TypeDescription.fromString(in.readUTF()); - - this.selectedFields = new int[in.readInt()]; - for (int i = 0; i < selectedFields.length; i++) { - this.selectedFields[i] = in.readInt(); - } - - this.conjunctPredicates = new ArrayList<>(); - int numPreds = in.readInt(); - for (int i = 0; i < numPreds; i++) { - conjunctPredicates.add((Predicate) in.readObject()); - } - } - - @Override - public boolean supportsMultiPaths() { - return true; - } - - // -------------------------------------------------------------------------------------------- - // Getter methods for tests - // -------------------------------------------------------------------------------------------- - - @VisibleForTesting - Configuration getConfiguration() { - return conf; - } - - @VisibleForTesting - int getBatchSize() { - return batchSize; - } - - @VisibleForTesting - String getSchema() { - return schema.toString(); - } -} diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java deleted file mode 100644 index 9b6505a462d90..0000000000000 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.orc; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; -import org.apache.flink.types.Row; - -import org.apache.hadoop.conf.Configuration; -import org.apache.orc.TypeDescription; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** InputFormat to read ORC files into {@link Row}. */ -public class OrcRowInputFormat extends OrcInputFormat implements ResultTypeQueryable { - - private static final Logger LOG = LoggerFactory.getLogger(OrcRowInputFormat.class); - - // the number of rows read in a batch - private static final int DEFAULT_BATCH_SIZE = 1000; - - // the type information of the Rows returned by this InputFormat. - private transient RowTypeInfo rowType; - - /** - * Creates an OrcRowInputFormat. - * - * @param path The path to read ORC files from. - * @param schemaString The schema of the ORC files as String. - * @param orcConfig The configuration to read the ORC files with. - */ - public OrcRowInputFormat(String path, String schemaString, Configuration orcConfig) { - this(path, TypeDescription.fromString(schemaString), orcConfig, DEFAULT_BATCH_SIZE); - } - - /** - * Creates an OrcRowInputFormat. - * - * @param path The path to read ORC files from. - * @param schemaString The schema of the ORC files as String. - * @param orcConfig The configuration to read the ORC files with. - * @param batchSize The number of Row objects to read in a batch. - */ - public OrcRowInputFormat( - String path, String schemaString, Configuration orcConfig, int batchSize) { - this(path, TypeDescription.fromString(schemaString), orcConfig, batchSize); - } - - /** - * Creates an OrcRowInputFormat. - * - * @param path The path to read ORC files from. - * @param orcSchema The schema of the ORC files as ORC TypeDescription. - * @param orcConfig The configuration to read the ORC files with. - * @param batchSize The number of Row objects to read in a batch. - */ - public OrcRowInputFormat( - String path, TypeDescription orcSchema, Configuration orcConfig, int batchSize) { - super(new Path(path), orcSchema, orcConfig, batchSize); - this.rowType = (RowTypeInfo) OrcBatchReader.schemaToTypeInfo(orcSchema); - } - - @Override - public void selectFields(int... 
selectedFields) { - super.selectFields(selectedFields); - // adapt result type - this.rowType = RowTypeInfo.projectFields(this.rowType, selectedFields); - } - - @Override - public void open(FileInputSplit fileSplit) throws IOException { - LOG.debug("Opening ORC file {}", fileSplit.getPath()); - this.reader = - new OrcRowSplitReader( - conf, - schema, - selectedFields, - conjunctPredicates, - batchSize, - fileSplit.getPath(), - fileSplit.getStart(), - fileSplit.getLength()); - } - - @Override - public TypeInformation getProducedType() { - return rowType; - } -} diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcRowSplitReader.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcRowSplitReader.java deleted file mode 100644 index 40320bd4d6ef5..0000000000000 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcRowSplitReader.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.orc; - -import org.apache.flink.core.fs.Path; -import org.apache.flink.orc.shim.OrcShim; -import org.apache.flink.types.Row; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.orc.TypeDescription; - -import java.io.IOException; -import java.util.List; - -/** {@link OrcSplitReader} to read ORC files into {@link Row}. 
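For readers unfamiliar with the API being removed, the OrcRowInputFormat above was typically wired into a legacy DataSet job roughly as follows. This is a hedged usage sketch based only on the constructors, selectFields, and addPredicate visible in this diff; the path, schema string, field indices, and the OrcFilters.LessThan predicate class are assumptions for illustration.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.orc.OrcFilters;
import org.apache.flink.orc.OrcRowInputFormat;
import org.apache.flink.types.Row;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

public class OrcRowInputFormatUsageSketch {
    public static void main(String[] args) throws Exception {
        // hypothetical path and schema, for illustration only
        OrcRowInputFormat format =
                new OrcRowInputFormat(
                        "hdfs:///tmp/example.orc",
                        "struct<name:string,age:int,score:double>",
                        new Configuration());

        // project to the first and third ORC columns (name, score)
        format.selectFields(0, 2);

        // conjunctive filter pushed into the ORC reader; returned rows are not
        // guaranteed to satisfy it, so downstream re-filtering may still be needed
        format.addPredicate(new OrcFilters.LessThan("score", PredicateLeaf.Type.FLOAT, 90.0));

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Row> rows = env.createInput(format);
        rows.print();
    }
}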
*/ -public class OrcRowSplitReader extends OrcSplitReader { - - private final TypeDescription schema; - private final int[] selectedFields; - // the vector of rows that is read in a batch - private final Row[] rows; - - public OrcRowSplitReader( - Configuration conf, - TypeDescription schema, - int[] selectedFields, - List conjunctPredicates, - int batchSize, - Path path, - long splitStart, - long splitLength) - throws IOException { - super( - OrcShim.defaultShim(), - conf, - schema, - selectedFields, - conjunctPredicates, - batchSize, - path, - splitStart, - splitLength); - this.schema = schema; - this.selectedFields = selectedFields; - // create and initialize the row batch - this.rows = new Row[batchSize]; - for (int i = 0; i < batchSize; i++) { - rows[i] = new Row(selectedFields.length); - } - } - - @Override - protected int fillRows() { - return OrcBatchReader.fillRows(rows, schema, rowBatchWrapper.getBatch(), selectedFields); - } - - @Override - public Row nextRecord(Row reuse) { - // return the next row - return rows[this.nextRow++]; - } -} diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java deleted file mode 100644 index 9bf8d35c3ec84..0000000000000 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java +++ /dev/null @@ -1,576 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.orc; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.orc.OrcFilters.Predicate; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.expressions.Attribute; -import org.apache.flink.table.expressions.BinaryComparison; -import org.apache.flink.table.expressions.EqualTo; -import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.GreaterThan; -import org.apache.flink.table.expressions.GreaterThanOrEqual; -import org.apache.flink.table.expressions.IsNotNull; -import org.apache.flink.table.expressions.IsNull; -import org.apache.flink.table.expressions.LessThan; -import org.apache.flink.table.expressions.LessThanOrEqual; -import org.apache.flink.table.expressions.Literal; -import org.apache.flink.table.expressions.Not; -import org.apache.flink.table.expressions.NotEqualTo; -import org.apache.flink.table.expressions.Or; -import org.apache.flink.table.expressions.UnaryExpression; -import org.apache.flink.table.sources.BatchTableSource; -import org.apache.flink.table.sources.FilterableTableSource; -import org.apache.flink.table.sources.ProjectableTableSource; -import org.apache.flink.table.sources.TableSource; -import org.apache.flink.types.Row; -import org.apache.flink.util.Preconditions; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; -import org.apache.orc.TypeDescription; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -/** - * A TableSource to read ORC files. - * - *

<p>The {@link OrcTableSource} supports projection and filter push-down.
- *
- * <p>An {@link OrcTableSource} is used as shown in the example below.
- *
- * <pre>{@code
- * OrcTableSource orcSrc = OrcTableSource.builder()
- *   .path("file:///my/data/file.orc")
- *   .forOrcSchema("struct")
- *   .build();
- *
- * tEnv.registerTableSourceInternal("orcTable", orcSrc);
- * Table res = tableEnv.sqlQuery("SELECT * FROM orcTable");
- * }</pre>
- */ -public class OrcTableSource - implements BatchTableSource, ProjectableTableSource, FilterableTableSource { - - private static final Logger LOG = LoggerFactory.getLogger(OrcTableSource.class); - - private static final int DEFAULT_BATCH_SIZE = 1000; - - // path to read ORC files from - private final String path; - // schema of the ORC file - private final TypeDescription orcSchema; - // the schema of the Table - private final TableSchema tableSchema; - // the configuration to read the file - private final Configuration orcConfig; - // the number of rows to read in a batch - private final int batchSize; - // flag whether a path is recursively enumerated - private final boolean recursiveEnumeration; - - // type information of the data returned by the InputFormat - private final RowTypeInfo typeInfo; - // list of selected ORC fields to return - private final int[] selectedFields; - // list of predicates to apply - private final Predicate[] predicates; - - /** - * Creates an OrcTableSouce from an ORC TypeDescription. - * - * @param path The path to read the ORC files from. - * @param orcSchema The schema of the ORC files as TypeDescription. - * @param orcConfig The configuration to read the ORC files. - * @param batchSize The number of Rows to read in a batch, default is 1000. - * @param recursiveEnumeration Flag whether the path should be recursively enumerated or not. - */ - private OrcTableSource( - String path, - TypeDescription orcSchema, - Configuration orcConfig, - int batchSize, - boolean recursiveEnumeration) { - this(path, orcSchema, orcConfig, batchSize, recursiveEnumeration, null, null); - } - - private OrcTableSource( - String path, - TypeDescription orcSchema, - Configuration orcConfig, - int batchSize, - boolean recursiveEnumeration, - int[] selectedFields, - Predicate[] predicates) { - - Preconditions.checkNotNull(path, "Path must not be null."); - Preconditions.checkNotNull(orcSchema, "OrcSchema must not be null."); - Preconditions.checkNotNull(path, "Configuration must not be null."); - Preconditions.checkArgument(batchSize > 0, "Batch size must be larger than null."); - this.path = path; - this.orcSchema = orcSchema; - this.orcConfig = orcConfig; - this.batchSize = batchSize; - this.recursiveEnumeration = recursiveEnumeration; - this.selectedFields = selectedFields; - this.predicates = predicates; - - // determine the type information from the ORC schema - RowTypeInfo typeInfoFromSchema = - (RowTypeInfo) OrcBatchReader.schemaToTypeInfo(this.orcSchema); - - // set return type info - if (selectedFields == null) { - this.typeInfo = typeInfoFromSchema; - } else { - this.typeInfo = RowTypeInfo.projectFields(typeInfoFromSchema, selectedFields); - } - - // create a TableSchema that corresponds to the ORC schema - this.tableSchema = - new TableSchema( - typeInfoFromSchema.getFieldNames(), typeInfoFromSchema.getFieldTypes()); - } - - @Override - public DataSet getDataSet(ExecutionEnvironment execEnv) { - OrcRowInputFormat orcIF = buildOrcInputFormat(); - orcIF.setNestedFileEnumeration(recursiveEnumeration); - if (selectedFields != null) { - orcIF.selectFields(selectedFields); - } - if (predicates != null) { - for (OrcFilters.Predicate pred : predicates) { - orcIF.addPredicate(pred); - } - } - return execEnv.createInput(orcIF).name(explainSource()); - } - - @VisibleForTesting - protected OrcRowInputFormat buildOrcInputFormat() { - return new OrcRowInputFormat(path, orcSchema, orcConfig, batchSize); - } - - @Override - public TypeInformation getReturnType() { - return typeInfo; - } 
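For context on what this change removes, the following is a minimal, hedged sketch of the legacy usage path that is deleted here. It is not part of the diff itself: it assumes the deprecated DataSet / BatchTableEnvironment stack and the registerTableSourceInternal call shown in the removed javadoc above and in the removed OrcTableSourceITCase further down; the class name, path, and schema string are placeholder values.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.orc.OrcTableSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.types.Row;

// Illustrative only: the legacy OrcTableSource wiring removed by this change.
public class LegacyOrcTableSourceExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // Placeholder path and schema; real values depend on the ORC files being read.
        OrcTableSource orcSource = OrcTableSource.builder()
                .path("file:///my/data/file.orc")
                .forOrcSchema("struct<_col0:int,_col1:string>")
                .withBatchSize(1000) // matches the removed DEFAULT_BATCH_SIZE
                .build();

        // Registration goes through the internal API, as in the removed OrcTableSourceITCase.
        ((TableEnvironmentInternal) tEnv).registerTableSourceInternal("OrcTable", orcSource);
        Table result = tEnv.sqlQuery("SELECT _col0, _col1 FROM OrcTable WHERE _col0 < 10");
        DataSet<Row> rows = tEnv.toDataSet(result, Row.class);
        rows.print();
    }
}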
- - @Override - public TableSchema getTableSchema() { - return this.tableSchema; - } - - @Override - public TableSource projectFields(int[] selectedFields) { - // create a copy of the OrcTableSouce with new selected fields - return new OrcTableSource( - path, - orcSchema, - orcConfig, - batchSize, - recursiveEnumeration, - selectedFields, - predicates); - } - - @Override - public TableSource applyPredicate(List predicates) { - ArrayList orcPredicates = new ArrayList<>(); - - // we do not remove any predicates from the list because ORC does not fully apply predicates - for (Expression pred : predicates) { - Predicate orcPred = toOrcPredicate(pred); - if (orcPred != null) { - LOG.info( - "Predicate [{}] converted into OrcPredicate [{}] and pushed into OrcTableSource for path {}.", - pred, - orcPred, - path); - orcPredicates.add(orcPred); - } else { - LOG.info( - "Predicate [{}] could not be pushed into OrcTableSource for path {}.", - pred, - path); - } - } - - return new OrcTableSource( - path, - orcSchema, - orcConfig, - batchSize, - recursiveEnumeration, - selectedFields, - orcPredicates.toArray(new Predicate[] {})); - } - - @Override - public boolean isFilterPushedDown() { - return this.predicates != null; - } - - @Override - public String explainSource() { - return "OrcFile[path=" - + path - + ", schema=" - + orcSchema - + ", filter=" - + predicateString() - + ", selectedFields=" - + Arrays.toString(selectedFields) - + "]"; - } - - private String predicateString() { - if (predicates == null) { - return "NULL"; - } else if (predicates.length == 0) { - return "TRUE"; - } else { - return "AND(" + Arrays.toString(predicates) + ")"; - } - } - - // Predicate conversion for filter push-down. - - private Predicate toOrcPredicate(Expression pred) { - if (pred instanceof Or) { - Predicate c1 = toOrcPredicate(((Or) pred).left()); - Predicate c2 = toOrcPredicate(((Or) pred).right()); - if (c1 == null || c2 == null) { - return null; - } else { - return new OrcFilters.Or(c1, c2); - } - } else if (pred instanceof Not) { - Predicate c = toOrcPredicate(((Not) pred).child()); - if (c == null) { - return null; - } else { - return new OrcFilters.Not(c); - } - } else if (pred instanceof BinaryComparison) { - - BinaryComparison binComp = (BinaryComparison) pred; - - if (!isValid(binComp)) { - // not a valid predicate - LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcTableSource.", pred); - return null; - } - PredicateLeaf.Type litType = getLiteralType(binComp); - if (litType == null) { - // unsupported literal type - LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcTableSource.", pred); - return null; - } - - boolean literalOnRight = literalOnRight(binComp); - String colName = getColumnName(binComp); - - // fetch literal and ensure it is serializable - Object literalObj = getLiteral(binComp); - Serializable literal; - // validate that literal is serializable - if (literalObj instanceof Serializable) { - literal = (Serializable) literalObj; - } else { - LOG.warn( - "Encountered a non-serializable literal of type {}. " - + "Cannot push predicate [{}] into OrcTableSource. 
" - + "This is a bug and should be reported.", - literalObj.getClass().getCanonicalName(), - pred); - return null; - } - - if (pred instanceof EqualTo) { - return new OrcFilters.Equals(colName, litType, literal); - } else if (pred instanceof NotEqualTo) { - return new OrcFilters.Not(new OrcFilters.Equals(colName, litType, literal)); - } else if (pred instanceof GreaterThan) { - if (literalOnRight) { - return new OrcFilters.Not( - new OrcFilters.LessThanEquals(colName, litType, literal)); - } else { - return new OrcFilters.LessThan(colName, litType, literal); - } - } else if (pred instanceof GreaterThanOrEqual) { - if (literalOnRight) { - return new OrcFilters.Not(new OrcFilters.LessThan(colName, litType, literal)); - } else { - return new OrcFilters.LessThanEquals(colName, litType, literal); - } - } else if (pred instanceof LessThan) { - if (literalOnRight) { - return new OrcFilters.LessThan(colName, litType, literal); - } else { - return new OrcFilters.Not( - new OrcFilters.LessThanEquals(colName, litType, literal)); - } - } else if (pred instanceof LessThanOrEqual) { - if (literalOnRight) { - return new OrcFilters.LessThanEquals(colName, litType, literal); - } else { - return new OrcFilters.Not(new OrcFilters.LessThan(colName, litType, literal)); - } - } else { - // unsupported predicate - LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcTableSource.", pred); - return null; - } - } else if (pred instanceof UnaryExpression) { - - UnaryExpression unary = (UnaryExpression) pred; - if (!isValid(unary)) { - // not a valid predicate - LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcTableSource.", pred); - return null; - } - PredicateLeaf.Type colType = toOrcType(((UnaryExpression) pred).child().resultType()); - if (colType == null) { - // unsupported type - LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcTableSource.", pred); - return null; - } - - String colName = getColumnName(unary); - - if (pred instanceof IsNull) { - return new OrcFilters.IsNull(colName, colType); - } else if (pred instanceof IsNotNull) { - return new OrcFilters.Not(new OrcFilters.IsNull(colName, colType)); - } else { - // unsupported predicate - LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcTableSource.", pred); - return null; - } - } else { - // unsupported predicate - LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcTableSource.", pred); - return null; - } - } - - private boolean isValid(UnaryExpression unary) { - return unary.child() instanceof Attribute; - } - - private boolean isValid(BinaryComparison comp) { - return (comp.left() instanceof Literal && comp.right() instanceof Attribute) - || (comp.left() instanceof Attribute && comp.right() instanceof Literal); - } - - private boolean literalOnRight(BinaryComparison comp) { - if (comp.left() instanceof Literal && comp.right() instanceof Attribute) { - return false; - } else if (comp.left() instanceof Attribute && comp.right() instanceof Literal) { - return true; - } else { - throw new RuntimeException("Invalid binary comparison."); - } - } - - private String getColumnName(UnaryExpression unary) { - return ((Attribute) unary.child()).name(); - } - - private String getColumnName(BinaryComparison comp) { - if (literalOnRight(comp)) { - return ((Attribute) comp.left()).name(); - } else { - return ((Attribute) comp.right()).name(); - } - } - - private PredicateLeaf.Type getLiteralType(BinaryComparison comp) { - if (literalOnRight(comp)) { - return toOrcType(((Literal) comp.right()).resultType()); 
- } else { - return toOrcType(((Literal) comp.left()).resultType()); - } - } - - private Object getLiteral(BinaryComparison comp) { - if (literalOnRight(comp)) { - return ((Literal) comp.right()).value(); - } else { - return ((Literal) comp.left()).value(); - } - } - - private PredicateLeaf.Type toOrcType(TypeInformation type) { - if (type == BasicTypeInfo.BYTE_TYPE_INFO - || type == BasicTypeInfo.SHORT_TYPE_INFO - || type == BasicTypeInfo.INT_TYPE_INFO - || type == BasicTypeInfo.LONG_TYPE_INFO) { - return PredicateLeaf.Type.LONG; - } else if (type == BasicTypeInfo.FLOAT_TYPE_INFO - || type == BasicTypeInfo.DOUBLE_TYPE_INFO) { - return PredicateLeaf.Type.FLOAT; - } else if (type == BasicTypeInfo.BOOLEAN_TYPE_INFO) { - return PredicateLeaf.Type.BOOLEAN; - } else if (type == BasicTypeInfo.STRING_TYPE_INFO) { - return PredicateLeaf.Type.STRING; - } else if (type == SqlTimeTypeInfo.TIMESTAMP) { - return PredicateLeaf.Type.TIMESTAMP; - } else if (type == SqlTimeTypeInfo.DATE) { - return PredicateLeaf.Type.DATE; - } else if (type == BasicTypeInfo.BIG_DEC_TYPE_INFO) { - return PredicateLeaf.Type.DECIMAL; - } else { - // unsupported type - return null; - } - } - - // Builder - - public static Builder builder() { - return new Builder(); - } - - /** Constructs an {@link OrcTableSource}. */ - public static class Builder { - - private String path; - - private TypeDescription schema; - - private Configuration config; - - private int batchSize = 0; - - private boolean recursive = true; - - /** - * Sets the path of the ORC file(s). If the path specifies a directory, it will be - * recursively enumerated. - * - * @param path The path of the ORC file(s). - * @return The builder. - */ - public Builder path(String path) { - Preconditions.checkNotNull(path, "Path must not be null."); - Preconditions.checkArgument(!path.isEmpty(), "Path must not be empty."); - this.path = path; - return this; - } - - /** - * Sets the path of the ORC file(s). - * - * @param path The path of the ORC file(s). - * @param recursive Flag whether the to enumerate - * @return The builder. - */ - public Builder path(String path, boolean recursive) { - Preconditions.checkNotNull(path, "Path must not be null."); - Preconditions.checkArgument(!path.isEmpty(), "Path must not be empty."); - this.path = path; - this.recursive = recursive; - return this; - } - - /** - * Sets the ORC schema of the files to read as a String. - * - * @param orcSchema The ORC schema of the files to read as a String. - * @return The builder. - */ - public Builder forOrcSchema(String orcSchema) { - Preconditions.checkNotNull(orcSchema, "ORC schema must not be null."); - this.schema = TypeDescription.fromString(orcSchema); - return this; - } - - /** - * Sets the ORC schema of the files to read as a {@link TypeDescription}. - * - * @param orcSchema The ORC schema of the files to read as a String. - * @return The builder. - */ - public Builder forOrcSchema(TypeDescription orcSchema) { - Preconditions.checkNotNull(orcSchema, "ORC Schema must not be null."); - this.schema = orcSchema; - return this; - } - - /** - * Sets a Hadoop {@link Configuration} for the ORC reader. If no configuration is - * configured, an empty configuration is used. - * - * @param config The Hadoop Configuration for the ORC reader. - * @return The builder. - */ - public Builder withConfiguration(Configuration config) { - Preconditions.checkNotNull(config, "Configuration must not be null."); - this.config = config; - return this; - } - - /** - * Sets the number of rows that are read in a batch. 
If not configured, the ORC files are - * read with a batch size of 1000. - * - * @param batchSize The number of rows that are read in a batch. - * @return The builder. - */ - public Builder withBatchSize(int batchSize) { - Preconditions.checkArgument(batchSize > 0, "Batch size must be greater than zero."); - this.batchSize = batchSize; - return this; - } - - /** - * Builds the OrcTableSource for this builder. - * - * @return The OrcTableSource for this builder. - */ - public OrcTableSource build() { - Preconditions.checkNotNull(this.path, "Path must not be null."); - Preconditions.checkNotNull(this.schema, "ORC schema must not be null."); - if (this.config == null) { - this.config = new Configuration(); - } - if (this.batchSize == 0) { - // set default batch size - this.batchSize = DEFAULT_BATCH_SIZE; - } - return new OrcTableSource( - this.path, this.schema, this.config, this.batchSize, this.recursive); - } - } -} diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcBatchReaderTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcBatchReaderTest.java deleted file mode 100644 index 4752a86966dc5..0000000000000 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcBatchReaderTest.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.orc; - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.java.typeutils.MapTypeInfo; -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; -import org.apache.flink.api.java.typeutils.RowTypeInfo; - -import org.apache.orc.TypeDescription; -import org.junit.Assert; -import org.junit.Test; - -/** Unit tests for {@link OrcBatchReader}. 
*/ -public class OrcBatchReaderTest { - - @Test - public void testFlatSchemaToTypeInfo1() { - - String schema = - "struct<" - + "boolean1:boolean," - + "byte1:tinyint," - + "short1:smallint," - + "int1:int," - + "long1:bigint," - + "float1:float," - + "double1:double," - + "bytes1:binary," - + "string1:string," - + "date1:date," - + "timestamp1:timestamp," - + "decimal1:decimal(5,2)" - + ">"; - TypeInformation typeInfo = - OrcBatchReader.schemaToTypeInfo(TypeDescription.fromString(schema)); - - Assert.assertNotNull(typeInfo); - Assert.assertTrue(typeInfo instanceof RowTypeInfo); - RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo; - - // validate field types - Assert.assertArrayEquals( - new TypeInformation[] { - Types.BOOLEAN, - Types.BYTE, - Types.SHORT, - Types.INT, - Types.LONG, - Types.FLOAT, - Types.DOUBLE, - PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, - Types.STRING, - Types.SQL_DATE, - Types.SQL_TIMESTAMP, - BasicTypeInfo.BIG_DEC_TYPE_INFO - }, - rowTypeInfo.getFieldTypes()); - - // validate field names - Assert.assertArrayEquals( - new String[] { - "boolean1", - "byte1", - "short1", - "int1", - "long1", - "float1", - "double1", - "bytes1", - "string1", - "date1", - "timestamp1", - "decimal1" - }, - rowTypeInfo.getFieldNames()); - } - - @Test - public void testNestedSchemaToTypeInfo1() { - - String schema = - "struct<" - + "middle:struct<" - + "list:array<" - + "struct<" - + "int1:int," - + "string1:string" - + ">" - + ">" - + ">," - + "list:array<" - + "struct<" - + "int1:int," - + "string1:string" - + ">" - + ">," - + "map:map<" - + "string," - + "struct<" - + "int1:int," - + "string1:string" - + ">" - + ">" - + ">"; - TypeInformation typeInfo = - OrcBatchReader.schemaToTypeInfo(TypeDescription.fromString(schema)); - - Assert.assertNotNull(typeInfo); - Assert.assertTrue(typeInfo instanceof RowTypeInfo); - RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo; - - // validate field types - Assert.assertArrayEquals( - new TypeInformation[] { - Types.ROW_NAMED( - new String[] {"list"}, - ObjectArrayTypeInfo.getInfoFor( - Types.ROW_NAMED( - new String[] {"int1", "string1"}, - Types.INT, - Types.STRING))), - ObjectArrayTypeInfo.getInfoFor( - Types.ROW_NAMED( - new String[] {"int1", "string1"}, Types.INT, Types.STRING)), - new MapTypeInfo<>( - Types.STRING, - Types.ROW_NAMED( - new String[] {"int1", "string1"}, Types.INT, Types.STRING)) - }, - rowTypeInfo.getFieldTypes()); - - // validate field names - Assert.assertArrayEquals( - new String[] {"middle", "list", "map"}, rowTypeInfo.getFieldNames()); - } -} diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java deleted file mode 100644 index 681af7522148d..0000000000000 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java +++ /dev/null @@ -1,1103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.orc; - -import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.MapTypeInfo; -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.types.Row; -import org.apache.flink.util.InstantiationUtil; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; -import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; -import org.apache.orc.RecordReader; -import org.apache.orc.StripeInformation; -import org.apache.orc.impl.RecordReaderImpl; -import org.apache.orc.impl.SchemaEvolution; -import org.junit.After; -import org.junit.Test; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import static org.apache.commons.lang3.reflect.FieldUtils.readDeclaredField; -import static org.apache.flink.orc.shim.OrcShimV200.getOffsetAndLengthForSplit; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -/** Unit tests for {@link OrcRowInputFormat}. 
*/ -public class OrcRowInputFormatTest { - - private OrcRowInputFormat rowOrcInputFormat; - - @After - public void tearDown() throws IOException { - if (rowOrcInputFormat != null) { - rowOrcInputFormat.close(); - rowOrcInputFormat.closeInputFormat(); - } - rowOrcInputFormat = null; - } - - private static final String TEST_FILE_FLAT = "test-data-flat.orc"; - private static final String TEST_SCHEMA_FLAT = - "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int,_col5:string,_col6:int,_col7:int,_col8:int>"; - - private static final String TEST_FILE_NESTED = "test-data-nested.orc"; - private static final String TEST_SCHEMA_NESTED = - "struct<" - + "boolean1:boolean," - + "byte1:tinyint," - + "short1:smallint," - + "int1:int," - + "long1:bigint," - + "float1:float," - + "double1:double," - + "bytes1:binary," - + "string1:string," - + "middle:struct<" - + "list:array<" - + "struct<" - + "int1:int," - + "string1:string" - + ">" - + ">" - + ">," - + "list:array<" - + "struct<" - + "int1:int," - + "string1:string" - + ">" - + ">," - + "map:map<" - + "string," - + "struct<" - + "int1:int," - + "string1:string" - + ">" - + ">" - + ">"; - - private static final String TEST_FILE_TIMETYPES = "test-data-timetypes.orc"; - private static final String TEST_SCHEMA_TIMETYPES = "struct"; - - private static final String TEST_FILE_DECIMAL = "test-data-decimal.orc"; - private static final String TEST_SCHEMA_DECIMAL = "struct<_col0:decimal(10,5)>"; - - private static final String TEST_FILE_NESTEDLIST = "test-data-nestedlist.orc"; - private static final String TEST_SCHEMA_NESTEDLIST = - "struct>>>"; - - /** Generated by {@code OrcTestFileGenerator#writeCompositeTypesWithNullsFile(String)}. */ - private static final String TEST_FILE_COMPOSITES_NULLS = "test-data-composites-with-nulls.orc"; - - private static final String TEST_SCHEMA_COMPOSITES_NULLS = - "struct<" - + "int1:int," - + "record1:struct," - + "list1:array>>>," - + "list2:array>" - + ">"; - - /** Generated by {@code OrcTestFileGenerator#writeCompositeTypesWithRepeatingFile(String)}. 
*/ - private static final String TEST_FILE_REPEATING = "test-data-repeating.orc"; - - private static final String TEST_SCHEMA_REPEATING = - "struct<" - + "int1:int," - + "int2:int," - + "int3:int," - + "record1:struct," - + "record2:struct," - + "list1:array," - + "list2:array," - + "list3:array," - + "map1:map," - + "map2:map" - + ">"; - - @Test(expected = FileNotFoundException.class) - public void testInvalidPath() throws IOException { - rowOrcInputFormat = - new OrcRowInputFormat("/does/not/exist", TEST_SCHEMA_FLAT, new Configuration()); - rowOrcInputFormat.openInputFormat(); - FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1); - rowOrcInputFormat.open(inputSplits[0]); - } - - @Test(expected = IndexOutOfBoundsException.class) - public void testInvalidProjection1() throws IOException { - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration()); - int[] projectionMask = {1, 2, 3, -1}; - rowOrcInputFormat.selectFields(projectionMask); - } - - @Test(expected = IndexOutOfBoundsException.class) - public void testInvalidProjection2() throws IOException { - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration()); - int[] projectionMask = {1, 2, 3, 9}; - rowOrcInputFormat.selectFields(projectionMask); - } - - @Test - public void testProjectionMaskNested() throws Exception { - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration()); - - OrcRowInputFormat spy = spy(rowOrcInputFormat); - - spy.selectFields(9, 11, 2); - spy.openInputFormat(); - FileInputSplit[] splits = spy.createInputSplits(1); - spy.open(splits[0]); - - // top-level struct is false - boolean[] expected = - new boolean[] { - false, // top level - false, false, // flat fields 0, 1 are out - true, // flat field 2 is in - false, false, false, false, false, - false, // flat fields 3, 4, 5, 6, 7, 8 are out - true, true, true, true, true, // nested field 9 is in - false, false, false, false, // nested field 10 is out - true, true, true, true, true - }; // nested field 11 is in - assertArrayEquals(expected, getInclude(spy.getReader().getRecordReader())); - } - - private static boolean[] getInclude(RecordReader reader) throws IllegalAccessException { - SchemaEvolution evolution = (SchemaEvolution) readDeclaredField(reader, "evolution", true); - return evolution.getReaderIncluded(); - } - - @Test - public void testSplitStripesGivenSplits() throws Exception { - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration()); - - OrcRowInputFormat spy = spy(rowOrcInputFormat); - - FileInputSplit[] splits = spy.createInputSplits(3); - - spy.openInputFormat(); - spy.open(splits[0]); - assertOffsetAndLen(spy.getReader(), 3L, 137005L); - spy.open(splits[1]); - assertOffsetAndLen(spy.getReader(), 137008L, 136182L); - spy.open(splits[2]); - assertOffsetAndLen(spy.getReader(), 273190L, 123633L); - } - - @SuppressWarnings("unchecked") - private static List getStripes(RecordReader reader) - throws IllegalAccessException { - return (List) readDeclaredField(reader, "stripes", true); - } - - private static void assertOffsetAndLen(OrcSplitReader reader, long offset, long length) - throws IllegalAccessException { - List stripes = getStripes(reader.getRecordReader()); - long min = Long.MAX_VALUE; - long max = Long.MIN_VALUE; - for (StripeInformation stripe : stripes) { - if (stripe.getOffset() < min) { - min = 
stripe.getOffset(); - } - if (stripe.getOffset() + stripe.getLength() > max) { - max = stripe.getOffset() + stripe.getLength(); - } - } - - assertEquals(offset, min); - assertEquals(length, max - min); - } - - @Test - public void testSplitStripesCustomSplits() throws IOException { - // mock list of stripes - List stripes = new ArrayList<>(); - StripeInformation stripe1 = mock(StripeInformation.class); - when(stripe1.getOffset()).thenReturn(10L); - when(stripe1.getLength()).thenReturn(90L); - StripeInformation stripe2 = mock(StripeInformation.class); - when(stripe2.getOffset()).thenReturn(100L); - when(stripe2.getLength()).thenReturn(100L); - StripeInformation stripe3 = mock(StripeInformation.class); - when(stripe3.getOffset()).thenReturn(200L); - when(stripe3.getLength()).thenReturn(100L); - StripeInformation stripe4 = mock(StripeInformation.class); - when(stripe4.getOffset()).thenReturn(300L); - when(stripe4.getLength()).thenReturn(100L); - StripeInformation stripe5 = mock(StripeInformation.class); - when(stripe5.getOffset()).thenReturn(400L); - when(stripe5.getLength()).thenReturn(100L); - stripes.add(stripe1); - stripes.add(stripe2); - stripes.add(stripe3); - stripes.add(stripe4); - stripes.add(stripe5); - - // split ranging 2 stripes - assertEquals(new Tuple2<>(10L, 190L), getOffsetAndLengthForSplit(0, 150, stripes)); - - // split ranging 0 stripes - assertEquals(new Tuple2<>(0L, 0L), getOffsetAndLengthForSplit(150, 10, stripes)); - - // split ranging 1 stripe - assertEquals(new Tuple2<>(200L, 100L), getOffsetAndLengthForSplit(160, 41, stripes)); - - // split ranging 2 stripe - assertEquals(new Tuple2<>(300L, 200L), getOffsetAndLengthForSplit(201, 299, stripes)); - } - - @Test - public void testProducedType() throws IOException { - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration()); - - assertTrue(rowOrcInputFormat.getProducedType() instanceof RowTypeInfo); - RowTypeInfo producedType = (RowTypeInfo) rowOrcInputFormat.getProducedType(); - - assertArrayEquals( - new TypeInformation[] { - // primitives - Types.BOOLEAN, - Types.BYTE, - Types.SHORT, - Types.INT, - Types.LONG, - Types.FLOAT, - Types.DOUBLE, - // binary - PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, - // string - Types.STRING, - // struct - Types.ROW_NAMED( - new String[] {"list"}, - ObjectArrayTypeInfo.getInfoFor( - Types.ROW_NAMED( - new String[] {"int1", "string1"}, - Types.INT, - Types.STRING))), - // list - ObjectArrayTypeInfo.getInfoFor( - Types.ROW_NAMED( - new String[] {"int1", "string1"}, Types.INT, Types.STRING)), - // map - new MapTypeInfo<>( - Types.STRING, - Types.ROW_NAMED( - new String[] {"int1", "string1"}, Types.INT, Types.STRING)) - }, - producedType.getFieldTypes()); - assertArrayEquals( - new String[] { - "boolean1", - "byte1", - "short1", - "int1", - "long1", - "float1", - "double1", - "bytes1", - "string1", - "middle", - "list", - "map" - }, - producedType.getFieldNames()); - } - - @Test - public void testProducedTypeWithProjection() throws IOException { - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration()); - - rowOrcInputFormat.selectFields(9, 3, 7, 10); - - assertTrue(rowOrcInputFormat.getProducedType() instanceof RowTypeInfo); - RowTypeInfo producedType = (RowTypeInfo) rowOrcInputFormat.getProducedType(); - - assertArrayEquals( - new TypeInformation[] { - // struct - Types.ROW_NAMED( - new String[] {"list"}, - ObjectArrayTypeInfo.getInfoFor( - Types.ROW_NAMED( - new 
String[] {"int1", "string1"}, - Types.INT, - Types.STRING))), - // int - Types.INT, - // binary - PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, - // list - ObjectArrayTypeInfo.getInfoFor( - Types.ROW_NAMED( - new String[] {"int1", "string1"}, Types.INT, Types.STRING)) - }, - producedType.getFieldTypes()); - assertArrayEquals( - new String[] {"middle", "int1", "bytes1", "list"}, producedType.getFieldNames()); - } - - @Test - public void testSerialization() throws Exception { - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration()); - - rowOrcInputFormat.selectFields(0, 4, 1); - rowOrcInputFormat.addPredicate( - new OrcFilters.Equals("_col1", PredicateLeaf.Type.STRING, "M")); - - byte[] bytes = InstantiationUtil.serializeObject(rowOrcInputFormat); - OrcRowInputFormat copy = - InstantiationUtil.deserializeObject(bytes, getClass().getClassLoader()); - - FileInputSplit[] splits = copy.createInputSplits(1); - copy.openInputFormat(); - copy.open(splits[0]); - assertFalse(copy.reachedEnd()); - Row row = copy.nextRecord(null); - - assertNotNull(row); - assertEquals(3, row.getArity()); - // check first row - assertEquals(1, row.getField(0)); - assertEquals(500, row.getField(1)); - assertEquals("M", row.getField(2)); - } - - @Test - public void testNumericBooleanStringPredicates() throws Exception { - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration()); - - rowOrcInputFormat.selectFields(0, 1, 2, 3, 4, 5, 6, 8); - - // boolean pred - rowOrcInputFormat.addPredicate( - new OrcFilters.Equals("boolean1", PredicateLeaf.Type.BOOLEAN, false)); - // boolean pred - rowOrcInputFormat.addPredicate( - new OrcFilters.LessThan("byte1", PredicateLeaf.Type.LONG, 1)); - // boolean pred - rowOrcInputFormat.addPredicate( - new OrcFilters.LessThanEquals("short1", PredicateLeaf.Type.LONG, 1024)); - // boolean pred - rowOrcInputFormat.addPredicate( - new OrcFilters.Between("int1", PredicateLeaf.Type.LONG, -1, 65536)); - // boolean pred - rowOrcInputFormat.addPredicate( - new OrcFilters.Equals("long1", PredicateLeaf.Type.LONG, 9223372036854775807L)); - // boolean pred - rowOrcInputFormat.addPredicate( - new OrcFilters.Equals("float1", PredicateLeaf.Type.FLOAT, 1.0)); - // boolean pred - rowOrcInputFormat.addPredicate( - new OrcFilters.Equals("double1", PredicateLeaf.Type.FLOAT, -15.0)); - // boolean pred - rowOrcInputFormat.addPredicate(new OrcFilters.IsNull("string1", PredicateLeaf.Type.STRING)); - // boolean pred - rowOrcInputFormat.addPredicate( - new OrcFilters.Equals("string1", PredicateLeaf.Type.STRING, "hello")); - - FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); - rowOrcInputFormat.openInputFormat(); - - // mock options to check configuration of ORC reader - OrcRowInputFormat spy = spy(rowOrcInputFormat); - - spy.openInputFormat(); - spy.open(splits[0]); - - // verify predicate configuration - SearchArgument sarg = getSearchArgument(spy.getReader().getRecordReader()); - assertNotNull(sarg); - assertEquals( - "(and leaf-0 leaf-1 leaf-2 leaf-3 leaf-4 leaf-5 leaf-6 leaf-7 leaf-8)", - sarg.getExpression().toString()); - assertEquals(9, sarg.getLeaves().size()); - List leaves = sarg.getLeaves(); - assertEquals("(EQUALS boolean1 false)", leaves.get(0).toString()); - assertEquals("(LESS_THAN byte1 1)", leaves.get(1).toString()); - assertEquals("(LESS_THAN_EQUALS short1 1024)", leaves.get(2).toString()); - assertEquals("(BETWEEN int1 -1 65536)", leaves.get(3).toString()); - 
assertEquals("(EQUALS long1 9223372036854775807)", leaves.get(4).toString()); - assertEquals("(EQUALS float1 1.0)", leaves.get(5).toString()); - assertEquals("(EQUALS double1 -15.0)", leaves.get(6).toString()); - assertEquals("(IS_NULL string1)", leaves.get(7).toString()); - assertEquals("(EQUALS string1 hello)", leaves.get(8).toString()); - } - - private static SearchArgument getSearchArgument(RecordReader reader) - throws IllegalAccessException { - RecordReaderImpl.SargApplier applier = - (RecordReaderImpl.SargApplier) readDeclaredField(reader, "sargApp", true); - return (SearchArgument) readDeclaredField(applier, "sarg", true); - } - - @Test - public void testTimePredicates() throws Exception { - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_TIMETYPES), TEST_SCHEMA_TIMETYPES, new Configuration()); - - rowOrcInputFormat.addPredicate( - // OR - new OrcFilters.Or( - // timestamp pred - new OrcFilters.Equals( - "time", - PredicateLeaf.Type.TIMESTAMP, - Timestamp.valueOf("1900-05-05 12:34:56.100")), - // date pred - new OrcFilters.Equals( - "date", PredicateLeaf.Type.DATE, Date.valueOf("1900-12-25")))); - - FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); - rowOrcInputFormat.openInputFormat(); - - // mock options to check configuration of ORC reader - OrcRowInputFormat spy = spy(rowOrcInputFormat); - - spy.openInputFormat(); - spy.open(splits[0]); - - // verify predicate configuration - SearchArgument sarg = getSearchArgument(spy.getReader().getRecordReader()); - assertNotNull(sarg); - assertEquals("(or leaf-0 leaf-1)", sarg.getExpression().toString()); - assertEquals(2, sarg.getLeaves().size()); - List leaves = sarg.getLeaves(); - assertEquals("(EQUALS time 1900-05-05 12:34:56.1)", leaves.get(0).toString()); - assertEquals("(EQUALS date 1900-12-25)", leaves.get(1).toString()); - } - - @Test - public void testDecimalPredicate() throws Exception { - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_DECIMAL), TEST_SCHEMA_DECIMAL, new Configuration()); - - rowOrcInputFormat.addPredicate( - new OrcFilters.Not( - // decimal pred - new OrcFilters.Equals( - "_col0", PredicateLeaf.Type.DECIMAL, BigDecimal.valueOf(-1000.5)))); - - FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); - rowOrcInputFormat.openInputFormat(); - - // mock options to check configuration of ORC reader - OrcRowInputFormat spy = spy(rowOrcInputFormat); - - spy.openInputFormat(); - spy.open(splits[0]); - - // verify predicate configuration - SearchArgument sarg = getSearchArgument(spy.getReader().getRecordReader()); - assertNotNull(sarg); - assertEquals("(not leaf-0)", sarg.getExpression().toString()); - assertEquals(1, sarg.getLeaves().size()); - List leaves = sarg.getLeaves(); - assertEquals("(EQUALS _col0 -1000.5)", leaves.get(0).toString()); - } - - @Test(expected = IllegalArgumentException.class) - public void testPredicateWithInvalidColumn() throws Exception { - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration()); - - rowOrcInputFormat.addPredicate( - new OrcFilters.Equals("unknown", PredicateLeaf.Type.LONG, 42)); - } - - @Test - public void testReadNestedFile() throws IOException { - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration()); - - FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); - assertEquals(1, splits.length); - rowOrcInputFormat.openInputFormat(); - rowOrcInputFormat.open(splits[0]); - - 
assertFalse(rowOrcInputFormat.reachedEnd()); - Row row = rowOrcInputFormat.nextRecord(null); - - // validate first row - assertNotNull(row); - assertEquals(12, row.getArity()); - assertEquals(false, row.getField(0)); - assertEquals((byte) 1, row.getField(1)); - assertEquals((short) 1024, row.getField(2)); - assertEquals(65536, row.getField(3)); - assertEquals(9223372036854775807L, row.getField(4)); - assertEquals(1.0f, row.getField(5)); - assertEquals(-15.0d, row.getField(6)); - assertArrayEquals(new byte[] {0, 1, 2, 3, 4}, (byte[]) row.getField(7)); - assertEquals("hi", row.getField(8)); - // check nested field - assertTrue(row.getField(9) instanceof Row); - Row nested1 = (Row) row.getField(9); - assertEquals(1, nested1.getArity()); - assertTrue(nested1.getField(0) instanceof Object[]); - Object[] nestedList1 = (Object[]) nested1.getField(0); - assertEquals(2, nestedList1.length); - assertEquals(Row.of(1, "bye"), nestedList1[0]); - assertEquals(Row.of(2, "sigh"), nestedList1[1]); - // check list - assertTrue(row.getField(10) instanceof Object[]); - Object[] list1 = (Object[]) row.getField(10); - assertEquals(2, list1.length); - assertEquals(Row.of(3, "good"), list1[0]); - assertEquals(Row.of(4, "bad"), list1[1]); - // check map - assertTrue(row.getField(11) instanceof HashMap); - HashMap map1 = (HashMap) row.getField(11); - assertEquals(0, map1.size()); - - // read second row - assertFalse(rowOrcInputFormat.reachedEnd()); - row = rowOrcInputFormat.nextRecord(null); - - // validate second row - assertNotNull(row); - assertEquals(12, row.getArity()); - assertEquals(true, row.getField(0)); - assertEquals((byte) 100, row.getField(1)); - assertEquals((short) 2048, row.getField(2)); - assertEquals(65536, row.getField(3)); - assertEquals(9223372036854775807L, row.getField(4)); - assertEquals(2.0f, row.getField(5)); - assertEquals(-5.0d, row.getField(6)); - assertArrayEquals(new byte[] {}, (byte[]) row.getField(7)); - assertEquals("bye", row.getField(8)); - // check nested field - assertTrue(row.getField(9) instanceof Row); - Row nested2 = (Row) row.getField(9); - assertEquals(1, nested2.getArity()); - assertTrue(nested2.getField(0) instanceof Object[]); - Object[] nestedList2 = (Object[]) nested2.getField(0); - assertEquals(2, nestedList2.length); - assertEquals(Row.of(1, "bye"), nestedList2[0]); - assertEquals(Row.of(2, "sigh"), nestedList2[1]); - // check list - assertTrue(row.getField(10) instanceof Object[]); - Object[] list2 = (Object[]) row.getField(10); - assertEquals(3, list2.length); - assertEquals(Row.of(100000000, "cat"), list2[0]); - assertEquals(Row.of(-100000, "in"), list2[1]); - assertEquals(Row.of(1234, "hat"), list2[2]); - // check map - assertTrue(row.getField(11) instanceof HashMap); - HashMap map = (HashMap) row.getField(11); - assertEquals(2, map.size()); - assertEquals(Row.of(5, "chani"), map.get("chani")); - assertEquals(Row.of(1, "mauddib"), map.get("mauddib")); - - assertTrue(rowOrcInputFormat.reachedEnd()); - } - - @Test - public void testReadTimeTypeFile() throws IOException { - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_TIMETYPES), TEST_SCHEMA_TIMETYPES, new Configuration()); - - FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); - assertEquals(1, splits.length); - rowOrcInputFormat.openInputFormat(); - rowOrcInputFormat.open(splits[0]); - - assertFalse(rowOrcInputFormat.reachedEnd()); - Row row = rowOrcInputFormat.nextRecord(null); - - // validate first row - assertNotNull(row); - assertEquals(2, row.getArity()); - 
assertEquals(Timestamp.valueOf("1900-05-05 12:34:56.1"), row.getField(0)); - assertEquals(Date.valueOf("1900-12-25"), row.getField(1)); - - // check correct number of rows - long cnt = 1; - while (!rowOrcInputFormat.reachedEnd()) { - assertNotNull(rowOrcInputFormat.nextRecord(null)); - cnt++; - } - assertEquals(70000, cnt); - } - - @Test - public void testReadDecimalTypeFile() throws IOException { - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_DECIMAL), TEST_SCHEMA_DECIMAL, new Configuration()); - - FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); - assertEquals(1, splits.length); - rowOrcInputFormat.openInputFormat(); - rowOrcInputFormat.open(splits[0]); - - assertFalse(rowOrcInputFormat.reachedEnd()); - Row row = rowOrcInputFormat.nextRecord(null); - - // validate first row - assertNotNull(row); - assertEquals(1, row.getArity()); - assertEquals(BigDecimal.valueOf(-1000.5d), row.getField(0)); - - // check correct number of rows - long cnt = 1; - while (!rowOrcInputFormat.reachedEnd()) { - assertNotNull(rowOrcInputFormat.nextRecord(null)); - cnt++; - } - assertEquals(6000, cnt); - } - - @Test - public void testReadNestedListFile() throws Exception { - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_NESTEDLIST), TEST_SCHEMA_NESTEDLIST, new Configuration()); - - FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); - assertEquals(1, splits.length); - rowOrcInputFormat.openInputFormat(); - rowOrcInputFormat.open(splits[0]); - - assertFalse(rowOrcInputFormat.reachedEnd()); - - Row row = null; - long cnt = 0; - - // read all rows - while (!rowOrcInputFormat.reachedEnd()) { - - row = rowOrcInputFormat.nextRecord(row); - assertEquals(1, row.getArity()); - - // outer list - Object[] list = (Object[]) row.getField(0); - assertEquals(1, list.length); - - // nested list of rows - Row[] nestedRows = (Row[]) list[0]; - assertEquals(1, nestedRows.length); - assertEquals(1, nestedRows[0].getArity()); - - // verify list value - assertEquals(cnt, nestedRows[0].getField(0)); - cnt++; - } - // number of rows in file - assertEquals(100, cnt); - } - - @Test - public void testReadCompositesNullsFile() throws Exception { - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_COMPOSITES_NULLS), - TEST_SCHEMA_COMPOSITES_NULLS, - new Configuration()); - - FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); - assertEquals(1, splits.length); - rowOrcInputFormat.openInputFormat(); - rowOrcInputFormat.open(splits[0]); - - assertFalse(rowOrcInputFormat.reachedEnd()); - - Row row = null; - long cnt = 0; - - int structNullCnt = 0; - int nestedListNullCnt = 0; - int mapListNullCnt = 0; - - // read all rows - while (!rowOrcInputFormat.reachedEnd()) { - - row = rowOrcInputFormat.nextRecord(row); - assertEquals(4, row.getArity()); - - assertTrue(row.getField(0) instanceof Integer); - - if (row.getField(1) == null) { - structNullCnt++; - } else { - Object f = row.getField(1); - assertTrue(f instanceof Row); - assertEquals(2, ((Row) f).getArity()); - } - - if (row.getField(2) == null) { - nestedListNullCnt++; - } else { - Object f = row.getField(2); - assertTrue(f instanceof Row[][][]); - assertEquals(4, ((Row[][][]) f).length); - } - - if (row.getField(3) == null) { - mapListNullCnt++; - } else { - Object f = row.getField(3); - assertTrue(f instanceof HashMap[]); - assertEquals(3, ((HashMap[]) f).length); - } - cnt++; - } - // number of rows in file - assertEquals(2500, cnt); - // check number of null fields - assertEquals(1250, 
structNullCnt); - assertEquals(835, nestedListNullCnt); - assertEquals(835, mapListNullCnt); - } - - @SuppressWarnings("unchecked") - @Test - public void testReadRepeatingValuesFile() throws IOException { - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_REPEATING), TEST_SCHEMA_REPEATING, new Configuration()); - - FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); - assertEquals(1, splits.length); - rowOrcInputFormat.openInputFormat(); - rowOrcInputFormat.open(splits[0]); - - assertFalse(rowOrcInputFormat.reachedEnd()); - - Row row = null; - long cnt = 0; - - Row firstRow1 = null; - Integer[] firstList1 = null; - HashMap firstMap1 = null; - - // read all rows - while (!rowOrcInputFormat.reachedEnd()) { - - cnt++; - row = rowOrcInputFormat.nextRecord(row); - assertEquals(10, row.getArity()); - - // check first int field (always 42) - assertNotNull(row.getField(0)); - assertTrue(row.getField(0) instanceof Integer); - assertEquals(42, ((Integer) row.getField(0)).intValue()); - - // check second int field (always null) - assertNull(row.getField(1)); - - // check first int field (always 99) - assertNotNull(row.getField(2)); - assertTrue(row.getField(2) instanceof Integer); - assertEquals(99, ((Integer) row.getField(2)).intValue()); - - // check first row field (always (23, null)) - assertNotNull(row.getField(3)); - assertTrue(row.getField(3) instanceof Row); - Row nestedRow = (Row) row.getField(3); - // check first field of nested row - assertNotNull(nestedRow.getField(0)); - assertTrue(nestedRow.getField(0) instanceof Integer); - assertEquals(23, ((Integer) nestedRow.getField(0)).intValue()); - // check second field of nested row - assertNull(nestedRow.getField(1)); - // validate reference - if (firstRow1 == null) { - firstRow1 = nestedRow; - } else { - // repeated rows must be different instances - assertTrue(firstRow1 != nestedRow); - } - - // check second row field (always null) - assertNull(row.getField(4)); - - // check first list field (always [1, 2, 3]) - assertNotNull(row.getField(5)); - assertTrue(row.getField(5) instanceof Integer[]); - Integer[] list1 = ((Integer[]) row.getField(5)); - assertEquals(1, list1[0].intValue()); - assertEquals(2, list1[1].intValue()); - assertEquals(3, list1[2].intValue()); - // validate reference - if (firstList1 == null) { - firstList1 = list1; - } else { - // repeated list must be different instances - assertTrue(firstList1 != list1); - } - - // check second list field (always [7, 7, 7]) - assertNotNull(row.getField(6)); - assertTrue(row.getField(6) instanceof Integer[]); - Integer[] list2 = ((Integer[]) row.getField(6)); - assertEquals(7, list2[0].intValue()); - assertEquals(7, list2[1].intValue()); - assertEquals(7, list2[2].intValue()); - - // check third list field (always null) - assertNull(row.getField(7)); - - // check first map field (always {2->"Hello", 4->"Hello}) - assertNotNull(row.getField(8)); - assertTrue(row.getField(8) instanceof HashMap); - HashMap map = (HashMap) row.getField(8); - assertEquals(2, map.size()); - assertEquals("Hello", map.get(2)); - assertEquals("Hello", map.get(4)); - // validate reference - if (firstMap1 == null) { - firstMap1 = map; - } else { - // repeated list must be different instances - assertTrue(firstMap1 != map); - } - - // check second map field (always null) - assertNull(row.getField(9)); - } - - rowOrcInputFormat.close(); - rowOrcInputFormat.closeInputFormat(); - - assertEquals(256, cnt); - } - - @Test - public void testReadWithProjection() throws IOException { - 
rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration()); - - rowOrcInputFormat.selectFields(7, 0, 10, 8); - - FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); - assertEquals(1, splits.length); - rowOrcInputFormat.openInputFormat(); - rowOrcInputFormat.open(splits[0]); - - assertFalse(rowOrcInputFormat.reachedEnd()); - Row row = rowOrcInputFormat.nextRecord(null); - - // validate first row - assertNotNull(row); - assertEquals(4, row.getArity()); - // check binary - assertArrayEquals(new byte[] {0, 1, 2, 3, 4}, (byte[]) row.getField(0)); - // check boolean - assertEquals(false, row.getField(1)); - // check list - assertTrue(row.getField(2) instanceof Object[]); - Object[] list1 = (Object[]) row.getField(2); - assertEquals(2, list1.length); - assertEquals(Row.of(3, "good"), list1[0]); - assertEquals(Row.of(4, "bad"), list1[1]); - // check string - assertEquals("hi", row.getField(3)); - - // check that there is a second row with four fields - assertFalse(rowOrcInputFormat.reachedEnd()); - row = rowOrcInputFormat.nextRecord(null); - assertNotNull(row); - assertEquals(4, row.getArity()); - assertTrue(rowOrcInputFormat.reachedEnd()); - } - - @Test - public void testReadFileInSplits() throws IOException { - - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration()); - rowOrcInputFormat.selectFields(0, 1); - - FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(4); - assertEquals(4, splits.length); - rowOrcInputFormat.openInputFormat(); - - long cnt = 0; - // read all splits - for (FileInputSplit split : splits) { - - // open split - rowOrcInputFormat.open(split); - // read and count all rows - while (!rowOrcInputFormat.reachedEnd()) { - assertNotNull(rowOrcInputFormat.nextRecord(null)); - cnt++; - } - } - // check that all rows have been read - assertEquals(1920800, cnt); - } - - @Test - public void testReadFileInManySplits() throws IOException { - - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration()); - rowOrcInputFormat.selectFields(0, 1); - - FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(4); - assertEquals(4, splits.length); - rowOrcInputFormat.openInputFormat(); - - long cnt = 0; - // read all splits - for (FileInputSplit split : splits) { - - // open split - rowOrcInputFormat.open(split); - // read and count all rows - while (!rowOrcInputFormat.reachedEnd()) { - assertNotNull(rowOrcInputFormat.nextRecord(null)); - cnt++; - } - rowOrcInputFormat.close(); - } - // check that all rows have been read - assertEquals(1920800, cnt); - } - - @Test - public void testReadFileWithFilter() throws IOException { - - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration()); - rowOrcInputFormat.selectFields(0, 1); - - // read head and tail of file - rowOrcInputFormat.addPredicate( - new OrcFilters.Or( - new OrcFilters.LessThan("_col0", PredicateLeaf.Type.LONG, 10L), - new OrcFilters.Not( - new OrcFilters.LessThanEquals( - "_col0", PredicateLeaf.Type.LONG, 1920000L)))); - rowOrcInputFormat.addPredicate( - new OrcFilters.Equals("_col1", PredicateLeaf.Type.STRING, "M")); - - FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); - assertEquals(1, splits.length); - rowOrcInputFormat.openInputFormat(); - - // open split - rowOrcInputFormat.open(splits[0]); - - // read and count all rows - long cnt = 0; - while 
(!rowOrcInputFormat.reachedEnd()) { - assertNotNull(rowOrcInputFormat.nextRecord(null)); - cnt++; - } - // check that only the first and last stripes of the file have been read. - // Each stripe has 5000 rows, except the last which has 800 rows. - assertEquals(5800, cnt); - } - - @Test - public void testReadFileWithEvolvedSchema() throws IOException { - - rowOrcInputFormat = - new OrcRowInputFormat( - getPath(TEST_FILE_FLAT), - "struct<_col0:int,_col1:string,_col4:string,_col3:string>", // previous - // version of - // schema - new Configuration()); - rowOrcInputFormat.selectFields(3, 0, 2); - - rowOrcInputFormat.addPredicate( - new OrcFilters.LessThan("_col0", PredicateLeaf.Type.LONG, 10L)); - - FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1); - assertEquals(1, splits.length); - rowOrcInputFormat.openInputFormat(); - - // open split - rowOrcInputFormat.open(splits[0]); - - // read and validate first row - assertFalse(rowOrcInputFormat.reachedEnd()); - Row row = rowOrcInputFormat.nextRecord(null); - assertNotNull(row); - assertEquals(3, row.getArity()); - assertEquals("Primary", row.getField(0)); - assertEquals(1, row.getField(1)); - assertEquals("M", row.getField(2)); - - // read and count remaining rows - long cnt = 1; - while (!rowOrcInputFormat.reachedEnd()) { - assertNotNull(rowOrcInputFormat.nextRecord(null)); - cnt++; - } - // check that only the first and last stripes of the file have been read. - // Each stripe has 5000 rows, except the last which has 800 rows. - assertEquals(5000, cnt); - } - - private String getPath(String fileName) { - return getClass().getClassLoader().getResource(fileName).getPath(); - } -} diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java deleted file mode 100644 index e56b66cd61c15..0000000000000 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.orc; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; -import org.apache.flink.table.api.internal.TableEnvironmentInternal; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.types.Row; - -import org.junit.Test; - -import java.util.List; - -import static org.junit.Assert.assertEquals; - -/** Tests for {@link OrcTableSource}. 
*/ -public class OrcTableSourceITCase extends MultipleProgramsTestBase { - - private static final String TEST_FILE_FLAT = "test-data-flat.orc"; - private static final String TEST_SCHEMA_FLAT = - "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int,_col5:string,_col6:int,_col7:int,_col8:int>"; - - public OrcTableSourceITCase() { - super(TestExecutionMode.COLLECTION); - } - - @Test - public void testFullScan() throws Exception { - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); - - OrcTableSource orc = - OrcTableSource.builder() - .path(getPath(TEST_FILE_FLAT)) - .forOrcSchema(TEST_SCHEMA_FLAT) - .build(); - ((TableEnvironmentInternal) tEnv).registerTableSourceInternal("OrcTable", orc); - - String query = - "SELECT COUNT(*), " - + "MIN(_col0), MAX(_col0), " - + "MIN(_col1), MAX(_col1), " - + "MIN(_col2), MAX(_col2), " - + "MIN(_col3), MAX(_col3), " - + "MIN(_col4), MAX(_col4), " - + "MIN(_col5), MAX(_col5), " - + "MIN(_col6), MAX(_col6), " - + "MIN(_col7), MAX(_col7), " - + "MIN(_col8), MAX(_col8) " - + "FROM OrcTable"; - Table t = tEnv.sqlQuery(query); - - DataSet dataSet = tEnv.toDataSet(t, Row.class); - List result = dataSet.collect(); - - assertEquals(1, result.size()); - assertEquals( - "+I[1920800, 1, 1920800, F, M, D, W, 2 yr Degree, Unknown, 500, 10000, Good, Unknown, 0, 6, 0, 6, 0, 6]", - result.get(0).toString()); - } - - @Test - public void testScanWithProjectionAndFilter() throws Exception { - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); - - OrcTableSource orc = - OrcTableSource.builder() - .path(getPath(TEST_FILE_FLAT)) - .forOrcSchema(TEST_SCHEMA_FLAT) - .build(); - ((TableEnvironmentInternal) tEnv).registerTableSourceInternal("OrcTable", orc); - - String query = - "SELECT " - + "MIN(_col4), MAX(_col4), " - + "MIN(_col3), MAX(_col3), " - + "MIN(_col0), MAX(_col0), " - + "MIN(_col2), MAX(_col2), " - + "COUNT(*) " - + "FROM OrcTable " - + "WHERE (_col0 BETWEEN 4975 and 5024 OR _col0 BETWEEN 9975 AND 10024) AND _col1 = 'F'"; - Table t = tEnv.sqlQuery(query); - - DataSet dataSet = tEnv.toDataSet(t, Row.class); - List result = dataSet.collect(); - - assertEquals(1, result.size()); - assertEquals( - "+I[1500, 6000, 2 yr Degree, Unknown, 4976, 10024, D, W, 50]", - result.get(0).toString()); - } - - private String getPath(String fileName) { - return getClass().getClassLoader().getResource(fileName).getPath(); - } -} diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java deleted file mode 100644 index bb50edb4ddbb8..0000000000000 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java +++ /dev/null @@ -1,384 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.orc; - -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.api.java.typeutils.MapTypeInfo; -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.expressions.EqualTo; -import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.GetCompositeField; -import org.apache.flink.table.expressions.GreaterThan; -import org.apache.flink.table.expressions.ItemAt; -import org.apache.flink.table.expressions.Literal; -import org.apache.flink.table.expressions.PlannerResolvedFieldReference; -import org.apache.flink.types.Row; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; -import org.junit.Test; -import org.mockito.ArgumentCaptor; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.powermock.api.mockito.PowerMockito.when; - -/** Unit Tests for {@link OrcTableSource}. 
*/ -public class OrcTableSourceTest { - - private static final String TEST_FILE_NESTED = "test-data-nested.orc"; - private static final String TEST_SCHEMA_NESTED = - "struct<" - + "boolean1:boolean," - + "byte1:tinyint," - + "short1:smallint," - + "int1:int," - + "long1:bigint," - + "float1:float," - + "double1:double," - + "bytes1:binary," - + "string1:string," - + "middle:struct<" - + "list:array<" - + "struct<" - + "int1:int," - + "string1:string" - + ">" - + ">" - + ">," - + "list:array<" - + "struct<" - + "int1:int," - + "string1:string" - + ">" - + ">," - + "map:map<" - + "string," - + "struct<" - + "int1:int," - + "string1:string" - + ">" - + ">" - + ">"; - - @Test - public void testGetReturnType() throws Exception { - - OrcTableSource orc = - OrcTableSource.builder() - .path(getPath(TEST_FILE_NESTED)) - .forOrcSchema(TEST_SCHEMA_NESTED) - .build(); - - TypeInformation returnType = orc.getReturnType(); - assertNotNull(returnType); - assertTrue(returnType instanceof RowTypeInfo); - RowTypeInfo rowType = (RowTypeInfo) returnType; - - TypeInformation expected = - Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes()); - assertEquals(expected, rowType); - } - - @Test - public void testGetTableSchema() throws Exception { - - OrcTableSource orc = - OrcTableSource.builder() - .path(getPath(TEST_FILE_NESTED)) - .forOrcSchema(TEST_SCHEMA_NESTED) - .build(); - - TableSchema schema = orc.getTableSchema(); - assertNotNull(schema); - assertArrayEquals(getNestedFieldNames(), schema.getFieldNames()); - assertArrayEquals(getNestedFieldTypes(), schema.getFieldTypes()); - } - - @Test - @SuppressWarnings("unchecked") - public void testProjectFields() throws Exception { - - OrcTableSource orc = - OrcTableSource.builder() - .path(getPath(TEST_FILE_NESTED)) - .forOrcSchema(TEST_SCHEMA_NESTED) - .build(); - - OrcTableSource projected = (OrcTableSource) orc.projectFields(new int[] {3, 5, 1, 0}); - - // ensure copy is returned - assertTrue(orc != projected); - - // ensure table schema is identical - assertEquals(orc.getTableSchema(), projected.getTableSchema()); - - // ensure return type was adapted - String[] fieldNames = getNestedFieldNames(); - TypeInformation[] fieldTypes = getNestedFieldTypes(); - assertEquals( - Types.ROW_NAMED( - new String[] {fieldNames[3], fieldNames[5], fieldNames[1], fieldNames[0]}, - new TypeInformation[] { - fieldTypes[3], fieldTypes[5], fieldTypes[1], fieldTypes[0] - }), - projected.getReturnType()); - - // ensure IF is configured with selected fields - OrcTableSource spyTS = spy(projected); - OrcRowInputFormat mockIF = mock(OrcRowInputFormat.class); - doReturn(mockIF).when(spyTS).buildOrcInputFormat(); - ExecutionEnvironment env = mock(ExecutionEnvironment.class); - when(env.createInput(any(InputFormat.class))).thenReturn(mock(DataSource.class)); - spyTS.getDataSet(env); - verify(mockIF).selectFields(eq(3), eq(5), eq(1), eq(0)); - } - - @Test - @SuppressWarnings("unchecked") - public void testApplyPredicate() throws Exception { - - OrcTableSource orc = - OrcTableSource.builder() - .path(getPath(TEST_FILE_NESTED)) - .forOrcSchema(TEST_SCHEMA_NESTED) - .build(); - - // expressions for supported predicates - Expression pred1 = - new GreaterThan( - new PlannerResolvedFieldReference("int1", Types.INT), - new Literal(100, Types.INT)); - Expression pred2 = - new EqualTo( - new PlannerResolvedFieldReference("string1", Types.STRING), - new Literal("hello", Types.STRING)); - // invalid predicate - Expression invalidPred = - new EqualTo( - new 
PlannerResolvedFieldReference("long1", Types.LONG), - // some invalid, non-serializable literal (here an object of this test - // class) - new Literal(new OrcTableSourceTest(), Types.LONG)); - - ArrayList preds = new ArrayList<>(); - preds.add(pred1); - preds.add(pred2); - preds.add(unsupportedPred()); - preds.add(invalidPred); - - // apply predicates on TableSource - OrcTableSource projected = (OrcTableSource) orc.applyPredicate(preds); - - // ensure copy is returned - assertTrue(orc != projected); - - // ensure table schema is identical - assertEquals(orc.getTableSchema(), projected.getTableSchema()); - - // ensure return type is identical - assertEquals( - Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes()), - projected.getReturnType()); - - // ensure IF is configured with valid/supported predicates - OrcTableSource spyTS = spy(projected); - OrcRowInputFormat mockIF = mock(OrcRowInputFormat.class); - doReturn(mockIF).when(spyTS).buildOrcInputFormat(); - ExecutionEnvironment environment = mock(ExecutionEnvironment.class); - when(environment.createInput(any(InputFormat.class))).thenReturn(mock(DataSource.class)); - spyTS.getDataSet(environment); - - ArgumentCaptor arguments = - ArgumentCaptor.forClass(OrcFilters.Predicate.class); - verify(mockIF, times(2)).addPredicate(arguments.capture()); - List values = - arguments.getAllValues().stream() - .map(Object::toString) - .collect(Collectors.toList()); - assertTrue( - values.contains( - new OrcFilters.Not( - new OrcFilters.LessThanEquals( - "int1", PredicateLeaf.Type.LONG, 100)) - .toString())); - assertTrue( - values.contains( - new OrcFilters.Equals("string1", PredicateLeaf.Type.STRING, "hello") - .toString())); - - // ensure filter pushdown is correct - assertTrue(spyTS.isFilterPushedDown()); - assertFalse(orc.isFilterPushedDown()); - } - - private Expression unsupportedPred() { - return new EqualTo( - new GetCompositeField( - new ItemAt( - new PlannerResolvedFieldReference( - "list", - ObjectArrayTypeInfo.getInfoFor( - Types.ROW_NAMED( - new String[] {"int1", "string1"}, - Types.INT, - Types.STRING))), - new Literal(1, Types.INT)), - "int1"), - new Literal(1, Types.INT)); - } - - @Test - public void testUnsupportedPredOnly() { - OrcTableSource orc = - OrcTableSource.builder() - .path(getPath(TEST_FILE_NESTED)) - .forOrcSchema(TEST_SCHEMA_NESTED) - .build(); - - // apply predicates on TableSource - OrcTableSource projected = - (OrcTableSource) orc.applyPredicate(Collections.singletonList(unsupportedPred())); - - assertNotEquals(orc.explainSource(), projected.explainSource()); - } - - @Test - public void testBuilder() throws Exception { - - // validate path, schema, and recursive enumeration default (enabled) - OrcTableSource orc1 = - OrcTableSource.builder() - .path(getPath(TEST_FILE_NESTED)) - .forOrcSchema(TEST_SCHEMA_NESTED) - .build(); - - DataSet rows1 = orc1.getDataSet(ExecutionEnvironment.createLocalEnvironment()); - OrcRowInputFormat orcIF1 = (OrcRowInputFormat) ((DataSource) rows1).getInputFormat(); - assertEquals(true, orcIF1.getNestedFileEnumeration()); - assertEquals(getPath(TEST_FILE_NESTED), orcIF1.getFilePath().toString()); - assertEquals(TEST_SCHEMA_NESTED, orcIF1.getSchema()); - - // validate recursive enumeration disabled - OrcTableSource orc2 = - OrcTableSource.builder() - .path(getPath(TEST_FILE_NESTED), false) - .forOrcSchema(TEST_SCHEMA_NESTED) - .build(); - - DataSet rows2 = orc2.getDataSet(ExecutionEnvironment.createLocalEnvironment()); - OrcRowInputFormat orcIF2 = (OrcRowInputFormat) ((DataSource) 
rows2).getInputFormat(); - assertEquals(false, orcIF2.getNestedFileEnumeration()); - - // validate Hadoop configuration - Configuration conf = new Configuration(); - conf.set("testKey", "testValue"); - OrcTableSource orc3 = - OrcTableSource.builder() - .path(getPath(TEST_FILE_NESTED)) - .forOrcSchema(TEST_SCHEMA_NESTED) - .withConfiguration(conf) - .build(); - - DataSet rows3 = orc3.getDataSet(ExecutionEnvironment.createLocalEnvironment()); - OrcRowInputFormat orcIF3 = (OrcRowInputFormat) ((DataSource) rows3).getInputFormat(); - assertEquals(conf, orcIF3.getConfiguration()); - - // validate batch size - OrcTableSource orc4 = - OrcTableSource.builder() - .path(getPath(TEST_FILE_NESTED)) - .forOrcSchema(TEST_SCHEMA_NESTED) - .withBatchSize(987) - .build(); - - DataSet rows4 = orc4.getDataSet(ExecutionEnvironment.createLocalEnvironment()); - OrcRowInputFormat orcIF4 = (OrcRowInputFormat) ((DataSource) rows4).getInputFormat(); - assertEquals(987, orcIF4.getBatchSize()); - } - - private String getPath(String fileName) { - return getClass().getClassLoader().getResource(fileName).getPath(); - } - - private String[] getNestedFieldNames() { - return new String[] { - "boolean1", - "byte1", - "short1", - "int1", - "long1", - "float1", - "double1", - "bytes1", - "string1", - "middle", - "list", - "map" - }; - } - - private TypeInformation[] getNestedFieldTypes() { - return new TypeInformation[] { - Types.BOOLEAN, - Types.BYTE, - Types.SHORT, - Types.INT, - Types.LONG, - Types.FLOAT, - Types.DOUBLE, - PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, - Types.STRING, - Types.ROW_NAMED( - new String[] {"list"}, - ObjectArrayTypeInfo.getInfoFor( - Types.ROW_NAMED( - new String[] {"int1", "string1"}, Types.INT, Types.STRING))), - ObjectArrayTypeInfo.getInfoFor( - Types.ROW_NAMED(new String[] {"int1", "string1"}, Types.INT, Types.STRING)), - new MapTypeInfo<>( - Types.STRING, - Types.ROW_NAMED(new String[] {"int1", "string1"}, Types.INT, Types.STRING)) - }; - } -} diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/util/OrcTestFileGenerator.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/util/OrcTestFileGenerator.java deleted file mode 100644 index f2830c7e34bbd..0000000000000 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/util/OrcTestFileGenerator.java +++ /dev/null @@ -1,374 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.orc.util; - -import org.apache.flink.orc.OrcRowInputFormatTest; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.orc.OrcFile; -import org.apache.orc.TypeDescription; -import org.apache.orc.Writer; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; - -/** A generator for ORC test files. */ -public class OrcTestFileGenerator { - - public static void main(String[] args) throws IOException { - writeCompositeTypesWithNullsFile(args[0]); - // writeCompositeTypesWithRepeatingFile(args[0]); - } - - /** - * Writes an ORC file with nested composite types and null values on different levels. Generates - * {@link OrcRowInputFormatTest#TEST_FILE_COMPOSITES_NULLS}. - */ - private static void writeCompositeTypesWithNullsFile(String path) throws IOException { - - Path filePath = new Path(path); - Configuration conf = new Configuration(); - - TypeDescription schema = - TypeDescription.fromString( - "struct<" - + "int1:int," - + "record1:struct," - + "list1:array>>>," - + "list2:array>" - + ">"); - - Writer writer = - OrcFile.createWriter(filePath, OrcFile.writerOptions(conf).setSchema(schema)); - - VectorizedRowBatch batch = schema.createRowBatch(); - LongColumnVector int1 = (LongColumnVector) batch.cols[0]; - - StructColumnVector record1 = (StructColumnVector) batch.cols[1]; - LongColumnVector record1F1 = (LongColumnVector) record1.fields[0]; - BytesColumnVector record1F2 = (BytesColumnVector) record1.fields[1]; - - ListColumnVector list1 = (ListColumnVector) batch.cols[2]; - ListColumnVector nestedList = (ListColumnVector) list1.child; - ListColumnVector nestedList2 = (ListColumnVector) nestedList.child; - StructColumnVector listEntries = (StructColumnVector) nestedList2.child; - BytesColumnVector entryField1 = (BytesColumnVector) listEntries.fields[0]; - BytesColumnVector entryField2 = (BytesColumnVector) listEntries.fields[1]; - - ListColumnVector list2 = (ListColumnVector) batch.cols[3]; - MapColumnVector map1 = (MapColumnVector) list2.child; - BytesColumnVector keys = (BytesColumnVector) map1.keys; - LongColumnVector vals = (LongColumnVector) map1.values; - - final int list1Size = 4; - final int nestedListSize = 3; - final int nestedList2Size = 2; - final int list2Size = 3; - final int mapSize = 3; - - final int batchSize = batch.getMaxSize(); - - // Ensure the vectors have sufficient capacity - nestedList.ensureSize(batchSize * list1Size, false); - nestedList2.ensureSize(batchSize * list1Size * nestedListSize, false); - listEntries.ensureSize(batchSize * list1Size * nestedListSize * nestedList2Size, false); - map1.ensureSize(batchSize * list2Size, false); - keys.ensureSize(batchSize * list2Size * mapSize, false); - vals.ensureSize(batchSize * list2Size * mapSize, false); - - // add 2500 rows to file - for (int r = 0; r < 2500; ++r) { - int row = batch.size++; - - // mark nullable fields - list1.noNulls = false; - nestedList.noNulls = false; - listEntries.noNulls = false; - entryField1.noNulls = false; - record1.noNulls = false; - record1F2.noNulls = false; - list2.noNulls = false; - map1.noNulls = false; - 
keys.noNulls = false; - vals.noNulls = false; - - // first field: int1 - int1.vector[row] = r; - - // second field: struct - if (row % 2 != 0) { - // in every second row, the struct is null - record1F1.vector[row] = row; - if (row % 5 != 0) { - // in every fifth row, the second field of the struct is null - record1F2.setVal(row, ("f2-" + row).getBytes(StandardCharsets.UTF_8)); - } else { - record1F2.isNull[row] = true; - } - } else { - record1.isNull[row] = true; - } - - // third field: deeply nested list - if (row % 3 != 0) { - // in every third row, the nested list is null - list1.offsets[row] = list1.childCount; - list1.lengths[row] = list1Size; - list1.childCount += list1Size; - - for (int i = 0; i < list1Size; i++) { - - int listOffset = (int) list1.offsets[row] + i; - if (i != 2) { - // second nested list is always null - nestedList.offsets[listOffset] = nestedList.childCount; - nestedList.lengths[listOffset] = nestedListSize; - nestedList.childCount += nestedListSize; - - for (int j = 0; j < nestedListSize; j++) { - int nestedOffset = (int) nestedList.offsets[listOffset] + j; - nestedList2.offsets[nestedOffset] = nestedList2.childCount; - nestedList2.lengths[nestedOffset] = nestedList2Size; - nestedList2.childCount += nestedList2Size; - - for (int k = 0; k < nestedList2Size; k++) { - int nestedOffset2 = (int) nestedList2.offsets[nestedOffset] + k; - // list entries - if (k != 1) { - // second struct is always null - if (k != 0) { - // first struct field in first struct is always null - entryField1.setVal( - nestedOffset2, - ("f1-" + k).getBytes(StandardCharsets.UTF_8)); - } else { - entryField1.isNull[nestedOffset2] = true; - } - entryField2.setVal( - nestedOffset2, - ("f2-" + k).getBytes(StandardCharsets.UTF_8)); - } else { - listEntries.isNull[nestedOffset2] = true; - } - } - } - } else { - nestedList.isNull[listOffset] = true; - } - } - } else { - list1.isNull[row] = true; - } - - // forth field: map in list - if (row % 3 != 0) { - // in every third row, the map list is null - list2.offsets[row] = list2.childCount; - list2.lengths[row] = list2Size; - list2.childCount += list2Size; - - for (int i = 0; i < list2Size; i++) { - int mapOffset = (int) list2.offsets[row] + i; - - if (i != 2) { - // second map list entry is always null - map1.offsets[mapOffset] = map1.childCount; - map1.lengths[mapOffset] = mapSize; - map1.childCount += mapSize; - - for (int j = 0; j < mapSize; j++) { - int mapEntryOffset = (int) map1.offsets[mapOffset] + j; - - if (j != 1) { - // key in second map entry is always null - keys.setVal( - mapEntryOffset, - ("key-" + row + "-" + j).getBytes(StandardCharsets.UTF_8)); - } else { - keys.isNull[mapEntryOffset] = true; - } - if (j != 2) { - // value in third map entry is always null - vals.vector[mapEntryOffset] = row + i + j; - } else { - vals.isNull[mapEntryOffset] = true; - } - } - } else { - map1.isNull[mapOffset] = true; - } - } - } else { - list2.isNull[row] = true; - } - - if (row == batchSize - 1) { - writer.addRowBatch(batch); - batch.reset(); - } - } - if (batch.size != 0) { - writer.addRowBatch(batch); - batch.reset(); - } - writer.close(); - } - - /** - * Writes an ORC file with nested composite types and repeated values. Generates {@link - * OrcRowInputFormatTest#TEST_FILE_REPEATING}. 
- */ - private static void writeCompositeTypesWithRepeatingFile(String path) throws IOException { - - Path filePath = new Path(path); - Configuration conf = new Configuration(); - - TypeDescription schema = - TypeDescription.fromString( - "struct<" - + "int1:int," - + "int2:int," - + "int3:int," - + "record1:struct," - + "record2:struct," - + "list1:array," - + "list2:array," - + "list3:array," - + "map1:map," - + "map2:map" - + ">"); - - Writer writer = - OrcFile.createWriter(filePath, OrcFile.writerOptions(conf).setSchema(schema)); - - VectorizedRowBatch batch = schema.createRowBatch(); - - LongColumnVector int1 = (LongColumnVector) batch.cols[0]; - LongColumnVector int2 = (LongColumnVector) batch.cols[1]; - LongColumnVector int3 = (LongColumnVector) batch.cols[2]; - - StructColumnVector record1 = (StructColumnVector) batch.cols[3]; - LongColumnVector record1F1 = (LongColumnVector) record1.fields[0]; - BytesColumnVector record1F2 = (BytesColumnVector) record1.fields[1]; - StructColumnVector record2 = (StructColumnVector) batch.cols[4]; - - ListColumnVector list1 = (ListColumnVector) batch.cols[5]; - LongColumnVector list1int = (LongColumnVector) list1.child; - ListColumnVector list2 = (ListColumnVector) batch.cols[6]; - LongColumnVector list2int = (LongColumnVector) list2.child; - ListColumnVector list3 = (ListColumnVector) batch.cols[7]; - - MapColumnVector map1 = (MapColumnVector) batch.cols[8]; - LongColumnVector map1keys = (LongColumnVector) map1.keys; - BytesColumnVector map1vals = (BytesColumnVector) map1.values; - MapColumnVector map2 = (MapColumnVector) batch.cols[9]; - - final int listSize = 3; - final int mapSize = 2; - - final int batchSize = batch.getMaxSize(); - - // Ensure the vectors have sufficient capacity - list1int.ensureSize(batchSize * listSize, false); - list2int.ensureSize(batchSize * listSize, false); - map1keys.ensureSize(batchSize * mapSize, false); - map1vals.ensureSize(batchSize * mapSize, false); - - // int1: all values are 42 - int1.noNulls = true; - int1.setRepeating(true); - int1.vector[0] = 42; - - // int2: all values are null - int2.noNulls = false; - int2.setRepeating(true); - int2.isNull[0] = true; - - // int3: all values are 99 - int3.noNulls = false; - int3.setRepeating(true); - int3.isNull[0] = false; - int3.vector[0] = 99; - - // record1: all records are [23, "Hello"] - record1.noNulls = true; - record1.setRepeating(true); - for (int i = 0; i < batchSize; i++) { - record1F1.vector[i] = i + 23; - } - record1F2.noNulls = false; - record1F2.isNull[0] = true; - - // record2: all records are null - record2.noNulls = false; - record2.setRepeating(true); - record2.isNull[0] = true; - - // list1: all lists are [1, 2, 3] - list1.noNulls = true; - list1.setRepeating(true); - list1.lengths[0] = listSize; - list1.offsets[0] = 1; - for (int i = 0; i < batchSize * listSize; i++) { - list1int.vector[i] = i; - } - - // list2: all lists are [7, 7, 7] - list2.noNulls = true; - list2.setRepeating(true); - list2.lengths[0] = listSize; - list2.offsets[0] = 0; - list2int.setRepeating(true); - list2int.vector[0] = 7; - - // list3: all lists are null - list3.noNulls = false; - list3.setRepeating(true); - list3.isNull[0] = true; - - // map1: all maps are [2 -> "HELLO", 4 -> "HELLO"] - map1.noNulls = true; - map1.setRepeating(true); - map1.lengths[0] = mapSize; - map1.offsets[0] = 1; - for (int i = 0; i < batchSize * mapSize; i++) { - map1keys.vector[i] = i * 2; - } - map1vals.setRepeating(true); - map1vals.setVal(0, "Hello".getBytes(StandardCharsets.UTF_8)); - - // map2: 
all maps are null - map2.noNulls = false; - map2.setRepeating(true); - map2.isNull[0] = true; - - batch.size = 256; - - writer.addRowBatch(batch); - batch.reset(); - writer.close(); - } -}
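
Editor's note (not part of the diff): the deleted OrcTestFileGenerator above relies on ORC's repeating-column optimization, where a column vector is populated once at slot 0, flagged via setRepeating(true), and then reused for every logical row in the batch. Below is a minimal, self-contained sketch of that same technique under stated assumptions: the class name RepeatingColumnDemo, the output path, and the two-column schema are illustrative inventions, while the ORC calls themselves (TypeDescription.fromString, createRowBatch, setRepeating, addRowBatch) mirror those used in the removed generator. It is a sketch, not a definitive reproduction of the deleted code.

// A hedged, minimal example of writing an ORC file whose columns repeat a single
// value (or a single null) across all rows of a batch. Class name, path, and schema
// are assumptions for illustration only.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import java.io.IOException;

public class RepeatingColumnDemo {

    public static void main(String[] args) throws IOException {
        // Two int columns: one repeating a constant, one repeating null.
        TypeDescription schema = TypeDescription.fromString("struct<int1:int,int2:int>");

        Writer writer =
                OrcFile.createWriter(
                        new Path("/tmp/repeating-demo.orc"),
                        OrcFile.writerOptions(new Configuration()).setSchema(schema));

        VectorizedRowBatch batch = schema.createRowBatch();
        LongColumnVector int1 = (LongColumnVector) batch.cols[0];
        LongColumnVector int2 = (LongColumnVector) batch.cols[1];

        // int1: every row carries the value 42; only slot 0 has to be written.
        int1.setRepeating(true);
        int1.noNulls = true;
        int1.vector[0] = 42;

        // int2: every row is null; slot 0 records the null flag for all rows.
        int2.setRepeating(true);
        int2.noNulls = false;
        int2.isNull[0] = true;

        // Declare 256 logical rows, all backed by the repeated slot-0 values.
        batch.size = 256;
        writer.addRowBatch(batch);
        writer.close();
    }
}

Read back (for example with the deleted testReadRepeatingValuesFile-style check), such a file yields 256 rows in which int1 is always 42 and int2 is always null, which is exactly the pattern the removed generator produced for its repeating-values test fixture.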