Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
e527acb
[Subtask]: Use a new configuration item to control whether master & s…
Oct 29, 2025
064f4ea
[Subtask]: Use a new configuration item to control whether master & s…
Oct 30, 2025
879e398
[Subtask]: Add a registration function for table allocation in master…
Nov 10, 2025
bb1593b
[Subtask]: Add a registration function for table allocation in master…
Nov 10, 2025
2c60a4d
[Subtask]: Add a registration function for table allocation in master…
Nov 10, 2025
a3acd97
[Subtask]: Replace zk with mocking. #3919
Nov 10, 2025
b635c0d
[Subtask]: Replace zk with mocking. #3919
Nov 10, 2025
3335790
[Subtask]: add AmsAssignService to implement balanced bucket allocati…
Nov 10, 2025
79a1d85
Merge branch 'amoro#3919' into amoro#3921
Nov 10, 2025
80ba8f2
[Subtask]: add AmsAssignService to implement balanced bucket allocati…
Nov 10, 2025
0c91452
[Subtask]: add AmsAssignService to implement balanced bucket allocati…
Nov 11, 2025
f2ecc06
[Subtask]: add AmsAssignService to implement balanced bucket allocati…
Nov 12, 2025
b1da165
[Subtask]: Modify DefaultTableService to be compatible with master-sl…
Nov 12, 2025
9fdda5c
[Subtask]: Modify DefaultTableService to be compatible with master-sl…
Nov 12, 2025
d4d9073
[Subtask]: Modify DefaultTableService to be compatible with master-sl…
Nov 12, 2025
d0cb6b2
[Subtask]: Fix unit test failure issue #3923
Nov 14, 2025
cf7b3ad
[Subtask]: In master-slave mode, each AMS should automatically senses…
Nov 14, 2025
4ec6be4
[Subtask]: Modify the optimizer to support obtaining tasks from each …
Nov 17, 2025
62d2af1
[Subtask]: Modify the optimizer to support obtaining tasks from each …
Nov 17, 2025
8506dd5
[Subtask]: Optimize the logic for retrieving the AMS list from ZooKee…
Nov 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,41 @@ public class AmoroManagementConf {
.defaultValue("admin")
.withDescription("The administrator password");

/** Enable master & slave mode, which supports horizontal scaling of AMS. */
public static final ConfigOption<Boolean> USE_MASTER_SLAVE_MODE =
ConfigOptions.key("use-master-slave-mode")
.booleanType()
.defaultValue(false)
.withDescription("Enable master & slave mode, which supports horizontal scaling of AMS.");

public static final ConfigOption<Integer> BUCKET_ID_TOTAL_COUNT =
ConfigOptions.key("bucket-id.total-count")
.intType()
.defaultValue(100)
.withDescription(
"Total count of bucket IDs for assignment. Bucket IDs range from 1 to this value.");

public static final ConfigOption<Duration> NODE_OFFLINE_TIMEOUT =
ConfigOptions.key("node-offline.timeout")
.durationType()
.defaultValue(Duration.ofMinutes(5))
.withDescription(
"Timeout duration to determine if a node is offline. After this duration, the node's bucket IDs will be reassigned.");

public static final ConfigOption<Duration> ASSIGN_INTERVAL =
ConfigOptions.key("bucket-assign.interval")
.durationType()
.defaultValue(Duration.ofSeconds(60))
.withDescription(
"Interval for bucket assignment service to detect node changes and redistribute bucket IDs.");

public static final ConfigOption<Duration> BUCKET_TABLE_SYNC_INTERVAL =
ConfigOptions.key("bucket-table-sync.interval")
.durationType()
.defaultValue(Duration.ofSeconds(60))
.withDescription(
"Interval for syncing tables assigned to bucket IDs in master-slave mode. Each node periodically loads tables from database based on its assigned bucket IDs.");

public static final ConfigOption<Duration> CATALOG_META_CACHE_EXPIRATION_INTERVAL =
ConfigOptions.key("catalog-meta-cache.expiration-interval")
.durationType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.amoro.server;

import static org.apache.amoro.server.AmoroManagementConf.USE_MASTER_SLAVE_MODE;

import io.javalin.Javalin;
import io.javalin.http.HttpCode;
import io.javalin.http.staticfiles.Location;
Expand Down Expand Up @@ -96,6 +98,7 @@ public class AmoroServiceContainer {
public static final Logger LOG = LoggerFactory.getLogger(AmoroServiceContainer.class);

public static final String SERVER_CONFIG_FILENAME = "config.yaml";
private static boolean IS_MASTER_SLAVE_MODE = false;

private final HighAvailabilityContainer haContainer;
private DataSource dataSource;
Expand All @@ -110,6 +113,7 @@ public class AmoroServiceContainer {
private TServer optimizingServiceServer;
private Javalin httpServer;
private AmsServiceMetrics amsServiceMetrics;
private AmsAssignService amsAssignService;

public AmoroServiceContainer() throws Exception {
initConfig();
Expand All @@ -128,15 +132,22 @@ public static void main(String[] args) {
LOG.info("AMS service has been shut down");
}));
service.startRestServices();
while (true) {
try {
service.waitLeaderShip();
service.startOptimizingService();
service.waitFollowerShip();
} catch (Exception e) {
LOG.error("AMS start error", e);
} finally {
service.disposeOptimizingService();
if (IS_MASTER_SLAVE_MODE) {
// Even if one does not become the master, it cannot block the subsequent logic.
service.registAndElect();
// Regardless of whether tp becomes the master, the service needs to be activated.
service.startOptimizingService();
} else {
while (true) {
try {
service.waitLeaderShip();
service.startOptimizingService();
service.waitFollowerShip();
} catch (Exception e) {
LOG.error("AMS start error", e);
} finally {
service.disposeOptimizingService();
}
}
}
} catch (Throwable t) {
Expand All @@ -145,6 +156,10 @@ public static void main(String[] args) {
}
}

public void registAndElect() throws Exception {
haContainer.registAndElect();
}

public void waitLeaderShip() throws Exception {
haContainer.waitLeaderShip();
}
Expand All @@ -171,11 +186,29 @@ public void startOptimizingService() throws Exception {
TableRuntimeFactoryManager tableRuntimeFactoryManager = new TableRuntimeFactoryManager();
tableRuntimeFactoryManager.initialize();

// In master-slave mode, create BucketAssignStore and AmsAssignService
BucketAssignStore bucketAssignStore = null;
if (IS_MASTER_SLAVE_MODE && haContainer != null && haContainer.getZkClient() != null) {
String clusterName = serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME);
bucketAssignStore = new ZkBucketAssignStore(haContainer.getZkClient(), clusterName);
// Create and start AmsAssignService for bucket assignment
amsAssignService =
new AmsAssignService(haContainer, serviceConfig, haContainer.getZkClient());
amsAssignService.start();
LOG.info("AmsAssignService started for master-slave mode");
}

tableService =
new DefaultTableService(serviceConfig, catalogManager, tableRuntimeFactoryManager);
new DefaultTableService(
serviceConfig,
catalogManager,
tableRuntimeFactoryManager,
haContainer,
bucketAssignStore);

optimizingService =
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);
new DefaultOptimizingService(
serviceConfig, catalogManager, optimizerManager, tableService, haContainer);

LOG.info("Setting up AMS table executors...");
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
Expand Down Expand Up @@ -214,6 +247,11 @@ public void disposeOptimizingService() {
LOG.info("Stopping optimizing server[serving:{}] ...", optimizingServiceServer.isServing());
optimizingServiceServer.stop();
}
if (amsAssignService != null) {
LOG.info("Stopping AmsAssignService...");
amsAssignService.stop();
amsAssignService = null;
}
if (tableService != null) {
LOG.info("Stopping table service...");
tableService.dispose();
Expand Down Expand Up @@ -256,6 +294,7 @@ public void dispose() {
private void initConfig() throws Exception {
LOG.info("initializing configurations...");
new ConfigurationHelper().init();
IS_MASTER_SLAVE_MODE = serviceConfig.getBoolean(USE_MASTER_SLAVE_MODE);
}

private void startThriftService() {
Expand Down Expand Up @@ -534,6 +573,12 @@ private void initContainerConfig() {
containerProperties.putIfAbsent(
OptimizerProperties.AMS_OPTIMIZER_URI,
AmsUtil.getAMSThriftAddress(serviceConfig, Constants.THRIFT_OPTIMIZING_SERVICE_NAME));
// Add master-slave mode flag to container properties
// Read from serviceConfig directly since IS_MASTER_SLAVE_MODE is set after
// initContainerConfig()
if (serviceConfig.getBoolean(USE_MASTER_SLAVE_MODE)) {
containerProperties.put(OptimizerProperties.OPTIMIZER_MASTER_SLAVE_MODE, "true");
}
// put addition system properties
container.setProperties(containerProperties);
containerList.add(container);
Expand Down
Loading