topicNames = Arrays.asList(properties.getProperty("alarm_topics").split(","));
logger.info("Starting logger for '..State': " + topicNames);
+ // Create scheduler with configured or default thread pool size
+ int threadPoolSize = topicNames.size() * 2; // default to 2 threads per topic
+ Scheduler = Executors.newScheduledThreadPool(threadPoolSize);
+
final boolean standalone = Boolean.valueOf(properties.getProperty("standalone"));
// If the standalone is true, ignore the Schedulers for AlarmMessageLogger and AlarmCmdLogger
diff --git a/services/alarm-logger/src/main/resources/application.properties b/services/alarm-logger/src/main/resources/application.properties
index c95a000dfa..466d8341d6 100644
--- a/services/alarm-logger/src/main/resources/application.properties
+++ b/services/alarm-logger/src/main/resources/application.properties
@@ -48,9 +48,6 @@ use_dated_index_names=true
# The units of the indices date span: Days (D), Weeks(W), Months(M), Years(Y).
date_span_units=M
-# Size of the thread pool for message and command loggers. Two threads per topic/configuration are required
-thread_pool_size=4
-
# Standalone - Alarm Logger Service
standalone=false
diff --git a/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmServerMain.java b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmServerMain.java
index 225eb86564..6edab58f52 100644
--- a/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmServerMain.java
+++ b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmServerMain.java
@@ -18,11 +18,18 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.prefs.Preferences;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
import org.phoebus.applications.alarm.AlarmSystemConstants;
import org.phoebus.applications.alarm.client.ClientState;
import org.phoebus.applications.alarm.model.AlarmTreeItem;
@@ -35,6 +42,7 @@
import org.phoebus.util.shell.CommandShell;
import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.kafka.common.config.ConfigResource;
/** Alarm Server
* @author Kay Kasemir
@@ -72,6 +80,107 @@ public class AlarmServerMain implements ServerModelListener
"\trestart - Re-load alarm configuration and restart.\n" +
"\tshutdown - Shut alarm server down and exit.\n";
+ /**
+ * Ensure that the required Kafka topics exist and are correctly configured.
+ *
+ * Creates and configures the main alarm topic (compacted) and command/talk topics (deleted).
+ * For more details on alarm topic configuration, see:
+ * Refer to Configure Alarm Topics
+ *
+ * @param server Kafka server
+ * @param topic Base topic name
+ * @param kafka_props_file Extra Kafka properties file
+ * @throws Exception
+ */
+ private static void ensureKafkaTopics(String server, String topic, String kafka_props_file) throws Exception {
+ try (AdminClient admin = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server))) {
+ Set topics = admin.listTopics().names().get(60, TimeUnit.SECONDS);
+ // Compacted topic
+ String compactedTopic = topic;
+ if (!topics.contains(compactedTopic)) {
+ createTopic(admin, compactedTopic);
+ }
+ setCompactedConfig(admin, compactedTopic);
+
+ // Deleted topics
+ for (String suffix : List.of("Command", "Talk")) {
+ String deletedTopic = topic + suffix;
+ if (!topics.contains(deletedTopic)) {
+ createTopic(admin, deletedTopic);
+ }
+ setDeletedConfig(admin, deletedTopic);
+ }
+ }
+ }
+
+ /**
+ * Create topics
+ *
+ * @param admin Admin client
+ * @param topic Topic name
+ * @throws Exception
+ */
+ private static void createTopic(AdminClient admin, String topic) throws Exception {
+ NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
+ try {
+ admin.createTopics(List.of(newTopic)).all().get();
+ logger.info("Created topic: " + topic);
+ } catch (Exception e) {
+ if (e.getCause() instanceof org.apache.kafka.common.errors.TopicExistsException) {
+ logger.info("Topic already exists: " + topic);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Configure topic for alarm state storage with compaction to retain latest state.
+ * For configuration information, see:
+ *
+ * Refer to Configure Alarm Topics
+ *
+ * @param admin Admin client
+ * @param topic Topic name
+ * @throws Exception
+ */
+ private static void setCompactedConfig(AdminClient admin, String topic) throws Exception {
+ ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
+ List configOps = List.of(
+ new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("segment.ms", "10000"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.01"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("min.compaction.lag.ms", "1000"), AlterConfigOp.OpType.SET)
+ );
+ admin.incrementalAlterConfigs(Map.of(resource, configOps)).all().get();
+ logger.info("Set compacted config for topic: " + topic);
+ }
+
+ /**
+ * Configure topic for command/talk messages with time-based deletion.
+ * For configuration information, see:
+ *
+ * Refer to Configure Alarm Topics
+ *
+ * @param admin Admin client
+ * @param topic Topic name
+ * @throws Exception
+ */
+ private static void setDeletedConfig(AdminClient admin, String topic) throws Exception {
+ ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
+ List configOps = List.of(
+ new AlterConfigOp(new ConfigEntry("cleanup.policy", "delete"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("segment.ms", "10000"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.01"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("min.compaction.lag.ms", "1000"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("retention.ms", "20000"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("delete.retention.ms", "1000"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("file.delete.delay.ms", "1000"), AlterConfigOp.OpType.SET)
+ );
+ admin.incrementalAlterConfigs(Map.of(resource, configOps)).all().get();
+ logger.info("Set deleted config for topic: " + topic);
+ }
+
private AlarmServerMain(final String server, final String config, final boolean use_shell, final String kafka_props_file)
{
logger.info("Server: " + server);
@@ -85,6 +194,10 @@ private AlarmServerMain(final String server, final String config, final boolean
boolean run = true;
while (run)
{
+ logger.info("Verify topics exists and are correctly configured...");
+ // Create/verify topics before using Kafka
+ ensureKafkaTopics(server, config, kafka_props_file);
+
logger.info("Fetching past alarm states...");
final AlarmStateInitializer init = new AlarmStateInitializer(server, config, kafka_props_file);
if (init.awaitCompleteStates())
diff --git a/services/alarm-server/src/test/resources/docker/docker-compose.yml b/services/alarm-server/src/test/resources/docker/docker-compose.yml
index 9362645711..ec29480f81 100644
--- a/services/alarm-server/src/test/resources/docker/docker-compose.yml
+++ b/services/alarm-server/src/test/resources/docker/docker-compose.yml
@@ -1,29 +1,26 @@
version: '2.2'
-networks:
- rmoff_kafka:
- name: rmoff_kafka
-
services:
- zookeeper:
- image: confluentinc/cp-zookeeper:5.5.0
- container_name: zookeeper
- ports:
- - "2181:2181"
- environment:
- ZOOKEEPER_CLIENT_PORT: 2181
-
kafka:
- image: confluentinc/cp-kafka:5.5.0
+ image: confluentinc/cp-kafka:7.6.0
container_name: kafka
ports:
- "9092:9092"
- - "19092:19092"
- depends_on:
- - zookeeper
environment:
- KAFKA_BROKER_ID: 1
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,CONNECTIONS_FROM_HOST://localhost:19092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
\ No newline at end of file
+ CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
+ KAFKA_NODE_ID: 1
+ KAFKA_PROCESS_ROLES: broker,controller
+ KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
+ KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
+ KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
+ KAFKA_LOG_DIRS: /var/lib/kafka/data
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+ KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+ volumes:
+ - kafka_data:/var/lib/kafka/data
+
+volumes:
+ kafka_data: