1
0
Fork 0
mirror of https://github.com/Luzifer/s3sync.git synced 2024-12-20 19:41:15 +00:00
s3sync/pkg/fsprovider/s3.go

218 lines
5.2 KiB
Go
Raw Normal View History

package fsprovider
import (
"io"
"mime"
"os"
"path/filepath"
"regexp"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// Tuning values used by the concurrent prefix scan in ListFiles and
// readS3FileList.
const (
	// maxKeysPerPage is the MaxKeys value sent with every ListObjects request
	maxKeysPerPage = 1000

	// doneTickerInterval is the interval at which ListFiles checks whether
	// the scan has finished
	doneTickerInterval = 500 * time.Millisecond

	// channelBufferSizeSmall is the capacity of the error channel and of the
	// channel limiting how many list workers run at the same time
	channelBufferSizeSmall = 10

	// channelBufferSizeHuge is the capacity of the prefix and the result
	// channels
	channelBufferSizeHuge = 10000
)
// S3 implements the Provider for S3 / MinIO access
type S3 struct {
	// conn is the S3 API client every request is issued through
	conn *s3.S3

	// requestedPrefix is the key part of the s3:// path parsed most recently
	// by getBucketPath; readS3FileList strips it from listed object keys
	requestedPrefix string
}
// NewS3 creates a new S3 / MinIO file provider.
//
// When endpoint is non-empty the session is configured with that endpoint
// and path-style addressing is forced; otherwise no extra configuration is
// passed to the session. An error is returned when the AWS session cannot be
// created.
func NewS3(endpoint string) (*S3, error) {
	var cfgs []*aws.Config
	if endpoint != "" {
		cfgs = append(cfgs, &aws.Config{
			Endpoint:         aws.String(endpoint),
			S3ForcePathStyle: aws.Bool(true),
		})
	}

	// Report a failed session through the error return instead of panicking
	// (as session.Must would), so callers can react to a broken configuration.
	sess, err := session.NewSession(cfgs...)
	if err != nil {
		return nil, errors.Wrap(err, "creating AWS session")
	}

	return &S3{conn: s3.New(sess)}, nil
}
// DeleteFile removes the object addressed by the given s3:// path from its
// bucket
func (s *S3) DeleteFile(path string) error {
	bucket, key, err := s.getBucketPath(path)
	if err != nil {
		return errors.Wrap(err, "getting bucket path")
	}

	input := &s3.DeleteObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	}
	if _, err = s.conn.DeleteObject(input); err != nil {
		return errors.Wrap(err, "deleting object")
	}

	return nil
}
// GetAbsolutePath returns the absolute form of the given path. For this
// provider the path is handed back unchanged and the error is always nil.
func (*S3) GetAbsolutePath(path string) (string, error) {
	absolute := path
	return absolute, nil
}
// ListFiles retrieves the metadata of all objects with given prefix.
//
// The prefix is split into bucket and key prefix by getBucketPath. The key
// space is then walked level by level: every prefix taken from the queue is
// listed by its own readS3FileList goroutine, which reports files, newly
// discovered sub-prefixes and errors through channels. At most
// channelBufferSizeSmall of those goroutines run at the same time.
//
// On a listing error the files collected so far are returned together with
// that error.
func (s *S3) ListFiles(prefix string) ([]File, error) {
	out := []File{}

	bucket, path, err := s.getBucketPath(prefix)
	if err != nil {
		return out, errors.Wrap(err, "getting bucket path")
	}

	var (
		// Set of prefixes already handed to a worker (constant-time lookup)
		seenPrefixes = map[string]struct{}{}

		prefixChan = make(chan *string, channelBufferSizeHuge)
		outputChan = make(chan File, channelBufferSizeHuge)
		errChan    = make(chan error, channelBufferSizeSmall)
		// Every running worker holds one element in syncChan
		syncChan = make(chan bool, channelBufferSizeSmall)
	)

	doneTicker := time.NewTicker(doneTickerInterval)
	defer doneTicker.Stop()

	prefixChan <- aws.String(path)

	for {
		select {
		case next := <-prefixChan:
			if len(syncChan) == channelBufferSizeSmall {
				// All worker slots are taken: queue the prefix again and
				// retry on a later iteration.
				prefixChan <- next
				continue
			}

			if _, seen := seenPrefixes[*next]; seen {
				continue
			}
			seenPrefixes[*next] = struct{}{}

			// The slot is taken here, before the goroutine starts, so the
			// done check below can never miss a worker that is about to run.
			syncChan <- true
			go s.readS3FileList(bucket, next, outputChan, prefixChan, errChan, syncChan)

		case o := <-outputChan:
			out = append(out, o)

		case listErr := <-errChan:
			return out, listErr

		case <-doneTicker.C:
			// Read the worker count first: workers are only started by this
			// loop, so once it is zero nothing can add to the queue any more
			// and the queue length read afterwards is final.
			working := len(syncChan)
			queued := len(prefixChan)
			logrus.Debugf("scanning prefixes (%d working, %d left)...", working, queued)
			if working > 0 || queued > 0 {
				continue
			}

			// All workers are done, but their last results or an error may
			// still sit in the buffered channels: collect them before
			// returning so nothing is silently dropped.
			select {
			case listErr := <-errChan:
				return out, listErr
			default:
			}

			for {
				select {
				case o := <-outputChan:
					out = append(out, o)
				default:
					return out, nil
				}
			}
		}
	}
}
// ReadFile fetches the object addressed by the given s3:// path and returns
// its body for reading
func (s *S3) ReadFile(path string) (io.ReadCloser, error) {
	bucket, key, err := s.getBucketPath(path)
	if err != nil {
		return nil, errors.Wrap(err, "getting bucket path")
	}

	input := &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	}

	obj, err := s.conn.GetObject(input)
	if err != nil {
		return nil, errors.Wrap(err, "getting object")
	}

	return obj.Body, nil
}
// WriteFile uploads content as the S3 object addressed by path. The content
// type is derived from the key's file extension and falls back to
// "application/octet-stream"; with public set the object is uploaded with
// the "public-read" ACL.
//
//revive:disable-next-line:flag-parameter // That's not a control coupling but a config flag
func (s *S3) WriteFile(path string, content io.Reader, public bool) error {
	bucket, key, err := s.getBucketPath(path)
	if err != nil {
		return errors.Wrap(err, "getting bucket path")
	}

	contentType := "application/octet-stream"
	if detected := mime.TypeByExtension(filepath.Ext(key)); detected != "" {
		contentType = detected
	}

	upload := &s3manager.UploadInput{
		Bucket:      aws.String(bucket),
		Key:         aws.String(key),
		Body:        content,
		ContentType: aws.String(contentType),
	}
	if public {
		upload.ACL = aws.String("public-read")
	}

	uploader := s3manager.NewUploaderWithClient(s.conn)
	if _, err = uploader.Upload(upload); err != nil {
		return errors.Wrap(err, "uploading file")
	}

	return nil
}
// s3BucketPathRegexp splits an s3:// location into the bucket (group 1) and
// the key (group 2). It is compiled once at package initialisation instead
// of on every getBucketPath call.
var s3BucketPathRegexp = regexp.MustCompile(`^s3://?([^/]+)/(.*)$`)

