diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index ff95d8d0dd..3856b16d55 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -52,6 +52,41 @@ public class AmoroManagementConf {
           .defaultValue("admin")
           .withDescription("The administrator password");
 
+  /** Enable master & slave mode, which supports horizontal scaling of AMS. */
+  public static final ConfigOption<Boolean> USE_MASTER_SLAVE_MODE =
+      ConfigOptions.key("use-master-slave-mode")
+          .booleanType()
+          .defaultValue(false)
+          .withDescription("Enable master & slave mode, which supports horizontal scaling of AMS.");
+
+  public static final ConfigOption<Integer> BUCKET_ID_TOTAL_COUNT =
+      ConfigOptions.key("bucket-id.total-count")
+          .intType()
+          .defaultValue(100)
+          .withDescription(
+              "Total count of bucket IDs for assignment. Bucket IDs range from 1 to this value.");
+
+  public static final ConfigOption<Duration> NODE_OFFLINE_TIMEOUT =
+      ConfigOptions.key("node-offline.timeout")
+          .durationType()
+          .defaultValue(Duration.ofMinutes(5))
+          .withDescription(
+              "Timeout duration to determine if a node is offline. After this duration, the node's bucket IDs will be reassigned.");
+
+  public static final ConfigOption<Duration> ASSIGN_INTERVAL =
+      ConfigOptions.key("bucket-assign.interval")
+          .durationType()
+          .defaultValue(Duration.ofSeconds(60))
+          .withDescription(
+              "Interval for bucket assignment service to detect node changes and redistribute bucket IDs.");
+
+  public static final ConfigOption<Duration> BUCKET_TABLE_SYNC_INTERVAL =
+      ConfigOptions.key("bucket-table-sync.interval")
+          .durationType()
+          .defaultValue(Duration.ofSeconds(60))
+          .withDescription(
+              "Interval for syncing tables assigned to bucket IDs in master-slave mode. Each node periodically loads tables from database based on its assigned bucket IDs.");
+
   public static final ConfigOption<Duration> CATALOG_META_CACHE_EXPIRATION_INTERVAL =
       ConfigOptions.key("catalog-meta-cache.expiration-interval")
           .durationType()
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 1f91119fc1..67fdec37f9 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -18,6 +18,8 @@
 
 package org.apache.amoro.server;
 
+import static org.apache.amoro.server.AmoroManagementConf.USE_MASTER_SLAVE_MODE;
+
 import io.javalin.Javalin;
 import io.javalin.http.HttpCode;
 import io.javalin.http.staticfiles.Location;
@@ -96,6 +98,7 @@ public class AmoroServiceContainer {
   public static final Logger LOG = LoggerFactory.getLogger(AmoroServiceContainer.class);
 
   public static final String SERVER_CONFIG_FILENAME = "config.yaml";
+  private static boolean IS_MASTER_SLAVE_MODE = false;
 
   private final HighAvailabilityContainer haContainer;
   private DataSource dataSource;
@@ -110,6 +113,7 @@ public class AmoroServiceContainer {
   private TServer optimizingServiceServer;
   private Javalin httpServer;
   private AmsServiceMetrics amsServiceMetrics;
+  private AmsAssignService amsAssignService;
 
   public AmoroServiceContainer() throws Exception {
     initConfig();
@@ -128,15 +132,22 @@ public static void main(String[] args) {
                   LOG.info("AMS service has been shut down");
                 }));
       service.startRestServices();
-      while (true) {
-        try {
-          service.waitLeaderShip();
-          service.startOptimizingService();
-          service.waitFollowerShip();
-        } catch (Exception e) {
-          LOG.error("AMS start error", e);
-        } finally {
-          service.disposeOptimizingService();
+      if (IS_MASTER_SLAVE_MODE) {
+        // Even if one does not become the master, it cannot block the subsequent logic.
+        service.registAndElect();
+        // Regardless of whether it becomes the master, the service needs to be activated.
+        service.startOptimizingService();
+      } else {
+        while (true) {
+          try {
+            service.waitLeaderShip();
+            service.startOptimizingService();
+            service.waitFollowerShip();
+          } catch (Exception e) {
+            LOG.error("AMS start error", e);
+          } finally {
+            service.disposeOptimizingService();
+          }
         }
       }
     } catch (Throwable t) {
@@ -145,6 +156,10 @@ public static void main(String[] args) {
     }
   }
 
+  public void registAndElect() throws Exception {
+    haContainer.registAndElect();
+  }
+
   public void waitLeaderShip() throws Exception {
     haContainer.waitLeaderShip();
   }
@@ -171,11 +186,29 @@ public void startOptimizingService() throws Exception {
     TableRuntimeFactoryManager tableRuntimeFactoryManager = new TableRuntimeFactoryManager();
     tableRuntimeFactoryManager.initialize();
 
+    // In master-slave mode, create BucketAssignStore and AmsAssignService
+    BucketAssignStore bucketAssignStore = null;
+    if (IS_MASTER_SLAVE_MODE && haContainer != null && haContainer.getZkClient() != null) {
+      String clusterName = serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME);
+      bucketAssignStore = new ZkBucketAssignStore(haContainer.getZkClient(), clusterName);
+      // Create and start AmsAssignService for bucket assignment
+      amsAssignService =
+          new AmsAssignService(haContainer, serviceConfig, haContainer.getZkClient());
+      amsAssignService.start();
+      LOG.info("AmsAssignService started for master-slave mode");
+    }
+
     tableService =
-        new DefaultTableService(serviceConfig, catalogManager, tableRuntimeFactoryManager);
+        new DefaultTableService(
+            serviceConfig,
+            catalogManager,
+            tableRuntimeFactoryManager,
+            haContainer,
+            bucketAssignStore);
 
     optimizingService =
-        new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);
+        new DefaultOptimizingService(
+            serviceConfig, catalogManager, optimizerManager, tableService, haContainer);
 
     LOG.info("Setting up AMS table executors...");
     InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
@@ -214,6 +247,11 @@ public void disposeOptimizingService() {
       LOG.info("Stopping optimizing server[serving:{}] ...", optimizingServiceServer.isServing());
       optimizingServiceServer.stop();
     }
+    if (amsAssignService != null) {
+      LOG.info("Stopping AmsAssignService...");
+      amsAssignService.stop();
+      amsAssignService = null;
+    }
     if (tableService != null) {
       LOG.info("Stopping table service...");
       tableService.dispose();
@@ -256,6 +294,7 @@ public void dispose() {
   private void initConfig() throws Exception {
     LOG.info("initializing configurations...");
     new ConfigurationHelper().init();
+    IS_MASTER_SLAVE_MODE = serviceConfig.getBoolean(USE_MASTER_SLAVE_MODE);
   }
 
   private void startThriftService() {
@@ -534,6 +573,12 @@ private void initContainerConfig() {
           containerProperties.putIfAbsent(
               OptimizerProperties.AMS_OPTIMIZER_URI,
               AmsUtil.getAMSThriftAddress(serviceConfig, Constants.THRIFT_OPTIMIZING_SERVICE_NAME));
+          // Add master-slave mode flag to container properties
+          // Read from serviceConfig directly since IS_MASTER_SLAVE_MODE is set after
+          // initContainerConfig()
+          if (serviceConfig.getBoolean(USE_MASTER_SLAVE_MODE)) {
+            containerProperties.put(OptimizerProperties.OPTIMIZER_MASTER_SLAVE_MODE, "true");
+          }
           // put addition system properties
           container.setProperties(containerProperties);
           containerList.add(container);
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
new file mode 100644
index 0000000000..4049bcd1a3
--- /dev/null
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Service for assigning bucket IDs to AMS nodes in master-slave mode. Periodically detects node
+ * changes and redistributes bucket IDs evenly.
+ */
+public class AmsAssignService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AmsAssignService.class);
+
+  private final ScheduledExecutorService assignScheduler =
+      Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder()
+              .setNameFormat("ams-assign-scheduler-%d")
+              .setDaemon(true)
+              .build());
+
+  private final HighAvailabilityContainer haContainer;
+  private final BucketAssignStore assignStore;
+  private final Configurations serviceConfig;
+  private final int bucketIdTotalCount;
+  private final long nodeOfflineTimeoutMs;
+  private final long assignIntervalSeconds;
+  private volatile boolean running = false;
+
+  // Package-private accessors for testing
+  BucketAssignStore getAssignStore() {
+    return assignStore;
+  }
+
+  boolean isRunning() {
+    return running;
+  }
+
+  void doAssignForTest() {
+    doAssign();
+  }
+
+  public AmsAssignService(
+      HighAvailabilityContainer haContainer,
+      Configurations serviceConfig,
+      CuratorFramework zkClient) {
+    this.haContainer = haContainer;
+    this.serviceConfig = serviceConfig;
+    this.bucketIdTotalCount = serviceConfig.getInteger(AmoroManagementConf.BUCKET_ID_TOTAL_COUNT);
+    this.nodeOfflineTimeoutMs =
+        serviceConfig.get(AmoroManagementConf.NODE_OFFLINE_TIMEOUT).toMillis();
+    this.assignIntervalSeconds =
+        serviceConfig.get(AmoroManagementConf.ASSIGN_INTERVAL).getSeconds();
+    String clusterName = serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME);
+    this.assignStore = new ZkBucketAssignStore(zkClient, clusterName);
+  }
+
+  /**
+   * Start the assignment service. Only works in master-slave mode and when current node is leader.
+   */
+  public void start() {
+    if (!serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE)) {
+      LOG.info("Master-slave mode is not enabled, skip starting bucket assignment service");
+      return;
+    }
+    if (running) {
+      LOG.warn("Bucket assignment service is already running");
+      return;
+    }
+    running = true;
+    assignScheduler.scheduleWithFixedDelay(
+        this::doAssign, 10, assignIntervalSeconds, TimeUnit.SECONDS);
+    LOG.info("Bucket assignment service started with interval: {} seconds", assignIntervalSeconds);
+  }
+
+  /** Stop the assignment service. */
+  public void stop() {
+    if (!running) {
+      return;
+    }
+    running = false;
+    assignScheduler.shutdown();
+    try {
+      if (!assignScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+        assignScheduler.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      assignScheduler.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+    LOG.info("Bucket assignment service stopped");
+  }
+
+  private void doAssign() {
+    try {
+      if (!haContainer.hasLeadership()) {
+        LOG.debug("Current node is not leader, skip bucket assignment");
+        return;
+      }
+
+      List<AmsServerInfo> aliveNodes = haContainer.getAliveNodes();
+      if (aliveNodes.isEmpty()) {
+        LOG.debug("No alive nodes found, skip bucket assignment");
+        return;
+      }
+
+      Map<AmsServerInfo, List<String>> currentAssignments = assignStore.getAllAssignments();
+
+      // Create a mapping from stored nodes (may have null restBindPort) to alive nodes (complete
+      // info)
+      // Use host:thriftBindPort as the key for matching
+      Map<String, AmsServerInfo> aliveNodeMap = new java.util.HashMap<>();
+      for (AmsServerInfo node : aliveNodes) {
+        String key = getNodeKey(node);
+        aliveNodeMap.put(key, node);
+      }
+
+      // Normalize current assignments: map stored nodes to their corresponding alive nodes
+      Map<AmsServerInfo, List<String>> normalizedAssignments = new java.util.HashMap<>();
+      Set<AmsServerInfo> currentAssignedNodes = new HashSet<>();
+      for (Map.Entry<AmsServerInfo, List<String>> entry : currentAssignments.entrySet()) {
+        AmsServerInfo storedNode = entry.getKey();
+        String nodeKey = getNodeKey(storedNode);
+        AmsServerInfo aliveNode = aliveNodeMap.get(nodeKey);
+        if (aliveNode != null) {
+          // Node is alive, use the complete node info from aliveNodes
+          normalizedAssignments.put(aliveNode, entry.getValue());
+          currentAssignedNodes.add(aliveNode);
+        } else {
+          // Node is not in alive list, keep the stored node info for offline detection
+          normalizedAssignments.put(storedNode, entry.getValue());
+          currentAssignedNodes.add(storedNode);
+        }
+      }
+
+      Set<AmsServerInfo> aliveNodeSet = new HashSet<>(aliveNodes);
+
+      // Detect new nodes and offline nodes
+      Set<AmsServerInfo> newNodes = new HashSet<>(aliveNodeSet);
+      newNodes.removeAll(currentAssignedNodes);
+
+      Set<AmsServerInfo> offlineNodes = new HashSet<>();
+      for (AmsServerInfo storedNode : currentAssignments.keySet()) {
+        String nodeKey = getNodeKey(storedNode);
+        if (!aliveNodeMap.containsKey(nodeKey)) {
+          offlineNodes.add(storedNode);
+        }
+      }
+
+      // Check for nodes that haven't updated for a long time
+      long currentTime = System.currentTimeMillis();
+      Set<String> aliveNodeKeys = new HashSet<>();
+      for (AmsServerInfo node : aliveNodes) {
+        aliveNodeKeys.add(getNodeKey(node));
+      }
+      for (AmsServerInfo node : currentAssignedNodes) {
+        String nodeKey = getNodeKey(node);
+        if (aliveNodeKeys.contains(nodeKey)) {
+          long lastUpdateTime = assignStore.getLastUpdateTime(node);
+          if (lastUpdateTime > 0 && (currentTime - lastUpdateTime) > nodeOfflineTimeoutMs) {
+            // Find the stored node for this alive node to add to offlineNodes
+            for (AmsServerInfo storedNode : currentAssignments.keySet()) {
+              if (getNodeKey(storedNode).equals(nodeKey)) {
+                offlineNodes.add(storedNode);
+                break;
+              }
+            }
+            LOG.warn(
+                "Node {} is considered offline due to timeout. Last update: {}",
+                node,
+                lastUpdateTime);
+          }
+        }
+      }
+
+      boolean needReassign = !newNodes.isEmpty() || !offlineNodes.isEmpty();
+
+      if (needReassign) {
+        LOG.info(
+            "Detected node changes - New nodes: {}, Offline nodes: {}, Performing incremental reassignment...",
+            newNodes.size(),
+            offlineNodes.size());
+
+        // Step 1: Handle offline nodes - collect their buckets for redistribution
+        List<String> bucketsToRedistribute = new ArrayList<>();
+        for (AmsServerInfo offlineNode : offlineNodes) {
+          try {
+            List<String> offlineBuckets = currentAssignments.get(offlineNode);
+            if (offlineBuckets != null && !offlineBuckets.isEmpty()) {
+              bucketsToRedistribute.addAll(offlineBuckets);
+              LOG.info(
+                  "Collected {} buckets from offline node {} for redistribution",
+                  offlineBuckets.size(),
+                  offlineNode);
+            }
+            assignStore.removeAssignments(offlineNode);
+          } catch (Exception e) {
+            LOG.warn("Failed to remove assignments for offline node {}", offlineNode, e);
+          }
+        }
+
+        // Step 2: Calculate target assignment for balanced distribution
+        List<String> allBuckets = generateBucketIds();
+        int totalBuckets = allBuckets.size();
+        int totalAliveNodes = aliveNodes.size();
+        int targetBucketsPerNode = totalBuckets / totalAliveNodes;
+        int remainder = totalBuckets % totalAliveNodes;
+
+        // Step 3: Incremental reassignment
+        // Keep existing assignments for nodes that are still alive
+        Map<AmsServerInfo, List<String>> newAssignments = new java.util.HashMap<>();
+        Set<String> offlineNodeKeys = new HashSet<>();
+        for (AmsServerInfo offlineNode : offlineNodes) {
+          offlineNodeKeys.add(getNodeKey(offlineNode));
+        }
+        for (AmsServerInfo node : aliveNodes) {
+          String nodeKey = getNodeKey(node);
+          if (!offlineNodeKeys.contains(nodeKey)) {
+            // Node is alive and not offline, check if it has existing assignments
+            List<String> existingBuckets = normalizedAssignments.get(node);
+            if (existingBuckets != null && !existingBuckets.isEmpty()) {
+              // Keep existing buckets for alive nodes (not offline)
+              newAssignments.put(node, new ArrayList<>(existingBuckets));
+            } else {
+              // New node
+              newAssignments.put(node, new ArrayList<>());
+            }
+          } else {
+            // Node was offline, start with empty assignment
+            newAssignments.put(node, new ArrayList<>());
+          }
+        }
+
+        // Step 4: Redistribute buckets from offline nodes to alive nodes
+        if (!bucketsToRedistribute.isEmpty()) {
+          redistributeBucketsIncrementally(
+              aliveNodes, bucketsToRedistribute, newAssignments, targetBucketsPerNode);
+        }
+
+        // Step 5: Handle new nodes - balance buckets from existing nodes
+        if (!newNodes.isEmpty()) {
+          balanceBucketsForNewNodes(
+              aliveNodes, newNodes, newAssignments, targetBucketsPerNode, remainder);
+        }
+
+        // Step 6: Handle unassigned buckets (if any)
+        Set<String> allAssignedBuckets = new HashSet<>();
+        for (List<String> buckets : newAssignments.values()) {
+          allAssignedBuckets.addAll(buckets);
+        }
+        List<String> unassignedBuckets = new ArrayList<>();
+        for (String bucket : allBuckets) {
+          if (!allAssignedBuckets.contains(bucket)) {
+            unassignedBuckets.add(bucket);
+          }
+        }
+        if (!unassignedBuckets.isEmpty()) {
+          redistributeBucketsIncrementally(
+              aliveNodes, unassignedBuckets, newAssignments, targetBucketsPerNode);
+        }
+
+        // Step 7: Save all new assignments
+        for (Map.Entry<AmsServerInfo, List<String>> entry : newAssignments.entrySet()) {
+          try {
+            assignStore.saveAssignments(entry.getKey(), entry.getValue());
+            LOG.info(
+                "Assigned {} buckets to node {}: {}",
+                entry.getValue().size(),
+                entry.getKey(),
+                entry.getValue());
+          } catch (Exception e) {
+            LOG.error("Failed to save assignments for node {}", entry.getKey(), e);
+          }
+        }
+      } else {
+        // Update last update time for alive nodes
+        for (AmsServerInfo node : aliveNodes) {
+          assignStore.updateLastUpdateTime(node);
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Error during bucket assignment", e);
+    }
+  }
+
+  /**
+   * Redistribute buckets incrementally to alive nodes using round-robin. This minimizes bucket
+   * migration by only redistributing buckets from offline nodes.
+   *
+   * @param aliveNodes List of alive nodes
+   * @param bucketsToRedistribute Buckets to redistribute (from offline nodes)
+   * @param currentAssignments Current assignments map (will be modified)
+   * @param targetBucketsPerNode Target number of buckets per node (currently unused)
+   */
+  private void redistributeBucketsIncrementally(
+      List<AmsServerInfo> aliveNodes,
+      List<String> bucketsToRedistribute,
+      Map<AmsServerInfo, List<String>> currentAssignments,
+      int targetBucketsPerNode) {
+    if (aliveNodes.isEmpty() || bucketsToRedistribute.isEmpty()) {
+      return;
+    }
+
+    // Distribute buckets using round-robin to minimize migration
+    int nodeIndex = 0;
+    for (String bucketId : bucketsToRedistribute) {
+      AmsServerInfo node = aliveNodes.get(nodeIndex % aliveNodes.size());
+      currentAssignments.get(node).add(bucketId);
+      nodeIndex++;
+    }
+  }
+
+  /**
+   * Balance buckets for new nodes by taking buckets from existing nodes. This minimizes migration
+   * by only moving necessary buckets to new nodes.
+   *
+   * @param aliveNodes All alive nodes
+   * @param newNodes Newly added nodes
+   * @param currentAssignments Current assignments map (will be modified)
+   * @param targetBucketsPerNode Target number of buckets per node
+   * @param remainder Remainder when dividing total buckets by node count
+   */
+  private void balanceBucketsForNewNodes(
+      List<AmsServerInfo> aliveNodes,
+      Set<AmsServerInfo> newNodes,
+      Map<AmsServerInfo, List<String>> currentAssignments,
+      int targetBucketsPerNode,
+      int remainder) {
+    if (newNodes.isEmpty()) {
+      return;
+    }
+
+    // Calculate how many buckets each new node should get
+    int bucketsPerNewNode = targetBucketsPerNode;
+    int newNodeIndex = 0;
+    for (AmsServerInfo newNode : newNodes) {
+      // First 'remainder' nodes get one extra bucket
+      int targetForNewNode = bucketsPerNewNode + (newNodeIndex < remainder ? 1 : 0);
+      int currentCount = currentAssignments.get(newNode).size();
+      int needed = targetForNewNode - currentCount;
+
+      if (needed > 0) {
+        // Collect buckets from existing nodes (prefer nodes with more buckets)
+        List<String> bucketsToMove =
+            collectBucketsFromExistingNodes(aliveNodes, newNodes, currentAssignments, needed);
+        currentAssignments.get(newNode).addAll(bucketsToMove);
+        LOG.info(
+            "Moved {} buckets to new node {} (target: {})",
+            bucketsToMove.size(),
+            newNode,
+            targetForNewNode);
+      }
+      newNodeIndex++;
+    }
+  }
+
+  /**
+   * Collect buckets from existing nodes to balance for new nodes. Prefer taking from nodes that
+   * have more buckets than target.
+   *
+   * @param aliveNodes All alive nodes
+   * @param newNodes New nodes (excluded from source)
+   * @param currentAssignments Current assignments
+   * @param needed Number of buckets needed
+   * @return List of bucket IDs to move
+   */
+  private List<String> collectBucketsFromExistingNodes(
+      List<AmsServerInfo> aliveNodes,
+      Set<AmsServerInfo> newNodes,
+      Map<AmsServerInfo, List<String>> currentAssignments,
+      int needed) {
+    List<String> bucketsToMove = new ArrayList<>();
+    List<AmsServerInfo> existingNodes = new ArrayList<>();
+    for (AmsServerInfo node : aliveNodes) {
+      if (!newNodes.contains(node)) {
+        existingNodes.add(node);
+      }
+    }
+
+    if (existingNodes.isEmpty()) {
+      return bucketsToMove;
+    }
+
+    // Sort existing nodes by current bucket count (descending)
+    // This ensures we take from nodes with more buckets first
+    existingNodes.sort(
+        (n1, n2) -> {
+          int count1 = currentAssignments.get(n1).size();
+          int count2 = currentAssignments.get(n2).size();
+          return Integer.compare(count2, count1);
+        });
+
+    // Collect buckets from existing nodes using round-robin
+    int nodeIndex = 0;
+    int collected = 0;
+    while (collected < needed && !existingNodes.isEmpty()) {
+      AmsServerInfo sourceNode = existingNodes.get(nodeIndex % existingNodes.size());
+      List<String> sourceBuckets = currentAssignments.get(sourceNode);
+      if (!sourceBuckets.isEmpty()) {
+        // Take one bucket from this node
+        String bucketToMove = sourceBuckets.remove(0);
+        bucketsToMove.add(bucketToMove);
+        collected++;
+        LOG.debug("Moving bucket {} from node {} to new node", bucketToMove, sourceNode);
+      } else {
+        // This node has no more buckets, remove it from consideration
+        existingNodes.remove(sourceNode);
+        if (existingNodes.isEmpty()) {
+          break;
+        }
+        nodeIndex = nodeIndex % existingNodes.size();
+        continue;
+      }
+      nodeIndex++;
+    }
+
+    return bucketsToMove;
+  }
+
+  private List<String> generateBucketIds() {
+    List<String> bucketIds = new ArrayList<>();
+    for (int i = 1; i <= bucketIdTotalCount; i++) {
+      bucketIds.add(String.valueOf(i));
+    }
+    return bucketIds;
+  }
+
+  /**
+   * Get node key for matching nodes. Uses host:thriftBindPort format, consistent with
+   * ZkBucketAssignStore.getNodeKey().
+   */
+  private String getNodeKey(AmsServerInfo nodeInfo) {
+    return nodeInfo.getHost() + ":" + nodeInfo.getThriftBindPort();
+  }
+}
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStore.java b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStore.java
new file mode 100644
index 0000000000..8b982f18ef
--- /dev/null
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStore.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import org.apache.amoro.client.AmsServerInfo;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Interface for storing and retrieving bucket ID assignments to AMS nodes. Different
+ * implementations can use different storage backends (e.g., ZooKeeper, database).
+ */
+public interface BucketAssignStore {
+
+  /**
+   * Save bucket ID assignments for a node.
+   *
+   * @param nodeInfo The node information
+   * @param bucketIds List of bucket IDs assigned to this node
+   * @throws Exception If save operation fails
+   */
+  void saveAssignments(AmsServerInfo nodeInfo, List<String> bucketIds) throws Exception;
+
+  /**
+   * Get bucket ID assignments for a node.
+   *
+   * @param nodeInfo The node information
+   * @return List of bucket IDs assigned to this node, empty list if not found
+   * @throws Exception If retrieval operation fails
+   */
+  List<String> getAssignments(AmsServerInfo nodeInfo) throws Exception;
+
+  /**
+   * Remove bucket ID assignments for a node.
+   *
+   * @param nodeInfo The node information
+   * @throws Exception If removal operation fails
+   */
+  void removeAssignments(AmsServerInfo nodeInfo) throws Exception;
+
+  /**
+   * Get all bucket ID assignments for all nodes.
+   *
+   * @return Map of node info to list of bucket IDs
+   * @throws Exception If retrieval operation fails
+   */
+  Map<AmsServerInfo, List<String>> getAllAssignments() throws Exception;
+
+  /**
+   * Get the last update time for a node's assignments.
+   *
+   * @param nodeInfo The node information
+   * @return Last update timestamp in milliseconds, 0 if not found
+   * @throws Exception If retrieval operation fails
+   */
+  long getLastUpdateTime(AmsServerInfo nodeInfo) throws Exception;
+
+  /**
+   * Update the last update time for a node's assignments.
+   *
+   * @param nodeInfo The node information
+   * @throws Exception If update operation fails
+   */
+  void updateLastUpdateTime(AmsServerInfo nodeInfo) throws Exception;
+}
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index 6cc05bc56b..9e1826978f 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -60,6 +60,8 @@
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -106,12 +108,15 @@ public class DefaultOptimizingService extends StatedPersistentBase
   private final TableService tableService;
   private final RuntimeHandlerChain tableHandlerChain;
   private final ExecutorService planExecutor;
+  private final HighAvailabilityContainer haContainer;
+  private final boolean isMasterSlaveMode;
 
   public DefaultOptimizingService(
       Configurations serviceConfig,
       CatalogManager catalogManager,
       OptimizerManager optimizerManager,
-      TableService tableService) {
+      TableService tableService,
+      HighAvailabilityContainer haContainer) {
     this.optimizerTouchTimeout =
         serviceConfig.get(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT).toMillis();
     this.taskAckTimeout =
@@ -129,6 +134,9 @@ public DefaultOptimizingService(
     this.tableService = tableService;
     this.catalogManager = catalogManager;
     this.optimizerManager = optimizerManager;
+    this.haContainer = haContainer;
+    this.isMasterSlaveMode =
+        haContainer != null && serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
     this.tableHandlerChain = new TableRuntimeHandlerImpl();
     this.planExecutor =
         Executors.newCachedThreadPool(
@@ -485,23 +493,55 @@ public void dispose() {
 
     @Override
     public void run() {
+      // Use 1/4 of optimizerTouchTimeout as sync interval (default ~30
seconds), used for master
+      // slave mode.
+      long syncInterval = Math.max(5000, optimizerTouchTimeout / 4);
+      long lastSyncTime = 0;
       while (!stopped) {
         try {
-          OptimizerKeepingTask keepingTask = suspendingQueue.take();
-          String token = keepingTask.getToken();
-          boolean isExpired = !keepingTask.tryKeeping();
-          Optional.ofNullable(keepingTask.getQueue())
-              .ifPresent(
-                  queue ->
-                      queue
-                          .collectTasks(buildSuspendingPredication(authOptimizers.keySet()))
-                          .forEach(task -> retryTask(task, queue)));
-          if (isExpired) {
-            LOG.info("Optimizer {} has been expired, unregister it", keepingTask.getOptimizer());
-            unregisterOptimizer(token);
+          // In master-slave mode, check leadership before processing
+          if (isMasterSlaveMode && (haContainer == null || !haContainer.hasLeadership())) {
+            // Not leader anymore, periodically sync from database
+            long currentTime = System.currentTimeMillis();
+            if (currentTime - lastSyncTime >= syncInterval) {
+              loadOptimizersFromDatabase();
+              lastSyncTime = currentTime;
+            }
+            // Use poll with timeout to allow periodic sync even when queue is empty
+            OptimizerKeepingTask keepingTask =
+                suspendingQueue.poll(syncInterval, TimeUnit.MILLISECONDS);
+            if (keepingTask != null) {
+              // Process any pending tasks, but don't update optimizer state in follower mode
+              Optional.ofNullable(keepingTask.getQueue())
+                  .ifPresent(
+                      queue ->
+                          queue
+                              .collectTasks(buildSuspendingPredication(authOptimizers.keySet()))
+                              .forEach(task -> retryTask(task, queue)));
+              // In follower mode, we don't update optimizer touch time or unregister optimizers
+              // as these operations should only be done by the leader
+              LOG.debug(
+                  "Follower node: processed keeping task for optimizer {}, but not updating touch time",
+                  keepingTask.getOptimizer());
+            }
           } else {
-            LOG.debug("Optimizer {} is being touched, keep it", keepingTask.getOptimizer());
-            keepInTouch(keepingTask.getOptimizer());
+            // Leader mode: process optimizer keeping tasks normally
+            OptimizerKeepingTask keepingTask = suspendingQueue.take();
+            String token = keepingTask.getToken();
+            boolean isExpired = !keepingTask.tryKeeping();
+            Optional.ofNullable(keepingTask.getQueue())
+                .ifPresent(
+                    queue ->
+                        queue
+                            .collectTasks(buildSuspendingPredication(authOptimizers.keySet()))
+                            .forEach(task -> retryTask(task, queue)));
+            if (isExpired) {
+              LOG.info("Optimizer {} has been expired, unregister it", keepingTask.getOptimizer());
+              unregisterOptimizer(token);
+            } else {
+              LOG.debug("Optimizer {} is being touched, keep it", keepingTask.getOptimizer());
+              keepInTouch(keepingTask.getOptimizer());
+            }
           }
         } catch (InterruptedException ignored) {
         } catch (Throwable t) {
@@ -510,6 +550,111 @@ public void run() {
       }
     }
 
+    /**
+     * Load optimizer information from database. This is used in master-slave mode for follower
+     * nodes to sync optimizer state from database. This method performs incremental updates by
+     * comparing database state with local authOptimizers, only adding new optimizers and removing
+     * missing ones.
+     */
+    private void loadOptimizersFromDatabase() {
+      try {
+        List<OptimizerInstance> dbOptimizers =
+            getAs(OptimizerMapper.class, OptimizerMapper::selectAll);
+
+        // Build map of optimizers from database by token
+        Map<String, OptimizerInstance> dbOptimizersByToken = new HashMap<>();
+        for (OptimizerInstance optimizer : dbOptimizers) {
+          String token = optimizer.getToken();
+          if (token != null) {
+            dbOptimizersByToken.put(token, optimizer);
+          }
+        }
+
+        // Find optimizers to add (in database but not in local authOptimizers)
+        Set<String> localTokens = new HashSet<>(authOptimizers.keySet());
+        Set<String> dbTokens = new HashSet<>(dbOptimizersByToken.keySet());
+        Set<String> tokensToAdd = new HashSet<>(dbTokens);
+        tokensToAdd.removeAll(localTokens);
+
+        // Find optimizers to remove (in local authOptimizers but not in database)
+        Set<String> tokensToRemove = new HashSet<>(localTokens);
+        tokensToRemove.removeAll(dbTokens);
+
+        // Find existing optimizers (in both local and database)
+        Set<String> tokensToUpdate = new HashSet<>(localTokens);
+        tokensToUpdate.retainAll(dbTokens);
+
+        // Add new optimizers; count only those that were actually registered locally
+        int added = 0;
+        for (String token : tokensToAdd) {
+          OptimizerInstance optimizer = dbOptimizersByToken.get(token);
+          if (optimizer != null && registerOptimizerWithoutPersist(optimizer)) {
+            added++;
+            LOG.info("Added optimizer {} from database", token);
+          }
+        }
+
+        // Update touch time for existing optimizers to prevent them from being expired
+        // after master-slave switch
+        for (String token : tokensToUpdate) {
+          OptimizerInstance localOptimizer = authOptimizers.get(token);
+          if (localOptimizer != null) {
+            localOptimizer.touch();
+            LOG.debug("Updated touch time for existing optimizer {}", token);
+          }
+        }
+
+        // Remove missing optimizers
+        for (String token : tokensToRemove) {
+          removeOptimizerFromLocal(token);
+          LOG.debug("Removed optimizer {} (not in database)", token);
+        }
+
+        LOG.info(
+            "Synced optimizers from database: total={}, added={}, updated={}, removed={}, current={}",
+            dbOptimizersByToken.size(),
+            added,
+            tokensToUpdate.size(),
+            tokensToRemove.size(),
+            authOptimizers.size());
+      } catch (Exception e) {
+        LOG.error("Failed to load optimizers from database", e);
+      }
+    }
+
+    /**
+     * Register optimizer without persisting to database. Used for follower nodes to sync optimizer
+     * state from database.
+     *
+     * @return true if the optimizer was registered, false if its group has no optimizing queue
+     */
+    private boolean registerOptimizerWithoutPersist(OptimizerInstance optimizer) {
+      OptimizingQueue optimizingQueue = optimizingQueueByGroup.get(optimizer.getGroupName());
+      if (optimizingQueue == null) {
+        LOG.warn(
+            "Cannot register optimizer {}: optimizing queue for group {} not found",
+            optimizer.getToken(),
+            optimizer.getGroupName());
+        return false;
+      }
+      optimizingQueue.addOptimizer(optimizer);
+      authOptimizers.put(optimizer.getToken(), optimizer);
+      optimizingQueueByToken.put(optimizer.getToken(), optimizingQueue);
+      // Note: Don't call optimizerKeeper.keepInTouch() in follower mode
+      return true;
+    }
+
+    /**
+     * Remove optimizer from local cache without deleting from database. Used for follower nodes.
+     */
+    private void removeOptimizerFromLocal(String token) {
+      OptimizingQueue optimizingQueue = optimizingQueueByToken.remove(token);
+      OptimizerInstance optimizer = authOptimizers.remove(token);
+      if (optimizingQueue != null && optimizer != null) {
+        optimizingQueue.removeOptimizer(optimizer);
+      }
+    }
+
     private void retryTask(TaskRuntime<?> task, OptimizingQueue queue) {
       if (task.getStatus() == TaskRuntime.Status.ACKED
           && task.getStartTime() + taskExecuteTimeout < System.currentTimeMillis()) {
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java
index 6d15d37356..787eb1df5e 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java
@@ -35,6 +35,8 @@
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
 import
java.util.concurrent.CountDownLatch; public class HighAvailabilityContainer implements LeaderLatchListener { @@ -45,11 +47,15 @@ public class HighAvailabilityContainer implements LeaderLatchListener { private final CuratorFramework zkClient; private final String tableServiceMasterPath; private final String optimizingServiceMasterPath; + private final String nodesPath; private final AmsServerInfo tableServiceServerInfo; private final AmsServerInfo optimizingServiceServerInfo; + private final boolean isMasterSlaveMode; private volatile CountDownLatch followerLatch; + private String registeredNodePath; public HighAvailabilityContainer(Configurations serviceConfig) throws Exception { + this.isMasterSlaveMode = serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE); if (serviceConfig.getBoolean(AmoroManagementConf.HA_ENABLE)) { String zkServerAddress = serviceConfig.getString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS); int zkSessionTimeout = @@ -59,6 +65,7 @@ public HighAvailabilityContainer(Configurations serviceConfig) throws Exception String haClusterName = serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME); tableServiceMasterPath = AmsHAProperties.getTableServiceMasterPath(haClusterName); optimizingServiceMasterPath = AmsHAProperties.getOptimizingServiceMasterPath(haClusterName); + nodesPath = AmsHAProperties.getNodesPath(haClusterName); ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000); this.zkClient = CuratorFrameworkFactory.builder() @@ -70,6 +77,7 @@ public HighAvailabilityContainer(Configurations serviceConfig) throws Exception zkClient.start(); createPathIfNeeded(tableServiceMasterPath); createPathIfNeeded(optimizingServiceMasterPath); + createPathIfNeeded(nodesPath); String leaderPath = AmsHAProperties.getLeaderPath(haClusterName); createPathIfNeeded(leaderPath); leaderLatch = new LeaderLatch(zkClient, leaderPath); @@ -90,8 +98,10 @@ public HighAvailabilityContainer(Configurations serviceConfig) throws 
Exception zkClient = null; tableServiceMasterPath = null; optimizingServiceMasterPath = null; + nodesPath = null; tableServiceServerInfo = null; optimizingServiceServerInfo = null; + registeredNodePath = null; // block follower latch forever when ha is disabled followerLatch = new CountDownLatch(1); } @@ -126,6 +136,27 @@ public void waitLeaderShip() throws Exception { LOG.info("Became the leader of AMS"); } + public void registAndElect() throws Exception { + if (!isMasterSlaveMode) { + LOG.debug("Master-slave mode is not enabled, skip node registration"); + return; + } + if (zkClient == null || nodesPath == null) { + LOG.warn("HA is not enabled, skip node registration"); + return; + } + // Register node to ZK using ephemeral node + // The node will be automatically deleted when the session expires + String nodeInfo = JacksonUtil.toJSONString(optimizingServiceServerInfo); + registeredNodePath = + zkClient + .create() + .creatingParentsIfNeeded() + .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) + .forPath(nodesPath + "/node-", nodeInfo.getBytes(StandardCharsets.UTF_8)); + LOG.info("Registered AMS node to ZK: {}", registeredNodePath); + } + public void waitFollowerShip() throws Exception { LOG.info("Waiting to become the follower of AMS"); if (followerLatch != null) { @@ -137,6 +168,18 @@ public void waitFollowerShip() throws Exception { public void close() { if (leaderLatch != null) { try { + // Unregister node from ZK + if (registeredNodePath != null) { + try { + zkClient.delete().forPath(registeredNodePath); + LOG.info("Unregistered AMS node from ZK: {}", registeredNodePath); + } catch (KeeperException.NoNodeException e) { + // Node already deleted, ignore + LOG.debug("Node {} already deleted", registeredNodePath); + } catch (Exception e) { + LOG.warn("Failed to unregister node from ZK: {}", registeredNodePath, e); + } + } this.leaderLatch.close(); this.zkClient.close(); } catch (IOException e) { @@ -171,6 +214,76 @@ private AmsServerInfo buildServerInfo(String host, 
int thriftBindPort, int restB return amsServerInfo; } + /** + * Get list of alive nodes. Only the leader node can call this method. + * + * @return List of alive node information + */ + public List getAliveNodes() throws Exception { + List aliveNodes = new ArrayList<>(); + if (!isMasterSlaveMode) { + LOG.debug("Master-slave mode is not enabled, return empty node list"); + return aliveNodes; + } + if (zkClient == null || nodesPath == null) { + LOG.warn("HA is not enabled, return empty node list"); + return aliveNodes; + } + if (!leaderLatch.hasLeadership()) { + LOG.warn("Only leader node can get alive nodes list"); + return aliveNodes; + } + try { + List nodePaths = zkClient.getChildren().forPath(nodesPath); + for (String nodePath : nodePaths) { + try { + String fullPath = nodesPath + "/" + nodePath; + byte[] data = zkClient.getData().forPath(fullPath); + if (data != null && data.length > 0) { + String nodeInfoJson = new String(data, StandardCharsets.UTF_8); + AmsServerInfo nodeInfo = JacksonUtil.parseObject(nodeInfoJson, AmsServerInfo.class); + aliveNodes.add(nodeInfo); + } + } catch (Exception e) { + LOG.warn("Failed to get node info for path: {}", nodePath, e); + } + } + } catch (KeeperException.NoNodeException e) { + LOG.debug("Nodes path {} does not exist", nodesPath); + } + return aliveNodes; + } + + /** + * Check if current node is the leader. + * + * @return true if current node is the leader, false otherwise + */ + public boolean hasLeadership() { + if (leaderLatch == null) { + return false; + } + return leaderLatch.hasLeadership(); + } + + /** + * Get the current node's table service server info. + * + * @return The current node's server info, null if HA is not enabled + */ + public AmsServerInfo getOptimizingServiceServerInfo() { + return optimizingServiceServerInfo; + } + + /** + * Get the ZooKeeper client. This is used for creating BucketAssignStore. 
+ * + * @return The ZooKeeper client, null if HA is not enabled + */ + public CuratorFramework getZkClient() { + return zkClient; + } + private void createPathIfNeeded(String path) throws Exception { try { zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java b/amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java new file mode 100644 index 0000000000..e4ae304386 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.amoro.server; + +import org.apache.amoro.client.AmsServerInfo; +import org.apache.amoro.properties.AmsHAProperties; +import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework; +import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.CreateMode; +import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.KeeperException; +import org.apache.amoro.utils.JacksonUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * ZooKeeper-based implementation of BucketAssignStore. Stores bucket ID assignments in ZooKeeper + * with the following structure: /{namespace}/amoro/ams/bucket-assignments/{nodeKey}/assignments - + * bucket IDs /{namespace}/amoro/ams/bucket-assignments/{nodeKey}/last-update-time - timestamp + */ +public class ZkBucketAssignStore implements BucketAssignStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZkBucketAssignStore.class); + private static final String ASSIGNMENTS_SUFFIX = "/assignments"; + private static final String LAST_UPDATE_TIME_SUFFIX = "/last-update-time"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final TypeReference> LIST_STRING_TYPE = + new TypeReference>() {}; + + private final CuratorFramework zkClient; + private final String assignmentsBasePath; + + public ZkBucketAssignStore(CuratorFramework zkClient, String clusterName) { + this.zkClient = zkClient; + this.assignmentsBasePath = AmsHAProperties.getBucketAssignmentsPath(clusterName); + try { + createPathIfNeeded(assignmentsBasePath); + } catch (Exception e) { + LOG.error("Failed to create bucket assignments path", e); + throw 
new RuntimeException("Failed to initialize ZkBucketAssignStore", e); + } + } + + @Override + public void saveAssignments(AmsServerInfo nodeInfo, List bucketIds) throws Exception { + String nodeKey = getNodeKey(nodeInfo); + String assignmentsPath = assignmentsBasePath + "/" + nodeKey + ASSIGNMENTS_SUFFIX; + String assignmentsJson = JacksonUtil.toJSONString(bucketIds); + try { + if (zkClient.checkExists().forPath(assignmentsPath) != null) { + zkClient + .setData() + .forPath(assignmentsPath, assignmentsJson.getBytes(StandardCharsets.UTF_8)); + } else { + zkClient + .create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(assignmentsPath, assignmentsJson.getBytes(StandardCharsets.UTF_8)); + } + updateLastUpdateTime(nodeInfo); + LOG.debug("Saved bucket assignments for node {}: {}", nodeKey, bucketIds); + } catch (Exception e) { + LOG.error("Failed to save bucket assignments for node {}", nodeKey, e); + throw e; + } + } + + @Override + public List getAssignments(AmsServerInfo nodeInfo) throws Exception { + String nodeKey = getNodeKey(nodeInfo); + String assignmentsPath = assignmentsBasePath + "/" + nodeKey + ASSIGNMENTS_SUFFIX; + try { + if (zkClient.checkExists().forPath(assignmentsPath) == null) { + return new ArrayList<>(); + } + byte[] data = zkClient.getData().forPath(assignmentsPath); + if (data == null || data.length == 0) { + return new ArrayList<>(); + } + String assignmentsJson = new String(data, StandardCharsets.UTF_8); + return OBJECT_MAPPER.readValue(assignmentsJson, LIST_STRING_TYPE); + } catch (KeeperException.NoNodeException e) { + return new ArrayList<>(); + } catch (Exception e) { + LOG.error("Failed to get bucket assignments for node {}", nodeKey, e); + throw e; + } + } + + @Override + public void removeAssignments(AmsServerInfo nodeInfo) throws Exception { + String nodeKey = getNodeKey(nodeInfo); + String nodePath = assignmentsBasePath + "/" + nodeKey; + try { + if (zkClient.checkExists().forPath(nodePath) != null) { + 
zkClient.delete().deletingChildrenIfNeeded().forPath(nodePath); + LOG.debug("Removed bucket assignments for node {}", nodeKey); + } + } catch (KeeperException.NoNodeException e) { + // Already deleted, ignore + } catch (Exception e) { + LOG.error("Failed to remove bucket assignments for node {}", nodeKey, e); + throw e; + } + } + + @Override + public Map> getAllAssignments() throws Exception { + Map> allAssignments = new HashMap<>(); + try { + if (zkClient.checkExists().forPath(assignmentsBasePath) == null) { + return allAssignments; + } + List nodeKeys = zkClient.getChildren().forPath(assignmentsBasePath); + for (String nodeKey : nodeKeys) { + try { + AmsServerInfo nodeInfo = parseNodeKey(nodeKey); + List bucketIds = getAssignments(nodeInfo); + if (!bucketIds.isEmpty()) { + allAssignments.put(nodeInfo, bucketIds); + } + } catch (Exception e) { + LOG.warn("Failed to parse node key or get assignments: {}", nodeKey, e); + } + } + } catch (KeeperException.NoNodeException e) { + // Path doesn't exist, return empty map + } catch (Exception e) { + LOG.error("Failed to get all bucket assignments", e); + throw e; + } + return allAssignments; + } + + @Override + public long getLastUpdateTime(AmsServerInfo nodeInfo) throws Exception { + String nodeKey = getNodeKey(nodeInfo); + String timePath = assignmentsBasePath + "/" + nodeKey + LAST_UPDATE_TIME_SUFFIX; + try { + if (zkClient.checkExists().forPath(timePath) == null) { + return 0; + } + byte[] data = zkClient.getData().forPath(timePath); + if (data == null || data.length == 0) { + return 0; + } + return Long.parseLong(new String(data, StandardCharsets.UTF_8)); + } catch (KeeperException.NoNodeException e) { + return 0; + } catch (Exception e) { + LOG.error("Failed to get last update time for node {}", nodeKey, e); + throw e; + } + } + + @Override + public void updateLastUpdateTime(AmsServerInfo nodeInfo) throws Exception { + String nodeKey = getNodeKey(nodeInfo); + String timePath = assignmentsBasePath + "/" + nodeKey + 
LAST_UPDATE_TIME_SUFFIX; + long currentTime = System.currentTimeMillis(); + String timeStr = String.valueOf(currentTime); + try { + if (zkClient.checkExists().forPath(timePath) != null) { + zkClient.setData().forPath(timePath, timeStr.getBytes(StandardCharsets.UTF_8)); + } else { + zkClient + .create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(timePath, timeStr.getBytes(StandardCharsets.UTF_8)); + } + } catch (Exception e) { + LOG.error("Failed to update last update time for node {}", nodeKey, e); + throw e; + } + } + + private String getNodeKey(AmsServerInfo nodeInfo) { + return nodeInfo.getHost() + ":" + nodeInfo.getThriftBindPort(); + } + + private AmsServerInfo parseNodeKey(String nodeKey) { + String[] parts = nodeKey.split(":"); + if (parts.length != 2) { + throw new IllegalArgumentException("Invalid node key format: " + nodeKey); + } + AmsServerInfo nodeInfo = new AmsServerInfo(); + nodeInfo.setHost(parts[0]); + nodeInfo.setThriftBindPort(Integer.parseInt(parts[1])); + return nodeInfo; + } + + private void createPathIfNeeded(String path) throws Exception { + try { + zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path); + } catch (KeeperException.NodeExistsException e) { + // ignore + } + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java index 16628406d3..3823551853 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java @@ -136,6 +136,13 @@ protected String buildOptimizerStartupArgsString(Resource resource) { if (StringUtils.isNotEmpty(resource.getResourceId())) { stringBuilder.append(" -id ").append(resource.getResourceId()); } + // Add master-slave mode flag if enabled + if (containerProperties != null + && 
"true" + .equalsIgnoreCase( + containerProperties.get(OptimizerProperties.OPTIMIZER_MASTER_SLAVE_MODE))) { + stringBuilder.append(" -msm"); + } return stringBuilder.toString(); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java index 4cf587a978..5e5bac6f03 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java @@ -693,7 +693,7 @@ CloseableIterable fileScan( Comparable expireValue = getExpireValue(expirationConfig, field, expireTimestamp); return CloseableIterable.transform( CloseableIterable.withNoopClose(Iterables.concat(dataFiles, deleteFiles)), - contentFile -> { + (ContentFile contentFile) -> { Literal literal = getExpireTimestampLiteral( contentFile, @@ -702,7 +702,12 @@ CloseableIterable fileScan( expirationConfig.getDateTimePattern(), Locale.getDefault()), expirationConfig.getNumberDateFormat(), expireValue); - return new FileEntry(contentFile.copyWithoutStats(), literal); + // copyWithoutStats() returns ContentFile but with a captured wildcard type + // that needs explicit casting for type inference + @SuppressWarnings("rawtypes") + ContentFile fileWithoutStats = + (ContentFile) ((ContentFile) contentFile.copyWithoutStats()); + return new FileEntry(fileWithoutStats, literal); }); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketIdCount.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketIdCount.java new file mode 100644 index 0000000000..6441b67fd8 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketIdCount.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.persistence; + +/** Simple class to hold bucketId and its table count. */ +public class BucketIdCount { + private String bucketId; + private Integer count; + + public String getBucketId() { + return bucketId; + } + + public void setBucketId(String bucketId) { + this.bucketId = bucketId; + } + + public Integer getCount() { + return count; + } + + public void setCount(Integer count) { + this.count = count; + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableRuntimeMapper.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableRuntimeMapper.java index 3cfb69f2e9..2ea20d146e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableRuntimeMapper.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableRuntimeMapper.java @@ -18,6 +18,7 @@ package org.apache.amoro.server.persistence.mapper; +import org.apache.amoro.server.persistence.BucketIdCount; import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.TableRuntimeState; import org.apache.ibatis.annotations.Delete; @@ -102,6 +103,21 @@ public interface TableRuntimeMapper { @ResultMap("tableRuntimeMeta") List selectAllRuntimes(); + 
@Select( + "") + @ResultMap("tableRuntimeMeta") + List selectRuntimesByBucketIds(@Param("bucketIds") List bucketIds); + @Select( "