From 61171c5d6268246807b37c0bd2418b2c01c56f85 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Andr=C3=A9s=20de=20la=20Pe=C3=B1a?=
Date: Wed, 24 Sep 2025 12:45:29 +0100
Subject: [PATCH 1/3] CNDB-15482: CNDB-15435: Only count live data size in
 bytes-based paging (#2010)

Only count live data size when we are using bytes-sized limits.
This prevents issues with paging when row purging or another unexpected
transformation changes the size of the read rows.

This fixes DBPE-16935 and DBPE-17751.

Rebase notes:
* nowInSec and minDeletionTime are now longs instead of ints (ref CASSANDRA-14227)
---
 .../cassandra/db/filter/DataLimits.java       |  18 +-
 .../cassandra/db/rows/AbstractCell.java       |   6 +
 .../apache/cassandra/db/rows/BTreeRow.java    |  49 +-
 .../apache/cassandra/db/rows/ColumnData.java  |   8 +
 .../cassandra/db/rows/ComplexColumnData.java  |   6 +
 .../org/apache/cassandra/db/rows/Row.java     |  10 +-
 .../index/sai/utils/CellWithSourceTable.java  |   6 +
 .../index/sai/utils/RowWithSourceTable.java   |   4 +-
 .../cql3/PagingAggregationQueryTest.java      | 612 ++++++++++++++++--
 9 files changed, 620 insertions(+), 99 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 6f2d457f3d1f..7bc519becfc8 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -628,14 +628,14 @@ public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
         {
             rowsInCurrentPartition = 0;
             hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
-            staticRowBytes = hasLiveStaticRow && bytesLimit != NO_LIMIT ? staticRow.dataSizeBeforePurge() : 0;
+            staticRowBytes = hasLiveStaticRow && bytesLimit != NO_LIMIT ? staticRow.liveDataSize(nowInSec) : 0;
         }
 
         @Override
         public Row applyToRow(Row row)
         {
             if (isLive(row))
-                incrementRowCount(bytesLimit != NO_LIMIT ? row.dataSizeBeforePurge() : 0);
+                incrementRowCount(bytesLimit != NO_LIMIT ? row.liveDataSize(nowInSec) : 0);
             return row;
         }
 
@@ -650,9 +650,9 @@ public void onPartitionClose()
             super.onPartitionClose();
         }
 
-        protected void incrementRowCount(int rowSize)
+        protected void incrementRowCount(int liveRowSize)
        {
-            bytesCounted += rowSize;
+            bytesCounted += liveRowSize;
             rowsCounted++;
             rowsInCurrentPartition++;
             if (bytesCounted >= bytesLimit || rowsCounted >= rowLimit)
@@ -891,7 +891,7 @@ public DataLimits forShortReadRetry(int toFetch)
         @Override
         public float estimateTotalResults(ColumnFamilyStore cfs)
         {
-            // For the moment, we return the estimated number of rows as we have no good way of estimating 
+            // For the moment, we return the estimated number of rows as we have no good way of estimating
             // the number of groups that will be returned. Hopefully, we should be able to fix
             // that problem at some point.
             return super.estimateTotalResults(cfs);
@@ -1110,7 +1110,7 @@ public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
                 }
                 hasReturnedRowsFromCurrentPartition = false;
                 hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
-                staticRowBytes = hasLiveStaticRow ? staticRow.dataSizeBeforePurge() : 0;
+                staticRowBytes = hasLiveStaticRow ? staticRow.liveDataSize(nowInSec) : 0;
             }
             currentPartitionKey = partitionKey;
             // If we are done we need to preserve the groupInCurrentPartition and rowsCountedInCurrentPartition
@@ -1176,7 +1176,7 @@ public Row applyToRow(Row row)
                 if (isLive(row))
                 {
                     hasUnfinishedGroup = true;
-                    incrementRowCount(bytesLimit != NO_LIMIT ? row.dataSizeBeforePurge() : 0);
+                    incrementRowCount(bytesLimit != NO_LIMIT ? row.liveDataSize(nowInSec) : 0);
                     hasReturnedRowsFromCurrentPartition = true;
                 }
 
@@ -1213,11 +1213,11 @@ public int rowsCountedInCurrentPartition()
             return rowsCountedInCurrentPartition;
         }
 
-        protected void incrementRowCount(int rowSize)
+        protected void incrementRowCount(int rowLiveSize)
         {
             rowsCountedInCurrentPartition++;
             rowsCounted++;
-            bytesCounted += rowSize;
+            bytesCounted += rowLiveSize;
             if (rowsCounted >= rowLimit || bytesCounted >= bytesLimit)
                 stop();
         }
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index 79a52b8631d7..e4c306965d1e 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -127,6 +127,12 @@ public int dataSize()
                + (path == null ? 0 : path.dataSize());
     }
 
+    @Override
+    public int liveDataSize(long nowInSec)
+    {
+        return isLive(nowInSec) ? dataSize() : 0;
+    }
+
     public void digest(Digest digest)
     {
         if (isCounterCell())
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index db6cb2d653a9..e5b38f7b93f2 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -90,17 +90,11 @@ public class BTreeRow extends AbstractRow
     // no expiring cells, this will be Cell.MAX_DELETION_TIME;
     private final long minLocalDeletionTime;
 
-    /**
-     * The original data size of this row before purging it, or -1 if it hasn't been purged.
-     */
-    private final int dataSizeBeforePurge;
-
     private BTreeRow(Clustering clustering,
                      LivenessInfo primaryKeyLivenessInfo,
                      Deletion deletion,
                      Object[] btree,
-                     long minLocalDeletionTime,
-                     int dataSizeBeforePurge)
+                     long minLocalDeletionTime)
     {
         assert !deletion.isShadowedBy(primaryKeyLivenessInfo);
         this.clustering = clustering;
@@ -108,16 +102,6 @@ private BTreeRow(Clustering clustering,
         this.deletion = deletion;
         this.btree = btree;
         this.minLocalDeletionTime = minLocalDeletionTime;
-        this.dataSizeBeforePurge = dataSizeBeforePurge;
-    }
-
-    private BTreeRow(Clustering clustering,
-                     LivenessInfo primaryKeyLivenessInfo,
-                     Deletion deletion,
-                     Object[] btree,
-                     long minLocalDeletionTime)
-    {
-        this(clustering, primaryKeyLivenessInfo, deletion, btree, minLocalDeletionTime, -1);
     }
 
     private BTreeRow(Clustering clustering, Object[] btree, long minLocalDeletionTime)
@@ -141,23 +125,13 @@ public static BTreeRow create(Clustering clustering,
         return create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
     }
 
-    public static BTreeRow create(Clustering clustering,
-                                  LivenessInfo primaryKeyLivenessInfo,
-                                  Deletion deletion,
-                                  Object[] btree,
-                                  long minDeletionTime,
-                                  int dataSizeBeforePurge)
-    {
-        return new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime, dataSizeBeforePurge);
-    }
-
     public static BTreeRow create(Clustering clustering,
                                   LivenessInfo primaryKeyLivenessInfo,
                                   Deletion deletion,
                                   Object[] btree,
                                   long minDeletionTime)
     {
-        return create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime, -1);
+        return new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
     }
 
     public static BTreeRow emptyRow(Clustering clustering)
@@ -523,7 +497,7 @@ public Row purge(DeletionPurger purger, long nowInSec, boolean enforceStrictLive
             return null;
 
         Function<ColumnData, ColumnData> columnDataPurger = (cd) -> cd.purge(purger, nowInSec);
-        return update(newInfo, newDeletion, BTree.transformAndFilter(btree, columnDataPurger), true);
+        return update(newInfo, newDeletion, BTree.transformAndFilter(btree, columnDataPurger));
     }
 
     public Row purgeDataOlderThan(long timestamp, boolean enforceStrictLiveness)
@@ -541,10 +515,10 @@ public Row purgeDataOlderThan(long timestamp, boolean enforceStrictLiveness)
     @Override
     public Row transformAndFilter(LivenessInfo info, Deletion deletion, Function<ColumnData, ColumnData> function)
     {
-        return update(info, deletion, BTree.transformAndFilter(btree, function), false);
+        return update(info, deletion, BTree.transformAndFilter(btree, function));
     }
 
-    private Row update(LivenessInfo info, Deletion deletion, Object[] newTree, boolean preserveDataSizeBeforePurge)
+    private Row update(LivenessInfo info, Deletion deletion, Object[] newTree)
     {
         if (btree == newTree && info == this.primaryKeyLivenessInfo && deletion == this.deletion)
             return this;
@@ -554,8 +528,7 @@ private Row update(LivenessInfo info, Deletion deletion, Object[] newTree, boole
 
         long minDeletionTime = minDeletionTime(newTree, info, deletion.time());
 
-        int dataSizeBeforePurge = preserveDataSizeBeforePurge ? dataSizeBeforePurge() : -1;
-        return BTreeRow.create(clustering, info, deletion, newTree, minDeletionTime, dataSizeBeforePurge);
+        return BTreeRow.create(clustering, info, deletion, newTree, minDeletionTime);
     }
 
     @Override
@@ -566,7 +539,7 @@ public Row transformAndFilter(Function<ColumnData, ColumnData> function)
 
     public Row transform(Function<ColumnData, ColumnData> function)
     {
-        return update(primaryKeyLivenessInfo, deletion, BTree.transform(btree, function), false);
+        return update(primaryKeyLivenessInfo, deletion, BTree.transform(btree, function));
     }
 
     @Override
@@ -598,9 +571,13 @@ public long unsharedHeapSize()
     }
 
     @Override
-    public int dataSizeBeforePurge()
+    public int liveDataSize(long nowInSec)
     {
-        return dataSizeBeforePurge >= 0 ? dataSizeBeforePurge : dataSize();
+        int dataSize = clustering.dataSize()
+                       + primaryKeyLivenessInfo.dataSize()
+                       + deletion.dataSize();
+
+        return Ints.checkedCast(accumulate((cd, v) -> v + cd.liveDataSize(nowInSec), dataSize));
     }
 
     public long unsharedHeapSizeExcludingData()
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java
index bc821c2166b7..a23cbaf60f35 100644
--- a/src/java/org/apache/cassandra/db/rows/ColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -245,6 +245,14 @@ protected ColumnData(ColumnMetadata column)
      */
     public abstract int dataSize();
 
+    /**
+     * The size of the data held by this {@code ColumnData} that is live at {@code nowInSec}.
+     *
+     * @param nowInSec the query timestamp in seconds
+     * @return the size used by the live data of this {@code ColumnData}.
+     */
+    public abstract int liveDataSize(long nowInSec);
+
     public abstract long unsharedHeapSizeExcludingData();
 
     public abstract long unsharedHeapSize();
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index 033dbf4919f2..2251b404e9a2 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -149,6 +149,7 @@ public int dataSize()
         return size;
     }
 
+    @Override
     public long unsharedHeapSize()
     {
         long heapSize = EMPTY_SIZE + BTree.sizeOnHeapOf(cells) + complexDeletion.unsharedHeapSize();
@@ -156,6 +157,11 @@ public long unsharedHeapSize()
     }
 
     @Override
+    public int liveDataSize(long nowInSec)
+    {
+        return complexDeletion.isLive() ? dataSize() : 0;
+    }
+
     public long unsharedHeapSizeExcludingData()
     {
         long heapSize = EMPTY_SIZE + BTree.sizeOnHeapOf(cells);
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index 7ef6bd6286c1..f4d9f59f64b6 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -126,7 +126,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData>, IMeasurableMemory
     /**
      * Whether the row has some live information (i.e. it's not just deletion informations).
-     * 
+     *
      * @param nowInSec the current time to decide what is deleted and what isn't
      * @param enforceStrictLiveness whether the row should be purged if there is no PK liveness info,
      *                              normally retrieved from {@link TableMetadata#enforceStrictLiveness()}
@@ -326,12 +326,12 @@ public interface Row extends Unfiltered, Iterable<ColumnData>, IMeasurableMemory
     public int dataSize();
 
     /**
-     * Returns the original data size in bytes of this row as it was returned by {@link #dataSize()} before purging it
-     * from all deletion info with {@link #purge}.
+     * Returns the size of the data held by this row that is live at {@code nowInSec}.
      *
-     * @return the original data size of this row in bytes before purging
+     * @param nowInSec the query timestamp in seconds
+     * @return the size of the data held by this row that is live at {@code nowInSec}.
      */
-    int dataSizeBeforePurge();
+    int liveDataSize(long nowInSec);
 
     public long unsharedHeapSizeExcludingData();
diff --git a/src/java/org/apache/cassandra/index/sai/utils/CellWithSourceTable.java b/src/java/org/apache/cassandra/index/sai/utils/CellWithSourceTable.java
index 1cc6689dbe07..041abded0496 100644
--- a/src/java/org/apache/cassandra/index/sai/utils/CellWithSourceTable.java
+++ b/src/java/org/apache/cassandra/index/sai/utils/CellWithSourceTable.java
@@ -147,6 +147,12 @@ public int dataSize()
         return cell.dataSize();
     }
 
+    @Override
+    public int liveDataSize(long nowInSec)
+    {
+        return cell.liveDataSize(nowInSec);
+    }
+
     @Override
     public long unsharedHeapSizeExcludingData()
     {
diff --git a/src/java/org/apache/cassandra/index/sai/utils/RowWithSourceTable.java b/src/java/org/apache/cassandra/index/sai/utils/RowWithSourceTable.java
index 2dd9707f64b1..cacebfb280d4 100644
--- a/src/java/org/apache/cassandra/index/sai/utils/RowWithSourceTable.java
+++ b/src/java/org/apache/cassandra/index/sai/utils/RowWithSourceTable.java
@@ -286,9 +286,9 @@ public int dataSize()
     }
 
     @Override
-    public int dataSizeBeforePurge()
+    public int liveDataSize(long nowInSec)
     {
-        return row.dataSizeBeforePurge();
+        return row.liveDataSize(nowInSec);
     }
 
     @Override
diff --git a/test/unit/org/apache/cassandra/cql3/PagingAggregationQueryTest.java b/test/unit/org/apache/cassandra/cql3/PagingAggregationQueryTest.java
index 4de0041ca0b0..dfe9f5f7d470 100644
--- a/test/unit/org/apache/cassandra/cql3/PagingAggregationQueryTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PagingAggregationQueryTest.java
@@ -23,19 +23,23 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.assertj.core.api.Assertions;
 
 @RunWith(Parameterized.class)
 public class PagingAggregationQueryTest extends CQLTester
 {
     public static final int NUM_PARTITIONS = 100;
+    public static final int NUM_CLUSTERINGS = 7;
 
     @Parameterized.Parameters(name = "aggregation_sub_page_size={0} data_size={1} flush={2}")
     public static Collection generateParameters()
@@ -101,9 +105,9 @@ public void testAggregationWithCellTomsbstones()
 
         createTable("CREATE TABLE %s (k int, c1 int, c2 int, v blob, PRIMARY KEY (k, c1, c2))");
 
-        int ks = 13;
-        int c1s = 17;
-        int c2s = 19;
+        int ks = NUM_PARTITIONS;
+        int c1s = NUM_CLUSTERINGS / 2;
+        int c2s = NUM_CLUSTERINGS / 2;
 
         // insert some data
         for (int k = 0; k < ks; k++)
@@ -117,15 +121,11 @@
             }
 
             // test aggregation on single partition query
-            int numRows = execute("SELECT * FROM %s WHERE k=?", k).size();
-            long count = execute("SELECT COUNT(*) FROM %s WHERE k=?", k).one().getLong("count");
-            Assertions.assertThat(count).isEqualTo(numRows).isEqualTo(c1s * c2s);
+            assertPartitionCount(k, c1s * c2s);
         }
 
         // test aggregation on range query
-        int numRows = execute("SELECT * FROM %s").size();
-        long count = execute("SELECT COUNT(*) FROM %s").one().getLong("count");
-        Assertions.assertThat(count).isEqualTo(numRows).isEqualTo(ks * c1s * c2s);
+        assertRangeCount(ks * c1s * c2s);
 
         // test aggregation with group by
         Assertions.assertThat(execute("SELECT COUNT(*) FROM %s GROUP BY k").size()).isEqualTo(ks);
@@ -134,17 +134,15 @@
     }
 
     @Test
-    public void testAggregationWithRowDeletions()
+    public void testAggregationWithPartialRowDeletions()
     {
         createTable("CREATE TABLE %s (k bigint, c int, v blob, PRIMARY KEY(k, c))");
 
-        int numClusterings = 7;
-
         // insert some clusterings, and flush
         for (long k = 1; k <= NUM_PARTITIONS; k++)
         {
             // insert some clusterings
-            for (int c = 1; c <= numClusterings; c++)
+            for (int c = 1; c <= NUM_CLUSTERINGS; c++)
             {
                 execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, value);
             }
@@ -155,34 +153,147 @@
         for (long k = 1; k <= NUM_PARTITIONS; k++)
         {
             execute("DELETE FROM %s WHERE k = ? AND c = ?", k, 1);
-            execute("DELETE FROM %s WHERE k = ? AND c = ?", k, numClusterings);
+            execute("DELETE FROM %s WHERE k = ? AND c = ?", k, NUM_CLUSTERINGS);
AND c = ?", k, NUM_CLUSTERINGS); // test aggregation on single partition query - int numRows = execute("SELECT * FROM %s WHERE k=?", k).size(); - long count = execute("SELECT COUNT(*) FROM %s WHERE k=?", k).one().getLong("count"); - Assertions.assertThat(count) - .isEqualTo(numRows) - .isEqualTo(numClusterings - 2); + assertPartitionCount(k, NUM_CLUSTERINGS - 2); long maxK = execute("SELECT max(k) FROM %s WHERE k=?", k).one().getLong("system.max(k)"); Assertions.assertThat(maxK).isEqualTo(k); int maxC = execute("SELECT max(c) FROM %s WHERE k=?", k).one().getInt("system.max(c)"); - Assertions.assertThat(maxC).isEqualTo(numClusterings - 1); + Assertions.assertThat(maxC).isEqualTo(NUM_CLUSTERINGS - 1); } // test aggregation on range query - int selectRows = execute("SELECT * FROM %s").size(); - long selectCountRows = execute("SELECT COUNT(*) FROM %s").one().getLong("count"); - Assertions.assertThat(selectCountRows) - .isEqualTo(selectRows) - .isEqualTo(NUM_PARTITIONS * (numClusterings - 2)); + assertRangeCount(NUM_PARTITIONS * (NUM_CLUSTERINGS - 2)); long maxK = execute("SELECT max(k) FROM %s").one().getLong("system.max(k)"); Assertions.assertThat(maxK).isEqualTo(NUM_PARTITIONS); int maxC = execute("SELECT max(c) FROM %s").one().getInt("system.max(c)"); - Assertions.assertThat(maxC).isEqualTo(numClusterings - 1); + Assertions.assertThat(maxC).isEqualTo(NUM_CLUSTERINGS - 1); // test aggregation with group by Assertions.assertThat(execute("SELECT COUNT(*) FROM %s GROUP BY k").size()).isEqualTo(NUM_PARTITIONS); - Assertions.assertThat(execute("SELECT COUNT(*) FROM %s GROUP BY k, c").size()).isEqualTo(NUM_PARTITIONS * (numClusterings - 2)); + Assertions.assertThat(execute("SELECT COUNT(*) FROM %s GROUP BY k, c").size()).isEqualTo(NUM_PARTITIONS * (NUM_CLUSTERINGS - 2)); + } + + @Test + public void testAggregationWithCompleteRowDeletions() + { + createTable("CREATE TABLE %s (k bigint, c int, v blob, PRIMARY KEY(k, c))"); + + // insert some clusterings, and flush + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + // insert some clusterings + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, value); + } + } + maybeFlush(); + + // for each partition, delete all the clusterings + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("DELETE FROM %s WHERE k = ? 
AND c = ?", k, c); + + // test aggregation on single partition query + assertPartitionCount(k, NUM_CLUSTERINGS - c); + } + + Assertions.assertThat(execute("SELECT max(k) FROM %s WHERE k=?", k).one().getBytes("system.max(k)")).isNull(); + Assertions.assertThat(execute("SELECT max(k) FROM %s WHERE k=?", k).one().getBytes("system.max(c)")).isNull(); + } + + // test aggregation on range query + assertRangeCount(0); + Assertions.assertThat(execute("SELECT max(k) FROM %s").one().getBytes("system.max(k)")).isNull(); + Assertions.assertThat(execute("SELECT max(k) FROM %s").one().getBytes("system.max(c)")).isNull(); + + // test aggregation with group by + Assertions.assertThat(execute("SELECT COUNT(*) FROM %s GROUP BY k").size()).isEqualTo(0); + Assertions.assertThat(execute("SELECT COUNT(*) FROM %s GROUP BY k, c").size()).isEqualTo(0); + } + + @Test + public void testAggregationWithRangeRowDeletions() + { + createTable("CREATE TABLE %s (k bigint, c int, v blob, PRIMARY KEY(k, c))"); + + // insert some clusterings, and flush + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + // insert some clusterings + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, value); + } + } + maybeFlush(); + + // for each partition, delete the two first and two last clusterings + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + execute("DELETE FROM %s WHERE k = ? AND c <= ?", k, 2); + execute("DELETE FROM %s WHERE k = ? AND c >= ?", k, NUM_CLUSTERINGS - 1); + + // test aggregation on single partition query + assertPartitionCount(k, NUM_CLUSTERINGS - 4); + long maxK = execute("SELECT max(k) FROM %s WHERE k=?", k).one().getLong("system.max(k)"); + Assertions.assertThat(maxK).isEqualTo(k); + int maxC = execute("SELECT max(c) FROM %s WHERE k=?", k).one().getInt("system.max(c)"); + Assertions.assertThat(maxC).isEqualTo(NUM_CLUSTERINGS - 2); + } + + // test aggregation on range query + assertRangeCount(NUM_PARTITIONS * (NUM_CLUSTERINGS - 4)); + long maxK = execute("SELECT max(k) FROM %s").one().getLong("system.max(k)"); + Assertions.assertThat(maxK).isEqualTo(NUM_PARTITIONS); + int maxC = execute("SELECT max(c) FROM %s").one().getInt("system.max(c)"); + Assertions.assertThat(maxC).isEqualTo(NUM_CLUSTERINGS - 2); + + // test aggregation with group by + Assertions.assertThat(execute("SELECT COUNT(*) FROM %s GROUP BY k").size()).isEqualTo(NUM_PARTITIONS); + Assertions.assertThat(execute("SELECT COUNT(*) FROM %s GROUP BY k, c").size()).isEqualTo(NUM_PARTITIONS * (NUM_CLUSTERINGS - 4)); + } + + @Test + public void testAggregationWithRangeRowDeletionsComposite() + { + createTable("CREATE TABLE %s (k int, c1 int, c2 int, v blob, PRIMARY KEY(k, c1, c2))"); + + int c1s = 11; + int c2s = 17; + + // insert some rows, and flush + for (int k = 1; k <= NUM_PARTITIONS; k++) + { + for (int c1 = 1; c1 <= c1s; c1++) + { + for (int c2 = 1; c2 <= c2s; c2++) + { + execute("INSERT INTO %s (k, c1, c2, v) VALUES (?, ?, ?, ?)", k, c1, c2, value); + } + } + } + maybeFlush(); + + // for each partition, delete the two first and two last clusterings + for (int k = 1; k <= NUM_PARTITIONS; k++) + { + for (int c1 = 1; c1 <= c1s; c1++) + { + execute("DELETE FROM %s WHERE k = ? AND c1 = ? AND c2 <= ?", k, c1, 2); + execute("DELETE FROM %s WHERE k = ? AND c1 = ? 
AND c2 >= ?", k, c1, c2s - 1); + } + + // test aggregation on single partition query + assertPartitionCount(k, c1s * (c2s - 4)); + } + + // test aggregation on range query + assertRangeCount(NUM_PARTITIONS * c1s * (c2s - 4)); } @Test @@ -210,21 +321,13 @@ public void testAggregationWithPartitionDeletionWithoutClustering() execute("INSERT INTO %s (k) VALUES (?)", k); // test aggregation on single partition query - int numRows = execute("SELECT * FROM %s WHERE k=?", k).size(); - long count = execute("SELECT COUNT(*) FROM %s WHERE k=?", k).one().getLong("count"); - Assertions.assertThat(count) - .isEqualTo(numRows) - .isEqualTo(1); + assertPartitionCount(k, 1); long max = execute("SELECT max(k) FROM %s WHERE k=?", k).one().getLong("system.max(k)"); Assertions.assertThat(max).isEqualTo(k); } // test aggregation on range query - int selectRows = execute("SELECT * FROM %s").size(); - long selectCountRows = execute("SELECT COUNT(*) FROM %s").one().getLong("count"); - Assertions.assertThat(selectCountRows) - .isEqualTo(selectRows) - .isEqualTo(NUM_PARTITIONS); + assertRangeCount(NUM_PARTITIONS); long max = execute("SELECT max(k) FROM %s").one().getLong("system.max(k)"); Assertions.assertThat(max).isEqualTo(NUM_PARTITIONS); } @@ -263,25 +366,440 @@ public void testAggregationWithPartitionDeletionWithClustering() } // test aggregation on single partition query - int numRows = execute("SELECT * FROM %s WHERE k=?", k).size(); - long count = execute("SELECT COUNT(*) FROM %s WHERE k=?", k).one().getLong("count"); - Assertions.assertThat(count) - .isEqualTo(numRows) - .isEqualTo(numClusteringsAfterDeletion); + assertPartitionCount(k, numClusteringsAfterDeletion); long max = execute("SELECT max(k) FROM %s WHERE k=?", k).one().getLong("system.max(k)"); Assertions.assertThat(max).isEqualTo(k); } // test aggregation on range query - int selectRows = execute("SELECT * FROM %s").size(); - long selectCountRows = execute("SELECT COUNT(*) FROM %s").one().getLong("count"); - Assertions.assertThat(selectCountRows) - .isEqualTo(selectRows) - .isEqualTo(NUM_PARTITIONS * numClusteringsAfterDeletion); + assertRangeCount(NUM_PARTITIONS * numClusteringsAfterDeletion); long max = execute("SELECT max(k) FROM %s").one().getLong("system.max(k)"); Assertions.assertThat(max).isEqualTo(NUM_PARTITIONS); } + @Test + public void testAggregationWithLists() + { + // we are only interested in the not NULL value case + Assume.assumeTrue(value != null); + + createTable("CREATE TABLE %s (k bigint, c int, v list, PRIMARY KEY(k, c))"); + + // insert some clusterings, and flush + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + // insert some clusterings + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, list(value, value, value, value, value)); + } + } + maybeFlush(); + + // for each row, delete some list elements + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("DELETE v[0] FROM %s WHERE k = ? AND c = ?", k, c); + execute("DELETE v[1] FROM %s WHERE k = ? AND c = ?", k, c); + execute("DELETE v[2] FROM %s WHERE k = ? AND c = ?", k, c); + } + + // test aggregation on single partition query + assertPartitionCount(k, NUM_CLUSTERINGS); + } + maybeFlush(); + + // test aggregation on range query + assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS); + + // for each row, add some list elements + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("UPDATE %s SET v = v + ? 
WHERE k = ? AND c = ?", list(value, value), k, c); + } + + // test aggregation on single partition query + assertPartitionCount(k, NUM_CLUSTERINGS); + } + maybeFlush(); + + // test aggregation on range query + assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS); + + // for each row, drop the entire list + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, null); + } + + // test aggregation on single partition query + assertPartitionCount(k, NUM_CLUSTERINGS); + } + maybeFlush(); + + // test aggregation on range query + assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS); + } + + @Test + public void testAggregationWithSets() + { + // we are only interested in the not NULL value case + Assume.assumeTrue(value != null); + + createTable("CREATE TABLE %s (k bigint, c int, v set, PRIMARY KEY(k, c))"); + + String v1 = ByteBufferUtil.toDebugHexString(value) + "_1"; + String v2 = ByteBufferUtil.toDebugHexString(value) + "_2"; + String v3 = ByteBufferUtil.toDebugHexString(value) + "_3"; + String v4 = ByteBufferUtil.toDebugHexString(value) + "_4"; + String v5 = ByteBufferUtil.toDebugHexString(value) + "_5"; + + // insert some clusterings, and flush + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + // insert some clusterings + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, set(v1, v2, v3, v4)); + } + } + maybeFlush(); + + // for each row, delete some set elements + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("UPDATE %s SET v = v - { '" + v1 + "' } WHERE k = ? AND c = ?", k, c); + execute("UPDATE %s SET v = v - ? WHERE k = ? AND c = ?", set(v3, v5), k, c); + } + + // test aggregation on single partition query + assertPartitionCount(k, NUM_CLUSTERINGS); + } + maybeFlush(); + + // test aggregation on range query + assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS); + + // for each row, add some set elements + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("UPDATE %s SET v = v + ? WHERE k = ? 
AND c = ?", set(v1, v3), k, c); + } + + // test aggregation on single partition query + assertPartitionCount(k, NUM_CLUSTERINGS); + } + maybeFlush(); + + // test aggregation on range query + assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS); + + // for each row, drop the entire set + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, null); + } + + // test aggregation on single partition query + assertPartitionCount(k, NUM_CLUSTERINGS); + } + maybeFlush(); + + // test aggregation on range query + assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS); + } + + @Test + public void testAggregationWithMaps() + { + // we are only interested in the not NULL value case + Assume.assumeTrue(value != null); + + createTable("CREATE TABLE %s (k bigint, c int, v map, PRIMARY KEY(k, c))"); + + String stringValue = ByteBufferUtil.toDebugHexString(value); + String k1 = stringValue + "_k_1"; + String k2 = stringValue + "_k_2"; + String k3 = stringValue + "_k_3"; + String k4 = stringValue + "_k_4"; + String k5 = stringValue + "_k_5"; + String v1 = stringValue + "_v_1"; + String v2 = stringValue + "_v_2"; + String v3 = stringValue + "_v_3"; + String v4 = stringValue + "_v_4"; + String v5 = stringValue + "_v_5"; + + // insert some clusterings, and flush + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + // insert some clusterings + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, + map(k1, v1, k2, v2, k3, v3, k4, v4, k5, v5)); + } + } + maybeFlush(); + + // for each row, delete some map elements + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("UPDATE %s SET v = v - { '" + k1 + "' } WHERE k = ? AND c = ?", k, c); + execute("UPDATE %s SET v = v - ? WHERE k = ? AND c = ?", set(k3, k5), k, c); + } + + // test aggregation on single partition query + assertPartitionCount(k, NUM_CLUSTERINGS); + } + maybeFlush(); + + // test aggregation on range query + assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS); + + // for each row, add some map elements + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("UPDATE %s SET v = v + ? WHERE k = ? 
AND c = ?", map(k1, v1, k3, v3), k, c); + } + + // test aggregation on single partition query + assertPartitionCount(k, NUM_CLUSTERINGS); + } + maybeFlush(); + + // test aggregation on range query + assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS); + + // for each row, drop the entire map + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, null); + } + + // test aggregation on single partition query + assertPartitionCount(k, NUM_CLUSTERINGS); + } + maybeFlush(); + + // test aggregation on range query + assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS); + } + + @Test + public void testAggregationWithUDTs() + { + // we are only interested in the not NULL value case + Assume.assumeTrue(value != null); + + String type = createType("CREATE TYPE %s (x blob, y blob)"); + createTable("CREATE TABLE %s (k bigint, c int, v " + type + ", PRIMARY KEY(k, c))"); + + // insert some rows, and flush + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + // insert some clusterings + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, tuple(value, value)); + } + } + maybeFlush(); + + // for each row, delete a tuple element + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("UPDATE %s SET v.x = null WHERE k = ? AND c = ?", k, c); + } + + // test aggregation on single partition query + assertPartitionCount(k, NUM_CLUSTERINGS); + } + maybeFlush(); + + // test aggregation on range query + assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS); + + // for each row, update a tuple element + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("UPDATE %s SET v.x = ? WHERE k = ? AND c = ?", value, k, c); + } + + // test aggregation on single partition query + assertPartitionCount(k, NUM_CLUSTERINGS); + } + maybeFlush(); + + // test aggregation on range query + assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS); + + // for each row, delete the tuple + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("DELETE v FROM %s WHERE k = ? AND c = ?", k, c); + } + + // test aggregation on single partition query + assertPartitionCount(k, NUM_CLUSTERINGS); + } + maybeFlush(); + + // test aggregation on range query + assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS); + } + + @Test + public void testTTLsWithSkinnyTable() + { + // we are only interested in the not NULL value case + Assume.assumeTrue(value != null); + + createTable("CREATE TABLE %s (k int PRIMARY KEY, c int, v text)"); + + String stringValue = ByteBufferUtil.toDebugHexString(value); + String v1 = stringValue + "_1"; + String v2 = stringValue + "_2"; + + // insert some rows, and flush + for (int k = 1; k <= NUM_PARTITIONS; k++) + { + execute("INSERT INTO %s (k, v) VALUES (?, ?)", k, v1); + } + maybeFlush(); + + // test aggregation on range query + assertRangeCount(NUM_PARTITIONS); + + // Give a TTL the two first and two last partitions + execute("UPDATE %s USING TTL 1 SET v = ? WHERE k = ? ", v2, 1); + execute("UPDATE %s USING TTL 1 SET v = ? WHERE k = ? 
", v2, NUM_PARTITIONS - 1); + maybeFlush(); + + // wait for the TTL to expire + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + + // test aggregation on range query + assertRangeCount(NUM_PARTITIONS); + } + + @Test + public void testTTLsWithWideTable() + { + // we are only interested in the not NULL value case + Assume.assumeTrue(value != null); + + createTable("CREATE TABLE %s (k int, c int, v text, PRIMARY KEY(k, c))"); + + String stringValue = ByteBufferUtil.toDebugHexString(value); + String v1 = stringValue + "_1"; + String v2 = stringValue + "_2"; + + // insert some rows, and flush + for (int k = 1; k <= NUM_PARTITIONS; k++) + { + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, v1); + } + } + maybeFlush(); + + // test aggregation on range query + assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS); + + // for each partition, give a TTL the two first and two last clusterings + for (int k = 1; k <= NUM_PARTITIONS; k++) + { + execute("UPDATE %s USING TTL 1 SET v = ? WHERE k = ? AND c = ?", v2, k, 1); + execute("UPDATE %s USING TTL 1 SET v = ? WHERE k = ? AND c = ?", v2, k, NUM_CLUSTERINGS - 1); + } + maybeFlush(); + + // wait for the TTL to expire + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + + // test aggregation on range query + assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS); + } + + @Test + public void testAggregationWithPurgeableTombstones() + { + // create the table with a very short gc_grace_seconds, so there are tombstones to be purged on the replicas + createTable("CREATE TABLE %s (k bigint, c int, v blob, PRIMARY KEY(k, c)) WITH gc_grace_seconds = 0"); + + // insert some clusterings, and flush + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + // insert some clusterings + for (int c = 1; c <= NUM_CLUSTERINGS; c++) + { + execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?) USING TIMESTAMP 1", k, c, value); + } + } + maybeFlush(); + + // for each partition, update the first and last clustering + for (long k = 1; k <= NUM_PARTITIONS; k++) + { + execute("DELETE v FROM %s USING TIMESTAMP 2 WHERE k = ? AND c = ?", k, 1); + execute("DELETE v FROM %s USING TIMESTAMP 2 WHERE k = ? AND c = ?", k, NUM_CLUSTERINGS); + + // test aggregation on single partition query + assertPartitionCount(k, NUM_CLUSTERINGS); + } + maybeFlush(); + + // wait for the tombstones to be purgeable + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + + // test aggregation on range query + assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS); + } + + private void assertPartitionCount(Object k, int expectedCount) + { + assertCount("SELECT * FROM %s WHERE k=?", "SELECT COUNT(*) FROM %s WHERE k=?", expectedCount, k); + } + + private void assertRangeCount(int expectedCount) + { + assertCount("SELECT * FROM %s", "SELECT COUNT(*) FROM %s", expectedCount); + } + + private void assertCount(String selectQuery, String countQuery, int expectedCount, Object... 
+    {
+        int selectRows = execute(selectQuery, args).size();
+        long selectCountRows = execute(countQuery, args).one().getLong("count");
+        Assertions.assertThat(selectRows).isEqualTo(expectedCount); // the returned rows are correct
+        Assertions.assertThat(selectRows).isEqualTo(selectCountRows); // both queries are consistent
+    }
+
     private void maybeFlush()
     {
         if (flush)

From 88f2d4a20fce6e4abdb26d4253a31de88b1f80d3 Mon Sep 17 00:00:00 2001
From: mck
Date: Fri, 26 Sep 2025 22:37:10 +0200
Subject: [PATCH 2/3] =?UTF-8?q?SQUASH=20=E2=80=93=C2=A0=20PagingAggregatio?=
 =?UTF-8?q?nQueryTest=20needs=20CoordinatorWarnings.init()?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../apache/cassandra/cql3/PagingAggregationQueryTest.java | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/test/unit/org/apache/cassandra/cql3/PagingAggregationQueryTest.java b/test/unit/org/apache/cassandra/cql3/PagingAggregationQueryTest.java
index dfe9f5f7d470..5b55575c36fa 100644
--- a/test/unit/org/apache/cassandra/cql3/PagingAggregationQueryTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PagingAggregationQueryTest.java
@@ -27,11 +27,13 @@
 
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Assume;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.assertj.core.api.Assertions;
 
@@ -90,6 +92,12 @@ public PagingAggregationQueryTest(PageSize subPageSize, DataSize dataSize, boole
         enableCoordinatorExecution();
     }
 
+    @BeforeClass
+    public static void setup()
+    {
+        CoordinatorWarnings.init();
+    }
+
     /**
      * DSP-22813, DBPE-16245, DBPE-16378 and CNDB-13978: Test that count(*) aggregation queries return the correct
      * number of rows, even if there are tombstones and paging is required.
From c62bc37ba1d276ffaefd76c900e1bce249cc9c5d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Andr=C3=A9s=20de=20la=20Pe=C3=B1a?=
Date: Thu, 2 Oct 2025 12:05:29 +0100
Subject: [PATCH 3/3] =?UTF-8?q?SQUASH=20=E2=80=93=C2=A0=20Disable=20respon?=
 =?UTF-8?q?se=20size=20thresholds?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../cassandra/cql3/PagingAggregationQueryTest.java | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/test/unit/org/apache/cassandra/cql3/PagingAggregationQueryTest.java b/test/unit/org/apache/cassandra/cql3/PagingAggregationQueryTest.java
index 5b55575c36fa..726255e57ad8 100644
--- a/test/unit/org/apache/cassandra/cql3/PagingAggregationQueryTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PagingAggregationQueryTest.java
@@ -27,13 +27,11 @@
 
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Assume;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.assertj.core.api.Assertions;
 
@@ -90,12 +88,12 @@ public PagingAggregationQueryTest(PageSize subPageSize, DataSize dataSize, boole
         this.value = dataSize.value();
         this.flush = flush;
         enableCoordinatorExecution();
-    }
 
-    @BeforeClass
-    public static void setup()
-    {
-        CoordinatorWarnings.init();
+        // disable read size thresholds, since we are testing with abnormally large responses
+        DatabaseDescriptor.setLocalReadSizeWarnThreshold(null);
+        DatabaseDescriptor.setLocalReadSizeFailThreshold(null);
+        DatabaseDescriptor.setCoordinatorReadSizeWarnThreshold(null);
+        DatabaseDescriptor.setCoordinatorReadSizeFailThreshold(null);
     }
 
     /**
@@ -813,4 +811,4 @@ private void maybeFlush()
         if (flush)
             flush();
     }
-}
\ No newline at end of file
+}
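
Note (not part of the patches above): the following is a minimal, self-contained Java sketch
illustrating why a bytes-based page counter should charge only live data, the idea behind
Row.liveDataSize(nowInSec). All names in the sketch (LiveSizeSketch, Cell, etc.) are
hypothetical and do not exist in the Cassandra codebase; only the counting semantics mirror
the patch.

    import java.util.List;

    public class LiveSizeSketch
    {
        // A toy "cell": a size in bytes plus the second at which it expires
        // (Long.MAX_VALUE if it never expires). A cell is live if it has not expired.
        record Cell(int dataSize, long localExpirationSec)
        {
            boolean isLive(long nowInSec) { return nowInSec < localExpirationSec; }

            // Mirrors AbstractCell.liveDataSize in the patch: dead cells count as 0.
            int liveDataSize(long nowInSec) { return isLive(nowInSec) ? dataSize : 0; }
        }

        // Total size regardless of liveness, like Row.dataSize().
        static int dataSize(List<Cell> row)
        {
            return row.stream().mapToInt(Cell::dataSize).sum();
        }

        // Only the live bytes, like the new Row.liveDataSize(nowInSec).
        static int liveDataSize(List<Cell> row, long nowInSec)
        {
            return row.stream().mapToInt(c -> c.liveDataSize(nowInSec)).sum();
        }

        public static void main(String[] args)
        {
            long nowInSec = 1_000;
            // One live cell of 100 bytes, one expired cell of 400 bytes.
            List<Cell> row = List.of(new Cell(100, Long.MAX_VALUE), new Cell(400, 500));

            // If the pager charged dataSize(), this row would cost 500 bytes before the
            // tombstoned cell is purged but only 100 bytes afterwards, so different nodes
            // (or different pages of the same query) could disagree on where a
            // byte-limited page ends. Charging only live bytes is stable under purging.
            System.out.println(dataSize(row));                // 500
            System.out.println(liveDataSize(row, nowInSec));  // 100
        }
    }

The design point, as the commit message of PATCH 1/3 states, is that the per-row byte charge
must not depend on deletion information that purging or other transformations may strip
between pages.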