-
Notifications
You must be signed in to change notification settings - Fork 11
pubsub: Enable sendAck and sendNack apis for AWS Pubsub #134
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #134 +/- ##
============================================
+ Coverage 83.06% 83.32% +0.25%
Complexity 36 36
============================================
Files 148 148
Lines 7635 7710 +75
Branches 888 901 +13
============================================
+ Hits 6342 6424 +82
+ Misses 859 853 -6
+ Partials 434 433 -1
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| List<String> receiptHandles = new ArrayList<>(); | ||
| for (AckID ackID : ackIDs) { | ||
| if (ackID instanceof AwsAckID) { | ||
| receiptHandles.add(((AwsAckID) ackID).getReceiptHandle()); | ||
| } else { | ||
| throw new IllegalArgumentException("Invalid AckID type: " + ackID.getClass()); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
under what scenario will this not be instance of AwsAckID ? this seems like internals to SDK and throwing IllegalArgumentException to user for something abstracted doesn't make much sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep.. The instanceof check here doesn’t add any value. I removed it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually thinking aloud, why do we need AckID interface? AWS and GCP both have strings at AckIDs, then not sure if this interface provides any value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had a similar thought when I was working on the GCP implementation. this AckID interface doesn’t seem to add much value.
But I wasn’t sure how AWS or Ali drivers were handling their AckIDs, and I didn’t want to touch the driver-level architecture at that time.
I’m trying to replace it with strings now, and it should have no functional impact.
| // SQS supports max 10 messages per batch operation | ||
| for (int i = 0; i < receiptHandles.size(); i += 10) { | ||
| int endIndex = Math.min(i + 10, receiptHandles.size()); | ||
| List<String> batch = receiptHandles.subList(i, endIndex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why would we need additional memory when already have receipt handles ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated. Removed subList() and now use receiptHandles directly to avoid extra memory and simplify batching.
|
|
||
| List<String> receiptHandles = new ArrayList<>(); | ||
| for (AckID ackID : ackIDs) { | ||
| if (ackID instanceof AwsAckID) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
|
|
||
| for (int i = 0; i < receiptHandles.size(); i += 10) { | ||
| int endIndex = Math.min(i + 10, receiptHandles.size()); | ||
| List<String> batch = receiptHandles.subList(i, endIndex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
|
|
||
| if (!actualFailures.isEmpty()) { | ||
| BatchResultErrorEntry firstFailure = actualFailures.get(0); | ||
| throw new RuntimeException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's not throw RunTimeException
| List<ContentPattern<?>> bodyPatterns = stub.getRequest().getBodyPatterns(); | ||
| if (bodyPatterns == null) { | ||
| return stub; | ||
| } | ||
|
|
||
| // Relax only batch operations | ||
| if (amzTarget.contains("DeleteMessageBatch")) { | ||
| bodyPatterns.clear(); | ||
| bodyPatterns.add(new MatchesJsonPathPattern("$.Entries[*].ReceiptHandle")); | ||
| } | ||
| else if (amzTarget.contains("ChangeMessageVisibilityBatch")) { | ||
| bodyPatterns.clear(); | ||
| bodyPatterns.add(new MatchesJsonPathPattern("$.Entries[*].ReceiptHandle")); | ||
| bodyPatterns.add(new MatchesJsonPathPattern("$.Entries[*].VisibilityTimeout")); | ||
| } | ||
| else if (amzTarget.contains("SendMessageBatch")) { | ||
| // keep body integrity to preserve MD5 validation | ||
| // just verify structure, but don't clear existing patterns | ||
| bodyPatterns.add(new MatchesJsonPathPattern("$.Entries[*].MessageBody")); | ||
| } | ||
| // For ReceiveMessage — normally no body, skip | ||
| else if (amzTarget.contains("ReceiveMessage")) { | ||
| return stub; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we doing these special handling ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these batch operations (DeleteMessageBatch, ChangeMessageVisibilityBatch, SendMessageBatch) need relaxed matching because their request bodies include dynamic fields like receipt handles or timestamps that vary between replay runs. Without this special handling, our recorded stubs would fail to match during replay.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure how's that happening. Receipt handle is from server side, the client don't create it. Where does it getting randomized?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
batch size changes during replay can cause WireMock to match a different ReceiveMessage stub and return a different AWS-recorded ReceiptHandle. This mismatch causes the replay phase cannot match the recorded mappings.
During record:
AWS returned multiple different ReceiptHandles across multiple ReceiveMessage calls:
| Call | batchSize | Mapping matched | ReceiptHandle returned |
|---|---|---|---|
| 1 | 1 | mapping A | A1 |
| 2 | 2 | mapping B | B1 |
| 3 | 1 | mapping C | C1 |
During replay:
If the batchSize changes (e.g., from 2 → 1), the second ReceiveMessage call may match mapping C instead of mapping B.
So replay returns:
Record used: B1
Replay uses: C1 (from mapping C recorded earlier)
Both B1 and C1 were generated by server side. replay is picking a different recorded mapping because the request no longer matches the same one.
Summary
doSendAcks()anddoSendNacks()to support acknowledging and negatively acknowledging messages for AWS SQS.DeleteMessageBatchAPI to remove successfully processed messages.ChangeMessageVisibilityBatchAPI to make failed messages visible again for redelivery.AwsMatcherRelaxingTransformerto relax WireMock’s body matching for DeleteMessageBatch or ChangeMessageVisibilityBatch operations.Some conventions to follow
docstore:for document store module,blobstorefor Blob Store moduletest:perf: