18 changes: 9 additions & 9 deletions src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -625,14 +625,14 @@ public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
{
rowsInCurrentPartition = 0;
hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
staticRowBytes = hasLiveStaticRow && bytesLimit != NO_LIMIT ? staticRow.dataSizeBeforePurge() : 0;
staticRowBytes = hasLiveStaticRow && bytesLimit != NO_LIMIT ? staticRow.liveDataSize(nowInSec) : 0;
}

@Override
public Row applyToRow(Row row)
{
if (isLive(row))
incrementRowCount(bytesLimit != NO_LIMIT ? row.dataSizeBeforePurge() : 0);
incrementRowCount(bytesLimit != NO_LIMIT ? row.liveDataSize(nowInSec) : 0);
return row;
}

@@ -647,9 +647,9 @@ public void onPartitionClose()
super.onPartitionClose();
}

protected void incrementRowCount(int rowSize)
protected void incrementRowCount(int liveRowSize)
{
bytesCounted += rowSize;
bytesCounted += liveRowSize;
rowsCounted++;
rowsInCurrentPartition++;
if (bytesCounted >= bytesLimit || rowsCounted >= rowLimit)
@@ -888,7 +888,7 @@ public DataLimits forShortReadRetry(int toFetch)
@Override
public float estimateTotalResults(ColumnFamilyStore cfs)
{
// For the moment, we return the estimated number of rows as we have no good way of estimating
// For the moment, we return the estimated number of rows as we have no good way of estimating
// the number of groups that will be returned. Hopefully, we should be able to fix
// that problem at some point.
return super.estimateTotalResults(cfs);
@@ -1107,7 +1107,7 @@ public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
}
hasReturnedRowsFromCurrentPartition = false;
hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
staticRowBytes = hasLiveStaticRow ? staticRow.dataSizeBeforePurge() : 0;
staticRowBytes = hasLiveStaticRow ? staticRow.liveDataSize(nowInSec) : 0;
}
currentPartitionKey = partitionKey;
// If we are done we need to preserve the groupInCurrentPartition and rowsCountedInCurrentPartition
@@ -1173,7 +1173,7 @@ public Row applyToRow(Row row)
if (isLive(row))
{
hasUnfinishedGroup = true;
incrementRowCount(bytesLimit != NO_LIMIT ? row.dataSizeBeforePurge() : 0);
incrementRowCount(bytesLimit != NO_LIMIT ? row.liveDataSize(nowInSec) : 0);
hasReturnedRowsFromCurrentPartition = true;
}

@@ -1210,11 +1210,11 @@ public int rowsCountedInCurrentPartition()
return rowsCountedInCurrentPartition;
}

protected void incrementRowCount(int rowSize)
protected void incrementRowCount(int rowLiveSize)
{
rowsCountedInCurrentPartition++;
rowsCounted++;
bytesCounted += rowSize;
bytesCounted += rowLiveSize;
if (rowsCounted >= rowLimit || bytesCounted >= bytesLimit)
stop();
}
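Taken together, the DataLimits changes above amount to this: a live row now charges only its live bytes, Row#liveDataSize(nowInSec), against the byte limit, instead of its pre-purge size. The following is a minimal, self-contained sketch of that accounting, assuming the caller passes the value returned by liveDataSize; the class and member names are illustrative, not the actual DataLimits counters.

// Minimal sketch of the byte/row limit accounting, assuming liveRowSize comes
// from Row#liveDataSize(nowInSec). Names and structure are illustrative only.
final class LimitCounterSketch
{
    static final int NO_LIMIT = Integer.MAX_VALUE;

    private final int rowLimit;
    private final int bytesLimit;
    private long bytesCounted;
    private int rowsCounted;
    private boolean done;

    LimitCounterSketch(int rowLimit, int bytesLimit)
    {
        this.rowLimit = rowLimit;
        this.bytesLimit = bytesLimit;
    }

    // Called once per live row; dead rows contribute nothing.
    void onLiveRow(int liveRowSize)
    {
        // Only charge row sizes when a byte limit is actually set.
        bytesCounted += bytesLimit != NO_LIMIT ? liveRowSize : 0;
        rowsCounted++;
        if (bytesCounted >= bytesLimit || rowsCounted >= rowLimit)
            done = true;
    }

    boolean isDone()
    {
        return done;
    }
}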
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -121,6 +121,12 @@ public int dataSize()
+ (path == null ? 0 : path.dataSize());
}

@Override
public int liveDataSize(int nowInSec)
{
return isLive(nowInSec) ? dataSize() : 0;
}

public void digest(Digest digest)
{
if (isCounterCell())
49 changes: 13 additions & 36 deletions src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -88,34 +88,18 @@ public class BTreeRow extends AbstractRow
// no expiring cells, this will be Integer.MAX_VALUE;
private final int minLocalDeletionTime;

/**
* The original data size of this row before purging it, or -1 if it hasn't been purged.
*/
private final int dataSizeBeforePurge;

private BTreeRow(Clustering<?> clustering,
LivenessInfo primaryKeyLivenessInfo,
Deletion deletion,
Object[] btree,
int minLocalDeletionTime,
int dataSizeBeforePurge)
int minLocalDeletionTime)
{
assert !deletion.isShadowedBy(primaryKeyLivenessInfo);
this.clustering = clustering;
this.primaryKeyLivenessInfo = primaryKeyLivenessInfo;
this.deletion = deletion;
this.btree = btree;
this.minLocalDeletionTime = minLocalDeletionTime;
this.dataSizeBeforePurge = dataSizeBeforePurge;
}

private BTreeRow(Clustering<?> clustering,
LivenessInfo primaryKeyLivenessInfo,
Deletion deletion,
Object[] btree,
int minLocalDeletionTime)
{
this(clustering, primaryKeyLivenessInfo, deletion, btree, minLocalDeletionTime, -1);
}

private BTreeRow(Clustering<?> clustering, Object[] btree, int minLocalDeletionTime)
@@ -139,23 +123,13 @@ public static BTreeRow create(Clustering<?> clustering,
return create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
}

public static BTreeRow create(Clustering<?> clustering,
LivenessInfo primaryKeyLivenessInfo,
Deletion deletion,
Object[] btree,
int minDeletionTime,
int dataSizeBeforePurge)
{
return new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime, dataSizeBeforePurge);
}

public static BTreeRow create(Clustering<?> clustering,
LivenessInfo primaryKeyLivenessInfo,
Deletion deletion,
Object[] btree,
int minDeletionTime)
{
return create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime, -1);
return new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
}

public static BTreeRow emptyRow(Clustering<?> clustering)
@@ -522,16 +496,16 @@ public Row purge(DeletionPurger purger, int nowInSec, boolean enforceStrictLiveness)
return null;

Function<ColumnData, ColumnData> columnDataPurger = (cd) -> cd.purge(purger, nowInSec);
return update(newInfo, newDeletion, BTree.transformAndFilter(btree, columnDataPurger), true);
return update(newInfo, newDeletion, BTree.transformAndFilter(btree, columnDataPurger));
}

@Override
public Row transformAndFilter(LivenessInfo info, Deletion deletion, Function<ColumnData, ColumnData> function)
{
return update(info, deletion, BTree.transformAndFilter(btree, function), false);
return update(info, deletion, BTree.transformAndFilter(btree, function));
}

private Row update(LivenessInfo info, Deletion deletion, Object[] newTree, boolean preserveDataSizeBeforePurge)
private Row update(LivenessInfo info, Deletion deletion, Object[] newTree)
{
if (btree == newTree && info == this.primaryKeyLivenessInfo && deletion == this.deletion)
return this;
@@ -541,8 +515,7 @@ private Row update(LivenessInfo info, Deletion deletion, Object[] newTree, boolean preserveDataSizeBeforePurge)

int minDeletionTime = minDeletionTime(newTree, info, deletion.time());

int dataSizeBeforePurge = preserveDataSizeBeforePurge ? dataSizeBeforePurge() : -1;
return BTreeRow.create(clustering, info, deletion, newTree, minDeletionTime, dataSizeBeforePurge);
return BTreeRow.create(clustering, info, deletion, newTree, minDeletionTime);
}

@Override
@@ -553,7 +526,7 @@ public Row transformAndFilter(Function<ColumnData, ColumnData> function)

public Row transform(Function<ColumnData, ColumnData> function)
{
return update(primaryKeyLivenessInfo, deletion, BTree.transform(btree, function), false);
return update(primaryKeyLivenessInfo, deletion, BTree.transform(btree, function));
}

@Override
@@ -573,9 +546,13 @@ public int dataSize()
}

@Override
public int dataSizeBeforePurge()
public int liveDataSize(int nowInSec)
{
return dataSizeBeforePurge >= 0 ? dataSizeBeforePurge : dataSize();
int dataSize = clustering.dataSize()
+ primaryKeyLivenessInfo.dataSize()
+ deletion.dataSize();

return Ints.checkedCast(accumulate((cd, v) -> v + cd.liveDataSize(nowInSec), dataSize));
}

public long unsharedHeapSizeExcludingData()
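The replacement liveDataSize in BTreeRow no longer relies on a cached pre-purge size; it recomputes the live size on demand by folding over the row's column data, seeding the accumulator with the fixed per-row overhead (clustering, primary-key liveness info, deletion). A rough standalone equivalent of that fold, with an illustrative ColumnDataLike interface standing in for Cassandra's ColumnData and a plain list standing in for the BTree accumulate():

import java.util.List;

// Standalone sketch of the fold performed by BTreeRow#liveDataSize above.
// ColumnDataLike is an assumption for illustration, not the real ColumnData type.
final class RowLiveSizeSketch
{
    interface ColumnDataLike
    {
        int liveDataSize(int nowInSec);
    }

    static int liveDataSize(int clusteringSize,
                            int primaryKeyLivenessInfoSize,
                            int deletionSize,
                            List<? extends ColumnDataLike> columns,
                            int nowInSec)
    {
        // Seed with the per-row overhead, mirroring the dataSize seed in the patch.
        long total = clusteringSize + primaryKeyLivenessInfoSize + deletionSize;
        for (ColumnDataLike cd : columns)
            total += cd.liveDataSize(nowInSec); // same shape as accumulate((cd, v) -> v + cd.liveDataSize(nowInSec), seed)
        return Math.toIntExact(total);          // plays the role of Ints.checkedCast in the patch
    }
}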
8 changes: 8 additions & 0 deletions src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -244,6 +244,14 @@ protected ColumnData(ColumnMetadata column)
*/
public abstract int dataSize();

/**
* The size of the data held by this {@code ColumnData} that is live at {@code nowInSec}.
*
* @param nowInSec the query timestamp in seconds
* @return the size used by the live data of this {@code ColumnData}.
*/
public abstract int liveDataSize(int nowInSec);

public abstract long unsharedHeapSizeExcludingData();

/**
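The new abstract method declared above sets the contract the concrete overrides follow: column data contributes its full dataSize() while it is live at nowInSec, and zero otherwise. A toy example of the same contract, outside the real Cell hierarchy (ExpiringValue and its fields are assumptions made purely for illustration):

// Toy example of the liveDataSize(nowInSec) contract from ColumnData above.
// ExpiringValue is illustrative only; it is not part of the Cassandra API.
final class ExpiringValue
{
    private final byte[] value;
    private final int localExpirationTime; // seconds; Integer.MAX_VALUE means "never expires"

    ExpiringValue(byte[] value, int localExpirationTime)
    {
        this.value = value;
        this.localExpirationTime = localExpirationTime;
    }

    int dataSize()
    {
        return value.length;
    }

    boolean isLive(int nowInSec)
    {
        return nowInSec < localExpirationTime;
    }

    // Live data counts in full; expired data counts as zero, as in AbstractCell above.
    int liveDataSize(int nowInSec)
    {
        return isLive(nowInSec) ? dataSize() : 0;
    }
}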
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -146,6 +146,12 @@ public int dataSize()
return size;
}

@Override
public int liveDataSize(int nowInSec)
{
return complexDeletion.isLive() ? dataSize() : 0;
}

public long unsharedHeapSizeExcludingData()
{
long heapSize = EMPTY_SIZE + BTree.sizeOnHeapOf(cells);
10 changes: 5 additions & 5 deletions src/java/org/apache/cassandra/db/rows/Row.java
@@ -115,7 +115,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData>

/**
* Whether the row has some live information (i.e. it's not just deletion informations).
*
*
* @param nowInSec the current time to decide what is deleted and what isn't
* @param enforceStrictLiveness whether the row should be purged if there is no PK liveness info,
* normally retrieved from {@link CFMetaData#enforceStrictLiveness()}
@@ -310,12 +310,12 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
public int dataSize();

/**
* Returns the original data size in bytes of this row as it was returned by {@link #dataSize()} before purging it
* from all deletion info with {@link #purge}.
* Returns the size of the data held by this row that is live at {@code nowInSec}.
*
* @return the original data size of this row in bytes before purging
* @param nowInSec the query timestamp in seconds
* @return the size of the data held by this row that is live at {@code nowInSec}.
*/
int dataSizeBeforePurge();
int liveDataSize(int nowInSec);

public long unsharedHeapSizeExcludingData();

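From the caller's side of the Row interface, liveDataSize(nowInSec) makes it possible to budget by live bytes rather than raw row size. A hedged usage sketch follows; RowLike mimics only the relevant slice of the interface, and the budget logic is illustrative rather than taken from Cassandra.

import java.util.Iterator;

// Usage sketch: stop consuming rows once their combined live size at nowInSec
// reaches a byte budget. RowLike is an assumption standing in for the Row interface.
final class PageByteBudget
{
    interface RowLike
    {
        int liveDataSize(int nowInSec);
    }

    static int countRowsWithinBudget(Iterator<? extends RowLike> rows, int nowInSec, long byteBudget)
    {
        long bytes = 0;
        int rowCount = 0;
        while (rows.hasNext())
        {
            bytes += rows.next().liveDataSize(nowInSec);
            rowCount++;
            if (bytes >= byteBudget)
                break; // budget reached; the caller would stop paging here
        }
        return rowCount;
    }
}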
Changes in an additional file:
@@ -147,6 +147,12 @@ public int dataSize()
return cell.dataSize();
}

@Override
public int liveDataSize(int nowInSec)
{
return cell.liveDataSize(nowInSec);
}

@Override
public long unsharedHeapSizeExcludingData()
{
Changes in an additional file:
@@ -280,9 +280,9 @@ public int dataSize()
}

@Override
public int dataSizeBeforePurge()
public int liveDataSize(int nowInSec)
{
return row.dataSizeBeforePurge();
return row.liveDataSize(nowInSec);
}

@Override