Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ public class AmoroManagementConf {
.defaultValue("admin")
.withDescription("The administrator password");

/** Enable master & slave mode, which supports horizontal scaling of AMS. */
public static final ConfigOption<Boolean> USE_MASTER_SLAVE_MODE =
ConfigOptions.key("use-master-slave-mode")
.booleanType()
.defaultValue(false)
.withDescription("Enable master & slave mode, which supports horizontal scaling of AMS.");

public static final ConfigOption<Duration> CATALOG_META_CACHE_EXPIRATION_INTERVAL =
ConfigOptions.key("catalog-meta-cache.expiration-interval")
.durationType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.amoro.server;

import static org.apache.amoro.server.AmoroManagementConf.USE_MASTER_SLAVE_MODE;

import io.javalin.Javalin;
import io.javalin.http.HttpCode;
import io.javalin.http.staticfiles.Location;
Expand Down Expand Up @@ -96,6 +98,7 @@ public class AmoroServiceContainer {
public static final Logger LOG = LoggerFactory.getLogger(AmoroServiceContainer.class);

public static final String SERVER_CONFIG_FILENAME = "config.yaml";
private static boolean IS_MASTER_SLAVE_MODE = false;

private final HighAvailabilityContainer haContainer;
private DataSource dataSource;
Expand Down Expand Up @@ -128,15 +131,22 @@ public static void main(String[] args) {
LOG.info("AMS service has been shut down");
}));
service.startRestServices();
while (true) {
try {
service.waitLeaderShip();
service.startOptimizingService();
service.waitFollowerShip();
} catch (Exception e) {
LOG.error("AMS start error", e);
} finally {
service.disposeOptimizingService();
if (IS_MASTER_SLAVE_MODE) {
// Even if one does not become the master, it cannot block the subsequent logic.
service.registAndElect();
// Regardless of whether tp becomes the master, the service needs to be activated.
service.startOptimizingService();
} else {
while (true) {
try {
service.waitLeaderShip();
service.startOptimizingService();
service.waitFollowerShip();
} catch (Exception e) {
LOG.error("AMS start error", e);
} finally {
service.disposeOptimizingService();
}
}
}
} catch (Throwable t) {
Expand All @@ -145,6 +155,10 @@ public static void main(String[] args) {
}
}

public void registAndElect() throws Exception {
haContainer.registAndElect();
}

public void waitLeaderShip() throws Exception {
haContainer.waitLeaderShip();
}
Expand Down Expand Up @@ -256,6 +270,7 @@ public void dispose() {
private void initConfig() throws Exception {
LOG.info("initializing configurations...");
new ConfigurationHelper().init();
IS_MASTER_SLAVE_MODE = serviceConfig.getBoolean(USE_MASTER_SLAVE_MODE);
}

private void startThriftService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ public void waitLeaderShip() throws Exception {
LOG.info("Became the leader of AMS");
}

public void registAndElect() throws Exception {
// TODO Here you can register for AMS and participate in the election.
}

public void waitFollowerShip() throws Exception {
LOG.info("Waiting to become the follower of AMS");
if (followerLatch != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,14 @@ public void before() {
@AfterEach
public void after() {
LOG.info("Test finished.");
try {
// explicitly clean up possible residual table runtime records
if (catalog.tableExists(tableIdentifier)) {
catalog.dropTable(tableIdentifier, true);
}
} catch (Exception e) {
LOG.warn("Failed to drop table during cleanup", e);
}
catalog.dropDatabase(database);
}

Expand Down