@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import static org.apache.phoenix.util.ScanUtil.isDummy;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* ScannerContext keeps all of its methods package-private. To update the context progress
* correctly from our scanners, we need this helper class in the same package.
*/
public class PhoenixScannerContext extends ScannerContext {

private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixScannerContext.class);

// tracks the start time of the rpc on the server for server paging
private final long startTime;

/**
* The scanner remains open on the server across multiple scan rpc requests. During a next()
* call we need a way to determine whether a new scan rpc request has started on the same
* scanner, so that we can reset the start time for server paging. Every scan rpc request
* creates a new ScannerContext whose lastPeekedCell is initially null; subsequent next()
* calls set this field on the ScannerContext.
*/
public static boolean isNewScanRpcRequest(ScannerContext scannerContext) {
return scannerContext != null && scannerContext.getLastPeekedCell() == null;
}

public PhoenixScannerContext(ScannerContext hbaseContext) {
// set limits to null to create no limit context
super(Objects.requireNonNull(hbaseContext).keepProgress, null,
Objects.requireNonNull(hbaseContext).isTrackingMetrics());
startTime = EnvironmentEdgeManager.currentTimeMillis();
}

public PhoenixScannerContext(boolean trackMetrics) {
super(false, null, trackMetrics);
startTime = EnvironmentEdgeManager.currentTimeMillis();
}

public long getStartTime() {
return startTime;
}

public void incrementSizeProgress(List<Cell> cells) {
for (Cell cell : cells) {
super.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell), cell.heapSize());
}
}

/**
* returnImmediately is a private field in ScannerContext with no getter API, but the
* checkTimeLimit API on ScannerContext returns true if returnImmediately is set.
*/
public boolean isReturnImmediately() {
return checkTimeLimit(ScannerContext.LimitScope.BETWEEN_ROWS);
}

/**
* Update the scanner context created by RSRpcServices so that the rpc framework can act on
* the progress and limits tracked here.
* @param dst hbase scanner context created on every new scan rpc request
* @param result list of cells to be returned to the client as scan rpc response
*/
public void updateHBaseScannerContext(ScannerContext dst, List<Cell> result) {
if (dst == null) {
return;
}
// update last peeked cell
dst.setLastPeekedCell(getLastPeekedCell());
// update return immediately
if (isDummy(result) || isReturnImmediately()) {
// when a dummy row is returned by a lower layer, set returnImmediately
// on the ScannerContext to force HBase to return a response to the client
dst.returnImmediately();
}
// update metrics
if (isTrackingMetrics() && dst.isTrackingMetrics()) {
// getMetricsMap call resets the metrics internally
for (Map.Entry<String, Long> entry : getMetrics().getMetricsMap().entrySet()) {
dst.metrics.addToCounter(entry.getKey(), entry.getValue());
}
}
// update progress
dst.setProgress(getBatchProgress(), getDataSizeProgress(), getHeapSizeProgress());
}

public static boolean isTimedOut(ScannerContext context, long pageSizeMs) {
// instanceof is false for null, so no separate null check is needed
if (!(context instanceof PhoenixScannerContext)) {
return false;
}
PhoenixScannerContext phoenixScannerContext = (PhoenixScannerContext) context;
return EnvironmentEdgeManager.currentTimeMillis() - phoenixScannerContext.startTime
> pageSizeMs;
}

/**
* Set returnImmediately on the ScannerContext to true; this has the same effect as reaching
* the time limit. Use this to make RSRpcServices.scan return immediately.
*/
public static void setReturnImmediately(ScannerContext context) {
if (!(context instanceof PhoenixScannerContext)) {
return;
}
((PhoenixScannerContext) context).returnImmediately();
}

public static boolean isReturnImmediately(ScannerContext context) {
if (!(context instanceof PhoenixScannerContext)) {
return false;
}
return ((PhoenixScannerContext) context).isReturnImmediately();
}
}
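For orientation, a minimal sketch of how a coprocessor scanner is expected to drive this class. It mirrors the RegionScannerHolder change below; the names innerScanner and phoenixContext are illustrative only, not part of this patch:

private PhoenixScannerContext phoenixContext;

public boolean next(List<Cell> results, ScannerContext hbaseContext) throws IOException {
  if (PhoenixScannerContext.isNewScanRpcRequest(hbaseContext)) {
    // a fresh rpc arrived on the same open scanner: reset the paging start time
    phoenixContext = new PhoenixScannerContext(hbaseContext);
  }
  // run the wrapped scanner against the phoenix context, not the hbase one
  boolean hasMore = innerScanner.next(results, phoenixContext);
  // copy progress, metrics and returnImmediately back for RSRpcServices
  phoenixContext.updateHBaseScannerContext(hbaseContext, results);
  return hasMore;
}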

This file was deleted.

@@ -37,12 +37,12 @@
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -191,6 +191,8 @@ private class RegionScannerHolder extends DelegateRegionScanner {
private final Scan scan;
private final ObserverContext<RegionCoprocessorEnvironment> c;
private boolean wasOverriden;
// tracks the current phoenix scanner context corresponding to the hbase scanner context
private PhoenixScannerContext phoenixScannerContext;

public RegionScannerHolder(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
final RegionScanner scanner) {
@@ -248,10 +250,7 @@ public void close() throws IOException {

@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
overrideDelegate();
boolean res = super.next(result, scannerContext);
ScannerContextUtil.incrementSizeProgress(scannerContext, result);
return res;
return nextInternal(result, scannerContext, false);
}

@Override
@@ -262,9 +261,30 @@ public boolean next(List<Cell> result) throws IOException {

@Override
public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
return nextInternal(result, scannerContext, true);
}

private boolean nextInternal(List<Cell> result, ScannerContext scannerContext, boolean isRaw)
throws IOException {
overrideDelegate();
boolean res = super.nextRaw(result, scannerContext);
ScannerContextUtil.incrementSizeProgress(scannerContext, result);
if (scannerContext instanceof PhoenixScannerContext) {
// This is an optimization to avoid creating multiple phoenix scanner context objects for
// the same scan rpc request when multiple RegionScannerHolder objects are stacked, which
// happens when multiple coprocs (not scanners), such as UngroupedAggregateRegionObserver
// and GlobalIndexChecker, are processing the scan
phoenixScannerContext = (PhoenixScannerContext) scannerContext;
} else if (PhoenixScannerContext.isNewScanRpcRequest(scannerContext)) {
// An open scanner can process multiple scan rpcs during its lifetime.
// We need to create a new phoenix scanner context for every new scan rpc request.
phoenixScannerContext = new PhoenixScannerContext(scannerContext);
}
boolean res = isRaw
? super.nextRaw(result, phoenixScannerContext)
: super.next(result, phoenixScannerContext);
if (!(scannerContext instanceof PhoenixScannerContext)) {
// only update the top level hbase scanner context
phoenixScannerContext.updateHBaseScannerContext(scannerContext, result);
}
return res;
}
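To make the reuse concrete, a rough call-flow sketch for one next() call, assuming GlobalIndexChecker's holder wraps the one from UngroupedAggregateRegionObserver (the nesting order here is illustrative):

// scan rpc arrives; RSRpcServices hands in a fresh ScannerContext (lastPeekedCell == null)
// outer RegionScannerHolder.nextInternal(result, hbaseContext)
//   isNewScanRpcRequest(hbaseContext) is true -> creates new PhoenixScannerContext(hbaseContext)
//   outer holder calls super.next(result, phoenixScannerContext), reaching the inner holder
//     inner holder sees scannerContext instanceof PhoenixScannerContext -> reuses the same object
//   only the outer holder calls updateHBaseScannerContext(hbaseContext, result)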

@@ -17,8 +17,6 @@
*/
package org.apache.phoenix.coprocessor;

import static org.apache.phoenix.util.ScanUtil.isDummy;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
@@ -27,7 +25,6 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;

public class DelegateRegionScanner implements RegionScanner {

@@ -103,17 +100,7 @@ public RegionScanner getNewRegionScanner(Scan scan) throws IOException {
private boolean next(List<Cell> result, boolean raw, ScannerContext scannerContext)
throws IOException {
if (scannerContext != null) {
ScannerContext noLimitContext = ScannerContextUtil.copyNoLimitScanner(scannerContext);
boolean hasMore =
raw ? delegate.nextRaw(result, noLimitContext) : delegate.next(result, noLimitContext);
if (isDummy(result) || ScannerContextUtil.checkTimeLimit(noLimitContext)) {
// when a dummy row is returned by a lower layer or if the result is valid but the lower
// layer signals us to return immediately, we need to set returnImmediately
// on the ScannerContext to force HBase to return a response to the client
ScannerContextUtil.setReturnImmediately(scannerContext);
}
ScannerContextUtil.updateMetrics(noLimitContext, scannerContext);
return hasMore;
return raw ? delegate.nextRaw(result, scannerContext) : delegate.next(result, scannerContext);
}
return raw ? delegate.nextRaw(result) : delegate.next(result);
}
@@ -50,6 +50,7 @@
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -81,7 +82,6 @@
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.PhoenixKeyValueUtil;
@@ -605,8 +605,6 @@ public boolean next(List<Cell> resultsToReturn, ScannerContext scannerContext)
private boolean nextInternal(List<Cell> resultsToReturn, ScannerContext scannerContext)
throws IOException {
boolean hasMore;
long startTime = EnvironmentEdgeManager.currentTimeMillis();
long now;
Tuple result =
useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
boolean acquiredLock = false;
@@ -643,8 +641,11 @@ private boolean nextInternal(List<Cell> resultsToReturn, ScannerContext scannerContext)
// Aggregate values here
aggregators.aggregate(rowAggregators, result);
}
now = EnvironmentEdgeManager.currentTimeMillis();
if (hasMore && groupByCache.size() < limit && (now - startTime) >= pageSizeMs) {
if (
hasMore && groupByCache.size() < limit
&& (PhoenixScannerContext.isReturnImmediately(scannerContext)
|| PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs))
) {
return getDummyResult(resultsToReturn);
}
} while (hasMore && groupByCache.size() < limit);
@@ -784,8 +785,7 @@ public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException {
boolean hasMore;
boolean atLimit;
boolean aggBoundary = false;
long startTime = EnvironmentEdgeManager.currentTimeMillis();
long now;
boolean pageTimeout = false;
Tuple result =
useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
ImmutableBytesPtr key = null;
@@ -835,8 +835,14 @@ public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException {
atLimit = rowCount + countOffset >= limit;
// Do rowCount + 1 b/c we don't have to wait for a complete
// row in the case of a DISTINCT with a LIMIT
now = EnvironmentEdgeManager.currentTimeMillis();
} while (hasMore && !aggBoundary && !atLimit && (now - startTime) < pageSizeMs);
if (
PhoenixScannerContext.isReturnImmediately(scannerContext)
|| PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
) {
pageTimeout = true;
break;
}
} while (hasMore && !aggBoundary && !atLimit && !pageTimeout);
}
} catch (Exception e) {
LOGGER.error("Ordered group-by scanner next encountered error for region {}",
@@ -850,7 +856,7 @@ public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException {
if (acquiredLock) region.closeRegionOperation();
}
try {
if (hasMore && !aggBoundary && !atLimit && (now - startTime) >= pageSizeMs) {
if (hasMore && !aggBoundary && !atLimit && pageTimeout) {
updateDummyWithPrevRowKey(results, initStartRowKey, includeInitStartRowKey, scan);
return true;
}
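Putting the pieces together, the paging loop in these aggregate observers now follows roughly this shape (condensed from the unordered group-by code above; aggregation details elided, getDummyResult as in the code above):

do {
  hasMore = delegate.nextRaw(cells, scannerContext);
  // ... aggregate the returned cells into the group-by cache ...
  if (PhoenixScannerContext.isReturnImmediately(scannerContext)
    || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) {
    // page boundary reached: return a dummy row so HBase sends a response to the client
    return getDummyResult(resultsToReturn);
  }
} while (hasMore && groupByCache.size() < limit);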