1
0
Fork 0
mirror of https://github.com/Luzifer/s3sync.git synced 2024-10-18 06:24:20 +00:00
s3sync/s3.go

192 lines
4 KiB
Go
Raw Normal View History

2015-07-26 14:06:24 +00:00
package main
import (
"fmt"
"io"
"mime"
"os"
2015-07-26 14:06:24 +00:00
"path/filepath"
"regexp"
"strings"
2015-07-26 15:43:55 +00:00
"time"
2015-07-26 14:06:24 +00:00
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
2015-07-26 14:06:24 +00:00
"github.com/aws/aws-sdk-go/service/s3"
)
type s3Provider struct {
2016-03-23 15:40:11 +00:00
conn *s3.S3
requestedPrefix string
2015-07-26 14:06:24 +00:00
}
func newS3Provider() (*s3Provider, error) {
sess := session.Must(session.NewSession())
2015-07-26 14:06:24 +00:00
return &s3Provider{
conn: s3.New(sess),
2015-07-26 14:06:24 +00:00
}, nil
}
func (s *s3Provider) getBucketPath(prefix string) (bucket string, path string, err error) {
rex := regexp.MustCompile(`^s3://?([^/]+)/(.*)$`)
matches := rex.FindStringSubmatch(prefix)
if len(matches) != 3 {
err = fmt.Errorf("prefix did not match requirements")
return
}
bucket = matches[1]
path = strings.Replace(matches[2], string(os.PathSeparator), "/", -1)
2016-03-23 15:40:11 +00:00
s.requestedPrefix = path
2015-07-26 14:06:24 +00:00
return
}
func (s *s3Provider) ListFiles(prefix string) ([]file, error) {
out := []file{}
bucket, path, err := s.getBucketPath(prefix)
if err != nil {
return out, err
}
2015-07-26 15:43:55 +00:00
processedPrefixes := []string{}
prefixChan := make(chan *string, 10000)
outputChan := make(chan file, 10000)
2015-07-26 15:43:55 +00:00
errChan := make(chan error, 10)
syncChan := make(chan bool, 10)
doneTimer := time.NewTicker(500 * time.Millisecond)
2015-07-26 15:43:55 +00:00
prefixChan <- aws.String(path)
2015-07-26 15:43:55 +00:00
for {
select {
case prefix := <-prefixChan:
if len(syncChan) == 10 {
prefixChan <- prefix
} else {
found := false
for _, v := range processedPrefixes {
if v == *prefix {
found = true
}
}
2015-07-26 15:43:55 +00:00
if !found {
syncChan <- true
go s.readS3FileList(bucket, prefix, outputChan, prefixChan, errChan, syncChan)
processedPrefixes = append(processedPrefixes, *prefix)
}
}
case o := <-outputChan:
out = append(out, o)
case err := <-errChan:
return out, err
case <-doneTimer.C:
stdout.DebugF("Scanning prefixes (%d working, %d left)...\r", len(syncChan), len(prefixChan))
2015-07-26 15:43:55 +00:00
if len(prefixChan) == 0 && len(syncChan) == 0 {
fmt.Printf("\n")
return out, nil
}
2015-07-26 15:43:55 +00:00
}
}
}
func (s *s3Provider) readS3FileList(bucket string, path *string, outputChan chan file, prefixChan chan *string, errorChan chan error, syncChan chan bool) {
defer func() { <-syncChan }()
in := &s3.ListObjectsInput{
Bucket: aws.String(bucket),
Prefix: path,
2015-08-01 21:57:53 +00:00
MaxKeys: aws.Int64(1000),
2015-07-26 15:43:55 +00:00
Delimiter: aws.String("/"),
}
for {
o, err := s.conn.ListObjects(in)
if err != nil {
errorChan <- err
return
}
2015-07-26 14:06:24 +00:00
2015-07-26 15:43:55 +00:00
for _, v := range o.Contents {
outputChan <- file{
2016-03-23 15:40:11 +00:00
Filename: strings.Replace(*v.Key, s.requestedPrefix, "", 1),
2015-07-26 15:43:55 +00:00
Size: *v.Size,
MD5: strings.Trim(*v.ETag, "\""), // Wat?
}
2015-07-26 14:06:24 +00:00
}
2015-07-26 15:43:55 +00:00
if len(o.CommonPrefixes) > 0 {
for _, cp := range o.CommonPrefixes {
prefixChan <- cp.Prefix
}
}
if !*o.IsTruncated {
2015-07-26 14:06:24 +00:00
break
}
2015-07-26 15:43:55 +00:00
in.Marker = o.NextMarker
2015-07-26 14:06:24 +00:00
}
}
func (s *s3Provider) WriteFile(path string, content io.ReadSeeker, public bool) error {
bucket, path, err := s.getBucketPath(path)
if err != nil {
return err
}
ext := filepath.Ext(path)
mimeType := mime.TypeByExtension(ext)
if mimeType == "" {
mimeType = "application/octet-stream"
}
params := &s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(path),
Body: content,
ContentType: aws.String(mimeType),
}
if public {
params.ACL = aws.String("public-read")
}
_, err = s.conn.PutObject(params)
return err
}
func (s *s3Provider) ReadFile(path string) (io.ReadCloser, error) {
bucket, path, err := s.getBucketPath(path)
if err != nil {
return nil, err
}
o, err := s.conn.GetObject(&s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(path),
})
if err != nil {
return nil, err
}
return o.Body, nil
}
func (s *s3Provider) DeleteFile(path string) error {
bucket, path, err := s.getBucketPath(path)
if err != nil {
return err
}
_, err = s.conn.DeleteObject(&s3.DeleteObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(path),
})
return err
}
func (s *s3Provider) GetAbsolutePath(path string) (string, error) {
return path, nil
}