55import java .util .List ;
66import java .util .function .Function ;
77
8- import com .amazonaws .AmazonServiceException ;
9- import com .amazonaws .SdkClientException ;
108import com .amazonaws .services .lambda .runtime .Context ;
119import com .amazonaws .services .lambda .runtime .events .SQSEvent ;
12- import com .amazonaws .services .s3 .AmazonS3 ;
13- import com .amazonaws .services .s3 .AmazonS3ClientBuilder ;
14- import com .amazonaws .services .s3 .model .S3Object ;
15- import com .amazonaws .services .s3 .model .S3ObjectInputStream ;
16- import com .amazonaws .util .IOUtils ;
17-
1810import org .aspectj .lang .ProceedingJoinPoint ;
1911import org .aspectj .lang .annotation .Around ;
2012import org .aspectj .lang .annotation .Aspect ;
2113import org .aspectj .lang .annotation .Pointcut ;
2214import org .slf4j .Logger ;
2315import org .slf4j .LoggerFactory ;
24-
16+ import software .amazon .awssdk .core .ResponseInputStream ;
17+ import software .amazon .awssdk .core .exception .SdkClientException ;
18+ import software .amazon .awssdk .services .s3 .model .DeleteObjectRequest ;
19+ import software .amazon .awssdk .services .s3 .model .GetObjectRequest ;
20+ import software .amazon .awssdk .services .s3 .model .GetObjectResponse ;
21+ import software .amazon .awssdk .services .s3 .model .S3Exception ;
22+ import software .amazon .awssdk .utils .IoUtils ;
2523import software .amazon .lambda .powertools .sqs .SqsLargeMessage ;
2624import software .amazon .payloadoffloading .PayloadS3Pointer ;
2725
2826import static com .amazonaws .services .lambda .runtime .events .SQSEvent .SQSMessage ;
2927import static java .lang .String .format ;
3028import static software .amazon .lambda .powertools .core .internal .LambdaHandlerProcessor .isHandlerMethod ;
29+ import static software .amazon .lambda .powertools .sqs .SqsUtils .s3Client ;
3130
3231@ Aspect
3332public class SqsLargeMessageAspect {
3433
3534 private static final Logger LOG = LoggerFactory .getLogger (SqsLargeMessageAspect .class );
36- private static AmazonS3 amazonS3 = AmazonS3ClientBuilder .defaultClient ();
3735
3836 @ SuppressWarnings ({"EmptyMethod" })
3937 @ Pointcut ("@annotation(sqsLargeMessage)" )
@@ -52,7 +50,7 @@ && placedOnSqsEventRequestHandler(pjp)) {
5250 Object proceed = pjp .proceed (proceedArgs );
5351
5452 if (sqsLargeMessage .deletePayloads ()) {
55- pointersToDelete .forEach (this :: deleteMessageFromS3 );
53+ pointersToDelete .forEach (SqsLargeMessageAspect :: deleteMessage );
5654 }
5755 return proceed ;
5856 }
@@ -69,15 +67,21 @@ public static List<PayloadS3Pointer> processMessages(final List<SQSMessage> reco
6967 List <PayloadS3Pointer > s3Pointers = new ArrayList <>();
7068 for (SQSMessage sqsMessage : records ) {
7169 if (isBodyLargeMessagePointer (sqsMessage .getBody ())) {
72- PayloadS3Pointer s3Pointer = PayloadS3Pointer .fromJson (sqsMessage .getBody ());
7370
74- S3Object s3Object = callS3Gracefully (s3Pointer , pointer -> {
75- S3Object object = amazonS3 .getObject (pointer .getS3BucketName (), pointer .getS3Key ());
71+ PayloadS3Pointer s3Pointer = PayloadS3Pointer .fromJson (sqsMessage .getBody ())
72+ .orElseThrow (() -> new FailedProcessingLargePayloadException (format ("Failed processing SQS body to extract S3 details. [ %s ]." , sqsMessage .getBody ())));
73+
74+ ResponseInputStream <GetObjectResponse > s3Object = callS3Gracefully (s3Pointer , pointer -> {
75+ ResponseInputStream <GetObjectResponse > response = s3Client ().getObject (GetObjectRequest .builder ()
76+ .bucket (pointer .getS3BucketName ())
77+ .key (pointer .getS3Key ())
78+ .build ());
79+
7680 LOG .debug ("Object downloaded with key: " + s3Pointer .getS3Key ());
77- return object ;
81+ return response ;
7882 });
7983
80- sqsMessage .setBody (readStringFromS3Object (s3Object ));
84+ sqsMessage .setBody (readStringFromS3Object (s3Object , s3Pointer ));
8185 s3Pointers .add (s3Pointer );
8286 }
8387 }
@@ -89,26 +93,22 @@ private static boolean isBodyLargeMessagePointer(String record) {
8993 return record .startsWith ("[\" software.amazon.payloadoffloading.PayloadS3Pointer\" " );
9094 }
9195
92- private static String readStringFromS3Object (S3Object object ) {
93- try (S3ObjectInputStream is = object .getObjectContent ()) {
94- return IOUtils .toString (is );
96+ private static String readStringFromS3Object (ResponseInputStream <GetObjectResponse > response ,
97+ PayloadS3Pointer s3Pointer ) {
98+ try (ResponseInputStream <GetObjectResponse > content = response ) {
99+ return IoUtils .toUtf8String (content );
95100 } catch (IOException e ) {
96101 LOG .error ("Error converting S3 object to String" , e );
97- throw new FailedProcessingLargePayloadException (format ("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]" , object . getBucketName (), object . getKey ()), e );
102+ throw new FailedProcessingLargePayloadException (format ("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]" , s3Pointer . getS3BucketName (), s3Pointer . getS3Key ()), e );
98103 }
99104 }
100105
101- private void deleteMessageFromS3 (PayloadS3Pointer s3Pointer ) {
102- callS3Gracefully (s3Pointer , pointer -> {
103- amazonS3 .deleteObject (s3Pointer .getS3BucketName (), s3Pointer .getS3Key ());
104- LOG .info ("Message deleted from S3: " + s3Pointer .toJson ());
105- return null ;
106- });
107- }
108-
109106 public static void deleteMessage (PayloadS3Pointer s3Pointer ) {
110107 callS3Gracefully (s3Pointer , pointer -> {
111- amazonS3 .deleteObject (s3Pointer .getS3BucketName (), s3Pointer .getS3Key ());
108+ s3Client ().deleteObject (DeleteObjectRequest .builder ()
109+ .bucket (pointer .getS3BucketName ())
110+ .key (pointer .getS3Key ())
111+ .build ());
112112 LOG .info ("Message deleted from S3: " + s3Pointer .toJson ());
113113 return null ;
114114 });
@@ -118,7 +118,7 @@ private static <R> R callS3Gracefully(final PayloadS3Pointer pointer,
118118 final Function <PayloadS3Pointer , R > function ) {
119119 try {
120120 return function .apply (pointer );
121- } catch (AmazonServiceException e ) {
121+ } catch (S3Exception e ) {
122122 LOG .error ("A service exception" , e );
123123 throw new FailedProcessingLargePayloadException (format ("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]" , pointer .getS3BucketName (), pointer .getS3Key ()), e );
124124 } catch (SdkClientException e ) {
@@ -137,5 +137,9 @@ public static class FailedProcessingLargePayloadException extends RuntimeExcepti
137137 public FailedProcessingLargePayloadException (String message , Throwable cause ) {
138138 super (message , cause );
139139 }
140+
141+ public FailedProcessingLargePayloadException (String message ) {
142+ super (message );
143+ }
140144 }
141145}
0 commit comments