diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java
new file mode 100644
index 000000000000..058331a44cf4
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.time.Instant;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * A timestamp represented with configurable precision.
+ *
+ * <p>This logical type stores timestamps as a Row with two fields:
+ *
+ * <ul>
+ *   <li>seconds: INT64 - seconds since Unix epoch (can be negative)
+ *   <li>subseconds: INT16 or INT32 - always non-negative (0 to 10^precision - 1)
+ * </ul>
+ *
+ * <p>The subseconds field is stored as INT16 when the precision is below 5 and as INT32
+ * otherwise.
+ *
+ * <p>The subseconds field is always non-negative, even for timestamps before the epoch. For
+ * example, -1.5 seconds is represented as {seconds: -2, subseconds: 500000} for microsecond
+ * precision. This matches Java's {@link java.time.Instant} internal representation.
+ *
+ * <p><b>Note for users converting from single-integer timestamp representations:</b> If you have
+ * timestamps stored as a single long value (e.g., microseconds since epoch), you must handle
+ * negative modulo correctly when converting:
+ *
+ * <pre>{@code
+ * long timestampMicros = -1_500_000;
+ * long seconds = timestampMicros / 1_000_000;
+ * long micros = timestampMicros % 1_000_000;
+ * if (micros < 0) {
+ *   micros += 1_000_000;
+ *   seconds -= 1;
+ * }
+ * Instant instant = Instant.ofEpochSecond(seconds, micros * 1000);
+ * }</pre>
+ */
+public class Timestamp implements Schema.LogicalType<Instant, Row> {
+  public static final String IDENTIFIER = "beam:logical_type:timestamp:v1";
+  static final int MIN_PRECISION = 0;
+  static final int MAX_PRECISION = 9;
+
+  /**
+   * Lowest precision stored with an INT32 subseconds field. Below it the largest subseconds value
+   * is 10^4 - 1 = 9999, which fits in an INT16.
+   */
+  private static final int MIN_INT32_PRECISION = 5;
+
+  /** Number of decimal digits kept after the seconds, between 0 and 9 inclusive. */
+  private final int precision;
+
+  /** Nanoseconds per subsecond unit, i.e. 10^(9 - precision). */
+  private final int scalingFactor;
+
+  /** Schema of the base Row: {@code seconds} (INT64) followed by {@code subseconds}. */
+  private final Schema timestampSchema;
+
+  /**
+   * Creates a timestamp logical type with the given precision.
+   *
+   * @param precision number of subsecond decimal digits, from 0 to 9 inclusive
+   * @return a new {@link Timestamp} of that precision
+   * @throws IllegalArgumentException if {@code precision} is outside the supported range
+   */
+  public static Timestamp of(int precision) {
+    return new Timestamp(precision);
+  }
+
+  public static final Timestamp MILLIS = Timestamp.of(3);
+  public static final Timestamp MICROS = Timestamp.of(6);
+  public static final Timestamp NANOS = Timestamp.of(9);
+
+  /**
+   * Builds the logical type and the base Row schema that matches the requested precision.
+   *
+   * @param precision number of subsecond decimal digits, from 0 to 9 inclusive
+   * @throws IllegalArgumentException if {@code precision} is outside the supported range
+   */
+  public Timestamp(int precision) {
+    checkArgument(
+        precision <= MAX_PRECISION && precision >= MIN_PRECISION,
+        "Timestamp precision must be between %s and %s (inclusive), but was %s.",
+        MIN_PRECISION,
+        MAX_PRECISION,
+        precision);
+    this.precision = precision;
+    this.scalingFactor = pow10(MAX_PRECISION - precision);
+    if (usesInt16Subseconds(precision)) {
+      this.timestampSchema =
+          Schema.builder().addInt64Field("seconds").addInt16Field("subseconds").build();
+    } else {
+      this.timestampSchema =
+          Schema.builder().addInt64Field("seconds").addInt32Field("subseconds").build();
+    }
+  }
+
+  /** Returns whether subseconds for the given precision are stored as INT16 rather than INT32. */
+  private static boolean usesInt16Subseconds(int precision) {
+    return precision < MIN_INT32_PRECISION;
+  }
+
+  /**
+   * Returns 10 raised to {@code exponent} using integer arithmetic only. Callers pass exponents
+   * between 0 and 9, for which the result fits in an int.
+   */
+  private static int pow10(int exponent) {
+    int result = 1;
+    for (int i = 0; i < exponent; i++) {
+      result *= 10;
+    }
+    return result;
+  }
+
+  @Override
+  public String getIdentifier() {
+    return IDENTIFIER;
+  }
+
+  /** The argument of this logical type is its precision, carried as an INT32. */
+  @Override
+  public Schema.FieldType getArgumentType() {
+    return Schema.FieldType.INT32;
+  }
+
+  /** Returns the precision this type was configured with. */
+  @Override
+  public Integer getArgument() {
+    return precision;
+  }
+
+  /** Returns a ROW field type wrapping the {@code seconds}/{@code subseconds} schema. */
+  @Override
+  public Schema.FieldType getBaseType() {
+    return Schema.FieldType.row(timestampSchema);
+  }
+
+  /**
+   * Converts an {@link Instant} into its base Row of epoch seconds and scaled subseconds.
+   *
+   * @param input the instant to convert
+   * @return a Row holding {@code seconds} and {@code subseconds}
+   * @throws IllegalStateException if the instant's nanoseconds cannot be expressed exactly at the
+   *     configured precision
+   */
+  @Override
+  public Row toBaseType(Instant input) {
+    int nanos = input.getNano();
+
+    // Avoid silent data loss: refuse instants that are finer than the configured precision.
+    checkState(
+        nanos % scalingFactor == 0,
+        "Timestamp logical type was configured with precision %s, but encountered "
+            + "a Java Instant with %s nanoseconds (not evenly divisible by scaling factor %s).",
+        precision,
+        nanos,
+        scalingFactor);
+
+    int subseconds = nanos / scalingFactor;
+
+    Row.Builder rowBuilder = Row.withSchema(timestampSchema).addValue(input.getEpochSecond());
+    if (usesInt16Subseconds(precision)) {
+      // The field is INT16, so the value has to be added as a short.
+      rowBuilder.addValue((short) subseconds);
+    } else {
+      rowBuilder.addValue(subseconds);
+    }
+    return rowBuilder.build();
+  }
+
+  /**
+   * Converts a base Row back into an {@link Instant}, validating the subseconds range first.
+   *
+   * @param base Row holding {@code seconds} at index 0 and {@code subseconds} at index 1
+   * @return the instant described by the Row
+   * @throws IllegalArgumentException if either field is null, or if subseconds is negative or
+   *     larger than 10^precision - 1
+   */
+  @Override
+  public Instant toInputType(@NonNull Row base) {
+    long subseconds;
+    if (usesInt16Subseconds(precision)) {
+      subseconds =
+          checkArgumentNotNull(
+              base.getInt16(1), "While trying to convert to Instant: Row missing subseconds field");
+    } else {
+      subseconds =
+          checkArgumentNotNull(
+              base.getInt32(1), "While trying to convert to Instant: Row missing subseconds field");
+    }
+
+    checkArgument(
+        subseconds >= 0,
+        "While trying to convert to Instant: subseconds field must be non-negative, "
+            + "but was %s. This likely indicates data corruption.",
+        subseconds);
+
+    int maxSubseconds = pow10(precision) - 1;
+    checkArgument(
+        subseconds <= maxSubseconds,
+        "While trying to convert to Instant: subseconds field must be <= %s for precision %s, "
+            + "but was %s. This likely indicates data corruption or precision mismatch.",
+        maxSubseconds,
+        precision,
+        subseconds);
+
+    // subseconds is a long, so the multiplication below is done in 64-bit arithmetic.
+    return Instant.ofEpochSecond(
+        checkArgumentNotNull(
+            base.getInt64(0), "While trying to convert to Instant: Row missing seconds field"),
+        subseconds * scalingFactor);
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java
index e1590408021a..3c1e9029db71 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java
@@ -241,4 +241,271 @@ public void testVariableString() {
// check argument invalid case
assertThrows(IllegalArgumentException.class, () -> varibaleString.toInputType("123456"));
}
+
+ /** Millisecond precision: the Instant round-trips and subseconds are exposed as an INT16. */
+ @Test
+ public void testTimestampMillis() {
+   final long epochSeconds = 1609459200L; // 2021-01-01 00:00:00 UTC
+   Timestamp millis = Timestamp.MILLIS;
+   assertEquals(3, millis.getArgument().intValue());
+
+   // 123 ms past the whole second, expressed in nanoseconds.
+   Instant expected = Instant.ofEpochSecond(epochSeconds, 123_000_000);
+   Schema rowSchema = Schema.builder().addLogicalTypeField("ts", millis).build();
+   Row wrapped = Row.withSchema(rowSchema).addValue(expected).build();
+
+   // The logical view hands back the same Instant.
+   assertEquals(expected, wrapped.getLogicalTypeValue(0, Instant.class));
+
+   // The base view is the (seconds, subseconds) row.
+   Row base = wrapped.getBaseValue(0, Row.class);
+   assertEquals(epochSeconds, base.getInt64("seconds").longValue());
+   assertEquals((short) 123, base.getInt16("subseconds").shortValue());
+ }
+
+ /** Microsecond precision: the Instant round-trips and subseconds are exposed as an INT32. */
+ @Test
+ public void testTimestampMicros() {
+   final long epochSeconds = 1609459200L; // 2021-01-01 00:00:00 UTC
+   Timestamp micros = Timestamp.MICROS;
+   assertEquals(6, micros.getArgument().intValue());
+
+   // 123456 microseconds past the whole second, expressed in nanoseconds.
+   Instant expected = Instant.ofEpochSecond(epochSeconds, 123_456_000);
+   Schema rowSchema = Schema.builder().addLogicalTypeField("ts", micros).build();
+   Row wrapped = Row.withSchema(rowSchema).addValue(expected).build();
+
+   assertEquals(expected, wrapped.getLogicalTypeValue(0, Instant.class));
+
+   // The base row carries the subseconds in an INT32 field.
+   Row base = wrapped.getBaseValue(0, Row.class);
+   assertEquals(epochSeconds, base.getInt64("seconds").longValue());
+   assertEquals(123_456, base.getInt32("subseconds").intValue());
+ }
+
+ /** Nanosecond precision: the Instant round-trips and subseconds equal the raw nanos. */
+ @Test
+ public void testTimestampNanos() {
+   final long epochSeconds = 1609459200L; // 2021-01-01 00:00:00 UTC
+   Timestamp nanosType = Timestamp.NANOS;
+   assertEquals(9, nanosType.getArgument().intValue());
+
+   // Every one of the nine subsecond digits is populated.
+   Instant expected = Instant.ofEpochSecond(epochSeconds, 123_456_789);
+   Schema rowSchema = Schema.builder().addLogicalTypeField("ts", nanosType).build();
+   Row wrapped = Row.withSchema(rowSchema).addValue(expected).build();
+
+   assertEquals(expected, wrapped.getLogicalTypeValue(0, Instant.class));
+
+   // The base row carries the subseconds in an INT32 field.
+   Row base = wrapped.getBaseValue(0, Row.class);
+   assertEquals(epochSeconds, base.getInt64("seconds").longValue());
+   assertEquals(123_456_789, base.getInt32("subseconds").intValue());
+ }
+
+ /** A pre-epoch instant keeps a negative seconds field and a non-negative subseconds field. */
+ @Test
+ public void testTimestampNegative() {
+   Timestamp micros = Timestamp.MICROS;
+
+   // -1.5 s relative to the epoch is held as seconds = -2 plus half a second.
+   Instant beforeEpoch = Instant.ofEpochSecond(-2, 500_000_000);
+   Schema rowSchema = Schema.builder().addLogicalTypeField("ts", micros).build();
+   Row wrapped = Row.withSchema(rowSchema).addValue(beforeEpoch).build();
+
+   assertEquals(beforeEpoch, wrapped.getLogicalTypeValue(0, Instant.class));
+
+   // Expected internal representation: {seconds: -2, subseconds: 500000}.
+   Row base = wrapped.getBaseValue(0, Row.class);
+   assertEquals(-2L, base.getInt64("seconds").longValue());
+   assertEquals(500_000, base.getInt32("subseconds").intValue());
+ }
+
+ /** The epoch itself maps to a base row whose two fields are both zero. */
+ @Test
+ public void testTimestampZero() {
+   Timestamp micros = Timestamp.MICROS;
+
+   Instant epoch = Instant.ofEpochSecond(0, 0);
+   Schema rowSchema = Schema.builder().addLogicalTypeField("ts", micros).build();
+   Row wrapped = Row.withSchema(rowSchema).addValue(epoch).build();
+
+   assertEquals(epoch, wrapped.getLogicalTypeValue(0, Instant.class));
+
+   Row base = wrapped.getBaseValue(0, Row.class);
+   assertEquals(0L, base.getInt64("seconds").longValue());
+   assertEquals(0, base.getInt32("subseconds").intValue());
+ }
+
+ /** Precision 4 is read back as INT16 and precision 5 as INT32, each at its largest value. */
+ @Test
+ public void testTimestampPrecisionBoundary() {
+   Timestamp fourDigits = Timestamp.of(4);
+   Timestamp fiveDigits = Timestamp.of(5);
+
+   // Four digits: 0.9999 s is read back through the INT16 accessor.
+   Instant largestFourDigit = Instant.ofEpochSecond(100, 999_900_000);
+   Schema fourDigitSchema = Schema.builder().addLogicalTypeField("ts", fourDigits).build();
+   Row fourDigitBase =
+       Row.withSchema(fourDigitSchema)
+           .addValue(largestFourDigit)
+           .build()
+           .getBaseValue(0, Row.class);
+   assertEquals((short) 9_999, fourDigitBase.getInt16("subseconds").shortValue());
+
+   // Five digits: 0.99999 s is read back through the INT32 accessor.
+   Instant largestFiveDigit = Instant.ofEpochSecond(100, 999_990_000);
+   Schema fiveDigitSchema = Schema.builder().addLogicalTypeField("ts", fiveDigits).build();
+   Row fiveDigitBase =
+       Row.withSchema(fiveDigitSchema)
+           .addValue(largestFiveDigit)
+           .build()
+           .getBaseValue(0, Row.class);
+   assertEquals(99_999, fiveDigitBase.getInt32("subseconds").intValue());
+ }
+
+ /** A microsecond-precision Instant is rejected when building a row for a millis field. */
+ @Test
+ public void testTimestampDataLossDetection() {
+   Timestamp millis = Timestamp.MILLIS;
+   Schema rowSchema = Schema.builder().addLogicalTypeField("ts", millis).build();
+
+   // 123_456_000 ns is not a multiple of 1_000_000 ns, so it cannot be held at millis precision.
+   Instant tooFine = Instant.ofEpochSecond(100, 123_456_000);
+
+   assertThrows(
+       IllegalStateException.class, () -> Row.withSchema(rowSchema).addValue(tooFine).build());
+ }
+
+ /** A nanosecond-precision Instant is rejected when building a row for a micros field. */
+ @Test
+ public void testTimestampDataLossDetectionNanos() {
+   Timestamp micros = Timestamp.MICROS;
+   Schema rowSchema = Schema.builder().addLogicalTypeField("ts", micros).build();
+
+   // 123_456_789 ns is not a multiple of 1_000 ns, so it cannot be held at micros precision.
+   Instant tooFine = Instant.ofEpochSecond(100, 123_456_789);
+
+   assertThrows(
+       IllegalStateException.class, () -> Row.withSchema(rowSchema).addValue(tooFine).build());
+ }
+
+ /** Precisions just outside the supported 0..9 range are rejected by the factory. */
+ @Test
+ public void testTimestampInvalidPrecision() {
+   // One below the minimum, then one above the maximum.
+   for (int outOfRange : new int[] {-1, 10}) {
+     assertThrows(IllegalArgumentException.class, () -> Timestamp.of(outOfRange));
+   }
+ }
+
+ /** Every supported precision converts an Instant to the base Row and back unchanged. */
+ @Test
+ public void testTimestampRoundTrip() {
+   final long fullNanos = 123_456_789;
+
+   for (int digits = 0; digits <= 9; digits++) {
+     Timestamp type = Timestamp.of(digits);
+
+     // Drop the digits this precision cannot hold so the conversion is lossless.
+     int nanosPerUnit = (int) Math.pow(10, 9 - digits);
+     long representableNanos = fullNanos - (fullNanos % nanosPerUnit);
+
+     Instant before = Instant.ofEpochSecond(1609459200, representableNanos);
+     Instant after = type.toInputType(type.toBaseType(before));
+
+     assertEquals(before, after);
+   }
+ }
+
+ /** A pre-epoch Instant round-trips through the base Row at microsecond precision. */
+ @Test
+ public void testTimestampNegativeRoundTrip() {
+   Timestamp micros = Timestamp.MICROS;
+   Instant beforeEpoch = Instant.ofEpochSecond(-100, 500_000_000);
+
+   Row base = micros.toBaseType(beforeEpoch);
+   assertEquals(beforeEpoch, micros.toInputType(base));
+
+   // The seconds field stays negative while subseconds stays positive.
+   assertEquals(-100L, base.getInt64("seconds").longValue());
+   assertEquals(500_000, base.getInt32("subseconds").intValue());
+ }
+
+ /** The logical type's argument is its precision, declared as INT32. */
+ @Test
+ public void testTimestampArgumentType() {
+   Timestamp micros = Timestamp.MICROS;
+
+   // Declared argument type.
+   assertEquals(FieldType.INT32, micros.getArgumentType());
+
+   // Argument value: six digits for microseconds.
+   Integer expectedPrecision = Integer.valueOf(6);
+   assertEquals(expectedPrecision, micros.getArgument());
+ }
+
+ /** The base type is a two-field row whose subseconds width depends on the precision. */
+ @Test
+ public void testTimestampBaseTypeStructure() {
+   Timestamp millis = Timestamp.MILLIS;
+   Timestamp micros = Timestamp.MICROS;
+
+   // Both precisions are backed by a ROW.
+   assertEquals(Schema.TypeName.ROW, millis.getBaseType().getTypeName());
+   assertEquals(Schema.TypeName.ROW, micros.getBaseType().getTypeName());
+
+   // Millis (precision < 5): seconds INT64, subseconds INT16.
+   Schema millisRow = millis.getBaseType().getRowSchema();
+   assertEquals(2, millisRow.getFieldCount());
+   assertEquals("seconds", millisRow.getField(0).getName());
+   assertEquals(FieldType.INT64, millisRow.getField(0).getType());
+   assertEquals("subseconds", millisRow.getField(1).getName());
+   assertEquals(FieldType.INT16, millisRow.getField(1).getType());
+
+   // Micros (precision >= 5): seconds INT64, subseconds INT32.
+   Schema microsRow = micros.getBaseType().getRowSchema();
+   assertEquals(2, microsRow.getFieldCount());
+   assertEquals("seconds", microsRow.getField(0).getName());
+   assertEquals(FieldType.INT64, microsRow.getField(0).getType());
+   assertEquals("subseconds", microsRow.getField(1).getName());
+   assertEquals(FieldType.INT32, microsRow.getField(1).getType());
+ }
+
+ /** A base row with negative subseconds is rejected when converting back to an Instant. */
+ @Test
+ public void testTimestampCorruptedDataNegativeSubseconds() {
+   Timestamp micros = Timestamp.MICROS;
+   Schema baseRowSchema = micros.getBaseType().getRowSchema();
+
+   // Hand-build a row that toBaseType would never produce: subseconds below zero.
+   long seconds = -1L;
+   int negativeSubseconds = -500_000;
+   Row invalidBase =
+       Row.withSchema(baseRowSchema).addValue(seconds).addValue(negativeSubseconds).build();
+
+   assertThrows(IllegalArgumentException.class, () -> micros.toInputType(invalidBase));
+ }
+
+ /** A base row whose subseconds exceed the precision's maximum is rejected on conversion. */
+ @Test
+ public void testTimestampCorruptedDataOutOfRangeSubseconds() {
+   Timestamp millis = Timestamp.MILLIS;
+   Schema baseRowSchema = millis.getBaseType().getRowSchema();
+
+   // Millis precision allows at most 999, so 1000 is one past the limit.
+   long seconds = 100L;
+   short tooLargeSubseconds = (short) 1000;
+   Row invalidBase =
+       Row.withSchema(baseRowSchema).addValue(seconds).addValue(tooLargeSubseconds).build();
+
+   // The failure surfaces when converting back to an Instant.
+   assertThrows(IllegalArgumentException.class, () -> millis.toInputType(invalidBase));
+ }
+
+ /** Instants close to Instant.MIN and Instant.MAX round-trip at microsecond precision. */
+ @Test
+ public void testTimestampExtremeValues() {
+   Timestamp micros = Timestamp.MICROS;
+   int nanosPerMicro = 1000;
+
+   // Step 1000 s in from MIN, then truncate the nanos to a whole number of microseconds.
+   Instant shiftedMin = Instant.MIN.plusSeconds(1000);
+   long minNanos = (long) (shiftedMin.getNano() / nanosPerMicro) * nanosPerMicro;
+   Instant nearMin = Instant.ofEpochSecond(shiftedMin.getEpochSecond(), minNanos);
+
+   Schema rowSchema = Schema.builder().addLogicalTypeField("ts", micros).build();
+   Row minRow = Row.withSchema(rowSchema).addValue(nearMin).build();
+   assertEquals(nearMin, minRow.getLogicalTypeValue(0, Instant.class));
+
+   // Same treatment for MAX, stepping 1000 s back instead.
+   Instant shiftedMax = Instant.MAX.minusSeconds(1000);
+   long maxNanos = (long) (shiftedMax.getNano() / nanosPerMicro) * nanosPerMicro;
+   Instant nearMax = Instant.ofEpochSecond(shiftedMax.getEpochSecond(), maxNanos);
+
+   Row maxRow = Row.withSchema(rowSchema).addValue(nearMax).build();
+   assertEquals(nearMax, maxRow.getLogicalTypeValue(0, Instant.class));
+ }
}