2 changes: 1 addition & 1 deletion pom.xml
@@ -79,7 +79,7 @@
<apache.commons.math.version>3.6.1</apache.commons.math.version>
<junit.version>5.8.2</junit.version>
<elasticsearch.version>8.2.0</elasticsearch.version>
<kafka.version>3.6.1</kafka.version>
<kafka.version>3.9.1</kafka.version>
<!--<maven.repo.local>${project.build.directory}/.m2</maven.repo.local> -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
@@ -228,20 +228,6 @@ else if (cmd.equals("-logging"))
iter.remove();
LogManager.getLogManager().readConfiguration(new FileInputStream(filename));
}
else if(cmd.equals("-thread_pool_size")){
if (! iter.hasNext()){
throw new Exception("Missing -thread_pool_size value");
}
iter.remove();
try {
String size = iter.next();
Integer threadPoolSize = Integer.valueOf(size);
properties.put("thread_pool_size", size);
} catch (NumberFormatException e) {
logger.warning("Specified thread pool size is not a number, will use value from properties or default value");
}
iter.remove();
}
else
throw new Exception("Unknown option " + cmd);
}
@@ -257,23 +243,17 @@ else if(cmd.equals("-thread_pool_size")){
logger.info("Alarm Logging Service (PID " + ProcessHandle.current().pid() + ")");
context = SpringApplication.run(AlarmLoggingService.class, original_args);

// Create scheduler with configured or default thread pool size
Integer threadPoolSize;
try {
threadPoolSize = Integer.valueOf(properties.getProperty("thread_pool_size"));
} catch (NumberFormatException e) {
logger.info("Specified thread pool size is not a number, will default to 4");
threadPoolSize = 4;
}
Scheduler = Executors.newScheduledThreadPool(threadPoolSize);

logger.info("Properties:");
properties.forEach((k, v) -> { logger.info(k + ":" + v); });

// Read list of Topics
final List<String> topicNames = Arrays.asList(properties.getProperty("alarm_topics").split(","));
logger.info("Starting logger for '..State': " + topicNames);

// Create scheduler sized to the topic count
int threadPoolSize = topicNames.size() * 2; // two threads per topic: one for the message logger, one for the command logger
Scheduler = Executors.newScheduledThreadPool(threadPoolSize);

final boolean standalone = Boolean.valueOf(properties.getProperty("standalone"));

// If the standalone is true, ignore the Schedulers for AlarmMessageLogger and AlarmCmdLogger
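For orientation only (not part of the diff): the scheduler size above is derived from the topic list, so with a hypothetical alarm_topics=Accelerator,CryoTF the list has two entries and the pool is created with four threads. A minimal sketch of that arithmetic, using assumed values:

// Illustration with assumed values, mirroring the sizing logic above (not in this PR).
// Requires java.util.Arrays, java.util.List, java.util.concurrent.Executors,
// java.util.concurrent.ScheduledExecutorService.
List<String> topicNames = Arrays.asList("Accelerator,CryoTF".split(","));   // 2 topics (hypothetical)
int threadPoolSize = topicNames.size() * 2;                                  // 2 topics * 2 = 4 threads
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(threadPoolSize);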
@@ -48,9 +48,6 @@ use_dated_index_names=true
# The units of the indices date span: Days (D), Weeks(W), Months(M), Years(Y).
date_span_units=M

# Size of the thread pool for message and command loggers. Two threads per topic/configuration are required
thread_pool_size=4

# Standalone - Alarm Logger Service
standalone=false

@@ -18,11 +18,18 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.prefs.Preferences;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.phoebus.applications.alarm.AlarmSystemConstants;
import org.phoebus.applications.alarm.client.ClientState;
import org.phoebus.applications.alarm.model.AlarmTreeItem;
@@ -35,6 +42,7 @@
import org.phoebus.util.shell.CommandShell;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.config.ConfigResource;

/** Alarm Server
* @author Kay Kasemir
@@ -72,6 +80,107 @@ public class AlarmServerMain implements ServerModelListener
"\trestart - Re-load alarm configuration and restart.\n" +
"\tshutdown - Shut alarm server down and exit.\n";

/**
* Ensure that the required Kafka topics exist and are correctly configured.
* <p>
* Creates and configures the main alarm topic (compacted) and command/talk topics (deleted).
* For more details on alarm topic configuration, refer to
* <a href="https://github.com/ControlSystemStudio/phoebus/tree/master/app/alarm#configure-alarm-topics">Configure Alarm Topics</a>.
*
* @param server Kafka server
* @param topic Base topic name
* @param kafka_props_file Extra Kafka properties file
* @throws Exception
*/
private static void ensureKafkaTopics(String server, String topic, String kafka_props_file) throws Exception {
[Collaborator review comment] The kafka_props_file argument is not used in this method.
try (AdminClient admin = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server))) {
Set<String> topics = admin.listTopics().names().get(60, TimeUnit.SECONDS);
// Compacted topic
String compactedTopic = topic;
if (!topics.contains(compactedTopic)) {
createTopic(admin, compactedTopic);
}
setCompactedConfig(admin, compactedTopic);

// Deleted topics
for (String suffix : List.of("Command", "Talk")) {
String deletedTopic = topic + suffix;
if (!topics.contains(deletedTopic)) {
createTopic(admin, deletedTopic);
}
setDeletedConfig(admin, deletedTopic);
}
}
}
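In response to the reviewer's note above, here is a minimal sketch of how the kafka_props_file could be folded into the AdminClient settings. This is an assumption about a possible follow-up, not part of this PR; the helper name createAdminClient and the handling of the properties file are hypothetical.

// Hypothetical follow-up sketch (not in this PR): merge the optional kafka_props_file into the
// AdminClient configuration so settings such as security.protocol can be supplied.
// Assumes imports of java.io.FileInputStream and java.util.Properties in addition to those above.
private static AdminClient createAdminClient(String server, String kafka_props_file) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server);
    if (kafka_props_file != null && !kafka_props_file.isBlank()) {
        try (FileInputStream in = new FileInputStream(kafka_props_file)) {
            props.load(in);   // user-supplied settings are layered on top of the bootstrap server
        }
    }
    return AdminClient.create(props);
}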

/**
* Create a topic, tolerating the case where it already exists
*
* @param admin Admin client
* @param topic Topic name
* @throws Exception
*/
private static void createTopic(AdminClient admin, String topic) throws Exception {
NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
try {
admin.createTopics(List.of(newTopic)).all().get();
logger.info("Created topic: " + topic);
} catch (Exception e) {
if (e.getCause() instanceof org.apache.kafka.common.errors.TopicExistsException) {
logger.info("Topic already exists: " + topic);
} else {
throw e;
}
}
}

/**
* Configure topic for alarm state storage with compaction to retain latest state.
* <p>
* For configuration details, refer to
* <a href="https://github.com/ControlSystemStudio/phoebus/tree/master/app/alarm#configure-alarm-topics">Configure Alarm Topics</a>.
*
* @param admin Admin client
* @param topic Topic name
* @throws Exception
*/
private static void setCompactedConfig(AdminClient admin, String topic) throws Exception {
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
List<AlterConfigOp> configOps = List.of(
new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("segment.ms", "10000"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.01"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("min.compaction.lag.ms", "1000"), AlterConfigOp.OpType.SET)
);
admin.incrementalAlterConfigs(Map.of(resource, configOps)).all().get();
logger.info("Set compacted config for topic: " + topic);
}

/**
* Configure topic for command/talk messages with time-based deletion.
* <p>
* For configuration details, refer to
* <a href="https://github.com/ControlSystemStudio/phoebus/tree/master/app/alarm#configure-alarm-topics">Configure Alarm Topics</a>.
*
* @param admin Admin client
* @param topic Topic name
* @throws Exception
*/
private static void setDeletedConfig(AdminClient admin, String topic) throws Exception {
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
List<AlterConfigOp> configOps = List.of(
new AlterConfigOp(new ConfigEntry("cleanup.policy", "delete"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("segment.ms", "10000"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.01"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("min.compaction.lag.ms", "1000"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("retention.ms", "20000"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("delete.retention.ms", "1000"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("file.delete.delay.ms", "1000"), AlterConfigOp.OpType.SET)
);
admin.incrementalAlterConfigs(Map.of(resource, configOps)).all().get();
logger.info("Set deleted config for topic: " + topic);
}
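As a rough, illustrative aid (not part of this PR), the settings applied by the two methods above could be read back with AdminClient.describeConfigs; the helper name printTopicConfig is hypothetical.

// Illustrative sketch (not in this PR): read back and log a topic's effective configuration.
// Assumes an additional import of org.apache.kafka.clients.admin.Config.
private static void printTopicConfig(AdminClient admin, String topic) throws Exception {
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
    Config config = admin.describeConfigs(List.of(resource)).all().get().get(resource);
    config.entries().forEach(entry ->
            logger.info(topic + " " + entry.name() + "=" + entry.value()));
}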

private AlarmServerMain(final String server, final String config, final boolean use_shell, final String kafka_props_file)
{
logger.info("Server: " + server);
@@ -85,6 +194,10 @@ private AlarmServerMain(final String server, final String config, final boolean
boolean run = true;
while (run)
{
logger.info("Verify topics exists and are correctly configured...");
// Create/verify topics before using Kafka
ensureKafkaTopics(server, config, kafka_props_file);

logger.info("Fetching past alarm states...");
final AlarmStateInitializer init = new AlarmStateInitializer(server, config, kafka_props_file);
if (init.awaitCompleteStates())
39 changes: 18 additions & 21 deletions services/alarm-server/src/test/resources/docker/docker-compose.yml
@@ -1,29 +1,26 @@
version: '2.2'

networks:
rmoff_kafka:
name: rmoff_kafka

services:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.0
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181

kafka:
image: confluentinc/cp-kafka:5.5.0
image: confluentinc/cp-kafka:7.6.0
container_name: kafka
ports:
- "9092:9092"
- "19092:19092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,CONNECTIONS_FROM_HOST://localhost:19092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LOG_DIRS: /var/lib/kafka/data
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
volumes:
- kafka_data:/var/lib/kafka/data

volumes:
kafka_data:
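To sanity-check the single-node KRaft broker that this compose file brings up, a small, illustrative check (not part of the PR) can list topics through the AdminClient against the advertised localhost:9092 listener:

// Illustrative smoke test (not in this PR), assuming the broker above is reachable on localhost:9092.
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class KafkaSmokeTest {
    public static void main(String[] args) throws Exception {
        try (AdminClient admin = AdminClient.create(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            // Succeeds only if the broker answers within the timeout.
            System.out.println("Topics: " + admin.listTopics().names().get(30, TimeUnit.SECONDS));
        }
    }
}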