diff --git a/pom.xml b/pom.xml index dcb29fed3b..242479548b 100644 --- a/pom.xml +++ b/pom.xml @@ -79,7 +79,7 @@ 3.6.1 5.8.2 8.2.0 - 3.6.1 + 3.9.1 UTF-8 UTF-8 diff --git a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmLoggingService.java b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmLoggingService.java index a10f8ad8c0..da2d866639 100644 --- a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmLoggingService.java +++ b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmLoggingService.java @@ -228,20 +228,6 @@ else if (cmd.equals("-logging")) iter.remove(); LogManager.getLogManager().readConfiguration(new FileInputStream(filename)); } - else if(cmd.equals("-thread_pool_size")){ - if (! iter.hasNext()){ - throw new Exception("Missing -thread_pool_size value"); - } - iter.remove(); - try { - String size = iter.next(); - Integer threadPoolSize = Integer.valueOf(size); - properties.put("thread_pool_size", size); - } catch (NumberFormatException e) { - logger.warning("Specified thread pool size is not a number, will use value from properties or default value"); - } - iter.remove(); - } else throw new Exception("Unknown option " + cmd); } @@ -257,16 +243,6 @@ else if(cmd.equals("-thread_pool_size")){ logger.info("Alarm Logging Service (PID " + ProcessHandle.current().pid() + ")"); context = SpringApplication.run(AlarmLoggingService.class, original_args); - // Create scheduler with configured or default thread pool size - Integer threadPoolSize; - try { - threadPoolSize = Integer.valueOf(properties.getProperty("thread_pool_size")); - } catch (NumberFormatException e) { - logger.info("Specified thread pool size is not a number, will default to 4"); - threadPoolSize = 4; - } - Scheduler = Executors.newScheduledThreadPool(threadPoolSize); - logger.info("Properties:"); properties.forEach((k, v) -> { logger.info(k + ":" + v); }); @@ -274,6 +250,10 @@ else if(cmd.equals("-thread_pool_size")){ final List topicNames = Arrays.asList(properties.getProperty("alarm_topics").split(",")); logger.info("Starting logger for '..State': " + topicNames); + // Create scheduler with configured or default thread pool size + int threadPoolSize = topicNames.size() * 2; // default to 2 threads per topic + Scheduler = Executors.newScheduledThreadPool(threadPoolSize); + final boolean standalone = Boolean.valueOf(properties.getProperty("standalone")); // If the standalone is true, ignore the Schedulers for AlarmMessageLogger and AlarmCmdLogger diff --git a/services/alarm-logger/src/main/resources/application.properties b/services/alarm-logger/src/main/resources/application.properties index c95a000dfa..466d8341d6 100644 --- a/services/alarm-logger/src/main/resources/application.properties +++ b/services/alarm-logger/src/main/resources/application.properties @@ -48,9 +48,6 @@ use_dated_index_names=true # The units of the indices date span: Days (D), Weeks(W), Months(M), Years(Y). date_span_units=M -# Size of the thread pool for message and command loggers. Two threads per topic/configuration are required -thread_pool_size=4 - # Standalone - Alarm Logger Service standalone=false diff --git a/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmServerMain.java b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmServerMain.java index 225eb86564..6edab58f52 100644 --- a/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmServerMain.java +++ b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmServerMain.java @@ -18,11 +18,18 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.LogManager; import java.util.prefs.Preferences; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; import org.phoebus.applications.alarm.AlarmSystemConstants; import org.phoebus.applications.alarm.client.ClientState; import org.phoebus.applications.alarm.model.AlarmTreeItem; @@ -35,6 +42,7 @@ import org.phoebus.util.shell.CommandShell; import com.fasterxml.jackson.databind.JsonNode; +import org.apache.kafka.common.config.ConfigResource; /** Alarm Server * @author Kay Kasemir @@ -72,6 +80,107 @@ public class AlarmServerMain implements ServerModelListener "\trestart - Re-load alarm configuration and restart.\n" + "\tshutdown - Shut alarm server down and exit.\n"; + /** + * Ensure that the required Kafka topics exist and are correctly configured. + *

+ * Creates and configures the main alarm topic (compacted) and command/talk topics (deleted). + * For more details on alarm topic configuration, see: + * Refer to Configure Alarm Topics + * + * @param server Kafka server + * @param topic Base topic name + * @param kafka_props_file Extra Kafka properties file + * @throws Exception + */ + private static void ensureKafkaTopics(String server, String topic, String kafka_props_file) throws Exception { + try (AdminClient admin = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server))) { + Set topics = admin.listTopics().names().get(60, TimeUnit.SECONDS); + // Compacted topic + String compactedTopic = topic; + if (!topics.contains(compactedTopic)) { + createTopic(admin, compactedTopic); + } + setCompactedConfig(admin, compactedTopic); + + // Deleted topics + for (String suffix : List.of("Command", "Talk")) { + String deletedTopic = topic + suffix; + if (!topics.contains(deletedTopic)) { + createTopic(admin, deletedTopic); + } + setDeletedConfig(admin, deletedTopic); + } + } + } + + /** + * Create topics + * + * @param admin Admin client + * @param topic Topic name + * @throws Exception + */ + private static void createTopic(AdminClient admin, String topic) throws Exception { + NewTopic newTopic = new NewTopic(topic, 1, (short) 1); + try { + admin.createTopics(List.of(newTopic)).all().get(); + logger.info("Created topic: " + topic); + } catch (Exception e) { + if (e.getCause() instanceof org.apache.kafka.common.errors.TopicExistsException) { + logger.info("Topic already exists: " + topic); + } else { + throw e; + } + } + } + + /** + * Configure topic for alarm state storage with compaction to retain latest state. + * For configuration information, see: + *

+ * Refer to Configure Alarm Topics + * + * @param admin Admin client + * @param topic Topic name + * @throws Exception + */ + private static void setCompactedConfig(AdminClient admin, String topic) throws Exception { + ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic); + List configOps = List.of( + new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry("segment.ms", "10000"), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.01"), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry("min.compaction.lag.ms", "1000"), AlterConfigOp.OpType.SET) + ); + admin.incrementalAlterConfigs(Map.of(resource, configOps)).all().get(); + logger.info("Set compacted config for topic: " + topic); + } + + /** + * Configure topic for command/talk messages with time-based deletion. + * For configuration information, see: + * + * Refer to Configure Alarm Topics + * + * @param admin Admin client + * @param topic Topic name + * @throws Exception + */ + private static void setDeletedConfig(AdminClient admin, String topic) throws Exception { + ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic); + List configOps = List.of( + new AlterConfigOp(new ConfigEntry("cleanup.policy", "delete"), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry("segment.ms", "10000"), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.01"), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry("min.compaction.lag.ms", "1000"), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry("retention.ms", "20000"), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry("delete.retention.ms", "1000"), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry("file.delete.delay.ms", "1000"), AlterConfigOp.OpType.SET) + ); + admin.incrementalAlterConfigs(Map.of(resource, configOps)).all().get(); + logger.info("Set deleted config for topic: " + topic); + } + private AlarmServerMain(final String server, final String config, final boolean use_shell, final String kafka_props_file) { logger.info("Server: " + server); @@ -85,6 +194,10 @@ private AlarmServerMain(final String server, final String config, final boolean boolean run = true; while (run) { + logger.info("Verify topics exists and are correctly configured..."); + // Create/verify topics before using Kafka + ensureKafkaTopics(server, config, kafka_props_file); + logger.info("Fetching past alarm states..."); final AlarmStateInitializer init = new AlarmStateInitializer(server, config, kafka_props_file); if (init.awaitCompleteStates()) diff --git a/services/alarm-server/src/test/resources/docker/docker-compose.yml b/services/alarm-server/src/test/resources/docker/docker-compose.yml index 9362645711..ec29480f81 100644 --- a/services/alarm-server/src/test/resources/docker/docker-compose.yml +++ b/services/alarm-server/src/test/resources/docker/docker-compose.yml @@ -1,29 +1,26 @@ version: '2.2' -networks: - rmoff_kafka: - name: rmoff_kafka - services: - zookeeper: - image: confluentinc/cp-zookeeper:5.5.0 - container_name: zookeeper - ports: - - "2181:2181" - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - kafka: - image: confluentinc/cp-kafka:5.5.0 + image: confluentinc/cp-kafka:7.6.0 container_name: kafka ports: - "9092:9092" - - "19092:19092" - depends_on: - - zookeeper environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,CONNECTIONS_FROM_HOST://localhost:19092 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 \ No newline at end of file + CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk' + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 + KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LOG_DIRS: /var/lib/kafka/data + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + volumes: + - kafka_data:/var/lib/kafka/data + +volumes: + kafka_data: