package s3manager

import (
	"bytes"
	"fmt"
	"io"
	"sort"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/awsutil"
	"github.com/aws/aws-sdk-go/service/s3"
)

// MaxUploadParts is the maximum allowed number of parts in a multipart upload
// on Amazon S3.
var MaxUploadParts = 10000

// MinUploadPartSize is the minimum allowed part size when uploading a part to
// Amazon S3.
var MinUploadPartSize int64 = 1024 * 1024 * 5

// DefaultUploadPartSize is the default part size to buffer chunks of a
// payload into.
var DefaultUploadPartSize = MinUploadPartSize

// DefaultUploadConcurrency is the default number of goroutines to spin up
// when using Upload().
var DefaultUploadConcurrency = 5

// DefaultUploadOptions is the default set of options used when opts is nil
// in NewUploader().
var DefaultUploadOptions = &UploadOptions{
	PartSize:          DefaultUploadPartSize,
	Concurrency:       DefaultUploadConcurrency,
	LeavePartsOnError: false,
	S3:                nil,
}

// A MultiUploadFailure wraps a failed S3 multipart upload. An error returned
// will satisfy this interface when a multipart upload fails to upload all
// chunks to S3. In the case of a failure the UploadID is needed to operate on
// the chunks, if any, which were uploaded.
//
// Example:
//
//     u := s3manager.NewUploader(opts)
//     output, err := u.Upload(input)
//     if err != nil {
//         if multierr, ok := err.(MultiUploadFailure); ok {
//             // Process error and its associated uploadID
//             fmt.Println("Error:", multierr.Code(), multierr.Message(), multierr.UploadID())
//         } else {
//             // Process error generically
//             fmt.Println("Error:", err.Error())
//         }
//     }
//
type MultiUploadFailure interface {
	awserr.Error

	// UploadID returns the upload id for the S3 multipart upload that failed.
	UploadID() string
}

// So that the Error interface type can be included as an anonymous field
// in the multiUploadError struct and not conflict with the error.Error() method.
type awsError awserr.Error

// A multiUploadError wraps the upload ID of a failed S3 multipart upload.
// Composed of BaseError for code, message, and original error.
//
// Should be used for an error that occurred while failing an S3 multipart
// upload when an upload ID is available. If an upload ID is not available,
// a more relevant error type should be used instead.
type multiUploadError struct {
	awsError

	// ID for multipart upload which failed.
	uploadID string
}

// Error returns the string representation of the error.
//
// See apierr.BaseError ErrorWithExtra for output format.
//
// Satisfies the error interface.
func (m multiUploadError) Error() string {
	extra := fmt.Sprintf("upload id: %s", m.uploadID)
	return awserr.SprintError(m.Code(), m.Message(), extra, m.OrigErr())
}

// String returns the string representation of the error.
// Alias for Error to satisfy the stringer interface.
func (m multiUploadError) String() string {
	return m.Error()
}

// UploadID returns the id of the S3 upload which failed.
func (m multiUploadError) UploadID() string {
	return m.uploadID
}
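// The following helper is an illustrative sketch added for documentation
// purposes only; it is not part of the original package. It shows how a
// caller might use the UploadID exposed by MultiUploadFailure to abort an
// incomplete multipart upload and release its stored parts. The client,
// bucket, and key parameters are assumptions for the example.
func abortFailedUpload(svc *s3.S3, bucket, key string, uploadErr error) error {
	multierr, ok := uploadErr.(MultiUploadFailure)
	if !ok {
		// Not a multipart failure; there are no leftover parts to clean up.
		return uploadErr
	}

	uploadID := multierr.UploadID()
	_, err := svc.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
		Bucket:   &bucket,
		Key:      &key,
		UploadID: &uploadID,
	})
	return err
}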
// UploadInput contains all input for upload requests to Amazon S3.
type UploadInput struct {
	// The canned ACL to apply to the object.
	ACL *string `location:"header" locationName:"x-amz-acl" type:"string"`

	Bucket *string `location:"uri" locationName:"Bucket" type:"string" required:"true"`

	// Specifies caching behavior along the request/reply chain.
	CacheControl *string `location:"header" locationName:"Cache-Control" type:"string"`

	// Specifies presentational information for the object.
	ContentDisposition *string `location:"header" locationName:"Content-Disposition" type:"string"`

	// Specifies what content encodings have been applied to the object and thus
	// what decoding mechanisms must be applied to obtain the media-type referenced
	// by the Content-Type header field.
	ContentEncoding *string `location:"header" locationName:"Content-Encoding" type:"string"`

	// The language the content is in.
	ContentLanguage *string `location:"header" locationName:"Content-Language" type:"string"`

	// A standard MIME type describing the format of the object data.
	ContentType *string `location:"header" locationName:"Content-Type" type:"string"`

	// The date and time at which the object is no longer cacheable.
	Expires *time.Time `location:"header" locationName:"Expires" type:"timestamp" timestampFormat:"rfc822"`

	// Gives the grantee READ, READ_ACP, and WRITE_ACP permissions on the object.
	GrantFullControl *string `location:"header" locationName:"x-amz-grant-full-control" type:"string"`

	// Allows grantee to read the object data and its metadata.
	GrantRead *string `location:"header" locationName:"x-amz-grant-read" type:"string"`

	// Allows grantee to read the object ACL.
	GrantReadACP *string `location:"header" locationName:"x-amz-grant-read-acp" type:"string"`

	// Allows grantee to write the ACL for the applicable object.
	GrantWriteACP *string `location:"header" locationName:"x-amz-grant-write-acp" type:"string"`

	Key *string `location:"uri" locationName:"Key" type:"string" required:"true"`

	// A map of metadata to store with the object in S3.
	Metadata map[string]*string `location:"headers" locationName:"x-amz-meta-" type:"map"`

	// Confirms that the requester knows that she or he will be charged for the
	// request. Bucket owners need not specify this parameter in their requests.
	// Documentation on downloading objects from requester pays buckets can be found
	// at http://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectsinRequesterPaysBuckets.html
	RequestPayer *string `location:"header" locationName:"x-amz-request-payer" type:"string"`

	// Specifies the algorithm to use when encrypting the object (e.g., AES256,
	// aws:kms).
	SSECustomerAlgorithm *string `location:"header" locationName:"x-amz-server-side-encryption-customer-algorithm" type:"string"`

	// Specifies the customer-provided encryption key for Amazon S3 to use in encrypting
	// data. This value is used to store the object and then it is discarded; Amazon
	// does not store the encryption key. The key must be appropriate for use with
	// the algorithm specified in the x-amz-server-side-encryption-customer-algorithm
	// header.
	SSECustomerKey *string `location:"header" locationName:"x-amz-server-side-encryption-customer-key" type:"string"`

	// Specifies the 128-bit MD5 digest of the encryption key according to RFC 1321.
	// Amazon S3 uses this header for a message integrity check to ensure the encryption
	// key was transmitted without error.
	SSECustomerKeyMD5 *string `location:"header" locationName:"x-amz-server-side-encryption-customer-key-MD5" type:"string"`

	// Specifies the AWS KMS key ID to use for object encryption. All GET and PUT
	// requests for an object protected by AWS KMS will fail if not made via SSL
	// or using SigV4.
	// Documentation on configuring any of the officially supported
	// AWS SDKs and CLI can be found at
	// http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingAWSSDK.html#specify-signature-version
	SSEKMSKeyID *string `location:"header" locationName:"x-amz-server-side-encryption-aws-kms-key-id" type:"string"`

	// The server-side encryption algorithm used when storing this object in S3
	// (e.g., AES256, aws:kms).
	ServerSideEncryption *string `location:"header" locationName:"x-amz-server-side-encryption" type:"string"`

	// The type of storage to use for the object. Defaults to 'STANDARD'.
	StorageClass *string `location:"header" locationName:"x-amz-storage-class" type:"string"`

	// If the bucket is configured as a website, redirects requests for this object
	// to another object in the same bucket or to an external URL. Amazon S3 stores
	// the value of this header in the object metadata.
	WebsiteRedirectLocation *string `location:"header" locationName:"x-amz-website-redirect-location" type:"string"`

	// The readable body payload to send to S3.
	Body io.Reader
}

// UploadOutput represents a response from the Upload() call.
type UploadOutput struct {
	// The URL where the object was uploaded to.
	Location string

	// The ID for a multipart upload to S3. In the case of an error the error
	// can be cast to the MultiUploadFailure interface to extract the upload ID.
	UploadID string
}

// UploadOptions keeps track of extra options to pass to an Upload() call.
type UploadOptions struct {
	// The buffer size (in bytes) to use when buffering data into chunks and
	// sending them as parts to S3. The minimum allowed part size is 5MB, and
	// if this value is set to zero, the DefaultUploadPartSize value will be used.
	PartSize int64

	// The number of goroutines to spin up in parallel when sending parts.
	// If this is set to zero, the DefaultUploadConcurrency value will be used.
	Concurrency int

	// Setting this value to true will cause the SDK to avoid calling
	// AbortMultipartUpload on a failure, leaving all successfully uploaded
	// parts on S3 for manual recovery.
	//
	// Note that storing parts of an incomplete multipart upload counts towards
	// space usage on S3 and will add additional costs if not cleaned up.
	LeavePartsOnError bool

	// The client to use when uploading to S3. Leave this as nil to use the
	// default S3 client.
	S3 *s3.S3
}

// NewUploader creates a new Uploader object to upload data to S3. Pass in
// an optional opts structure to customize the uploader's behavior; passing
// nil uses DefaultUploadOptions.
func NewUploader(opts *UploadOptions) *Uploader {
	if opts == nil {
		opts = DefaultUploadOptions
	}
	return &Uploader{opts: opts}
}

// The Uploader structure that calls Upload(). It is safe to call Upload()
// on this structure for multiple objects and across concurrent goroutines.
type Uploader struct {
	opts *UploadOptions
}

// Upload uploads an object to S3, intelligently buffering large files into
// smaller chunks and sending them in parallel across multiple goroutines. You
// can configure the buffer size and concurrency through the UploadOptions
// passed to NewUploader.
//
// If the options passed to NewUploader were nil, DefaultUploadOptions is used.
//
// It is safe to call this method for multiple objects and across concurrent
// goroutines.
func (u *Uploader) Upload(input *UploadInput) (*UploadOutput, error) {
	i := uploader{in: input, opts: *u.opts}
	return i.upload()
}
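// The following function is an illustrative sketch added for documentation
// purposes only; it is not part of the original package. It shows one way a
// caller might configure and use an Uploader. The part size, concurrency,
// bucket, and key values are assumptions chosen for the example.
func exampleUpload(body io.Reader) (*UploadOutput, error) {
	u := NewUploader(&UploadOptions{
		PartSize:    10 * 1024 * 1024, // 10MB parts; must be >= MinUploadPartSize
		Concurrency: 3,                // upload up to three parts in parallel
	})

	bucket, key := "example-bucket", "example-key"
	return u.Upload(&UploadInput{
		Bucket: &bucket,
		Key:    &key,
		Body:   body,
	})
}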
// internal structure to manage an upload to S3.
type uploader struct {
	in   *UploadInput
	opts UploadOptions

	readerPos int64 // current reader position
	totalSize int64 // set to -1 if the size is not known
}

// internal logic for deciding whether to upload a single part or use a
// multipart upload.
func (u *uploader) upload() (*UploadOutput, error) {
	u.init()

	if u.opts.PartSize < MinUploadPartSize {
		msg := fmt.Sprintf("part size must be at least %d bytes", MinUploadPartSize)
		return nil, awserr.New("ConfigError", msg, nil)
	}

	// Do one read to determine if we have more than one part
	buf, err := u.nextReader()
	if err == io.EOF || err == io.ErrUnexpectedEOF { // single part
		return u.singlePart(buf)
	} else if err != nil {
		return nil, awserr.New("ReadRequestBody", "read upload data failed", err)
	}

	mu := multiuploader{uploader: u}
	return mu.upload(buf)
}

// init will initialize all default options.
func (u *uploader) init() {
	if u.opts.S3 == nil {
		u.opts.S3 = s3.New(nil)
	}
	if u.opts.Concurrency == 0 {
		u.opts.Concurrency = DefaultUploadConcurrency
	}
	if u.opts.PartSize == 0 {
		u.opts.PartSize = DefaultUploadPartSize
	}

	// Try to get the total size for some optimizations
	u.initSize()
}

// initSize tries to detect the total stream size, setting u.totalSize. If
// the size is not known, totalSize is set to -1.
func (u *uploader) initSize() {
	u.totalSize = -1

	switch r := u.in.Body.(type) {
	case io.Seeker:
		pos, _ := r.Seek(0, 1) // remember the current offset (whence 1 = relative to current)
		defer r.Seek(pos, 0)   // restore the original offset when done (whence 0 = from start)

		n, err := r.Seek(0, 2) // seek to the end (whence 2) to learn the total size
		if err != nil {
			return
		}
		u.totalSize = n

		// try to adjust partSize if it is too small
		if u.totalSize/u.opts.PartSize >= int64(MaxUploadParts) {
			u.opts.PartSize = u.totalSize / int64(MaxUploadParts)
		}
	}
}

// nextReader returns a seekable reader representing the next packet of data.
// This operation increases the shared u.readerPos counter, but note that it
// does not need to be wrapped in a mutex because nextReader is only called
// from the main thread.
func (u *uploader) nextReader() (io.ReadSeeker, error) {
	switch r := u.in.Body.(type) {
	case io.ReaderAt:
		var err error

		n := u.opts.PartSize
		if u.totalSize >= 0 {
			bytesLeft := u.totalSize - u.readerPos

			if bytesLeft == 0 {
				err = io.EOF
			} else if bytesLeft <= u.opts.PartSize {
				err = io.ErrUnexpectedEOF
				n = bytesLeft
			}
		}

		buf := io.NewSectionReader(r, u.readerPos, n)
		u.readerPos += n

		return buf, err

	default:
		packet := make([]byte, u.opts.PartSize)
		n, err := io.ReadFull(u.in.Body, packet)
		u.readerPos += int64(n)

		return bytes.NewReader(packet[0:n]), err
	}
}

// singlePart contains upload logic for uploading a single chunk via
// a regular PutObject request. Multipart requests require at least two
// parts, or at least 5MB of data.
func (u *uploader) singlePart(buf io.ReadSeeker) (*UploadOutput, error) {
	params := &s3.PutObjectInput{}
	awsutil.Copy(params, u.in)
	params.Body = buf

	req, _ := u.opts.S3.PutObjectRequest(params)
	if err := req.Send(); err != nil {
		return nil, err
	}

	url := req.HTTPRequest.URL.String()
	return &UploadOutput{Location: url}, nil
}

// internal structure to manage a specific multipart upload to S3.
type multiuploader struct {
	*uploader
	wg       sync.WaitGroup
	m        sync.Mutex
	err      error
	uploadID string
	parts    completedParts
}

// keeps track of a single chunk of data being sent to S3.
type chunk struct {
	buf io.ReadSeeker
	num int64
}
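// The following function is an illustrative sketch added for documentation
// purposes only; it is not part of the original package. It shows why a
// seekable body such as a *bytes.Reader benefits the uploader: the body
// satisfies io.Seeker and io.ReaderAt, so initSize can discover the total
// size and nextReader can hand out io.SectionReaders instead of copying
// each part into a fresh buffer. The bucket and key names are assumptions.
func exampleSeekableBody(data []byte) (*UploadOutput, error) {
	bucket, key := "example-bucket", "example-key"

	u := NewUploader(nil) // nil opts -> DefaultUploadOptions
	return u.Upload(&UploadInput{
		Bucket: &bucket,
		Key:    &key,
		Body:   bytes.NewReader(data), // *bytes.Reader implements io.Seeker and io.ReaderAt
	})
}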
// completedParts is a wrapper to make parts sortable by their part number,
// since S3 requires this list to be sent in sorted order.
type completedParts []*s3.CompletedPart

func (a completedParts) Len() int           { return len(a) }
func (a completedParts) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber }

// upload will perform a multipart upload using the firstBuf buffer containing
// the first chunk of data.
func (u *multiuploader) upload(firstBuf io.ReadSeeker) (*UploadOutput, error) {
	params := &s3.CreateMultipartUploadInput{}
	awsutil.Copy(params, u.in)

	// Create the multipart upload
	resp, err := u.opts.S3.CreateMultipartUpload(params)
	if err != nil {
		return nil, err
	}
	u.uploadID = *resp.UploadID

	// Create the workers
	ch := make(chan chunk, u.opts.Concurrency)
	for i := 0; i < u.opts.Concurrency; i++ {
		u.wg.Add(1)
		go u.readChunk(ch)
	}

	// Send part 1 to the workers
	var num int64 = 1
	ch <- chunk{buf: firstBuf, num: num}

	// Read and queue the rest of the parts
	for u.geterr() == nil {
		// This upload exceeded maximum number of supported parts, error now.
		if num > int64(MaxUploadParts) {
			msg := fmt.Sprintf("exceeded total allowed parts (%d). "+
				"Adjust PartSize to fit in this limit", MaxUploadParts)
			u.seterr(awserr.New("TotalPartsExceeded", msg, nil))
			break
		}

		num++

		buf, err := u.nextReader()
		if err == io.EOF {
			break
		}

		ch <- chunk{buf: buf, num: num}

		if err != nil && err != io.ErrUnexpectedEOF {
			u.seterr(awserr.New(
				"ReadRequestBody",
				"read multipart upload data failed",
				err))
			break
		}
	}

	// Close the channel, wait for workers, and complete upload
	close(ch)
	u.wg.Wait()
	complete := u.complete()

	if err := u.geterr(); err != nil {
		return nil, &multiUploadError{
			awsError: awserr.New(
				"MultipartUpload",
				"upload multipart failed",
				err),
			uploadID: u.uploadID,
		}
	}
	return &UploadOutput{
		Location: *complete.Location,
		UploadID: u.uploadID,
	}, nil
}

// readChunk runs in worker goroutines to pull chunks off of the ch channel
// and send() them as UploadPart requests.
func (u *multiuploader) readChunk(ch chan chunk) {
	defer u.wg.Done()
	for {
		data, ok := <-ch

		if !ok {
			break
		}

		if u.geterr() == nil {
			if err := u.send(data); err != nil {
				u.seterr(err)
			}
		}
	}
}

// send performs an UploadPart request and keeps track of the completed
// part information.
func (u *multiuploader) send(c chunk) error {
	resp, err := u.opts.S3.UploadPart(&s3.UploadPartInput{
		Bucket:     u.in.Bucket,
		Key:        u.in.Key,
		Body:       c.buf,
		UploadID:   &u.uploadID,
		PartNumber: &c.num,
	})
	if err != nil {
		return err
	}

	n := c.num
	completed := &s3.CompletedPart{ETag: resp.ETag, PartNumber: &n}

	u.m.Lock()
	u.parts = append(u.parts, completed)
	u.m.Unlock()

	return nil
}

// geterr is a thread-safe getter for the error object
func (u *multiuploader) geterr() error {
	u.m.Lock()
	defer u.m.Unlock()

	return u.err
}

// seterr is a thread-safe setter for the error object
func (u *multiuploader) seterr(e error) {
	u.m.Lock()
	defer u.m.Unlock()

	u.err = e
}

// fail will abort the multipart upload unless LeavePartsOnError is set to true.
func (u *multiuploader) fail() {
	if u.opts.LeavePartsOnError {
		return
	}

	u.opts.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
		Bucket:   u.in.Bucket,
		Key:      u.in.Key,
		UploadID: &u.uploadID,
	})
}

// complete successfully completes a multipart upload and returns the response.
func (u *multiuploader) complete() *s3.CompleteMultipartUploadOutput {
	if u.geterr() != nil {
		u.fail()
		return nil
	}

	// Parts must be sorted in PartNumber order.
	sort.Sort(u.parts)

	resp, err := u.opts.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
		Bucket:          u.in.Bucket,
		Key:             u.in.Key,
		UploadID:        &u.uploadID,
		MultipartUpload: &s3.CompletedMultipartUpload{Parts: u.parts},
	})
	if err != nil {
		u.seterr(err)
		u.fail()
	}

	return resp
}
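// The following helper is an illustrative sketch added for documentation
// purposes only; it is not part of the original package. It mirrors the
// adjustment initSize performs for seekable bodies: given a known total
// size, it picks a part size large enough that the part count stays at or
// below MaxUploadParts while never dropping under MinUploadPartSize.
func examplePartSizeForLargeObject(totalSize int64) int64 {
	partSize := DefaultUploadPartSize
	if totalSize/partSize >= int64(MaxUploadParts) {
		// Round up so ceil(totalSize/partSize) <= MaxUploadParts.
		partSize = (totalSize + int64(MaxUploadParts) - 1) / int64(MaxUploadParts)
	}
	if partSize < MinUploadPartSize {
		partSize = MinUploadPartSize
	}
	return partSize
}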