Skip to content

Commit a30360d

Browse files
author
Pankaj Agrawal
committed
feat: Expose SDK v2 s3 client
1 parent 9f20b10 commit a30360d

File tree

9 files changed

+276
-115
lines changed

9 files changed

+276
-115
lines changed

docs/utilities/sqs_large_message_handling.md

+27
Original file line numberDiff line numberDiff line change
@@ -180,4 +180,31 @@ processing.
180180
return "ok";
181181
}
182182
}
183+
```
184+
185+
## Passing custom S3Client
186+
187+
If you need to pass custom S3Client such as region to the SDK, you can pass your own `S3Client` to be used by utility either for
188+
**[SqsLargeMessage annotation](#lambda-handler)**, or **[SqsUtils Utility API](#utility)**.
189+
190+
=== "App.java"
191+
192+
```java hl_lines="4 5 11"
193+
import software.amazon.lambda.powertools.sqs.SqsLargeMessage;
194+
195+
static {
196+
SqsUtils.overrideS3Client(S3Client.builder()
197+
.build());
198+
}
199+
200+
public class SqsMessageHandler implements RequestHandler<SQSEvent, String> {
201+
202+
@Override
203+
@SqsLargeMessage
204+
public String handleRequest(SQSEvent sqsEvent, Context context) {
205+
// process messages
206+
207+
return "ok";
208+
}
209+
}
183210
```

powertools-sqs/pom.xml

+3-3
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,12 @@
5454
<artifactId>aws-lambda-java-events</artifactId>
5555
</dependency>
5656
<dependency>
57-
<groupId>software.amazon.payloadoffloading</groupId>
58-
<artifactId>payloadoffloading-common</artifactId>
57+
<groupId>software.amazon.awssdk</groupId>
58+
<artifactId>sqs</artifactId>
5959
</dependency>
6060
<dependency>
6161
<groupId>software.amazon.awssdk</groupId>
62-
<artifactId>sqs</artifactId>
62+
<artifactId>s3</artifactId>
6363
</dependency>
6464
<dependency>
6565
<groupId>com.fasterxml.jackson.core</groupId>

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java

+21-3
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,13 @@
2222
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
2323
import com.fasterxml.jackson.core.JsonProcessingException;
2424
import com.fasterxml.jackson.databind.ObjectMapper;
25-
2625
import org.slf4j.Logger;
2726
import org.slf4j.LoggerFactory;
28-
27+
import software.amazon.awssdk.services.s3.S3Client;
2928
import software.amazon.awssdk.services.sqs.SqsClient;
3029
import software.amazon.lambda.powertools.sqs.internal.BatchContext;
31-
import software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect;
3230
import software.amazon.payloadoffloading.PayloadS3Pointer;
31+
import software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect;
3332

3433
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
3534
import static software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect.processMessages;
@@ -42,6 +41,7 @@ public final class SqsUtils {
4241

4342
private static final ObjectMapper objectMapper = new ObjectMapper();
4443
private static SqsClient client;
44+
private static S3Client s3Client;
4545

4646
private SqsUtils() {
4747
}
@@ -98,6 +98,16 @@ public static void overrideSqsClient(SqsClient client) {
9898
SqsUtils.client = client;
9999
}
100100

101+
/**
102+
* Provides ability to set default {@link S3Client} to be used by utility.
103+
* If no default configuration is provided, client is instantiated via {@link S3Client#create()}
104+
*
105+
* @param s3Client {@link S3Client} to be used by utility
106+
*/
107+
public static void overrideS3Client(S3Client s3Client) {
108+
SqsUtils.s3Client = s3Client;
109+
}
110+
101111
/**
102112
* This utility method is used to process each {@link SQSMessage} inside the received {@link SQSEvent}
103113
*
@@ -524,4 +534,12 @@ private static SQSMessage clonedMessage(final SQSMessage sqsMessage) {
524534
public static ObjectMapper objectMapper() {
525535
return objectMapper;
526536
}
537+
538+
public static S3Client s3Client() {
539+
if(null == s3Client) {
540+
SqsUtils.s3Client = S3Client.create();
541+
}
542+
543+
return s3Client;
544+
}
527545
}

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsLargeMessageAspect.java

+34-30
Original file line numberDiff line numberDiff line change
@@ -5,35 +5,33 @@
55
import java.util.List;
66
import java.util.function.Function;
77

8-
import com.amazonaws.AmazonServiceException;
9-
import com.amazonaws.SdkClientException;
108
import com.amazonaws.services.lambda.runtime.Context;
119
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
12-
import com.amazonaws.services.s3.AmazonS3;
13-
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
14-
import com.amazonaws.services.s3.model.S3Object;
15-
import com.amazonaws.services.s3.model.S3ObjectInputStream;
16-
import com.amazonaws.util.IOUtils;
17-
1810
import org.aspectj.lang.ProceedingJoinPoint;
1911
import org.aspectj.lang.annotation.Around;
2012
import org.aspectj.lang.annotation.Aspect;
2113
import org.aspectj.lang.annotation.Pointcut;
2214
import org.slf4j.Logger;
2315
import org.slf4j.LoggerFactory;
24-
16+
import software.amazon.awssdk.core.ResponseInputStream;
17+
import software.amazon.awssdk.core.exception.SdkClientException;
18+
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
19+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
20+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
21+
import software.amazon.awssdk.services.s3.model.S3Exception;
22+
import software.amazon.awssdk.utils.IoUtils;
2523
import software.amazon.lambda.powertools.sqs.SqsLargeMessage;
2624
import software.amazon.payloadoffloading.PayloadS3Pointer;
2725

2826
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
2927
import static java.lang.String.format;
3028
import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod;
29+
import static software.amazon.lambda.powertools.sqs.SqsUtils.s3Client;
3130

3231
@Aspect
3332
public class SqsLargeMessageAspect {
3433

3534
private static final Logger LOG = LoggerFactory.getLogger(SqsLargeMessageAspect.class);
36-
private static AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();
3735

3836
@SuppressWarnings({"EmptyMethod"})
3937
@Pointcut("@annotation(sqsLargeMessage)")
@@ -52,7 +50,7 @@ && placedOnSqsEventRequestHandler(pjp)) {
5250
Object proceed = pjp.proceed(proceedArgs);
5351

5452
if (sqsLargeMessage.deletePayloads()) {
55-
pointersToDelete.forEach(this::deleteMessageFromS3);
53+
pointersToDelete.forEach(SqsLargeMessageAspect::deleteMessage);
5654
}
5755
return proceed;
5856
}
@@ -69,15 +67,21 @@ public static List<PayloadS3Pointer> processMessages(final List<SQSMessage> reco
6967
List<PayloadS3Pointer> s3Pointers = new ArrayList<>();
7068
for (SQSMessage sqsMessage : records) {
7169
if (isBodyLargeMessagePointer(sqsMessage.getBody())) {
72-
PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody());
7370

74-
S3Object s3Object = callS3Gracefully(s3Pointer, pointer -> {
75-
S3Object object = amazonS3.getObject(pointer.getS3BucketName(), pointer.getS3Key());
71+
PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody())
72+
.orElseThrow(() -> new FailedProcessingLargePayloadException(format("Failed processing SQS body to extract S3 details. [ %s ].", sqsMessage.getBody())));
73+
74+
ResponseInputStream<GetObjectResponse> s3Object = callS3Gracefully(s3Pointer, pointer -> {
75+
ResponseInputStream<GetObjectResponse> response = s3Client().getObject(GetObjectRequest.builder()
76+
.bucket(pointer.getS3BucketName())
77+
.key(pointer.getS3Key())
78+
.build());
79+
7680
LOG.debug("Object downloaded with key: " + s3Pointer.getS3Key());
77-
return object;
81+
return response;
7882
});
7983

80-
sqsMessage.setBody(readStringFromS3Object(s3Object));
84+
sqsMessage.setBody(readStringFromS3Object(s3Object, s3Pointer));
8185
s3Pointers.add(s3Pointer);
8286
}
8387
}
@@ -89,26 +93,22 @@ private static boolean isBodyLargeMessagePointer(String record) {
8993
return record.startsWith("[\"software.amazon.payloadoffloading.PayloadS3Pointer\"");
9094
}
9195

92-
private static String readStringFromS3Object(S3Object object) {
93-
try (S3ObjectInputStream is = object.getObjectContent()) {
94-
return IOUtils.toString(is);
96+
private static String readStringFromS3Object(ResponseInputStream<GetObjectResponse> response,
97+
PayloadS3Pointer s3Pointer) {
98+
try (ResponseInputStream<GetObjectResponse> content = response) {
99+
return IoUtils.toUtf8String(content);
95100
} catch (IOException e) {
96101
LOG.error("Error converting S3 object to String", e);
97-
throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", object.getBucketName(), object.getKey()), e);
102+
throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", s3Pointer.getS3BucketName(), s3Pointer.getS3Key()), e);
98103
}
99104
}
100105

101-
private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) {
102-
callS3Gracefully(s3Pointer, pointer -> {
103-
amazonS3.deleteObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key());
104-
LOG.info("Message deleted from S3: " + s3Pointer.toJson());
105-
return null;
106-
});
107-
}
108-
109106
public static void deleteMessage(PayloadS3Pointer s3Pointer) {
110107
callS3Gracefully(s3Pointer, pointer -> {
111-
amazonS3.deleteObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key());
108+
s3Client().deleteObject(DeleteObjectRequest.builder()
109+
.bucket(pointer.getS3BucketName())
110+
.key(pointer.getS3Key())
111+
.build());
112112
LOG.info("Message deleted from S3: " + s3Pointer.toJson());
113113
return null;
114114
});
@@ -118,7 +118,7 @@ private static <R> R callS3Gracefully(final PayloadS3Pointer pointer,
118118
final Function<PayloadS3Pointer, R> function) {
119119
try {
120120
return function.apply(pointer);
121-
} catch (AmazonServiceException e) {
121+
} catch (S3Exception e) {
122122
LOG.error("A service exception", e);
123123
throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", pointer.getS3BucketName(), pointer.getS3Key()), e);
124124
} catch (SdkClientException e) {
@@ -137,5 +137,9 @@ public static class FailedProcessingLargePayloadException extends RuntimeExcepti
137137
public FailedProcessingLargePayloadException(String message, Throwable cause) {
138138
super(message, cause);
139139
}
140+
141+
public FailedProcessingLargePayloadException(String message) {
142+
super(message);
143+
}
140144
}
141145
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package software.amazon.payloadoffloading;
2+
3+
import java.util.Optional;
4+
5+
import com.fasterxml.jackson.databind.DeserializationFeature;
6+
import com.fasterxml.jackson.databind.ObjectMapper;
7+
import com.fasterxml.jackson.databind.ObjectWriter;
8+
import com.fasterxml.jackson.databind.SerializationFeature;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import static java.util.Optional.empty;
13+
import static java.util.Optional.ofNullable;
14+
15+
public class PayloadS3Pointer {
16+
private static final Logger LOG = LoggerFactory.getLogger(PayloadS3Pointer.class);
17+
private static final ObjectMapper objectMapper = new ObjectMapper();
18+
19+
private String s3BucketName;
20+
private String s3Key;
21+
22+
static {
23+
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
24+
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
25+
objectMapper.activateDefaultTyping(objectMapper.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
26+
}
27+
28+
private PayloadS3Pointer() {
29+
30+
}
31+
32+
public String getS3BucketName() {
33+
return this.s3BucketName;
34+
}
35+
36+
public String getS3Key() {
37+
return this.s3Key;
38+
}
39+
40+
public static Optional<PayloadS3Pointer> fromJson(String s3PointerJson) {
41+
try {
42+
return ofNullable(objectMapper.readValue(s3PointerJson, PayloadS3Pointer.class));
43+
} catch (Exception e) {
44+
LOG.error("Failed to read the S3 object pointer from given string.", e);
45+
return empty();
46+
}
47+
}
48+
49+
public Optional<String> toJson() {
50+
try {
51+
ObjectWriter objectWriter = objectMapper.writer();
52+
return ofNullable(objectWriter.writeValueAsString(this));
53+
54+
} catch (Exception e) {
55+
LOG.error("Failed to convert S3 object pointer to text.", e);
56+
return empty();
57+
}
58+
}
59+
}

0 commit comments

Comments
 (0)