HADOOP-19645. [ABFS][ReadAheadV2] Improve Metrics for Read Calls to identify type of read done. #7837
Conversation
Pull Request Overview
This PR adds metrics to identify different types of read operations in the ABFS driver by enhancing the tracing header with operation-specific information. The main goal is to differentiate between various read types (direct, normal, prefetch, cache miss, footer, and small file reads) through the ClientRequestId header.
Key changes include:
- Adding a ReadType enum to categorize different read operations
- Updating the tracing header format to include versioning and operation-specific headers
- Modifying read operations throughout the codebase to set appropriate ReadType values
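The overview above can be sketched as a minimal version of the new enum. The constant names and single-method shape are assumptions inferred from the review discussion (the abbreviations shown in later comments are "NR", "PR", etc.), not the exact committed code:

```java
// Hypothetical sketch of the ReadType enum added by this PR.
// Constant names and abbreviations are assumptions from the review thread.
public enum ReadType {
  DIRECT_READ("DR"),      // positioned read bypassing the stream buffer
  NORMAL_READ("NR"),      // regular sequential block read
  PREFETCH_READ("PR"),    // read served via the read-ahead queue
  MISSEDCACHE_READ("MR"), // prefetch attempted but the cache missed
  FOOTER_READ("FR"),      // footer-optimized read
  SMALLFILE_READ("SR");   // whole small file read in a single call

  private final String abbreviation;

  ReadType(String abbreviation) {
    this.abbreviation = abbreviation;
  }

  @Override
  public String toString() {
    // The abbreviation is what lands in the ClientRequestId header.
    return abbreviation;
  }
}
```

The short abbreviations keep the ClientRequestId compact while still letting log analysis distinguish the six read paths.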
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| ReadType.java | New enum defining six read operation types with abbreviated string representations |
| AbfsHttpConstants.java | Added TracingHeaderVersion enum for header versioning |
| TracingContext.java | Enhanced header construction with versioning and operation-specific headers |
| Listener.java | Added interface method for updating ReadType |
| AbfsInputStream.java | Updated read operations to set appropriate ReadType values |
| ReadBufferWorker.java | Added imports for ReadType and TracingContext |
| TracingHeaderValidator.java | Updated validation logic for new header format |
| TestApacheHttpClientFallback.java | Fixed test assertions for new header structure |
| TestTracingContext.java | Updated header parsing for new format |
```java
public static final String STAR = "*";
public static final String COMMA = ",";
public static final String COLON = ":";
public static final String HYPHEN = "-";
```
We already have CHAR_HYPHEN defined for this.
Taken
```java
  return String.format("%s_%s", header, previousFailure);
}

private String getRetryHeader(final String previousFailure, String retryPolicyAbbreviation) {
```
Please add javadoc to all newly added methods
Taken
```java
}

public int getFieldCount() {
  return V1.fieldCount;
```
Shouldn't it be just return this.fieldCount?
+1
Fixed, thanks for pointing it out.
```java
}

public String getVersion() {
  return V1.version;
```
Same as above, it should be return this.version?
Fixed
```java
}

@Test
public void testReadTypeInTracingContextHeader() throws Exception {
```
Javadoc missing
Added
```java
// bCursor that means the user requested data has not been read.
if (fCursor < contentLength && bCursor > limit) {
  restorePointerState();
  tracingContext.setReadType(ReadType.NORMAL_READ);
```
Before readOneBlock we're setting the TC to normal read both here and at line 439. In the readOneBlock method we set the TC to normal read again; do we need it twice? We can keep it once, in the method only.
Nice catch, that was redundant; removed.
```java
+ position + COLON
+ operatedBlobCount + COLON
+ httpOperation.getTracingContextSuffix() + COLON
+ getOperationSpecificHeader(opType);
```
Should we keep the op-specific header before adding the HTTP client? That would group all request-related info together, followed by the network client.
E.g. .....:RE:1_EGR:NR:JDK
Sounds better, taken.
```java
  return String.format("%s_%s", header, previousFailure);
}

private String getRetryHeader(final String previousFailure, String retryPolicyAbbreviation) {
```
we can remove the addFailureReasons method- it has no usage now
Taken
```java
public enum TracingHeaderVersion {

  V0("", 8),
  V1("v1", 13);
```
Since the next versions would be V1.1/V1.2, should we consider starting with V1.0/V1.1?
And with version updates, would we update the version field in V1 only, or add a new V1.1 enum?
So every time we add a new header, we need to add a new version ?
We will have simple version strings like v0, v1, v2 and so on. This helps reduce the character count in the clientReqId.
With any change to the schema of the Tracing Header (add/delete/rearrange) we need to bump the version, update the schema, and update the getCurrentVersion method to return the latest version.
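Putting this thread together with the accessor fixes discussed above, the corrected enum might look like the following sketch (field names are inferred from the quoted diffs; this is not the exact committed code):

```java
// Sketch of the TracingHeaderVersion enum after the review fixes:
// each constant returns its own fields instead of hardcoding V1.
public enum TracingHeaderVersion {
  V0("", 8),    // legacy header: no version prefix, 8 fields
  V1("v1", 13); // versioned header: "v1" prefix, 13 fields

  private final String version;
  private final int fieldCount;

  TracingHeaderVersion(String version, int fieldCount) {
    this.version = version;
    this.fieldCount = fieldCount;
  }

  public int getFieldCount() {
    return this.fieldCount; // was: V1.fieldCount
  }

  public String getVersion() {
    return this.version; // was: V1.version
  }

  // Must be bumped whenever the header schema changes.
  public static TracingHeaderVersion getCurrentVersion() {
    return V1;
  }
}
```

Callers use `TracingHeaderVersion.getCurrentVersion()` instead of a hardcoded `V1`, so a future schema bump only touches this enum.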
```java
    .contains(readType.toString());
}

// private testReadTypeInTracingContextHeaderInternal(ReadType readType) throws Exception {
```
Nit- we can remove this
Removed
```diff
  private String primaryRequestIdForRetry;

- private Integer operatedBlobCount = null;
+ private Integer operatedBlobCount = 1; // Only relevant for rename-delete over blob endpoint where it will be explicitly set.
```
why is it changed from null to 1 ?
Because it was coming out as null in the ClientReqId. Having a null value does not look good and could cause an NPE if someone uses this value anywhere.
Since this is set only in rename/delete, other ops are prone to NPE.
As for why it is set to 1: I thought for every operation this has to be 1. I am open to suggestions for a better default value but strongly feel null should be avoided.
Thoughts?
But there was a null check before it was added to the header which would avoid the NPE
Yes, but we decided to keep the header schema fixed, and publishing this value as null does not look good in the Client Request Id since it can be exposed to the user.
```java
}

public static TracingHeaderVersion getCurrentVersion() {
  return V1;
```
This needs to be updated every time a new version is introduced; can it be dynamically fetched?
We need to update it to the latest version every time we do a version upgrade.
```java
header += (":" + httpOperation.getTracingContextSuffix());
metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : "";
case ALL_ID_FORMAT:
  header = TracingHeaderVersion.V1.getVersion() + COLON
```
should we use getCurrentVersion here ?
Fixed
```java
+ streamID + COLON
+ opType + COLON
+ getRetryHeader(previousFailure, retryPolicyAbbreviation) + COLON
+ ingressHandler + COLON
```
these empty string checks are needed
With empty checks we cannot have a fixed schema. We need a properly defined schema where each position after the split is fixed for all headers, so analysis can be done easily without worrying about the position of the info we need to analyse.
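The fixed-schema argument can be illustrated with a standalone sketch. The field names and ordering below are assumptions for illustration, not the exact ABFS header layout; the point is that empty fields keep their slot, so split indices stay stable:

```java
public class HeaderSchema {

  // Join fields positionally; an empty field still occupies its slot.
  static String buildHeader(String... fields) {
    return String.join(":", fields);
  }

  // limit -1 keeps trailing empty fields, so the count is always stable.
  static String[] splitHeader(String header) {
    return header.split(":", -1);
  }

  public static void main(String[] args) {
    // 13 fields, matching the V1 fieldCount in this PR; two are empty.
    String header = buildHeader(
        "v1", "corrId", "reqId", "fsId", "", "streamId", "RE",
        "", "FO", "0", "1", "NR", "JDK");
    String[] parts = splitHeader(header);
    // Every consumer can rely on a fixed index; in this sketch,
    // parts[11] is the operation-specific header (read type).
    System.out.println(parts.length); // 13
    System.out.println(parts[11]);    // NR
  }
}
```

With conditional omission of empty fields, `parts[11]` could mean a different thing per request, which is exactly what the fixed schema avoids.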
```java
case TWO_ID_FORMAT:
  header = clientCorrelationID + ":" + clientRequestId;
  metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : "";
  header = TracingHeaderVersion.V1.getVersion() + COLON
```
same as above getCurrentVersion ?
Fixed
```diff
  private void checkHeaderForRetryPolicyAbbreviation(String header, String expectedFailureReason, String expectedRetryPolicyAbbreviation) {
-   String[] headerContents = header.split(":");
+   String[] headerContents = header.split(":", SPLIT_NO_LIMIT);
    String previousReqContext = headerContents[6];
```
Use the COLON constant here as well, since we are changing it in other places.
Taken
```java
numOfReadCalls += 3; // 3 blocks of 1MB each.
doReturn(false).when(spiedConfig).isReadAheadV2Enabled();
doReturn(false).when(spiedConfig).isReadAheadEnabled();
testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, NORMAL_READ, numOfReadCalls);
```
Should we also verify that it is NORMAL_READ for all three calls made? Currently it only verifies with contains.
Taken
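The strengthened assertion suggested here can be sketched standalone: check every captured header for the expected read type, rather than a single "some header contains it" check. The header strings and helper below are hypothetical, for illustration only:

```java
import java.util.List;

public class VerifyReadType {

  // True only if EVERY captured header carries the read-type field,
  // delimited by colons so a substring match cannot false-positive.
  static boolean allHeadersHaveReadType(List<String> headers, String readType) {
    return headers.stream().allMatch(h -> h.contains(":" + readType + ":"));
  }

  public static void main(String[] args) {
    // Hypothetical headers for three 1MB block reads, all NORMAL_READ.
    List<String> headers = List.of(
        "v1:corr:req:fsid::sid:RE::FO:0:1:NR:JDK",
        "v1:corr:req:fsid::sid:RE::FO:1048576:1:NR:JDK",
        "v1:corr:req:fsid::sid:RE::FO:2097152:1:NR:JDK");
    System.out.println(allHeadersHaveReadType(headers, "NR")); // true
  }
}
```

In the actual test this check would run over the captured TracingContext list rather than literal strings.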
```java
numOfReadCalls += 3;
doReturn(true).when(spiedConfig).isReadAheadEnabled();
Mockito.doReturn(3).when(spiedConfig).getReadAheadQueueDepth();
testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, PREFETCH_READ, numOfReadCalls);
```
Same here: verify that 2 calls have PREFETCH_READ.
Taken
```java
doReturn(true).when(spiedConfig).readSmallFilesCompletely();
doReturn(false).when(spiedConfig).optimizeFooterRead();
testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, SMALLFILE_READ, numOfReadCalls);
}
```
One test for direct read as well ?
Added
```java
  streamStatistics.remoteReadOperation();
}
LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
tracingContext.setPosition(String.valueOf(position));
```
Is there a test to verify position is correctly added to tracing context? Position is a key identifier for read operations.
Thanks for the suggestion. I updated the current test to assert on position as well.
```java
if (readType == PREFETCH_READ) {
  /*
   * For Prefetch Enabled, first read can be Normal or Missed Cache Read.
   * Sow e will assert only for last 2 calls which should be Prefetched Read.
```
nit typo: so
```diff
  /**
-  * Sets the value of the number of blobs operated on
+  * Sets the value of the number of blobs operated on976345
```
typo issue
```diff
  jdkCallsRegister[0]++;
  if (AbfsApacheHttpClient.usable()) {
-   Assertions.assertThat(tc.getHeader()).contains(JDK_IMPL);
+   Assertions.assertThat(tc.getHeader()).endsWith(JDK_IMPL);
```
This might fail if we add a new header and the network library is no longer the last field, so contains looks better to me.
```java
    verifyHeaderForReadTypeInTracingContextHeader(tracingContextList.get(i), readType, -1);
  }
} else if (readType == DIRECT_READ) {
  int expectedReadPos = ONE_MB / 3;
```
A comment on why we start at this position would help clarity.
Already added in comment above.
+1 LGTM
🎊 +1 overall
…dentify type of read done. (apache#7837) Contributed by Anuj Modi
Description of PR
JIRA: https://issues.apache.org/jira/browse/HADOOP-19645
There are a number of ways in which the ABFS driver can trigger a network call to read data. We need a way to identify what type of read call was made from the client. The plan is to add an indication for this in the already present ClientRequestId header.
Following are the types of read we want to identify: direct, normal, prefetch, missed-cache, footer, and small-file reads.
We will add another field in the Tracing Header (Client Request Id) for each request. We can call this field the "Operation Specific Header", very similar to how we have the "Retry Header" today. As part of this we will only use it for read operations, keeping it empty for other operations. Moving ahead, if we need to publish any operation-specific info, the same header can be used.
How was this patch tested?
New tests around the changes in the Tracing Header and the introduction of the read-specific header were added.
The existing test suite was run across all combinations. Results added as a comment.