18 changes: 9 additions & 9 deletions src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -628,14 +628,14 @@ public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
{
rowsInCurrentPartition = 0;
hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
staticRowBytes = hasLiveStaticRow && bytesLimit != NO_LIMIT ? staticRow.dataSizeBeforePurge() : 0;
staticRowBytes = hasLiveStaticRow && bytesLimit != NO_LIMIT ? staticRow.liveDataSize(nowInSec) : 0;
}

@Override
public Row applyToRow(Row row)
{
if (isLive(row))
incrementRowCount(bytesLimit != NO_LIMIT ? row.dataSizeBeforePurge() : 0);
incrementRowCount(bytesLimit != NO_LIMIT ? row.liveDataSize(nowInSec) : 0);
return row;
}

@@ -650,9 +650,9 @@ public void onPartitionClose()
super.onPartitionClose();
}

protected void incrementRowCount(int rowSize)
protected void incrementRowCount(int liveRowSize)
{
bytesCounted += rowSize;
bytesCounted += liveRowSize;
rowsCounted++;
rowsInCurrentPartition++;
if (bytesCounted >= bytesLimit || rowsCounted >= rowLimit)
@@ -891,7 +891,7 @@ public DataLimits forShortReadRetry(int toFetch)
@Override
public float estimateTotalResults(ColumnFamilyStore cfs)
{
// For the moment, we return the estimated number of rows as we have no good way of estimating
// For the moment, we return the estimated number of rows as we have no good way of estimating
// the number of groups that will be returned. Hopefully, we should be able to fix
// that problem at some point.
return super.estimateTotalResults(cfs);
@@ -1110,7 +1110,7 @@ public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
}
hasReturnedRowsFromCurrentPartition = false;
hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
staticRowBytes = hasLiveStaticRow ? staticRow.dataSizeBeforePurge() : 0;
staticRowBytes = hasLiveStaticRow ? staticRow.liveDataSize(nowInSec) : 0;
}
currentPartitionKey = partitionKey;
// If we are done we need to preserve the groupInCurrentPartition and rowsCountedInCurrentPartition
@@ -1176,7 +1176,7 @@ public Row applyToRow(Row row)
if (isLive(row))
{
hasUnfinishedGroup = true;
incrementRowCount(bytesLimit != NO_LIMIT ? row.dataSizeBeforePurge() : 0);
incrementRowCount(bytesLimit != NO_LIMIT ? row.liveDataSize(nowInSec) : 0);
hasReturnedRowsFromCurrentPartition = true;
}

@@ -1213,11 +1213,11 @@ public int rowsCountedInCurrentPartition()
return rowsCountedInCurrentPartition;
}

protected void incrementRowCount(int rowSize)
protected void incrementRowCount(int rowLiveSize)
{
rowsCountedInCurrentPartition++;
rowsCounted++;
bytesCounted += rowSize;
bytesCounted += rowLiveSize;
if (rowsCounted >= rowLimit || bytesCounted >= bytesLimit)
stop();
}
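The DataLimits hunks above switch the bytes-limit accounting from the pre-purge size to the size of the data that is actually live at the query time. A minimal sketch of that counting pattern, using simplified stand-in types rather than the real Cassandra classes:

// Stand-in Row interface for illustration only; the real Cassandra
// interfaces are much richer than this.
interface Row
{
    boolean isLive(long nowInSec);
    int liveDataSize(long nowInSec);
}

// Sketch of the counter pattern from DataLimits: live rows always count
// toward the row limit, but only their live bytes count toward the bytes limit.
final class LimitCounter
{
    static final int NO_LIMIT = Integer.MAX_VALUE;

    private final int rowLimit;
    private final int bytesLimit;
    private final long nowInSec;

    private int rowsCounted;
    private int bytesCounted;
    private boolean stopped;

    LimitCounter(int rowLimit, int bytesLimit, long nowInSec)
    {
        this.rowLimit = rowLimit;
        this.bytesLimit = bytesLimit;
        this.nowInSec = nowInSec;
    }

    void applyToRow(Row row)
    {
        // Skip the live-size computation entirely when no bytes limit is
        // set, exactly as the patched code does.
        if (row.isLive(nowInSec))
            incrementRowCount(bytesLimit != NO_LIMIT ? row.liveDataSize(nowInSec) : 0);
    }

    private void incrementRowCount(int liveRowSize)
    {
        bytesCounted += liveRowSize;
        rowsCounted++;
        if (bytesCounted >= bytesLimit || rowsCounted >= rowLimit)
            stopped = true; // the real code calls stop() on the transformation
    }
}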
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -127,6 +127,12 @@ public int dataSize()
+ (path == null ? 0 : path.dataSize());
}

@Override
public int liveDataSize(long nowInSec)
{
return isLive(nowInSec) ? dataSize() : 0;
}

public void digest(Digest digest)
{
if (isCounterCell())
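For a single cell, the new method is all-or-nothing: a cell that is live at nowInSec contributes its full dataSize(), and an expired or deleted cell contributes nothing. A self-contained illustration with a simplified expiring cell (a stand-in, not Cassandra's Cell hierarchy):

// Simplified stand-in for an expiring cell. The real AbstractCell delegates
// liveness to Cell.isLive(nowInSec), which also handles tombstones.
final class SimpleCell
{
    private final byte[] value;
    private final long localExpirationTimeInSec; // Long.MAX_VALUE means "no TTL"

    SimpleCell(byte[] value, long localExpirationTimeInSec)
    {
        this.value = value;
        this.localExpirationTimeInSec = localExpirationTimeInSec;
    }

    boolean isLive(long nowInSec)
    {
        return nowInSec < localExpirationTimeInSec;
    }

    int dataSize()
    {
        return value.length;
    }

    // Mirrors AbstractCell.liveDataSize: the full size if live, otherwise 0.
    int liveDataSize(long nowInSec)
    {
        return isLive(nowInSec) ? dataSize() : 0;
    }
}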
49 changes: 13 additions & 36 deletions src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -90,34 +90,18 @@ public class BTreeRow extends AbstractRow
// no expiring cells, this will be Cell.MAX_DELETION_TIME;
private final long minLocalDeletionTime;

/**
* The original data size of this row before purging it, or -1 if it hasn't been purged.
*/
private final int dataSizeBeforePurge;

private BTreeRow(Clustering<?> clustering,
LivenessInfo primaryKeyLivenessInfo,
Deletion deletion,
Object[] btree,
long minLocalDeletionTime,
int dataSizeBeforePurge)
long minLocalDeletionTime)
{
assert !deletion.isShadowedBy(primaryKeyLivenessInfo);
this.clustering = clustering;
this.primaryKeyLivenessInfo = primaryKeyLivenessInfo;
this.deletion = deletion;
this.btree = btree;
this.minLocalDeletionTime = minLocalDeletionTime;
this.dataSizeBeforePurge = dataSizeBeforePurge;
}

private BTreeRow(Clustering<?> clustering,
LivenessInfo primaryKeyLivenessInfo,
Deletion deletion,
Object[] btree,
long minLocalDeletionTime)
{
this(clustering, primaryKeyLivenessInfo, deletion, btree, minLocalDeletionTime, -1);
}

private BTreeRow(Clustering<?> clustering, Object[] btree, long minLocalDeletionTime)
@@ -141,23 +125,13 @@ public static BTreeRow create(Clustering<?> clustering,
return create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
}

public static BTreeRow create(Clustering<?> clustering,
LivenessInfo primaryKeyLivenessInfo,
Deletion deletion,
Object[] btree,
long minDeletionTime,
int dataSizeBeforePurge)
{
return new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime, dataSizeBeforePurge);
}

public static BTreeRow create(Clustering<?> clustering,
LivenessInfo primaryKeyLivenessInfo,
Deletion deletion,
Object[] btree,
long minDeletionTime)
{
return create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime, -1);
return new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
}

public static BTreeRow emptyRow(Clustering<?> clustering)
@@ -523,7 +497,7 @@ public Row purge(DeletionPurger purger, long nowInSec, boolean enforceStrictLive
return null;

Function<ColumnData, ColumnData> columnDataPurger = (cd) -> cd.purge(purger, nowInSec);
return update(newInfo, newDeletion, BTree.transformAndFilter(btree, columnDataPurger), true);
return update(newInfo, newDeletion, BTree.transformAndFilter(btree, columnDataPurger));
}

public Row purgeDataOlderThan(long timestamp, boolean enforceStrictLiveness)
@@ -541,10 +515,10 @@ public Row purgeDataOlderThan(long timestamp, boolean enforceStrictLiveness)
@Override
public Row transformAndFilter(LivenessInfo info, Deletion deletion, Function<ColumnData, ColumnData> function)
{
return update(info, deletion, BTree.transformAndFilter(btree, function), false);
return update(info, deletion, BTree.transformAndFilter(btree, function));
}

private Row update(LivenessInfo info, Deletion deletion, Object[] newTree, boolean preserveDataSizeBeforePurge)
private Row update(LivenessInfo info, Deletion deletion, Object[] newTree)
{
if (btree == newTree && info == this.primaryKeyLivenessInfo && deletion == this.deletion)
return this;
@@ -554,8 +528,7 @@ private Row update(LivenessInfo info, Deletion deletion, Object[] newTree, boole

long minDeletionTime = minDeletionTime(newTree, info, deletion.time());

int dataSizeBeforePurge = preserveDataSizeBeforePurge ? dataSizeBeforePurge() : -1;
return BTreeRow.create(clustering, info, deletion, newTree, minDeletionTime, dataSizeBeforePurge);
return BTreeRow.create(clustering, info, deletion, newTree, minDeletionTime);
}

@Override
Expand All @@ -566,7 +539,7 @@ public Row transformAndFilter(Function<ColumnData, ColumnData> function)

public Row transform(Function<ColumnData, ColumnData> function)
{
return update(primaryKeyLivenessInfo, deletion, BTree.transform(btree, function), false);
return update(primaryKeyLivenessInfo, deletion, BTree.transform(btree, function));
}

@Override
@@ -598,9 +571,13 @@ public long unsharedHeapSize()
}

@Override
public int dataSizeBeforePurge()
public int liveDataSize(long nowInSec)

Nit: I'd move this method up so it's right after the very similar dataSize method.

{
return dataSizeBeforePurge >= 0 ? dataSizeBeforePurge : dataSize();
int dataSize = clustering.dataSize()
+ primaryKeyLivenessInfo.dataSize()
+ deletion.dataSize();

return Ints.checkedCast(accumulate((cd, v) -> v + cd.liveDataSize(nowInSec), dataSize));
}

public long unsharedHeapSizeExcludingData()
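BTreeRow.liveDataSize above starts from the fixed per-row overhead (clustering, primary-key liveness info, row deletion) and folds the per-column live sizes over the BTree with accumulate, using Guava's Ints.checkedCast to fail fast if the long accumulator overflows an int. The same fold over a plain list, reusing the SimpleCell stand-in from the sketch above:

final class RowSizeSketch
{
    // Same shape as BTreeRow.liveDataSize, but over a plain java.util.List
    // instead of a BTree. fixedOverhead stands in for the clustering,
    // liveness-info, and deletion sizes.
    static int liveDataSize(java.util.List<SimpleCell> columns, int fixedOverhead, long nowInSec)
    {
        long total = fixedOverhead;
        for (SimpleCell cd : columns)
            total += cd.liveDataSize(nowInSec); // non-live entries contribute 0

        // BTreeRow uses Guava's Ints.checkedCast; Math.toIntExact is the JDK
        // equivalent and likewise throws on overflow instead of truncating.
        return Math.toIntExact(total);
    }
}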
8 changes: 8 additions & 0 deletions src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -245,6 +245,14 @@ protected ColumnData(ColumnMetadata column)
*/
public abstract int dataSize();

/**
* The size of the data held by this {@code ColumnData} that is live at {@code nowInSec}.
*
* @param nowInSec the query timestamp in seconds
* @return the size used by the live data of this {@code ColumnData}.
*/
public abstract int liveDataSize(long nowInSec);

public abstract long unsharedHeapSizeExcludingData();

public abstract long unsharedHeapSize();
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -149,13 +149,19 @@ public int dataSize()
return size;
}

@Override
public long unsharedHeapSize()
{
long heapSize = EMPTY_SIZE + BTree.sizeOnHeapOf(cells) + complexDeletion.unsharedHeapSize();
return BTree.<Cell>accumulate(cells, (cell, value) -> value + cell.unsharedHeapSize(), heapSize);
}

@Override
public int liveDataSize(long nowInSec)

Same as above, I'd move this to be right after dataSize()

{
return complexDeletion.isLive() ? dataSize() : 0;
}

public long unsharedHeapSizeExcludingData()
{
long heapSize = EMPTY_SIZE + BTree.sizeOnHeapOf(cells);
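Note that ComplexColumnData answers at the column level rather than per cell: assuming Cassandra's convention that a "live" DeletionTime means no deletion, the whole dataSize() is counted when there is no column-level tombstone, and nothing otherwise. A stand-in sketch of that strategy, again reusing SimpleCell from above:

// Stand-in for the column-level strategy: the collection column is either
// counted in full (no column tombstone) or not at all.
final class SimpleComplexColumn
{
    private final java.util.List<SimpleCell> cells;
    private final boolean deletionIsLive; // models complexDeletion.isLive()

    SimpleComplexColumn(java.util.List<SimpleCell> cells, boolean deletionIsLive)
    {
        this.cells = cells;
        this.deletionIsLive = deletionIsLive;
    }

    int dataSize()
    {
        int size = 0;
        for (SimpleCell c : cells)
            size += c.dataSize();
        return size;
    }

    // Mirrors ComplexColumnData.liveDataSize: all-or-nothing at the column
    // level; nowInSec is not consulted here, matching the patch.
    int liveDataSize(long nowInSec)
    {
        return deletionIsLive ? dataSize() : 0;
    }
}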
10 changes: 5 additions & 5 deletions src/java/org/apache/cassandra/db/rows/Row.java
@@ -126,7 +126,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData>, IMeasurableMemory

/**
* Whether the row has some live information (i.e. it's not just deletion information).
*
*
* @param nowInSec the current time to decide what is deleted and what isn't
* @param enforceStrictLiveness whether the row should be purged if there is no PK liveness info,
* normally retrieved from {@link TableMetadata#enforceStrictLiveness()}
@@ -326,12 +326,12 @@ public interface Row extends Unfiltered, Iterable<ColumnData>, IMeasurableMemory
public int dataSize();

/**
* Returns the original data size in bytes of this row as it was returned by {@link #dataSize()} before purging it
* from all deletion info with {@link #purge}.
* Returns the size of the data held by this row that is live at {@code nowInSec}.
*
* @return the original data size of this row in bytes before purging
* @param nowInSec the query timestamp in seconds
* @return the size of the data held by this row that is live at {@code nowInSec}.
*/
int dataSizeBeforePurge();
int liveDataSize(long nowInSec);

public long unsharedHeapSizeExcludingData();

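With the interface change, callers have both accessors side by side: dataSize() for the raw encoded size and liveDataSize(nowInSec) for only the live portion. A hypothetical usage helper, not part of the patch, built on the stand-in Row interface from the first sketch:

final class LiveBytes
{
    // Hypothetical helper: total live bytes across a batch of rows at the
    // query timestamp, e.g. for byte-based paging accounting.
    static long totalLiveBytes(Iterable<Row> rows, long nowInSec)
    {
        long total = 0;
        for (Row row : rows)
            total += row.liveDataSize(nowInSec);
        return total;
    }
}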
@@ -147,6 +147,12 @@ public int dataSize()
return cell.dataSize();
}

@Override
public int liveDataSize(long nowInSec)
{
return cell.liveDataSize(nowInSec);
}

@Override
public long unsharedHeapSizeExcludingData()
{
@@ -286,9 +286,9 @@ public int dataSize()
}

@Override
public int dataSizeBeforePurge()
public int liveDataSize(long nowInSec)
{
return row.dataSizeBeforePurge();
return row.liveDataSize(nowInSec);
}

@Override