diff --git a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixScannerContext.java b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixScannerContext.java new file mode 100644 index 00000000000..49d10685fc1 --- /dev/null +++ b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixScannerContext.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.phoenix.util.ScanUtil.isDummy; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * All of ScannerContext's methods are package-private. To properly update the context progress for + * our scanners we need this helper. + */ +public class PhoenixScannerContext extends ScannerContext { + + private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixScannerContext.class); + + // tracks the start time of the rpc on the server for server paging + private final long startTime; + + /** + * The scanner remains open on the server during the course of multiple scan rpc requests. We need + * a way to determine during the next() call whether it is a new scan rpc request on the same scanner. + * This is needed so that we can reset the start time for server paging. Every scan rpc request + * creates a new ScannerContext, which has the lastPeekedCell set to null in the beginning. + * Subsequent next() calls will set this field in the ScannerContext, so a null lastPeekedCell + * reliably marks the first next() call of a new rpc.
 + */ + public static boolean isNewScanRpcRequest(ScannerContext scannerContext) { + return scannerContext != null && scannerContext.getLastPeekedCell() == null; + } + + public PhoenixScannerContext(ScannerContext hbaseContext) { + // set limits to null to create a no-limit context + super(Objects.requireNonNull(hbaseContext).keepProgress, null, + Objects.requireNonNull(hbaseContext).isTrackingMetrics()); + startTime = EnvironmentEdgeManager.currentTimeMillis(); + } + + public PhoenixScannerContext(boolean trackMetrics) { + super(false, null, trackMetrics); + startTime = EnvironmentEdgeManager.currentTimeMillis(); + } + + public long getStartTime() { + return startTime; + } + + public void incrementSizeProgress(List<Cell> cells) { + for (Cell cell : cells) { + super.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell), cell.heapSize()); + } + } + + /** + * returnImmediately is a private field in ScannerContext and there is no getter API for it, but the + * checkTimeLimit API on the ScannerContext will return true if returnImmediately is set. + */ + public boolean isReturnImmediately() { + return checkTimeLimit(ScannerContext.LimitScope.BETWEEN_ROWS); + } + + /** + * Update the scanner context created by RSRpcServices so that it can act accordingly + * @param dst hbase scanner context created on every new scan rpc request + * @param result list of cells to be returned to the client as the scan rpc response + */ + public void updateHBaseScannerContext(ScannerContext dst, List<Cell> result) { + if (dst == null) { + return; + } + // update last peeked cell + dst.setLastPeekedCell(getLastPeekedCell()); + // update return immediately + if (isDummy(result) || isReturnImmediately()) { + // when a dummy row is returned by a lower layer, set returnImmediately + // on the ScannerContext to force HBase to return a response to the client + dst.returnImmediately(); + } + // update metrics + if (isTrackingMetrics() && dst.isTrackingMetrics()) { + // the getMetricsMap call resets the metrics internally + for (Map.Entry<String, Long> entry : getMetrics().getMetricsMap().entrySet()) { + dst.metrics.addToCounter(entry.getKey(), entry.getValue()); + } + } + // update progress + dst.setProgress(getBatchProgress(), getDataSizeProgress(), getHeapSizeProgress()); + } + + public static boolean isTimedOut(ScannerContext context, long pageSizeMs) { + if (context == null || !(context instanceof PhoenixScannerContext)) { + return false; + } + PhoenixScannerContext phoenixScannerContext = (PhoenixScannerContext) context; + return EnvironmentEdgeManager.currentTimeMillis() - phoenixScannerContext.startTime + > pageSizeMs; + } + + /** + * Set returnImmediately on the ScannerContext to true; it will have the same behavior as reaching + * the time limit. Use this to make RSRpcServices.scan return immediately.
+ */ + public static void setReturnImmediately(ScannerContext context) { + if (context == null || !(context instanceof PhoenixScannerContext)) { + return; + } + PhoenixScannerContext phoenixScannerContext = (PhoenixScannerContext) context; + phoenixScannerContext.returnImmediately(); + } + + public static boolean isReturnImmediately(ScannerContext context) { + if (context == null || !(context instanceof PhoenixScannerContext)) { + return false; + } + PhoenixScannerContext phoenixScannerContext = (PhoenixScannerContext) context; + return phoenixScannerContext.isReturnImmediately(); + } +} diff --git a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java deleted file mode 100644 index 23bf60cde00..00000000000 --- a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.util.List; -import java.util.Map; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.PrivateCellUtil; - -/** - * ScannerContext has all methods package visible. To properly update the context progress for our - * scanners we need this helper - */ -public class ScannerContextUtil { - public static void incrementSizeProgress(ScannerContext sc, List cells) { - for (Cell cell : cells) { - sc.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell), cell.heapSize()); - } - } - - public static void updateMetrics(ScannerContext src, ScannerContext dst) { - if (src != null && dst != null && src.isTrackingMetrics() && dst.isTrackingMetrics()) { - for (Map.Entry entry : src.getMetrics().getMetricsMap().entrySet()) { - dst.metrics.addToCounter(entry.getKey(), entry.getValue()); - } - } - } - - public static ScannerContext copyNoLimitScanner(ScannerContext sc) { - return new ScannerContext(sc.keepProgress, null, sc.isTrackingMetrics()); - } - - public static void updateTimeProgress(ScannerContext sc) { - sc.updateTimeProgress(); - } - - /** - * Set returnImmediately on the ScannerContext to true, it will have the same behavior as reaching - * the time limit. Use this to make RSRpcService.scan return immediately. 
- */ - public static void setReturnImmediately(ScannerContext sc) { - sc.returnImmediately(); - } - - /** - * returnImmediately is a private field in ScannerContext and there is no getter API on it But the - * checkTimeLimit API on the ScannerContext will return true if returnImmediately is set - */ - public static boolean checkTimeLimit(ScannerContext sc) { - return sc.checkTimeLimit(ScannerContext.LimitScope.BETWEEN_ROWS); - } -} diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index b0a134151fe..723eaee0391 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -37,12 +37,12 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanOptions; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; -import org.apache.hadoop.hbase.regionserver.ScannerContextUtil; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @@ -191,6 +191,8 @@ private class RegionScannerHolder extends DelegateRegionScanner { private final Scan scan; private final ObserverContext c; private boolean wasOverriden; + // tracks the current phoenix scanner context corresponding to the hbase scanner context + private PhoenixScannerContext phoenixScannerContext; public RegionScannerHolder(ObserverContext c, Scan scan, final RegionScanner scanner) { @@ -248,10 +250,7 @@ public void close() throws IOException { @Override public boolean next(List result, ScannerContext scannerContext) throws IOException { - overrideDelegate(); - boolean res = super.next(result, scannerContext); - ScannerContextUtil.incrementSizeProgress(scannerContext, result); - return res; + return nextInternal(result, scannerContext, false); } @Override @@ -262,9 +261,30 @@ public boolean next(List result) throws IOException { @Override public boolean nextRaw(List result, ScannerContext scannerContext) throws IOException { + return nextInternal(result, scannerContext, true); + } + + private boolean nextInternal(List result, ScannerContext scannerContext, boolean isRaw) + throws IOException { overrideDelegate(); - boolean res = super.nextRaw(result, scannerContext); - ScannerContextUtil.incrementSizeProgress(scannerContext, result); + if (scannerContext instanceof PhoenixScannerContext) { + // This is an optimization to avoid creating multiple phoenix scanner context objects for + // the same scan rpc request when multiple RegionScannerHolder objects are stacked which + // happens if multiple coprocs (not scanners) are processing the scan like + // UngroupedAggregateRegionObserver and GlobalIndexChecker + phoenixScannerContext = (PhoenixScannerContext) scannerContext; + } else if (PhoenixScannerContext.isNewScanRpcRequest(scannerContext)) { + // An open scanner can process multiple scan rpcs during 
its lifetime. + // We need to create a new phoenix scanner context for every new scan rpc request. + phoenixScannerContext = new PhoenixScannerContext(scannerContext); + } + boolean res = isRaw + ? super.nextRaw(result, phoenixScannerContext) + : super.next(result, phoenixScannerContext); + if (!(scannerContext instanceof PhoenixScannerContext)) { + // only update the top level hbase scanner context + phoenixScannerContext.updateHBaseScannerContext(scannerContext, result); + } return res; } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java index ad64e3ae58d..4563f102fd7 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java @@ -17,8 +17,6 @@ */ package org.apache.phoenix.coprocessor; -import static org.apache.phoenix.util.ScanUtil.isDummy; - import java.io.IOException; import java.util.List; import org.apache.hadoop.hbase.Cell; @@ -27,7 +25,6 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; -import org.apache.hadoop.hbase.regionserver.ScannerContextUtil; public class DelegateRegionScanner implements RegionScanner { @@ -103,17 +100,7 @@ public RegionScanner getNewRegionScanner(Scan scan) throws IOException { private boolean next(List result, boolean raw, ScannerContext scannerContext) throws IOException { if (scannerContext != null) { - ScannerContext noLimitContext = ScannerContextUtil.copyNoLimitScanner(scannerContext); - boolean hasMore = - raw ? delegate.nextRaw(result, noLimitContext) : delegate.next(result, noLimitContext); - if (isDummy(result) || ScannerContextUtil.checkTimeLimit(noLimitContext)) { - // when a dummy row is returned by a lower layer or if the result is valid but the lower - // layer signals us to return immediately, we need to set returnImmediately - // on the ScannerContext to force HBase to return a response to the client - ScannerContextUtil.setReturnImmediately(scannerContext); - } - ScannerContextUtil.updateMetrics(noLimitContext, scannerContext); - return hasMore; + return raw ? delegate.nextRaw(result, scannerContext) : delegate.next(result, scannerContext); } return raw ? 
delegate.nextRaw(result) : delegate.next(result); } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index 39d9527a83f..b7163c0a3f8 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; @@ -81,7 +82,6 @@ import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.EncodedColumnsUtil; -import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PhoenixKeyValueUtil; @@ -605,8 +605,6 @@ public boolean next(List resultsToReturn, ScannerContext scannerContext) private boolean nextInternal(List resultsToReturn, ScannerContext scannerContext) throws IOException { boolean hasMore; - long startTime = EnvironmentEdgeManager.currentTimeMillis(); - long now; Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); boolean acquiredLock = false; @@ -643,8 +641,11 @@ private boolean nextInternal(List resultsToReturn, ScannerContext scannerC // Aggregate values here aggregators.aggregate(rowAggregators, result); } - now = EnvironmentEdgeManager.currentTimeMillis(); - if (hasMore && groupByCache.size() < limit && (now - startTime) >= pageSizeMs) { + if ( + hasMore && groupByCache.size() < limit + && (PhoenixScannerContext.isReturnImmediately(scannerContext) + || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) + ) { return getDummyResult(resultsToReturn); } } while (hasMore && groupByCache.size() < limit); @@ -784,8 +785,7 @@ public boolean next(List results, ScannerContext scannerContext) throws IO boolean hasMore; boolean atLimit; boolean aggBoundary = false; - long startTime = EnvironmentEdgeManager.currentTimeMillis(); - long now; + boolean pageTimeout = false; Tuple result = useQualifierAsIndex ? 
new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); ImmutableBytesPtr key = null; @@ -835,8 +835,14 @@ public boolean next(List results, ScannerContext scannerContext) throws IO atLimit = rowCount + countOffset >= limit; // Do rowCount + 1 b/c we don't have to wait for a complete // row in the case of a DISTINCT with a LIMIT - now = EnvironmentEdgeManager.currentTimeMillis(); - } while (hasMore && !aggBoundary && !atLimit && (now - startTime) < pageSizeMs); + if ( + PhoenixScannerContext.isReturnImmediately(scannerContext) + || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs) + ) { + pageTimeout = true; + break; + } + } while (hasMore && !aggBoundary && !atLimit && !pageTimeout); } } catch (Exception e) { LOGGER.error("Ordered group-by scanner next encountered error for region {}", @@ -850,7 +856,7 @@ public boolean next(List results, ScannerContext scannerContext) throws IO if (acquiredLock) region.closeRegionOperation(); } try { - if (hasMore && !aggBoundary && !atLimit && (now - startTime) >= pageSizeMs) { + if (hasMore && !aggBoundary && !atLimit && pageTimeout) { updateDummyWithPrevRowKey(results, initStartRowKey, includeInitStartRowKey, scan); return true; } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java index e56d9028f45..ab62a7ddf7b 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.util.Bytes; @@ -59,7 +60,6 @@ import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ClientUtil; -import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.TupleUtil; public class HashJoinRegionScanner implements RegionScanner { @@ -306,7 +306,6 @@ public boolean nextRaw(List result, ScannerContext scannerContext) throws private boolean next(List result, boolean raw, ScannerContext scannerContext) throws IOException { try { - long startTime = EnvironmentEdgeManager.currentTimeMillis(); while (shouldAdvance()) { if (scannerContext != null) { hasMore = @@ -322,7 +321,10 @@ private boolean next(List result, boolean raw, ScannerContext scannerConte } Cell cell = result.get(0); processResults(result, false); - if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) { + if ( + PhoenixScannerContext.isReturnImmediately(scannerContext) + || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs) + ) { byte[] rowKey = CellUtil.cloneRow(cell); result.clear(); getDummyResult(rowKey, result); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java index e3f1d6afeab..7cde2a393f4 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java +++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; @@ -34,7 +35,6 @@ import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.ScanUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,17 +60,16 @@ public class PagingRegionScanner extends BaseRegionScanner { private PagingFilter pagingFilter; private MultiKeyPointLookup multiKeyPointLookup = null; private boolean initialized = false; + private long pageSizeMs; private class MultiKeyPointLookup { private SkipScanFilter skipScanFilter; private List pointLookupRanges = null; private int lookupPosition = 0; private byte[] lookupKeyPrefix = null; - private long pageSizeMs; private MultiKeyPointLookup(SkipScanFilter skipScanFilter) throws IOException { this.skipScanFilter = skipScanFilter; - pageSizeMs = ScanUtil.getPageSizeMsForRegionScanner(scan); pointLookupRanges = skipScanFilter.getPointLookupKeyRanges(); lookupPosition = findLookupPosition(scan.getStartRow()); if (skipScanFilter.getOffset() > 0) { @@ -133,7 +132,6 @@ private boolean hasMore() { private boolean next(List results, boolean raw, RegionScanner scanner, ScannerContext scannerContext) throws IOException { try { - long startTime = EnvironmentEdgeManager.currentTimeMillis(); while (true) { boolean hasMore; if (scannerContext != null) { @@ -152,15 +150,19 @@ private boolean next(List results, boolean raw, RegionScanner scanner, "Each scan is supposed to return only one row, scan " + scan + ", region " + region); } if (!results.isEmpty()) { + if (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) { + // we got a valid result but scanner timed out so return immediately + PhoenixScannerContext.setReturnImmediately(scannerContext); + } return hasMore(); } // The scanner returned an empty result. This means that one of the rows - // has been deleted. + // has been deleted or the row key is not present in the table. if (!hasMore()) { return false; } - if (EnvironmentEdgeManager.currentTimeMillis() - startTime > pageSizeMs) { + if (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) { byte[] rowKey = pointLookupRanges.get(lookupPosition - 1).getLowerRange(); ScanUtil.getDummyResult(rowKey, results); return true; @@ -187,6 +189,7 @@ public PagingRegionScanner(Region region, RegionScanner scanner, Scan scan) { this.region = region; this.scan = scan; pagingFilter = ScanUtil.getPhoenixPagingFilter(scan); + pageSizeMs = ScanUtil.getPageSizeMsForRegionScanner(scan); } @VisibleForTesting @@ -258,10 +261,11 @@ private boolean next(List results, boolean raw, ScannerContext scannerCont } } + boolean hasMore; if (multiKeyPointLookup != null) { return multiKeyPointLookup.next(results, raw, delegate, scannerContext); } - boolean hasMore; + if (scannerContext != null) { hasMore = raw ? 
delegate.nextRaw(results, scannerContext) : delegate.next(results, scannerContext); @@ -277,16 +281,23 @@ private boolean next(List results, boolean raw, ScannerContext scannerCont if (pagingFilter.isStopped()) { if (results.isEmpty()) { byte[] rowKey = pagingFilter.getCurrentRowKeyToBeExcluded(); - LOGGER.info("Page filter stopped, generating dummy key {} ", - Bytes.toStringBinary(rowKey)); + LOGGER.info("{} Paging filter stopped, generating dummy key {} ", + getRegionInfo().getRegionNameAsString(), Bytes.toStringBinary(rowKey)); ScanUtil.getDummyResult(rowKey, results); + } else { + // we got a valid result but page filter stopped set return immediately + PhoenixScannerContext.setReturnImmediately(scannerContext); } return true; } return false; } else { // We got a row from the HBase scanner within the configured time (i.e., - // the page size). We need to start a new page on the next next() call. + // the page size). + if (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) { + // we got a valid result but scanner timed out so return immediately + PhoenixScannerContext.setReturnImmediately(scannerContext); + } return true; } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java index 8a4fc502996..6ed234db6d7 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.phoenix.schema.CompiledTTLExpression; @@ -212,7 +213,8 @@ private boolean isExpired(List result) throws IOException { return false; } - private boolean skipExpired(List result, boolean raw, boolean hasMore) throws IOException { + private boolean skipExpired(List result, boolean raw, boolean hasMore, + ScannerContext scannerContext) throws IOException { boolean expired = isExpired(result); if (!expired) { return hasMore; @@ -221,23 +223,28 @@ private boolean skipExpired(List result, boolean raw, boolean hasMore) thr if (!hasMore) { return false; } - long startTime = EnvironmentEdgeManager.currentTimeMillis(); do { - hasMore = raw ? delegate.nextRaw(result) : delegate.next(result); + hasMore = + raw ? 
delegate.nextRaw(result, scannerContext) : delegate.next(result, scannerContext); if (result.isEmpty() || ScanUtil.isDummy(result)) { - return hasMore; + break; } + // non dummy result check if it is expired if (!isExpired(result)) { - return hasMore; + break; } + // result is expired Cell cell = result.get(0); result.clear(); - if (EnvironmentEdgeManager.currentTimeMillis() - startTime > pageSizeMs) { + if ( + PhoenixScannerContext.isReturnImmediately(scannerContext) + || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs) + ) { ScanUtil.getDummyResult(CellUtil.cloneRow(cell), result); - return hasMore; + break; } } while (hasMore); - return false; + return hasMore; } private boolean next(List result, boolean raw, ScannerContext scannerContext) @@ -267,7 +274,7 @@ private boolean next(List result, boolean raw, ScannerContext scannerConte if (result.isEmpty() || ScanUtil.isDummy(result)) { return hasMore; } - hasMore = skipExpired(result, raw, hasMore); + hasMore = skipExpired(result, raw, hasMore, scannerContext); if (result.isEmpty() || ScanUtil.isDummy(result)) { return hasMore; } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java index 6e1b853bac1..1b126a6591c 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; @@ -210,9 +211,8 @@ protected Scan prepareDataTableScan(Collection dataRowKeys, } } - protected boolean scanIndexTableRows(List result, final long startTime, - final byte[] actualStartKey, final int offset, ScannerContext scannerContext) - throws IOException { + protected boolean scanIndexTableRows(List result, final byte[] actualStartKey, + final int offset, ScannerContext scannerContext) throws IOException { boolean hasMore = false; if (actualStartKey != null) { do { @@ -231,7 +231,10 @@ protected boolean scanIndexTableRows(List result, final long startTime, firstCell.getRowLength(), actualStartKey, 0, actualStartKey.length) < 0 ) { result.clear(); - if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) { + if ( + PhoenixScannerContext.isReturnImmediately(scannerContext) + || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs) + ) { byte[] rowKey = CellUtil.cloneRow(firstCell); ScanUtil.getDummyResult(rowKey, result); return true; @@ -270,7 +273,10 @@ protected boolean scanIndexTableRows(List result, final long startTime, viewConstants)); indexRows.add(row); indexRowCount++; - if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) >= pageSizeMs) { + if ( + hasMore && (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs) + || PhoenixScannerContext.isReturnImmediately(scannerContext)) + ) { getDummyResult(lastIndexRowKey, result); // We do not need to change the state, State.SCANNING_INDEX // since we will continue scanning the index table after @@ -283,9 +289,9 @@ protected 
boolean scanIndexTableRows(List result, final long startTime, return hasMore; } - protected boolean scanIndexTableRows(List result, final long startTime, - ScannerContext scannerContext) throws IOException { - return scanIndexTableRows(result, startTime, null, 0, scannerContext); + protected boolean scanIndexTableRows(List result, ScannerContext scannerContext) + throws IOException { + return scanIndexTableRows(result, null, 0, scannerContext); } private boolean verifyIndexRowAndRepairIfNecessary(Result dataRow, byte[] indexRowKey, @@ -393,7 +399,9 @@ public boolean next(List result) throws IOException { */ @Override public boolean next(List result, ScannerContext scannerContext) throws IOException { - long startTime = EnvironmentEdgeManager.currentTimeMillis(); + long startTime = (scannerContext != null) + ? ((PhoenixScannerContext) scannerContext).getStartTime() + : EnvironmentEdgeManager.currentTimeMillis(); boolean hasMore; region.startRegionOperation(); try { @@ -409,7 +417,7 @@ public boolean next(List result, ScannerContext scannerContext) throws IOE state = State.SCANNING_INDEX; } if (state == State.SCANNING_INDEX) { - hasMore = scanIndexTableRows(result, startTime, scannerContext); + hasMore = scanIndexTableRows(result, scannerContext); if (isDummy(result)) { updateDummyWithPrevRowKey(result, initStartRowKey, includeInitStartRowKey, scan); return hasMore; diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java index 34d9fb67b9b..6b5d124ce04 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java @@ -105,9 +105,9 @@ protected void scanDataTableRows(long startTime) throws IOException { } @Override - protected boolean scanIndexTableRows(List result, final long startTime, - ScannerContext scannerContext) throws IOException { - return scanIndexTableRows(result, startTime, actualStartKey, offset, scannerContext); + protected boolean scanIndexTableRows(List result, ScannerContext scannerContext) + throws IOException { + return scanIndexTableRows(result, actualStartKey, offset, scannerContext); } @Override diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 7fdfdeeb609..d85f8005405 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -64,7 +64,6 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; -import org.apache.hadoop.hbase.regionserver.ScannerContextUtil; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @@ -766,13 +765,6 @@ private RegionScanner rebuildIndices(RegionScanner innerScanner, final Region re private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats, final Region region, final Scan 
scan, Configuration config) throws IOException { - ScannerContext groupScannerContext; - if (scan.isScanMetricsEnabled()) { - groupScannerContext = - ScannerContext.newBuilder().setTrackMetrics(scan.isScanMetricsEnabled()).build(); - } else { - groupScannerContext = null; - } StatsCollectionCallable callable = new StatsCollectionCallable(stats, region, innerScanner, config, scan); byte[] asyncBytes = @@ -826,9 +818,6 @@ public void close() throws IOException { @Override public boolean next(List results, ScannerContext scannerContext) throws IOException { - if (groupScannerContext != null && scannerContext != null) { - ScannerContextUtil.updateMetrics(groupScannerContext, scannerContext); - } return next(results); } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java index 20ace9a02e0..1a4794e86e6 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java @@ -61,10 +61,10 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; -import org.apache.hadoop.hbase.regionserver.ScannerContextUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.cache.GlobalCache; @@ -110,7 +110,6 @@ import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.ClientUtil; import org.apache.phoenix.util.EncodedColumnsUtil; -import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.LogUtil; @@ -604,8 +603,6 @@ public boolean next(List resultsToReturn) throws IOException { public boolean next(List resultsToReturn, ScannerContext scannerContext) throws IOException { boolean hasMore; - boolean returnImmediately = false; - long startTime = EnvironmentEdgeManager.currentTimeMillis(); Configuration conf = env.getConfiguration(); final TenantCache tenantCache = GlobalCache.getTenantCache(env, ScanUtil.getTenantId(scan)); try (MemoryManager.MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) { @@ -648,10 +645,10 @@ public boolean next(List resultsToReturn, ScannerContext scannerContext) resultsToReturn.addAll(results); return true; } - // we got a dummy result from the lower scanner but hasAny is true which means that - // we have a valid result which can be returned to the client instead of a dummy. - // We need to signal the RPC handler to return. 
- returnImmediately = true; + // we got a page timeout from the lower scanner but hasAny is true which means that + // we have a valid result which we can return to the client instead of a dummy but we + // still need to finish the rpc and release the handler + PhoenixScannerContext.setReturnImmediately(scannerContext); break; } if (!results.isEmpty()) { @@ -705,13 +702,16 @@ public boolean next(List resultsToReturn, ScannerContext scannerContext) aggregators.aggregate(rowAggregators, result); hasAny = true; } - } while ( - hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeMs - ); - if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) { - // we hit a page scanner timeout, signal the RPC handler to return. - returnImmediately = true; - } + if ( + PhoenixScannerContext.isReturnImmediately(scannerContext) + || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs) + ) { + // we could have a valid result which we can return to the client instead of a dummy, + // but we still need to finish the rpc and release the handler + PhoenixScannerContext.setReturnImmediately(scannerContext); + break; + } + } while (hasMore); if (!mutations.isEmpty()) { if (!isSingleRowDelete) { annotateAndCommit(mutations); @@ -763,10 +763,6 @@ public boolean next(List resultsToReturn, ScannerContext scannerContext) } resultsToReturn.add(keyValue); } - if (returnImmediately && scannerContext != null) { - // signal the RPC handler to return - ScannerContextUtil.setReturnImmediately(scannerContext); - } return hasMore; } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java index 8e82f170362..97f23c498f6 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; @@ -256,7 +257,6 @@ public boolean next(List result, boolean raw, ScannerContext scannerContex init(); initialized = true; } - long startTime = EnvironmentEdgeManager.currentTimeMillis(); do { if (raw) { hasMore = (scannerContext == null) @@ -277,7 +277,10 @@ public boolean next(List result, boolean raw, ScannerContext scannerContex if (verifyRowAndRepairIfNecessary(result)) { break; } - if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) >= pageSizeMs) { + if ( + hasMore && (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs) + || PhoenixScannerContext.isReturnImmediately(scannerContext)) + ) { byte[] rowKey = CellUtil.cloneRow(cell); result.clear(); getDummyResult(rowKey, result); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java index cf7832b794d..65affd6e792 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java +++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java @@ -38,10 +38,10 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; -import org.apache.hadoop.hbase.regionserver.ScannerContextUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.cache.GlobalCache; @@ -182,7 +182,11 @@ public RegionScanner getRegionScanner(final Scan scan, final RegionScanner s) th ScanUtil.isIncompatibleClientForServerReturnValidRowKey(scan); RegionScannerResultIterator iterator = new RegionScannerResultIterator(scan, innerScanner, getMinMaxQualifiersFromScan(scan), encodingScheme); - ScannerContext sc = iterator.getRegionScannerContext(); + // we need to create our own scanner context because we are still opening the scanner and + // and don't have a rpc scanner context which is created in the next() call. This scanner + // context is used when we are skipping the rows until we hit the offset + PhoenixScannerContext sc = new PhoenixScannerContext(scan.isScanMetricsEnabled()); + iterator.setRegionScannerContext(sc); innerScanner = getOffsetScanner(innerScanner, new OffsetResultIterator(iterator, scanOffset, getPageSizeMsForRegionScanner(scan), isIncompatibleClient), @@ -280,10 +284,15 @@ static OrderedResultIteratorWithScannerContext deserializeFromScan(Scan scan, Re EncodedColumnsUtil.getQualifierEncodingScheme(scan); RegionScannerResultIterator inner = new RegionScannerResultIterator(scan, s, EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme); + // we need to create our own scanner context because we are still opening the scanner and + // and don't have a rpc scanner context which is created in the next() call. This scanner + // context is used when we are iterating over the top n rows before the first next() call + PhoenixScannerContext sc = new PhoenixScannerContext(scan.isScanMetricsEnabled()); + inner.setRegionScannerContext(sc); OrderedResultIterator iterator = new OrderedResultIterator(inner, orderByExpressions, spoolingEnabled, thresholdBytes, limit >= 0 ? 
limit : null, null, estimatedRowSize, getPageSizeMsForRegionScanner(scan), scan, s.getRegionInfo()); - return new OrderedResultIteratorWithScannerContext(inner.getRegionScannerContext(), iterator); + return new OrderedResultIteratorWithScannerContext(sc, iterator); } catch (IOException e) { throw new RuntimeException(e); } finally { @@ -296,15 +305,15 @@ static OrderedResultIteratorWithScannerContext deserializeFromScan(Scan scan, Re } private static class OrderedResultIteratorWithScannerContext { - private ScannerContext scannerContext; + private PhoenixScannerContext scannerContext; private OrderedResultIterator iterator; - OrderedResultIteratorWithScannerContext(ScannerContext sc, OrderedResultIterator ori) { + OrderedResultIteratorWithScannerContext(PhoenixScannerContext sc, OrderedResultIterator ori) { this.scannerContext = sc; this.iterator = ori; } - public ScannerContext getScannerContext() { + public PhoenixScannerContext getScannerContext() { return scannerContext; } @@ -363,7 +372,7 @@ private Expression[] deserializeServerParsedPositionalExpressionInfoFromScan(Sca private RegionScanner getOffsetScanner(final RegionScanner s, final OffsetResultIterator iterator, final boolean isLastScan, final boolean incompatibleClient, final Scan scan, - final ScannerContext sc) throws IOException { + final PhoenixScannerContext sc) throws IOException { final Tuple firstTuple; final Region region = getRegion(); region.startRegionOperation(); @@ -436,7 +445,9 @@ private RegionScanner getOffsetScanner(final RegionScanner s, final OffsetResult return new BaseRegionScanner(s) { private Tuple tuple = firstTuple; private byte[] previousResultRowKey; - private ScannerContext regionScannerContext = sc; + // scanner context used when we are opening the scanner and skipping up to offset rows + // We copy this context to the hbase rpc context on the first next call + private PhoenixScannerContext regionScannerContext = sc; @Override public boolean isFilterDone() { @@ -454,6 +465,18 @@ public boolean next(List results, ScannerContext scannerContext) throws IO if (isFilterDone()) { return false; } + if (regionScannerContext != null) { + regionScannerContext.updateHBaseScannerContext(scannerContext, results); + // we no longer need this context + regionScannerContext = null; + if (PhoenixScannerContext.isReturnImmediately(scannerContext)) { + return true; + } + } + RegionScannerResultIterator delegate = + (RegionScannerResultIterator) (iterator.getDelegate()); + // just use the scanner context passed to us from now on + delegate.setRegionScannerContext(scannerContext); Tuple nextTuple = iterator.next(); if (tuple.size() > 0 && !isDummy(tuple)) { for (int i = 0; i < tuple.size(); i++) { @@ -478,10 +501,6 @@ public boolean next(List results, ScannerContext scannerContext) throws IO } } tuple = nextTuple; - if (regionScannerContext != null) { - ScannerContextUtil.updateMetrics(regionScannerContext, scannerContext); - regionScannerContext = null; - } return !isFilterDone(); } catch (Throwable t) { LOGGER.error("Error while iterating Offset scanner.", t); @@ -541,7 +560,7 @@ private static KeyValue getOffsetKvWithLastScannedRowKey(byte[] value, Tuple tup * region) since after this everything is held in memory */ private RegionScanner getTopNScanner(RegionCoprocessorEnvironment env, final RegionScanner s, - final OrderedResultIterator iterator, ImmutableBytesPtr tenantId, ScannerContext sc) + final OrderedResultIterator iterator, ImmutableBytesPtr tenantId, PhoenixScannerContext sc) throws Throwable { final Tuple 
firstTuple; @@ -565,7 +584,9 @@ private RegionScanner getTopNScanner(RegionCoprocessorEnvironment env, final Reg } return new BaseRegionScanner(s) { private Tuple tuple = firstTuple; - private ScannerContext regionScannerContext = sc; + // scanner context used when we are opening the scanner and reading the topN rows + // We copy this context to the hbase rpc context on the first next call + private PhoenixScannerContext regionScannerContext = sc; @Override public boolean isFilterDone() { @@ -583,6 +604,14 @@ public boolean next(List results, ScannerContext scannerContext) throws IO if (isFilterDone()) { return false; } + if (regionScannerContext != null) { + regionScannerContext.updateHBaseScannerContext(scannerContext, results); + // we no longer need this context + regionScannerContext = null; + if (PhoenixScannerContext.isReturnImmediately(scannerContext)) { + return true; + } + } if (isDummy(tuple)) { ScanUtil.getDummyResult(CellUtil.cloneRow(tuple.getValue(0)), results); } else { @@ -590,11 +619,11 @@ public boolean next(List results, ScannerContext scannerContext) throws IO results.add(tuple.getValue(i)); } } + RegionScannerResultIterator delegate = + (RegionScannerResultIterator) (iterator.getDelegate()); + // just use the scanner context passed to us from now on + delegate.setRegionScannerContext(scannerContext); tuple = iterator.next(); - if (regionScannerContext != null) { - ScannerContextUtil.updateMetrics(regionScannerContext, scannerContext); - regionScannerContext = null; - } return !isFilterDone(); } catch (Throwable t) { ClientUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java index 9645ed413b0..bda112e7092 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java @@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; -import org.apache.hadoop.hbase.regionserver.ScannerContextUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableUtils; @@ -308,11 +307,6 @@ public boolean nextRaw(List result, ScannerContext scannerContext) throws if (extraLimit >= 0 && --extraLimit == 0) { return false; } - // There is a scanattribute set to retrieve the specific array element - if (scannerContext != null) { - ScannerContextUtil.incrementSizeProgress(scannerContext, result); - ScannerContextUtil.updateTimeProgress(scannerContext); - } return next; } catch (Throwable t) { ClientUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java index c0cc6ec84ed..adbca1f859b 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java @@ -43,7 +43,7 @@ public class RegionScannerResultIterator extends BaseResultIterator { private final Pair minMaxQualifiers; private final boolean 
useQualifierAsIndex; private final QualifierEncodingScheme encodingScheme; - private final ScannerContext regionScannerContext; + private ScannerContext scannerContext; public RegionScannerResultIterator(Scan scan, RegionScanner scanner, Pair minMaxQualifiers, QualifierEncodingScheme encodingScheme) { @@ -51,12 +51,6 @@ public RegionScannerResultIterator(Scan scan, RegionScanner scanner, this.useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers); this.minMaxQualifiers = minMaxQualifiers; this.encodingScheme = encodingScheme; - if (scan.isScanMetricsEnabled()) { - regionScannerContext = - ScannerContext.newBuilder().setTrackMetrics(scan.isScanMetricsEnabled()).build(); - } else { - regionScannerContext = null; - } } @Override @@ -74,10 +68,10 @@ public Tuple next() throws SQLException { // since this is an indication of whether or not there are more values after the // ones returned boolean hasMore; - if (regionScannerContext == null) { + if (scannerContext == null) { hasMore = scanner.nextRaw(results); } else { - hasMore = scanner.nextRaw(results, regionScannerContext); + hasMore = scanner.nextRaw(results, scannerContext); } if (!hasMore && results.isEmpty()) { @@ -98,8 +92,8 @@ public Tuple next() throws SQLException { } } - public ScannerContext getRegionScannerContext() { - return regionScannerContext; + public void setRegionScannerContext(ScannerContext scannerContext) { + this.scannerContext = scannerContext; } @Override diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java index 1a431615687..b3565f33760 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java @@ -19,6 +19,7 @@ import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlan; import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlanWithLimit; +import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.commitWithException; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PAGED_ROWS_COUNTER; import static org.apache.phoenix.query.QueryServices.USE_BLOOMFILTER_FOR_MULTIKEY_POINTLOOKUP; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; @@ -33,12 +34,17 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.coprocessor.PagingRegionScanner; +import org.apache.phoenix.hbase.index.IndexRegionObserver; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixResultSet; import org.apache.phoenix.jdbc.PhoenixStatement; @@ -60,6 +66,7 @@ @Category(NeedsOwnMiniClusterTest.class) public class ServerPagingIT extends ParallelStatsDisabledIT { + private static final Random RAND = new Random(11); @BeforeClass public static synchronized void doSetup() throws Exception { @@ -262,6 +269,210 @@ public void testOrderByNonAggregation() throws Exception { assertEquals(D2.getTime(), rs.getDate(1).getTime()); assertFalse(rs.next()); assertServerPagingMetric(tablename, rs, true); + Map> metrics = 
PhoenixRuntime.getRequestReadMetricInfo(rs); + long numRows = getMetricValue(metrics, MetricType.COUNT_ROWS_SCANNED); + assertEquals(6, numRows); + long numRpcs = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS); + // 3 regions * (2 rows per region + 1 scanner open with page timeout set to 0 ms) + assertEquals(9, numRpcs); + } + } + } + + @Test + public void testMultiKeyPointLookup() throws Exception { + final String tablename = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + // use a higher timeout value so that we can trigger a page timeout from the scanner + // rather than the page filter + props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(5)); + String ddl = String.format("CREATE TABLE %s (id VARCHAR NOT NULL, k1 INTEGER NOT NULL, " + + "k2 INTEGER NOT NULL, k3 INTEGER, v1 VARCHAR CONSTRAINT pk PRIMARY KEY (id, k1, k2)) " + + "\"%s\" = true", tablename, USE_BLOOMFILTER_FOR_MULTIKEY_POINTLOOKUP); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute(ddl); + String dml = "UPSERT INTO " + tablename + " VALUES(?, ?, ?, ?, ?)"; + PreparedStatement ps = conn.prepareStatement(dml); + int totalRows = 10000; + for (int i = 0; i < totalRows; ++i) { + ps.setString(1, "id_" + i % 3); + ps.setInt(2, i % 20); + ps.setInt(3, i); + ps.setInt(4, i % 10); + ps.setString(5, "val"); + ps.executeUpdate(); + if (i != 0 && i % 100 == 0) { + conn.commit(); + } + } + conn.commit(); + int rowKeyCount = 1000; + List inList = + Stream.generate(() -> "(?, ?, ?)").limit(rowKeyCount).collect(Collectors.toList()); + String dql = String.format("select id, k1, k2 from %s where (id, k1, k2) IN (%s)", tablename, + String.join(",", inList)); + ps = conn.prepareStatement(dql); + int expectedValidRows = 0; + for (int i = 0; i < rowKeyCount; i++) { + ps.setString(3 * i + 1, "id_" + i % 3); + if (RAND.nextBoolean()) { + ++expectedValidRows; + ps.setInt(3 * i + 2, i % 20); + } else { + // generate a non-existing row key + ps.setInt(3 * i + 2, 78123); + } + ps.setInt(3 * i + 3, i); + } + int actualValidRows = 0; + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + ++actualValidRows; + } + assertEquals(expectedValidRows, actualValidRows); + Map> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs); + long numRows = getMetricValue(metrics, MetricType.COUNT_ROWS_SCANNED); + assertEquals(expectedValidRows, numRows); + long numRpcs = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS); + assertTrue(numRpcs > 1); + } + } + } + + @Test + public void testPagingWithTTLMasking() throws Exception { + final String tablename = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + // use a higher timeout value so that we can trigger a page timeout from the scanner + // rather than the page filter + props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(10)); + int ttl = 2; // 2 seconds + String ddl = "CREATE TABLE " + tablename + " (id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n" + + "k2 INTEGER NOT NULL,\n" + "k3 INTEGER,\n" + "v1 VARCHAR,\n" + + "CONSTRAINT pk PRIMARY KEY (id, k1, k2)) TTL=" + ttl; + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute(ddl); + String dml = "UPSERT INTO " + tablename + " VALUES(?, ?, ?, ?, ?)"; + PreparedStatement ps = conn.prepareStatement(dml); + int totalRows = 10000; + for (int i = 0; i < totalRows; ++i) { + ps.setString(1, "id_" + i % 3); + ps.setInt(2, i % 20); + ps.setInt(3, i); 
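+ // k2 = i is unique across the loop, so every (id, k1, k2) primary key is distinct and the table ends up with totalRows live rows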
+ ps.setInt(4, i % 10); + ps.setString(5, "val"); + ps.executeUpdate(); + if (i != 0 && i % 100 == 0) { + conn.commit(); + } + } + conn.commit(); + // Sleep so that the rows expire + // Can't use EnvironmentEdgeManager because that messes up page timeout calculations + Thread.sleep(ttl * 1000 + 50); + String dql = String.format("SELECT count(*) from %s where id = '%s'", tablename, "id_2"); + try (ResultSet rs = conn.createStatement().executeQuery(dql)) { + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1)); + assertFalse(rs.next()); + Map> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs); + long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS); + // multiple scan rpcs will be executed for every page timeout + assertTrue(String.format("Got %d", numRpc), numRpc > 1); + } + // Insert few more rows + int additionalRows = 5; + for (int i = 0; i < additionalRows; ++i) { + ps.setString(1, "id_2"); + ps.setInt(2, i % 20); + ps.setInt(3, i + totalRows); + ps.setInt(4, i % 10); + ps.setString(5, "val"); + ps.executeUpdate(); + } + conn.commit(); + try (ResultSet rs = conn.createStatement().executeQuery(dql)) { + assertTrue(rs.next()); + assertEquals(additionalRows, rs.getInt(1)); + assertFalse(rs.next()); + Map> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs); + long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS); + // multiple scan rpcs will be executed for every page timeout + assertTrue(String.format("Got %d", numRpc), numRpc > 1); + } + } + } + + @Test + public void testPagingWithUnverifiedIndexRows() throws Exception { + final String tablename = generateUniqueName(); + final String indexname = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + // use a higher timeout value so that we can trigger a page timeout from the scanner + // rather than the page filter + props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(5)); + String ddl = "CREATE TABLE " + tablename + " (id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n" + + "k2 INTEGER NOT NULL,\n" + "k3 INTEGER,\n" + "v1 VARCHAR,\n" + + "CONSTRAINT pk PRIMARY KEY (id, k1, k2))"; + String indexddl = "CREATE INDEX " + indexname + " ON " + tablename + "(k3) include(v1)"; + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute(ddl); + conn.createStatement().execute(indexddl); + String dml = "UPSERT INTO " + tablename + " VALUES(?, ?, ?, ?, ?)"; + PreparedStatement ps = conn.prepareStatement(dml); + int totalRows = 10000; + for (int i = 0; i < totalRows; ++i) { + ps.setString(1, "id_" + i % 3); + ps.setInt(2, i % 20); + ps.setInt(3, i); + ps.setInt(4, i % 10); + ps.setString(5, "val"); + ps.executeUpdate(); + if (i != 0 && i % 100 == 0) { + conn.commit(); + } + } + conn.commit(); + String dql = String.format("SELECT count(*) from %s where k3 = 5", tablename); + try (ResultSet rs = conn.createStatement().executeQuery(dql)) { + PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class); + String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator()); + assertTrue(explainPlan.contains(indexname)); + assertTrue(rs.next()); + assertEquals(totalRows / 10, rs.getInt(1)); + assertFalse(rs.next()); + Map> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs); + long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS); + // multiple scan rpcs will be executed for every page timeout + assertTrue(String.format("Got %d", numRpc), numRpc > 1); + } + // Insert few unverified index rows by failing phase 2 + int 
+      int additionalRows = 10;
+      for (int i = 0; i < additionalRows; ++i) {
+        ps.setString(1, "id_2");
+        ps.setInt(2, i % 20);
+        ps.setInt(3, i + totalRows);
+        ps.setInt(4, 5); // set k3=5 so the new rows match the query
+        ps.setString(5, "val");
+        ps.executeUpdate();
+      }
+      IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+      try {
+        commitWithException(conn);
+      } finally {
+        IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+      }
+      try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+        PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+        String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+        assertTrue(explainPlan.contains(indexname));
+        assertTrue(rs.next());
+        assertEquals(totalRows / 10, rs.getInt(1));
+        assertFalse(rs.next());
+        Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+        long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+        // multiple scan rpcs will be executed, one for every page timeout
+        assertTrue(String.format("Got %d", numRpc), numRpc > 1);
+      }
+    }
+  }
@@ -481,24 +692,46 @@ public void testNumberOfRPCsWithPaging() throws SQLException {
     PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
     stmt.execute(
       "CREATE TABLE " + tableName + " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, Z UNSIGNED_LONG)");
-    for (int i = 1; i <= 200; i++) {
+    final int rowCount = 200;
+    for (int i = 1; i <= rowCount; i++) {
       String sql = String.format("UPSERT INTO %s VALUES (%d, %d)", tableName, i, i);
       stmt.execute(sql);
     }
     conn.commit();
     // delete every alternate row
-    for (int i = 1; i <= 200; i = i + 2) {
+    for (int i = 1; i <= rowCount; i = i + 2) {
       stmt.execute("DELETE FROM " + tableName + " WHERE A = " + i);
       conn.commit();
     }
+    // full table scan
     ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
     while (rs.next()) {
     }
     Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
     long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
-    Assert.assertEquals(101, numRpc);
+    // with a 0ms page timeout, every row, whether it is valid or a delete marker, will generate
+    // a page timeout, so the number of rpcs will be row count + 1
+    assertEquals(rowCount + 1, numRpc);
+
+    // aggregate query
+    rs = stmt.executeQuery("SELECT count(*) FROM " + tableName);
+    assertTrue(rs.next());
+    assertEquals(rowCount / 2, rs.getInt(1));
+    assertFalse(rs.next());
+    metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+    numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+    assertEquals(rowCount + 1, numRpc);
+
+    // aggregate query with a filter
+    rs = stmt.executeQuery("SELECT count(*) FROM " + tableName + " where Z % 4 = 0");
+    assertTrue(rs.next());
+    assertEquals(rowCount / 4, rs.getInt(1));
+    assertFalse(rs.next());
+    metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+    numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+    assertEquals(rowCount + 1, numRpc);
   }
 
   @Test
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index c13e9c40914..ee9acb4aefb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -1661,7 +1661,7 @@ public void testOnDuplicateKeyWithIndex() throws Exception {
     }
   }
 
-  static private void commitWithException(Connection conn) {
+  public static void commitWithException(Connection conn) {
     try {
       conn.commit();
       IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
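Note: the paging tests above and testNumberOfRPCsWithPaging all read COUNT_RPC_CALLS through a getMetricValue helper that this excerpt never shows. The arithmetic in the rpc assertions follows from the 0ms page timeout: with rowCount = 200, the full scan hits 100 live rows plus 100 delete markers, each forcing a page timeout and a fresh scan rpc, plus one final rpc to finish the scan, hence rowCount + 1 = 201. A minimal sketch of what such a helper could look like, assuming it simply sums one MetricType across the per-table maps returned by PhoenixRuntime.getRequestReadMetricInfo (the helper's real definition in the test class may differ):

import java.util.Map;
import org.apache.phoenix.monitoring.MetricType;

// Hypothetical stand-in for the getMetricValue helper used in the tests above.
public final class MetricSums {
  // Sums the given metric across every table the query touched; the outer map
  // returned by PhoenixRuntime.getRequestReadMetricInfo is keyed by table name.
  public static long getMetricValue(Map<String, Map<MetricType, Long>> metrics, MetricType type) {
    long total = 0;
    for (Map<MetricType, Long> tableMetrics : metrics.values()) {
      Long value = tableMetrics.get(type);
      if (value != null) {
        total += value;
      }
    }
    return total;
  }
}

With this shape, getMetricValue(metrics, MetricType.COUNT_RPC_CALLS) yields the total number of scan rpcs issued for the statement, which is what the assertions compare against rowCount + 1.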
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
index d8ca7e0c79f..9758dd51f91 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
@@ -19,6 +19,7 @@
 import static org.apache.phoenix.query.QueryServices.USE_BLOOMFILTER_FOR_MULTIKEY_POINTLOOKUP;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -323,6 +324,37 @@ public void testUnionAll() throws Exception {
     assertEquals(142, count4);
   }
 
+  @Test
+  public void testLimitOffsetWithoutSplit() throws Exception {
+    final String tablename = generateUniqueName();
+    final String[] STRINGS = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n",
+      "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
+    String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT NULL,\n"
+      + "k1 INTEGER NOT NULL,\n" + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n"
+      + "C2.v1 VARCHAR,\n" + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))";
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      createTestTable(getUrl(), ddl);
+      for (int i = 0; i < 26; i++) {
+        conn.createStatement().execute("UPSERT INTO " + tablename + " values('" + STRINGS[i] + "',"
+          + i + "," + (i + 1) + "," + (i + 2) + ",'" + STRINGS[25 - i] + "')");
+      }
+      conn.commit();
+      int limit = 12;
+      int offset = 5;
+      ResultSet rs;
+      rs = conn.createStatement().executeQuery(
+        "SELECT t_id from " + tablename + " order by t_id limit " + limit + " offset " + offset);
+      int i = 0;
+      while (i < limit) {
+        assertTrue(rs.next());
+        assertEquals("Expected string didn't match for i = " + i, STRINGS[offset + i],
+          rs.getString(1));
+        i++;
+      }
+      assertEquals(limit + offset, getRowsScanned(rs));
+    }
+  }
+
   private long countRowsScannedFromSql(Statement stmt, String sql) throws SQLException {
     ResultSet rs = stmt.executeQuery(sql);
     while (rs.next()) {
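The hunk is cut off before CountRowsScannedIT's getRowsScanned helper appears. A minimal sketch of such a helper inside the test class (an assumption, not the patch's actual code), summing the COUNT_ROWS_SCANNED request read metric the same way the rpc counts are summed above:

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.util.PhoenixRuntime;

// Hypothetical sketch of getRowsScanned(rs): total rows the server scanned for this query,
// summed over every table the query touched.
static long getRowsScanned(ResultSet rs) throws SQLException {
  long rows = 0;
  for (Map<MetricType, Long> tableMetrics : PhoenixRuntime.getRequestReadMetricInfo(rs).values()) {
    rows += tableMetrics.getOrDefault(MetricType.COUNT_ROWS_SCANNED, 0L);
  }
  return rows;
}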