Skip to content

Commit 7d7e7ff

Browse files
committed
AWS SDK v2 S3 TransferManager S3V2BlobProvider POC with
aws-sdk-java-v2 version 2.17.144 aws-sdk-java-v2 s3-transfer-manager version 2.17.144-SNAPSHOT-PREVIEW built from PR aws/aws-sdk-java-v2#3084 aws-crt-java version 0.15.20
1 parent b4efd24 commit 7d7e7ff

File tree

7 files changed

+367
-10
lines changed

7 files changed

+367
-10
lines changed

addons/nuxeo-core-binarymanager-cloud/nuxeo-core-binarymanager-s3/pom.xml

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,37 @@
44
<parent>
55
<groupId>org.nuxeo.ecm.core</groupId>
66
<artifactId>nuxeo-core-binarymanager-cloud</artifactId>
7-
<version>10.10-HF58</version>
7+
<version>10.10-HF59-SNAPSHOT</version>
88
<relativePath>../pom.xml</relativePath>
99
</parent>
1010

1111
<artifactId>nuxeo-core-binarymanager-s3</artifactId>
1212
<name>Nuxeo Core S3 BinaryManager</name>
1313
<description>Nuxeo Core S3 BinaryManager</description>
1414

15+
<dependencyManagement>
16+
<dependencies>
17+
<dependency>
18+
<groupId>software.amazon.awssdk</groupId>
19+
<artifactId>bom</artifactId>
20+
<version>2.17.144</version>
21+
<type>pom</type>
22+
<scope>import</scope>
23+
</dependency>
24+
</dependencies>
25+
</dependencyManagement>
26+
1527
<dependencies>
28+
<dependency>
29+
<groupId>software.amazon.awssdk</groupId>
30+
<artifactId>s3-transfer-manager</artifactId>
31+
<version>2.17.144-SNAPSHOT-PREVIEW</version>
32+
</dependency>
33+
<dependency>
34+
<groupId>software.amazon.awssdk.crt</groupId>
35+
<artifactId>aws-crt</artifactId>
36+
<version>0.15.20</version>
37+
</dependency>
1638
<dependency>
1739
<groupId>org.nuxeo.common</groupId>
1840
<artifactId>nuxeo-common</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package org.nuxeo.ecm.blob.s3v2;
2+
3+
import org.apache.logging.log4j.LogManager;
4+
import org.apache.logging.log4j.Logger;
5+
import org.nuxeo.ecm.blob.s3.S3BlobProvider;
6+
import org.nuxeo.ecm.core.api.Blob;
7+
import org.nuxeo.ecm.core.api.repository.RepositoryManager;
8+
import org.nuxeo.ecm.core.blob.AbstractBlobProvider;
9+
import org.nuxeo.ecm.core.blob.BlobInfo;
10+
import org.nuxeo.ecm.core.blob.BlobManager;
11+
import org.nuxeo.ecm.core.blob.BlobProvider;
12+
import org.nuxeo.runtime.api.Framework;
13+
14+
import java.io.IOException;
15+
import java.util.Collections;
16+
import java.util.List;
17+
import java.util.Map;
18+
19+
/**
20+
* This is not a full blob provider. It is a POC that provides two methods of a blob provider: {@link #readBlob}
21+
* and {@link #readBlobs} to illustrate the use of the AWS SDK V2 Transfer Manager (preview) for binary object
22+
* download only. Fails - or behavior is undefined - if any other provider methods are called. Fails if an
23+
* attempt is made to use the class when the default {@link BlobManager} is not an {@link S3BlobProvider}.
24+
*/
25+
public class S3V2BlobProvider extends AbstractBlobProvider {
26+
27+
private static final Logger log = LogManager.getLogger(S3V2BlobProvider.class);
28+
29+
private S3V2BlobReader blobReader;
30+
31+
public Map<BlobInfo, Blob> readBlobs(List<BlobInfo> blobInfos) {
32+
return blobReader.readBlobs(blobInfos);
33+
}
34+
35+
@Override
36+
public void initialize(String blobProviderId, Map<String, String> properties) throws IOException {
37+
super.initialize(blobProviderId, properties);
38+
blobReader = initBlobReader();
39+
}
40+
41+
private S3V2BlobReader initBlobReader() {
42+
final String defaultRepoName = Framework.getService(RepositoryManager.class).getDefaultRepositoryName();
43+
BlobProvider frameworkBlobProvider = Framework.getService(BlobManager.class).getBlobProvider(defaultRepoName);
44+
if (frameworkBlobProvider == null) {
45+
throw new RuntimeException("No blob provider found for framework default repository");
46+
}
47+
if (!(frameworkBlobProvider instanceof S3BlobProvider)) {
48+
throw new UnsupportedOperationException();
49+
}
50+
log.info("BlobReader initialized");
51+
return new S3V2BlobReader((S3BlobProvider)frameworkBlobProvider);
52+
}
53+
54+
@Override
55+
public void close() {
56+
blobReader.close();
57+
}
58+
59+
@Override
60+
public Blob readBlob(BlobInfo blobInfo) throws IOException {
61+
return blobReader.readBlobs(Collections.singletonList(blobInfo)).get(blobInfo);
62+
}
63+
64+
@Override
65+
public String writeBlob(Blob blob) throws IOException {
66+
throw new UnsupportedOperationException();
67+
}
68+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
package org.nuxeo.ecm.blob.s3v2;
2+
3+
import org.apache.logging.log4j.LogManager;
4+
import org.apache.logging.log4j.Logger;
5+
import org.nuxeo.ecm.blob.s3.S3BlobProvider;
6+
import org.nuxeo.ecm.blob.s3.S3BlobStoreConfiguration;
7+
import org.nuxeo.ecm.core.api.Blob;
8+
import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
9+
import org.nuxeo.ecm.core.blob.BlobInfo;
10+
import org.nuxeo.runtime.api.Framework;
11+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
12+
import software.amazon.awssdk.auth.credentials.AwsCredentials;
13+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
14+
import software.amazon.awssdk.regions.Region;
15+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
16+
import software.amazon.awssdk.transfer.s3.CompletedFileDownload;
17+
import software.amazon.awssdk.transfer.s3.DownloadFileRequest;
18+
import software.amazon.awssdk.transfer.s3.FileDownload;
19+
import software.amazon.awssdk.transfer.s3.S3TransferManager;
20+
21+
import java.io.File;
22+
import java.net.URI;
23+
import java.nio.file.Files;
24+
import java.nio.file.Path;
25+
import java.nio.file.Paths;
26+
import java.util.HashMap;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.ExecutionException;
31+
32+
public class S3V2BlobReader {
33+
34+
private static final Logger log = LogManager.getLogger(S3V2BlobReader.class);
35+
36+
private static final String NUXEO_S3STORAGE_MIN_PARTSIZE_PROPERTY = "nuxeo.s3storage.minPartSize";
37+
private static final String NUXEO_S3STORAGE_TARGET_THROUGHPUT_PROPERTY = "nuxeo.s3storage.targetThroughput";
38+
private static final String NUXEO_S3STORAGE_MAX_CONCURRENCY_PROPERTY = "nuxeo.s3storage.maxConcurrency";
39+
40+
final String bucketName;
41+
final String endpoint;
42+
final String region;
43+
final String accessKeyId;
44+
final String secretAccessKey;
45+
final Integer pathDepth;
46+
final Long minimumPartSizeInBytes;
47+
final Double targetThroughputInGbps;
48+
final Integer maxConcurrency;
49+
final Path frameworkBlobCacheDir;
50+
final S3BlobProvider frameworkBlobProvider;
51+
final S3TransferManager transferManager;
52+
53+
/**
54+
* In the POC, this reader shares the filesystem cache with the {@link S3BlobProvider} that is contributed
55+
* by the marketplace package. This is a temporary approach until the concurrent read functionality demonstrated
56+
* by this POC is incorporated into the addon.
57+
*
58+
* @param frameworkBlobProvider so we can share the cache as described as well as other properties
59+
*/
60+
S3V2BlobReader(S3BlobProvider frameworkBlobProvider) {
61+
this.frameworkBlobProvider = frameworkBlobProvider;
62+
frameworkBlobCacheDir = frameworkBlobProvider.config.cachingConfiguration.dir;
63+
64+
bucketName = frameworkBlobProvider.config.bucketName;
65+
endpoint = frameworkBlobProvider.config.getProperty(S3BlobStoreConfiguration.ENDPOINT_PROPERTY);
66+
region = frameworkBlobProvider.config.amazonS3.getRegionName();
67+
accessKeyId = frameworkBlobProvider.config.getProperty(S3BlobStoreConfiguration.AWS_ID_PROPERTY);
68+
secretAccessKey = frameworkBlobProvider.config.getProperty(S3BlobStoreConfiguration.AWS_SECRET_PROPERTY);
69+
pathDepth = frameworkBlobProvider.config.getIntProperty(S3BlobStoreConfiguration.BUCKET_SUB_DIRS_DEPTH_PROPERTY);
70+
minimumPartSizeInBytes = Framework.getProperty(NUXEO_S3STORAGE_MIN_PARTSIZE_PROPERTY) == null ? null :
71+
Long.parseLong(Framework.getProperty(NUXEO_S3STORAGE_MIN_PARTSIZE_PROPERTY));
72+
targetThroughputInGbps = Framework.getProperty(NUXEO_S3STORAGE_TARGET_THROUGHPUT_PROPERTY) == null ? null :
73+
Double.parseDouble(Framework.getProperty(NUXEO_S3STORAGE_TARGET_THROUGHPUT_PROPERTY));
74+
maxConcurrency = Framework.getProperty(NUXEO_S3STORAGE_MAX_CONCURRENCY_PROPERTY) == null ? null :
75+
Integer.parseInt(Framework.getProperty(NUXEO_S3STORAGE_MAX_CONCURRENCY_PROPERTY));
76+
77+
AwsCredentials awsCreds = AwsBasicCredentials.create(accessKeyId, secretAccessKey);
78+
79+
transferManager = S3TransferManager.builder().s3ClientConfiguration(cfg -> cfg
80+
.credentialsProvider(StaticCredentialsProvider.create(awsCreds))
81+
.region(Region.of(region))
82+
.endpointOverride(endpoint == null ? null : URI.create(endpoint))
83+
.minimumPartSizeInBytes(minimumPartSizeInBytes)
84+
.targetThroughputInGbps(targetThroughputInGbps)
85+
.maxConcurrency(maxConcurrency))
86+
.build();
87+
88+
log.info("Blob reader initialized using framework blob provider: " + frameworkBlobProvider
89+
.getClass()
90+
.getSimpleName());
91+
}
92+
93+
Map<BlobInfo, Blob> readBlobs(List<BlobInfo> blobInfos) {
94+
Map<BlobInfo, CompletableFuture<CompletedFileDownload>> futureMap = readBlobsInternal(blobInfos);
95+
Map<BlobInfo, Blob> blobMap = new HashMap<>();
96+
for (Map.Entry<BlobInfo, CompletableFuture<CompletedFileDownload>> entry : futureMap.entrySet()) {
97+
try {
98+
entry.getValue().get();
99+
} catch (ExecutionException | InterruptedException e) {
100+
if (!(e.getMessage().contains("FileAlreadyExistsException"))) {
101+
throw new RuntimeException(e);
102+
}
103+
}
104+
blobMap.put(entry.getKey(), new FileBlob(new File(frameworkBlobCacheDir.toAbsolutePath().toString(),
105+
entry.getKey().digest)));
106+
}
107+
// add blobs that were already cached so the blobs returned by the method matches the input
108+
blobInfos.forEach(blobInfo -> {
109+
if (blobMap.get(blobInfo.digest) == null) {
110+
blobMap.put(blobInfo,
111+
new FileBlob(new File(frameworkBlobCacheDir.toAbsolutePath().toString(), blobInfo.digest)));
112+
}
113+
});
114+
return blobMap;
115+
}
116+
117+
void close() {
118+
try {
119+
transferManager.close();
120+
} catch (Exception e) {
121+
log.info("Exception attempting to close the transfer manager", e);
122+
}
123+
}
124+
125+
/**
126+
* Uses the AWS SDK V2 (Preview) Transfer Manager to download blobs concurrently.
127+
*
128+
* @param blobInfos a list of BlobInfos representing the blobs to download.
129+
* @return a Map in which the key is a blob info, and the value is a future to wait on. When the
130+
* future completes, the blob is downloaded. Only the blobs downloaded by the method are
131+
* returned: i.e. if the blob was already present in cache, it is excluded from the return value.
132+
*/
133+
private Map<BlobInfo, CompletableFuture<CompletedFileDownload>> readBlobsInternal(List<BlobInfo> blobInfos) {
134+
Map<BlobInfo, CompletableFuture<CompletedFileDownload>> futureMap = new HashMap<>();
135+
blobInfos.forEach(blobInfo -> {
136+
Path blobFile = Paths.get(frameworkBlobCacheDir.toAbsolutePath().toString(), blobInfo.digest);
137+
// only enqueue a download if blob is not already cached, and haven't already enqueued a download
138+
if (!Files.exists(blobFile) && futureMap.get(blobInfo.digest) == null) {
139+
GetObjectRequest getObjectRequest = GetObjectRequest.builder()
140+
.key(toPathedKey(blobInfo.digest))
141+
.bucket(bucketName)
142+
.build();
143+
DownloadFileRequest downloadFileRequest = DownloadFileRequest.builder()
144+
.destination(blobFile)
145+
.getObjectRequest(getObjectRequest)
146+
.build();
147+
FileDownload download = transferManager.downloadFile(downloadFileRequest);
148+
futureMap.put(blobInfo, download.completionFuture());
149+
}
150+
});
151+
return futureMap;
152+
}
153+
154+
/**
155+
* Given <code>digest</code> "123456" and {@link #pathDepth} == 2, then returns
156+
* "12/34/123456".
157+
*
158+
* @param digest an MD5 digest
159+
* @return the digest with path segments optionally prepended
160+
*/
161+
private String toPathedKey(String digest) {
162+
if (pathDepth == 0) {
163+
return digest;
164+
}
165+
StringBuilder builder = new StringBuilder();
166+
for (int i = 0; i < pathDepth; i++) {
167+
int idx = i * 2;
168+
builder.append(digest, idx, idx + 2).append("/");
169+
}
170+
return builder.append(digest).toString();
171+
}
172+
}

addons/nuxeo-core-binarymanager-cloud/nuxeo-core-binarymanager-s3/src/test/java/org/nuxeo/ecm/blob/s3/S3BlobProviderFeature.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,14 @@ public class S3BlobProviderFeature implements RunnerFeature {
128128
@Override
129129
@SuppressWarnings("unchecked")
130130
public void start(FeaturesRunner runner) {
131+
String awsId = configureProperty(AWS_ID, constant("admin"));
132+
String awsSecret = configureProperty(AWS_SECRET, constant("password"));
133+
131134
// configure global blob provider properties
132-
String awsId = configureProperty(AWS_ID, sysEnv(ACCESS_KEY_ENV_VAR), sysEnv(ALTERNATE_ACCESS_KEY_ENV_VAR),
133-
sysProp(AWS_ID));
134-
String awsSecret = configureProperty(AWS_SECRET, sysEnv(SECRET_KEY_ENV_VAR),
135-
sysEnv(ALTERNATE_SECRET_KEY_ENV_VAR), sysProp(AWS_SECRET));
135+
// String awsId = configureProperty(AWS_ID, sysEnv(ACCESS_KEY_ENV_VAR), sysEnv(ALTERNATE_ACCESS_KEY_ENV_VAR),
136+
// sysProp(AWS_ID));
137+
// String awsSecret = configureProperty(AWS_SECRET, sysEnv(SECRET_KEY_ENV_VAR),
138+
// sysEnv(ALTERNATE_SECRET_KEY_ENV_VAR), sysProp(AWS_SECRET));
136139
configureProperty(AWS_SESSION_TOKEN, sysEnv(AWS_SESSION_TOKEN_ENV_VAR), sysProp(AWS_SESSION_TOKEN));
137140
configureProperty(BUCKET_REGION, sysEnv(AWS_REGION_ENV_VAR), sysProp(BUCKET_REGION));
138141
// configure specific blob provider properties
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* (C) Copyright 2021 Nuxeo (http://nuxeo.com/) and others.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* Contributors:
17+
* Kevin Leturc <[email protected]>
18+
*/
19+
package org.nuxeo.ecm.blob.s3v2;
20+
21+
import static org.junit.Assert.assertEquals;
22+
23+
import java.io.IOException;
24+
import java.io.Serializable;
25+
import java.time.Clock;
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
29+
import javax.inject.Inject;
30+
31+
import org.junit.Test;
32+
import org.junit.runner.RunWith;
33+
import org.nuxeo.ecm.blob.s3.S3BlobProviderFeature;
34+
import org.nuxeo.ecm.core.api.Blobs;
35+
import org.nuxeo.ecm.core.api.CoreSession;
36+
import org.nuxeo.ecm.core.api.DocumentModel;
37+
import org.nuxeo.ecm.core.blob.*;
38+
import org.nuxeo.ecm.core.test.CoreFeature;
39+
import org.nuxeo.runtime.api.Framework;
40+
import org.nuxeo.runtime.test.runner.Features;
41+
import org.nuxeo.runtime.test.runner.FeaturesRunner;
42+
import org.nuxeo.runtime.test.runner.TransactionalFeature;
43+
44+
@RunWith(FeaturesRunner.class)
45+
@Features({ CoreFeature.class, S3BlobProviderFeature.class })
46+
public class TestS3V2BlobProviderWithDocument {
47+
48+
@Inject
49+
protected TransactionalFeature txFeature;
50+
51+
@Inject
52+
protected CoreSession session;
53+
54+
@Test
55+
public void testStoreToBlobProviderTestv2() throws IOException {
56+
String blobProviderId = "test";
57+
int docCount = 2000;
58+
List<BlobInfo> blobInfos = new ArrayList<>(docCount);
59+
for (int i = 1; i <= docCount; i++) {
60+
String blobContent = "A simple blob " + Integer.toString(i);
61+
DocumentModel doc = session.createDocumentModel("/", "document", S3BlobProviderFeature.S3_DOC_TYPE);
62+
doc.setPropertyValue(String.format("s3-%s:content", blobProviderId),
63+
(Serializable) Blobs.createBlob(blobContent));
64+
doc = session.createDocument(doc);
65+
ManagedBlob blob = (ManagedBlob) doc.getPropertyValue(String.format("s3-%s:content", blobProviderId));
66+
String digest = blob.getDigest(); // digest is the key
67+
BlobInfo blobInfo = new BlobInfo();
68+
blobInfo.digest = digest;
69+
blobInfos.add(blobInfo);
70+
if (i % (docCount / 10) == 0) {
71+
txFeature.nextTransaction(); // avoid an error in FulltextExtractorWork -> ManagedBlobn
72+
}
73+
}
74+
75+
S3V2BlobProvider blobProvider = (S3V2BlobProvider) Framework.getService(BlobManager.class)
76+
.getBlobProvider("testv2");
77+
Clock clock = Clock.systemDefaultZone();
78+
long millis = clock.millis();
79+
blobProvider.readBlobs(blobInfos);
80+
System.out.printf("Time in milliseconds to read %s blobs: %d", docCount, clock.millis() - millis);
81+
}
82+
}

0 commit comments

Comments
 (0)