Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,50 @@ public class PostgresCollection implements Collection {
private final PostgresSubDocumentUpdater subDocUpdater;
private final PostgresQueryExecutor queryExecutor;
private final UpdateValidator updateValidator;
private final PostgresColumnRegistry columnRegistry;

public PostgresCollection(final PostgresClient client, final String collectionName) {
this(client, PostgresTableIdentifier.parse(collectionName));
this(
client,
PostgresTableIdentifier.parse(collectionName),
new PostgresColumnRegistryFallback());
}

public PostgresCollection(
final PostgresClient client,
final String collectionName,
final PostgresColumnRegistry columnRegistry) {
this(client, PostgresTableIdentifier.parse(collectionName), columnRegistry);
}

PostgresCollection(final PostgresClient client, final PostgresTableIdentifier tableIdentifier) {
this(client, tableIdentifier, new PostgresColumnRegistryFallback());
}

PostgresCollection(
final PostgresClient client,
final PostgresTableIdentifier tableIdentifier,
final PostgresColumnRegistry columnRegistry) {
this.client = client;
this.tableIdentifier = tableIdentifier;
this.columnRegistry =
columnRegistry != null ? columnRegistry : new PostgresColumnRegistryFallback();
this.subDocUpdater =
new PostgresSubDocumentUpdater(new PostgresQueryBuilder(this.tableIdentifier));
new PostgresSubDocumentUpdater(
new PostgresQueryBuilder(this.tableIdentifier, this.columnRegistry));
this.queryExecutor = new PostgresQueryExecutor(this.tableIdentifier);
this.updateValidator = new CommonUpdateValidator();
}

/**
* Gets the column registry for this collection.
*
* @return the PostgresColumnRegistry instance
*/
public PostgresColumnRegistry getColumnRegistry() {
return columnRegistry;
}

@Override
public boolean upsert(Key key, Document document) throws IOException {
try (PreparedStatement preparedStatement =
Expand Down Expand Up @@ -488,15 +518,16 @@ private CloseableIterator<Document> search(Query query, boolean removeDocumentId
@Override
public CloseableIterator<Document> find(
final org.hypertrace.core.documentstore.query.Query query) {
return queryExecutor.execute(client.getConnection(), query);
return queryExecutor.execute(client.getConnection(), query, null, columnRegistry);
}

@Override
public CloseableIterator<Document> query(
final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) {
String flatStructureCollectionName =
client.getCustomParameters().get(FLAT_STRUCTURE_COLLECTION_KEY);
return queryExecutor.execute(client.getConnection(), query, flatStructureCollectionName);
return queryExecutor.execute(
client.getConnection(), query, flatStructureCollectionName, columnRegistry);
}

@Override
Expand Down Expand Up @@ -545,7 +576,7 @@ public Optional<Document> update(
.build();

try (final CloseableIterator<Document> iterator =
queryExecutor.execute(connection, findByIdQuery)) {
queryExecutor.execute(connection, findByIdQuery, null, columnRegistry)) {
returnDocument = getFirstDocument(iterator).orElseThrow();
}
} else if (updateOptions.getReturnDocumentType() == NONE) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package org.hypertrace.core.documentstore.postgres;

import java.util.Optional;
import java.util.Set;

/**
* Registry for PostgreSQL column metadata that provides type-aware query generation support.
*
* <p>This interface replaces hardcoded column checks with dynamic database metadata lookups,
* enabling support for typed columns (String, Long, Double, Boolean, TextArray) while maintaining
* JSONB fallback for complex types.
*/
public interface PostgresColumnRegistry {

/**
* Determines if a field should be treated as a first-class typed column rather than a JSONB
* document field.
*
* @param fieldName the field name to check
* @return true if the field has a first-class column mapping, false if it should use JSONB
* processing
*/
boolean isFirstClassColumn(String fieldName);

/**
* Gets the PostgreSQL data type for a field.
*
* @param fieldName the field name to look up
* @return Optional containing the data type if mapped, empty if not found
*/
Optional<PostgresDataType> getColumnDataType(String fieldName);

/**
* Gets all field names that have first-class column mappings.
*
* @return set of all first-class column field names
*/
Set<String> getAllFirstClassColumns();

/**
* Checks if the registry supports a specific PostgreSQL data type.
*
* @param dataType the data type to check
* @return true if the data type is supported for first-class column processing
*/
boolean supportsDataType(PostgresDataType dataType);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.hypertrace.core.documentstore.postgres;

import java.util.Optional;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.hypertrace.core.documentstore.postgres.utils.PostgresUtils;

/**
* Fallback implementation of PostgresColumnRegistry that maintains original hardcoded behavior when
* database metadata queries fail.
*
* <p>This ensures system continues to function even if registry creation encounters errors, falling
* back to the original OUTER_COLUMNS approach.
*/
@Slf4j
public class PostgresColumnRegistryFallback implements PostgresColumnRegistry {

public PostgresColumnRegistryFallback() {
log.debug("Created PostgresColumnRegistryFallback using hardcoded OUTER_COLUMNS");
}

@Override
public boolean isFirstClassColumn(String fieldName) {
// Fall back to original hardcoded behavior
return fieldName != null && PostgresUtils.OUTER_COLUMNS.contains(fieldName);
}

@Override
public Optional<PostgresDataType> getColumnDataType(String fieldName) {
// Map the hardcoded columns to their known types
if (fieldName == null) {
return Optional.empty();
}

switch (fieldName) {
case "id":
return Optional.of(PostgresDataType.TEXT);
case "created_at":
case "updated_at":
// These are timestamp types, but we'll map them as first-class for compatibility
return Optional.of(PostgresDataType.TEXT); // Can be enhanced to support timestamp later
default:
return Optional.empty();
}
}

@Override
public Set<String> getAllFirstClassColumns() {
return Set.copyOf(PostgresUtils.OUTER_COLUMNS);
}

@Override
public boolean supportsDataType(PostgresDataType dataType) {
return dataType != null && dataType.isFirstClassType();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package org.hypertrace.core.documentstore.postgres;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.hypertrace.core.documentstore.postgres.utils.PostgresUtils;

/**
* Implementation of PostgresColumnRegistry that queries database metadata to build dynamic
* field-to-datatype mappings.
*
* <p>This implementation replaces hardcoded OUTER_COLUMNS with dynamic database metadata discovery,
* enabling support for typed columns while maintaining JSONB fallback compatibility.
*/
@Slf4j
public class PostgresColumnRegistryImpl implements PostgresColumnRegistry {

private final Map<String, PostgresDataType> columnTypes;
private final String tableName;

/**
* Creates a new registry by querying database metadata for the specified table.
*
* @param connection database connection for metadata queries
* @param tableName the table name to analyze
*/
public PostgresColumnRegistryImpl(Connection connection, String tableName) {
this.tableName = tableName;
this.columnTypes = buildColumnMappings(connection, tableName);

log.debug(
"Created PostgresColumnRegistry for table '{}' with {} mapped columns: {}",
tableName,
columnTypes.size(),
columnTypes.keySet());
}

@Override
public boolean isFirstClassColumn(String fieldName) {
if (fieldName == null) {
return false;
}

// Check dynamic registry mappings first
if (columnTypes.containsKey(fieldName)) {
PostgresDataType type = columnTypes.get(fieldName);
return type.isFirstClassType();
}

// Fallback to original hardcoded columns for compatibility
// This ensures existing functionality continues to work during migration
return PostgresUtils.OUTER_COLUMNS.contains(fieldName);
}

@Override
public Optional<PostgresDataType> getColumnDataType(String fieldName) {
return Optional.ofNullable(columnTypes.get(fieldName));
}

@Override
public Set<String> getAllFirstClassColumns() {
return columnTypes.entrySet().stream()
.filter(entry -> entry.getValue().isFirstClassType())
.map(Map.Entry::getKey)
.collect(java.util.stream.Collectors.toSet());
}

@Override
public boolean supportsDataType(PostgresDataType dataType) {
return dataType != null && dataType.isFirstClassType();
}

/**
* Queries database metadata to build field-to-datatype mappings.
*
* @param connection database connection
* @param table table name to analyze
* @return map of field names to PostgreSQL data types
*/
private Map<String, PostgresDataType> buildColumnMappings(Connection connection, String table) {
Map<String, PostgresDataType> mappings = new HashMap<>();

String sql =
"SELECT column_name, data_type, udt_name "
+ "FROM information_schema.columns "
+ "WHERE table_name = ? "
+ "AND table_schema = current_schema() "
+ "ORDER BY ordinal_position";

try (PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setString(1, table);

try (ResultSet rs = stmt.executeQuery()) {
while (rs.next()) {
String columnName = rs.getString("column_name");
String dataType = rs.getString("data_type");
String udtName = rs.getString("udt_name");

// Use udt_name for array types (e.g., "_text"), data_type for others
String typeToMap = "ARRAY".equals(dataType) ? udtName : dataType;
PostgresDataType postgresType = PostgresDataType.fromPostgresTypeName(typeToMap);

mappings.put(columnName, postgresType);

log.debug("Mapped column '{}' with type '{}' -> {}", columnName, typeToMap, postgresType);
}
}

} catch (SQLException e) {
log.warn("Failed to query column metadata for table '{}': {}", table, e.getMessage());
log.debug("SQLException details", e);
// Return empty map on error - will fall back to JSONB processing
}

return mappings;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.hypertrace.core.documentstore.postgres;

import java.util.Arrays;
import java.util.Set;

/**
* Enumeration of PostgreSQL data types supported for first-class column processing.
*
* <p>Maps PostgreSQL native type names to our supported data types, enabling dynamic type-based
* query generation and parser selection.
*/
public enum PostgresDataType {

/** Text/string types - maps to String in Java */
TEXT("text", "varchar"),

/** Big integer type - maps to Long in Java */
BIGINT("bigint", "int8"),

/** Double precision floating point - maps to Double in Java */
DOUBLE_PRECISION("double precision", "float8"),

/** Boolean type - maps to Boolean in Java */
BOOLEAN("boolean", "bool"),

/** Text array type - maps to TextArray/String[] in Java */
TEXT_ARRAY("_text"),

/** Additional integer type for future support */
INTEGER("integer", "int4"),

/** JSONB type - fallback for complex types (excluded from first-class processing) */
JSONB("jsonb");

private final Set<String> postgresTypeNames;

PostgresDataType(String... typeNames) {
this.postgresTypeNames = Set.of(typeNames);
}

/**
* Maps a PostgreSQL type name to our enum value.
*
* @param postgresTypeName the PostgreSQL type name from database metadata
* @return the corresponding enum value, or JSONB as fallback
*/
public static PostgresDataType fromPostgresTypeName(String postgresTypeName) {
if (postgresTypeName == null) {
return JSONB;
}

return Arrays.stream(values())
.filter(type -> type.postgresTypeNames.contains(postgresTypeName.toLowerCase()))
.findFirst()
.orElse(JSONB);
}

/**
* Checks if this data type should be treated as a first-class column.
*
* @return true if this type supports first-class column processing
*/
public boolean isFirstClassType() {
return this != JSONB;
}

/**
* Gets the PostgreSQL type names that map to this enum value.
*
* @return set of PostgreSQL type names
*/
public Set<String> getPostgresTypeNames() {
return postgresTypeNames;
}
}
Loading