1
0
Fork 0
mirror of https://github.com/Luzifer/cloudbox.git synced 2024-12-22 18:51:21 +00:00

Add change management

This commit is contained in:
Knut Ahlers 2019-06-16 18:53:30 +02:00
parent 3dfd5694d9
commit cf6c4d640d
Signed by: luzifer
GPG key ID: DC2729FDD34BE99E
10 changed files with 319 additions and 24 deletions

View file

@ -2,10 +2,13 @@ package main
import ( import (
"os" "os"
"time"
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
"github.com/Luzifer/cloudbox/sync"
) )
type shareConfig struct { type shareConfig struct {
@ -14,8 +17,9 @@ type shareConfig struct {
} }
type syncConfig struct { type syncConfig struct {
LocalDir string `yaml:"local_dir"` LocalDir string `yaml:"local_dir"`
RemoteURI string `yaml:"remote_uri"` RemoteURI string `yaml:"remote_uri"`
Settings sync.SyncConfig `yaml:"settings"`
} }
type configFile struct { type configFile struct {
@ -43,6 +47,11 @@ func (c configFile) validate() error {
func defaultConfig() *configFile { func defaultConfig() *configFile {
return &configFile{ return &configFile{
ControlDir: "~/.cache/cloudbox", ControlDir: "~/.cache/cloudbox",
Sync: syncConfig{
Settings: sync.SyncConfig{
ScanInterval: time.Minute,
},
},
} }
} }

View file

@ -20,7 +20,7 @@ func providerFromURI(uri string) (providers.CloudProvider, error) {
cp, err := f(uri) cp, err := f(uri)
switch err { switch err {
case nil: case nil:
if cp.Capabilities()&providers.CapBasic == 0 { if !cp.Capabilities().Has(providers.CapBasic) {
return nil, errors.Errorf("Provider %s does not support basic capabilities", cp.Name()) return nil, errors.Errorf("Provider %s does not support basic capabilities", cp.Name())
} }

View file

@ -3,7 +3,9 @@ package main
import ( import (
"database/sql" "database/sql"
"os" "os"
"os/signal"
"path" "path"
"syscall"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -37,7 +39,16 @@ func execSync() error {
return errors.Wrap(err, "Unable to establish database connection") return errors.Wrap(err, "Unable to establish database connection")
} }
s := sync.New(local, remote, db) s := sync.New(local, remote, db, conf.Sync.Settings)
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
go func() {
for range sigchan {
s.Stop()
}
}()
log.Info("Starting sync run...") log.Info("Starting sync run...")
return errors.Wrap(s.Run(), "Unable to sync") return errors.Wrap(s.Run(), "Unable to sync")

View file

@ -1,6 +1,7 @@
package providers package providers
import ( import (
"hash"
"io" "io"
"time" "time"
@ -11,7 +12,7 @@ var ErrFileNotFound = errors.New("File not found")
type File interface { type File interface {
Info() FileInfo Info() FileInfo
Checksum() (string, error) Checksum(hash.Hash) (string, error)
Content() (io.ReadCloser, error) Content() (io.ReadCloser, error)
} }
@ -21,3 +22,33 @@ type FileInfo struct {
Checksum string // Expected to be present on CapAutoChecksum Checksum string // Expected to be present on CapAutoChecksum
Size uint64 Size uint64
} }
func (f *FileInfo) Equal(other *FileInfo) bool {
if f == nil && other == nil {
// Both are not present: No change
return true
}
if (f != nil && other == nil) || (f == nil && other != nil) {
// One is not present, the other is: Change
return false
}
if (f.Checksum != "" || other.Checksum != "") && f.Checksum != other.Checksum {
// Checksum is present in one, doesn't match: Change
return false
}
if f.Size != other.Size {
// No checksums present, size differs: Change
return false
}
if !f.LastModified.Equal(other.LastModified) {
// LastModified date differs: Change
return false
}
// No changes detected yet: No change
return true
}

View file

@ -1,6 +1,8 @@
package providers package providers
import ( import (
"hash"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -12,6 +14,8 @@ const (
CapAutoChecksum CapAutoChecksum
) )
func (c Capability) Has(test Capability) bool { return c&test != 0 }
var ( var (
ErrInvalidURI = errors.New("Spefified URI is invalid for this provider") ErrInvalidURI = errors.New("Spefified URI is invalid for this provider")
ErrFeatureNotSupported = errors.New("Feature not supported") ErrFeatureNotSupported = errors.New("Feature not supported")
@ -21,10 +25,11 @@ type CloudProviderInitFunc func(string) (CloudProvider, error)
type CloudProvider interface { type CloudProvider interface {
Capabilities() Capability Capabilities() Capability
Name() string
DeleteFile(relativeName string) error DeleteFile(relativeName string) error
GetChecksumMethod() hash.Hash
GetFile(relativeName string) (File, error) GetFile(relativeName string) (File, error)
ListFiles() ([]File, error) ListFiles() ([]File, error)
PutFile(File) error Name() string
PutFile(File) (File, error)
Share(relativeName string) (string, error) Share(relativeName string) (string, error)
} }

View file

@ -2,8 +2,8 @@ package local
import ( import (
"bytes" "bytes"
"crypto/sha256"
"fmt" "fmt"
"hash"
"io" "io"
"os" "os"
@ -26,7 +26,7 @@ func (f File) Info() providers.FileInfo {
} }
} }
func (f File) Checksum() (string, error) { func (f File) Checksum(h hash.Hash) (string, error) {
fc, err := f.Content() fc, err := f.Content()
if err != nil { if err != nil {
return "", errors.Wrap(err, "Unable to get file contents") return "", errors.Wrap(err, "Unable to get file contents")
@ -37,7 +37,7 @@ func (f File) Checksum() (string, error) {
return "", errors.Wrap(err, "Unable to read file contents") return "", errors.Wrap(err, "Unable to read file contents")
} }
return fmt.Sprintf("%x", sha256.Sum256(buf.Bytes())), nil return fmt.Sprintf("%x", h.Sum(buf.Bytes())), nil
} }
func (f File) Content() (io.ReadCloser, error) { func (f File) Content() (io.ReadCloser, error) {

View file

@ -1,6 +1,8 @@
package local package local
import ( import (
"crypto/sha256"
"hash"
"io" "io"
"os" "os"
"path" "path"
@ -26,6 +28,7 @@ type Provider struct {
func (p Provider) Capabilities() providers.Capability { return providers.CapBasic } func (p Provider) Capabilities() providers.Capability { return providers.CapBasic }
func (p Provider) Name() string { return "local" } func (p Provider) Name() string { return "local" }
func (p Provider) GetChecksumMethod() hash.Hash { return sha256.New() }
func (p Provider) ListFiles() ([]providers.File, error) { func (p Provider) ListFiles() ([]providers.File, error) {
return nil, errors.New("Not implemented") return nil, errors.New("Not implemented")
@ -53,33 +56,33 @@ func (p Provider) GetFile(relativeName string) (providers.File, error) {
}, nil }, nil
} }
func (p Provider) PutFile(f providers.File) error { func (p Provider) PutFile(f providers.File) (providers.File, error) {
fullPath := path.Join(p.directory, f.Info().RelativeName) fullPath := path.Join(p.directory, f.Info().RelativeName)
fp, err := os.Create(fullPath) fp, err := os.Create(fullPath)
if err != nil { if err != nil {
return errors.Wrap(err, "Unable to create file") return nil, errors.Wrap(err, "Unable to create file")
} }
rfp, err := f.Content() rfp, err := f.Content()
if err != nil { if err != nil {
return errors.Wrap(err, "Unable to get remote file content") return nil, errors.Wrap(err, "Unable to get remote file content")
} }
defer rfp.Close() defer rfp.Close()
if _, err := io.Copy(fp, rfp); err != nil { if _, err := io.Copy(fp, rfp); err != nil {
return errors.Wrap(err, "Unable to copy file contents") return nil, errors.Wrap(err, "Unable to copy file contents")
} }
if err := fp.Close(); err != nil { if err := fp.Close(); err != nil {
return errors.Wrap(err, "Unable to close local file") return nil, errors.Wrap(err, "Unable to close local file")
} }
if err := os.Chtimes(fullPath, time.Now(), f.Info().LastModified); err != nil { if err := os.Chtimes(fullPath, time.Now(), f.Info().LastModified); err != nil {
return errors.Wrap(err, "Unable to set last file mod time") return nil, errors.Wrap(err, "Unable to set last file mod time")
} }
return nil return p.GetFile(f.Info().RelativeName)
} }
func (p Provider) Share(relativeName string) (string, error) { func (p Provider) Share(relativeName string) (string, error) {

64
sync/db.go Normal file
View file

@ -0,0 +1,64 @@
package sync
import (
"database/sql"
"fmt"
"github.com/pkg/errors"
"github.com/Luzifer/cloudbox/providers"
)
const schema = `
CREATE TABLE IF NOT EXISTS local_state (
relative_name TEXT PRIMARY KEY,
last_modified DATETIME,
checksum TEXT,
size INT
);
CREATE TABLE IF NOT EXISTS remote_state (
relative_name TEXT PRIMARY KEY,
last_modified DATETIME,
checksum TEXT,
size INT
);
`
func (s *Sync) initSchema() error {
_, err := s.db.Exec(schema)
return err
}
func (s *Sync) getDBFileInfo(side, relativeName string) (providers.FileInfo, error) {
info := providers.FileInfo{}
stmt, err := s.db.Prepare(fmt.Sprintf("SELECT * from %s_state WHERE relative_name = ?", side))
if err != nil {
return info, errors.Wrap(err, "Unable to prepare query")
}
row := stmt.QueryRow(relativeName)
if err = row.Scan(&info.RelativeName, &info.LastModified, &info.Checksum, &info.Size); err != nil {
if err == sql.ErrNoRows {
return info, providers.ErrFileNotFound
}
return info, errors.Wrap(err, "Unable to read response")
}
return info, nil
}
func (s *Sync) setDBFileInfo(side, info providers.FileInfo) error {
stmt, err := s.db.Prepare(fmt.Sprintf(
`INSERT INTO %s_state VALUES(?, ?, ?, ?)
ON CONFLICT(relative_name) DO UPDATE SET
last_modified=excluded.last_modified,
checksum=excluded.checksum,
size=excluded.size`, side))
if err != nil {
return errors.Wrap(err, "Unable to prepare query")
}
_, err = stmt.Exec(info.RelativeName, info.LastModified, info.Checksum, info.Size)
return errors.Wrap(err, "Unable to upsert file info")
}

141
sync/state.go Normal file
View file

@ -0,0 +1,141 @@
package sync
import (
"sort"
"strings"
"sync"
"github.com/Luzifer/cloudbox/providers"
)
type Change uint8
const (
ChangeLocalAdd Change = 1 << iota
ChangeLocalDelete
ChangeLocalUpdate
ChangeRemoteAdd
ChangeRemoteDelete
ChangeRemoteUpdate
)
func (c Change) Changed() bool {
return c != 0
}
func (c *Change) Register(add Change) {
*c = *c | add
}
func (c Change) Has(test Change) bool {
return c&test != 0
}
func (c Change) Is(test Change) bool {
return c == test
}
const (
SideLocal string = "local"
SideRemote string = "remote"
SourceDB string = "db"
SourceScan string = "scan"
)
type stateDetail struct {
LocalDB,
LocalScan,
RemoteDB,
RemoteScan *providers.FileInfo
}
type state struct {
files map[string]*stateDetail
lock sync.Mutex
}
func newState() *state {
return &state{
files: make(map[string]*stateDetail),
}
}
func (s *state) GetChangeFor(relativeName string) (result Change) {
s.lock.Lock()
defer s.lock.Unlock()
d := s.files[relativeName]
// No changes detected
if d.LocalDB.Equal(d.LocalScan) && d.RemoteDB.Equal(d.RemoteScan) {
// Check special case: Something went really wrong and sync state is FUBAR
if d.LocalDB == nil && d.RemoteDB != nil {
result.Register(ChangeRemoteAdd)
}
if d.LocalDB != nil && d.RemoteDB == nil {
result.Register(ChangeLocalAdd)
}
return
}
// Check for local changes
switch {
case d.LocalDB == nil && d.LocalScan != nil:
result.Register(ChangeLocalAdd)
case d.LocalDB != nil && d.LocalScan == nil:
result.Register(ChangeLocalDelete)
case !d.LocalDB.Equal(d.LocalScan):
result.Register(ChangeLocalUpdate)
}
// Check for remote changes
switch {
case d.RemoteDB == nil && d.RemoteScan != nil:
result.Register(ChangeRemoteAdd)
case d.RemoteDB != nil && d.RemoteScan == nil:
result.Register(ChangeRemoteDelete)
case !d.RemoteDB.Equal(d.RemoteScan):
result.Register(ChangeRemoteUpdate)
}
return
}
func (s *state) GetRelativeNames() []string {
s.lock.Lock()
defer s.lock.Unlock()
out := []string{}
for k := range s.files {
out = append(out, k)
}
sort.Strings(out)
return out
}
func (s *state) Set(side, source, relativeName string, info providers.FileInfo) {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.files[relativeName]; !ok {
s.files[relativeName] = &stateDetail{}
}
switch strings.Join([]string{side, source}, "::") {
case "local::db":
s.files[relativeName].LocalDB = &info
case "local::scan":
s.files[relativeName].LocalScan = &info
case "remote::db":
s.files[relativeName].RemoteDB = &info
case "remote::scan":
s.files[relativeName].RemoteScan = &info
}
}

View file

@ -2,27 +2,58 @@ package sync
import ( import (
"database/sql" "database/sql"
"time"
"github.com/pkg/errors"
"github.com/Luzifer/cloudbox/providers" "github.com/Luzifer/cloudbox/providers"
) )
type Sync struct { type SyncConfig struct {
db *sql.DB ForceUseChecksum bool `yaml:"force_use_checksum"`
local, remote providers.CloudProvider ScanInterval time.Duration `yaml:"scan_interval"`
} }
func New(local, remote providers.CloudProvider, db *sql.DB) *Sync { type Sync struct {
db *sql.DB
conf SyncConfig
local, remote providers.CloudProvider
stop chan struct{}
}
func New(local, remote providers.CloudProvider, db *sql.DB, conf SyncConfig) *Sync {
return &Sync{ return &Sync{
db: db, db: db,
conf: conf,
local: local, local: local,
remote: remote, remote: remote,
stop: make(chan struct{}),
} }
} }
func (s *Sync) Run() error { func (s *Sync) Run() error {
for { if err := s.initSchema(); err != nil {
select {} return errors.Wrap(err, "Unable to initialize database schema")
} }
return nil var (
hashMethod = s.remote.GetChecksumMethod()
refresh = time.NewTimer(s.conf.ScanInterval)
useChecksum = s.remote.Capabilities().Has(providers.CapAutoChecksum) || s.conf.ForceUseChecksum
)
for {
select {
case <-refresh.C:
// TODO: Execute rescan & sync
refresh.Reset(s.conf.ScanInterval)
case <-s.stop:
return nil
}
}
} }
func (s *Sync) Stop() { s.stop <- struct{}{} }