From bf10539e34e4360c55ec117506124a4b82fe1546 Mon Sep 17 00:00:00 2001 From: Knut Ahlers Date: Sun, 16 Jun 2019 19:57:02 +0200 Subject: [PATCH] Implement reading in file lists --- cmd/cloudbox/sync.go | 2 +- providers/local/provider.go | 32 +++++++++++++++++- sync/db.go | 20 ++++++++++++ sync/state.go | 31 ++++++++++-------- sync/sync.go | 65 +++++++++++++++++++++++++++++++++---- 5 files changed, 128 insertions(+), 22 deletions(-) diff --git a/cmd/cloudbox/sync.go b/cmd/cloudbox/sync.go index 814cd5d..15c9684 100644 --- a/cmd/cloudbox/sync.go +++ b/cmd/cloudbox/sync.go @@ -39,7 +39,7 @@ func execSync() error { return errors.Wrap(err, "Unable to establish database connection") } - s := sync.New(local, remote, db, conf.Sync.Settings) + s := sync.New(local, remote, db, conf.Sync.Settings, log.NewEntry(log.StandardLogger())) sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) diff --git a/providers/local/provider.go b/providers/local/provider.go index 5b15091..10a7748 100644 --- a/providers/local/provider.go +++ b/providers/local/provider.go @@ -6,6 +6,7 @@ import ( "io" "os" "path" + "path/filepath" "strings" "time" @@ -31,7 +32,36 @@ func (p Provider) Name() string { return "local" } func (p Provider) GetChecksumMethod() hash.Hash { return sha256.New() } func (p Provider) ListFiles() ([]providers.File, error) { - return nil, errors.New("Not implemented") + var ( + absPath string + err error + files []providers.File + ) + + if absPath, err = filepath.Abs(p.directory); err != nil { + return nil, errors.Wrap(err, "Unable to calculate absolute path") + } + + err = filepath.Walk(absPath, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + if info.IsDir() { + // We behave like git: We don't care about dirs themselves + return nil + } + + files = append(files, File{ + info: info, + relativeName: strings.TrimLeft(strings.TrimPrefix(path, absPath), "/"), + fullPath: path, + }) + + return nil + }) + + return files, errors.Wrap(err, "File listing failed") } func (p Provider) DeleteFile(relativeName string) error { diff --git a/sync/db.go b/sync/db.go index 11bfa66..d57886d 100644 --- a/sync/db.go +++ b/sync/db.go @@ -62,3 +62,23 @@ func (s *Sync) setDBFileInfo(side, info providers.FileInfo) error { _, err = stmt.Exec(info.RelativeName, info.LastModified, info.Checksum, info.Size) return errors.Wrap(err, "Unable to upsert file info") } + +func (s *Sync) updateStateFromDatabase(st *state) error { + for _, table := range []string{sideLocal, sideRemote} { + rows, err := s.db.Query(fmt.Sprintf("SELECT * FROM %s_state", table)) + if err != nil { + return errors.Wrapf(err, "Unable to query table %s", table) + } + defer rows.Close() + + for rows.Next() { + info := providers.FileInfo{} + if err = rows.Scan(&info.RelativeName, &info.LastModified, &info.Checksum, &info.Size); err != nil { + return errors.Wrap(err, "Unable to read response") + } + st.Set(table, sourceDB, info) + } + } + + return nil +} diff --git a/sync/state.go b/sync/state.go index 80de354..9eed6fd 100644 --- a/sync/state.go +++ b/sync/state.go @@ -27,8 +27,13 @@ func (c *Change) Register(add Change) { *c = *c | add } -func (c Change) Has(test Change) bool { - return c&test != 0 +func (c Change) HasOne(test ...Change) bool { + for _, t := range test { + if c&t != 0 { + return true + } + } + return false } func (c Change) Is(test Change) bool { @@ -36,10 +41,10 @@ func (c Change) Is(test Change) bool { } const ( - SideLocal string = "local" - SideRemote string = "remote" - SourceDB string = "db" - SourceScan string = "scan" + sideLocal string = "local" + sideRemote string = "remote" + sourceDB string = "db" + sourceScan string = "scan" ) type stateDetail struct { @@ -120,22 +125,22 @@ func (s *state) GetRelativeNames() []string { return out } -func (s *state) Set(side, source, relativeName string, info providers.FileInfo) { +func (s *state) Set(side, source string, info providers.FileInfo) { s.lock.Lock() defer s.lock.Unlock() - if _, ok := s.files[relativeName]; !ok { - s.files[relativeName] = &stateDetail{} + if _, ok := s.files[info.RelativeName]; !ok { + s.files[info.RelativeName] = &stateDetail{} } switch strings.Join([]string{side, source}, "::") { case "local::db": - s.files[relativeName].LocalDB = &info + s.files[info.RelativeName].LocalDB = &info case "local::scan": - s.files[relativeName].LocalScan = &info + s.files[info.RelativeName].LocalScan = &info case "remote::db": - s.files[relativeName].RemoteDB = &info + s.files[info.RelativeName].RemoteDB = &info case "remote::scan": - s.files[relativeName].RemoteScan = &info + s.files[info.RelativeName].RemoteScan = &info } } diff --git a/sync/sync.go b/sync/sync.go index 9692963..923df29 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -2,9 +2,11 @@ package sync import ( "database/sql" + "hash" "time" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" "github.com/Luzifer/cloudbox/providers" ) @@ -19,16 +21,20 @@ type Sync struct { conf SyncConfig local, remote providers.CloudProvider + log *log.Entry + stop chan struct{} } -func New(local, remote providers.CloudProvider, db *sql.DB, conf SyncConfig) *Sync { +func New(local, remote providers.CloudProvider, db *sql.DB, conf SyncConfig, logger *log.Entry) *Sync { return &Sync{ db: db, conf: conf, local: local, remote: remote, + log: logger, + stop: make(chan struct{}), } } @@ -38,16 +44,14 @@ func (s *Sync) Run() error { return errors.Wrap(err, "Unable to initialize database schema") } - var ( - hashMethod = s.remote.GetChecksumMethod() - refresh = time.NewTimer(s.conf.ScanInterval) - useChecksum = s.remote.Capabilities().Has(providers.CapAutoChecksum) || s.conf.ForceUseChecksum - ) + var refresh = time.NewTimer(s.conf.ScanInterval) for { select { case <-refresh.C: - // TODO: Execute rescan & sync + if err := s.runSync(); err != nil { + return errors.Wrap(err, "Sync failed") + } refresh.Reset(s.conf.ScanInterval) case <-s.stop: @@ -57,3 +61,50 @@ func (s *Sync) Run() error { } func (s *Sync) Stop() { s.stop <- struct{}{} } + +func (s *Sync) fillStateFromProvider(syncState *state, provider providers.CloudProvider, side string, useChecksum bool, hashMethod hash.Hash) error { + files, err := provider.ListFiles() + if err != nil { + return errors.Wrap(err, "Unable to list files") + } + + for _, f := range files { + info := f.Info() + if useChecksum && info.Checksum == "" { + cs, err := f.Checksum(hashMethod) + if err != nil { + return errors.Wrap(err, "Unable to fetch checksum") + } + info.Checksum = cs + } + + syncState.Set(side, sourceScan, info) + } + + return nil +} + +func (s *Sync) runSync() error { + var ( + hashMethod = s.remote.GetChecksumMethod() + syncState = newState() + useChecksum = s.remote.Capabilities().Has(providers.CapAutoChecksum) || s.conf.ForceUseChecksum + ) + + if err := s.updateStateFromDatabase(syncState); err != nil { + return errors.Wrap(err, "Unable to load database state") + } + + if err := s.fillStateFromProvider(syncState, s.local, sideLocal, useChecksum, hashMethod); err != nil { + return errors.Wrap(err, "Unable to load local files") + } + + if err := s.fillStateFromProvider(syncState, s.remote, sideRemote, useChecksum, hashMethod); err != nil { + return errors.Wrap(err, "Unable to load remote files") + } + + // TODO: Do something with sync database + s.log.Printf("%#v", syncState) + + return nil +}