diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java
new file mode 100644
index 000000000000..058331a44cf4
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.time.Instant;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * A timestamp represented with configurable precision.
+ *
+ * <p>This logical type stores timestamps as a Row with two fields:
+ *
+ * <ul>
+ *   <li>seconds: INT64 - whole seconds since the Unix epoch (can be negative)
+ *   <li>subseconds: INT16 for precision below 5, INT32 otherwise - always non-negative (0 to
+ *       10^precision - 1)
+ * </ul>
+ *
+ * <p>The subseconds field is always non-negative, even for timestamps before the epoch. For
+ * example, -1.5 seconds is represented as {seconds: -2, subseconds: 500000} for microsecond
+ * precision. This matches Java's {@link java.time.Instant} internal representation.
+ *
+ * <p><b>Note for users converting from single-integer timestamp representations:</b> If you have
+ * timestamps stored as a single long value (e.g., microseconds since epoch), you must handle
+ * negative modulo correctly when converting:
+ *
+ * <pre>{@code
+ * long timestampMicros = -1_500_000;
+ * long seconds = timestampMicros / 1_000_000;
+ * long micros = timestampMicros % 1_000_000;
+ * if (micros < 0) {
+ *   micros += 1_000_000;
+ *   seconds -= 1;
+ * }
+ * Instant instant = Instant.ofEpochSecond(seconds, micros * 1000);
+ * }</pre>
+ */
+public class Timestamp implements Schema.LogicalType<Instant, Row> {
+  public static final String IDENTIFIER = "beam:logical_type:timestamp:v1";
+  static final int MIN_PRECISION = 0;
+  static final int MAX_PRECISION = 9;
+
+  /** Lowest precision whose largest subseconds value (99999) no longer fits in an INT16. */
+  private static final int MIN_INT32_PRECISION = 5;
+
+  private final int precision;
+
+  /** Nanoseconds per subsecond unit: 10^(MAX_PRECISION - precision). */
+  private final int scalingFactor;
+
+  /** Largest valid subseconds value: 10^precision - 1. */
+  private final int maxSubseconds;
+
+  private final Schema timestampSchema;
+
+  /** Returns a timestamp logical type with {@code precision} fractional decimal digits. */
+  public static Timestamp of(int precision) {
+    return new Timestamp(precision);
+  }
+
+  public static final Timestamp MILLIS = Timestamp.of(3);
+  public static final Timestamp MICROS = Timestamp.of(6);
+  public static final Timestamp NANOS = Timestamp.of(9);
+
+  /**
+   * Creates a timestamp logical type.
+   *
+   * @param precision number of fractional decimal digits of a second, between {@link
+   *     #MIN_PRECISION} and {@link #MAX_PRECISION} inclusive
+   * @throws IllegalArgumentException if {@code precision} is outside that range
+   */
+  public Timestamp(int precision) {
+    checkArgument(
+        precision <= MAX_PRECISION && precision >= MIN_PRECISION,
+        "Timestamp precision must be between %s and %s (inclusive), but was %s.",
+        MIN_PRECISION,
+        MAX_PRECISION,
+        precision);
+    this.precision = precision;
+    this.scalingFactor = (int) Math.pow(10, MAX_PRECISION - precision);
+    // Fixed for the lifetime of the type, so compute it once instead of on every conversion.
+    this.maxSubseconds = (int) (Math.pow(10, precision) - 1);
+    if (precision < MIN_INT32_PRECISION) {
+      this.timestampSchema =
+          Schema.builder().addInt64Field("seconds").addInt16Field("subseconds").build();
+    } else {
+      this.timestampSchema =
+          Schema.builder().addInt64Field("seconds").addInt32Field("subseconds").build();
+    }
+  }
+
+  @Override
+  public String getIdentifier() {
+    return IDENTIFIER;
+  }
+
+  /** The type argument is the precision, carried as an INT32. */
+  @Override
+  public Schema.FieldType getArgumentType() {
+    return Schema.FieldType.INT32;
+  }
+
+  @Override
+  public Integer getArgument() {
+    return precision;
+  }
+
+  @Override
+  public Schema.FieldType getBaseType() {
+    return Schema.FieldType.row(timestampSchema);
+  }
+
+  /**
+   * Converts an {@link Instant} to its {seconds, subseconds} row.
+   *
+   * @throws IllegalStateException if {@code input} has digits finer than this type's precision
+   */
+  @Override
+  public Row toBaseType(Instant input) {
+    // Avoid silent data loss
+    checkState(
+        input.getNano() % scalingFactor == 0,
+        "Timestamp logical type was configured with precision %s, but encountered "
+            + "a Java Instant with %s nanoseconds (not evenly divisible by scaling factor %s).",
+        precision,
+        input.getNano(),
+        scalingFactor);
+
+    int subseconds = input.getNano() / scalingFactor;
+
+    Row.Builder rowBuilder = Row.withSchema(timestampSchema).addValue(input.getEpochSecond());
+    if (precision < MIN_INT32_PRECISION) {
+      rowBuilder.addValue((short) subseconds); // Explicitly add as short
+    } else {
+      rowBuilder.addValue(subseconds); // Add as int
+    }
+    return rowBuilder.build();
+  }
+
+  /**
+   * Converts a {seconds, subseconds} row back to an {@link Instant}.
+   *
+   * @throws IllegalArgumentException if a field is missing or subseconds is outside {@code [0,
+   *     10^precision - 1]}
+   */
+  @Override
+  public Instant toInputType(@NonNull Row base) {
+    long subseconds =
+        (precision < MIN_INT32_PRECISION)
+            ? checkArgumentNotNull(
+                base.getInt16(1),
+                "While trying to convert to Instant: Row missing subseconds field")
+            : checkArgumentNotNull(
+                base.getInt32(1),
+                "While trying to convert to Instant: Row missing subseconds field");
+
+    checkArgument(
+        subseconds >= 0,
+        "While trying to convert to Instant: subseconds field must be non-negative, "
+            + "but was %s. This likely indicates data corruption.",
+        subseconds);
+
+    checkArgument(
+        subseconds <= maxSubseconds,
+        "While trying to convert to Instant: subseconds field must be <= %s for precision %s, "
+            + "but was %s. This likely indicates data corruption or precision mismatch.",
+        maxSubseconds,
+        precision,
+        subseconds);
+
+    return Instant.ofEpochSecond(
+        checkArgumentNotNull(
+            base.getInt64(0), "While trying to convert to Instant: Row missing seconds field"),
+        subseconds * scalingFactor);
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java
index e1590408021a..3c1e9029db71 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java
@@ -241,4 +241,271 @@ public void testVariableString() {
     // check argument invalid case
     assertThrows(IllegalArgumentException.class, () -> varibaleString.toInputType("123456"));
   }
+
+  @Test
+  public void testTimestampMillis() {
+    Timestamp timestampType = Timestamp.MILLIS;
+    assertEquals(3, timestampType.getArgument().intValue());
+
+    // Positive timestamp with millisecond precision
+    Instant instant = Instant.ofEpochSecond(1609459200, 123_000_000); // 2021-01-01 00:00:00.123 UTC
+    Schema schema = Schema.builder().addLogicalTypeField("ts", timestampType).build();
+    Row row = Row.withSchema(schema).addValue(instant).build();
+
+    assertEquals(instant, row.getLogicalTypeValue(0, Instant.class));
+
+    // Check base type conversion
+    Row baseRow = row.getBaseValue(0, Row.class);
+    assertEquals(1609459200L, baseRow.getInt64("seconds").longValue());
+    assertEquals((short) 123, baseRow.getInt16("subseconds").shortValue());
+  }
+
+  @Test
+  public void testTimestampMicros() {
+    Timestamp timestampType = Timestamp.MICROS;
+    assertEquals(6, timestampType.getArgument().intValue());
+
+    // Positive timestamp with microsecond precision
+    Instant instant =
+        Instant.ofEpochSecond(1609459200, 123_456_000); // 2021-01-01 00:00:00.123456 UTC
+    Schema schema = Schema.builder().addLogicalTypeField("ts", timestampType).build();
+    Row row = Row.withSchema(schema).addValue(instant).build();
+
+    assertEquals(instant, row.getLogicalTypeValue(0, Instant.class));
+
+    // Check base type conversion uses INT32 for micros
+    Row baseRow = row.getBaseValue(0, Row.class);
+    assertEquals(1609459200L, baseRow.getInt64("seconds").longValue());
+    assertEquals(123_456, baseRow.getInt32("subseconds").intValue());
+  }
+
+  @Test
+  public void testTimestampNanos() {
+    Timestamp timestampType = Timestamp.NANOS;
+    assertEquals(9, timestampType.getArgument().intValue());
+
+    // Positive timestamp with nanosecond precision
+    Instant instant =
+        Instant.ofEpochSecond(1609459200, 123_456_789); // 2021-01-01 00:00:00.123456789 UTC
+    Schema schema = Schema.builder().addLogicalTypeField("ts", timestampType).build();
+    Row row = Row.withSchema(schema).addValue(instant).build();
+
+    assertEquals(instant, row.getLogicalTypeValue(0, Instant.class));
+
+    // Check base type conversion uses INT32 for nanos
+    Row baseRow = row.getBaseValue(0, Row.class);
+    assertEquals(1609459200L, baseRow.getInt64("seconds").longValue());
+    assertEquals(123_456_789, baseRow.getInt32("subseconds").intValue());
+  }
+
+  @Test
+  public void testTimestampNegative() {
+    Timestamp timestampType = Timestamp.MICROS;
+
+    // Negative timestamp: -1.5 seconds before epoch
+    // Should be represented as {seconds: -2, subseconds: 500000}
+    Instant instant = Instant.ofEpochSecond(-2, 500_000_000);
+    Schema schema = Schema.builder().addLogicalTypeField("ts", timestampType).build();
+    Row row = Row.withSchema(schema).addValue(instant).build();
+
+    assertEquals(instant, row.getLogicalTypeValue(0, Instant.class));
+
+    // Verify the internal representation
+    Row baseRow = row.getBaseValue(0, Row.class);
+    assertEquals(-2L, baseRow.getInt64("seconds").longValue());
+    assertEquals(500_000, baseRow.getInt32("subseconds").intValue());
+  }
+
+  @Test
+  public void testTimestampZero() {
+    Timestamp timestampType = Timestamp.MICROS;
+
+    // Epoch timestamp
+    Instant instant = Instant.ofEpochSecond(0, 0);
+    Schema schema = Schema.builder().addLogicalTypeField("ts", timestampType).build();
+    Row row = Row.withSchema(schema).addValue(instant).build();
+
+    assertEquals(instant, row.getLogicalTypeValue(0, Instant.class));
+
+    Row baseRow = row.getBaseValue(0, Row.class);
+    assertEquals(0L, baseRow.getInt64("seconds").longValue());
+    assertEquals(0, baseRow.getInt32("subseconds").intValue());
+  }
+
+  @Test
+  public void testTimestampPrecisionBoundary() {
+    // Test the boundary between INT16 and INT32 representation
+    Timestamp precision4 = Timestamp.of(4);
+    Timestamp precision5 = Timestamp.of(5);
+
+    // Precision 4 should use INT16
+    Instant instant4 = Instant.ofEpochSecond(100, 999_900_000);
+    Schema schema4 = Schema.builder().addLogicalTypeField("ts", precision4).build();
+    Row row4 = Row.withSchema(schema4).addValue(instant4).build();
+    Row baseRow4 = row4.getBaseValue(0, Row.class);
+    assertEquals((short) 999_9, baseRow4.getInt16("subseconds").shortValue());
+
+    // Precision 5 should use INT32
+    Instant instant5 = Instant.ofEpochSecond(100, 999_990_000);
+    Schema schema5 = Schema.builder().addLogicalTypeField("ts", precision5).build();
+    Row row5 = Row.withSchema(schema5).addValue(instant5).build();
+    Row baseRow5 = row5.getBaseValue(0, Row.class);
+    assertEquals(999_99, baseRow5.getInt32("subseconds").intValue());
+  }
+
+  @Test
+  public void testTimestampDataLossDetection() {
+    Timestamp millisType = Timestamp.MILLIS;
+
+    // Try to store microsecond-precision instant in millis logical type
+    Instant instant = Instant.ofEpochSecond(100, 123_456_000); // Has microseconds
+    Schema schema = Schema.builder().addLogicalTypeField("ts", millisType).build();
+
+    // Should throw because 123_456_000 nanos is not divisible by 1_000_000
+    assertThrows(
+        IllegalStateException.class, () -> Row.withSchema(schema).addValue(instant).build());
+  }
+
+  @Test
+  public void testTimestampDataLossDetectionNanos() {
+    Timestamp microsType = Timestamp.MICROS;
+
+    // Try to store nanosecond-precision instant in micros logical type
+    Instant instant = Instant.ofEpochSecond(100, 123_456_789); // Has nanoseconds
+    Schema schema = Schema.builder().addLogicalTypeField("ts", microsType).build();
+
+    // Should throw because 123_456_789 nanos is not divisible by 1_000
+    assertThrows(
+        IllegalStateException.class, () -> Row.withSchema(schema).addValue(instant).build());
+  }
+
+  @Test
+  public void testTimestampInvalidPrecision() {
+    assertThrows(IllegalArgumentException.class, () -> Timestamp.of(-1));
+    assertThrows(IllegalArgumentException.class, () -> Timestamp.of(10));
+  }
+
+  @Test
+  public void testTimestampRoundTrip() {
+    // Test that we can round-trip through base type for all precisions
+    for (int precision = 0; precision <= 9; precision++) {
+      Timestamp timestampType = Timestamp.of(precision);
+
+      long nanos = 123_456_789;
+      int scalingFactor = (int) Math.pow(10, 9 - precision);
+      nanos = (nanos / scalingFactor) * scalingFactor;
+
+      Instant original = Instant.ofEpochSecond(1609459200, nanos);
+
+      Row baseRow = timestampType.toBaseType(original);
+      Instant roundTripped = timestampType.toInputType(baseRow);
+
+      assertEquals(original, roundTripped);
+    }
+  }
+
+  @Test
+  public void testTimestampNegativeRoundTrip() {
+    Timestamp timestampType = Timestamp.MICROS;
+
+    Instant original = Instant.ofEpochSecond(-100, 500_000_000);
+    Row baseRow = timestampType.toBaseType(original);
+    Instant roundTripped = timestampType.toInputType(baseRow);
+
+    assertEquals(original, roundTripped);
+
+    assertEquals(-100L, baseRow.getInt64("seconds").longValue());
+    assertEquals(500_000, baseRow.getInt32("subseconds").intValue());
+  }
+
+  @Test
+  public void testTimestampArgumentType() {
+    Timestamp timestampType = Timestamp.MICROS;
+
+    // Check argument type is INT32
+    assertEquals(FieldType.INT32, timestampType.getArgumentType());
+
+    // Check argument value
+    assertEquals(Integer.valueOf(6), timestampType.getArgument());
+  }
+
+  @Test
+  public void testTimestampBaseTypeStructure() {
+    Timestamp millisType = Timestamp.MILLIS;
+    Timestamp microsType = Timestamp.MICROS;
+
+    // Check base type is a row schema
+    assertEquals(Schema.TypeName.ROW, millisType.getBaseType().getTypeName());
+    assertEquals(Schema.TypeName.ROW, microsType.getBaseType().getTypeName());
+
+    // Check millis uses INT16 for subseconds (precision < 5)
+    Schema millisSchema = millisType.getBaseType().getRowSchema();
+    assertEquals(2, millisSchema.getFieldCount());
+    assertEquals("seconds", millisSchema.getField(0).getName());
+    assertEquals(FieldType.INT64, millisSchema.getField(0).getType());
+    assertEquals("subseconds", millisSchema.getField(1).getName());
+    assertEquals(FieldType.INT16, millisSchema.getField(1).getType());
+
+    // Check micros uses INT32 for subseconds (precision >= 5)
+    Schema microsSchema = microsType.getBaseType().getRowSchema();
+    assertEquals(2, microsSchema.getFieldCount());
+    assertEquals("seconds", microsSchema.getField(0).getName());
+    assertEquals(FieldType.INT64, microsSchema.getField(0).getType());
+    assertEquals("subseconds", microsSchema.getField(1).getName());
+    assertEquals(FieldType.INT32, microsSchema.getField(1).getType());
+  }
+
+  @Test
+  public void testTimestampCorruptedDataNegativeSubseconds() {
+    Timestamp timestampType = Timestamp.MICROS;
+    Schema baseSchema = timestampType.getBaseType().getRowSchema();
+
+    // Create a corrupted row with negative subseconds
+    Row corruptedRow =
+        Row.withSchema(baseSchema)
+            .addValue(-1L) // seconds
+            .addValue(-500_000) // subseconds
+            .build();
+
+    assertThrows(IllegalArgumentException.class, () -> timestampType.toInputType(corruptedRow));
+  }
+
+  @Test
+  public void testTimestampCorruptedDataOutOfRangeSubseconds() {
+    Timestamp millisType = Timestamp.MILLIS;
+    Schema baseSchema = millisType.getBaseType().getRowSchema();
+
+    // Create a corrupted row with subseconds > 999 for millis precision
+    Row corruptedRow =
+        Row.withSchema(baseSchema)
+            .addValue(100L) // seconds
+            .addValue((short) 1000) // subseconds
+            .build();
+
+    // Should throw when trying to convert back to Instant
+    assertThrows(IllegalArgumentException.class, () -> millisType.toInputType(corruptedRow));
+  }
+
+  @Test
+  public void testTimestampExtremeValues() {
+    Timestamp timestampType = Timestamp.MICROS;
+    int scalingFactor = 1000; // For micros
+
+    // Round MAX/MIN to microsecond boundaries
+    Instant nearMin = Instant.MIN.plusSeconds(1000);
+    long nanos = (long) (nearMin.getNano() / scalingFactor) * scalingFactor;
+    nearMin = Instant.ofEpochSecond(nearMin.getEpochSecond(), nanos);
+
+    Schema schema = Schema.builder().addLogicalTypeField("ts", timestampType).build();
+    Row minRow = Row.withSchema(schema).addValue(nearMin).build();
+    assertEquals(nearMin, minRow.getLogicalTypeValue(0, Instant.class));
+
+    // Same for MAX
+    Instant nearMax = Instant.MAX.minusSeconds(1000);
+    nanos = (long) (nearMax.getNano() / scalingFactor) * scalingFactor;
+    nearMax = Instant.ofEpochSecond(nearMax.getEpochSecond(), nanos);
+
+    Row maxRow = Row.withSchema(schema).addValue(nearMax).build();
+    assertEquals(nearMax, maxRow.getLogicalTypeValue(0, Instant.class));
+  }
 }