diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java index ff95d8d0dd..8e572125a2 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java @@ -52,6 +52,13 @@ public class AmoroManagementConf { .defaultValue("admin") .withDescription("The administrator password"); + /** Enable master & slave mode, which supports horizontal scaling of AMS. */ + public static final ConfigOption USE_MASTER_SLAVE_MODE = + ConfigOptions.key("use-master-slave-mode") + .booleanType() + .defaultValue(false) + .withDescription("Enable master & slave mode, which supports horizontal scaling of AMS."); + public static final ConfigOption CATALOG_META_CACHE_EXPIRATION_INTERVAL = ConfigOptions.key("catalog-meta-cache.expiration-interval") .durationType() diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 1f91119fc1..ba9f959f57 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -18,6 +18,8 @@ package org.apache.amoro.server; +import static org.apache.amoro.server.AmoroManagementConf.USE_MASTER_SLAVE_MODE; + import io.javalin.Javalin; import io.javalin.http.HttpCode; import io.javalin.http.staticfiles.Location; @@ -96,6 +98,7 @@ public class AmoroServiceContainer { public static final Logger LOG = LoggerFactory.getLogger(AmoroServiceContainer.class); public static final String SERVER_CONFIG_FILENAME = "config.yaml"; + private static boolean IS_MASTER_SLAVE_MODE = false; private final HighAvailabilityContainer haContainer; private DataSource dataSource; @@ -128,15 +131,22 @@ public static void main(String[] args) { LOG.info("AMS service has been shut down"); })); service.startRestServices(); - while (true) { - try { - service.waitLeaderShip(); - service.startOptimizingService(); - service.waitFollowerShip(); - } catch (Exception e) { - LOG.error("AMS start error", e); - } finally { - service.disposeOptimizingService(); + if (IS_MASTER_SLAVE_MODE) { + // Even if one does not become the master, it cannot block the subsequent logic. + service.registAndElect(); + // Regardless of whether tp becomes the master, the service needs to be activated. + service.startOptimizingService(); + } else { + while (true) { + try { + service.waitLeaderShip(); + service.startOptimizingService(); + service.waitFollowerShip(); + } catch (Exception e) { + LOG.error("AMS start error", e); + } finally { + service.disposeOptimizingService(); + } } } } catch (Throwable t) { @@ -145,6 +155,10 @@ public static void main(String[] args) { } } + public void registAndElect() throws Exception { + haContainer.registAndElect(); + } + public void waitLeaderShip() throws Exception { haContainer.waitLeaderShip(); } @@ -256,6 +270,7 @@ public void dispose() { private void initConfig() throws Exception { LOG.info("initializing configurations..."); new ConfigurationHelper().init(); + IS_MASTER_SLAVE_MODE = serviceConfig.getBoolean(USE_MASTER_SLAVE_MODE); } private void startThriftService() { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java index 6d15d37356..5ac0ef2df2 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java @@ -126,6 +126,10 @@ public void waitLeaderShip() throws Exception { LOG.info("Became the leader of AMS"); } + public void registAndElect() throws Exception { + // TODO Here you can register for AMS and participate in the election. + } + public void waitFollowerShip() throws Exception { LOG.info("Waiting to become the follower of AMS"); if (followerLatch != null) { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestInternalMixedCatalogService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestInternalMixedCatalogService.java index 4514d7ed2b..063fafddbe 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestInternalMixedCatalogService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestInternalMixedCatalogService.java @@ -189,6 +189,14 @@ public void before() { @AfterEach public void after() { LOG.info("Test finished."); + try { + // explicitly clean up possible residual table runtime records + if (catalog.tableExists(tableIdentifier)) { + catalog.dropTable(tableIdentifier, true); + } + } catch (Exception e) { + LOG.warn("Failed to drop table during cleanup", e); + } catalog.dropDatabase(database); }