From c2687bc39f9c75a61de86c8fd546fe53e51f18c2 Mon Sep 17 00:00:00 2001 From: Knut Ahlers Date: Sat, 25 Feb 2023 15:00:48 +0100 Subject: [PATCH] Add additional storage provider redis Signed-off-by: Knut Ahlers --- README.md | 10 +++++ go.mod | 3 ++ go.sum | 11 +++++ main.go | 55 ++++++++++++----------- store.go | 128 ++++++++++++++++++++++++++++++++++++++++++++++-------- 5 files changed, 163 insertions(+), 44 deletions(-) diff --git a/README.md b/README.md index 63a32b8..a3d3296 100644 --- a/README.md +++ b/README.md @@ -11,3 +11,13 @@ One of my personal use-cases for this is to automatically parse payment receipts received from Twitch and enter the corresponding transactions into my accounting software. In the end this software provides you with a possibility to match any mail you receive by their headers and execute a script which is able to act on those mails. The script is provided with a JSON representation of the mail on `stdin` and can yield commands (for example "mark as read", "move to mailbox", ...) to `stdout` which then will be executed on the mail. + +## Storage types + +- **Local File** + - `--storage-type=file` + - `--storage-dsn=path/to/file.yaml` +- **Redis** + - `--storage-type=redis` + - `--storage-dsn=redis://:@:/` + - `REDIS_KEY_PREFIX=myprefix` (default: `io.luzifer.automail`) diff --git a/go.mod b/go.mod index f96e17e..0734d14 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.20 require ( github.com/Luzifer/rconfig/v2 v2.4.0 github.com/emersion/go-imap v1.2.1 + github.com/go-redis/redis/v8 v8.11.5 github.com/jhillyerd/enmime v0.10.1 github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.9.0 @@ -13,6 +14,8 @@ require ( require ( github.com/cention-sany/utf7 v0.0.0-20170124080048-26cad61bd60a // indirect + github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/emersion/go-sasl v0.0.0-20220912192320-0145f2c60ead // indirect github.com/gogs/chardet v0.0.0-20211120154057-b7413eaefb8f // indirect github.com/jaytaylor/html2text v0.0.0-20211105163654-bc68cce691ba // indirect diff --git a/go.sum b/go.sum index fdb9824..26e7427 100644 --- a/go.sum +++ b/go.sum @@ -2,9 +2,13 @@ github.com/Luzifer/rconfig/v2 v2.4.0 h1:MAdymTlExAZ8mx5VG8xOFAtFQSpWBipKYQHPOmYT github.com/Luzifer/rconfig/v2 v2.4.0/go.mod h1:hWF3ZVSusbYlg5bEvCwalEyUSY+0JPJWUiIu7rBmav8= github.com/cention-sany/utf7 v0.0.0-20170124080048-26cad61bd60a h1:MISbI8sU/PSK/ztvmWKFcI7UGb5/HQT7B+i3a2myKgI= github.com/cention-sany/utf7 v0.0.0-20170124080048-26cad61bd60a/go.mod h1:2GxOXOlEPAMFPfp014mK1SWq8G8BN8o7/dfYqJrVGn8= +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/emersion/go-imap v1.2.1 h1:+s9ZjMEjOB8NzZMVTM3cCenz2JrQIGGo5j1df19WjTA= github.com/emersion/go-imap v1.2.1/go.mod h1:Qlx1FSx2FTxjnjWpIlVNEuX+ylerZQNFE5NsmKFSejY= github.com/emersion/go-message v0.15.0/go.mod h1:wQUEfE+38+7EW8p8aZ96ptg6bAb1iwdgej19uXASlE4= @@ -12,6 +16,9 @@ github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21/go.mod h1:iL2twTe github.com/emersion/go-sasl v0.0.0-20220912192320-0145f2c60ead h1:fI1Jck0vUrXT8bnphprS1EoVRe2Q5CKCX8iDlpqjQ/Y= github.com/emersion/go-sasl v0.0.0-20220912192320-0145f2c60ead/go.mod h1:iL2twTeMvZnrg54ZoPDNfJaJaqy0xIQFuBdrLsmspwQ= github.com/emersion/go-textwrapper v0.0.0-20200911093747-65d896831594/go.mod h1:aqO8z8wPrjkscevZJFVE1wXJrLpC5LtJG7fqLOsPb2U= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/go-test/deep v1.0.7 h1:/VSMRlnY/JSyqxQUzQLKVMAskpY/NZKFA5j2P+0pP2M= github.com/go-test/deep v1.0.7/go.mod h1:QV8Hv/iy04NyLBxAdO9njL0iVPN1S4d/A3NVv1V36o8= github.com/gogs/chardet v0.0.0-20191104214054-4b6791f73a28/go.mod h1:Pcatq5tYkCW2Q6yrR2VRHlbHpZ/R4/7qyL1TCF7vl14= @@ -28,8 +35,11 @@ github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -63,6 +73,7 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/validator.v2 v2.0.1 h1:xF0KWyGWXm/LM2G1TrEjqOu4pa6coO9AlWSf3msVfDY= gopkg.in/validator.v2 v2.0.1/go.mod h1:lIUZBlB3Im4s/eYp39Ry/wkR02yOPhZ9IwIRBjuPuG8= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/main.go b/main.go index 274744e..d4f5515 100644 --- a/main.go +++ b/main.go @@ -25,7 +25,8 @@ var ( IMAPPass string `flag:"imap-pass,p" default:"" description:"Password to access the IMAP server" validate:"nonzero"` LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"` Mailbox string `flag:"mailbox,m" default:"INBOX" description:"Mailbox to fetch from"` - StorageFile string `flag:"storage-file" default:"store.yaml" description:"Where to store persistent info"` + StorageType string `flag:"storage-type" default:"file" description:"Driver to use for storing persistent info"` + StorageDSN string `flag:"storage-dsn" default:"store.yaml" description:"Where to store persistent info"` VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"` }{} @@ -35,7 +36,7 @@ var ( func init() { rconfig.AutoEnv(true) if err := rconfig.ParseAndValidate(&cfg); err != nil { - log.Fatalf("Unable to parse commandline options: %s", err) + log.WithError(err).Fatalf("parsing commandline options") } if cfg.VersionAndExit { @@ -44,7 +45,7 @@ func init() { } if l, err := log.ParseLevel(cfg.LogLevel); err != nil { - log.WithError(err).Fatal("Unable to parse log level") + log.WithError(err).Fatal("parsing log level") } else { log.SetLevel(l) } @@ -53,17 +54,21 @@ func init() { func main() { bodySection, err := imap.ParseBodySectionName("BODY[]") if err != nil { - log.WithError(err).Fatal("Unable to parse body section") + log.WithError(err).Fatal("parsing body section") } conf, err := loadConfig() if err != nil { - log.WithError(err).Fatal("Unable to load config") + log.WithError(err).Fatal("loading config") } - store, err := loadStorage() + store, err := newStorage(cfg.StorageType, cfg.StorageDSN) if err != nil { - log.WithError(err).Fatal("Unable to load storage file") + log.WithError(err).Fatal("creating storage interface") + } + + if err = store.Load(); err != nil { + log.WithError(err).Fatal("loading persistent storage data") } var ( @@ -86,17 +91,17 @@ func main() { imapClient, err = client.DialTLS(fmt.Sprintf("%s:%d", cfg.IMAPHost, cfg.IMAPPort), nil) if err != nil { - log.WithError(err).Fatal("Unable to connect to IMAP server") + log.WithError(err).Fatal("connecting to IMAP server") } if err = imapClient.Login(cfg.IMAPUser, cfg.IMAPPass); err != nil { - log.WithError(err).Fatal("Unable to login to IMAP server") + log.WithError(err).Fatal("loggin in to IMAP server") } log.Info("IMAP connected and logged in") if _, err = imapClient.Select(cfg.Mailbox, false); err != nil { - log.WithError(err).Fatal("Unable to select mailbox") + log.WithError(err).Fatal("selecting mailbox") } go func() { @@ -107,13 +112,13 @@ func main() { case <-ticker.C: if _, err := imapClient.Select(cfg.Mailbox, false); err != nil { - log.WithError(err).Error("Unable to select mailbox") + log.WithError(err).Error("selecting mailbox") continue } - seq, err := imap.ParseSeqSet(fmt.Sprintf("%d:*", store.LastUID+1)) + seq, err := imap.ParseSeqSet(fmt.Sprintf("%d:*", store.GetLastUID()+1)) if err != nil { - log.WithError(err).Error("Unable to parse sequence set") + log.WithError(err).Error("parsing sequence set") continue } @@ -121,7 +126,7 @@ func main() { Uid: seq, }) if err != nil { - log.WithError(err).Error("Unable to search for messages") + log.WithError(err).Error("searching for messages") continue } @@ -129,10 +134,10 @@ func main() { continue } - var tmpMsg = make(chan *imap.Message) + tmpMsg := make(chan *imap.Message) go func() { for msg := range tmpMsg { - if msg.Uid <= store.LastUID { + if msg.Uid <= store.GetLastUID() { continue } messages <- msg @@ -147,7 +152,7 @@ func main() { imap.FetchItem("BODY.PEEK[]"), imap.FetchUid, }, tmpMsg); err != nil { - log.WithError(err).Error("Unable to fetch messages") + log.WithError(err).Error("fetching messages") continue } @@ -163,7 +168,7 @@ func main() { mail, err := enmime.ReadEnvelope(body) if err != nil { - log.WithError(err).Error("Unable to parse message") + log.WithError(err).Error("parsing message") continue } @@ -175,19 +180,19 @@ func main() { // Check all handlers whether they want to handle the message for _, hdl := range conf.Handlers { if hdl.Handles(mail) { - go func(msg *imap.Message) { + go func(msg *imap.Message, hdl mailHandler) { if err := hdl.Process(imapClient, msg, mail); err != nil { - log.WithError(err).Error("Error while processing message") + log.WithError(err).Error("processing message") } - }(msg) + }(msg, hdl) } } // Mark message as processed in store - if msg.Uid > store.LastUID { - store.LastUID = msg.Uid - if err = store.saveStorage(); err != nil { - log.WithError(err).Error("Unable to save storage") + if msg.Uid > store.GetLastUID() { + store.SetUID(msg.Uid) + if err = store.Save(); err != nil { + log.WithError(err).Error("saving storage") } } diff --git a/store.go b/store.go index 188480b..3c5684a 100644 --- a/store.go +++ b/store.go @@ -1,43 +1,133 @@ package main import ( + "context" + "encoding/json" "os" "path" + "strings" + "github.com/go-redis/redis/v8" "github.com/pkg/errors" "gopkg.in/yaml.v2" ) -type storage struct { - LastUID uint32 -} +const redisKeyPrefix = "io.luzifer.automail" -func loadStorage() (*storage, error) { - var out = &storage{} - - if _, err := os.Stat(cfg.StorageFile); os.IsNotExist(err) { - return out, nil +type ( + fileStorage struct { + LastUID uint32 + filename string } - f, err := os.Open(cfg.StorageFile) + redisStorage struct { + LastUID uint32 + client *redis.Client + } + + storage interface { + GetLastUID() uint32 + Load() error + Save() error + SetUID(uint32) + } +) + +func newStorage(sType, dsn string) (storage, error) { + switch sType { + case "file": + return &fileStorage{filename: dsn}, nil + + case "redis": + return newRedisStorage(dsn) + + default: + return nil, errors.Errorf("invalid storage type %q", sType) + } +} + +// --- Storage implementation: File + +func (f fileStorage) GetLastUID() uint32 { return f.LastUID } + +func (f *fileStorage) Load() error { + if _, err := os.Stat(f.filename); os.IsNotExist(err) { + return nil + } + + sf, err := os.Open(f.filename) if err != nil { - return nil, errors.Wrap(err, "Failed to open storage file") + return errors.Wrap(err, "opening storage file") } - defer f.Close() + defer sf.Close() - return out, errors.Wrap(yaml.NewDecoder(f).Decode(out), "Unable to decode storage file") + return errors.Wrap(yaml.NewDecoder(sf).Decode(f), "decoding storage file") } -func (s storage) saveStorage() error { - if err := os.MkdirAll(path.Dir(cfg.StorageFile), 0700); err != nil { - return errors.Wrap(err, "Unable to ensure directory for storage file") +func (f fileStorage) Save() error { + if err := os.MkdirAll(path.Dir(f.filename), 0o700); err != nil { + return errors.Wrap(err, "ensuring directory for storage file") } - f, err := os.Create(cfg.StorageFile) + sf, err := os.Create(f.filename) if err != nil { - return errors.Wrap(err, "Unable to create storage file") + return errors.Wrap(err, "creating storage file") } - defer f.Close() + defer sf.Close() - return errors.Wrap(yaml.NewEncoder(f).Encode(s), "Unable to encode storage file") + return errors.Wrap(yaml.NewEncoder(sf).Encode(f), "encoding storage file") +} + +func (f *fileStorage) SetUID(uid uint32) { f.LastUID = uid } + +// --- Storage implementation: Redis + +func newRedisStorage(dsn string) (*redisStorage, error) { + opts, err := redis.ParseURL(dsn) + if err != nil { + return nil, errors.Wrap(err, "parsing storage DSN") + } + + out := &redisStorage{} + out.client = redis.NewClient(opts) + + return out, nil +} + +func (r redisStorage) GetLastUID() uint32 { return r.LastUID } + +func (r *redisStorage) Load() error { + data, err := r.client.Get(context.Background(), r.key()).Bytes() + if err != nil { + if errors.Is(err, redis.Nil) { + return nil + } + + return errors.Wrap(err, "loading persistent data from redis") + } + + return errors.Wrap(json.Unmarshal(data, r), "decoding storage object") +} + +func (r redisStorage) Save() error { + data, err := json.Marshal(r) + if err != nil { + return errors.Wrap(err, "marshalling storage object") + } + + return errors.Wrap( + r.client.Set(context.Background(), r.key(), data, 0).Err(), + "saving persistent data to redis", + ) +} + +func (r *redisStorage) SetUID(uid uint32) { r.LastUID = uid } + +func (redisStorage) key() string { + prefix := redisKeyPrefix + if v := os.Getenv("REDIS_KEY_PREFIX"); v != "" { + prefix = v + } + + return strings.Join([]string{prefix, "store"}, ":") }