Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions src/java/org/apache/cassandra/db/filter/DataLimits.java
Original file line number Diff line number Diff line change
Expand Up @@ -625,14 +625,14 @@ public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
{
rowsInCurrentPartition = 0;
hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
staticRowBytes = hasLiveStaticRow && bytesLimit != NO_LIMIT ? staticRow.originalDataSize() : 0;
staticRowBytes = hasLiveStaticRow && bytesLimit != NO_LIMIT ? staticRow.liveDataSize(nowInSec) : 0;
}

@Override
public Row applyToRow(Row row)
{
if (isLive(row))
incrementRowCount(bytesLimit != NO_LIMIT ? row.originalDataSize() : 0);
incrementRowCount(bytesLimit != NO_LIMIT ? row.liveDataSize(nowInSec) : 0);
return row;
}

Expand All @@ -647,9 +647,9 @@ public void onPartitionClose()
super.onPartitionClose();
}

protected void incrementRowCount(int rowSize)
protected void incrementRowCount(int rowLiveSize)
{
bytesCounted += rowSize;
bytesCounted += rowLiveSize;
rowsCounted++;
rowsInCurrentPartition++;
if (bytesCounted >= bytesLimit || rowsCounted >= rowLimit)
Expand Down Expand Up @@ -888,7 +888,7 @@ public DataLimits forShortReadRetry(int toFetch)
@Override
public float estimateTotalResults(ColumnFamilyStore cfs)
{
// For the moment, we return the estimated number of rows as we have no good way of estimating
// For the moment, we return the estimated number of rows as we have no good way of estimating
// the number of groups that will be returned. Hopefully, we should be able to fix
// that problem at some point.
return super.estimateTotalResults(cfs);
Expand Down Expand Up @@ -1107,7 +1107,7 @@ public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
}
hasReturnedRowsFromCurrentPartition = false;
hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
staticRowBytes = hasLiveStaticRow ? staticRow.originalDataSize() : 0;
staticRowBytes = hasLiveStaticRow ? staticRow.liveDataSize(nowInSec) : 0;
}
currentPartitionKey = partitionKey;
// If we are done we need to preserve the groupInCurrentPartition and rowsCountedInCurrentPartition
Expand Down Expand Up @@ -1173,7 +1173,7 @@ public Row applyToRow(Row row)
if (isLive(row))
{
hasUnfinishedGroup = true;
incrementRowCount(bytesLimit != NO_LIMIT ? row.originalDataSize() : 0);
incrementRowCount(bytesLimit != NO_LIMIT ? row.liveDataSize(nowInSec) : 0);
hasReturnedRowsFromCurrentPartition = true;
}

Expand Down Expand Up @@ -1210,11 +1210,11 @@ public int rowsCountedInCurrentPartition()
return rowsCountedInCurrentPartition;
}

protected void incrementRowCount(int rowSize)
protected void incrementRowCount(int rowLiveSize)
{
rowsCountedInCurrentPartition++;
rowsCounted++;
bytesCounted += rowSize;
bytesCounted += rowLiveSize;
if (rowsCounted >= rowLimit || bytesCounted >= bytesLimit)
stop();
}
Expand Down
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/db/rows/AbstractCell.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ public int dataSize()
+ (path == null ? 0 : path.dataSize());
}

@Override
public int liveDataSize(int nowInSec)
{
return isLive(nowInSec) ? dataSize() : 0;
}

public void digest(Digest digest)
{
if (isCounterCell())
Expand Down
40 changes: 9 additions & 31 deletions src/java/org/apache/cassandra/db/rows/BTreeRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,34 +88,18 @@ public class BTreeRow extends AbstractRow
// no expiring cells, this will be Integer.MAX_VALUE;
private final int minLocalDeletionTime;

/**
* The original data size of this row before purging it, or -1 if it hasn't been purged.
*/
private final int originalDataSize;

private BTreeRow(Clustering<?> clustering,
LivenessInfo primaryKeyLivenessInfo,
Deletion deletion,
Object[] btree,
int minLocalDeletionTime,
int originalDataSize)
int minLocalDeletionTime)
{
assert !deletion.isShadowedBy(primaryKeyLivenessInfo);
this.clustering = clustering;
this.primaryKeyLivenessInfo = primaryKeyLivenessInfo;
this.deletion = deletion;
this.btree = btree;
this.minLocalDeletionTime = minLocalDeletionTime;
this.originalDataSize = originalDataSize;
}

