Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public static final class Builder {
private byte[] value;
private Filter filter;
private TimeRange timeRange;
private boolean queryMetricsEnabled = false;

private Builder(byte[] row) {
this.row = Preconditions.checkNotNull(row, "row is null");
Expand Down Expand Up @@ -133,6 +134,21 @@ public Builder timeRange(TimeRange timeRange) {
return this;
}

/**
* Enables the return of {@link QueryMetrics} alongside the corresponding result for this query
* <p>
* This is intended for advanced users who need result-granular, server-side metrics
* <p>
* Does not work
* @param queryMetricsEnabled {@code true} to enable collection of per-result query metrics
* {@code false} to disable metrics collection (resulting in
* {@code null} metrics)
*/
public Builder queryMetricsEnabled(boolean queryMetricsEnabled) {
this.queryMetricsEnabled = queryMetricsEnabled;
return this;
}

private void preCheck(Row action) {
Preconditions.checkNotNull(action, "action is null");
if (!Bytes.equals(row, action.getRow())) {
Expand All @@ -154,9 +170,10 @@ private void preCheck(Row action) {
public CheckAndMutate build(Put put) {
preCheck(put);
if (filter != null) {
return new CheckAndMutate(row, filter, timeRange, put);
return new CheckAndMutate(row, filter, timeRange, put, queryMetricsEnabled);
} else {
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, put);
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, put,
queryMetricsEnabled);
}
}

Expand All @@ -168,9 +185,10 @@ public CheckAndMutate build(Put put) {
public CheckAndMutate build(Delete delete) {
preCheck(delete);
if (filter != null) {
return new CheckAndMutate(row, filter, timeRange, delete);
return new CheckAndMutate(row, filter, timeRange, delete, queryMetricsEnabled);
} else {
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, delete);
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, delete,
queryMetricsEnabled);
}
}

Expand All @@ -182,9 +200,10 @@ public CheckAndMutate build(Delete delete) {
public CheckAndMutate build(Increment increment) {
preCheck(increment);
if (filter != null) {
return new CheckAndMutate(row, filter, timeRange, increment);
return new CheckAndMutate(row, filter, timeRange, increment, queryMetricsEnabled);
} else {
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, increment);
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, increment,
queryMetricsEnabled);
}
}

Expand All @@ -196,9 +215,10 @@ public CheckAndMutate build(Increment increment) {
public CheckAndMutate build(Append append) {
preCheck(append);
if (filter != null) {
return new CheckAndMutate(row, filter, timeRange, append);
return new CheckAndMutate(row, filter, timeRange, append, queryMetricsEnabled);
} else {
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, append);
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, append,
queryMetricsEnabled);
}
}

