1
0
Fork 0
mirror of https://github.com/Luzifer/cloudbox.git synced 2024-11-09 14:40:08 +00:00
cloudbox/sync/sync.go

127 lines
2.6 KiB
Go
Raw Normal View History

2019-06-16 00:05:06 +00:00
package sync
import (
"database/sql"
2019-06-16 17:57:02 +00:00
"hash"
2019-06-16 16:53:30 +00:00
"time"
"github.com/pkg/errors"
2019-06-16 17:57:02 +00:00
log "github.com/sirupsen/logrus"
2019-06-16 00:05:06 +00:00
"github.com/Luzifer/cloudbox/providers"
)
type Config struct {
2019-06-16 16:53:30 +00:00
ForceUseChecksum bool `yaml:"force_use_checksum"`
ScanInterval time.Duration `yaml:"scan_interval"`
}
2019-06-16 00:05:06 +00:00
type Sync struct {
db *sql.DB
conf Config
2019-06-16 00:05:06 +00:00
local, remote providers.CloudProvider
2019-06-16 16:53:30 +00:00
2019-06-16 17:57:02 +00:00
log *log.Entry
useChecksum bool
hashMethod hash.Hash
2019-06-16 16:53:30 +00:00
stop chan struct{}
2019-06-16 00:05:06 +00:00
}
func New(local, remote providers.CloudProvider, db *sql.DB, conf Config, logger *log.Entry) *Sync {
2019-06-16 00:05:06 +00:00
return &Sync{
db: db,
2019-06-16 16:53:30 +00:00
conf: conf,
2019-06-16 00:05:06 +00:00
local: local,
remote: remote,
2019-06-16 16:53:30 +00:00
2019-06-16 17:57:02 +00:00
log: logger,
2019-06-16 16:53:30 +00:00
stop: make(chan struct{}),
2019-06-16 00:05:06 +00:00
}
}
func (s *Sync) Run() error {
2019-06-16 16:53:30 +00:00
if err := s.initSchema(); err != nil {
return errors.Wrap(err, "Unable to initialize database schema")
2019-06-16 00:05:06 +00:00
}
2019-06-16 17:57:02 +00:00
var refresh = time.NewTimer(s.conf.ScanInterval)
2019-06-16 16:53:30 +00:00
for {
select {
case <-refresh.C:
2019-06-16 17:57:02 +00:00
if err := s.runSync(); err != nil {
return errors.Wrap(err, "Sync failed")
}
2019-06-16 16:53:30 +00:00
refresh.Reset(s.conf.ScanInterval)
case <-s.stop:
return nil
}
}
2019-06-16 00:05:06 +00:00
}
2019-06-16 16:53:30 +00:00
func (s *Sync) Stop() { s.stop <- struct{}{} }
2019-06-16 17:57:02 +00:00
func (s *Sync) getFileInfo(f providers.File) (providers.FileInfo, error) {
var info = f.Info()
if !s.useChecksum || info.Checksum != "" {
return info, nil
}
cs, err := f.Checksum(s.hashMethod)
if err != nil {
return info, errors.Wrap(err, "Unable to fetch checksum")
}
info.Checksum = cs
return info, nil
}
func (s *Sync) fillStateFromProvider(syncState *state, provider providers.CloudProvider, side string) error {
2019-06-16 17:57:02 +00:00
files, err := provider.ListFiles()
if err != nil {
return errors.Wrap(err, "Unable to list files")
}
for _, f := range files {
info, err := s.getFileInfo(f)
if err != nil {
return errors.Wrap(err, "Unable to get file info")
2019-06-16 17:57:02 +00:00
}
syncState.Set(side, sourceScan, info)
}
return nil
}
func (s *Sync) runSync() error {
var syncState = newState()
s.hashMethod = s.remote.GetChecksumMethod()
s.useChecksum = s.remote.Capabilities().Has(providers.CapAutoChecksum) || s.conf.ForceUseChecksum
2019-06-16 17:57:02 +00:00
if err := s.updateStateFromDatabase(syncState); err != nil {
return errors.Wrap(err, "Unable to load database state")
}
if err := s.fillStateFromProvider(syncState, s.local, sideLocal); err != nil {
2019-06-16 17:57:02 +00:00
return errors.Wrap(err, "Unable to load local files")
}
if err := s.fillStateFromProvider(syncState, s.remote, sideRemote); err != nil {
2019-06-16 17:57:02 +00:00
return errors.Wrap(err, "Unable to load remote files")
}
2019-06-16 20:42:42 +00:00
for _, fileName := range syncState.GetRelativeNames() {
if err := s.decideAction(syncState, fileName); err != nil {
return errors.Wrap(err, "Could not execute sync")
}
}
2019-06-16 17:57:02 +00:00
return nil
}