Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,12 @@ public enum CassandraRelevantProperties
*/
SAI_QUERY_KIND_PER_QUERY_METRICS_ENABLED("cassandra.sai.metrics.query_kind.per_query.enabled", "false"),

/**
* Whether to enable SAI index metrics such as memtable flush metrics, compaction metrics, and disk usage metrics.
* These metrics include timers, histograms, counters, and gauges for index operations.
*/
SAI_INDEX_METRICS_ENABLED("cassandra.sai.metrics.index.enabled", "true"),

/**
* If true, while creating or altering schema, NetworkTopologyStrategy won't check if the DC exists.
* This is to remain compatible with older workflows that first change the replication before adding the nodes.
Expand Down
25 changes: 12 additions & 13 deletions src/java/org/apache/cassandra/index/sai/IndexContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@
package org.apache.cassandra.index.sai;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
Expand All @@ -38,7 +34,6 @@
import org.slf4j.LoggerFactory;

import io.github.jbellis.jvector.vector.VectorSimilarityFunction;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.cql3.statements.schema.IndexTarget;
import org.apache.cassandra.db.ClusteringComparator;
Expand Down Expand Up @@ -73,7 +68,6 @@
import org.apache.cassandra.index.sai.iterators.KeyRangeUnionIterator;
import org.apache.cassandra.index.sai.memory.MemtableIndex;
import org.apache.cassandra.index.sai.memory.MemtableKeyRangeIterator;
import org.apache.cassandra.index.sai.memory.TrieMemtableIndex;
import org.apache.cassandra.index.sai.metrics.ColumnQueryMetrics;
import org.apache.cassandra.index.sai.metrics.IndexMetrics;
import org.apache.cassandra.index.sai.plan.Expression;
Expand All @@ -93,6 +87,7 @@
import org.apache.cassandra.utils.concurrent.OpOrder;

import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_INDEX_READS_DISABLED;
import static org.apache.cassandra.config.CassandraRelevantProperties.SAI_INDEX_METRICS_ENABLED;
import static org.apache.cassandra.config.CassandraRelevantProperties.VALIDATE_MAX_TERM_SIZE_AT_COORDINATOR;

/**
Expand Down Expand Up @@ -140,6 +135,7 @@ public class IndexContext
private final ConcurrentMap<Memtable, MemtableIndex> liveMemtables = new ConcurrentHashMap<>();

private final IndexViewManager viewManager;
@Nullable
private final IndexMetrics indexMetrics;
private final ColumnQueryMetrics columnQueryMetrics;
private final IndexWriterConfig indexWriterConfig;
Expand Down Expand Up @@ -190,7 +186,7 @@ public IndexContext(@Nonnull String keyspace,
this.vectorSimilarityFunction = indexWriterConfig.getSimilarityFunction();
this.hasEuclideanSimilarityFunc = vectorSimilarityFunction == VectorSimilarityFunction.EUCLIDEAN;

this.indexMetrics = new IndexMetrics(this);
this.indexMetrics = SAI_INDEX_METRICS_ENABLED.getBoolean() ? new IndexMetrics(this) : null;
this.columnQueryMetrics = isVector() ? new ColumnQueryMetrics.VectorIndexMetrics(keyspace, table, getIndexName()) :
isLiteral() ? new ColumnQueryMetrics.TrieIndexMetrics(keyspace, table, getIndexName())
: new ColumnQueryMetrics.BKDIndexMetrics(keyspace, table, getIndexName());
Expand Down Expand Up @@ -235,9 +231,9 @@ public ClusteringComparator comparator()
return clusteringComparator;
}

public IndexMetrics getIndexMetrics()
public Optional<IndexMetrics> getIndexMetrics()
{
return indexMetrics;
return Optional.ofNullable(indexMetrics);
}

public ColumnQueryMetrics getColumnQueryMetrics()
Expand Down Expand Up @@ -298,7 +294,8 @@ public void index(DecoratedKey key, Row row, Memtable memtable, OpOrder.Group op
ByteBuffer value = getValueOf(key, row, FBUtilities.nowInSeconds());
target.index(key, row.clustering(), value, memtable, opGroup);
}
indexMetrics.memtableIndexWriteLatency.update(System.nanoTime() - start, TimeUnit.NANOSECONDS);
if (indexMetrics != null)
indexMetrics.memtableIndexWriteLatency.update(System.nanoTime() - start, TimeUnit.NANOSECONDS);
}

/**
Expand Down Expand Up @@ -690,8 +687,10 @@ public void invalidate(boolean obsolete)
dropped = true;
liveMemtables.clear();
viewManager.invalidate(obsolete);
indexMetrics.release();
columnQueryMetrics.release();
if (indexMetrics != null)
indexMetrics.release();
if (columnQueryMetrics != null)
columnQueryMetrics.release();

analyzerFactory.close();
if (queryAnalyzerFactory != analyzerFactory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void complete(Stopwatch stopwatch) throws IOException
catch (Throwable t)
{
logger.error(perIndexComponents.logMessage("Error while flushing index {}"), t.getMessage(), t);
indexContext().getIndexMetrics().memtableIndexFlushErrors.inc();
indexContext().getIndexMetrics().ifPresent(m -> m.memtableIndexFlushErrors.inc());

throw t;
}
Expand Down Expand Up @@ -263,7 +263,7 @@ private void completeIndexFlush(long cellCount, long startTime, Stopwatch stopwa
{
perIndexComponents.markComplete();

indexContext().getIndexMetrics().memtableIndexFlushCount.inc();
indexContext().getIndexMetrics().ifPresent(m -> m.memtableIndexFlushCount.inc());

long elapsedTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);

Expand All @@ -273,6 +273,7 @@ private void completeIndexFlush(long cellCount, long startTime, Stopwatch stopwa
elapsedTime - startTime,
elapsedTime);

indexContext().getIndexMetrics().memtableFlushCellsPerSecond.update((long) (cellCount * 1000.0 / Math.max(1, elapsedTime - startTime)));
indexContext().getIndexMetrics()
.ifPresent(m -> m.memtableFlushCellsPerSecond.update((long) (cellCount * 1000.0 / Math.max(1, elapsedTime - startTime))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,11 @@ public void complete(Stopwatch stopwatch) throws IOException
}
finally
{
if (indexContext.getIndexMetrics() != null)
{
indexContext.getIndexMetrics().segmentsPerCompaction.update(segments.size());
indexContext.getIndexMetrics().ifPresent(m -> {
m.segmentsPerCompaction.update(segments.size());
segments.clear();
indexContext.getIndexMetrics().compactionCount.inc();
}
m.compactionCount.inc();
});
}
}

Expand Down Expand Up @@ -325,12 +324,12 @@ private void flushSegment() throws IOException
segments.add(segmentMetadata);

double rowCount = segmentMetadata.numRows;
if (indexContext.getIndexMetrics() != null)
indexContext.getIndexMetrics().compactionSegmentCellsPerSecond.update((long)(rowCount / flushMillis * 1000.0));

double segmentBytes = segmentMetadata.componentMetadatas.indexSize();
if (indexContext.getIndexMetrics() != null)
indexContext.getIndexMetrics().compactionSegmentBytesPerSecond.update((long)(segmentBytes / flushMillis * 1000.0));

indexContext.getIndexMetrics().ifPresent(m -> {
m.compactionSegmentCellsPerSecond.update((long)(rowCount / flushMillis * 1000.0));
m.compactionSegmentBytesPerSecond.update((long)(segmentBytes / flushMillis * 1000.0));
});

logger.debug("Flushed segment with {} cells for a total of {} in {} ms for index {} with starting row id {} for sstable {}",
(long) rowCount, FBUtilities.prettyPrintMemory((long) segmentBytes), flushMillis, indexContext.getIndexName(),
Expand All @@ -352,7 +351,7 @@ private void flushSegment() throws IOException
logger.error("Failed to build index for SSTable {}", perIndexComponents.descriptor(), t);
perIndexComponents.forceDeleteAllComponents();

indexContext.getIndexMetrics().segmentFlushErrors.inc();
indexContext.getIndexMetrics().ifPresent(m -> m.segmentFlushErrors.inc());

throw t;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,8 @@ private void updateIndexMetricsQueriesCount(Plan plan)
queriedIndexesContexts.add(indexContext);
return Plan.ControlFlow.Continue;
});
queriedIndexesContexts.forEach(indexContext ->
indexContext.getIndexMetrics().queriesCount.inc());
queriedIndexesContexts.forEach(indexContext -> indexContext.getIndexMetrics()
.ifPresent(m -> m.queriesCount.inc()));
}

Plan buildPlan()
Expand Down
5 changes: 5 additions & 0 deletions test/unit/org/apache/cassandra/index/sai/SAITester.java
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,11 @@ protected Object getMetricValue(ObjectName metricObjectName)
return metricValue;
}

protected void assertMetricExists(ObjectName name)
{
Assertions.assertThatNoException().isThrownBy(() -> getMetricValue(name));
}

protected void assertMetricDoesNotExist(ObjectName name)
{
Assertions.assertThatThrownBy(() -> getMetricValue(name)).hasRootCauseInstanceOf(InstanceNotFoundException.class);
Expand Down
111 changes: 109 additions & 2 deletions test/unit/org/apache/cassandra/index/sai/metrics/IndexMetricsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,20 @@

import java.util.concurrent.TimeUnit;

import org.apache.cassandra.config.CassandraRelevantProperties;
import org.junit.Test;

import com.datastax.driver.core.ResultSet;
import org.apache.cassandra.utils.Throwables;

import javax.management.ObjectName;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

import org.apache.cassandra.inject.Injection;
import org.apache.cassandra.inject.Injections;
import org.apache.cassandra.index.sai.disk.v1.MemtableIndexWriter;

public class IndexMetricsTest extends AbstractMetricsTest
{
Expand Down Expand Up @@ -144,6 +150,73 @@ public void testMetricsThroughWriteLifecycle()
waitForHistogramMeanBetween(objectName("CompactionSegmentCellsPerSecond", KEYSPACE, table, index, "IndexMetrics"), 1.0, 1000000.0);
}

@Test
public void testIndexMetricsEnabledAndDisabled()
{
testIndexMetrics(true);
testIndexMetrics(false);
}

private void testIndexMetrics(boolean metricsEnabled)
{
// Set the property before creating any indexes
CassandraRelevantProperties.SAI_INDEX_METRICS_ENABLED.setBoolean(metricsEnabled);

try
{
String table = createTable("CREATE TABLE %s (ID1 TEXT PRIMARY KEY, v1 INT, v2 TEXT) WITH compaction = " +
"{'class' : 'SizeTieredCompactionStrategy', 'enabled' : false }");
String index = createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s (v1) USING 'StorageAttachedIndex'");

// Test all Gauge metrics
assertMetricExistsIfEnabled(metricsEnabled, "SSTableCellCount", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "LiveMemtableIndexWriteCount", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "DiskUsedBytes", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "MemtableOnHeapIndexBytes", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "MemtableOffHeapIndexBytes", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "IndexFileCacheBytes", table, index);

// Test all Counter metrics
assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexFlushCount", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "CompactionCount", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexFlushErrors", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "CompactionSegmentFlushErrors", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "QueriesCount", table, index);

// Test all Histogram metrics
assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexFlushCellsPerSecond", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "SegmentsPerCompaction", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "CompactionSegmentCellsPerSecond", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "CompactionSegmentBytesPerSecond", table, index);

// Test Timer metrics
assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexWriteLatency", table, index);

// Test indexing operations to ensure null indexMetrics is handled gracefully
execute("INSERT INTO %s (id1, v1, v2) VALUES ('0', 0, '0')");
execute("INSERT INTO %s (id1, v1, v2) VALUES ('1', 1, '1')");
execute("INSERT INTO %s (id1, v1, v2) VALUES ('2', 2, '2')");

// Verify MemtableIndexWriteLatency metric behavior after indexing operations
assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexWriteLatency", table, index);
}
finally
{
// Reset property to default
CassandraRelevantProperties.SAI_INDEX_METRICS_ENABLED.setBoolean(true);
}
}

private void assertMetricExistsIfEnabled(boolean shouldExist, String metricName, String table, String index)
{
ObjectName name = objectName(metricName, KEYSPACE, table, index, "IndexMetrics");

if (shouldExist)
assertMetricExists(name);
else
assertMetricDoesNotExist(name);
}

private void assertIndexQueryCount(String index, long expectedCount)
{
assertEquals(expectedCount,
Expand Down Expand Up @@ -190,4 +263,38 @@ public void testQueriesCount()
assertIndexQueryCount(indexV2, 2L);
assertIndexQueryCount(indexV3, 1L);
}

@Test
public void testMemtableIndexFlushErrorIncrementsMetric() throws Throwable
{
String table = createTable("CREATE TABLE %s (ID1 TEXT PRIMARY KEY, v1 INT, v2 TEXT) WITH compaction = " +
"{'class' : 'SizeTieredCompactionStrategy', 'enabled' : false }");
String index = createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s (v1) USING 'StorageAttachedIndex'");

// Write some data to ensure there is something to flush
execute("INSERT INTO %s (id1, v1, v2) VALUES ('0', 0, '0')");

assertEquals(0L, getMetricValue(objectName("MemtableIndexFlushErrors", KEYSPACE, table, index, "IndexMetrics")));

// Inject a failure at the entry of MemtableIndexWriter#flush(...) to force a flush error
Injection failure = newFailureOnEntry("sai_memtable_flush_error", MemtableIndexWriter.class, "flush", RuntimeException.class);
Injections.inject(failure);

try
{
// Trigger a flush, which should hit the injected failure
flush(KEYSPACE, table);
}
catch (Throwable ignored)
{
// Expected due to injected failure
}
finally
{
failure.disable();
}

// Verify the memtable index flush error metric is incremented
assertEquals(1L, getMetricValue(objectName("MemtableIndexFlushErrors", KEYSPACE, table, index, "IndexMetrics")));
}
}