// getBucketPath splits a location of the form "s3://<bucket>/<key>" into its
// bucket and key. OS path separators inside the key are replaced by "/".
//
// As a side effect the key is stored in s.requestedPrefix, which
// readS3FileList strips from listed object keys.
// NOTE(review): every caller overwrites that field, so calling another
// method while ListFiles is running changes what gets stripped; confirm
// whether concurrent use is expected.
//
// An error is returned when prefix does not match the expected form.
func (s *S3) getBucketPath(prefix string) (bucket string, path string, err error) {
	matches := s3BucketPathRegexp.FindStringSubmatch(prefix)
	if matches == nil {
		return "", "", errors.New("prefix did not match requirements")
	}

	bucket = matches[1]
	path = strings.ReplaceAll(matches[2], string(os.PathSeparator), "/")
	s.requestedPrefix = path

	return bucket, path, nil
}
// readS3FileList lists one level below path in bucket (delimiter "/"),
// following pagination until the listing is complete.
//
// Every object found is sent to outputChan with s.requestedPrefix trimmed
// from the start of its key. Every common prefix is sent to prefixChan so
// the caller can schedule it. A failure is sent to errorChan, after which
// the function returns. On return one element is received from syncChan,
// releasing the worker slot the caller took before starting this function.
func (s *S3) readS3FileList(bucket string, path *string, outputChan chan File, prefixChan chan *string, errorChan chan error, syncChan chan bool) {
	defer func() { <-syncChan }()

	in := &s3.ListObjectsInput{
		Bucket:    aws.String(bucket),
		Prefix:    path,
		MaxKeys:   aws.Int64(maxKeysPerPage),
		Delimiter: aws.String("/"),
	}

	for {
		o, err := s.conn.ListObjects(in)
		if err != nil {
			errorChan <- errors.Wrap(err, "listing objects")
			return
		}

		for _, v := range o.Contents {
			// The aws.*Value helpers return the zero value for nil pointers
			// instead of panicking on an incomplete response.
			outputChan <- File{
				// Only strip the prefix at the start of the key, never an
				// occurrence somewhere in the middle.
				Filename:     strings.TrimPrefix(aws.StringValue(v.Key), s.requestedPrefix),
				LastModified: aws.TimeValue(v.LastModified),
				Size:         aws.Int64Value(v.Size),
			}
		}

		for _, cp := range o.CommonPrefixes {
			if cp == nil || cp.Prefix == nil {
				// The receiver dereferences the prefix; skip unusable entries.
				continue
			}
			prefixChan <- cp.Prefix
		}

		if !aws.BoolValue(o.IsTruncated) {
			return
		}

		// Continue after NextMarker; when the service did not send one, the
		// last key of this page is the documented fallback marker.
		next := o.NextMarker
		if next == nil && len(o.Contents) > 0 {
			next = o.Contents[len(o.Contents)-1].Key
		}
		if next == nil {
			// Without a marker the same page would be requested forever.
			errorChan <- errors.New("listing objects: truncated response without continuation marker")
			return
		}
		in.Marker = next
	}
}