diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java index 5c8554273e..ef74193258 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java @@ -25,8 +25,10 @@ import org.apache.amoro.TableRuntime; import org.apache.amoro.config.TableConfiguration; import org.apache.amoro.server.optimizing.OptimizingStatus; +import org.apache.amoro.server.table.DefaultTableRuntime; import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableService; +import org.apache.amoro.server.table.cleanup.CleanupOperation; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -84,18 +86,49 @@ protected void initHandler(List tableRuntimeList) { .forEach( tableRuntime -> { if (scheduledTables.add(tableRuntime.getTableIdentifier())) { - executor.schedule( - () -> executeTask(tableRuntime), getStartDelay(), TimeUnit.MILLISECONDS); + scheduleTableExecution( + tableRuntime, calculateExecutionDelay(tableRuntime, getCleanupOperation())); } }); logger.info("Table executor {} initialized", getClass().getSimpleName()); } + private long calculateExecutionDelay( + TableRuntime tableRuntime, CleanupOperation cleanupOperation) { + // If the table needs to be executed immediately, schedule it to run after a short delay. + if (shouldExecuteTask(tableRuntime, cleanupOperation)) { + return getStartDelay(); + } + + // If the table does not need to be executed immediately, schedule it for the next execution + // time. + // Adding getStartDelay() helps distribute the execution time of multiple tables, + // reducing the probability of simultaneous execution and system load spikes. + return getNextExecutingTime(tableRuntime) + getStartDelay(); + } + + /** + * Schedule a table for execution with the specified delay. + * + * @param tableRuntime The table runtime to schedule + * @param delay The delay in milliseconds before execution + */ + private void scheduleTableExecution(TableRuntime tableRuntime, long delay) { + executor.schedule(() -> executeTask(tableRuntime), delay, TimeUnit.MILLISECONDS); + logger.debug( + "Scheduled execution for table {} with delay {} ms", + tableRuntime.getTableIdentifier(), + delay); + } + private void executeTask(TableRuntime tableRuntime) { try { if (isExecutable(tableRuntime)) { execute(tableRuntime); + // Different tables take different amounts of time to execute the end of execute(), + // so you need to perform the update operation separately for each table. + persistUpdatingCleanupTime(tableRuntime); } } finally { scheduledTables.remove(tableRuntime.getTableIdentifier()); @@ -117,6 +150,99 @@ protected final void scheduleIfNecessary(TableRuntime tableRuntime, long millise protected abstract void execute(TableRuntime tableRuntime); + protected boolean shouldExecute(Long lastCleanupEndTime) { + return true; + } + + private void persistUpdatingCleanupTime(TableRuntime tableRuntime) { + CleanupOperation cleanupOperation = getCleanupOperation(); + if (shouldSkipOperation(tableRuntime, cleanupOperation)) { + return; + } + + try { + long currentTime = System.currentTimeMillis(); + ((DefaultTableRuntime) tableRuntime).updateLastCleanTime(cleanupOperation, currentTime); + + logger.debug( + "Update lastCleanTime for table {} with cleanup operation {}", + tableRuntime.getTableIdentifier().getTableName(), + cleanupOperation); + } catch (Exception e) { + logger.error( + "Failed to update lastCleanTime for table {}", + tableRuntime.getTableIdentifier().getTableName(), + e); + } + } + + /** + * Get cleanup operation. Default is NONE, subclasses should override this method to provide + * specific operation. + * + * @return cleanup operation + */ + protected CleanupOperation getCleanupOperation() { + return CleanupOperation.NONE; + } + + protected boolean shouldExecuteTask( + TableRuntime tableRuntime, CleanupOperation cleanupOperation) { + if (shouldSkipOperation(tableRuntime, cleanupOperation)) { + return true; + } + + long lastCleanupEndTime = + ((DefaultTableRuntime) tableRuntime).getLastCleanTime(cleanupOperation); + + // If it's zero, execute the task + if (lastCleanupEndTime == 0L) { + logger.debug( + "LastCleanupTime for table {} with operation {} is not exist, executing task", + tableRuntime.getTableIdentifier().getTableName(), + cleanupOperation); + return true; + } + + // After ams restarts, certain cleanup operations can only be re-executed + // if sufficient time has elapsed since the last cleanup. + boolean result = shouldExecute(lastCleanupEndTime); + logger.debug( + result + ? "Should execute task for table {} with {}" + : "Not enough time has passed since last cleanup for table {} with {}, delaying execution", + tableRuntime.getTableIdentifier().getTableName(), + cleanupOperation); + + return result; + } + + /** + * Check if the operation should be skipped based on common conditions. + * + * @param tableRuntime the table runtime to check + * @param cleanupOperation the cleanup operation to perform + * @return true if the operation should be skipped, false otherwise + */ + private boolean shouldSkipOperation( + TableRuntime tableRuntime, CleanupOperation cleanupOperation) { + if (cleanupOperation == CleanupOperation.NONE) { + logger.debug( + "No cleanup operation specified, skipping cleanup time check for table {}", + tableRuntime.getTableIdentifier().getTableName()); + return true; + } + + if (!(tableRuntime instanceof DefaultTableRuntime)) { + logger.debug( + "Table runtime is not DefaultTableRuntime, skipping cleanup time check for table {}", + tableRuntime.getTableIdentifier().getTableName()); + return true; + } + + return false; + } + protected String getThreadName() { return String.join("-", StringUtils.splitByCharacterTypeCamelCase(getClass().getSimpleName())) .toLowerCase(Locale.ROOT); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java index 16f80c9c0f..d7d8801f1a 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java @@ -26,6 +26,7 @@ import org.apache.amoro.server.scheduler.PeriodicTableScheduler; import org.apache.amoro.server.table.DefaultTableRuntime; import org.apache.amoro.server.table.TableService; +import org.apache.amoro.server.table.cleanup.CleanupOperation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +49,16 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) { return INTERVAL; } + @Override + protected boolean shouldExecute(Long lastCleanupEndTime) { + return System.currentTimeMillis() - lastCleanupEndTime >= INTERVAL; + } + + @Override + protected CleanupOperation getCleanupOperation() { + return CleanupOperation.DANGLING_DELETE_FILES_CLEANING; + } + @Override protected boolean enabled(TableRuntime tableRuntime) { return tableRuntime instanceof DefaultTableRuntime diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java index 4990b74093..61f45860b9 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java @@ -25,6 +25,7 @@ import org.apache.amoro.server.optimizing.maintainer.TableMaintainers; import org.apache.amoro.server.scheduler.PeriodicTableScheduler; import org.apache.amoro.server.table.TableService; +import org.apache.amoro.server.table.cleanup.CleanupOperation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +48,16 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) { return interval.toMillis(); } + @Override + protected boolean shouldExecute(Long lastCleanupEndTime) { + return System.currentTimeMillis() - lastCleanupEndTime >= interval.toMillis(); + } + + @Override + protected CleanupOperation getCleanupOperation() { + return CleanupOperation.DATA_EXPIRING; + } + @Override protected boolean enabled(TableRuntime tableRuntime) { return tableRuntime.getTableConfiguration().getExpiringDataConfig().isEnabled(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java index 332c511411..21d60cd105 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java @@ -25,6 +25,7 @@ import org.apache.amoro.server.optimizing.maintainer.TableMaintainers; import org.apache.amoro.server.scheduler.PeriodicTableScheduler; import org.apache.amoro.server.table.TableService; +import org.apache.amoro.server.table.cleanup.CleanupOperation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +46,16 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) { return interval.toMillis(); } + @Override + protected boolean shouldExecute(Long lastCleanupEndTime) { + return System.currentTimeMillis() - lastCleanupEndTime >= interval.toMillis(); + } + + @Override + protected CleanupOperation getCleanupOperation() { + return CleanupOperation.ORPHAN_FILES_CLEANING; + } + @Override protected boolean enabled(TableRuntime tableRuntime) { return tableRuntime.getTableConfiguration().isCleanOrphanEnabled(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java index f7d0cb927f..15f2d49d9e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java @@ -25,6 +25,7 @@ import org.apache.amoro.server.optimizing.maintainer.TableMaintainers; import org.apache.amoro.server.scheduler.PeriodicTableScheduler; import org.apache.amoro.server.table.TableService; +import org.apache.amoro.server.table.cleanup.CleanupOperation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +56,16 @@ public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration or scheduleIfNecessary(tableRuntime, getStartDelay()); } + @Override + protected boolean shouldExecute(Long lastCleanupEndTime) { + return System.currentTimeMillis() - lastCleanupEndTime >= INTERVAL; + } + + @Override + protected CleanupOperation getCleanupOperation() { + return CleanupOperation.SNAPSHOTS_EXPIRING; + } + @Override protected long getExecutorDelay() { return ThreadLocalRandom.current().nextLong(INTERVAL); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java index f2845cc711..50bd12585a 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java @@ -42,6 +42,8 @@ import org.apache.amoro.server.persistence.mapper.TableBlockerMapper; import org.apache.amoro.server.resource.OptimizerInstance; import org.apache.amoro.server.table.blocker.TableBlocker; +import org.apache.amoro.server.table.cleanup.CleanupOperation; +import org.apache.amoro.server.table.cleanup.TableRuntimeCleanupState; import org.apache.amoro.server.utils.IcebergTableUtil; import org.apache.amoro.server.utils.SnowflakeIdGenerator; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; @@ -81,11 +83,17 @@ public class DefaultTableRuntime extends AbstractTableRuntime .jsonType(AbstractOptimizingEvaluator.PendingInput.class) .defaultValue(new AbstractOptimizingEvaluator.PendingInput()); + private static final StateKey CLEANUP_STATE_KEY = + StateKey.stateKey("cleanup_state") + .jsonType(TableRuntimeCleanupState.class) + .defaultValue(new TableRuntimeCleanupState()); + private static final StateKey PROCESS_ID_KEY = StateKey.stateKey("process_id").longType().defaultValue(0L); public static final List> REQUIRED_STATES = - Lists.newArrayList(OPTIMIZING_STATE_KEY, PENDING_INPUT_KEY, PROCESS_ID_KEY); + Lists.newArrayList( + OPTIMIZING_STATE_KEY, PENDING_INPUT_KEY, PROCESS_ID_KEY, CLEANUP_STATE_KEY); private final Map processContainerMap = Maps.newConcurrentMap(); private final TableOptimizingMetrics optimizingMetrics; @@ -353,6 +361,47 @@ public void beginProcess(OptimizingProcess optimizingProcess) { .commit(); } + public long getLastCleanTime(CleanupOperation operation) { + TableRuntimeCleanupState state = store().getState(CLEANUP_STATE_KEY); + switch (operation) { + case ORPHAN_FILES_CLEANING: + return state.getLastOrphanFilesCleanTime(); + case DANGLING_DELETE_FILES_CLEANING: + return state.getLastDanglingDeleteFilesCleanTime(); + case DATA_EXPIRING: + return state.getLastDataExpiringTime(); + case SNAPSHOTS_EXPIRING: + return state.getLastSnapshotsExpiringTime(); + default: + return 0L; + } + } + + public void updateLastCleanTime(CleanupOperation operation, long time) { + store() + .begin() + .updateState( + CLEANUP_STATE_KEY, + state -> { + switch (operation) { + case ORPHAN_FILES_CLEANING: + state.setLastOrphanFilesCleanTime(time); + break; + case DANGLING_DELETE_FILES_CLEANING: + state.setLastDanglingDeleteFilesCleanTime(time); + break; + case DATA_EXPIRING: + state.setLastDataExpiringTime(time); + break; + case SNAPSHOTS_EXPIRING: + state.setLastSnapshotsExpiringTime(time); + break; + } + return state; + }) + .commit(); + } + public void completeProcess(boolean success) { OptimizingStatus originalStatus = getOptimizingStatus(); OptimizingType processType = optimizingProcess.getOptimizingType(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java new file mode 100644 index 0000000000..10afefe635 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table.cleanup; + +/** Table cleanup operation enum. Defines different operation types for table cleanup tasks. */ +public enum CleanupOperation { + DANGLING_DELETE_FILES_CLEANING, + ORPHAN_FILES_CLEANING, + DATA_EXPIRING, + SNAPSHOTS_EXPIRING, + // NONE indicates operation types where no cleanup process records are + // saved in the table_runtime_state table. + NONE; +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java new file mode 100644 index 0000000000..9dfb98f6ed --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table.cleanup; + +public class TableRuntimeCleanupState { + private long lastOrphanFilesCleanTime; + private long lastDanglingDeleteFilesCleanTime; + private long lastDataExpiringTime; + private long lastSnapshotsExpiringTime; + + public long getLastOrphanFilesCleanTime() { + return lastOrphanFilesCleanTime; + } + + public void setLastOrphanFilesCleanTime(long lastOrphanFilesCleanTime) { + this.lastOrphanFilesCleanTime = lastOrphanFilesCleanTime; + } + + public long getLastDanglingDeleteFilesCleanTime() { + return lastDanglingDeleteFilesCleanTime; + } + + public void setLastDanglingDeleteFilesCleanTime(long lastDanglingDeleteFilesCleanTime) { + this.lastDanglingDeleteFilesCleanTime = lastDanglingDeleteFilesCleanTime; + } + + public long getLastDataExpiringTime() { + return lastDataExpiringTime; + } + + public void setLastDataExpiringTime(long lastDataExpiringTime) { + this.lastDataExpiringTime = lastDataExpiringTime; + } + + public long getLastSnapshotsExpiringTime() { + return lastSnapshotsExpiringTime; + } + + public void setLastSnapshotsExpiringTime(long lastSnapshotsExpiringTime) { + this.lastSnapshotsExpiringTime = lastSnapshotsExpiringTime; + } +} diff --git a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql index 3f4c9031e9..b820a762dd 100644 --- a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql +++ b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql @@ -151,8 +151,6 @@ CREATE TABLE `table_process` KEY `table_index` (`table_id`, `create_time`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'History of optimizing after each commit'; - - CREATE TABLE `optimizing_process_state` ( `process_id` bigint(20) NOT NULL COMMENT 'optimizing_procedure UUID', diff --git a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql index bc4375cd82..5f4073ec7f 100644 --- a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql +++ b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql @@ -212,7 +212,6 @@ comment on column table_runtime_state.state_version is 'Table runtime state vers comment on column table_runtime_state.create_time is 'create time'; comment on column table_runtime_state.update_time is 'update time'; - CREATE TABLE table_process ( process_id bigserial PRIMARY KEY, table_id bigint NOT NULL, diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java new file mode 100644 index 0000000000..50c0e9d19d --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.scheduler.inline; + +import org.apache.amoro.TableRuntime; +import org.apache.amoro.server.scheduler.PeriodicTableScheduler; +import org.apache.amoro.server.table.TableService; +import org.apache.amoro.server.table.cleanup.CleanupOperation; + +/** + * Test table executor implementation for testing PeriodicTableScheduler functionality. This class + * allows configuration of cleanup operations and enabled state for testing purposes. + */ +class PeriodicTableSchedulerTestBase extends PeriodicTableScheduler { + private final CleanupOperation cleanupOperation; + private final boolean enabled; + private static final long SNAPSHOTS_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 1 hour + private static final long ORPHAN_FILES_CLEANING_INTERVAL = 24 * 60 * 60 * 1000L; // 1 day + private static final long DANGLING_DELETE_FILES_CLEANING_INTERVAL = 24 * 60 * 60 * 1000L; + private static final long DATA_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 1 hour + + public PeriodicTableSchedulerTestBase( + TableService tableService, CleanupOperation cleanupOperation, boolean enabled) { + super(tableService, 1); + this.cleanupOperation = cleanupOperation; + this.enabled = enabled; + } + + @Override + protected CleanupOperation getCleanupOperation() { + return cleanupOperation; + } + + @Override + protected long getNextExecutingTime(TableRuntime tableRuntime) { + return 1000; + } + + @Override + protected boolean enabled(TableRuntime tableRuntime) { + return enabled; + } + + @Override + protected void execute(TableRuntime tableRuntime) { + // Do nothing in test + } + + @Override + protected long getExecutorDelay() { + return 0; + } + + @Override + protected boolean shouldExecute(Long lastCleanupEndTime) { + long currentTime = System.currentTimeMillis(); + switch (cleanupOperation) { + case SNAPSHOTS_EXPIRING: + return currentTime - lastCleanupEndTime >= SNAPSHOTS_EXPIRING_INTERVAL; + case ORPHAN_FILES_CLEANING: + return currentTime - lastCleanupEndTime >= ORPHAN_FILES_CLEANING_INTERVAL; + case DANGLING_DELETE_FILES_CLEANING: + return currentTime - lastCleanupEndTime >= DANGLING_DELETE_FILES_CLEANING_INTERVAL; + case DATA_EXPIRING: + return currentTime - lastCleanupEndTime >= DATA_EXPIRING_INTERVAL; + default: + return true; + } + } + + public boolean shouldExecuteTaskForTest( + TableRuntime tableRuntime, CleanupOperation cleanupOperation) { + return shouldExecuteTask(tableRuntime, cleanupOperation); + } +} diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java new file mode 100644 index 0000000000..c401c88d8c --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.scheduler.inline; + +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.server.persistence.PersistentBase; +import org.apache.amoro.server.persistence.TableRuntimeMeta; +import org.apache.amoro.server.persistence.mapper.TableMetaMapper; +import org.apache.amoro.server.persistence.mapper.TableRuntimeMapper; +import org.apache.amoro.server.table.DefaultTableRuntime; +import org.apache.amoro.server.table.DefaultTableRuntimeStore; +import org.apache.amoro.server.table.TableRuntimeHandler; +import org.apache.amoro.server.table.cleanup.CleanupOperation; +import org.apache.amoro.table.TableRuntimeStore; +import org.apache.amoro.table.TableSummary; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * This class tests all aspects of cleanup operation handling in {@link + * org.apache.amoro.server.scheduler.PeriodicTableScheduler}. + */ +public class TestPeriodicTableSchedulerCleanup extends PersistentBase { + + private static final String TEST_CATALOG = "test_catalog"; + private static final String TEST_DB = "test_db"; + private static final String TEST_TABLE = "test_table"; + + static { + try { + Class.forName("org.apache.amoro.server.table.DerbyPersistence"); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize Derby persistence", e); + } + } + + private static final TableRuntimeHandler TEST_HANDLER = + new TableRuntimeHandler() { + @Override + public void handleTableChanged( + TableRuntime tableRuntime, + org.apache.amoro.server.optimizing.OptimizingStatus originalStatus) {} + + @Override + public void handleTableChanged( + TableRuntime tableRuntime, TableConfiguration originalConfig) {} + }; + + /** + * Create a test server table identifier with the given ID + * + * @param tableId the table ID + * @return a ServerTableIdentifier instance + */ + private ServerTableIdentifier createTableIdentifier(long tableId) { + return ServerTableIdentifier.of( + tableId, TEST_CATALOG, TEST_DB, TEST_TABLE + "_" + tableId, TableFormat.ICEBERG); + } + + /** + * Create a test DefaultTableRuntime with the given identifier + * + * @param identifier the table identifier + * @return a DefaultTableRuntime instance + */ + private DefaultTableRuntime createDefaultTableRuntime(ServerTableIdentifier identifier) { + // Create table runtime meta + TableRuntimeMeta meta = new TableRuntimeMeta(); + meta.setTableId(identifier.getId()); + meta.setGroupName("test_group"); + meta.setStatusCode(0); + meta.setTableConfig(Collections.emptyMap()); + meta.setTableSummary(new TableSummary()); + + // Create table runtime store + TableRuntimeStore store = + new DefaultTableRuntimeStore( + identifier, meta, DefaultTableRuntime.REQUIRED_STATES, Collections.emptyList()); + + return new DefaultTableRuntime(store); + } + + private void cleanUpTableRuntimeData(List tableIds) { + doAs( + TableRuntimeMapper.class, + mapper -> { + for (Long tableId : tableIds) { + try { + mapper.deleteRuntime(tableId); + mapper.removeAllTableStates(tableId); + } catch (Exception e) { + // Ignore if tables don't exist + } + } + }); + doAs( + TableMetaMapper.class, + mapper -> { + for (Long tableId : tableIds) { + try { + mapper.deleteTableIdById(tableId); + } catch (Exception e) { + // Ignore if tables don't exist + } + } + }); + } + + /** + * Prepare test environment by cleaning up test data and table runtime data + * + * @param testTableIds list of table IDs to clean up + */ + private void prepareTestEnvironment(List testTableIds) { + cleanUpTableRuntimeData(testTableIds); + } + + /** + * Create a test table executor + * + * @param cleanupOperation the cleanup operation to use + * @param enabled whether the executor should be enabled + * @return a new PeriodicTableSchedulerTestBase instance + */ + private PeriodicTableSchedulerTestBase createTestExecutor( + CleanupOperation cleanupOperation, boolean enabled) { + return new PeriodicTableSchedulerTestBase(null, cleanupOperation, enabled); + } + + /** + * Create a test table executor with default enabled state (true) + * + * @param cleanupOperation the cleanup operation to use + * @return a new PeriodicTableSchedulerTestBase instance + */ + private PeriodicTableSchedulerTestBase createTestExecutor(CleanupOperation cleanupOperation) { + return createTestExecutor(cleanupOperation, true); + } + + /** + * Test whether the executor should execute a task for a given table runtime and cleanup operation + */ + @Test + public void testShouldExecuteTaskWithNoPreviousCleanup() { + List operations = + Arrays.asList( + CleanupOperation.ORPHAN_FILES_CLEANING, + CleanupOperation.DANGLING_DELETE_FILES_CLEANING, + CleanupOperation.DATA_EXPIRING, + CleanupOperation.SNAPSHOTS_EXPIRING); + + for (CleanupOperation operation : operations) { + List testTableIds = Collections.singletonList(1L); + prepareTestEnvironment(testTableIds); + + PeriodicTableSchedulerTestBase executor = createTestExecutor(operation); + ServerTableIdentifier identifier = createTableIdentifier(1L); + DefaultTableRuntime tableRuntime = createDefaultTableRuntime(identifier); + + boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime, operation); + Assert.assertTrue( + "Should execute when there's no previous cleanup time for operation " + operation, + shouldExecute); + } + } + + /** Test should not execute task with recent cleanup */ + @Test + public void testShouldNotExecuteTaskWithRecentCleanup() { + List operations = + Arrays.asList( + CleanupOperation.ORPHAN_FILES_CLEANING, + CleanupOperation.DANGLING_DELETE_FILES_CLEANING, + CleanupOperation.DATA_EXPIRING, + CleanupOperation.SNAPSHOTS_EXPIRING); + + for (CleanupOperation operation : operations) { + List testTableIds = Collections.singletonList(1L); + cleanUpTableRuntimeData(testTableIds); + + PeriodicTableSchedulerTestBase executor = createTestExecutor(operation); + + // Create DefaultTableRuntime and set recent cleanup time + ServerTableIdentifier identifier = createTableIdentifier(1L); + DefaultTableRuntime tableRuntime = createDefaultTableRuntime(identifier); + + // Simulate recent cleanup + long recentTime = System.currentTimeMillis() - 10000L; + tableRuntime.updateLastCleanTime(operation, recentTime); + + boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime, operation); + Assert.assertFalse( + "Should not execute when recently cleaned up for operation " + operation, shouldExecute); + } + } + + /** Test should execute task with old cleanup */ + @Test + public void testShouldExecuteTaskWithOldCleanup() { + List operations = + Arrays.asList( + CleanupOperation.ORPHAN_FILES_CLEANING, + CleanupOperation.DANGLING_DELETE_FILES_CLEANING, + CleanupOperation.DATA_EXPIRING, + CleanupOperation.SNAPSHOTS_EXPIRING); + + for (CleanupOperation operation : operations) { + List testTableIds = Collections.singletonList(1L); + cleanUpTableRuntimeData(testTableIds); + + PeriodicTableSchedulerTestBase executor = createTestExecutor(operation); + + // Create DefaultTableRuntime and set old cleanup time + ServerTableIdentifier identifier = createTableIdentifier(1L); + DefaultTableRuntime tableRuntime = createDefaultTableRuntime(identifier); + + // Simulate old cleanup time (30 hours ago) + long oldTime = System.currentTimeMillis() - 30 * 60 * 60 * 1000L; + tableRuntime.updateLastCleanTime(operation, oldTime); + + boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime, operation); + Assert.assertTrue( + "Should execute when enough time has passed since last cleanup for operation " + + operation, + shouldExecute); + } + } + + @Test + public void testShouldExecuteTaskWithNoneOperation() { + List testTableIds = Collections.singletonList(1L); + prepareTestEnvironment(testTableIds); + + PeriodicTableSchedulerTestBase executor = createTestExecutor(CleanupOperation.NONE); + ServerTableIdentifier identifier = createTableIdentifier(1L); + DefaultTableRuntime tableRuntime = createDefaultTableRuntime(identifier); + + // Should always execute with NONE operation + boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime, CleanupOperation.NONE); + Assert.assertTrue("Should always execute with NONE operation", shouldExecute); + } +}