private BTreeRow(Clustering<?> clustering,
LivenessInfo primaryKeyLivenessInfo,
Deletion deletion,
Object[] btree,
int minLocalDeletionTime)
{
this(clustering, primaryKeyLivenessInfo, deletion, btree, minLocalDeletionTime, -1);
}

private BTreeRow(Clustering<?> clustering, Object[] btree, int minLocalDeletionTime)
Expand All @@ -139,23 +123,13 @@ public static BTreeRow create(Clustering<?> clustering,
return create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
}

public static BTreeRow create(Clustering<?> clustering,
LivenessInfo primaryKeyLivenessInfo,
Deletion deletion,
Object[] btree,
int minDeletionTime,
int originalDataSize)
{
return new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime, originalDataSize);
}

public static BTreeRow create(Clustering<?> clustering,
LivenessInfo primaryKeyLivenessInfo,
Deletion deletion,
Object[] btree,
int minDeletionTime)
{
return create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime, -1);
return new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
}

public static BTreeRow emptyRow(Clustering<?> clustering)
Expand Down Expand Up @@ -538,7 +512,7 @@ private Row update(LivenessInfo info, Deletion deletion, Object[] newTree)
return null;

int minDeletionTime = minDeletionTime(newTree, info, deletion.time());
return BTreeRow.create(clustering, info, deletion, newTree, minDeletionTime, originalDataSize());
return BTreeRow.create(clustering, info, deletion, newTree, minDeletionTime);
}

@Override
Expand Down Expand Up @@ -569,9 +543,13 @@ public int dataSize()
}

@Override
public int originalDataSize()
public int liveDataSize(int nowInSec)
{
return originalDataSize >= 0 ? originalDataSize : dataSize();
int dataSize = clustering.dataSize()
+ primaryKeyLivenessInfo.dataSize()
+ deletion.dataSize();

return Ints.checkedCast(accumulate((cd, v) -> v + cd.liveDataSize(nowInSec), dataSize));
}

public long unsharedHeapSizeExcludingData()
Expand Down
8 changes: 8 additions & 0 deletions src/java/org/apache/cassandra/db/rows/ColumnData.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,14 @@ protected ColumnData(ColumnMetadata column)
*/
public abstract int dataSize();

/**
 * The size of the data held by this {@code ColumnData} that is live at {@code nowInSec}.
*
* @param nowInSec the query timestamp in seconds
* @return the size used by the live data of this {@code ColumnData}.
*/
public abstract int liveDataSize(int nowInSec);

public abstract long unsharedHeapSizeExcludingData();

/**
Expand Down
9 changes: 6 additions & 3 deletions src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,8 @@
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.Digest;
import org.apache.cassandra.db.LivenessInfo;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.ByteType;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.DroppedColumn;
Expand Down Expand Up @@ -146,6 +143,12 @@ public int dataSize()
return size;
}

@Override
public int liveDataSize(int nowInSec)
{
return complexDeletion.isLive() ? dataSize() : 0;
}

public long unsharedHeapSizeExcludingData()
{
long heapSize = EMPTY_SIZE + BTree.sizeOnHeapOf(cells);
Expand Down
10 changes: 5 additions & 5 deletions src/java/org/apache/cassandra/db/rows/Row.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData>

/**
 * Whether the row has some live information (i.e. it's not just deletion information).
*
*
* @param nowInSec the current time to decide what is deleted and what isn't
* @param enforceStrictLiveness whether the row should be purged if there is no PK liveness info,
* normally retrieved from {@link CFMetaData#enforceStrictLiveness()}
Expand Down Expand Up @@ -310,12 +310,12 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
public int dataSize();

/**
* Returns the original data size in bytes of this row as it was returned by {@link #dataSize()} before purging it
* from all deletion info with {@link #purge}.
 * Returns the size of the data held by this row that is live at {@code nowInSec}.
*
* @return the original data size of this row in bytes before purging
* @param nowInSec the query timestamp in seconds
 * @return the size of the data held by this row that is live at {@code nowInSec}.
*/
int originalDataSize();
int liveDataSize(int nowInSec);

public long unsharedHeapSizeExcludingData();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ public int dataSize()
return cell.dataSize();
}

@Override
public int liveDataSize(int nowInSec)
{
return cell.liveDataSize(nowInSec);
}

@Override
public long unsharedHeapSizeExcludingData()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,9 @@ public int dataSize()
}

@Override
public int originalDataSize()
public int liveDataSize(int nowInSec)
{
return row.originalDataSize();
return row.liveDataSize(nowInSec);
}

@Override
Expand Down
Loading
Loading