From fda5d1a9a89ee7b4b2cdc415fc6b9c6c7dadff48 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Andr=C3=A9s=20de=20la=20Pe=C3=B1a?=
Date: Thu, 25 Sep 2025 14:50:02 +0100
Subject: [PATCH] CNDB-15435: Only count live data size in bytes-based paging
Count only the live data size when using bytes-based limits. This
prevents paging issues when row purging or other unexpected
transformations change the size of the rows being read.
This fixes DBPE-16935 and DBPE-17751.
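
The effect on the bytes-based counter, abridged from the DataLimits change
below (the rest of the counter is unchanged):

    protected void incrementRowCount(int rowLiveSize)
    {
        // rowLiveSize is row.liveDataSize(nowInSec), so purged or expired cells contribute 0
        bytesCounted += rowLiveSize;
        rowsCounted++;
        if (bytesCounted >= bytesLimit || rowsCounted >= rowLimit)
            stop();
    }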
---
.../cassandra/db/filter/DataLimits.java | 18 +-
.../cassandra/db/rows/AbstractCell.java | 6 +
.../apache/cassandra/db/rows/BTreeRow.java | 40 +-
.../apache/cassandra/db/rows/ColumnData.java | 8 +
.../cassandra/db/rows/ComplexColumnData.java | 9 +-
.../org/apache/cassandra/db/rows/Row.java | 10 +-
.../index/sai/utils/CellWithSourceTable.java | 6 +
.../index/sai/utils/RowWithSourceTable.java | 4 +-
.../cql3/PagingAggregationQueryTest.java | 806 ++++++++++++++++++
9 files changed, 857 insertions(+), 50 deletions(-)
create mode 100644 test/unit/org/apache/cassandra/cql3/PagingAggregationQueryTest.java
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index e395b283627c..30e1b9af17e0 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -625,14 +625,14 @@ public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
{
rowsInCurrentPartition = 0;
hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
- staticRowBytes = hasLiveStaticRow && bytesLimit != NO_LIMIT ? staticRow.originalDataSize() : 0;
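+ // use the live size so that purging deleted or expired data cannot shift page boundaries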
+ staticRowBytes = hasLiveStaticRow && bytesLimit != NO_LIMIT ? staticRow.liveDataSize(nowInSec) : 0;
}
@Override
public Row applyToRow(Row row)
{
if (isLive(row))
- incrementRowCount(bytesLimit != NO_LIMIT ? row.originalDataSize() : 0);
+ incrementRowCount(bytesLimit != NO_LIMIT ? row.liveDataSize(nowInSec) : 0);
return row;
}
@@ -647,9 +647,9 @@ public void onPartitionClose()
super.onPartitionClose();
}
- protected void incrementRowCount(int rowSize)
+ protected void incrementRowCount(int rowLiveSize)
{
- bytesCounted += rowSize;
+ bytesCounted += rowLiveSize;
rowsCounted++;
rowsInCurrentPartition++;
if (bytesCounted >= bytesLimit || rowsCounted >= rowLimit)
@@ -888,7 +888,7 @@ public DataLimits forShortReadRetry(int toFetch)
@Override
public float estimateTotalResults(ColumnFamilyStore cfs)
{
- // For the moment, we return the estimated number of rows as we have no good way of estimating
+ // For the moment, we return the estimated number of rows as we have no good way of estimating
// the number of groups that will be returned. Hopefully, we should be able to fix
// that problem at some point.
return super.estimateTotalResults(cfs);
@@ -1107,7 +1107,7 @@ public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
}
hasReturnedRowsFromCurrentPartition = false;
hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
- staticRowBytes = hasLiveStaticRow ? staticRow.originalDataSize() : 0;
+ staticRowBytes = hasLiveStaticRow ? staticRow.liveDataSize(nowInSec) : 0;
}
currentPartitionKey = partitionKey;
// If we are done we need to preserve the groupInCurrentPartition and rowsCountedInCurrentPartition
@@ -1173,7 +1173,7 @@ public Row applyToRow(Row row)
if (isLive(row))
{
hasUnfinishedGroup = true;
- incrementRowCount(bytesLimit != NO_LIMIT ? row.originalDataSize() : 0);
+ incrementRowCount(bytesLimit != NO_LIMIT ? row.liveDataSize(nowInSec) : 0);
hasReturnedRowsFromCurrentPartition = true;
}
@@ -1210,11 +1210,11 @@ public int rowsCountedInCurrentPartition()
return rowsCountedInCurrentPartition;
}
- protected void incrementRowCount(int rowSize)
+ protected void incrementRowCount(int rowLiveSize)
{
rowsCountedInCurrentPartition++;
rowsCounted++;
- bytesCounted += rowSize;
+ bytesCounted += rowLiveSize;
if (rowsCounted >= rowLimit || bytesCounted >= bytesLimit)
stop();
}
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index bc70bfe92838..2d797de62fe1 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -121,6 +121,12 @@ public int dataSize()
+ (path == null ? 0 : path.dataSize());
}
+ @Override
+ public int liveDataSize(int nowInSec)
+ {
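+ // a cell that is not live at nowInSec (deleted or expired) contributes no bytes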
+ return isLive(nowInSec) ? dataSize() : 0;
+ }
+
public void digest(Digest digest)
{
if (isCounterCell())
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index 836fcc93f355..873360efa888 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -88,17 +88,11 @@ public class BTreeRow extends AbstractRow
// no expiring cells, this will be Integer.MAX_VALUE;
private final int minLocalDeletionTime;
- /**
- * The original data size of this row before purging it, or -1 if it hasn't been purged.
- */
- private final int originalDataSize;
-
private BTreeRow(Clustering<?> clustering,
LivenessInfo primaryKeyLivenessInfo,
Deletion deletion,
Object[] btree,
- int minLocalDeletionTime,
- int originalDataSize)
+ int minLocalDeletionTime)
{
assert !deletion.isShadowedBy(primaryKeyLivenessInfo);
this.clustering = clustering;
@@ -106,16 +100,6 @@ private BTreeRow(Clustering<?> clustering,
this.deletion = deletion;
this.btree = btree;
this.minLocalDeletionTime = minLocalDeletionTime;
- this.originalDataSize = originalDataSize;
- }
-
- private BTreeRow(Clustering<?> clustering,
- LivenessInfo primaryKeyLivenessInfo,
- Deletion deletion,
- Object[] btree,
- int minLocalDeletionTime)
- {
- this(clustering, primaryKeyLivenessInfo, deletion, btree, minLocalDeletionTime, -1);
}
private BTreeRow(Clustering<?> clustering, Object[] btree, int minLocalDeletionTime)
@@ -139,23 +123,13 @@ public static BTreeRow create(Clustering<?> clustering,
return create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
}
- public static BTreeRow create(Clustering<?> clustering,
- LivenessInfo primaryKeyLivenessInfo,
- Deletion deletion,
- Object[] btree,
- int minDeletionTime,
- int originalDataSize)
- {
- return new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime, originalDataSize);
- }
-
public static BTreeRow create(Clustering<?> clustering,
LivenessInfo primaryKeyLivenessInfo,
Deletion deletion,
Object[] btree,
int minDeletionTime)
{
- return create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime, -1);
+ return new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
}
public static BTreeRow emptyRow(Clustering<?> clustering)
@@ -538,7 +512,7 @@ private Row update(LivenessInfo info, Deletion deletion, Object[] newTree)
return null;
int minDeletionTime = minDeletionTime(newTree, info, deletion.time());
- return BTreeRow.create(clustering, info, deletion, newTree, minDeletionTime, originalDataSize());
+ return BTreeRow.create(clustering, info, deletion, newTree, minDeletionTime);
}
@Override
@@ -569,9 +543,13 @@ public int dataSize()
}
@Override
- public int originalDataSize()
+ public int liveDataSize(int nowInSec)
{
- return originalDataSize >= 0 ? originalDataSize : dataSize();
+ int dataSize = clustering.dataSize()
+ + primaryKeyLivenessInfo.dataSize()
+ + deletion.dataSize();
+
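+ // fold over the row's ColumnData, adding each column's live size to the fixed overhead above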
+ return Ints.checkedCast(accumulate((cd, v) -> v + cd.liveDataSize(nowInSec), dataSize));
}
public long unsharedHeapSizeExcludingData()
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java
index 9edf48035ce9..248fdcf44d3d 100644
--- a/src/java/org/apache/cassandra/db/rows/ColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -244,6 +244,14 @@ protected ColumnData(ColumnMetadata column)
*/
public abstract int dataSize();
+ /**
+ * The size of the data held by this {@code ColumnData} that is live at {@code nowInSec}.
+ *
+ * @param nowInSec the query timestamp in seconds
+ * @return the size used by the live data of this {@code ColumnData}.
+ */
+ public abstract int liveDataSize(int nowInSec);
+
public abstract long unsharedHeapSizeExcludingData();
/**
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index e445a716d3b0..a166a40da807 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -28,11 +28,8 @@
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.Digest;
import org.apache.cassandra.db.LivenessInfo;
-import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.ByteType;
-import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.DroppedColumn;
@@ -146,6 +143,12 @@ public int dataSize()
return size;
}
+ @Override
+ public int liveDataSize(int nowInSec)
+ {
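+ // if the whole column is shadowed by a complex deletion, none of its data is live;
+ // otherwise the column's full data size is used as its live size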
+ return complexDeletion.isLive() ? dataSize() : 0;
+ }
+
public long unsharedHeapSizeExcludingData()
{
long heapSize = EMPTY_SIZE + BTree.sizeOnHeapOf(cells);
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index 319dd0db1280..7d4d58c50985 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -115,7 +115,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
/**
* Whether the row has some live information (i.e. it's not just deletion informations).
- *
+ *
* @param nowInSec the current time to decide what is deleted and what isn't
* @param enforceStrictLiveness whether the row should be purged if there is no PK liveness info,
* normally retrieved from {@link CFMetaData#enforceStrictLiveness()}
@@ -310,12 +310,12 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
public int dataSize();
/**
- * Returns the original data size in bytes of this row as it was returned by {@link #dataSize()} before purging it
- * from all deletion info with {@link #purge}.
+ * Returns the size of the data held by this row that is live at {@code nowInSec}.
*
- * @return the original data size of this row in bytes before purging
+ * @param nowInSec the query timestamp in seconds
+ * @return the size of the data held by this row that is live at {@code nowInSec}.
*/
- int originalDataSize();
+ int liveDataSize(int nowInSec);
public long unsharedHeapSizeExcludingData();
diff --git a/src/java/org/apache/cassandra/index/sai/utils/CellWithSourceTable.java b/src/java/org/apache/cassandra/index/sai/utils/CellWithSourceTable.java
index eb1cafda49fd..4150bc96579e 100644
--- a/src/java/org/apache/cassandra/index/sai/utils/CellWithSourceTable.java
+++ b/src/java/org/apache/cassandra/index/sai/utils/CellWithSourceTable.java
@@ -147,6 +147,12 @@ public int dataSize()
return cell.dataSize();
}
+ @Override
+ public int liveDataSize(int nowInSec)
+ {
+ return cell.liveDataSize(nowInSec);
+ }
+
@Override
public long unsharedHeapSizeExcludingData()
{
diff --git a/src/java/org/apache/cassandra/index/sai/utils/RowWithSourceTable.java b/src/java/org/apache/cassandra/index/sai/utils/RowWithSourceTable.java
index c5051ba42be5..797a4ca374b8 100644
--- a/src/java/org/apache/cassandra/index/sai/utils/RowWithSourceTable.java
+++ b/src/java/org/apache/cassandra/index/sai/utils/RowWithSourceTable.java
@@ -280,9 +280,9 @@ public int dataSize()
}
@Override
- public int originalDataSize()
+ public int liveDataSize(int nowInSec)
{
- return row.originalDataSize();
+ return row.liveDataSize(nowInSec);
}
@Override
diff --git a/test/unit/org/apache/cassandra/cql3/PagingAggregationQueryTest.java b/test/unit/org/apache/cassandra/cql3/PagingAggregationQueryTest.java
new file mode 100644
index 000000000000..b57f2a0581ae
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/PagingAggregationQueryTest.java
@@ -0,0 +1,806 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.assertj.core.api.Assertions;
+
+@RunWith(Parameterized.class)
+public class PagingAggregationQueryTest extends CQLTester
+{
+ public static final int NUM_PARTITIONS = 100;
+ public static final int NUM_CLUSTERINGS = 7;
+
+ @Parameterized.Parameters(name = "aggregation_sub_page_size={0} data_size={1} flush={2}")
+ public static Collection<Object[]> parameters()
+ {
+ // NOTE: the original body of this method was lost; this is a plausible reconstruction
+ // covering every combination of the parameters named in the annotation above.
+ List<Object[]> parameters = new ArrayList<>();
+ for (String subPageSize : Arrays.asList("1KiB", "100MiB"))
+ for (Integer dataSize : Arrays.asList(null, 10, 1024))
+ for (boolean flush : Arrays.asList(false, true))
+ parameters.add(new Object[]{ subPageSize, dataSize, flush });
+ return parameters;
+ }
+
+ /** The value to write in the tested rows, or {@code null} to write cell tombstones. */
+ private final ByteBuffer value;
+
+ /** Whether to flush memtables during the tests. */
+ private final boolean flush;
+
+ public PagingAggregationQueryTest(String aggregationSubPageSize, Integer dataSize, boolean flush)
+ {
+ // assumption: the original constructor applies aggregationSubPageSize through
+ // DatabaseDescriptor (imported above); the exact setter was lost with the method body
+ this.value = dataSize == null ? null : ByteBuffer.allocate(dataSize);
+ this.flush = flush;
+ }
+
+ /**
+ * Tests aggregation queries over rows whose cells are all tombstones.
+ * <p>
+ * Before the DSP-22813/DBPE-16245/DBPE-16378/CNDB-13978 fix, these queries would stop counting after hitting the
+ * {@code aggregation_sub_page_size} page size, returning only the count of a single page.
+ */
+ @Test
+ public void testAggregationWithCellTombstones()
+ {
+ // we are only interested in the NULL value case, which produces cell tombstones
+ Assume.assumeTrue(value == null && !flush);
+
+ createTable("CREATE TABLE %s (k int, c1 int, c2 int, v blob, PRIMARY KEY (k, c1, c2))");
+
+ int ks = NUM_PARTITIONS;
+ int c1s = NUM_CLUSTERINGS / 2;
+ int c2s = NUM_CLUSTERINGS / 2;
+
+ // insert some data
+ for (int k = 0; k < ks; k++)
+ {
+ for (int c1 = 0; c1 < c1s; c1++)
+ {
+ for (int c2 = 0; c2 < c2s; c2++)
+ {
+ execute("INSERT INTO %s (k, c1, c2, v) VALUES (?, ?, ?, null)", k, c1, c2);
+ }
+ }
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, c1s * c2s);
+ }
+
+ // test aggregation on range query
+ assertRangeCount(ks * c1s * c2s);
+
+ // test aggregation with group by
+ Assertions.assertThat(execute("SELECT COUNT(*) FROM %s GROUP BY k").size()).isEqualTo(ks);
+ Assertions.assertThat(execute("SELECT COUNT(*) FROM %s GROUP BY k, c1").size()).isEqualTo(ks * c1s);
+ Assertions.assertThat(execute("SELECT COUNT(*) FROM %s GROUP BY k, c1, c2").size()).isEqualTo(ks * c1s * c2s);
+ }
+
+ @Test
+ public void testAggregationWithPartialRowDeletions()
+ {
+ createTable("CREATE TABLE %s (k bigint, c int, v blob, PRIMARY KEY(k, c))");
+
+ // insert some clusterings, and flush
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ // insert some clusterings
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, value);
+ }
+ }
+ maybeFlush();
+
+ // for each partition, delete the first and last clustering
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ execute("DELETE FROM %s WHERE k = ? AND c = ?", k, 1);
+ execute("DELETE FROM %s WHERE k = ? AND c = ?", k, NUM_CLUSTERINGS);
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, NUM_CLUSTERINGS - 2);
+ long maxK = execute("SELECT max(k) FROM %s WHERE k=?", k).one().getLong("system.max(k)");
+ Assertions.assertThat(maxK).isEqualTo(k);
+ int maxC = execute("SELECT max(c) FROM %s WHERE k=?", k).one().getInt("system.max(c)");
+ Assertions.assertThat(maxC).isEqualTo(NUM_CLUSTERINGS - 1);
+ }
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS * (NUM_CLUSTERINGS - 2));
+ long maxK = execute("SELECT max(k) FROM %s").one().getLong("system.max(k)");
+ Assertions.assertThat(maxK).isEqualTo(NUM_PARTITIONS);
+ int maxC = execute("SELECT max(c) FROM %s").one().getInt("system.max(c)");
+ Assertions.assertThat(maxC).isEqualTo(NUM_CLUSTERINGS - 1);
+
+ // test aggregation with group by
+ Assertions.assertThat(execute("SELECT COUNT(*) FROM %s GROUP BY k").size()).isEqualTo(NUM_PARTITIONS);
+ Assertions.assertThat(execute("SELECT COUNT(*) FROM %s GROUP BY k, c").size()).isEqualTo(NUM_PARTITIONS * (NUM_CLUSTERINGS - 2));
+ }
+
+ @Test
+ public void testAggregationWithCompleteRowDeletions()
+ {
+ createTable("CREATE TABLE %s (k bigint, c int, v blob, PRIMARY KEY(k, c))");
+
+ // insert some clusterings, and flush
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ // insert some clusterings
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, value);
+ }
+ }
+ maybeFlush();
+
+ // for each partition, delete all the clusterings
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("DELETE FROM %s WHERE k = ? AND c = ?", k, c);
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, NUM_CLUSTERINGS - c);
+ }
+
+ Assertions.assertThat(execute("SELECT max(k) FROM %s WHERE k=?", k).one().getBytes("system.max(k)")).isNull();
+ Assertions.assertThat(execute("SELECT max(k) FROM %s WHERE k=?", k).one().getBytes("system.max(c)")).isNull();
+ }
+
+ // test aggregation on range query
+ assertRangeCount(0);
+ Assertions.assertThat(execute("SELECT max(k) FROM %s").one().getBytes("system.max(k)")).isNull();
+ Assertions.assertThat(execute("SELECT max(k) FROM %s").one().getBytes("system.max(c)")).isNull();
+
+ // test aggregation with group by
+ Assertions.assertThat(execute("SELECT COUNT(*) FROM %s GROUP BY k").size()).isEqualTo(0);
+ Assertions.assertThat(execute("SELECT COUNT(*) FROM %s GROUP BY k, c").size()).isEqualTo(0);
+ }
+
+ @Test
+ public void testAggregationWithRangeRowDeletions()
+ {
+ createTable("CREATE TABLE %s (k bigint, c int, v blob, PRIMARY KEY(k, c))");
+
+ // insert some clusterings, and flush
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ // insert some clusterings
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, value);
+ }
+ }
+ maybeFlush();
+
+ // for each partition, delete the two first and two last clusterings
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ execute("DELETE FROM %s WHERE k = ? AND c <= ?", k, 2);
+ execute("DELETE FROM %s WHERE k = ? AND c >= ?", k, NUM_CLUSTERINGS - 1);
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, NUM_CLUSTERINGS - 4);
+ long maxK = execute("SELECT max(k) FROM %s WHERE k=?", k).one().getLong("system.max(k)");
+ Assertions.assertThat(maxK).isEqualTo(k);
+ int maxC = execute("SELECT max(c) FROM %s WHERE k=?", k).one().getInt("system.max(c)");
+ Assertions.assertThat(maxC).isEqualTo(NUM_CLUSTERINGS - 2);
+ }
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS * (NUM_CLUSTERINGS - 4));
+ long maxK = execute("SELECT max(k) FROM %s").one().getLong("system.max(k)");
+ Assertions.assertThat(maxK).isEqualTo(NUM_PARTITIONS);
+ int maxC = execute("SELECT max(c) FROM %s").one().getInt("system.max(c)");
+ Assertions.assertThat(maxC).isEqualTo(NUM_CLUSTERINGS - 2);
+
+ // test aggregation with group by
+ Assertions.assertThat(execute("SELECT COUNT(*) FROM %s GROUP BY k").size()).isEqualTo(NUM_PARTITIONS);
+ Assertions.assertThat(execute("SELECT COUNT(*) FROM %s GROUP BY k, c").size()).isEqualTo(NUM_PARTITIONS * (NUM_CLUSTERINGS - 4));
+ }
+
+ @Test
+ public void testAggregationWithRangeRowDeletionsComposite()
+ {
+ createTable("CREATE TABLE %s (k int, c1 int, c2 int, v blob, PRIMARY KEY(k, c1, c2))");
+
+ int c1s = 11;
+ int c2s = 17;
+
+ // insert some rows, and flush
+ for (int k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ for (int c1 = 1; c1 <= c1s; c1++)
+ {
+ for (int c2 = 1; c2 <= c2s; c2++)
+ {
+ execute("INSERT INTO %s (k, c1, c2, v) VALUES (?, ?, ?, ?)", k, c1, c2, value);
+ }
+ }
+ }
+ maybeFlush();
+
+ // for each partition, delete the two first and two last c2 clusterings of every c1
+ for (int k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ for (int c1 = 1; c1 <= c1s; c1++)
+ {
+ execute("DELETE FROM %s WHERE k = ? AND c1 = ? AND c2 <= ?", k, c1, 2);
+ execute("DELETE FROM %s WHERE k = ? AND c1 = ? AND c2 >= ?", k, c1, c2s - 1);
+ }
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, c1s * (c2s - 4));
+ }
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS * c1s * (c2s - 4));
+ }
+
+ @Test
+ public void testAggregationWithPartitionDeletionWithoutClustering()
+ {
+ createTable("CREATE TABLE %s (k bigint PRIMARY KEY, v blob)");
+
+ // insert some partitions, and flush
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ execute("INSERT INTO %s (k, v) VALUES (?, ?)", k, value);
+ }
+ maybeFlush();
+
+ // delete all the partitions, and flush
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ execute("DELETE FROM %s WHERE k = ?", k);
+ }
+ maybeFlush();
+
+ // re-insert part of the data, using the same partitions
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ execute("INSERT INTO %s (k) VALUES (?)", k);
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, 1);
+ long max = execute("SELECT max(k) FROM %s WHERE k=?", k).one().getLong("system.max(k)");
+ Assertions.assertThat(max).isEqualTo(k);
+ }
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS);
+ long max = execute("SELECT max(k) FROM %s").one().getLong("system.max(k)");
+ Assertions.assertThat(max).isEqualTo(NUM_PARTITIONS);
+ }
+
+ @Test
+ public void testAggregationWithPartitionDeletionWithClustering()
+ {
+ createTable("CREATE TABLE %s (k bigint, c int, v blob, PRIMARY KEY(k, c))");
+
+ int numClusteringsBeforeDeletion = 10;
+ int numClusteringsAfterDeletion = 7;
+
+ // insert some partitions, and flush
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ for (int c = 1; c <= numClusteringsBeforeDeletion; c++)
+ {
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, value);
+ }
+ }
+ maybeFlush();
+
+ // delete all the partitions, and flush
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ execute("DELETE FROM %s WHERE k = ?", k);
+ }
+ maybeFlush();
+
+ // re-insert part of the data, using the same partitions
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ for (int c = 1; c <= numClusteringsAfterDeletion; c++)
+ {
+ execute("INSERT INTO %s (k, c) VALUES (?, ?)", k, c);
+ }
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, numClusteringsAfterDeletion);
+ long max = execute("SELECT max(k) FROM %s WHERE k=?", k).one().getLong("system.max(k)");
+ Assertions.assertThat(max).isEqualTo(k);
+ }
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS * numClusteringsAfterDeletion);
+ long max = execute("SELECT max(k) FROM %s").one().getLong("system.max(k)");
+ Assertions.assertThat(max).isEqualTo(NUM_PARTITIONS);
+ }
+
+ @Test
+ public void testAggregationWithLists()
+ {
+ // we are only interested in the not NULL value case
+ Assume.assumeTrue(value != null);
+
+ createTable("CREATE TABLE %s (k bigint, c int, v list, PRIMARY KEY(k, c))");
+
+ // insert some clusterings, and flush
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ // insert some clusterings
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, list(value, value, value, value, value));
+ }
+ }
+ maybeFlush();
+
+ // for each row, delete some list elements
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("DELETE v[0] FROM %s WHERE k = ? AND c = ?", k, c);
+ execute("DELETE v[1] FROM %s WHERE k = ? AND c = ?", k, c);
+ execute("DELETE v[2] FROM %s WHERE k = ? AND c = ?", k, c);
+ }
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, NUM_CLUSTERINGS);
+ }
+ maybeFlush();
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS);
+
+ // for each row, add some list elements
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("UPDATE %s SET v = v + ? WHERE k = ? AND c = ?", list(value, value), k, c);
+ }
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, NUM_CLUSTERINGS);
+ }
+ maybeFlush();
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS);
+
+ // for each row, drop the entire list
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, null);
+ }
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, NUM_CLUSTERINGS);
+ }
+ maybeFlush();
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS);
+ }
+
+ @Test
+ public void testAggregationWithSets()
+ {
+ // we are only interested in the not NULL value case
+ Assume.assumeTrue(value != null);
+
+ createTable("CREATE TABLE %s (k bigint, c int, v set, PRIMARY KEY(k, c))");
+
+ String v1 = ByteBufferUtil.toDebugHexString(value) + "_1";
+ String v2 = ByteBufferUtil.toDebugHexString(value) + "_2";
+ String v3 = ByteBufferUtil.toDebugHexString(value) + "_3";
+ String v4 = ByteBufferUtil.toDebugHexString(value) + "_4";
+ String v5 = ByteBufferUtil.toDebugHexString(value) + "_5";
+
+ // insert some clusterings, and flush
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ // insert some clusterings
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, set(v1, v2, v3, v4));
+ }
+ }
+ maybeFlush();
+
+ // for each row, delete some set elements
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("UPDATE %s SET v = v - { '" + v1 + "' } WHERE k = ? AND c = ?", k, c);
+ execute("UPDATE %s SET v = v - ? WHERE k = ? AND c = ?", set(v3, v5), k, c);
+ }
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, NUM_CLUSTERINGS);
+ }
+ maybeFlush();
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS);
+
+ // for each row, add some set elements
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("UPDATE %s SET v = v + ? WHERE k = ? AND c = ?", set(v1, v3), k, c);
+ }
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, NUM_CLUSTERINGS);
+ }
+ maybeFlush();
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS);
+
+ // for each row, drop the entire set
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, null);
+ }
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, NUM_CLUSTERINGS);
+ }
+ maybeFlush();
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS);
+ }
+
+ @Test
+ public void testAggregationWithMaps()
+ {
+ // we are only interested in the not NULL value case
+ Assume.assumeTrue(value != null);
+
+ createTable("CREATE TABLE %s (k bigint, c int, v map, PRIMARY KEY(k, c))");
+
+ String stringValue = ByteBufferUtil.toDebugHexString(value);
+ String k1 = stringValue + "_k_1";
+ String k2 = stringValue + "_k_2";
+ String k3 = stringValue + "_k_3";
+ String k4 = stringValue + "_k_4";
+ String k5 = stringValue + "_k_5";
+ String v1 = stringValue + "_v_1";
+ String v2 = stringValue + "_v_2";
+ String v3 = stringValue + "_v_3";
+ String v4 = stringValue + "_v_4";
+ String v5 = stringValue + "_v_5";
+
+ // insert some clusterings, and flush
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ // insert some clusterings
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c,
+ map(k1, v1, k2, v2, k3, v3, k4, v4, k5, v5));
+ }
+ }
+ maybeFlush();
+
+ // for each row, delete some map elements
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("UPDATE %s SET v = v - { '" + k1 + "' } WHERE k = ? AND c = ?", k, c);
+ execute("UPDATE %s SET v = v - ? WHERE k = ? AND c = ?", set(k3, k5), k, c);
+ }
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, NUM_CLUSTERINGS);
+ }
+ maybeFlush();
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS);
+
+ // for each row, add some map elements
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("UPDATE %s SET v = v + ? WHERE k = ? AND c = ?", map(k1, v1, k3, v3), k, c);
+ }
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, NUM_CLUSTERINGS);
+ }
+ maybeFlush();
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS);
+
+ // for each row, drop the entire map
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, null);
+ }
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, NUM_CLUSTERINGS);
+ }
+ maybeFlush();
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS);
+ }
+
+ @Test
+ public void testAggregationWithUDTs()
+ {
+ // we are only interested in the not NULL value case
+ Assume.assumeTrue(value != null);
+
+ String type = createType("CREATE TYPE %s (x blob, y blob)");
+ createTable("CREATE TABLE %s (k bigint, c int, v " + type + ", PRIMARY KEY(k, c))");
+
+ // insert some rows, and flush
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ // insert some clusterings
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, tuple(value, value));
+ }
+ }
+ maybeFlush();
+
+ // for each row, delete a tuple element
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("UPDATE %s SET v.x = null WHERE k = ? AND c = ?", k, c);
+ }
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, NUM_CLUSTERINGS);
+ }
+ maybeFlush();
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS);
+
+ // for each row, update a tuple element
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("UPDATE %s SET v.x = ? WHERE k = ? AND c = ?", value, k, c);
+ }
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, NUM_CLUSTERINGS);
+ }
+ maybeFlush();
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS);
+
+ // for each row, delete the tuple
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("DELETE v FROM %s WHERE k = ? AND c = ?", k, c);
+ }
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, NUM_CLUSTERINGS);
+ }
+ maybeFlush();
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS);
+ }
+
+ @Test
+ public void testTTLsWithSkinnyTable()
+ {
+ // we are only interested in the not NULL value case
+ Assume.assumeTrue(value != null);
+
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, c int, v text)");
+
+ String stringValue = ByteBufferUtil.toDebugHexString(value);
+ String v1 = stringValue + "_1";
+ String v2 = stringValue + "_2";
+
+ // insert some rows, and flush
+ for (int k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ execute("INSERT INTO %s (k, v) VALUES (?, ?)", k, v1);
+ }
+ maybeFlush();
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS);
+
+ // give a TTL to the first and the next-to-last partitions
+ execute("UPDATE %s USING TTL 1 SET v = ? WHERE k = ? ", v2, 1);
+ execute("UPDATE %s USING TTL 1 SET v = ? WHERE k = ? ", v2, NUM_PARTITIONS - 1);
+ maybeFlush();
+
+ // wait for the TTL to expire
+ Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS);
+ }
+
+ @Test
+ public void testTTLsWithWideTable()
+ {
+ // we are only interested in the not NULL value case
+ Assume.assumeTrue(value != null);
+
+ createTable("CREATE TABLE %s (k int, c int, v text, PRIMARY KEY(k, c))");
+
+ String stringValue = ByteBufferUtil.toDebugHexString(value);
+ String v1 = stringValue + "_1";
+ String v2 = stringValue + "_2";
+
+ // insert some rows, and flush
+ for (int k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", k, c, v1);
+ }
+ }
+ maybeFlush();
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS);
+
+ // for each partition, give a TTL to the first and the next-to-last clusterings
+ for (int k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ execute("UPDATE %s USING TTL 1 SET v = ? WHERE k = ? AND c = ?", v2, k, 1);
+ execute("UPDATE %s USING TTL 1 SET v = ? WHERE k = ? AND c = ?", v2, k, NUM_CLUSTERINGS - 1);
+ }
+ maybeFlush();
+
+ // wait for the TTL to expire
+ Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS);
+ }
+
+ @Test
+ public void testAggregationWithPurgeableTombstones()
+ {
+ // create the table with a very short gc_grace_seconds, so there are tombstones to be purged on the replicas
+ createTable("CREATE TABLE %s (k bigint, c int, v blob, PRIMARY KEY(k, c)) WITH gc_grace_seconds = 0");
+
+ // insert some clusterings, and flush
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ // insert some clusterings
+ for (int c = 1; c <= NUM_CLUSTERINGS; c++)
+ {
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?) USING TIMESTAMP 1", k, c, value);
+ }
+ }
+ maybeFlush();
+
+ // for each partition, delete the value of the first and last clusterings, leaving cell tombstones
+ for (long k = 1; k <= NUM_PARTITIONS; k++)
+ {
+ execute("DELETE v FROM %s USING TIMESTAMP 2 WHERE k = ? AND c = ?", k, 1);
+ execute("DELETE v FROM %s USING TIMESTAMP 2 WHERE k = ? AND c = ?", k, NUM_CLUSTERINGS);
+
+ // test aggregation on single partition query
+ assertPartitionCount(k, NUM_CLUSTERINGS);
+ }
+ maybeFlush();
+
+ // wait for the tombstones to be purgeable
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+ // test aggregation on range query
+ assertRangeCount(NUM_PARTITIONS * NUM_CLUSTERINGS);
+ }
+
+ private void assertPartitionCount(Object k, int expectedCount)
+ {
+ assertCount("SELECT * FROM %s WHERE k=?", "SELECT COUNT(*) FROM %s WHERE k=?", expectedCount, k);
+ }
+
+ private void assertRangeCount(int expectedCount)
+ {
+ assertCount("SELECT * FROM %s", "SELECT COUNT(*) FROM %s", expectedCount);
+ }
+
+ private void assertCount(String selectQuery, String countQuery, int expectedCount, Object... args)
+ {
+ int selectRows = execute(selectQuery, args).size();
+ long selectCountRows = execute(countQuery, args).one().getLong("count");
+ Assertions.assertThat(selectRows).isEqualTo(expectedCount); // the row count is correct
+ Assertions.assertThat(selectRows).isEqualTo(selectCountRows); // and consistent with COUNT(*)
+ }
+
+ private void maybeFlush()
+ {
+ if (flush)
+ flush();
+ }
+}