1
1
import {
2
+ AbortMultipartUploadCommand ,
2
3
CompletedPart ,
3
4
CompleteMultipartUploadCommand ,
4
5
CompleteMultipartUploadCommandOutput ,
@@ -36,18 +37,18 @@ const MIN_PART_SIZE = 1024 * 1024 * 5;
36
37
37
38
export class Upload extends EventEmitter {
38
39
/**
39
- * S3 multipart upload does not allow more than 10000 parts.
40
+ * S3 multipart upload does not allow more than 10,000 parts.
40
41
*/
41
- private MAX_PARTS = 10000 ;
42
+ private MAX_PARTS = 10_000 ;
42
43
43
44
// Defaults.
44
- private queueSize = 4 ;
45
- private partSize = MIN_PART_SIZE ;
46
- private leavePartsOnError = false ;
47
- private tags : Tag [ ] = [ ] ;
45
+ private readonly queueSize : number = 4 ;
46
+ private readonly partSize = MIN_PART_SIZE ;
47
+ private readonly leavePartsOnError : boolean = false ;
48
+ private readonly tags : Tag [ ] = [ ] ;
48
49
49
- private client : S3Client ;
50
- private params : PutObjectCommandInput ;
50
+ private readonly client : S3Client ;
51
+ private readonly params : PutObjectCommandInput ;
51
52
52
53
// used for reporting progress.
53
54
private totalBytes ?: number ;
@@ -57,13 +58,19 @@ export class Upload extends EventEmitter {
57
58
private abortController : IAbortController ;
58
59
private concurrentUploaders : Promise < void > [ ] = [ ] ;
59
60
private createMultiPartPromise ?: Promise < CreateMultipartUploadCommandOutput > ;
61
+ private abortMultipartUploadCommand : AbortMultipartUploadCommand | null = null ;
60
62
61
63
private uploadedParts : CompletedPart [ ] = [ ] ;
62
- private uploadId ?: string ;
63
- uploadEvent ?: string ;
64
+ private uploadEnqueuedPartsCount = 0 ;
65
+ /**
66
+ * Last UploadId if the upload was done with MultipartUpload and not PutObject.
67
+ */
68
+ public uploadId ?: string ;
69
+ public uploadEvent ?: string ;
64
70
65
71
private isMultiPart = true ;
66
72
private singleUploadResult ?: CompleteMultipartUploadCommandOutput ;
73
+ private sent = false ;
67
74
68
75
constructor ( options : Options ) {
69
76
super ( ) ;
@@ -94,6 +101,12 @@ export class Upload extends EventEmitter {
94
101
}
95
102
96
103
public async done ( ) : Promise < CompleteMultipartUploadCommandOutput > {
104
+ if ( this . sent ) {
105
+ throw new Error (
106
+ "@aws-sdk/lib-storage: this instance of Upload has already executed .done(). Create a new instance."
107
+ ) ;
108
+ }
109
+ this . sent = true ;
97
110
return await Promise . race ( [ this . __doMultipartUpload ( ) , this . __abortTimeout ( this . abortController . signal ) ] ) ;
98
111
}
99
112
@@ -184,104 +197,64 @@ export class Upload extends EventEmitter {
184
197
private async __createMultipartUpload ( ) : Promise < CreateMultipartUploadCommandOutput > {
185
198
if ( ! this . createMultiPartPromise ) {
186
199
const createCommandParams = { ...this . params , Body : undefined } ;
187
- this . createMultiPartPromise = this . client . send ( new CreateMultipartUploadCommand ( createCommandParams ) ) ;
200
+ this . createMultiPartPromise = this . client
201
+ . send ( new CreateMultipartUploadCommand ( createCommandParams ) )
202
+ . then ( ( createMpuResponse ) => {
203
+ // We use the parameter Bucket/Key rather than the information from
204
+ // createMultipartUpload response in case the Bucket is an access point arn.
205
+ this . abortMultipartUploadCommand = new AbortMultipartUploadCommand ( {
206
+ Bucket : this . params . Bucket ,
207
+ Key : this . params . Key ,
208
+ UploadId : createMpuResponse . UploadId ,
209
+ } ) ;
210
+ return createMpuResponse ;
211
+ } ) ;
188
212
}
189
213
return this . createMultiPartPromise ;
190
214
}
191
215
192
216
private async __doConcurrentUpload ( dataFeeder : AsyncGenerator < RawDataPart , void , undefined > ) : Promise < void > {
193
217
for await ( const dataPart of dataFeeder ) {
194
- if ( this . uploadedParts . length > this . MAX_PARTS ) {
218
+ if ( this . uploadEnqueuedPartsCount > this . MAX_PARTS ) {
195
219
throw new Error (
196
- `Exceeded ${ this . MAX_PARTS } as part of the upload to ${ this . params . Key } and ${ this . params . Bucket } .`
220
+ `Exceeded ${ this . MAX_PARTS } parts in multipart upload to Bucket: ${ this . params . Bucket } Key: ${ this . params . Key } .`
197
221
) ;
198
222
}
199
223
200
- try {
201
- if ( this . abortController . signal . aborted ) {
202
- return ;
203
- }
224
+ if ( this . abortController . signal . aborted ) {
225
+ return ;
226
+ }
204
227
205
- // Use put instead of multi-part for one chunk uploads.
206
- if ( dataPart . partNumber === 1 && dataPart . lastPart ) {
207
- return await this . __uploadUsingPut ( dataPart ) ;
208
- }
228
+ // Use put instead of multipart for one chunk uploads.
229
+ if ( dataPart . partNumber === 1 && dataPart . lastPart ) {
230
+ return await this . __uploadUsingPut ( dataPart ) ;
231
+ }
209
232
210
- if ( ! this . uploadId ) {
211
- const { UploadId } = await this . __createMultipartUpload ( ) ;
212
- this . uploadId = UploadId ;
213
- if ( this . abortController . signal . aborted ) {
214
- return ;
215
- }
233
+ if ( ! this . uploadId ) {
234
+ const { UploadId } = await this . __createMultipartUpload ( ) ;
235
+ this . uploadId = UploadId ;
236
+ if ( this . abortController . signal . aborted ) {
237
+ return ;
216
238
}
239
+ }
217
240
218
- const partSize : number = byteLength ( dataPart . data ) || 0 ;
219
-
220
- const requestHandler = this . client . config . requestHandler ;
221
- const eventEmitter : EventEmitter | null = requestHandler instanceof EventEmitter ? requestHandler : null ;
222
-
223
- let lastSeenBytes = 0 ;
224
- const uploadEventListener = ( event : ProgressEvent , request : HttpRequest ) => {
225
- const requestPartSize = Number ( request . query [ "partNumber" ] ) || - 1 ;
226
-
227
- if ( requestPartSize !== dataPart . partNumber ) {
228
- // ignored, because the emitted event is not for this part.
229
- return ;
230
- }
231
-
232
- if ( event . total && partSize ) {
233
- this . bytesUploadedSoFar += event . loaded - lastSeenBytes ;
234
- lastSeenBytes = event . loaded ;
235
- }
236
-
237
- this . __notifyProgress ( {
238
- loaded : this . bytesUploadedSoFar ,
239
- total : this . totalBytes ,
240
- part : dataPart . partNumber ,
241
- Key : this . params . Key ,
242
- Bucket : this . params . Bucket ,
243
- } ) ;
244
- } ;
241
+ const partSize : number = byteLength ( dataPart . data ) || 0 ;
245
242
246
- if ( eventEmitter !== null ) {
247
- // The requestHandler is the xhr-http-handler.
248
- eventEmitter . on ( "xhr.upload.progress" , uploadEventListener ) ;
249
- }
243
+ const requestHandler = this . client . config . requestHandler ;
244
+ const eventEmitter : EventEmitter | null = requestHandler instanceof EventEmitter ? requestHandler : null ;
250
245
251
- const partResult = await this . client . send (
252
- new UploadPartCommand ( {
253
- ...this . params ,
254
- UploadId : this . uploadId ,
255
- Body : dataPart . data ,
256
- PartNumber : dataPart . partNumber ,
257
- } )
258
- ) ;
246
+ let lastSeenBytes = 0 ;
247
+ const uploadEventListener = ( event : ProgressEvent , request : HttpRequest ) => {
248
+ const requestPartSize = Number ( request . query [ "partNumber" ] ) || - 1 ;
259
249
260
- if ( eventEmitter !== null ) {
261
- eventEmitter . off ( "xhr.upload.progress" , uploadEventListener ) ;
262
- }
263
-
264
- if ( this . abortController . signal . aborted ) {
250
+ if ( requestPartSize !== dataPart . partNumber ) {
251
+ // ignored, because the emitted event is not for this part.
265
252
return ;
266
253
}
267
254
268
- if ( ! partResult . ETag ) {
269
- throw new Error (
270
- `Part ${ dataPart . partNumber } is missing ETag in UploadPart response. Missing Bucket CORS configuration for ETag header?`
271
- ) ;
272
- }
273
-
274
- this . uploadedParts . push ( {
275
- PartNumber : dataPart . partNumber ,
276
- ETag : partResult . ETag ,
277
- ...( partResult . ChecksumCRC32 && { ChecksumCRC32 : partResult . ChecksumCRC32 } ) ,
278
- ...( partResult . ChecksumCRC32C && { ChecksumCRC32C : partResult . ChecksumCRC32C } ) ,
279
- ...( partResult . ChecksumSHA1 && { ChecksumSHA1 : partResult . ChecksumSHA1 } ) ,
280
- ...( partResult . ChecksumSHA256 && { ChecksumSHA256 : partResult . ChecksumSHA256 } ) ,
281
- } ) ;
282
-
283
- if ( eventEmitter === null ) {
284
- this . bytesUploadedSoFar += partSize ;
255
+ if ( event . total && partSize ) {
256
+ this . bytesUploadedSoFar += event . loaded - lastSeenBytes ;
257
+ lastSeenBytes = event . loaded ;
285
258
}
286
259
287
260
this . __notifyProgress ( {
@@ -291,33 +264,89 @@ export class Upload extends EventEmitter {
291
264
Key : this . params . Key ,
292
265
Bucket : this . params . Bucket ,
293
266
} ) ;
294
- } catch ( e ) {
295
- // Failed to create multi-part or put
296
- if ( ! this . uploadId ) {
297
- throw e ;
298
- }
299
- // on leavePartsOnError throw an error so users can deal with it themselves,
300
- // otherwise swallow the error.
301
- if ( this . leavePartsOnError ) {
302
- throw e ;
303
- }
267
+ } ;
268
+
269
+ if ( eventEmitter !== null ) {
270
+ // The requestHandler is the xhr-http-handler.
271
+ eventEmitter . on ( "xhr.upload.progress" , uploadEventListener ) ;
272
+ }
273
+
274
+ this . uploadEnqueuedPartsCount += 1 ;
275
+
276
+ const partResult = await this . client . send (
277
+ new UploadPartCommand ( {
278
+ ...this . params ,
279
+ UploadId : this . uploadId ,
280
+ Body : dataPart . data ,
281
+ PartNumber : dataPart . partNumber ,
282
+ } )
283
+ ) ;
284
+
285
+ if ( eventEmitter !== null ) {
286
+ eventEmitter . off ( "xhr.upload.progress" , uploadEventListener ) ;
304
287
}
288
+
289
+ if ( this . abortController . signal . aborted ) {
290
+ return ;
291
+ }
292
+
293
+ if ( ! partResult . ETag ) {
294
+ throw new Error (
295
+ `Part ${ dataPart . partNumber } is missing ETag in UploadPart response. Missing Bucket CORS configuration for ETag header?`
296
+ ) ;
297
+ }
298
+
299
+ this . uploadedParts . push ( {
300
+ PartNumber : dataPart . partNumber ,
301
+ ETag : partResult . ETag ,
302
+ ...( partResult . ChecksumCRC32 && { ChecksumCRC32 : partResult . ChecksumCRC32 } ) ,
303
+ ...( partResult . ChecksumCRC32C && { ChecksumCRC32C : partResult . ChecksumCRC32C } ) ,
304
+ ...( partResult . ChecksumSHA1 && { ChecksumSHA1 : partResult . ChecksumSHA1 } ) ,
305
+ ...( partResult . ChecksumSHA256 && { ChecksumSHA256 : partResult . ChecksumSHA256 } ) ,
306
+ } ) ;
307
+
308
+ if ( eventEmitter === null ) {
309
+ this . bytesUploadedSoFar += partSize ;
310
+ }
311
+
312
+ this . __notifyProgress ( {
313
+ loaded : this . bytesUploadedSoFar ,
314
+ total : this . totalBytes ,
315
+ part : dataPart . partNumber ,
316
+ Key : this . params . Key ,
317
+ Bucket : this . params . Bucket ,
318
+ } ) ;
305
319
}
306
320
}
307
321
308
322
private async __doMultipartUpload ( ) : Promise < CompleteMultipartUploadCommandOutput > {
309
- // Set up data input chunks.
310
323
const dataFeeder = getChunk ( this . params . Body , this . partSize ) ;
324
+ const concurrentUploaderFailures : Error [ ] = [ ] ;
311
325
312
- // Create and start concurrent uploads.
313
326
for ( let index = 0 ; index < this . queueSize ; index ++ ) {
314
- const currentUpload = this . __doConcurrentUpload ( dataFeeder ) ;
327
+ const currentUpload = this . __doConcurrentUpload ( dataFeeder ) . catch ( ( err ) => {
328
+ concurrentUploaderFailures . push ( err ) ;
329
+ } ) ;
315
330
this . concurrentUploaders . push ( currentUpload ) ;
316
331
}
317
332
318
- // Create and start concurrent uploads.
319
333
await Promise . all ( this . concurrentUploaders ) ;
334
+ if ( concurrentUploaderFailures . length >= 1 ) {
335
+ await this . markUploadAsAborted ( ) ;
336
+ /**
337
+ * Previously, each promise in concurrentUploaders could potentially throw
338
+ * and immediately return control to user code. However, we want to wait for
339
+ * all uploaders to finish before calling AbortMultipartUpload to avoid
340
+ * stranding uploaded parts.
341
+ *
342
+ * We throw only the first error to be consistent with prior behavior,
343
+ * but may consider combining the errors into a report in the future.
344
+ */
345
+ throw concurrentUploaderFailures [ 0 ] ;
346
+ }
347
+
320
348
if ( this . abortController . signal . aborted ) {
349
+ await this . markUploadAsAborted ( ) ;
321
350
throw Object . assign ( new Error ( "Upload aborted." ) , { name : "AbortError" } ) ;
322
351
}
323
352
@@ -341,6 +370,8 @@ export class Upload extends EventEmitter {
341
370
result = this . singleUploadResult ! ;
342
371
}
343
372
373
+ this . abortMultipartUploadCommand = null ;
374
+
344
375
// Add tags to the object after it's completed the upload.
345
376
if ( this . tags . length ) {
346
377
await this . client . send (
@@ -356,6 +387,18 @@ export class Upload extends EventEmitter {
356
387
return result ;
357
388
}
358
389
390
+ /**
391
+ * Abort the last multipart upload in progress
392
+ * if we know the upload id, the user did not specify to leave the parts, and
393
+ * we have a prepared AbortMultipartUpload command.
394
+ */
395
+ private async markUploadAsAborted ( ) : Promise < void > {
396
+ if ( this . uploadId && ! this . leavePartsOnError && null !== this . abortMultipartUploadCommand ) {
397
+ await this . client . send ( this . abortMultipartUploadCommand ) ;
398
+ this . abortMultipartUploadCommand = null ;
399
+ }
400
+ }
401
+
359
402
private __notifyProgress ( progress : Progress ) : void {
360
403
if ( this . uploadEvent ) {
361
404
this . emit ( this . uploadEvent , progress ) ;
0 commit comments