diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 464ffe07d767..903373796547 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -586,6 +586,12 @@ public enum CassandraRelevantProperties */ SAI_QUERY_KIND_PER_QUERY_METRICS_ENABLED("cassandra.sai.metrics.query_kind.per_query.enabled", "false"), + /** + * Whether to enable SAI index metrics such as memtable flush metrics, compaction metrics, and disk usage metrics. + * These metrics include timers, histograms, counters, and gauges for index operations. + */ + SAI_INDEX_METRICS_ENABLED("cassandra.sai.metrics.index.enabled", "true"), + /** * If true, while creating or altering schema, NetworkTopologyStrategy won't check if the DC exists. * This is to remain compatible with older workflows that first change the replication before adding the nodes. diff --git a/src/java/org/apache/cassandra/index/sai/IndexContext.java b/src/java/org/apache/cassandra/index/sai/IndexContext.java index f6eb4a974f94..eab6c2e7e59e 100644 --- a/src/java/org/apache/cassandra/index/sai/IndexContext.java +++ b/src/java/org/apache/cassandra/index/sai/IndexContext.java @@ -19,11 +19,7 @@ package org.apache.cassandra.index.sai; import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.Objects; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -38,7 +34,6 @@ import org.slf4j.LoggerFactory; import io.github.jbellis.jvector.vector.VectorSimilarityFunction; -import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.cql3.Operator; import org.apache.cassandra.cql3.statements.schema.IndexTarget; import org.apache.cassandra.db.ClusteringComparator; @@ -73,7 +68,6 @@ import org.apache.cassandra.index.sai.iterators.KeyRangeUnionIterator; import org.apache.cassandra.index.sai.memory.MemtableIndex; import org.apache.cassandra.index.sai.memory.MemtableKeyRangeIterator; -import org.apache.cassandra.index.sai.memory.TrieMemtableIndex; import org.apache.cassandra.index.sai.metrics.ColumnQueryMetrics; import org.apache.cassandra.index.sai.metrics.IndexMetrics; import org.apache.cassandra.index.sai.plan.Expression; @@ -93,6 +87,7 @@ import org.apache.cassandra.utils.concurrent.OpOrder; import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_INDEX_READS_DISABLED; +import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_INDEX_METRICS_ENABLED; import static org.apache.cassandra.config.CassandraRelevantProperties.VALIDATE_MAX_TERM_SIZE_AT_COORDINATOR; /** @@ -140,6 +135,7 @@ public class IndexContext private final ConcurrentMap liveMemtables = new ConcurrentHashMap<>(); private final IndexViewManager viewManager; + @Nullable private final IndexMetrics indexMetrics; private final ColumnQueryMetrics columnQueryMetrics; private final IndexWriterConfig indexWriterConfig; @@ -190,7 +186,7 @@ public IndexContext(@Nonnull String keyspace, this.vectorSimilarityFunction = indexWriterConfig.getSimilarityFunction(); this.hasEuclideanSimilarityFunc = vectorSimilarityFunction == VectorSimilarityFunction.EUCLIDEAN; - this.indexMetrics = new IndexMetrics(this); + this.indexMetrics = SAI_INDEX_METRICS_ENABLED.getBoolean() ? new IndexMetrics(this) : null; this.columnQueryMetrics = isVector() ? new ColumnQueryMetrics.VectorIndexMetrics(keyspace, table, getIndexName()) : isLiteral() ? new ColumnQueryMetrics.TrieIndexMetrics(keyspace, table, getIndexName()) : new ColumnQueryMetrics.BKDIndexMetrics(keyspace, table, getIndexName()); @@ -235,9 +231,9 @@ public ClusteringComparator comparator() return clusteringComparator; } - public IndexMetrics getIndexMetrics() + public Optional getIndexMetrics() { - return indexMetrics; + return Optional.ofNullable(indexMetrics); } public ColumnQueryMetrics getColumnQueryMetrics() @@ -298,7 +294,8 @@ public void index(DecoratedKey key, Row row, Memtable memtable, OpOrder.Group op ByteBuffer value = getValueOf(key, row, FBUtilities.nowInSeconds()); target.index(key, row.clustering(), value, memtable, opGroup); } - indexMetrics.memtableIndexWriteLatency.update(System.nanoTime() - start, TimeUnit.NANOSECONDS); + if (indexMetrics != null) + indexMetrics.memtableIndexWriteLatency.update(System.nanoTime() - start, TimeUnit.NANOSECONDS); } /** @@ -690,8 +687,10 @@ public void invalidate(boolean obsolete) dropped = true; liveMemtables.clear(); viewManager.invalidate(obsolete); - indexMetrics.release(); - columnQueryMetrics.release(); + if (indexMetrics != null) + indexMetrics.release(); + if (columnQueryMetrics != null) + columnQueryMetrics.release(); analyzerFactory.close(); if (queryAnalyzerFactory != analyzerFactory) diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java index a2cd5f23edd1..f06bc9fe7431 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java @@ -138,7 +138,7 @@ public void complete(Stopwatch stopwatch) throws IOException catch (Throwable t) { logger.error(perIndexComponents.logMessage("Error while flushing index {}"), t.getMessage(), t); - indexContext().getIndexMetrics().memtableIndexFlushErrors.inc(); + indexContext().getIndexMetrics().ifPresent(m -> m.memtableIndexFlushErrors.inc()); throw t; } @@ -263,7 +263,7 @@ private void completeIndexFlush(long cellCount, long startTime, Stopwatch stopwa { perIndexComponents.markComplete(); - indexContext().getIndexMetrics().memtableIndexFlushCount.inc(); + indexContext().getIndexMetrics().ifPresent(m -> m.memtableIndexFlushCount.inc()); long elapsedTime = stopwatch.elapsed(TimeUnit.MILLISECONDS); @@ -273,6 +273,7 @@ private void completeIndexFlush(long cellCount, long startTime, Stopwatch stopwa elapsedTime - startTime, elapsedTime); - indexContext().getIndexMetrics().memtableFlushCellsPerSecond.update((long) (cellCount * 1000.0 / Math.max(1, elapsedTime - startTime))); + indexContext().getIndexMetrics() + .ifPresent(m -> m.memtableFlushCellsPerSecond.update((long) (cellCount * 1000.0 / Math.max(1, elapsedTime - startTime)))); } } diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java index 7989ecb9086c..4cb7e8fd6cf4 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java @@ -191,12 +191,11 @@ public void complete(Stopwatch stopwatch) throws IOException } finally { - if (indexContext.getIndexMetrics() != null) - { - indexContext.getIndexMetrics().segmentsPerCompaction.update(segments.size()); + indexContext.getIndexMetrics().ifPresent(m -> { + m.segmentsPerCompaction.update(segments.size()); segments.clear(); - indexContext.getIndexMetrics().compactionCount.inc(); - } + m.compactionCount.inc(); + }); } } @@ -325,12 +324,12 @@ private void flushSegment() throws IOException segments.add(segmentMetadata); double rowCount = segmentMetadata.numRows; - if (indexContext.getIndexMetrics() != null) - indexContext.getIndexMetrics().compactionSegmentCellsPerSecond.update((long)(rowCount / flushMillis * 1000.0)); - double segmentBytes = segmentMetadata.componentMetadatas.indexSize(); - if (indexContext.getIndexMetrics() != null) - indexContext.getIndexMetrics().compactionSegmentBytesPerSecond.update((long)(segmentBytes / flushMillis * 1000.0)); + + indexContext.getIndexMetrics().ifPresent(m -> { + m.compactionSegmentCellsPerSecond.update((long)(rowCount / flushMillis * 1000.0)); + m.compactionSegmentBytesPerSecond.update((long)(segmentBytes / flushMillis * 1000.0)); + }); logger.debug("Flushed segment with {} cells for a total of {} in {} ms for index {} with starting row id {} for sstable {}", (long) rowCount, FBUtilities.prettyPrintMemory((long) segmentBytes), flushMillis, indexContext.getIndexName(), @@ -352,7 +351,7 @@ private void flushSegment() throws IOException logger.error("Failed to build index for SSTable {}", perIndexComponents.descriptor(), t); perIndexComponents.forceDeleteAllComponents(); - indexContext.getIndexMetrics().segmentFlushErrors.inc(); + indexContext.getIndexMetrics().ifPresent(m -> m.segmentFlushErrors.inc()); throw t; } diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java index 6b10fb2f01e3..bf3bf49ec649 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java +++ b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java @@ -378,8 +378,8 @@ private void updateIndexMetricsQueriesCount(Plan plan) queriedIndexesContexts.add(indexContext); return Plan.ControlFlow.Continue; }); - queriedIndexesContexts.forEach(indexContext -> - indexContext.getIndexMetrics().queriesCount.inc()); + queriedIndexesContexts.forEach(indexContext -> indexContext.getIndexMetrics() + .ifPresent(m -> m.queriesCount.inc())); } Plan buildPlan() diff --git a/test/unit/org/apache/cassandra/index/sai/SAITester.java b/test/unit/org/apache/cassandra/index/sai/SAITester.java index cd7bd191eb06..6a1c8ebb728e 100644 --- a/test/unit/org/apache/cassandra/index/sai/SAITester.java +++ b/test/unit/org/apache/cassandra/index/sai/SAITester.java @@ -466,6 +466,11 @@ protected Object getMetricValue(ObjectName metricObjectName) return metricValue; } + protected void assertMetricExists(ObjectName name) + { + Assertions.assertThatNoException().isThrownBy(() -> getMetricValue(name)); + } + protected void assertMetricDoesNotExist(ObjectName name) { Assertions.assertThatThrownBy(() -> getMetricValue(name)).hasRootCauseInstanceOf(InstanceNotFoundException.class); diff --git a/test/unit/org/apache/cassandra/index/sai/metrics/IndexMetricsTest.java b/test/unit/org/apache/cassandra/index/sai/metrics/IndexMetricsTest.java index 057d71b76fa2..342d20f1b4e3 100644 --- a/test/unit/org/apache/cassandra/index/sai/metrics/IndexMetricsTest.java +++ b/test/unit/org/apache/cassandra/index/sai/metrics/IndexMetricsTest.java @@ -19,14 +19,20 @@ import java.util.concurrent.TimeUnit; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.junit.Test; import com.datastax.driver.core.ResultSet; import org.apache.cassandra.utils.Throwables; +import javax.management.ObjectName; + import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; + +import org.apache.cassandra.inject.Injection; +import org.apache.cassandra.inject.Injections; +import org.apache.cassandra.index.sai.disk.v1.MemtableIndexWriter; public class IndexMetricsTest extends AbstractMetricsTest { @@ -144,6 +150,73 @@ public void testMetricsThroughWriteLifecycle() waitForHistogramMeanBetween(objectName("CompactionSegmentCellsPerSecond", KEYSPACE, table, index, "IndexMetrics"), 1.0, 1000000.0); } + @Test + public void testIndexMetricsEnabledAndDisabled() + { + testIndexMetrics(true); + testIndexMetrics(false); + } + + private void testIndexMetrics(boolean metricsEnabled) + { + // Set the property before creating any indexes + CassandraRelevantProperties.SAI_INDEX_METRICS_ENABLED.setBoolean(metricsEnabled); + + try + { + String table = createTable("CREATE TABLE %s (ID1 TEXT PRIMARY KEY, v1 INT, v2 TEXT) WITH compaction = " + + "{'class' : 'SizeTieredCompactionStrategy', 'enabled' : false }"); + String index = createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s (v1) USING 'StorageAttachedIndex'"); + + // Test all Gauge metrics + assertMetricExistsIfEnabled(metricsEnabled, "SSTableCellCount", table, index); + assertMetricExistsIfEnabled(metricsEnabled, "LiveMemtableIndexWriteCount", table, index); + assertMetricExistsIfEnabled(metricsEnabled, "DiskUsedBytes", table, index); + assertMetricExistsIfEnabled(metricsEnabled, "MemtableOnHeapIndexBytes", table, index); + assertMetricExistsIfEnabled(metricsEnabled, "MemtableOffHeapIndexBytes", table, index); + assertMetricExistsIfEnabled(metricsEnabled, "IndexFileCacheBytes", table, index); + + // Test all Counter metrics + assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexFlushCount", table, index); + assertMetricExistsIfEnabled(metricsEnabled, "CompactionCount", table, index); + assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexFlushErrors", table, index); + assertMetricExistsIfEnabled(metricsEnabled, "CompactionSegmentFlushErrors", table, index); + assertMetricExistsIfEnabled(metricsEnabled, "QueriesCount", table, index); + + // Test all Histogram metrics + assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexFlushCellsPerSecond", table, index); + assertMetricExistsIfEnabled(metricsEnabled, "SegmentsPerCompaction", table, index); + assertMetricExistsIfEnabled(metricsEnabled, "CompactionSegmentCellsPerSecond", table, index); + assertMetricExistsIfEnabled(metricsEnabled, "CompactionSegmentBytesPerSecond", table, index); + + // Test Timer metrics + assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexWriteLatency", table, index); + + // Test indexing operations to ensure null indexMetrics is handled gracefully + execute("INSERT INTO %s (id1, v1, v2) VALUES ('0', 0, '0')"); + execute("INSERT INTO %s (id1, v1, v2) VALUES ('1', 1, '1')"); + execute("INSERT INTO %s (id1, v1, v2) VALUES ('2', 2, '2')"); + + // Verify MemtableIndexWriteLatency metric behavior after indexing operations + assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexWriteLatency", table, index); + } + finally + { + // Reset property to default + CassandraRelevantProperties.SAI_INDEX_METRICS_ENABLED.setBoolean(true); + } + } + + private void assertMetricExistsIfEnabled(boolean shouldExist, String metricName, String table, String index) + { + ObjectName name = objectName(metricName, KEYSPACE, table, index, "IndexMetrics"); + + if (shouldExist) + assertMetricExists(name); + else + assertMetricDoesNotExist(name); + } + private void assertIndexQueryCount(String index, long expectedCount) { assertEquals(expectedCount, @@ -190,4 +263,38 @@ public void testQueriesCount() assertIndexQueryCount(indexV2, 2L); assertIndexQueryCount(indexV3, 1L); } + + @Test + public void testMemtableIndexFlushErrorIncrementsMetric() throws Throwable + { + String table = createTable("CREATE TABLE %s (ID1 TEXT PRIMARY KEY, v1 INT, v2 TEXT) WITH compaction = " + + "{'class' : 'SizeTieredCompactionStrategy', 'enabled' : false }"); + String index = createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s (v1) USING 'StorageAttachedIndex'"); + + // Write some data to ensure there is something to flush + execute("INSERT INTO %s (id1, v1, v2) VALUES ('0', 0, '0')"); + + assertEquals(0L, getMetricValue(objectName("MemtableIndexFlushErrors", KEYSPACE, table, index, "IndexMetrics"))); + + // Inject a failure at the entry of MemtableIndexWriter#flush(...) to force a flush error + Injection failure = newFailureOnEntry("sai_memtable_flush_error", MemtableIndexWriter.class, "flush", RuntimeException.class); + Injections.inject(failure); + + try + { + // Trigger a flush, which should hit the injected failure + flush(KEYSPACE, table); + } + catch (Throwable ignored) + { + // Expected due to injected failure + } + finally + { + failure.disable(); + } + + // Verify the memtable index flush error metric is incremented + assertEquals(1L, getMetricValue(objectName("MemtableIndexFlushErrors", KEYSPACE, table, index, "IndexMetrics"))); + } }