1
0
Fork 0
mirror of https://github.com/Luzifer/s3sync.git synced 2024-12-20 19:41:15 +00:00

Fetch s3 file list in parallel

This commit is contained in:
Knut Ahlers 2015-07-26 17:43:55 +02:00
parent 5dfb0e4b45
commit 0360e06ad1

112
s3.go
View file

@ -7,6 +7,7 @@ import (
"path/filepath" "path/filepath"
"regexp" "regexp"
"strings" "strings"
"time"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
@ -44,59 +45,82 @@ func (s *s3Provider) ListFiles(prefix string) ([]file, error) {
return out, err return out, err
} }
prefixList := []*string{aws.String(path)} processedPrefixes := []string{}
prefixChan := make(chan *string, 100)
outputChan := make(chan file, 100)
errChan := make(chan error, 10)
syncChan := make(chan bool, 10)
doneTimer := time.NewTicker(500 * time.Millisecond)
prefixChan <- aws.String(path)
for { for {
fmt.Printf("Scanning prefixes (%d left)...\r", len(prefixList)) select {
var p *string case prefix := <-prefixChan:
p, prefixList = prefixList[0], prefixList[1:] if len(syncChan) == 10 {
in := &s3.ListObjectsInput{ prefixChan <- prefix
Bucket: aws.String(bucket), } else {
Prefix: p, found := false
MaxKeys: aws.Long(1000), for _, v := range processedPrefixes {
Delimiter: aws.String("/"), if v == *prefix {
} found = true
for {
o, err := s.conn.ListObjects(in)
if err != nil {
return out, err
}
for _, v := range o.Contents {
out = append(out, file{
Filename: *v.Key,
Size: *v.Size,
MD5: strings.Trim(*v.ETag, "\""), // Wat?
})
}
if len(o.CommonPrefixes) > 0 {
for _, cp := range o.CommonPrefixes {
found := false
for _, v := range prefixList {
if v == cp.Prefix {
found = true
}
}
if !found {
prefixList = append(prefixList, cp.Prefix)
} }
} }
if !found {
syncChan <- true
go s.readS3FileList(bucket, prefix, outputChan, prefixChan, errChan, syncChan)
processedPrefixes = append(processedPrefixes, *prefix)
}
} }
case o := <-outputChan:
if !*o.IsTruncated { out = append(out, o)
break case err := <-errChan:
return out, err
case <-doneTimer.C:
fmt.Printf("Scanning prefixes (%d working, %d left)...\r", len(syncChan), len(prefixChan))
if len(prefixChan) == 0 && len(syncChan) == 0 {
fmt.Printf("\n")
return out, nil
} }
in.Marker = o.NextMarker
}
if len(prefixList) == 0 {
fmt.Printf("\n")
break
} }
} }
}
return out, nil func (s *s3Provider) readS3FileList(bucket string, path *string, outputChan chan file, prefixChan chan *string, errorChan chan error, syncChan chan bool) {
// readS3FileList lists the objects below one prefix (path) of bucket and
// streams what it finds over channels instead of returning it:
//   - every entry of o.Contents is sent to outputChan as a file value,
//   - every entry of o.CommonPrefixes is sent to prefixChan,
//   - the first ListObjects error is sent to errorChan and ends the listing.
// ListFiles starts it with `go` right after pushing one value into syncChan.
//
// Take one value back out of syncChan on every exit path (normal end or
// error), undoing the `syncChan <- true` done by ListFiles before the start.
defer func() { <-syncChan }()
// Request at most 1000 keys per call. With Delimiter "/" set, S3 is expected
// to report deeper levels as CommonPrefixes rather than as Contents (see the
// ListObjects API documentation).
in := &s3.ListObjectsInput{
Bucket: aws.String(bucket),
Prefix: path,
MaxKeys: aws.Long(1000),
Delimiter: aws.String("/"),
}
// One iteration per result page.
for {
o, err := s.conn.ListObjects(in)
if err != nil {
// Report the failure and stop listing this prefix.
errorChan <- err
return
}
for _, v := range o.Contents {
// NOTE(review): this send blocks while outputChan is full — confirm the
// receiver keeps draining it for as long as listings are running.
outputChan <- file{
Filename: *v.Key,
Size: *v.Size,
// The ETag with its surrounding double quotes trimmed off.
// NOTE(review): stored as MD5 — presumably the ETag is the MD5 hex digest;
// confirm this also holds for multipart uploads.
MD5: strings.Trim(*v.ETag, "\""), // Wat?
}
}
if len(o.CommonPrefixes) > 0 {
for _, cp := range o.CommonPrefixes {
// Hand each sub-prefix back over prefixChan; nothing is recursed into here.
// NOTE(review): this send blocks while prefixChan is full, and ListFiles
// also sends into prefixChan — check that the two cannot block each other.
prefixChan <- cp.Prefix
}
}
// Last page reached: leave the loop (the deferred receive then runs).
if !*o.IsTruncated {
break
}
// More pages follow: continue from the marker returned by this response.
in.Marker = o.NextMarker
}
} }
func (s *s3Provider) WriteFile(path string, content io.ReadSeeker, public bool) error { func (s *s3Provider) WriteFile(path string, content io.ReadSeeker, public bool) error {