From cf6c4d640d22c14836e48f11870ca2b7ffb5b675 Mon Sep 17 00:00:00 2001 From: Knut Ahlers Date: Sun, 16 Jun 2019 18:53:30 +0200 Subject: [PATCH] Add change management --- cmd/cloudbox/config.go | 13 +++- cmd/cloudbox/providers.go | 2 +- cmd/cloudbox/sync.go | 13 +++- providers/file.go | 33 ++++++++- providers/interface.go | 9 ++- providers/local/file.go | 6 +- providers/local/provider.go | 17 +++-- sync/db.go | 64 ++++++++++++++++ sync/state.go | 141 ++++++++++++++++++++++++++++++++++++ sync/sync.go | 45 ++++++++++-- 10 files changed, 319 insertions(+), 24 deletions(-) create mode 100644 sync/db.go create mode 100644 sync/state.go diff --git a/cmd/cloudbox/config.go b/cmd/cloudbox/config.go index 48f6cc1..311e905 100644 --- a/cmd/cloudbox/config.go +++ b/cmd/cloudbox/config.go @@ -2,10 +2,13 @@ package main import ( "os" + "time" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" + + "github.com/Luzifer/cloudbox/sync" ) type shareConfig struct { @@ -14,8 +17,9 @@ type shareConfig struct { } type syncConfig struct { - LocalDir string `yaml:"local_dir"` - RemoteURI string `yaml:"remote_uri"` + LocalDir string `yaml:"local_dir"` + RemoteURI string `yaml:"remote_uri"` + Settings sync.SyncConfig `yaml:"settings"` } type configFile struct { @@ -43,6 +47,11 @@ func (c configFile) validate() error { func defaultConfig() *configFile { return &configFile{ ControlDir: "~/.cache/cloudbox", + Sync: syncConfig{ + Settings: sync.SyncConfig{ + ScanInterval: time.Minute, + }, + }, } } diff --git a/cmd/cloudbox/providers.go b/cmd/cloudbox/providers.go index bed67be..8e22ec3 100644 --- a/cmd/cloudbox/providers.go +++ b/cmd/cloudbox/providers.go @@ -20,7 +20,7 @@ func providerFromURI(uri string) (providers.CloudProvider, error) { cp, err := f(uri) switch err { case nil: - if cp.Capabilities()&providers.CapBasic == 0 { + if !cp.Capabilities().Has(providers.CapBasic) { return nil, errors.Errorf("Provider %s does not support basic capabilities", cp.Name()) } diff --git a/cmd/cloudbox/sync.go b/cmd/cloudbox/sync.go index 66f98fc..814cd5d 100644 --- a/cmd/cloudbox/sync.go +++ b/cmd/cloudbox/sync.go @@ -3,7 +3,9 @@ package main import ( "database/sql" "os" + "os/signal" "path" + "syscall" _ "github.com/mattn/go-sqlite3" "github.com/pkg/errors" @@ -37,7 +39,16 @@ func execSync() error { return errors.Wrap(err, "Unable to establish database connection") } - s := sync.New(local, remote, db) + s := sync.New(local, remote, db, conf.Sync.Settings) + + sigchan := make(chan os.Signal, 1) + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + for range sigchan { + s.Stop() + } + }() log.Info("Starting sync run...") return errors.Wrap(s.Run(), "Unable to sync") diff --git a/providers/file.go b/providers/file.go index 5990da4..f992087 100644 --- a/providers/file.go +++ b/providers/file.go @@ -1,6 +1,7 @@ package providers import ( + "hash" "io" "time" @@ -11,7 +12,7 @@ var ErrFileNotFound = errors.New("File not found") type File interface { Info() FileInfo - Checksum() (string, error) + Checksum(hash.Hash) (string, error) Content() (io.ReadCloser, error) } @@ -21,3 +22,33 @@ type FileInfo struct { Checksum string // Expected to be present on CapAutoChecksum Size uint64 } + +func (f *FileInfo) Equal(other *FileInfo) bool { + if f == nil && other == nil { + // Both are not present: No change + return true + } + + if (f != nil && other == nil) || (f == nil && other != nil) { + // One is not present, the other is: Change + return false + } + + if (f.Checksum != "" || other.Checksum != "") && f.Checksum != other.Checksum { + // Checksum is present in one, doesn't match: Change + return false + } + + if f.Size != other.Size { + // No checksums present, size differs: Change + return false + } + + if !f.LastModified.Equal(other.LastModified) { + // LastModified date differs: Change + return false + } + + // No changes detected yet: No change + return true +} diff --git a/providers/interface.go b/providers/interface.go index b252a13..007ea35 100644 --- a/providers/interface.go +++ b/providers/interface.go @@ -1,6 +1,8 @@ package providers import ( + "hash" + "github.com/pkg/errors" ) @@ -12,6 +14,8 @@ const ( CapAutoChecksum ) +func (c Capability) Has(test Capability) bool { return c&test != 0 } + var ( ErrInvalidURI = errors.New("Spefified URI is invalid for this provider") ErrFeatureNotSupported = errors.New("Feature not supported") @@ -21,10 +25,11 @@ type CloudProviderInitFunc func(string) (CloudProvider, error) type CloudProvider interface { Capabilities() Capability - Name() string DeleteFile(relativeName string) error + GetChecksumMethod() hash.Hash GetFile(relativeName string) (File, error) ListFiles() ([]File, error) - PutFile(File) error + Name() string + PutFile(File) (File, error) Share(relativeName string) (string, error) } diff --git a/providers/local/file.go b/providers/local/file.go index 675b805..d756fbd 100644 --- a/providers/local/file.go +++ b/providers/local/file.go @@ -2,8 +2,8 @@ package local import ( "bytes" - "crypto/sha256" "fmt" + "hash" "io" "os" @@ -26,7 +26,7 @@ func (f File) Info() providers.FileInfo { } } -func (f File) Checksum() (string, error) { +func (f File) Checksum(h hash.Hash) (string, error) { fc, err := f.Content() if err != nil { return "", errors.Wrap(err, "Unable to get file contents") @@ -37,7 +37,7 @@ func (f File) Checksum() (string, error) { return "", errors.Wrap(err, "Unable to read file contents") } - return fmt.Sprintf("%x", sha256.Sum256(buf.Bytes())), nil + return fmt.Sprintf("%x", h.Sum(buf.Bytes())), nil } func (f File) Content() (io.ReadCloser, error) { diff --git a/providers/local/provider.go b/providers/local/provider.go index 4180f05..5b15091 100644 --- a/providers/local/provider.go +++ b/providers/local/provider.go @@ -1,6 +1,8 @@ package local import ( + "crypto/sha256" + "hash" "io" "os" "path" @@ -26,6 +28,7 @@ type Provider struct { func (p Provider) Capabilities() providers.Capability { return providers.CapBasic } func (p Provider) Name() string { return "local" } +func (p Provider) GetChecksumMethod() hash.Hash { return sha256.New() } func (p Provider) ListFiles() ([]providers.File, error) { return nil, errors.New("Not implemented") @@ -53,33 +56,33 @@ func (p Provider) GetFile(relativeName string) (providers.File, error) { }, nil } -func (p Provider) PutFile(f providers.File) error { +func (p Provider) PutFile(f providers.File) (providers.File, error) { fullPath := path.Join(p.directory, f.Info().RelativeName) fp, err := os.Create(fullPath) if err != nil { - return errors.Wrap(err, "Unable to create file") + return nil, errors.Wrap(err, "Unable to create file") } rfp, err := f.Content() if err != nil { - return errors.Wrap(err, "Unable to get remote file content") + return nil, errors.Wrap(err, "Unable to get remote file content") } defer rfp.Close() if _, err := io.Copy(fp, rfp); err != nil { - return errors.Wrap(err, "Unable to copy file contents") + return nil, errors.Wrap(err, "Unable to copy file contents") } if err := fp.Close(); err != nil { - return errors.Wrap(err, "Unable to close local file") + return nil, errors.Wrap(err, "Unable to close local file") } if err := os.Chtimes(fullPath, time.Now(), f.Info().LastModified); err != nil { - return errors.Wrap(err, "Unable to set last file mod time") + return nil, errors.Wrap(err, "Unable to set last file mod time") } - return nil + return p.GetFile(f.Info().RelativeName) } func (p Provider) Share(relativeName string) (string, error) { diff --git a/sync/db.go b/sync/db.go new file mode 100644 index 0000000..11bfa66 --- /dev/null +++ b/sync/db.go @@ -0,0 +1,64 @@ +package sync + +import ( + "database/sql" + "fmt" + + "github.com/pkg/errors" + + "github.com/Luzifer/cloudbox/providers" +) + +const schema = ` +CREATE TABLE IF NOT EXISTS local_state ( + relative_name TEXT PRIMARY KEY, + last_modified DATETIME, + checksum TEXT, + size INT +); +CREATE TABLE IF NOT EXISTS remote_state ( + relative_name TEXT PRIMARY KEY, + last_modified DATETIME, + checksum TEXT, + size INT +); +` + +func (s *Sync) initSchema() error { + _, err := s.db.Exec(schema) + return err +} + +func (s *Sync) getDBFileInfo(side, relativeName string) (providers.FileInfo, error) { + info := providers.FileInfo{} + + stmt, err := s.db.Prepare(fmt.Sprintf("SELECT * from %s_state WHERE relative_name = ?", side)) + if err != nil { + return info, errors.Wrap(err, "Unable to prepare query") + } + + row := stmt.QueryRow(relativeName) + if err = row.Scan(&info.RelativeName, &info.LastModified, &info.Checksum, &info.Size); err != nil { + if err == sql.ErrNoRows { + return info, providers.ErrFileNotFound + } + return info, errors.Wrap(err, "Unable to read response") + } + + return info, nil +} + +func (s *Sync) setDBFileInfo(side, info providers.FileInfo) error { + stmt, err := s.db.Prepare(fmt.Sprintf( + `INSERT INTO %s_state VALUES(?, ?, ?, ?) + ON CONFLICT(relative_name) DO UPDATE SET + last_modified=excluded.last_modified, + checksum=excluded.checksum, + size=excluded.size`, side)) + if err != nil { + return errors.Wrap(err, "Unable to prepare query") + } + + _, err = stmt.Exec(info.RelativeName, info.LastModified, info.Checksum, info.Size) + return errors.Wrap(err, "Unable to upsert file info") +} diff --git a/sync/state.go b/sync/state.go new file mode 100644 index 0000000..80de354 --- /dev/null +++ b/sync/state.go @@ -0,0 +1,141 @@ +package sync + +import ( + "sort" + "strings" + "sync" + + "github.com/Luzifer/cloudbox/providers" +) + +type Change uint8 + +const ( + ChangeLocalAdd Change = 1 << iota + ChangeLocalDelete + ChangeLocalUpdate + ChangeRemoteAdd + ChangeRemoteDelete + ChangeRemoteUpdate +) + +func (c Change) Changed() bool { + return c != 0 +} + +func (c *Change) Register(add Change) { + *c = *c | add +} + +func (c Change) Has(test Change) bool { + return c&test != 0 +} + +func (c Change) Is(test Change) bool { + return c == test +} + +const ( + SideLocal string = "local" + SideRemote string = "remote" + SourceDB string = "db" + SourceScan string = "scan" +) + +type stateDetail struct { + LocalDB, + LocalScan, + RemoteDB, + RemoteScan *providers.FileInfo +} + +type state struct { + files map[string]*stateDetail + lock sync.Mutex +} + +func newState() *state { + return &state{ + files: make(map[string]*stateDetail), + } +} + +func (s *state) GetChangeFor(relativeName string) (result Change) { + s.lock.Lock() + defer s.lock.Unlock() + + d := s.files[relativeName] + + // No changes detected + if d.LocalDB.Equal(d.LocalScan) && d.RemoteDB.Equal(d.RemoteScan) { + // Check special case: Something went really wrong and sync state is FUBAR + if d.LocalDB == nil && d.RemoteDB != nil { + result.Register(ChangeRemoteAdd) + } + if d.LocalDB != nil && d.RemoteDB == nil { + result.Register(ChangeLocalAdd) + } + + return + } + + // Check for local changes + switch { + case d.LocalDB == nil && d.LocalScan != nil: + result.Register(ChangeLocalAdd) + + case d.LocalDB != nil && d.LocalScan == nil: + result.Register(ChangeLocalDelete) + + case !d.LocalDB.Equal(d.LocalScan): + result.Register(ChangeLocalUpdate) + } + + // Check for remote changes + switch { + case d.RemoteDB == nil && d.RemoteScan != nil: + result.Register(ChangeRemoteAdd) + + case d.RemoteDB != nil && d.RemoteScan == nil: + result.Register(ChangeRemoteDelete) + + case !d.RemoteDB.Equal(d.RemoteScan): + result.Register(ChangeRemoteUpdate) + } + + return +} + +func (s *state) GetRelativeNames() []string { + s.lock.Lock() + defer s.lock.Unlock() + + out := []string{} + for k := range s.files { + out = append(out, k) + } + + sort.Strings(out) + + return out +} + +func (s *state) Set(side, source, relativeName string, info providers.FileInfo) { + s.lock.Lock() + defer s.lock.Unlock() + + if _, ok := s.files[relativeName]; !ok { + s.files[relativeName] = &stateDetail{} + } + + switch strings.Join([]string{side, source}, "::") { + case "local::db": + s.files[relativeName].LocalDB = &info + case "local::scan": + s.files[relativeName].LocalScan = &info + case "remote::db": + s.files[relativeName].RemoteDB = &info + case "remote::scan": + s.files[relativeName].RemoteScan = &info + } +} diff --git a/sync/sync.go b/sync/sync.go index 8a9ac51..9692963 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -2,27 +2,58 @@ package sync import ( "database/sql" + "time" + + "github.com/pkg/errors" "github.com/Luzifer/cloudbox/providers" ) -type Sync struct { - db *sql.DB - local, remote providers.CloudProvider +type SyncConfig struct { + ForceUseChecksum bool `yaml:"force_use_checksum"` + ScanInterval time.Duration `yaml:"scan_interval"` } -func New(local, remote providers.CloudProvider, db *sql.DB) *Sync { +type Sync struct { + db *sql.DB + conf SyncConfig + local, remote providers.CloudProvider + + stop chan struct{} +} + +func New(local, remote providers.CloudProvider, db *sql.DB, conf SyncConfig) *Sync { return &Sync{ db: db, + conf: conf, local: local, remote: remote, + + stop: make(chan struct{}), } } func (s *Sync) Run() error { - for { - select {} + if err := s.initSchema(); err != nil { + return errors.Wrap(err, "Unable to initialize database schema") } - return nil + var ( + hashMethod = s.remote.GetChecksumMethod() + refresh = time.NewTimer(s.conf.ScanInterval) + useChecksum = s.remote.Capabilities().Has(providers.CapAutoChecksum) || s.conf.ForceUseChecksum + ) + + for { + select { + case <-refresh.C: + // TODO: Execute rescan & sync + refresh.Reset(s.conf.ScanInterval) + + case <-s.stop: + return nil + } + } } + +func (s *Sync) Stop() { s.stop <- struct{}{} }