diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..63bf876 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +config.yaml +dev_test diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..3e5ec3b --- /dev/null +++ b/Dockerfile @@ -0,0 +1,31 @@ +FROM golang:alpine as builder + +ENV GO111MODULE=on + +COPY . /go/src/github.com/Luzifer/cloudbox +WORKDIR /go/src/github.com/Luzifer/cloudbox + +RUN set -ex \ + && apk add --update \ + build-base \ + git \ + sqlite \ + && go install -ldflags "-X main.version=$(git describe --tags --always || echo dev)" \ + github.com/Luzifer/cloudbox/cmd/cloudbox + + +FROM alpine:latest + +LABEL maintainer "Knut Ahlers " + +RUN set -ex \ + && apk --no-cache add \ + ca-certificates \ + sqlite + +COPY --from=builder /go/bin/cloudbox /usr/local/bin/cloudbox + +ENTRYPOINT ["/usr/local/bin/cloudbox"] +CMD ["--"] + +# vim: set ft=Dockerfile: diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..2ce1cf5 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2019- Knut Ahlers + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/README.md b/README.md new file mode 100644 index 0000000..1ff9dfe --- /dev/null +++ b/README.md @@ -0,0 +1,8 @@ +[![Go Report Card](https://goreportcard.com/badge/github.com/Luzifer/cloudbox)](https://goreportcard.com/report/github.com/Luzifer/cloudbox) +![](https://badges.fyi/github/license/Luzifer/cloudbox) +![](https://badges.fyi/github/downloads/Luzifer/cloudbox) +![](https://badges.fyi/github/latest-release/Luzifer/cloudbox) + +# Luzifer / cloudbox + +cloudbox is a bidirectional sync provider for local dirs and a remote provider like a S3 bucket. Changes are detected through file attributes (size, modification time) or if supported by the remote provider through checksums. diff --git a/cmd/cloudbox/config.go b/cmd/cloudbox/config.go new file mode 100644 index 0000000..22f5325 --- /dev/null +++ b/cmd/cloudbox/config.go @@ -0,0 +1,103 @@ +package main + +import ( + "os" + "time" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "gopkg.in/yaml.v2" + + "github.com/Luzifer/cloudbox/sync" +) + +type shareConfig struct { + OverrideURI bool `yaml:"override_uri"` + URITemplate string `yaml:"uri_template"` +} + +type syncConfig struct { + LocalDir string `yaml:"local_dir"` + RemoteURI string `yaml:"remote_uri"` + Settings sync.Config `yaml:"settings"` +} + +type configFile struct { + ControlDir string `yaml:"control_dir"` + Sync syncConfig `yaml:"sync"` + Share shareConfig `yaml:"share"` +} + +func (c configFile) validate() error { + if c.Sync.LocalDir == "" { + return errors.New("Local directory not specified") + } + + if c.Sync.RemoteURI == "" { + return errors.New("Remote sync URI not specified") + } + + if c.Share.OverrideURI && c.Share.URITemplate == "" { + return errors.New("Share URI override enabled but no template specified") + } + + return nil +} + +func defaultConfig() *configFile { + return &configFile{ + ControlDir: "~/.cache/cloudbox", + Sync: syncConfig{ + Settings: sync.Config{ + ScanInterval: time.Minute, + }, + }, + } +} + +func execWriteSampleConfig() error { + conf := defaultConfig() + + if _, err := os.Stat(cfg.Config); err == nil { + if conf, err = loadConfig(true); err != nil { + return errors.Wrap(err, "Unable to load existing config") + } + } + + f, err := os.Create(cfg.Config) + if err != nil { + return errors.Wrap(err, "Unable to create config file") + } + defer f.Close() + + f.WriteString("---\n\n") + + if err := yaml.NewEncoder(f).Encode(conf); err != nil { + return errors.Wrap(err, "Unable to write config file") + } + + f.WriteString("\n...\n") + + log.WithField("dest", cfg.Config).Info("Config written") + return nil +} + +func loadConfig(noValidate bool) (*configFile, error) { + config := defaultConfig() + + f, err := os.Open(cfg.Config) + if err != nil { + return nil, errors.Wrap(err, "Unable to open config file") + } + defer f.Close() + + if err = yaml.NewDecoder(f).Decode(config); err != nil { + return nil, errors.Wrap(err, "Unable to decode config") + } + + if noValidate { + return config, nil + } + + return config, config.validate() +} diff --git a/cmd/cloudbox/help.go b/cmd/cloudbox/help.go new file mode 100644 index 0000000..241aaba --- /dev/null +++ b/cmd/cloudbox/help.go @@ -0,0 +1,22 @@ +package main + +import ( + "fmt" + + "github.com/Luzifer/rconfig" +) + +const helpText = ` +Available commands: + help Display this message + share Shares a file and returns its URL when supported + sync Executes the bi-directional sync + write-config Write a sample configuration to specified location +` + +func execHelp() error { + rconfig.Usage() + + fmt.Print(helpText) + return nil +} diff --git a/cmd/cloudbox/main.go b/cmd/cloudbox/main.go new file mode 100644 index 0000000..58b37be --- /dev/null +++ b/cmd/cloudbox/main.go @@ -0,0 +1,79 @@ +package main + +import ( + "fmt" + "os" + + "github.com/mitchellh/go-homedir" + log "github.com/sirupsen/logrus" + + "github.com/Luzifer/rconfig" +) + +type command string +type commandFunc func() error + +const ( + cmdHelp command = "help" + cmdShare command = "share" + cmdSync command = "sync" + cmdWriteConfig command = "write-config" +) + +var cmdFuncs = map[command]commandFunc{ + cmdShare: execShare, + cmdSync: execSync, + cmdWriteConfig: execWriteSampleConfig, +} + +var ( + cfg = struct { + Config string `flag:"config,c" default:"config.yaml" description:"Configuration file location"` + Force bool `flag:"force,f" default:"false" description:"Force operation"` + LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"` + VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"` + }{} + + version = "dev" +) + +func init() { + if err := rconfig.ParseAndValidate(&cfg); err != nil { + log.Fatalf("Unable to parse commandline options: %s", err) + } + + if cfg.VersionAndExit { + fmt.Printf("cloudbox %s\n", version) + os.Exit(0) + } + + if l, err := log.ParseLevel(cfg.LogLevel); err != nil { + log.WithError(err).Fatal("Unable to parse log level") + } else { + log.SetLevel(l) + } + + if dir, err := homedir.Expand(cfg.Config); err != nil { + log.WithError(err).Fatal("Unable to expand config path") + } else { + cfg.Config = dir + } +} + +func main() { + cmd := cmdHelp + if len(rconfig.Args()) > 1 { + cmd = command(rconfig.Args()[1]) + } + + var cmdFunc commandFunc = execHelp + if f, ok := cmdFuncs[cmd]; ok { + cmdFunc = f + } + + log.WithField("version", version).Info("cloudbox started") + + if err := cmdFunc(); err != nil { + log.WithError(err).Fatal("Command execution failed") + } +} diff --git a/cmd/cloudbox/providers.go b/cmd/cloudbox/providers.go new file mode 100644 index 0000000..e1d8d6d --- /dev/null +++ b/cmd/cloudbox/providers.go @@ -0,0 +1,38 @@ +package main + +import ( + "github.com/pkg/errors" + + "github.com/Luzifer/cloudbox/providers" + "github.com/Luzifer/cloudbox/providers/local" + "github.com/Luzifer/cloudbox/providers/s3" +) + +var providerInitFuncs = []providers.CloudProviderInitFunc{ + local.New, + s3.New, +} + +func providerFromURI(uri string) (providers.CloudProvider, error) { + if uri == "" { + return nil, errors.New("Empty provider URI") + } + + for _, f := range providerInitFuncs { + cp, err := f(uri) + switch err { + case nil: + if !cp.Capabilities().Has(providers.CapBasic) { + return nil, errors.Errorf("Provider %s does not support basic capabilities", cp.Name()) + } + + return cp, nil + case providers.ErrInvalidURI: + // Fine for now, try next one + default: + return nil, errors.Wrap(err, "Unable to initialize provider") + } + } + + return nil, errors.Errorf("No provider found for URI %q", uri) +} diff --git a/cmd/cloudbox/share.go b/cmd/cloudbox/share.go new file mode 100644 index 0000000..2f1694b --- /dev/null +++ b/cmd/cloudbox/share.go @@ -0,0 +1,56 @@ +package main + +import ( + "fmt" + "os" + "text/template" + + "github.com/pkg/errors" + + "github.com/Luzifer/cloudbox/providers" + "github.com/Luzifer/rconfig" +) + +func execShare() error { + conf, err := loadConfig(false) + if err != nil { + return errors.Wrap(err, "Unable to load config") + } + + remote, err := providerFromURI(conf.Sync.RemoteURI) + if err != nil { + return errors.Wrap(err, "Unable to initialize remote provider") + } + + if !remote.Capabilities().Has(providers.CapShare) { + return errors.New("Remote provider does not support sharing") + } + + if len(rconfig.Args()) < 3 { + return errors.New("No filename provided to share") + } + + relativeName := rconfig.Args()[2] + providerURL, err := remote.Share(relativeName) + if err != nil { + return errors.Wrap(err, "Unable to share file") + } + + if !conf.Share.OverrideURI { + fmt.Println(providerURL) + return nil + } + + tpl, err := template.New("share_uri").Parse(conf.Share.URITemplate) + if err != nil { + return errors.Wrap(err, "Unable to parse URI template") + } + + if err := tpl.Execute(os.Stdout, map[string]interface{}{ + "file": relativeName, + }); err != nil { + return errors.Wrap(err, "Unable to render share URI") + } + + return nil +} diff --git a/cmd/cloudbox/sync.go b/cmd/cloudbox/sync.go new file mode 100644 index 0000000..15c9684 --- /dev/null +++ b/cmd/cloudbox/sync.go @@ -0,0 +1,55 @@ +package main + +import ( + "database/sql" + "os" + "os/signal" + "path" + "syscall" + + _ "github.com/mattn/go-sqlite3" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + + "github.com/Luzifer/cloudbox/sync" +) + +func execSync() error { + conf, err := loadConfig(false) + if err != nil { + return errors.Wrap(err, "Unable to load config") + } + + local, err := providerFromURI("file://" + conf.Sync.LocalDir) + if err != nil { + return errors.Wrap(err, "Unable to initialize local provider") + } + + remote, err := providerFromURI(conf.Sync.RemoteURI) + if err != nil { + return errors.Wrap(err, "Unable to initialize remote provider") + } + + if err := os.MkdirAll(conf.ControlDir, 0700); err != nil { + return errors.Wrap(err, "Unable to create control dir") + } + + db, err := sql.Open("sqlite3", path.Join(conf.ControlDir, "sync.db")) + if err != nil { + return errors.Wrap(err, "Unable to establish database connection") + } + + s := sync.New(local, remote, db, conf.Sync.Settings, log.NewEntry(log.StandardLogger())) + + sigchan := make(chan os.Signal, 1) + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + for range sigchan { + s.Stop() + } + }() + + log.Info("Starting sync run...") + return errors.Wrap(s.Run(), "Unable to sync") +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..936b026 --- /dev/null +++ b/go.mod @@ -0,0 +1,15 @@ +module github.com/Luzifer/cloudbox + +go 1.12 + +require ( + github.com/Luzifer/rconfig v2.2.0+incompatible + github.com/aws/aws-sdk-go v1.20.12 + github.com/mattn/go-sqlite3 v1.10.0 + github.com/mitchellh/go-homedir v1.1.0 + github.com/pkg/errors v0.8.1 + github.com/sirupsen/logrus v1.4.2 + github.com/spf13/pflag v1.0.3 // indirect + gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19 // indirect + gopkg.in/yaml.v2 v2.2.2 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e72f10c --- /dev/null +++ b/go.sum @@ -0,0 +1,28 @@ +github.com/Luzifer/rconfig v2.2.0+incompatible h1:Kle3+rshPM7LxciOheaR4EfHUzibkDDGws04sefQ5m8= +github.com/Luzifer/rconfig v2.2.0+incompatible/go.mod h1:9pet6z2+mm/UAB0jF/rf0s62USfHNolzgR6Q4KpsJI0= +github.com/aws/aws-sdk-go v1.20.12 h1:xV7xfLSkiqd7JOnLlfER+Jz8kI98rAGJvtXssYkCRs4= +github.com/aws/aws-sdk-go v1.20.12/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/mattn/go-sqlite3 v1.10.0 h1:jbhqpg7tQe4SupckyijYiy0mJJ/pRyHvXf7JdWK860o= +github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19 h1:WB265cn5OpO+hK3pikC9hpP1zI/KTwmyMFKloW9eOVc= +gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19/go.mod h1:o4V0GXN9/CAmCsvJ0oXYZvrZOe7syiDZSN1GWGZTGzc= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/providers/file.go b/providers/file.go new file mode 100644 index 0000000..f992087 --- /dev/null +++ b/providers/file.go @@ -0,0 +1,54 @@ +package providers + +import ( + "hash" + "io" + "time" + + "github.com/pkg/errors" +) + +var ErrFileNotFound = errors.New("File not found") + +type File interface { + Info() FileInfo + Checksum(hash.Hash) (string, error) + Content() (io.ReadCloser, error) +} + +type FileInfo struct { + RelativeName string + LastModified time.Time + Checksum string // Expected to be present on CapAutoChecksum + Size uint64 +} + +func (f *FileInfo) Equal(other *FileInfo) bool { + if f == nil && other == nil { + // Both are not present: No change + return true + } + + if (f != nil && other == nil) || (f == nil && other != nil) { + // One is not present, the other is: Change + return false + } + + if (f.Checksum != "" || other.Checksum != "") && f.Checksum != other.Checksum { + // Checksum is present in one, doesn't match: Change + return false + } + + if f.Size != other.Size { + // No checksums present, size differs: Change + return false + } + + if !f.LastModified.Equal(other.LastModified) { + // LastModified date differs: Change + return false + } + + // No changes detected yet: No change + return true +} diff --git a/providers/interface.go b/providers/interface.go new file mode 100644 index 0000000..007ea35 --- /dev/null +++ b/providers/interface.go @@ -0,0 +1,35 @@ +package providers + +import ( + "hash" + + "github.com/pkg/errors" +) + +type Capability uint8 + +const ( + CapBasic Capability = 1 << iota + CapShare + CapAutoChecksum +) + +func (c Capability) Has(test Capability) bool { return c&test != 0 } + +var ( + ErrInvalidURI = errors.New("Spefified URI is invalid for this provider") + ErrFeatureNotSupported = errors.New("Feature not supported") +) + +type CloudProviderInitFunc func(string) (CloudProvider, error) + +type CloudProvider interface { + Capabilities() Capability + DeleteFile(relativeName string) error + GetChecksumMethod() hash.Hash + GetFile(relativeName string) (File, error) + ListFiles() ([]File, error) + Name() string + PutFile(File) (File, error) + Share(relativeName string) (string, error) +} diff --git a/providers/local/file.go b/providers/local/file.go new file mode 100644 index 0000000..d756fbd --- /dev/null +++ b/providers/local/file.go @@ -0,0 +1,46 @@ +package local + +import ( + "bytes" + "fmt" + "hash" + "io" + "os" + + "github.com/pkg/errors" + + "github.com/Luzifer/cloudbox/providers" +) + +type File struct { + info os.FileInfo + relativeName string + fullPath string +} + +func (f File) Info() providers.FileInfo { + return providers.FileInfo{ + RelativeName: f.relativeName, + LastModified: f.info.ModTime(), + Size: uint64(f.info.Size()), + } +} + +func (f File) Checksum(h hash.Hash) (string, error) { + fc, err := f.Content() + if err != nil { + return "", errors.Wrap(err, "Unable to get file contents") + } + + buf := new(bytes.Buffer) + if _, err := io.Copy(buf, fc); err != nil { + return "", errors.Wrap(err, "Unable to read file contents") + } + + return fmt.Sprintf("%x", h.Sum(buf.Bytes())), nil +} + +func (f File) Content() (io.ReadCloser, error) { + fp, err := os.Open(f.fullPath) + return fp, errors.Wrap(err, "Unable to open file") +} diff --git a/providers/local/provider.go b/providers/local/provider.go new file mode 100644 index 0000000..10a7748 --- /dev/null +++ b/providers/local/provider.go @@ -0,0 +1,120 @@ +package local + +import ( + "crypto/sha256" + "hash" + "io" + "os" + "path" + "path/filepath" + "strings" + "time" + + "github.com/pkg/errors" + + "github.com/Luzifer/cloudbox/providers" +) + +func New(uri string) (providers.CloudProvider, error) { + if !strings.HasPrefix(uri, "file://") { + return nil, providers.ErrInvalidURI + } + + return &Provider{directory: strings.TrimPrefix(uri, "file://")}, nil +} + +type Provider struct { + directory string +} + +func (p Provider) Capabilities() providers.Capability { return providers.CapBasic } +func (p Provider) Name() string { return "local" } +func (p Provider) GetChecksumMethod() hash.Hash { return sha256.New() } + +func (p Provider) ListFiles() ([]providers.File, error) { + var ( + absPath string + err error + files []providers.File + ) + + if absPath, err = filepath.Abs(p.directory); err != nil { + return nil, errors.Wrap(err, "Unable to calculate absolute path") + } + + err = filepath.Walk(absPath, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + if info.IsDir() { + // We behave like git: We don't care about dirs themselves + return nil + } + + files = append(files, File{ + info: info, + relativeName: strings.TrimLeft(strings.TrimPrefix(path, absPath), "/"), + fullPath: path, + }) + + return nil + }) + + return files, errors.Wrap(err, "File listing failed") +} + +func (p Provider) DeleteFile(relativeName string) error { + return os.Remove(path.Join(p.directory, relativeName)) +} + +func (p Provider) GetFile(relativeName string) (providers.File, error) { + fullPath := path.Join(p.directory, relativeName) + + stat, err := os.Stat(fullPath) + if err != nil { + if os.IsNotExist(err) { + return nil, providers.ErrFileNotFound + } + return nil, errors.Wrap(err, "Unable to get file stat") + } + + return File{ + info: stat, + relativeName: relativeName, + fullPath: fullPath, + }, nil +} + +func (p Provider) PutFile(f providers.File) (providers.File, error) { + fullPath := path.Join(p.directory, f.Info().RelativeName) + + fp, err := os.Create(fullPath) + if err != nil { + return nil, errors.Wrap(err, "Unable to create file") + } + + rfp, err := f.Content() + if err != nil { + return nil, errors.Wrap(err, "Unable to get remote file content") + } + defer rfp.Close() + + if _, err := io.Copy(fp, rfp); err != nil { + return nil, errors.Wrap(err, "Unable to copy file contents") + } + + if err := fp.Close(); err != nil { + return nil, errors.Wrap(err, "Unable to close local file") + } + + if err := os.Chtimes(fullPath, time.Now(), f.Info().LastModified); err != nil { + return nil, errors.Wrap(err, "Unable to set last file mod time") + } + + return p.GetFile(f.Info().RelativeName) +} + +func (p Provider) Share(relativeName string) (string, error) { + return "", providers.ErrFeatureNotSupported +} diff --git a/providers/s3/file.go b/providers/s3/file.go new file mode 100644 index 0000000..8f31d41 --- /dev/null +++ b/providers/s3/file.go @@ -0,0 +1,61 @@ +package s3 + +import ( + "bytes" + "fmt" + "hash" + "io" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/pkg/errors" + + "github.com/Luzifer/cloudbox/providers" +) + +type File struct { + key string + lastModified time.Time + checksum string + size uint64 + + s3Conn *s3.S3 + bucket string +} + +func (f File) Info() providers.FileInfo { + return providers.FileInfo{ + RelativeName: f.key, + LastModified: f.lastModified, + Checksum: f.checksum, + Size: f.size, + } +} + +func (f File) Checksum(h hash.Hash) (string, error) { + cont, err := f.Content() + if err != nil { + return "", errors.Wrap(err, "Unable to get file content") + } + defer cont.Close() + + buf := new(bytes.Buffer) + if _, err := io.Copy(buf, cont); err != nil { + return "", errors.Wrap(err, "Unable to read file content") + } + + return fmt.Sprintf("%x", h.Sum(buf.Bytes())), nil +} + +func (f File) Content() (io.ReadCloser, error) { + resp, err := f.s3Conn.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(f.bucket), + Key: aws.String(f.key), + }) + if err != nil { + return nil, errors.Wrap(err, "Unable to get file") + } + + return resp.Body, nil +} diff --git a/providers/s3/provider.go b/providers/s3/provider.go new file mode 100644 index 0000000..4fc307e --- /dev/null +++ b/providers/s3/provider.go @@ -0,0 +1,177 @@ +package s3 + +import ( + "bytes" + "context" + "crypto/md5" + "fmt" + "hash" + "io" + "net/url" + "strings" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/pkg/errors" + + "github.com/Luzifer/cloudbox/providers" +) + +type Provider struct { + bucket string + bucketRegion string + s3 *s3.S3 +} + +func New(uri string) (providers.CloudProvider, error) { + if !strings.HasPrefix(uri, "s3://") { + return nil, providers.ErrInvalidURI + } + + u, err := url.Parse(uri) + if err != nil { + return nil, errors.Wrap(err, "Invalid URI specified") + } + + region, err := s3manager.GetBucketRegion(context.Background(), session.New(), u.Host, "us-east-1") + if err != nil { + return nil, errors.Wrap(err, "Unable to find bucket region") + } + + cfg := aws.NewConfig().WithRegion(region) + if u.User != nil { + user := u.User.Username() + pass, _ := u.User.Password() + cfg = cfg.WithCredentials(credentials.NewStaticCredentials(user, pass, "")) + } + + sess := session.Must(session.NewSession(cfg)) + svc := s3.New(sess) + + return &Provider{ + bucket: u.Host, + bucketRegion: region, + s3: svc, + }, nil +} + +func (p *Provider) Capabilities() providers.Capability { + return providers.CapBasic | providers.CapAutoChecksum | providers.CapShare +} +func (p *Provider) Name() string { return "s3" } +func (p *Provider) GetChecksumMethod() hash.Hash { return md5.New() } + +func (p *Provider) DeleteFile(relativeName string) error { + _, err := p.s3.DeleteObject(&s3.DeleteObjectInput{ + Bucket: aws.String(p.bucket), + Key: aws.String(relativeName), + }) + + return errors.Wrap(err, "Unable to delete object") +} + +func (p *Provider) GetFile(relativeName string) (providers.File, error) { + resp, err := p.s3.HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String(p.bucket), + Key: aws.String(relativeName), + }) + if err != nil { + return nil, errors.Wrap(err, "Unable to fetch head information") + } + + return File{ + key: relativeName, + lastModified: *resp.LastModified, + checksum: strings.Trim(*resp.ETag, `"`), + size: uint64(*resp.ContentLength), + + s3Conn: p.s3, + bucket: p.bucket, + }, nil +} + +func (p *Provider) ListFiles() ([]providers.File, error) { + var files []providers.File + + err := p.s3.ListObjectsPages(&s3.ListObjectsInput{ + Bucket: aws.String(p.bucket), + }, func(out *s3.ListObjectsOutput, lastPage bool) bool { + for _, obj := range out.Contents { + files = append(files, File{ + key: *obj.Key, + lastModified: *obj.LastModified, + checksum: strings.Trim(*obj.ETag, `"`), + size: uint64(*obj.Size), + + s3Conn: p.s3, + bucket: p.bucket, + }) + } + + return !lastPage + }) + + return files, errors.Wrap(err, "Unable to list objects") +} + +func (p *Provider) PutFile(f providers.File) (providers.File, error) { + body, err := f.Content() + if err != nil { + return nil, errors.Wrap(err, "Unable to get file reader") + } + defer body.Close() + + buf := new(bytes.Buffer) + if _, err := io.Copy(buf, body); err != nil { + return nil, errors.Wrap(err, "Unable to read source file") + } + + if _, err = p.s3.PutObject(&s3.PutObjectInput{ + ACL: aws.String(p.getFileACL(f.Info().RelativeName)), + Body: bytes.NewReader(buf.Bytes()), + Bucket: aws.String(p.bucket), + Key: aws.String(f.Info().RelativeName), + }); err != nil { + return nil, errors.Wrap(err, "Unable to write file") + } + + return p.GetFile(f.Info().RelativeName) +} + +func (p *Provider) Share(relativeName string) (string, error) { + _, err := p.s3.PutObjectAcl(&s3.PutObjectAclInput{ + ACL: aws.String(s3.ObjectCannedACLPublicRead), + Bucket: aws.String(p.bucket), + Key: aws.String(relativeName), + }) + if err != nil { + return "", errors.Wrap(err, "Unable to publish file") + } + + return fmt.Sprintf("https://s3-%s.amazonaws.com/%s/%s", p.bucketRegion, p.bucket, relativeName), nil +} + +func (p *Provider) getFileACL(relativeName string) string { + objACL, err := p.s3.GetObjectAcl(&s3.GetObjectAclInput{ + Bucket: aws.String(p.bucket), + Key: aws.String(relativeName), + }) + + if err != nil { + return s3.ObjectCannedACLPrivate + } + + for _, g := range objACL.Grants { + if g.Grantee == nil || g.Grantee.URI == nil { + continue + } + if *g.Grantee.URI == "http://acs.amazonaws.com/groups/global/AllUsers" && *g.Permission == "READ" { + return s3.ObjectCannedACLPublicRead + } + } + + return s3.ObjectCannedACLPrivate +} diff --git a/sync/change.go b/sync/change.go new file mode 100644 index 0000000..880dac8 --- /dev/null +++ b/sync/change.go @@ -0,0 +1,69 @@ +package sync + +import "strings" + +type Change uint8 + +const ( + ChangeLocalAdd Change = 1 << iota + ChangeLocalDelete + ChangeLocalUpdate + ChangeRemoteAdd + ChangeRemoteDelete + ChangeRemoteUpdate +) + +var changeNameMap = map[Change]string{ + ChangeLocalAdd: "local-add", + ChangeLocalDelete: "local-delete", + ChangeLocalUpdate: "local-update", + ChangeRemoteAdd: "remote-add", + ChangeRemoteDelete: "remote-delete", + ChangeRemoteUpdate: "remote-update", +} + +func (c Change) Changed() bool { + return c != 0 +} + +func (c *Change) Register(add Change) { + *c |= add +} + +func (c Change) HasAll(test ...Change) bool { + for _, t := range test { + if c&t == 0 { + return false + } + } + + return true +} + +func (c Change) HasOne(test ...Change) bool { + for _, t := range test { + if c&t != 0 { + return true + } + } + return false +} + +func (c Change) Is(test Change) bool { + return c == test +} + +func (c Change) String() string { + if !c.Changed() { + return "none" + } + + names := []string{} + for k, v := range changeNameMap { + if c.HasOne(k) { + names = append(names, v) + } + } + + return strings.Join(names, ", ") +} diff --git a/sync/db.go b/sync/db.go new file mode 100644 index 0000000..e6dc6e7 --- /dev/null +++ b/sync/db.go @@ -0,0 +1,77 @@ +package sync + +import ( + "fmt" + + "github.com/pkg/errors" + + "github.com/Luzifer/cloudbox/providers" +) + +const schema = ` +CREATE TABLE IF NOT EXISTS local_state ( + relative_name TEXT PRIMARY KEY, + last_modified DATETIME, + checksum TEXT, + size INT +); +CREATE TABLE IF NOT EXISTS remote_state ( + relative_name TEXT PRIMARY KEY, + last_modified DATETIME, + checksum TEXT, + size INT +); +` + +func (s *Sync) initSchema() error { + _, err := s.db.Exec(schema) + return err +} + +func (s *Sync) deleteDBFileInfo(side, relativeName string) error { + // #nosec G201 - fmt is only used to prefix a table with a constant, no user input + stmt, err := s.db.Prepare(fmt.Sprintf(`DELETE FROM %s_state WHERE relative_name = ?`, side)) + if err != nil { + return errors.Wrap(err, "Unable to prepare query") + } + + _, err = stmt.Exec(relativeName) + return errors.Wrap(err, "Unable to delete file info") +} + +func (s *Sync) setDBFileInfo(side string, info providers.FileInfo) error { + // #nosec G201 - fmt is only used to prefix a table with a constant, no user input + stmt, err := s.db.Prepare(fmt.Sprintf( + `INSERT INTO %s_state VALUES(?, ?, ?, ?) + ON CONFLICT(relative_name) DO UPDATE SET + last_modified=excluded.last_modified, + checksum=excluded.checksum, + size=excluded.size`, side)) + if err != nil { + return errors.Wrap(err, "Unable to prepare query") + } + + _, err = stmt.Exec(info.RelativeName, info.LastModified, info.Checksum, info.Size) + return errors.Wrap(err, "Unable to upsert file info") +} + +func (s *Sync) updateStateFromDatabase(st *state) error { + for _, table := range []string{sideLocal, sideRemote} { + // #nosec G201 - fmt is only used to prefix a table with a constant, no user input + rows, err := s.db.Query(fmt.Sprintf("SELECT * FROM %s_state", table)) + if err != nil { + return errors.Wrapf(err, "Unable to query table %s", table) + } + defer rows.Close() + + for rows.Next() { + info := providers.FileInfo{} + if err = rows.Scan(&info.RelativeName, &info.LastModified, &info.Checksum, &info.Size); err != nil { + return errors.Wrap(err, "Unable to read response") + } + st.Set(table, sourceDB, info) + } + } + + return nil +} diff --git a/sync/execute.go b/sync/execute.go new file mode 100644 index 0000000..b0fa917 --- /dev/null +++ b/sync/execute.go @@ -0,0 +1,106 @@ +package sync + +import ( + "crypto/sha256" + + "github.com/pkg/errors" + + "github.com/Luzifer/cloudbox/providers" +) + +func (s *Sync) addBothCreated(fileName string) error { + // Use forced sha256 to ensure lesser chance for collision + var hashMethod = sha256.New() + + local, err := s.local.GetFile(fileName) + if err != nil { + return errors.Wrap(err, "Unable to retrieve file from local") + } + + remote, err := s.remote.GetFile(fileName) + if err != nil { + return errors.Wrap(err, "Unable to retrieve file from remote") + } + + localSum, err := local.Checksum(hashMethod) + if err != nil { + return errors.Wrap(err, "Unable to get checksum from local file") + } + + remoteSum, err := remote.Checksum(hashMethod) + if err != nil { + return errors.Wrap(err, "Unable to get checksum from remote file") + } + + if localSum != remoteSum { + return errors.New("Checksums differ") + } + + localInfo, err := s.getFileInfo(local) + if err != nil { + return errors.Wrap(err, "Unable to get file info for local file") + } + + if err := s.setDBFileInfo(sideLocal, localInfo); err != nil { + return errors.Wrap(err, "Unable to update DB info for local file") + } + + remoteInfo, err := s.getFileInfo(remote) + if err != nil { + return errors.Wrap(err, "Unable to get file info for remote file") + } + + if err := s.setDBFileInfo(sideRemote, remoteInfo); err != nil { + return errors.Wrap(err, "Unable to update DB info for remote file") + } + + return nil +} + +func (s *Sync) deleteFile(on providers.CloudProvider, fileName string) error { + if err := on.DeleteFile(fileName); err != nil { + return errors.Wrap(err, "Unable to delete file") + } + + if err := s.deleteDBFileInfo(sideLocal, fileName); err != nil { + return errors.Wrap(err, "Umable to delete local file info") + } + + if err := s.deleteDBFileInfo(sideRemote, fileName); err != nil { + return errors.Wrap(err, "Umable to delete remote file info") + } + + return nil +} + +func (s *Sync) transferFile(from, to providers.CloudProvider, sideFrom, sideTo, fileName string) error { + file, err := from.GetFile(fileName) + if err != nil { + return errors.Wrap(err, "Unable to retrieve file") + } + + newFile, err := to.PutFile(file) + if err != nil { + return errors.Wrap(err, "Unable to put file") + } + + newFileInfo, err := s.getFileInfo(newFile) + if err != nil { + return errors.Wrap(err, "Unable to get file info for target file") + } + + if err := s.setDBFileInfo(sideTo, newFileInfo); err != nil { + return errors.Wrap(err, "Unable to update DB info for target file") + } + + fileInfo, err := s.getFileInfo(file) + if err != nil { + return errors.Wrap(err, "Unable to get file info for source file") + } + + if err := s.setDBFileInfo(sideFrom, fileInfo); err != nil { + return errors.Wrap(err, "Unable to update DB info for source file") + } + + return nil +} diff --git a/sync/logic.go b/sync/logic.go new file mode 100644 index 0000000..22b2ff1 --- /dev/null +++ b/sync/logic.go @@ -0,0 +1,74 @@ +package sync + +import log "github.com/sirupsen/logrus" + +func (s *Sync) decideAction(syncState *state, fileName string) error { + var ( + change = syncState.GetChangeFor(fileName) + logger = log.WithField("filename", fileName) + ) + + switch { + case !change.Changed(): + // No changes at all: Get out of here + logger.Debug("File in sync") + return nil + + case change.HasAll(ChangeLocalUpdate, ChangeRemoteUpdate): + // We do have local and remote changes: Check both are now the same or leave this to manual resolve + logger.Warn("File has local and remote updates, sync not possible") + + case change.HasAll(ChangeLocalAdd, ChangeRemoteAdd): + // Special case: Both are added, check they are the same file or leave this to manual resolve + logger.Debug("File added locally as well as remotely") + + if err := s.addBothCreated(fileName); err != nil { + logger.WithError(err).Error("Unable to add locally as well as remotely added file") + } + + case change.HasAll(ChangeLocalDelete, ChangeRemoteDelete): + // Special case: Both vanished, we just need to clean up the sync cache + logger.Debug("File deleted locally as well as remotely") + + if err := s.deleteDBFileInfo(sideLocal, fileName); err != nil { + logger.WithError(err).Error("Unable to delete local file info") + return nil + } + + if err := s.deleteDBFileInfo(sideRemote, fileName); err != nil { + logger.WithError(err).Error("Unable to delete remote file info") + return nil + } + + case change.Is(ChangeLocalAdd) || change.Is(ChangeLocalUpdate): + logger.Debug("File added or changed locally, uploading...") + if err := s.transferFile(s.local, s.remote, sideLocal, sideRemote, fileName); err != nil { + logger.WithError(err).Error("Unable to upload file") + } + + case change.Is(ChangeLocalDelete): + logger.Debug("File deleted locally, removing from remote...") + if err := s.deleteFile(s.remote, fileName); err != nil { + logger.WithError(err).Error("Unable to delete file from remote") + } + + case change.Is(ChangeRemoteAdd) || change.Is(ChangeRemoteUpdate): + logger.Debug("File added or changed remotely, downloading...") + if err := s.transferFile(s.remote, s.local, sideRemote, sideLocal, fileName); err != nil { + logger.WithError(err).Error("Unable to download file") + } + + case change.Is(ChangeRemoteDelete): + logger.Debug("File deleted remotely, removing from local...") + if err := s.deleteFile(s.local, fileName); err != nil { + logger.WithError(err).Error("Unable to delete file from local") + } + + default: + // Unhandled case (i.e. human screwed around in sync process) + // Stuff like: LocalUpdate + RemoteDelete, ... + logger.WithField("change", change.String()).Warn("Unhandled change case, sync not possible") + } + + return nil +} diff --git a/sync/state.go b/sync/state.go new file mode 100644 index 0000000..d7522db --- /dev/null +++ b/sync/state.go @@ -0,0 +1,114 @@ +package sync + +import ( + "sort" + "strings" + "sync" + + "github.com/Luzifer/cloudbox/providers" +) + +const ( + sideLocal string = "local" + sideRemote string = "remote" + sourceDB string = "db" + sourceScan string = "scan" +) + +type stateDetail struct { + LocalDB, + LocalScan, + RemoteDB, + RemoteScan *providers.FileInfo +} + +type state struct { + files map[string]*stateDetail + lock sync.Mutex +} + +func newState() *state { + return &state{ + files: make(map[string]*stateDetail), + } +} + +func (s *state) GetChangeFor(relativeName string) (result Change) { + s.lock.Lock() + defer s.lock.Unlock() + + d := s.files[relativeName] + + // No changes detected + if d.LocalDB.Equal(d.LocalScan) && d.RemoteDB.Equal(d.RemoteScan) { + // Check special case: Something went really wrong and sync state is FUBAR + if d.LocalDB == nil && d.RemoteDB != nil { + result.Register(ChangeRemoteAdd) + } + if d.LocalDB != nil && d.RemoteDB == nil { + result.Register(ChangeLocalAdd) + } + + return result + } + + // Check for local changes + switch { + case d.LocalDB == nil && d.LocalScan != nil: + result.Register(ChangeLocalAdd) + + case d.LocalDB != nil && d.LocalScan == nil: + result.Register(ChangeLocalDelete) + + case !d.LocalDB.Equal(d.LocalScan): + result.Register(ChangeLocalUpdate) + } + + // Check for remote changes + switch { + case d.RemoteDB == nil && d.RemoteScan != nil: + result.Register(ChangeRemoteAdd) + + case d.RemoteDB != nil && d.RemoteScan == nil: + result.Register(ChangeRemoteDelete) + + case !d.RemoteDB.Equal(d.RemoteScan): + result.Register(ChangeRemoteUpdate) + } + + return result +} + +func (s *state) GetRelativeNames() []string { + s.lock.Lock() + defer s.lock.Unlock() + + out := []string{} + for k := range s.files { + out = append(out, k) + } + + sort.Strings(out) + + return out +} + +func (s *state) Set(side, source string, info providers.FileInfo) { + s.lock.Lock() + defer s.lock.Unlock() + + if _, ok := s.files[info.RelativeName]; !ok { + s.files[info.RelativeName] = &stateDetail{} + } + + switch strings.Join([]string{side, source}, "::") { + case "local::db": + s.files[info.RelativeName].LocalDB = &info + case "local::scan": + s.files[info.RelativeName].LocalScan = &info + case "remote::db": + s.files[info.RelativeName].RemoteDB = &info + case "remote::scan": + s.files[info.RelativeName].RemoteScan = &info + } +} diff --git a/sync/sync.go b/sync/sync.go new file mode 100644 index 0000000..9f5abd6 --- /dev/null +++ b/sync/sync.go @@ -0,0 +1,126 @@ +package sync + +import ( + "database/sql" + "hash" + "time" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + + "github.com/Luzifer/cloudbox/providers" +) + +type Config struct { + ForceUseChecksum bool `yaml:"force_use_checksum"` + ScanInterval time.Duration `yaml:"scan_interval"` +} + +type Sync struct { + db *sql.DB + conf Config + local, remote providers.CloudProvider + + log *log.Entry + + useChecksum bool + hashMethod hash.Hash + + stop chan struct{} +} + +func New(local, remote providers.CloudProvider, db *sql.DB, conf Config, logger *log.Entry) *Sync { + return &Sync{ + db: db, + conf: conf, + local: local, + remote: remote, + + log: logger, + + stop: make(chan struct{}), + } +} + +func (s *Sync) Run() error { + if err := s.initSchema(); err != nil { + return errors.Wrap(err, "Unable to initialize database schema") + } + + var refresh = time.NewTimer(s.conf.ScanInterval) + + for { + select { + case <-refresh.C: + if err := s.runSync(); err != nil { + return errors.Wrap(err, "Sync failed") + } + refresh.Reset(s.conf.ScanInterval) + + case <-s.stop: + return nil + } + } +} + +func (s *Sync) Stop() { s.stop <- struct{}{} } + +func (s *Sync) getFileInfo(f providers.File) (providers.FileInfo, error) { + var info = f.Info() + + if !s.useChecksum || info.Checksum != "" { + return info, nil + } + + cs, err := f.Checksum(s.hashMethod) + if err != nil { + return info, errors.Wrap(err, "Unable to fetch checksum") + } + info.Checksum = cs + + return info, nil +} + +func (s *Sync) fillStateFromProvider(syncState *state, provider providers.CloudProvider, side string) error { + files, err := provider.ListFiles() + if err != nil { + return errors.Wrap(err, "Unable to list files") + } + + for _, f := range files { + info, err := s.getFileInfo(f) + if err != nil { + return errors.Wrap(err, "Unable to get file info") + } + + syncState.Set(side, sourceScan, info) + } + + return nil +} + +func (s *Sync) runSync() error { + var syncState = newState() + s.hashMethod = s.remote.GetChecksumMethod() + s.useChecksum = s.remote.Capabilities().Has(providers.CapAutoChecksum) || s.conf.ForceUseChecksum + + if err := s.updateStateFromDatabase(syncState); err != nil { + return errors.Wrap(err, "Unable to load database state") + } + + if err := s.fillStateFromProvider(syncState, s.local, sideLocal); err != nil { + return errors.Wrap(err, "Unable to load local files") + } + + if err := s.fillStateFromProvider(syncState, s.remote, sideRemote); err != nil { + return errors.Wrap(err, "Unable to load remote files") + } + + for _, fileName := range syncState.GetRelativeNames() { + if err := s.decideAction(syncState, fileName); err != nil { + return errors.Wrap(err, "Could not execute sync") + } + } + + return nil +}