mirror of
https://github.com/Luzifer/s3sync.git
synced 2024-12-20 19:41:15 +00:00
Merge pull request #1 from Luzifer/s3chan
Fetch s3 file list in parallel
This commit is contained in:
commit
429dbf0313
1 changed files with 68 additions and 44 deletions
112
s3.go
112
s3.go
|
@ -7,6 +7,7 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/service/s3"
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
|
@ -44,59 +45,82 @@ func (s *s3Provider) ListFiles(prefix string) ([]file, error) {
|
||||||
return out, err
|
return out, err
|
||||||
}
|
}
|
||||||
|
|
||||||
prefixList := []*string{aws.String(path)}
|
processedPrefixes := []string{}
|
||||||
|
|
||||||
|
prefixChan := make(chan *string, 100)
|
||||||
|
outputChan := make(chan file, 100)
|
||||||
|
errChan := make(chan error, 10)
|
||||||
|
syncChan := make(chan bool, 10)
|
||||||
|
doneTimer := time.NewTicker(500 * time.Millisecond)
|
||||||
|
|
||||||
|
prefixChan <- aws.String(path)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
fmt.Printf("Scanning prefixes (%d left)...\r", len(prefixList))
|
select {
|
||||||
var p *string
|
case prefix := <-prefixChan:
|
||||||
p, prefixList = prefixList[0], prefixList[1:]
|
if len(syncChan) == 10 {
|
||||||
in := &s3.ListObjectsInput{
|
prefixChan <- prefix
|
||||||
Bucket: aws.String(bucket),
|
} else {
|
||||||
Prefix: p,
|
found := false
|
||||||
MaxKeys: aws.Long(1000),
|
for _, v := range processedPrefixes {
|
||||||
Delimiter: aws.String("/"),
|
if v == *prefix {
|
||||||
}
|
found = true
|
||||||
for {
|
|
||||||
o, err := s.conn.ListObjects(in)
|
|
||||||
if err != nil {
|
|
||||||
return out, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, v := range o.Contents {
|
|
||||||
out = append(out, file{
|
|
||||||
Filename: *v.Key,
|
|
||||||
Size: *v.Size,
|
|
||||||
MD5: strings.Trim(*v.ETag, "\""), // Wat?
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(o.CommonPrefixes) > 0 {
|
|
||||||
for _, cp := range o.CommonPrefixes {
|
|
||||||
found := false
|
|
||||||
for _, v := range prefixList {
|
|
||||||
if v == cp.Prefix {
|
|
||||||
found = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !found {
|
|
||||||
prefixList = append(prefixList, cp.Prefix)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if !found {
|
||||||
|
syncChan <- true
|
||||||
|
go s.readS3FileList(bucket, prefix, outputChan, prefixChan, errChan, syncChan)
|
||||||
|
processedPrefixes = append(processedPrefixes, *prefix)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
case o := <-outputChan:
|
||||||
if !*o.IsTruncated {
|
out = append(out, o)
|
||||||
break
|
case err := <-errChan:
|
||||||
|
return out, err
|
||||||
|
case <-doneTimer.C:
|
||||||
|
fmt.Printf("Scanning prefixes (%d working, %d left)...\r", len(syncChan), len(prefixChan))
|
||||||
|
if len(prefixChan) == 0 && len(syncChan) == 0 {
|
||||||
|
fmt.Printf("\n")
|
||||||
|
return out, nil
|
||||||
}
|
}
|
||||||
in.Marker = o.NextMarker
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(prefixList) == 0 {
|
|
||||||
fmt.Printf("\n")
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return out, nil
|
func (s *s3Provider) readS3FileList(bucket string, path *string, outputChan chan file, prefixChan chan *string, errorChan chan error, syncChan chan bool) {
|
||||||
|
defer func() { <-syncChan }()
|
||||||
|
in := &s3.ListObjectsInput{
|
||||||
|
Bucket: aws.String(bucket),
|
||||||
|
Prefix: path,
|
||||||
|
MaxKeys: aws.Long(1000),
|
||||||
|
Delimiter: aws.String("/"),
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
o, err := s.conn.ListObjects(in)
|
||||||
|
if err != nil {
|
||||||
|
errorChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, v := range o.Contents {
|
||||||
|
outputChan <- file{
|
||||||
|
Filename: *v.Key,
|
||||||
|
Size: *v.Size,
|
||||||
|
MD5: strings.Trim(*v.ETag, "\""), // Wat?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(o.CommonPrefixes) > 0 {
|
||||||
|
for _, cp := range o.CommonPrefixes {
|
||||||
|
prefixChan <- cp.Prefix
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !*o.IsTruncated {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
in.Marker = o.NextMarker
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *s3Provider) WriteFile(path string, content io.ReadSeeker, public bool) error {
|
func (s *s3Provider) WriteFile(path string, content io.ReadSeeker, public bool) error {
|
||||||
|
|
Loading…
Reference in a new issue