From 7ff74d6e1ad586b9b326825d6af6ad4e69871784 Mon Sep 17 00:00:00 2001 From: Knut Ahlers Date: Mon, 1 Apr 2024 13:02:07 +0200 Subject: [PATCH] Do some refactoring, add limiting of uploaders Signed-off-by: Knut Ahlers --- README.md | 6 +- main.go | 117 +++++++++++++++++++--------- pkg/config/config.go | 4 +- pkg/uploader/interface.go | 15 +++- pkg/uploader/sftp/sftp.go | 41 ++++++---- pkg/uploader/youtube/youtube.go | 17 ++-- pkg/vault-oauth2/vault.go | 2 + pkg/vault-oauth2/youtube/youtube.go | 10 ++- uploaders.go | 4 +- 9 files changed, 147 insertions(+), 69 deletions(-) diff --git a/README.md b/README.md index 24e3f8a..38f86a7 100644 --- a/README.md +++ b/README.md @@ -11,14 +11,16 @@ Or you create a config for this tool and start one command which then concurrent uploaders: - - name: Example SFTP + myhost: + name: Example SFTP type: sftp settings: host: hostname:port path: /where/to/put/the/file user: someoneimportant - - name: Example Youtube + mychannel: + name: Example Youtube type: youtube settings: channel: the ID of your channel found in its link diff --git a/main.go b/main.go index 60e788a..ee7199d 100644 --- a/main.go +++ b/main.go @@ -13,15 +13,17 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/Luzifer/go_helpers/v2/str" "github.com/Luzifer/rconfig/v2" ) var ( cfg = struct { - Config string `flag:"config,c" default:"config.yaml" description:"Configuration for the uploaders"` - LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"` - ValidateOnly bool `flag:"validate-only" default:"false" description:"Only execute validation and prepare functions, do not upload"` - VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"` + Config string `flag:"config,c" default:"config.yaml" description:"Configuration for the uploaders"` + Limit []string `flag:"limit,l" description:"Limit to certain uploader IDs"` + LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"` + ValidateOnly bool `flag:"validate-only" default:"false" description:"Only execute validation and prepare functions, do not upload"` + VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"` }{} version = "dev" @@ -33,6 +35,10 @@ func initApp() error { return errors.Wrap(err, "parsing cli options") } + if len(cfg.Limit) == 1 && cfg.Limit[0] == "" { + cfg.Limit = nil + } + l, err := logrus.ParseLevel(cfg.LogLevel) if err != nil { return errors.Wrap(err, "parsing log-level") @@ -60,7 +66,7 @@ func main() { var ( ctx = context.Background() - wg sync.WaitGroup + wg = new(sync.WaitGroup) ) var ( @@ -68,26 +74,8 @@ func main() { longestPrefix int ) - for _, c := range configFile.Uploaders { - u := uploaderByName(c.Type) - if u == nil { - logrus.Fatalf("unknown uploader %q", c.Type) - } - - if err = u.ValidateConfig(c.Settings); err != nil { - logrus.WithError(err).Fatalf("validating config entry %q", c.Name) - } - - if err = u.Prepare(ctx, uploader.UploaderOpts{ - Name: c.Name, - Config: c.Settings, - }); err != nil { - logrus.WithError(err).Fatalf("preparing uploader %q", c.Name) - } - - if l := len(c.Name); l > longestPrefix { - longestPrefix = l - } + if longestPrefix, err = preflight(ctx, configFile); err != nil { + logrus.WithError(err).Fatal("executing preflight checks") } if cfg.ValidateOnly { @@ -98,19 +86,79 @@ func main() { logrus.Fatal("Usage: publish-vod ") } - fileName := rconfig.Args()[1] - f, err := os.Open(fileName) + if err = startUploads(ctx, rconfig.Args()[1], configFile, wg, barPool, longestPrefix); err != nil { + logrus.WithError(err).Fatal("starting uploads") + } + + if err = barPool.Start(); err != nil { + logrus.WithError(err).Fatal("starting progressbars") + } + + wg.Wait() + + if err = barPool.Stop(); err != nil { + logrus.WithError(err).Fatal("stopping progressbars") + } +} + +func preflight(ctx context.Context, configFile config.File) (longestPrefix int, err error) { + for id, c := range configFile.Uploaders { + if cfg.Limit != nil && !str.StringInSlice(id, cfg.Limit) { + continue + } + + logrus.WithField("id", id).Info("validating config...") + + u := uploaderByName(c.Type) + if u == nil { + return longestPrefix, fmt.Errorf("unknown uploader %q", c.Type) + } + + if err = u.ValidateConfig(c.Settings); err != nil { + return longestPrefix, fmt.Errorf("validating config entry %q: %w", c.Name, err) + } + + if err = u.Prepare(ctx, uploader.Opts{ + Name: c.Name, + Config: c.Settings, + }); err != nil { + return longestPrefix, fmt.Errorf("preparing uploader %q: %w", c.Name, err) + } + + if l := len(c.Name); l > longestPrefix { + longestPrefix = l + } + } + + return longestPrefix, nil +} + +func startUploads( + ctx context.Context, + fileName string, + configFile config.File, + wg *sync.WaitGroup, + barPool *pb.Pool, + longestPrefix int, +) (err error) { + f, err := os.Open(fileName) //#nosec G304 // Intended to open and upload arbitrary file if err != nil { - logrus.WithError(err).Fatal("opening VoD") + return fmt.Errorf("opening VoD: %w", err) } defer f.Close() //nolint:errcheck // File is closed by process exit stat, err := f.Stat() if err != nil { - logrus.WithError(err).Fatal("getting VoD stat") + return fmt.Errorf("getting VoD stat: %w", err) } - for i := range configFile.Uploaders { + for id := range configFile.Uploaders { + if cfg.Limit != nil && !str.StringInSlice(id, cfg.Limit) { + continue + } + + logrus.WithField("id", id).Info("starting upload...") + wg.Add(1) bar := pb.New64(stat.Size()) @@ -122,7 +170,7 @@ func main() { bar.Set("prefix", fmt.Sprintf(fmt.Sprintf("%%-%ds", longestPrefix), c.Name)) - if err = uploaderByName(c.Type).UploadFile(ctx, uploader.UploaderOpts{ + if err = uploaderByName(c.Type).UploadFile(ctx, uploader.Opts{ Name: c.Name, Config: c.Settings, @@ -137,11 +185,8 @@ func main() { }); err != nil { bar.SetTemplate(pb.ProgressBarTemplate(fmt.Sprintf(`{{ string . "prefix" }} ERR: %s`, err))) } - }(configFile.Uploaders[i], bar) - + }(configFile.Uploaders[id], bar) } - barPool.Start() - wg.Wait() - barPool.Stop() + return nil } diff --git a/pkg/config/config.go b/pkg/config/config.go index 933114d..1c8c94d 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -13,7 +13,7 @@ import ( type ( // File represents the contents of a configuration file File struct { - Uploaders []UploaderConfig `yaml:"uploaders"` + Uploaders map[string]UploaderConfig `yaml:"uploaders"` } // UploaderConfig defines the settings for each uploader @@ -26,7 +26,7 @@ type ( // Load parses the configuration file from disk func Load(filename string) (config File, err error) { - f, err := os.Open(filename) + f, err := os.Open(filename) //#nosec G304 // Intended to open user-given config if err != nil { return config, fmt.Errorf("opening config file: %w", err) } diff --git a/pkg/uploader/interface.go b/pkg/uploader/interface.go index 49c6387..c3d7588 100644 --- a/pkg/uploader/interface.go +++ b/pkg/uploader/interface.go @@ -11,13 +11,22 @@ import ( ) type ( + // Uploader defines the interface to implement on new uploaders Uploader interface { - Prepare(ctx context.Context, opts UploaderOpts) error - UploadFile(ctx context.Context, opts UploaderOpts) error + // Prepare is used to check authorizations, re-authorize uploaders + // and do preflight-checks in before the real upload is started + Prepare(ctx context.Context, opts Opts) error + // UploadFile is executed after Prepare and ValidateConfig and + // can concentrate on uploading the file to the respective target + UploadFile(ctx context.Context, opts Opts) error + // ValidateConfig is called before Prepare and UploadFile and + // must make sure the settings given are complete and valid ValidateConfig(config *fieldcollection.FieldCollection) error } - UploaderOpts struct { + // Opts contains the require arguments for an Uploader + // to do its job + Opts struct { Name string Config *fieldcollection.FieldCollection diff --git a/pkg/uploader/sftp/sftp.go b/pkg/uploader/sftp/sftp.go index 681f0bc..c088c41 100644 --- a/pkg/uploader/sftp/sftp.go +++ b/pkg/uploader/sftp/sftp.go @@ -1,3 +1,4 @@ +// Package sftp contains the implementation for an SFTP uploader package sftp import ( @@ -9,6 +10,7 @@ import ( "git.luzifer.io/luzifer/publish-vod/pkg/uploader" "github.com/Luzifer/go_helpers/v2/fieldcollection" + "github.com/sirupsen/logrus" pkgSFTP "github.com/pkg/sftp" "golang.org/x/crypto/ssh" @@ -16,18 +18,15 @@ import ( ) type ( - Uploader struct{} + impl struct{} ) -var ( - ptrStringEmpty = func(v string) *string { return &v }("") +// New returns a SFTP Uploader +func New() uploader.Uploader { return impl{} } - _ uploader.Uploader = Uploader{} -) +func (impl) Prepare(context.Context, uploader.Opts) error { return nil } -func (Uploader) Prepare(context.Context, uploader.UploaderOpts) error { return nil } - -func (Uploader) UploadFile(ctx context.Context, opts uploader.UploaderOpts) error { +func (impl) UploadFile(_ context.Context, opts uploader.Opts) error { socket := os.Getenv("SSH_AUTH_SOCK") conn, err := net.Dial("unix", socket) if err != nil { @@ -42,38 +41,52 @@ func (Uploader) UploadFile(ctx context.Context, opts uploader.UploaderOpts) erro // agent once the remote server wants it. ssh.PublicKeysCallback(agentClient.Signers), }, - HostKeyCallback: ssh.InsecureIgnoreHostKey(), + HostKeyCallback: ssh.InsecureIgnoreHostKey(), //#nosec G106 } client, err := ssh.Dial("tcp", opts.Config.MustString("host", nil), config) if err != nil { return fmt.Errorf("dialing SSH server: %w", err) } - defer client.Close() + defer func() { + if err := client.Close(); err != nil { + logrus.WithError(err).Error("closing SSH client (leaked fd)") + } + }() sftpClient, err := pkgSFTP.NewClient(client) if err != nil { return fmt.Errorf("creating sftp client: %w", err) } - defer sftpClient.Close() + defer func() { + if err := sftpClient.Close(); err != nil { + logrus.WithError(err).Error("closing SFTP client (leaked fd)") + } + }() remoteFile := path.Join(opts.Config.MustString("path", nil), path.Base(opts.Filename)) f, err := sftpClient.OpenFile(remoteFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC) if err != nil { return fmt.Errorf("opening remote file: %w", err) } - defer f.Close() + defer func() { + if err := f.Close(); err != nil { + logrus.WithError(err).Error("closing SFTP file (leaked fd)") + } + }() progressReader := opts.ProgressBar.NewProxyReader(opts.Content) - f.ReadFrom(progressReader) + if _, err = f.ReadFrom(progressReader); err != nil { + return fmt.Errorf("uploading video: %w", err) + } opts.FinalMessage("Video uploaded to %s", remoteFile) return nil } -func (Uploader) ValidateConfig(config *fieldcollection.FieldCollection) error { +func (impl) ValidateConfig(config *fieldcollection.FieldCollection) error { for _, key := range []string{"host", "path", "user"} { if v, err := config.String(key); err != nil || v == "" { return fmt.Errorf("key %q must be non-empty string", key) diff --git a/pkg/uploader/youtube/youtube.go b/pkg/uploader/youtube/youtube.go index a9697ba..5b635c8 100644 --- a/pkg/uploader/youtube/youtube.go +++ b/pkg/uploader/youtube/youtube.go @@ -1,3 +1,5 @@ +// Package youtube contains an uploader to push VoDs to a Youtube +// channel package youtube import ( @@ -18,16 +20,15 @@ import ( ) type ( - Uploader struct{} + impl struct{} ) -var ( - ptrStringEmpty = func(v string) *string { return &v }("") +var ptrStringEmpty = func(v string) *string { return &v }("") - _ uploader.Uploader = Uploader{} -) +// New creates a new Youtube Uploader +func New() uploader.Uploader { return impl{} } -func (Uploader) Prepare(ctx context.Context, opts uploader.UploaderOpts) error { +func (impl) Prepare(ctx context.Context, opts uploader.Opts) error { _, ts, err := oauth4youtube.GetOAuth2Client(ctx, opts.Config.MustString("vaultKey", nil)) if err != nil { return fmt.Errorf("getting oauth credentials: %w", err) @@ -45,7 +46,7 @@ func (Uploader) Prepare(ctx context.Context, opts uploader.UploaderOpts) error { return nil } -func (Uploader) UploadFile(ctx context.Context, opts uploader.UploaderOpts) error { +func (impl) UploadFile(ctx context.Context, opts uploader.Opts) error { client, ts, err := oauth4youtube.GetOAuth2Client(ctx, opts.Config.MustString("vaultKey", nil)) if err != nil { return fmt.Errorf("getting oauth credentials: %w", err) @@ -103,7 +104,7 @@ func (Uploader) UploadFile(ctx context.Context, opts uploader.UploaderOpts) erro return nil } -func (Uploader) ValidateConfig(config *fieldcollection.FieldCollection) error { +func (impl) ValidateConfig(config *fieldcollection.FieldCollection) error { for _, key := range []string{"channel", "vaultKey"} { if v, err := config.String(key); err != nil || v == "" { return fmt.Errorf("key %q must be non-empty string", key) diff --git a/pkg/vault-oauth2/vault.go b/pkg/vault-oauth2/vault.go index 35d519d..76f2a40 100644 --- a/pkg/vault-oauth2/vault.go +++ b/pkg/vault-oauth2/vault.go @@ -1,3 +1,5 @@ +// Package vaultoauth2 supplies helper functions to store and update +// oauth2 client credentials and tokens in a Vault key package vaultoauth2 import ( diff --git a/pkg/vault-oauth2/youtube/youtube.go b/pkg/vault-oauth2/youtube/youtube.go index 06bddaf..6b6470f 100644 --- a/pkg/vault-oauth2/youtube/youtube.go +++ b/pkg/vault-oauth2/youtube/youtube.go @@ -1,3 +1,5 @@ +// Package youtube uses the vaultoauth2 package to create a Youtube +// client and authorize it package youtube import ( @@ -37,14 +39,18 @@ func GetOAuth2Client(ctx context.Context, vaultKey string) (client *http.Client, codeChan := make(chan string, 1) mux := http.NewServeMux() - mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { codeChan <- r.URL.Query().Get("code") }) + mux.HandleFunc("/", func(_ http.ResponseWriter, r *http.Request) { codeChan <- r.URL.Query().Get("code") }) server := &http.Server{ Addr: ":65285", Handler: mux, ReadHeaderTimeout: time.Second, } - go server.ListenAndServe() + go func() { + if err := server.ListenAndServe(); err != nil { + logrus.WithError(err).Fatal("creating listener for Youtube auth") + } + }() logrus.Warnf("Youtube is not authorized, please do so by visiting %s", c.AuthCodeURL("offline", oauth2.AccessTypeOffline)) diff --git a/uploaders.go b/uploaders.go index aaa6a0e..ee69e1e 100644 --- a/uploaders.go +++ b/uploaders.go @@ -9,10 +9,10 @@ import ( func uploaderByName(name string) uploader.Uploader { switch name { case "sftp": - return sftp.Uploader{} + return sftp.New() case "youtube": - return youtube.Uploader{} + return youtube.New() default: return nil