1
0
Fork 0
mirror of https://github.com/Luzifer/s3sync.git synced 2024-10-18 06:24:20 +00:00
s3sync/main.go

207 lines
5.2 KiB
Go
Raw Normal View History

2015-07-26 14:06:24 +00:00
package main
import (
	"os"
	"path"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"

	"github.com/Luzifer/rconfig/v2"
	"github.com/Luzifer/s3sync/v2/pkg/fsprovider"
)
// cfg holds the command line options. It is populated by rconfig in
// initApp using the flag names and defaults given in the struct tags.
var cfg = struct {
	Delete         bool   `flag:"delete,d" default:"false" description:"Delete files on remote not existing on local"`
	Endpoint       string `flag:"endpoint" default:"" description:"Switch S3 endpoint (i.e. for MinIO compatibility)"`
	LogLevel       string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"`
	MaxThreads     int    `flag:"max-threads" default:"10" description:"Use max N parallel threads for file sync"`
	Public         bool   `flag:"public,P" default:"false" description:"Make files public when syncing to S3"`
	VersionAndExit bool   `flag:"version" default:"false" description:"Prints current version and exits"`
}{}

// version is the value reported when the --version flag is given.
// NOTE(review): presumably overridden at build time; "dev" otherwise.
var version = "dev"
func getFSProvider(prefix string) (fsprovider.Provider, error) {
if strings.HasPrefix(prefix, "s3://") {
p, err := fsprovider.NewS3(cfg.Endpoint)
return p, errors.Wrap(err, "getting s3 provider")
}
return fsprovider.NewLocal(), nil
2015-07-26 14:06:24 +00:00
}
func initApp() error {
rconfig.AutoEnv(true)
if err := rconfig.ParseAndValidate(&cfg); err != nil {
return errors.Wrap(err, "parsing cli options")
}
2015-07-26 14:06:24 +00:00
l, err := logrus.ParseLevel(cfg.LogLevel)
if err != nil {
return errors.Wrap(err, "parsing log-level")
}
logrus.SetLevel(l)
2015-07-26 14:06:24 +00:00
return nil
}
func main() {
var err error
if err = initApp(); err != nil {
logrus.WithError(err).Fatal("initializing app")
}
if cfg.VersionAndExit {
logrus.WithField("version", version).Info("s3sync")
os.Exit(0)
}
2015-07-26 14:06:24 +00:00
if err = runSync(rconfig.Args()[1:]); err != nil {
logrus.WithError(err).Fatal("running sync")
}
2015-07-26 14:06:24 +00:00
}
//nolint:funlen,gocognit,gocyclo // Should be kept as single unit
func runSync(args []string) error {
//nolint:gomnd // Simple count of parameters, makes no sense to export
2015-07-26 14:06:24 +00:00
if len(args) != 2 {
return errors.New("missing required arguments: s3sync <from> <to>")
2015-07-26 14:06:24 +00:00
}
local, err := getFSProvider(args[0])
if err != nil {
return errors.Wrap(err, "getting local provider")
}
2015-07-26 14:06:24 +00:00
remote, err := getFSProvider(args[1])
if err != nil {
return errors.Wrap(err, "getting remote provider")
}
2015-07-26 14:06:24 +00:00
localPath, err := local.GetAbsolutePath(args[0])
if err != nil {
return errors.Wrap(err, "getting local path")
}
2015-07-26 14:06:24 +00:00
remotePath, err := remote.GetAbsolutePath(args[1])
if err != nil {
return errors.Wrap(err, "getting remote path")
}
2015-07-26 14:06:24 +00:00
localFiles, err := local.ListFiles(localPath)
if err != nil {
return errors.Wrap(err, "listing local files")
}
2015-07-26 14:06:24 +00:00
remoteFiles, err := remote.ListFiles(remotePath)
if err != nil {
return errors.Wrap(err, "listing remote files")
}
var (
nErr int
syncChannel = make(chan bool, cfg.MaxThreads)
)
2015-07-26 14:06:24 +00:00
for i, localFile := range localFiles {
syncChannel <- true
go func(i int, localFile fsprovider.File) {
defer func() { <-syncChannel }()
var (
logger = logrus.WithField("filename", localFile.Filename)
debugLogger = logger.WithField("tx_reason", "missing")
needsCopy bool
remoteFound bool
)
for _, remoteFile := range remoteFiles {
if remoteFile.Filename != localFile.Filename {
// Different file, do not compare
continue
}
// We found a match, lets check whether tx is required
remoteFound = true
switch {
case remoteFile.Size != localFile.Size:
debugLogger = debugLogger.WithField("tx_reason", "size-mismatch").WithField("ls", localFile.Size).WithField("rs", remoteFile.Size)
needsCopy = true
case localFile.LastModified.After(remoteFile.LastModified):
debugLogger = debugLogger.WithField("tx_reason", "local-newer")
needsCopy = true
default:
// No reason to update
needsCopy = false
}
break
2015-07-26 14:06:24 +00:00
}
if remoteFound && !needsCopy {
logger.Debug("skipped transfer")
return
}
debugLogger.Debug("starting transfer")
2015-07-26 14:06:24 +00:00
l, err := local.ReadFile(path.Join(localPath, localFile.Filename))
if err != nil {
logger.WithError(err).Error("reading local file")
nErr++
return
}
defer func() {
if err := l.Close(); err != nil {
logger.WithError(err).Error("closing local file")
}
}()
2015-07-26 14:06:24 +00:00
err = remote.WriteFile(path.Join(remotePath, localFile.Filename), l, cfg.Public)
if err != nil {
logger.WithError(err).Error("writing remote file")
nErr++
return
}
2015-07-26 14:06:24 +00:00
logger.Info("transferred file")
}(i, localFile)
2015-07-26 14:06:24 +00:00
}
if cfg.Delete {
for _, remoteFile := range remoteFiles {
syncChannel <- true
go func(remoteFile fsprovider.File) {
defer func() { <-syncChannel }()
needsDeletion := true
for _, localFile := range localFiles {
if localFile.Filename == remoteFile.Filename {
needsDeletion = false
}
2015-07-26 14:06:24 +00:00
}
if needsDeletion {
if err := remote.DeleteFile(path.Join(remotePath, remoteFile.Filename)); err != nil {
logrus.WithField("filename", remoteFile.Filename).WithError(err).Error("deleting remote file")
nErr++
return
}
logrus.WithField("filename", remoteFile.Filename).Info("deleted remote file")
2015-07-26 14:06:24 +00:00
}
}(remoteFile)
2015-07-26 14:06:24 +00:00
}
}
for len(syncChannel) > 0 {
<-time.After(time.Second)
}
2015-07-26 14:06:24 +00:00
return nil
2015-07-26 14:06:24 +00:00
}