5
5
import java .util .List ;
6
6
import java .util .function .Function ;
7
7
8
- import com .amazonaws .AmazonServiceException ;
9
- import com .amazonaws .SdkClientException ;
10
8
import com .amazonaws .services .lambda .runtime .Context ;
11
9
import com .amazonaws .services .lambda .runtime .events .SQSEvent ;
12
- import com .amazonaws .services .s3 .AmazonS3 ;
13
- import com .amazonaws .services .s3 .AmazonS3ClientBuilder ;
14
- import com .amazonaws .services .s3 .model .S3Object ;
15
- import com .amazonaws .services .s3 .model .S3ObjectInputStream ;
16
- import com .amazonaws .util .IOUtils ;
17
-
18
10
import org .aspectj .lang .ProceedingJoinPoint ;
19
11
import org .aspectj .lang .annotation .Around ;
20
12
import org .aspectj .lang .annotation .Aspect ;
21
13
import org .aspectj .lang .annotation .Pointcut ;
22
14
import org .slf4j .Logger ;
23
15
import org .slf4j .LoggerFactory ;
24
-
16
+ import software .amazon .awssdk .core .ResponseInputStream ;
17
+ import software .amazon .awssdk .core .exception .SdkClientException ;
18
+ import software .amazon .awssdk .services .s3 .model .DeleteObjectRequest ;
19
+ import software .amazon .awssdk .services .s3 .model .GetObjectRequest ;
20
+ import software .amazon .awssdk .services .s3 .model .GetObjectResponse ;
21
+ import software .amazon .awssdk .services .s3 .model .S3Exception ;
22
+ import software .amazon .awssdk .utils .IoUtils ;
25
23
import software .amazon .lambda .powertools .sqs .SqsLargeMessage ;
26
24
import software .amazon .payloadoffloading .PayloadS3Pointer ;
27
25
28
26
import static com .amazonaws .services .lambda .runtime .events .SQSEvent .SQSMessage ;
29
27
import static java .lang .String .format ;
30
28
import static software .amazon .lambda .powertools .core .internal .LambdaHandlerProcessor .isHandlerMethod ;
29
+ import static software .amazon .lambda .powertools .sqs .SqsUtils .s3Client ;
31
30
32
31
@ Aspect
33
32
public class SqsLargeMessageAspect {
34
33
35
34
private static final Logger LOG = LoggerFactory .getLogger (SqsLargeMessageAspect .class );
36
- private static AmazonS3 amazonS3 = AmazonS3ClientBuilder .defaultClient ();
37
35
38
36
@ SuppressWarnings ({"EmptyMethod" })
39
37
@ Pointcut ("@annotation(sqsLargeMessage)" )
@@ -52,7 +50,7 @@ && placedOnSqsEventRequestHandler(pjp)) {
52
50
Object proceed = pjp .proceed (proceedArgs );
53
51
54
52
if (sqsLargeMessage .deletePayloads ()) {
55
- pointersToDelete .forEach (this :: deleteMessageFromS3 );
53
+ pointersToDelete .forEach (SqsLargeMessageAspect :: deleteMessage );
56
54
}
57
55
return proceed ;
58
56
}
@@ -69,15 +67,21 @@ public static List<PayloadS3Pointer> processMessages(final List<SQSMessage> reco
69
67
List <PayloadS3Pointer > s3Pointers = new ArrayList <>();
70
68
for (SQSMessage sqsMessage : records ) {
71
69
if (isBodyLargeMessagePointer (sqsMessage .getBody ())) {
72
- PayloadS3Pointer s3Pointer = PayloadS3Pointer .fromJson (sqsMessage .getBody ());
73
70
74
- S3Object s3Object = callS3Gracefully (s3Pointer , pointer -> {
75
- S3Object object = amazonS3 .getObject (pointer .getS3BucketName (), pointer .getS3Key ());
71
+ PayloadS3Pointer s3Pointer = PayloadS3Pointer .fromJson (sqsMessage .getBody ())
72
+ .orElseThrow (() -> new FailedProcessingLargePayloadException (format ("Failed processing SQS body to extract S3 details. [ %s ]." , sqsMessage .getBody ())));
73
+
74
+ ResponseInputStream <GetObjectResponse > s3Object = callS3Gracefully (s3Pointer , pointer -> {
75
+ ResponseInputStream <GetObjectResponse > response = s3Client ().getObject (GetObjectRequest .builder ()
76
+ .bucket (pointer .getS3BucketName ())
77
+ .key (pointer .getS3Key ())
78
+ .build ());
79
+
76
80
LOG .debug ("Object downloaded with key: " + s3Pointer .getS3Key ());
77
- return object ;
81
+ return response ;
78
82
});
79
83
80
- sqsMessage .setBody (readStringFromS3Object (s3Object ));
84
+ sqsMessage .setBody (readStringFromS3Object (s3Object , s3Pointer ));
81
85
s3Pointers .add (s3Pointer );
82
86
}
83
87
}
@@ -89,26 +93,22 @@ private static boolean isBodyLargeMessagePointer(String record) {
89
93
return record .startsWith ("[\" software.amazon.payloadoffloading.PayloadS3Pointer\" " );
90
94
}
91
95
92
- private static String readStringFromS3Object (S3Object object ) {
93
- try (S3ObjectInputStream is = object .getObjectContent ()) {
94
- return IOUtils .toString (is );
96
+ private static String readStringFromS3Object (ResponseInputStream <GetObjectResponse > response ,
97
+ PayloadS3Pointer s3Pointer ) {
98
+ try (ResponseInputStream <GetObjectResponse > content = response ) {
99
+ return IoUtils .toUtf8String (content );
95
100
} catch (IOException e ) {
96
101
LOG .error ("Error converting S3 object to String" , e );
97
- throw new FailedProcessingLargePayloadException (format ("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]" , object . getBucketName (), object . getKey ()), e );
102
+ throw new FailedProcessingLargePayloadException (format ("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]" , s3Pointer . getS3BucketName (), s3Pointer . getS3Key ()), e );
98
103
}
99
104
}
100
105
101
- private void deleteMessageFromS3 (PayloadS3Pointer s3Pointer ) {
102
- callS3Gracefully (s3Pointer , pointer -> {
103
- amazonS3 .deleteObject (s3Pointer .getS3BucketName (), s3Pointer .getS3Key ());
104
- LOG .info ("Message deleted from S3: " + s3Pointer .toJson ());
105
- return null ;
106
- });
107
- }
108
-
109
106
public static void deleteMessage (PayloadS3Pointer s3Pointer ) {
110
107
callS3Gracefully (s3Pointer , pointer -> {
111
- amazonS3 .deleteObject (s3Pointer .getS3BucketName (), s3Pointer .getS3Key ());
108
+ s3Client ().deleteObject (DeleteObjectRequest .builder ()
109
+ .bucket (pointer .getS3BucketName ())
110
+ .key (pointer .getS3Key ())
111
+ .build ());
112
112
LOG .info ("Message deleted from S3: " + s3Pointer .toJson ());
113
113
return null ;
114
114
});
@@ -118,7 +118,7 @@ private static <R> R callS3Gracefully(final PayloadS3Pointer pointer,
118
118
final Function <PayloadS3Pointer , R > function ) {
119
119
try {
120
120
return function .apply (pointer );
121
- } catch (AmazonServiceException e ) {
121
+ } catch (S3Exception e ) {
122
122
LOG .error ("A service exception" , e );
123
123
throw new FailedProcessingLargePayloadException (format ("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]" , pointer .getS3BucketName (), pointer .getS3Key ()), e );
124
124
} catch (SdkClientException e ) {
@@ -137,5 +137,9 @@ public static class FailedProcessingLargePayloadException extends RuntimeExcepti
137
137
public FailedProcessingLargePayloadException (String message , Throwable cause ) {
138
138
super (message , cause );
139
139
}
140
+
141
+ public FailedProcessingLargePayloadException (String message ) {
142
+ super (message );
143
+ }
140
144
}
141
145
}
0 commit comments