From b42a42871205a705cb1ede26be699a879e1413f7 Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Tue, 29 Apr 2025 11:31:06 +0530 Subject: [PATCH 1/5] PHOENIX-7593: Enable CompactionScanner for flushes --- .../coprocessor/CompactionScanner.java | 34 ++++++++++++++--- .../UngroupedAggregateRegionObserver.java | 23 +++++++++++ .../end2end/MaxLookbackExtendedIT.java | 38 +++++++++++++++++++ .../apache/phoenix/end2end/TableTTLIT.java | 10 +++-- 4 files changed, 95 insertions(+), 10 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 232a4a20c25..e619af806cd 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -65,6 +65,7 @@ import org.apache.phoenix.filter.RowKeyComparisonFilter.RowKeyTuple; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixPreparedStatement; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.CompiledConditionalTTLExpression; @@ -144,13 +145,13 @@ public class CompactionScanner implements InternalScanner { private final long maxLookbackInMillis; private int minVersion; private int maxVersion; - private final boolean emptyCFStore; + private boolean emptyCFStore; private final boolean localIndex; private final int familyCount; private KeepDeletedCells keepDeletedCells; private long compactionTime; - private final byte[] emptyCF; - private final byte[] emptyCQ; + private byte[] emptyCF; + private byte[] emptyCQ; private final byte[] storeColumnFamily; private final String tableName; private final String columnFamilyName; @@ -180,13 +181,13 @@ public CompactionScanner(RegionCoprocessorEnvironment env, // Empty column family and qualifier are always needed to compute which all empty cells to retain // even during minor compactions. If required empty cells are not retained during // minor compactions then we can run into the risk of partial row expiry on next major compaction. - this.emptyCF = SchemaUtil.getEmptyColumnFamily(table); - this.emptyCQ = SchemaUtil.getEmptyColumnQualifier(table); + this.emptyCF = table != null ? SchemaUtil.getEmptyColumnFamily(table) : EMPTY_BYTE_ARRAY; + this.emptyCQ = table != null ? SchemaUtil.getEmptyColumnQualifier(table) : EMPTY_BYTE_ARRAY; compactionTime = EnvironmentEdgeManager.currentTimeMillis(); columnFamilyName = store.getColumnFamilyName(); storeColumnFamily = columnFamilyName.getBytes(); tableName = region.getRegionInfo().getTable().getNameAsString(); - String dataTableName = table.getName().toString(); + String dataTableName = table != null ? table.getName().toString() : ""; Long overriddenMaxLookback = maxLookbackMap.get(tableName + SEPARATOR + columnFamilyName); this.maxLookbackInMillis = overriddenMaxLookback == null ? maxLookbackAgeInMillis : Math.max(maxLookbackAgeInMillis, overriddenMaxLookback); @@ -416,11 +417,32 @@ private void postProcessForConditionalTTL(List result) { } } + private void determineEmptyCfCq(List result) { + for (Cell cell : result) { + emptyCF = CellUtil.cloneFamily(cell); + if(ScanUtil.isEmptyColumn(cell, emptyCF, QueryConstants.EMPTY_COLUMN_BYTES)) { + emptyCQ = QueryConstants.EMPTY_COLUMN_BYTES; + emptyCFStore = true; + break; + } //Empty column is always encoded in FOUR_BYTE format, since it's a reserved qualifier. See EncodedColumnsUtil#isReservedColumnQualifier + else if(ScanUtil.isEmptyColumn(cell, emptyCF, QueryConstants.ENCODED_EMPTY_COLUMN_BYTES)) { + emptyCQ = QueryConstants.ENCODED_EMPTY_COLUMN_BYTES; + emptyCFStore = true; + break; + } + } + } + @Override public boolean next(List result) throws IOException { boolean hasMore = storeScanner.next(result); inputCellCount += result.size(); if (!result.isEmpty()) { + // This will happen only during flushes as then we don't pass PTable object + // to determine emptyCF and emptyCQ + if (emptyCQ == EMPTY_BYTE_ARRAY) { + determineEmptyCfCq(result); + } // This is for debugging // printRow(result, "Input for " + tableName + " " + columnFamilyName, true, false); phoenixLevelRowCompactor.compact(result, false); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index b3314752a2a..405bcb3ee1f 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.Region; @@ -611,6 +612,28 @@ private boolean areMutationsInSameTable(Table targetHTable, Region region) { region.getTableDescriptor().getTableName().getName()) == 0); } + @Override + public InternalScanner preFlush(ObserverContext c, Store store, + InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { + if (!isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) { + return scanner; + } else { + return User.runAsLoginUser(new PrivilegedExceptionAction() { + @Override public InternalScanner run() throws Exception { + String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable() + .getNameAsString(); + Configuration conf = c.getEnvironment().getConfiguration(); + long maxLookbackInMillis = + BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf); + maxLookbackInMillis = CompactionScanner.getMaxLookbackInMillis(tableName, + store.getColumnFamilyName(), maxLookbackInMillis); + return new CompactionScanner(c.getEnvironment(), store, scanner, + maxLookbackInMillis, false, true, null); + } + }); + } + } + @Override public InternalScanner preCompact(ObserverContext c, Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java index 987d631bfe3..f6a65444004 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java @@ -610,6 +610,44 @@ public void testRetainingLastRowVersion() throws Exception { } } + @Test(timeout=60000) + public void testRetainingLastRowVersionForFlushes() throws Exception { + try(Connection conn = DriverManager.getConnection(getUrl())) { + String tableName = generateUniqueName(); + createTable(tableName); + long timeIntervalBetweenTwoUpserts = (ttl / 2) + 1; + injectEdge.setValue(System.currentTimeMillis()); + EnvironmentEdgeManager.injectEdge(injectEdge); + TableName dataTableName = TableName.valueOf(tableName); + injectEdge.incrementValue(1); + Statement stmt = conn.createStatement(); + stmt.execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')"); + conn.commit(); + injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000); + stmt.execute("upsert into " + tableName + " values ('a', 'ab1')"); + conn.commit(); + injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000); + stmt.execute("upsert into " + tableName + " values ('a', 'ab2')"); + conn.commit(); + injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000); + stmt.execute("upsert into " + tableName + " values ('a', 'ab3')"); + conn.commit(); + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000); + TestUtil.dumpTable(conn, dataTableName); + TestUtil.flush(utility, dataTableName); + injectEdge.incrementValue(1); + TestUtil.dumpTable(conn, dataTableName); + majorCompact(dataTableName); + injectEdge.incrementValue(1); + TestUtil.dumpTable(conn, dataTableName); + ResultSet rs = stmt.executeQuery("select * from " + dataTableName + " where id = 'a'"); + while(rs.next()) { + assertNotNull(rs.getString(3)); + assertNotNull(rs.getString(4)); + } + } + } + private void flush(TableName table) throws IOException { Admin admin = getUtility().getAdmin(); admin.flush(table); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java index 1d68ce776c7..1a27ee9c90c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java @@ -273,10 +273,12 @@ public void testMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled() Thread.sleep(1); } flush(TableName.valueOf(tableName)); - // Flushes dump and retain all the cells to HFile. - // Doing MAX_COLUMN_INDEX + 1 to account for empty cells - assertEquals(TestUtil.getRawCellCount(conn, TableName.valueOf(tableName), row), - rowUpdateCounter * (MAX_COLUMN_INDEX + 1)); + // At every flush, extra cell versions should be removed. + // MAX_COLUMN_INDEX table columns and one empty column will be retained for + // each row version. + int rawCellCount = TestUtil.getRawCellCount( + conn, TableName.valueOf(tableName), row); + assertEquals((i + 1) * (MAX_COLUMN_INDEX + 1) * versions, rawCellCount); } // Run one minor compaction (in case no minor compaction has happened yet) TestUtil.minorCompact(utility, TableName.valueOf(tableName)); From 2bfcc8ef2ad6fe319f11a928730b406762ebc865 Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Wed, 30 Apr 2025 12:35:12 +0530 Subject: [PATCH 2/5] Address PR comments --- .../coprocessor/CompactionScanner.java | 32 +++++++++++++------ .../end2end/MaxLookbackExtendedIT.java | 29 ++++++++++++++--- .../apache/phoenix/end2end/TableTTLIT.java | 30 ++++++++++++----- 3 files changed, 70 insertions(+), 21 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index e619af806cd..63d79424b35 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -163,6 +163,7 @@ public class CompactionScanner implements InternalScanner { private long outputCellCount = 0; private boolean phoenixLevelOnly = false; private boolean isCDCIndex; + private boolean isEmptyCfCqInitialized; // Only for forcing minor compaction while testing private static boolean forceMinorCompaction = false; @@ -181,8 +182,10 @@ public CompactionScanner(RegionCoprocessorEnvironment env, // Empty column family and qualifier are always needed to compute which all empty cells to retain // even during minor compactions. If required empty cells are not retained during // minor compactions then we can run into the risk of partial row expiry on next major compaction. - this.emptyCF = table != null ? SchemaUtil.getEmptyColumnFamily(table) : EMPTY_BYTE_ARRAY; - this.emptyCQ = table != null ? SchemaUtil.getEmptyColumnQualifier(table) : EMPTY_BYTE_ARRAY; + this.emptyCF = table != null ? SchemaUtil.getEmptyColumnFamily(table) : null; + this.emptyCQ = table != null ? SchemaUtil.getEmptyColumnQualifier(table) : null; + // PTable will be null only for flushes + this.isEmptyCfCqInitialized = table != null; compactionTime = EnvironmentEdgeManager.currentTimeMillis(); columnFamilyName = store.getColumnFamilyName(); storeColumnFamily = columnFamilyName.getBytes(); @@ -418,19 +421,29 @@ private void postProcessForConditionalTTL(List result) { } private void determineEmptyCfCq(List result) { + // This should be called only per instance + assert ! isEmptyCfCqInitialized; + byte[] emptyCF = null; for (Cell cell : result) { emptyCF = CellUtil.cloneFamily(cell); - if(ScanUtil.isEmptyColumn(cell, emptyCF, QueryConstants.EMPTY_COLUMN_BYTES)) { + if (ScanUtil.isEmptyColumn(cell, emptyCF, QueryConstants.EMPTY_COLUMN_BYTES)) { emptyCQ = QueryConstants.EMPTY_COLUMN_BYTES; - emptyCFStore = true; break; - } //Empty column is always encoded in FOUR_BYTE format, since it's a reserved qualifier. See EncodedColumnsUtil#isReservedColumnQualifier - else if(ScanUtil.isEmptyColumn(cell, emptyCF, QueryConstants.ENCODED_EMPTY_COLUMN_BYTES)) { + } + else if (ScanUtil.isEmptyColumn(cell, emptyCF, + QueryConstants.ENCODED_EMPTY_COLUMN_BYTES)) { + //Empty column is always encoded in FOUR_BYTE format, since it's a reserved + // qualifier. See EncodedColumnsUtil#isReservedColumnQualifier. emptyCQ = QueryConstants.ENCODED_EMPTY_COLUMN_BYTES; - emptyCFStore = true; break; } } + if (emptyCQ == QueryConstants.EMPTY_COLUMN_BYTES + || emptyCQ == QueryConstants.ENCODED_EMPTY_COLUMN_BYTES) { + this.emptyCF = emptyCF; + this.emptyCFStore = true; + } + this.isEmptyCfCqInitialized = true; } @Override @@ -440,7 +453,7 @@ public boolean next(List result) throws IOException { if (!result.isEmpty()) { // This will happen only during flushes as then we don't pass PTable object // to determine emptyCF and emptyCQ - if (emptyCQ == EMPTY_BYTE_ARRAY) { + if (!this.isEmptyCfCqInitialized) { determineEmptyCfCq(result); } // This is for debugging @@ -2481,7 +2494,8 @@ private void getLastRowVersionInMaxLookbackWindow(List result, } if (cell.getType() == Cell.Type.Put) { lastRowVersion.add(cell); - if (ScanUtil.isEmptyColumn(cell, emptyCF, emptyCQ)) { + if (emptyCF != null && emptyCQ != null + && ScanUtil.isEmptyColumn(cell, emptyCF, emptyCQ)) { index = addEmptyColumn(result, currentColumnCell, index, emptyColumn); } else { index = skipColumn(result, currentColumnCell, retainedCells, index); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java index f6a65444004..42a25119e5a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java @@ -604,8 +604,8 @@ public void testRetainingLastRowVersion() throws Exception { TestUtil.dumpTable(conn, dataTableName); ResultSet rs = stmt.executeQuery("select * from " + dataTableName + " where id = 'a'"); while(rs.next()) { - assertNotNull(rs.getString(3)); - assertNotNull(rs.getString(4)); + assertEquals("abc", rs.getString(3)); + assertEquals("abcd", rs.getString(4)); } } } @@ -633,17 +633,38 @@ public void testRetainingLastRowVersionForFlushes() throws Exception { stmt.execute("upsert into " + tableName + " values ('a', 'ab3')"); conn.commit(); injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000); + TestUtil.dumpTable(conn, dataTableName); + byte[] rowKey = Bytes.toBytes("a"); + int rawCellCount = TestUtil.getRawCellCount(conn, dataTableName, rowKey); + // 6 non-empty cells (ab3, ab2, ab1, ab, abc, abcd) + 4 empty cells (for 4 upserts) + assertEquals(10, rawCellCount); + TestUtil.flush(utility, dataTableName); injectEdge.incrementValue(1); + rawCellCount = TestUtil.getRawCellCount(conn, dataTableName, rowKey); + // 1 non-empty cell (ab3) and 1 empty cell in max lookback window are + // immediately retained. + // 3 non-empty cells outside max lookback window will be retained (ab2, abc, abcd) + // 3 (for multi-CF)/ 2 (single-CF) empty cells will be retained outside + // max lookback window + assertEquals(multiCF ? 8 : 7, rawCellCount); TestUtil.dumpTable(conn, dataTableName); + majorCompact(dataTableName); injectEdge.incrementValue(1); + rawCellCount = TestUtil.getRawCellCount(conn, dataTableName, rowKey); + // 1 non-empty cell (ab3) and 1 empty cell at the edge of max lookback window will be + // retained + // 2 non-empty cells outside max lookback window will be retained (abc, abcd) + // 2 empty cells will be retained outside max lookback window + assertEquals(6, rawCellCount); TestUtil.dumpTable(conn, dataTableName); + ResultSet rs = stmt.executeQuery("select * from " + dataTableName + " where id = 'a'"); while(rs.next()) { - assertNotNull(rs.getString(3)); - assertNotNull(rs.getString(4)); + assertEquals("abc", rs.getString(3)); + assertEquals("abcd", rs.getString(4)); } } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java index 1a27ee9c90c..9165dbc087a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java @@ -128,8 +128,8 @@ public static synchronized Collection data() { { false, false, KeepDeletedCells.FALSE, 1, 100, null}, { false, false, KeepDeletedCells.TRUE, 5, 50, null}, { false, false, KeepDeletedCells.TTL, 1, 25, null}, - { true, false, KeepDeletedCells.FALSE, 5, 50, null}, - { true, false, KeepDeletedCells.TRUE, 1, 25, null}, + { true, false, KeepDeletedCells.FALSE, 5, 50, 0}, + { true, false, KeepDeletedCells.TRUE, 1, 25, 0}, { true, false, KeepDeletedCells.TTL, 5, 100, null}, { false, false, KeepDeletedCells.FALSE, 1, 100, 0}, { false, false, KeepDeletedCells.TRUE, 5, 50, 0}, @@ -246,7 +246,7 @@ public void testMaskingAndMajorCompaction() throws Exception { } @Test - public void testMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled() + public void testMinorCompactionAndFlushesShouldNotRetainCellsWhenMaxLookbackIsDisabled() throws Exception { if (tableLevelMaxLookback == null || tableLevelMaxLookback != 0) { return; @@ -274,16 +274,30 @@ public void testMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled() } flush(TableName.valueOf(tableName)); // At every flush, extra cell versions should be removed. - // MAX_COLUMN_INDEX table columns and one empty column will be retained for - // each row version. + // MAX_COLUMN_INDEX table columns will be retained for each row version. + int expectedMaxRawCellCount; + if (multiCF) { + // All empty cells are retained for multiCF tables for flushes and minor + // compactions + expectedMaxRawCellCount = + ((i + 1) * MAX_COLUMN_INDEX * versions) + rowUpdateCounter; + } + else { + // Only empty column is retained for each row version + expectedMaxRawCellCount = (i + 1) * (MAX_COLUMN_INDEX + 1) * versions; + } int rawCellCount = TestUtil.getRawCellCount( conn, TableName.valueOf(tableName), row); - assertEquals((i + 1) * (MAX_COLUMN_INDEX + 1) * versions, rawCellCount); + // Need inequality check here as a minor compaction could have happened + assertTrue(rawCellCount <= expectedMaxRawCellCount); } // Run one minor compaction (in case no minor compaction has happened yet) TestUtil.minorCompact(utility, TableName.valueOf(tableName)); - assertEquals(TestUtil.getRawCellCount(conn, TableName.valueOf(tableName), - Bytes.toBytes("a")), (MAX_COLUMN_INDEX + 1) * versions); + int rawCellCount = TestUtil.getRawCellCount(conn, TableName.valueOf(tableName), + Bytes.toBytes("a")); + int expectedRawCellCount = (MAX_COLUMN_INDEX * versions) + + (multiCF ? rowUpdateCounter : versions); + assertEquals(expectedRawCellCount, rawCellCount); } catch (AssertionError e) { TestUtil.dumpTable(conn, TableName.valueOf(tableName)); throw e; From cac426052386c8b8e35be853ebe4438be02a2556 Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Wed, 30 Apr 2025 21:43:10 +0530 Subject: [PATCH 3/5] Address style check issues --- .../java/org/apache/phoenix/coprocessor/CompactionScanner.java | 3 +-- .../phoenix/coprocessor/UngroupedAggregateRegionObserver.java | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 63d79424b35..97065a9833c 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -429,8 +429,7 @@ private void determineEmptyCfCq(List result) { if (ScanUtil.isEmptyColumn(cell, emptyCF, QueryConstants.EMPTY_COLUMN_BYTES)) { emptyCQ = QueryConstants.EMPTY_COLUMN_BYTES; break; - } - else if (ScanUtil.isEmptyColumn(cell, emptyCF, + } else if (ScanUtil.isEmptyColumn(cell, emptyCF, QueryConstants.ENCODED_EMPTY_COLUMN_BYTES)) { //Empty column is always encoded in FOUR_BYTE format, since it's a reserved // qualifier. See EncodedColumnsUtil#isReservedColumnQualifier. diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 405bcb3ee1f..bb79a9c3088 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -614,7 +614,8 @@ private boolean areMutationsInSameTable(Table targetHTable, Region region) { @Override public InternalScanner preFlush(ObserverContext c, Store store, - InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { + InternalScanner scanner, FlushLifeCycleTracker tracker) + throws IOException { if (!isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) { return scanner; } else { From 924e615e7854f37140189a7070984a94f0d2d9d6 Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Fri, 9 May 2025 13:05:19 +0530 Subject: [PATCH 4/5] Add flag to disable compaction scanner for flushes --- .../apache/phoenix/query/QueryServices.java | 4 + .../BaseScannerRegionObserver.java | 7 + .../UngroupedAggregateRegionObserver.java | 2 +- ...CompactionScannerForFlushesDisabledIT.java | 172 ++++++++++++++++++ .../end2end/MaxLookbackExtendedIT.java | 2 +- 5 files changed, 185 insertions(+), 2 deletions(-) create mode 100644 phoenix-core/src/it/java/org/apache/phoenix/end2end/CompactionScannerForFlushesDisabledIT.java diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index bc9320723c9..90a3a386c02 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -376,6 +376,10 @@ public interface QueryServices extends SQLCloseable { public static final String PHOENIX_SERVER_PAGE_SIZE_MS = "phoenix.server.page.size.ms"; // Phoenix TTL implemented by CompactionScanner and TTLRegionScanner is enabled public static final String PHOENIX_TABLE_TTL_ENABLED = "phoenix.table.ttl.enabled"; + // Enable CompactionScanner for flushes to remove extra versions + public static final String PHOENIX_COMPACTION_SCANNER_FOR_FLUSHES_ENABLED = + "phoenix.compaction.scanner.for.flushes.enabled"; + public static final boolean DEFAULT_PHOENIX_COMPACTION_SCANNER_FOR_FLUSHES_ENABLED = true; // Copied here to avoid dependency on hbase-server public static final String WAL_EDIT_CODEC_ATTRIB = "hbase.regionserver.wal.codec"; //Property to know whether TTL at View Level is enabled diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index bf7ca32d89b..73b3931328e 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -543,4 +543,11 @@ public static boolean isPhoenixTableTTLEnabled(Configuration conf) { return conf.getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED, QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED); } + + public static boolean isCompactionScannerEnabledForFlushes(Configuration conf) { + boolean isPhoenixTableTTLEnabled = isPhoenixTableTTLEnabled(conf); + return isPhoenixTableTTLEnabled && conf.getBoolean( + QueryServices.PHOENIX_COMPACTION_SCANNER_FOR_FLUSHES_ENABLED, + QueryServices.DEFAULT_PHOENIX_COMPACTION_SCANNER_FOR_FLUSHES_ENABLED); + } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index bb79a9c3088..c72b70cb4da 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -616,7 +616,7 @@ private boolean areMutationsInSameTable(Table targetHTable, Region region) { public InternalScanner preFlush(ObserverContext c, Store store, InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { - if (!isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) { + if (!isCompactionScannerEnabledForFlushes(c.getEnvironment().getConfiguration())) { return scanner; } else { return User.runAsLoginUser(new PrivilegedExceptionAction() { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CompactionScannerForFlushesDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CompactionScannerForFlushesDisabledIT.java new file mode 100644 index 00000000000..e323ac32fa1 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CompactionScannerForFlushesDisabledIT.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ManualEnvironmentEdge; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +@Category(NeedsOwnMiniClusterTest.class) +@RunWith(Parameterized.class) +public class CompactionScannerForFlushesDisabledIT extends BaseTest { + private static final int MAX_LOOKBACK_AGE = 15; + private String tableDDLOptions; + private StringBuilder optionBuilder; + ManualEnvironmentEdge injectEdge; + private int ttl; + private boolean multiCF; + + public CompactionScannerForFlushesDisabledIT(boolean multiCF) { + this.multiCF = multiCF; + } + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map props = Maps.newHashMapWithExpectedSize(2); + props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0)); + props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); + props.put(QueryServices.PHOENIX_COMPACTION_SCANNER_FOR_FLUSHES_ENABLED, "false"); + props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); + props.put(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, "0"); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Before + public void beforeTest(){ + EnvironmentEdgeManager.reset(); + optionBuilder = new StringBuilder(); + ttl = 30; + optionBuilder.append(" TTL=" + ttl); + this.tableDDLOptions = optionBuilder.toString(); + injectEdge = new ManualEnvironmentEdge(); + injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis()); + } + + @After + public synchronized void afterTest() throws Exception { + boolean refCountLeaked = isAnyStoreRefCountLeaked(); + + EnvironmentEdgeManager.reset(); + Assert.assertFalse("refCount leaked", refCountLeaked); + } + + @Parameterized.Parameters(name = "CompactionScannerForFlushesDisabledIT_multiCF={0}") + public static Collection data() { + return Arrays.asList(false, true); + } + + @Test + public void testRetainingAllRowVersions() throws Exception { + try(Connection conn = DriverManager.getConnection(getUrl())) { + String tableName = generateUniqueName(); + createTable(tableName); + long timeIntervalBetweenTwoUpserts = (ttl / 2) + 1; + injectEdge.setValue(System.currentTimeMillis()); + EnvironmentEdgeManager.injectEdge(injectEdge); + TableName dataTableName = TableName.valueOf(tableName); + injectEdge.incrementValue(1); + Statement stmt = conn.createStatement(); + stmt.execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')"); + conn.commit(); + injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000); + stmt.execute("upsert into " + tableName + " values ('a', 'ab1')"); + conn.commit(); + injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000); + stmt.execute("upsert into " + tableName + " values ('a', 'ab2')"); + conn.commit(); + injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000); + stmt.execute("upsert into " + tableName + " values ('a', 'ab3')"); + conn.commit(); + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000); + + TestUtil.dumpTable(conn, dataTableName); + byte[] rowKey = Bytes.toBytes("a"); + int rawCellCount = TestUtil.getRawCellCount(conn, dataTableName, rowKey); + // 6 non-empty cells (ab3, ab2, ab1, ab, abc, abcd) + 4 empty cells (for 4 upserts) + assertEquals(10, rawCellCount); + + TestUtil.flush(utility, dataTableName); + injectEdge.incrementValue(1); + rawCellCount = TestUtil.getRawCellCount(conn, dataTableName, rowKey); + // Compaction scanner wasn't used during flushes and excess rows versions are flushed + assertEquals(10, rawCellCount); + TestUtil.dumpTable(conn, dataTableName); + + TestUtil.majorCompact(getUtility(), dataTableName); + injectEdge.incrementValue(1); + rawCellCount = TestUtil.getRawCellCount(conn, dataTableName, rowKey); + // 1 non-empty cell (ab3) and 1 empty cell at the edge of max lookback window will be + // retained + // 2 non-empty cells outside max lookback window will be retained (abc, abcd) + // 2 empty cells will be retained outside max lookback window + assertEquals(6, rawCellCount); + TestUtil.dumpTable(conn, dataTableName); + + ResultSet rs = stmt.executeQuery("select * from " + dataTableName + " where id = 'a'"); + while(rs.next()) { + assertEquals("abc", rs.getString(3)); + assertEquals("abcd", rs.getString(4)); + } + } + } + + private void createTable(String tableName) throws SQLException { + try(Connection conn = DriverManager.getConnection(getUrl())) { + String createSql; + if (multiCF) { + createSql = "create table " + tableName + + " (id varchar(10) not null primary key, val1 varchar(10), " + + "a.val2 varchar(10), b.val3 varchar(10))" + tableDDLOptions; + } + else { + createSql = "create table " + tableName + + " (id varchar(10) not null primary key, val1 varchar(10), " + + "val2 varchar(10), val3 varchar(10))" + tableDDLOptions; + } + conn.createStatement().execute(createSql); + conn.commit(); + } + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java index 42a25119e5a..28efda7baea 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java @@ -565,7 +565,7 @@ public void testOverrideMaxLookbackForCompaction() throws Exception { } @Test(timeout=60000) - public void testRetainingLastRowVersion() throws Exception { + public void testRetainingLastRowVersionForMinorCompaction() throws Exception { try(Connection conn = DriverManager.getConnection(getUrl())) { String tableName = generateUniqueName(); createTable(tableName); From f1c32f00f743dbf84a4fcb80a26ebd9874fe22f9 Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Fri, 9 May 2025 23:45:37 +0530 Subject: [PATCH 5/5] Fix style checks --- .../src/main/java/org/apache/phoenix/query/QueryServices.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index 90a3a386c02..d86c11ac379 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -377,9 +377,9 @@ public interface QueryServices extends SQLCloseable { // Phoenix TTL implemented by CompactionScanner and TTLRegionScanner is enabled public static final String PHOENIX_TABLE_TTL_ENABLED = "phoenix.table.ttl.enabled"; // Enable CompactionScanner for flushes to remove extra versions - public static final String PHOENIX_COMPACTION_SCANNER_FOR_FLUSHES_ENABLED = + String PHOENIX_COMPACTION_SCANNER_FOR_FLUSHES_ENABLED = "phoenix.compaction.scanner.for.flushes.enabled"; - public static final boolean DEFAULT_PHOENIX_COMPACTION_SCANNER_FOR_FLUSHES_ENABLED = true; + boolean DEFAULT_PHOENIX_COMPACTION_SCANNER_FOR_FLUSHES_ENABLED = true; // Copied here to avoid dependency on hbase-server public static final String WAL_EDIT_CODEC_ATTRIB = "hbase.regionserver.wal.codec"; //Property to know whether TTL at View Level is enabled