Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public void createOrder(String productName){
}

SalesSystem.orders.put(orderDate, order);
//Check if the Order entered and present
if (SalesSystem.orders.containsKey(orderDate)) {
System.out.println("New order verified to be present in hashmap: " + SalesSystem.orders.get(orderDate));
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommendation generated by Amazon CodeGuru Reviewer. Leave feedback on this recommendation by replying to the comment or by reacting to the comment using emoji.

Problem
You are using a ConcurrentHashMap, but your usage of containsKey() and get() may not be thread-safe at lines: 65 and 66. In between the check and the get() another thread can remove the key and the get() will return null. The remove that can remove the key is at line: 59.

Fix
Consider calling get(), checking instead of your current check if the returned object is null, and then using that object only, without calling get() again.

More info
View an example on GitHub (external link).


}
id++;
} catch (IllegalArgumentException e){
//e.printStackTrace();
Expand Down
175 changes: 175 additions & 0 deletions src/main/java/com/company/sample/application/EventHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package com.shipmentEvents.handlers;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.shipmentEvents.util.Constants;
import java.util.concurrent.ConcurrentHashMap;


import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;


public class EventHandler implements RequestHandler<ScheduledEvent, String> {

/**
* Shipment events for a carrier are uploaded to separate S3 buckets based on the source of events. E.g., events originating form
* the hand-held scanner are stored in a separate bucket than the ones from mobile App. The Lambda processes events from multiple
* sources and updates the latest status of the package in a summary S3 bucket every 15 minutes.
*
* The events are stored in following format:
* - Each status update is a file, where the name of the file is tracking number + random id
* - Each file has status and time-stamp as the first 2 lines respectively
* - The time at which is file is stored in S3 is not an indication of the time-stamp of the event
* - Once the status is marked as DELIVERED, we can stop tracking the package
*
* A Sample files looks as below:
* FILE-NAME-> '8787323232232332--55322798-dd29-4a04-97f4-93e18feed554'
* >status:IN TRANSIT
* >timestamp: 1573410202
* >Other fields like...tracking history and address
*/
public String handleRequest(ScheduledEvent scheduledEvent, Context context) {

final LambdaLogger logger = context.getLogger();
try {
processShipmentUpdates(logger);
return "SUCCESS";
} catch (final Exception ex) {
logger.log(String.format("Failed to process shipment Updates in %s due to %s", scheduledEvent.getAccount(), ex.getMessage()));
throw new RuntimeException(ex);
}
}


private void processShipmentUpdates(final LambdaLogger logger) throws InterruptedException {

final List<String> bucketsToProcess = Constants.BUCKETS_TO_PROCESS;
final ConcurrentHashMap<String, Pair<Long, String>> latestStatusForTrackingNumber = new ConcurrentHashMap<String, Pair<Long, String>>();
final ConcurrentHashMap<String, List<KeyVersion>> filesToDelete = new ConcurrentHashMap<String, List<DeleteObjectsRequest.KeyVersion>>();
bucketsToProcess.parallelStream().forEach(bucketName -> {
final List<KeyVersion> filesProcessed = processEventsInBucket(bucketName, logger, latestStatusForTrackingNumber);
filesToDelete.put(bucketName, filesProcessed);
});

final AmazonS3 s3Client = EventHandler.getS3Client();
//Create a new file in the Constants.SUMMARY_BUCKET
logger.log("Map of statuses -> " + latestStatusForTrackingNumber);
String summaryUpdateName = Long.toString(System.currentTimeMillis());

EventHandler.getS3Client().putObject(Constants.SUMMARY_BUCKET, summaryUpdateName, latestStatusForTrackingNumber.toString());

long expirationTime = System.currentTimeMillis() + Duration.ofMinutes(1).toMillis();
while(System.currentTimeMillis() < expirationTime) {
if (s3Client.doesObjectExist(Constants.SUMMARY_BUCKET, summaryUpdateName)) {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommendation generated by Amazon CodeGuru Reviewer. Leave feedback on this recommendation by replying to the comment or by reacting to the comment using emoji.

This code appears to be waiting for a resource before it runs. You could use the waiters feature to help improve efficiency. Consider using ObjectExists or ObjectNotExists. For more information, see https://aws.amazon.com/blogs/developer/waiters-in-the-aws-sdk-for-java/

break;
}
logger.log("waiting for file to be created " + summaryUpdateName);
Thread.sleep(1000);
}

// Before we delete the shipment updates make sure the summary update file exists
if (EventHandler.getS3Client().doesObjectExist(Constants.SUMMARY_BUCKET, summaryUpdateName)) {
deleteProcessedFiles(filesToDelete);
logger.log("All updates successfully processed");
} else {
throw new RuntimeException("Failed to write sumary status, will be retried in 15 minutes");
}

}

private List<KeyVersion> processEventsInBucket(String bucketName, LambdaLogger logger, ConcurrentHashMap<String, Pair<Long, String>> latestStatusForTrackingNumber) {

final AmazonS3 s3Client = EventHandler.getS3Client();
logger.log("Processing Bucket: " + bucketName);

ObjectListing files = s3Client.listObjects(bucketName);
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommendation generated by Amazon CodeGuru Reviewer. Leave feedback on this recommendation by replying to the comment or by reacting to the comment using emoji.

This code uses an outdated API. ListObjectsV2 is the revised List Objects API, and we recommend you use this revised API for new application developments.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommendation generated by Amazon CodeGuru Reviewer. Leave feedback on this recommendation by replying to the comment or by reacting to the comment using emoji.

This code might not produce accurate results if the operation returns paginated results instead of all results. Consider adding another call to check for additional results.

List<KeyVersion> filesProcessed = new ArrayList<DeleteObjectsRequest.KeyVersion>();

for (Iterator<?> iterator = files.getObjectSummaries().iterator(); iterator.hasNext(); ) {
S3ObjectSummary summary = (S3ObjectSummary) iterator.next();
logger.log("Reading Object: " + summary.getKey());

String trackingNumber = summary.getKey().split("--")[0];
Pair<Long, String> lastKnownStatus = latestStatusForTrackingNumber.get(trackingNumber);

// Check if this shipment has already been delivered, skip this file
if (lastKnownStatus != null && "DELIVERED".equals(lastKnownStatus.getRight())) {
continue;
}

String fileContents = s3Client.getObjectAsString(bucketName, summary.getKey());

if (!isValidFile(fileContents)) {
logger.log(String.format("Skipping invalid file %s", summary.getKey()));
continue;
}

if (!fileContents.contains("\n")) {

}
String[] lines = fileContents.split("\n");
String line1 = lines[0];
String line2 = lines[1];

String status = line1.split(":")[1];
Long timeStamp = Long.parseLong(line2.split(":")[1]);


if (null == lastKnownStatus || lastKnownStatus.getLeft() < timeStamp) {
lastKnownStatus = new MutablePair<Long, String>(timeStamp, status);
latestStatusForTrackingNumber.put(trackingNumber, lastKnownStatus);
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommendation generated by Amazon CodeGuru Reviewer. Leave feedback on this recommendation by replying to the comment or by reacting to the comment using emoji.

Problem
You are using a ConcurrentHashMap, but your usage of get() and put() may not be thread-safe at lines: 110, 113, 135, and 137. Two threads can perform this same check at the same time and one thread can overwrite the value written by the other thread.

Fix
Consider replacing put() with putIfAbsent() to help prevent accidental overwriting. putIfAbsent() puts the value only if the ConcurrentHashMap does not contain the key and therefore avoids overwriting the value written there by the other thread's putIfAbsent().

More info
putIfAbsent() returns null if the value did not exist and returns the value in the map if one already exists.

View an example on GitHub (external link).

}

//Add to list of processed files
filesProcessed.add(new KeyVersion(summary.getKey()));
logger.log("logging Contents of the file" + fileContents);
}
return filesProcessed;
}


private void deleteProcessedFiles(Map<String, List<KeyVersion>> filesToDelete) {
final AmazonS3 s3Client = EventHandler.getS3Client();
for (Entry<String, List<KeyVersion>> entry : filesToDelete.entrySet()) {
final DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(entry.getKey()).withKeys(entry.getValue()).withQuiet(false);
s3Client.deleteObjects(deleteRequest);
}
}

private boolean isValidFile(String fileContents) {
if (!fileContents.contains("\n")) {
return false;
}
String[] lines = fileContents.split("\n");
for (String l: lines) {
if (!l.contains(":")) {
return false;
}
}
return true;
}

public static AmazonS3 getS3Client() {
return AmazonS3ClientBuilder.standard().withRegion(Regions.DEFAULT_REGION).build();
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommendation generated by Amazon CodeGuru Reviewer. Leave feedback on this recommendation by replying to the comment or by reacting to the comment using emoji.

This code is written so that the client cannot be reused across invocations of the Lambda function.
To improve the performance of the Lambda function, consider using static initialization/constructor, global/static variables and singletons. It allows to keep alive and reuse HTTP connections that were established during a previous invocation.
Learn more about best practices for working with AWS Lambda functions.

}


}