Expand All @@ -210,9 +230,10 @@ public CheckAndMutate build(Append append) {
public CheckAndMutate build(RowMutations mutations) {
preCheck(mutations);
if (filter != null) {
return new CheckAndMutate(row, filter, timeRange, mutations);
return new CheckAndMutate(row, filter, timeRange, mutations, queryMetricsEnabled);
} else {
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, mutations);
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, mutations,
queryMetricsEnabled);
}
}
}
Expand All @@ -234,9 +255,10 @@ public static Builder newBuilder(byte[] row) {
private final Filter filter;
private final TimeRange timeRange;
private final Row action;
private final boolean queryMetricsEnabled;

private CheckAndMutate(byte[] row, byte[] family, byte[] qualifier, final CompareOperator op,
byte[] value, TimeRange timeRange, Row action) {
byte[] value, TimeRange timeRange, Row action, boolean queryMetricsEnabled) {
this.row = row;
this.family = family;
this.qualifier = qualifier;
Expand All @@ -245,9 +267,11 @@ private CheckAndMutate(byte[] row, byte[] family, byte[] qualifier, final Compar
this.filter = null;
this.timeRange = timeRange != null ? timeRange : TimeRange.allTime();
this.action = action;
this.queryMetricsEnabled = queryMetricsEnabled;
}

private CheckAndMutate(byte[] row, Filter filter, TimeRange timeRange, Row action) {
private CheckAndMutate(byte[] row, Filter filter, TimeRange timeRange, Row action,
boolean queryMetricsEnabled) {
this.row = row;
this.family = null;
this.qualifier = null;
Expand All @@ -256,6 +280,7 @@ private CheckAndMutate(byte[] row, Filter filter, TimeRange timeRange, Row actio
this.filter = filter;
this.timeRange = timeRange != null ? timeRange : TimeRange.allTime();
this.action = action;
this.queryMetricsEnabled = queryMetricsEnabled;
}

/** Returns the row */
Expand Down Expand Up @@ -326,4 +351,9 @@ public TimeRange getTimeRange() {
public Row getAction() {
return action;
}

/** Returns whether query metrics are enabled */
public boolean isQueryMetricsEnabled() {
return queryMetricsEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class CheckAndMutateResult {
private final boolean success;
private final Result result;

private QueryMetrics metrics = null;

public CheckAndMutateResult(boolean success, Result result) {
this.success = success;
this.result = result;
Expand All @@ -41,4 +43,13 @@ public boolean isSuccess() {
public Result getResult() {
return result;
}

public CheckAndMutateResult setMetrics(QueryMetrics metrics) {
this.metrics = metrics;
return this;
}

public QueryMetrics getMetrics() {
return metrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public Get(Get get) {
this.setFilter(get.getFilter());
this.setReplicaId(get.getReplicaId());
this.setConsistency(get.getConsistency());
this.setQueryMetricsEnabled(get.isQueryMetricsEnabled());
// from Get
this.cacheBlocks = get.getCacheBlocks();
this.maxVersions = get.getMaxVersions();
Expand Down Expand Up @@ -511,6 +512,7 @@ public Map<String, Object> toMap(int maxCols) {
map.put("colFamTimeRangeMap", colFamTimeRangeMapStr);
}
map.put("priority", getPriority());
map.put("queryMetricsEnabled", queryMetricsEnabled);
return map;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -744,10 +744,8 @@ public boolean checkAndPut(final byte[] row, final byte[] family, final byte[] q
.setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE,
HBaseSemanticAttributes.Operation.PUT);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null, put)
.isSuccess(),
supplier);
return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL,
value, null, null, put, false).isSuccess(), supplier);
}

@Override
Expand All @@ -759,7 +757,7 @@ public boolean checkAndPut(final byte[] row, final byte[] family, final byte[] q
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE,
HBaseSemanticAttributes.Operation.PUT);
return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier,
toCompareOperator(compareOp), value, null, null, put).isSuccess(), supplier);
toCompareOperator(compareOp), value, null, null, put, false).isSuccess(), supplier);
}

@Override
Expand All @@ -771,7 +769,7 @@ public boolean checkAndPut(final byte[] row, final byte[] family, final byte[] q
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE,
HBaseSemanticAttributes.Operation.PUT);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, put).isSuccess(),
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, put, false).isSuccess(),
supplier);
}

Expand All @@ -784,7 +782,7 @@ public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE,
HBaseSemanticAttributes.Operation.DELETE);
return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL,
value, null, null, delete).isSuccess(), supplier);
value, null, null, delete, false).isSuccess(), supplier);
}

@Override
Expand All @@ -796,7 +794,7 @@ public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE,
HBaseSemanticAttributes.Operation.DELETE);
return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier,
toCompareOperator(compareOp), value, null, null, delete).isSuccess(), supplier);
toCompareOperator(compareOp), value, null, null, delete, false).isSuccess(), supplier);
}

@Override
Expand All @@ -807,9 +805,9 @@ public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[
.setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE,
HBaseSemanticAttributes.Operation.DELETE);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, delete).isSuccess(),
supplier);
return TraceUtil
.trace(() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, delete, false)
.isSuccess(), supplier);
}

@Override
Expand All @@ -826,7 +824,8 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter)

private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family,
final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
final TimeRange timeRange, final RowMutations rm) throws IOException {
final TimeRange timeRange, final RowMutations rm, boolean queryMetricsEnabled)
throws IOException {
long nonceGroup = getNonceGroup();
long nonce = getNonce();
CancellableRegionServerCallable<MultiResponse> callable =
Expand All @@ -835,9 +834,9 @@ private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] fam
rm.getMaxPriority(), requestAttributes) {
@Override
protected MultiResponse rpcCall() throws Exception {
MultiRequest request =
RequestConverter.buildMultiRequest(getLocation().getRegionInfo().getRegionName(), row,
family, qualifier, op, value, filter, timeRange, rm, nonceGroup, nonce);
MultiRequest request = RequestConverter.buildMultiRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier, op, value,
filter, timeRange, rm, nonceGroup, nonce, queryMetricsEnabled);
ClientProtos.MultiResponse response = doMulti(request);
ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
if (res.hasException()) {
Expand Down Expand Up @@ -880,7 +879,7 @@ public boolean checkAndMutate(final byte[] row, final byte[] family, final byte[
.setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(rm);
return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier,
toCompareOperator(compareOp), value, null, null, rm).isSuccess(), supplier);
toCompareOperator(compareOp), value, null, null, rm, false).isSuccess(), supplier);
}

@Override
Expand All @@ -891,7 +890,7 @@ public boolean checkAndMutate(final byte[] row, final byte[] family, final byte[
.setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(rm);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, rm).isSuccess(),
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, rm, false).isSuccess(),
supplier);
}

Expand All @@ -910,28 +909,31 @@ public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws
}
return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(),
checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (Mutation) action);
checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (Mutation) action,
checkAndMutate.isQueryMetricsEnabled());
} else {
return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(),
checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (RowMutations) action);
checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (RowMutations) action,
checkAndMutate.isQueryMetricsEnabled());
}
}, supplier);
}

private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family,
final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
final TimeRange timeRange, final Mutation mutation) throws IOException {
final TimeRange timeRange, final Mutation mutation, boolean queryMetricsEnabled)
throws IOException {
long nonceGroup = getNonceGroup();
long nonce = getNonce();
ClientServiceCallable<CheckAndMutateResult> callable =
new ClientServiceCallable<CheckAndMutateResult>(this.connection, getName(), row,
this.rpcControllerFactory.newController(), mutation.getPriority(), requestAttributes) {
@Override
protected CheckAndMutateResult rpcCall() throws Exception {
MutateRequest request =
RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row,
family, qualifier, op, value, filter, timeRange, mutation, nonceGroup, nonce);
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier, op, value,
filter, timeRange, mutation, nonceGroup, nonce, queryMetricsEnabled);
MutateResponse response = doMutate(request);
if (response.hasResult()) {
return new CheckAndMutateResult(response.getProcessed(),
Expand Down Expand Up @@ -1419,7 +1421,7 @@ public boolean thenPut(Put put) throws IOException {
return TraceUtil.trace(() -> {
validatePut(put);
preCheck();
return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, put)
return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, put, false)
.isSuccess();
}, supplier);
}
Expand All @@ -1430,7 +1432,7 @@ public boolean thenDelete(Delete delete) throws IOException {
.setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(() -> {
preCheck();
return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, delete)
return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, delete, false)
.isSuccess();
}, supplier);
}
Expand All @@ -1441,7 +1443,7 @@ public boolean thenMutate(RowMutations mutation) throws IOException {
.setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(() -> {
preCheck();
return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, mutation)
return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, mutation, false)
.isSuccess();
}, supplier);
}
Expand Down Expand Up @@ -1475,26 +1477,28 @@ public boolean thenPut(Put put) throws IOException {
.setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(() -> {
validatePut(put);
return doCheckAndMutate(row, null, null, null, null, filter, timeRange, put).isSuccess();
return doCheckAndMutate(row, null, null, null, null, filter, timeRange, put, false)
.isSuccess();
}, supplier);
}

@Override
public boolean thenDelete(Delete delete) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
() -> doCheckAndMutate(row, null, null, null, null, filter, timeRange, delete).isSuccess(),
supplier);
return TraceUtil
.trace(() -> doCheckAndMutate(row, null, null, null, null, filter, timeRange, delete, false)
.isSuccess(), supplier);
}

@Override
public boolean thenMutate(RowMutations mutation) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil
.trace(() -> doCheckAndMutate(row, null, null, null, null, filter, timeRange, mutation)
.isSuccess(), supplier);
return TraceUtil.trace(
() -> doCheckAndMutate(row, null, null, null, null, filter, timeRange, mutation, false)
.isSuccess(),
supplier);
}
}
}
Loading