Skip to content

Commit 6c30280

Browse files
Implement AppendObject() API (#2082)
1 parent fb4bc4c commit 6c30280

8 files changed

+276
-11
lines changed

api-append-object.go

+226
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
/*
2+
* MinIO Go Library for Amazon S3 Compatible Cloud Storage
3+
* Copyright 2015-2025 MinIO, Inc.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package minio
19+
20+
import (
21+
"bytes"
22+
"context"
23+
"errors"
24+
"fmt"
25+
"io"
26+
"net/http"
27+
"strconv"
28+
29+
"github.com/minio/minio-go/v7/pkg/s3utils"
30+
)
31+
32+
// AppendObjectOptions https://docs.aws.amazon.com/AmazonS3/latest/userguide/directory-buckets-objects-append.html
33+
type AppendObjectOptions struct {
34+
// Provide a progress reader to indicate the current append() progress.
35+
Progress io.Reader
36+
// ChunkSize indicates the maximum append() size,
37+
// it is useful when you want to control how much data
38+
// per append() you are interested in sending to server
39+
// while keeping the input io.Reader of a longer length.
40+
ChunkSize uint64
41+
// Aggressively disable sha256 payload, it is automatically
42+
// turned-off for TLS supporting endpoints, useful in benchmarks
43+
// where you are interested in the peak() numbers.
44+
DisableContentSha256 bool
45+
46+
customHeaders http.Header
47+
checksumType ChecksumType
48+
}
49+
50+
// Header returns the custom header for AppendObject API
51+
func (opts AppendObjectOptions) Header() (header http.Header) {
52+
header = make(http.Header)
53+
for k, v := range opts.customHeaders {
54+
header[k] = v
55+
}
56+
return header
57+
}
58+
59+
func (opts *AppendObjectOptions) setWriteOffset(offset int64) {
60+
if len(opts.customHeaders) == 0 {
61+
opts.customHeaders = make(http.Header)
62+
}
63+
opts.customHeaders["x-amz-write-offset-bytes"] = []string{strconv.FormatInt(offset, 10)}
64+
}
65+
66+
func (opts *AppendObjectOptions) setChecksumParams(info ObjectInfo) {
67+
if len(opts.customHeaders) == 0 {
68+
opts.customHeaders = make(http.Header)
69+
}
70+
fullObject := info.ChecksumMode == ChecksumFullObjectMode.String()
71+
switch {
72+
case info.ChecksumCRC32 != "":
73+
if fullObject {
74+
opts.checksumType = ChecksumFullObjectCRC32
75+
}
76+
case info.ChecksumCRC32C != "":
77+
if fullObject {
78+
opts.checksumType = ChecksumFullObjectCRC32C
79+
}
80+
case info.ChecksumCRC64NVME != "":
81+
// CRC64NVME only has a full object variant
82+
// so it does not carry any special full object
83+
// modifier
84+
opts.checksumType = ChecksumCRC64NVME
85+
}
86+
}
87+
88+
func (opts AppendObjectOptions) validate(c *Client) (err error) {
89+
if opts.ChunkSize > maxPartSize {
90+
return errInvalidArgument("Append chunkSize cannot be larger than max part size allowed")
91+
}
92+
switch {
93+
case !c.trailingHeaderSupport:
94+
return errInvalidArgument("AppendObject() requires Client with TrailingHeaders enabled")
95+
case c.overrideSignerType.IsV2():
96+
return errInvalidArgument("AppendObject() cannot be used with v2 signatures")
97+
case s3utils.IsGoogleEndpoint(*c.endpointURL):
98+
return errInvalidArgument("AppendObject() cannot be used with GCS endpoints")
99+
}
100+
101+
return nil
102+
}
103+
104+
// appendObjectDo - executes the append object http operation.
105+
// NOTE: You must have WRITE permissions on a bucket to add an object to it.
106+
func (c *Client) appendObjectDo(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts AppendObjectOptions) (UploadInfo, error) {
107+
// Input validation.
108+
if err := s3utils.CheckValidBucketName(bucketName); err != nil {
109+
return UploadInfo{}, err
110+
}
111+
if err := s3utils.CheckValidObjectName(objectName); err != nil {
112+
return UploadInfo{}, err
113+
}
114+
115+
// Set headers.
116+
customHeader := opts.Header()
117+
118+
// Populate request metadata.
119+
reqMetadata := requestMetadata{
120+
bucketName: bucketName,
121+
objectName: objectName,
122+
customHeader: customHeader,
123+
contentBody: reader,
124+
contentLength: size,
125+
streamSha256: !opts.DisableContentSha256,
126+
}
127+
128+
if opts.checksumType.IsSet() {
129+
reqMetadata.addCrc = &opts.checksumType
130+
}
131+
132+
// Execute PUT an objectName.
133+
resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
134+
defer closeResponse(resp)
135+
if err != nil {
136+
return UploadInfo{}, err
137+
}
138+
if resp != nil {
139+
if resp.StatusCode != http.StatusOK {
140+
return UploadInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
141+
}
142+
}
143+
144+
h := resp.Header
145+
146+
// When AppendObject() is used, S3 Express will return final object size as x-amz-object-size
147+
if amzSize := h.Get("x-amz-object-size"); amzSize != "" {
148+
size, err = strconv.ParseInt(amzSize, 10, 64)
149+
if err != nil {
150+
return UploadInfo{}, err
151+
}
152+
}
153+
154+
return UploadInfo{
155+
Bucket: bucketName,
156+
Key: objectName,
157+
ETag: trimEtag(h.Get("ETag")),
158+
Size: size,
159+
160+
// Checksum values
161+
ChecksumCRC32: h.Get(ChecksumCRC32.Key()),
162+
ChecksumCRC32C: h.Get(ChecksumCRC32C.Key()),
163+
ChecksumSHA1: h.Get(ChecksumSHA1.Key()),
164+
ChecksumSHA256: h.Get(ChecksumSHA256.Key()),
165+
ChecksumCRC64NVME: h.Get(ChecksumCRC64NVME.Key()),
166+
ChecksumMode: h.Get(ChecksumFullObjectMode.Key()),
167+
}, nil
168+
}
169+
170+
// AppendObject - S3 Express Zone https://docs.aws.amazon.com/AmazonS3/latest/userguide/directory-buckets-objects-append.html
171+
func (c *Client) AppendObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64,
172+
opts AppendObjectOptions,
173+
) (info UploadInfo, err error) {
174+
if objectSize < 0 && opts.ChunkSize == 0 {
175+
return UploadInfo{}, errors.New("object size must be provided when no chunk size is provided")
176+
}
177+
178+
if err = opts.validate(c); err != nil {
179+
return UploadInfo{}, err
180+
}
181+
182+
oinfo, err := c.StatObject(ctx, bucketName, objectName, StatObjectOptions{Checksum: true})
183+
if err != nil {
184+
return UploadInfo{}, err
185+
}
186+
if oinfo.ChecksumMode != ChecksumFullObjectMode.String() {
187+
return UploadInfo{}, fmt.Errorf("append API is not allowed on objects that are not full_object checksum type: %s", oinfo.ChecksumMode)
188+
}
189+
opts.setChecksumParams(oinfo) // set the appropriate checksum params based on the existing object checksum metadata.
190+
opts.setWriteOffset(oinfo.Size) // First append must set the current object size as the offset.
191+
192+
if opts.ChunkSize > 0 {
193+
finalObjSize := int64(-1)
194+
if objectSize > 0 {
195+
finalObjSize = info.Size + objectSize
196+
}
197+
totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(finalObjSize, opts.ChunkSize)
198+
if err != nil {
199+
return UploadInfo{}, err
200+
}
201+
buf := make([]byte, partSize)
202+
var partNumber int
203+
for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {
204+
// Proceed to upload the part.
205+
if partNumber == totalPartsCount {
206+
partSize = lastPartSize
207+
}
208+
n, err := readFull(reader, buf)
209+
if err != nil {
210+
return info, err
211+
}
212+
if n != int(partSize) {
213+
return info, io.ErrUnexpectedEOF
214+
}
215+
rd := newHook(bytes.NewReader(buf[:n]), opts.Progress)
216+
uinfo, err := c.appendObjectDo(ctx, bucketName, objectName, rd, partSize, opts)
217+
if err != nil {
218+
return info, err
219+
}
220+
opts.setWriteOffset(uinfo.Size)
221+
}
222+
}
223+
224+
rd := newHook(reader, opts.Progress)
225+
return c.appendObjectDo(ctx, bucketName, objectName, rd, objectSize, opts)
226+
}

api-datatypes.go

+2
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ type UploadInfo struct {
148148
ChecksumSHA1 string
149149
ChecksumSHA256 string
150150
ChecksumCRC64NVME string
151+
ChecksumMode string
151152
}
152153

153154
// RestoreInfo contains information of the restore operation of an archived object
@@ -223,6 +224,7 @@ type ObjectInfo struct {
223224
ChecksumSHA1 string
224225
ChecksumSHA256 string
225226
ChecksumCRC64NVME string
227+
ChecksumMode string
226228

227229
Internal *struct {
228230
K int // Data blocks

api-put-object-multipart.go

+1
Original file line numberDiff line numberDiff line change
@@ -457,5 +457,6 @@ func (c *Client) completeMultipartUpload(ctx context.Context, bucketName, object
457457
ChecksumCRC32: completeMultipartUploadResult.ChecksumCRC32,
458458
ChecksumCRC32C: completeMultipartUploadResult.ChecksumCRC32C,
459459
ChecksumCRC64NVME: completeMultipartUploadResult.ChecksumCRC64NVME,
460+
ChecksumMode: completeMultipartUploadResult.ChecksumType,
460461
}, nil
461462
}

api-put-object-streaming.go

+1
Original file line numberDiff line numberDiff line change
@@ -805,5 +805,6 @@ func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string,
805805
ChecksumSHA1: h.Get(ChecksumSHA1.Key()),
806806
ChecksumSHA256: h.Get(ChecksumSHA256.Key()),
807807
ChecksumCRC64NVME: h.Get(ChecksumCRC64NVME.Key()),
808+
ChecksumMode: h.Get(ChecksumFullObjectMode.Key()),
808809
}, nil
809810
}

api-s3-datatypes.go

+1
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,7 @@ type completeMultipartUploadResult struct {
366366
ChecksumSHA1 string
367367
ChecksumSHA256 string
368368
ChecksumCRC64NVME string
369+
ChecksumType string
369370
}
370371

371372
// CompletePart sub container lists individual part numbers and their

checksum.go

+43-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,43 @@ import (
3434
"github.com/minio/crc64nvme"
3535
)
3636

37+
// ChecksumMode contains information about the checksum mode on the object
38+
type ChecksumMode uint32
39+
40+
const (
41+
// ChecksumFullObjectMode Full object checksum `csumCombine(csum1, csum2...)...), csumN...)`
42+
ChecksumFullObjectMode ChecksumMode = 1 << iota
43+
44+
// ChecksumCompositeMode Composite checksum `csum([csum1 + csum2 ... + csumN])`
45+
ChecksumCompositeMode
46+
47+
// Keep after all valid checksums
48+
checksumLastMode
49+
50+
// checksumModeMask is a mask for valid checksum mode types.
51+
checksumModeMask = checksumLastMode - 1
52+
)
53+
54+
// Is returns if c is all of t.
55+
func (c ChecksumMode) Is(t ChecksumMode) bool {
56+
return c&t == t
57+
}
58+
59+
// Key returns the header key.
60+
func (c ChecksumMode) Key() string {
61+
return amzChecksumMode
62+
}
63+
64+
func (c ChecksumMode) String() string {
65+
switch c & checksumModeMask {
66+
case ChecksumFullObjectMode:
67+
return "FULL_OBJECT"
68+
case ChecksumCompositeMode:
69+
return "COMPOSITE"
70+
}
71+
return ""
72+
}
73+
3774
// ChecksumType contains information about the checksum type.
3875
type ChecksumType uint32
3976

@@ -75,6 +112,7 @@ const (
75112
amzChecksumSHA1 = "x-amz-checksum-sha1"
76113
amzChecksumSHA256 = "x-amz-checksum-sha256"
77114
amzChecksumCRC64NVME = "x-amz-checksum-crc64nvme"
115+
amzChecksumMode = "x-amz-checksum-type"
78116
)
79117

80118
// Base returns the base type, without modifiers.
@@ -397,7 +435,7 @@ func addAutoChecksumHeaders(opts *PutObjectOptions) {
397435
}
398436
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = opts.AutoChecksum.String()
399437
if opts.AutoChecksum.FullObjectRequested() {
400-
opts.UserMetadata["X-Amz-Checksum-Type"] = "FULL_OBJECT"
438+
opts.UserMetadata[amzChecksumMode] = ChecksumFullObjectMode.String()
401439
}
402440
}
403441

@@ -414,7 +452,10 @@ func applyAutoChecksum(opts *PutObjectOptions, allParts []ObjectPart) {
414452
} else if opts.AutoChecksum.CanMergeCRC() {
415453
crc, err := opts.AutoChecksum.FullObjectChecksum(allParts)
416454
if err == nil {
417-
opts.UserMetadata = map[string]string{opts.AutoChecksum.KeyCapitalized(): crc.Encoded(), "X-Amz-Checksum-Type": "FULL_OBJECT"}
455+
opts.UserMetadata = map[string]string{
456+
opts.AutoChecksum.KeyCapitalized(): crc.Encoded(),
457+
amzChecksumMode: ChecksumFullObjectMode.String(),
458+
}
418459
}
419460
}
420461
}

hook-reader.go

+1-9
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,20 @@ package minio
2020
import (
2121
"fmt"
2222
"io"
23-
"sync"
2423
)
2524

2625
// hookReader hooks additional reader in the source stream. It is
2726
// useful for making progress bars. Second reader is appropriately
2827
// notified about the exact number of bytes read from the primary
2928
// source on each Read operation.
3029
type hookReader struct {
31-
mu sync.RWMutex
3230
source io.Reader
3331
hook io.Reader
3432
}
3533

3634
// Seek implements io.Seeker. Seeks source first, and if necessary
3735
// seeks hook if Seek method is appropriately found.
3836
func (hr *hookReader) Seek(offset int64, whence int) (n int64, err error) {
39-
hr.mu.Lock()
40-
defer hr.mu.Unlock()
41-
4237
// Verify for source has embedded Seeker, use it.
4338
sourceSeeker, ok := hr.source.(io.Seeker)
4439
if ok {
@@ -70,9 +65,6 @@ func (hr *hookReader) Seek(offset int64, whence int) (n int64, err error) {
7065
// value 'n' number of bytes are reported through the hook. Returns
7166
// error for all non io.EOF conditions.
7267
func (hr *hookReader) Read(b []byte) (n int, err error) {
73-
hr.mu.RLock()
74-
defer hr.mu.RUnlock()
75-
7668
n, err = hr.source.Read(b)
7769
if err != nil && err != io.EOF {
7870
return n, err
@@ -92,7 +84,7 @@ func (hr *hookReader) Read(b []byte) (n int, err error) {
9284
// reports the data read from the source to the hook.
9385
func newHook(source, hook io.Reader) io.Reader {
9486
if hook == nil {
95-
return &hookReader{source: source}
87+
return source
9688
}
9789
return &hookReader{
9890
source: source,

utils.go

+1
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,7 @@ func ToObjectInfo(bucketName, objectName string, h http.Header) (ObjectInfo, err
390390
ChecksumSHA1: h.Get(ChecksumSHA1.Key()),
391391
ChecksumSHA256: h.Get(ChecksumSHA256.Key()),
392392
ChecksumCRC64NVME: h.Get(ChecksumCRC64NVME.Key()),
393+
ChecksumMode: h.Get(ChecksumFullObjectMode.Key()),
393394
}, nil
394395
}
395396

0 commit comments

Comments
 (0)