From 0360e06ad147c093fb2d971cfe3cb3e6c811bde5 Mon Sep 17 00:00:00 2001
From: Knut Ahlers
Date: Sun, 26 Jul 2015 17:43:55 +0200
Subject: [PATCH] Fetch s3 file list in parallel

---
 s3.go | 112 +++++++++++++++++++++++++++++++++++-----------------------
 1 file changed, 68 insertions(+), 44 deletions(-)

diff --git a/s3.go b/s3.go
index 9eb2abc..ad005ee 100644
--- a/s3.go
+++ b/s3.go
@@ -7,6 +7,7 @@ import (
 	"path/filepath"
 	"regexp"
 	"strings"
+	"time"
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/service/s3"
@@ -44,59 +45,82 @@ func (s *s3Provider) ListFiles(prefix string) ([]file, error) {
 		return out, err
 	}
 
-	prefixList := []*string{aws.String(path)}
+	processedPrefixes := []string{}
+
+	prefixChan := make(chan *string, 100)
+	outputChan := make(chan file, 100)
+	errChan := make(chan error, 10)
+	syncChan := make(chan bool, 10)
+	doneTimer := time.NewTicker(500 * time.Millisecond)
+
+	prefixChan <- aws.String(path)
 
 	for {
-		fmt.Printf("Scanning prefixes (%d left)...\r", len(prefixList))
-		var p *string
-		p, prefixList = prefixList[0], prefixList[1:]
-		in := &s3.ListObjectsInput{
-			Bucket:    aws.String(bucket),
-			Prefix:    p,
-			MaxKeys:   aws.Long(1000),
-			Delimiter: aws.String("/"),
-		}
-		for {
-			o, err := s.conn.ListObjects(in)
-			if err != nil {
-				return out, err
-			}
-
-			for _, v := range o.Contents {
-				out = append(out, file{
-					Filename: *v.Key,
-					Size:     *v.Size,
-					MD5:      strings.Trim(*v.ETag, "\""), // Wat?
-				})
-			}
-
-			if len(o.CommonPrefixes) > 0 {
-				for _, cp := range o.CommonPrefixes {
-					found := false
-					for _, v := range prefixList {
-						if v == cp.Prefix {
-							found = true
-						}
-					}
-					if !found {
-						prefixList = append(prefixList, cp.Prefix)
+		select {
+		case prefix := <-prefixChan:
+			if len(syncChan) == 10 {
+				prefixChan <- prefix
+			} else {
+				found := false
+				for _, v := range processedPrefixes {
+					if v == *prefix {
+						found = true
 					}
 				}
+				if !found {
+					syncChan <- true
+					go s.readS3FileList(bucket, prefix, outputChan, prefixChan, errChan, syncChan)
+					processedPrefixes = append(processedPrefixes, *prefix)
+				}
 			}
-
-			if !*o.IsTruncated {
-				break
+		case o := <-outputChan:
+			out = append(out, o)
+		case err := <-errChan:
+			return out, err
+		case <-doneTimer.C:
+			fmt.Printf("Scanning prefixes (%d working, %d left)...\r", len(syncChan), len(prefixChan))
+			if len(prefixChan) == 0 && len(syncChan) == 0 {
+				fmt.Printf("\n")
+				return out, nil
 			}
-			in.Marker = o.NextMarker
-		}
-
-		if len(prefixList) == 0 {
-			fmt.Printf("\n")
-			break
 		}
 	}
+}
 
-	return out, nil
+func (s *s3Provider) readS3FileList(bucket string, path *string, outputChan chan file, prefixChan chan *string, errorChan chan error, syncChan chan bool) {
+	defer func() { <-syncChan }()
+	in := &s3.ListObjectsInput{
+		Bucket:    aws.String(bucket),
+		Prefix:    path,
+		MaxKeys:   aws.Long(1000),
+		Delimiter: aws.String("/"),
+	}
+	for {
+		o, err := s.conn.ListObjects(in)
+		if err != nil {
+			errorChan <- err
+			return
+		}
+
+		for _, v := range o.Contents {
+			outputChan <- file{
+				Filename: *v.Key,
+				Size:     *v.Size,
+				MD5:      strings.Trim(*v.ETag, "\""), // Wat?
+			}
+		}
+
+		if len(o.CommonPrefixes) > 0 {
+			for _, cp := range o.CommonPrefixes {
+				prefixChan <- cp.Prefix
+			}
+		}
+
+		if !*o.IsTruncated {
+			break
+		}
+		in.Marker = o.NextMarker
+	}
 }
 
 func (s *s3Provider) WriteFile(path string, content io.ReadSeeker, public bool) error {
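
Illustrative sketch (not part of the patch): the hunk above boils down to one concurrency pattern. A buffered channel used as a semaphore (syncChan) caps how many goroutines list prefixes at once, a second buffered channel (prefixChan) carries newly discovered prefixes back to the coordinator, and a ticker periodically checks whether both channels have drained to decide that the scan is finished. The standalone Go program below shows the same pattern outside the s3Provider context; processItem, workChan and sem are hypothetical names introduced only for this example.

package main

import (
	"fmt"
	"time"
)

// processItem simulates one unit of work; items below 3 enqueue a follow-up
// item, mirroring how readS3FileList feeds newly found prefixes back into
// prefixChan. The semaphore slot is released only after any follow-up work
// has been queued, which is what makes the drain check in main safe.
func processItem(item int, workChan chan int, sem chan bool) {
	defer func() { <-sem }() // release the slot when this worker is done

	time.Sleep(10 * time.Millisecond) // pretend to talk to S3
	if item < 3 {
		workChan <- item + 1 // discovered follow-up work
	}
	fmt.Printf("processed item %d\n", item)
}

func main() {
	workChan := make(chan int, 100) // queue of pending work items
	sem := make(chan bool, 10)      // at most 10 workers in flight
	done := time.NewTicker(100 * time.Millisecond)
	defer done.Stop()

	workChan <- 0 // seed the queue

	for {
		select {
		case item := <-workChan:
			if len(sem) == cap(sem) {
				workChan <- item // all slots busy: requeue and retry later
			} else {
				sem <- true // acquire a slot before spawning the worker
				go processItem(item, workChan, sem)
			}
		case <-done.C:
			// Finished once nothing is queued and no worker holds a slot.
			if len(workChan) == 0 && len(sem) == 0 {
				return
			}
		}
	}
}

Polling a ticker keeps the coordinating loop simple at the cost of up to one tick of extra latency after the last worker finishes; the patch accepts the same trade-off with its 500ms doneTimer.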