Do some refactoring, add limiting of uploaders

Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
Knut Ahlers 2024-04-01 13:02:07 +02:00
parent 0b11c9d450
commit 7ff74d6e1a
Signed by: luzifer
SSH key fingerprint: SHA256:/xtE5lCgiRDQr8SLxHMS92ZBlACmATUmF1crK16Ks4E
9 changed files with 147 additions and 69 deletions

View file

@ -11,14 +11,16 @@ Or you create a config for this tool and start one command which then concurrent
uploaders: uploaders:
- name: Example SFTP myhost:
name: Example SFTP
type: sftp type: sftp
settings: settings:
host: hostname:port host: hostname:port
path: /where/to/put/the/file path: /where/to/put/the/file
user: someoneimportant user: someoneimportant
- name: Example Youtube mychannel:
name: Example Youtube
type: youtube type: youtube
settings: settings:
channel: the ID of your channel found in its link channel: the ID of your channel found in its link

109
main.go
View file

@ -13,12 +13,14 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/Luzifer/go_helpers/v2/str"
"github.com/Luzifer/rconfig/v2" "github.com/Luzifer/rconfig/v2"
) )
var ( var (
cfg = struct { cfg = struct {
Config string `flag:"config,c" default:"config.yaml" description:"Configuration for the uploaders"` Config string `flag:"config,c" default:"config.yaml" description:"Configuration for the uploaders"`
Limit []string `flag:"limit,l" description:"Limit to certain uploader IDs"`
LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"` LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"`
ValidateOnly bool `flag:"validate-only" default:"false" description:"Only execute validation and prepare functions, do not upload"` ValidateOnly bool `flag:"validate-only" default:"false" description:"Only execute validation and prepare functions, do not upload"`
VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"` VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"`
@ -33,6 +35,10 @@ func initApp() error {
return errors.Wrap(err, "parsing cli options") return errors.Wrap(err, "parsing cli options")
} }
if len(cfg.Limit) == 1 && cfg.Limit[0] == "" {
cfg.Limit = nil
}
l, err := logrus.ParseLevel(cfg.LogLevel) l, err := logrus.ParseLevel(cfg.LogLevel)
if err != nil { if err != nil {
return errors.Wrap(err, "parsing log-level") return errors.Wrap(err, "parsing log-level")
@ -60,7 +66,7 @@ func main() {
var ( var (
ctx = context.Background() ctx = context.Background()
wg sync.WaitGroup wg = new(sync.WaitGroup)
) )
var ( var (
@ -68,26 +74,8 @@ func main() {
longestPrefix int longestPrefix int
) )
for _, c := range configFile.Uploaders { if longestPrefix, err = preflight(ctx, configFile); err != nil {
u := uploaderByName(c.Type) logrus.WithError(err).Fatal("executing preflight checks")
if u == nil {
logrus.Fatalf("unknown uploader %q", c.Type)
}
if err = u.ValidateConfig(c.Settings); err != nil {
logrus.WithError(err).Fatalf("validating config entry %q", c.Name)
}
if err = u.Prepare(ctx, uploader.UploaderOpts{
Name: c.Name,
Config: c.Settings,
}); err != nil {
logrus.WithError(err).Fatalf("preparing uploader %q", c.Name)
}
if l := len(c.Name); l > longestPrefix {
longestPrefix = l
}
} }
if cfg.ValidateOnly { if cfg.ValidateOnly {
@ -98,19 +86,79 @@ func main() {
logrus.Fatal("Usage: publish-vod <filename>") logrus.Fatal("Usage: publish-vod <filename>")
} }
fileName := rconfig.Args()[1] if err = startUploads(ctx, rconfig.Args()[1], configFile, wg, barPool, longestPrefix); err != nil {
f, err := os.Open(fileName) logrus.WithError(err).Fatal("starting uploads")
}
if err = barPool.Start(); err != nil {
logrus.WithError(err).Fatal("starting progressbars")
}
wg.Wait()
if err = barPool.Stop(); err != nil {
logrus.WithError(err).Fatal("stopping progressbars")
}
}
func preflight(ctx context.Context, configFile config.File) (longestPrefix int, err error) {
for id, c := range configFile.Uploaders {
if cfg.Limit != nil && !str.StringInSlice(id, cfg.Limit) {
continue
}
logrus.WithField("id", id).Info("validating config...")
u := uploaderByName(c.Type)
if u == nil {
return longestPrefix, fmt.Errorf("unknown uploader %q", c.Type)
}
if err = u.ValidateConfig(c.Settings); err != nil {
return longestPrefix, fmt.Errorf("validating config entry %q: %w", c.Name, err)
}
if err = u.Prepare(ctx, uploader.Opts{
Name: c.Name,
Config: c.Settings,
}); err != nil {
return longestPrefix, fmt.Errorf("preparing uploader %q: %w", c.Name, err)
}
if l := len(c.Name); l > longestPrefix {
longestPrefix = l
}
}
return longestPrefix, nil
}
func startUploads(
ctx context.Context,
fileName string,
configFile config.File,
wg *sync.WaitGroup,
barPool *pb.Pool,
longestPrefix int,
) (err error) {
f, err := os.Open(fileName) //#nosec G304 // Intended to open and upload arbitrary file
if err != nil { if err != nil {
logrus.WithError(err).Fatal("opening VoD") return fmt.Errorf("opening VoD: %w", err)
} }
defer f.Close() //nolint:errcheck // File is closed by process exit defer f.Close() //nolint:errcheck // File is closed by process exit
stat, err := f.Stat() stat, err := f.Stat()
if err != nil { if err != nil {
logrus.WithError(err).Fatal("getting VoD stat") return fmt.Errorf("getting VoD stat: %w", err)
} }
for i := range configFile.Uploaders { for id := range configFile.Uploaders {
if cfg.Limit != nil && !str.StringInSlice(id, cfg.Limit) {
continue
}
logrus.WithField("id", id).Info("starting upload...")
wg.Add(1) wg.Add(1)
bar := pb.New64(stat.Size()) bar := pb.New64(stat.Size())
@ -122,7 +170,7 @@ func main() {
bar.Set("prefix", fmt.Sprintf(fmt.Sprintf("%%-%ds", longestPrefix), c.Name)) bar.Set("prefix", fmt.Sprintf(fmt.Sprintf("%%-%ds", longestPrefix), c.Name))
if err = uploaderByName(c.Type).UploadFile(ctx, uploader.UploaderOpts{ if err = uploaderByName(c.Type).UploadFile(ctx, uploader.Opts{
Name: c.Name, Name: c.Name,
Config: c.Settings, Config: c.Settings,
@ -137,11 +185,8 @@ func main() {
}); err != nil { }); err != nil {
bar.SetTemplate(pb.ProgressBarTemplate(fmt.Sprintf(`{{ string . "prefix" }} ERR: %s`, err))) bar.SetTemplate(pb.ProgressBarTemplate(fmt.Sprintf(`{{ string . "prefix" }} ERR: %s`, err)))
} }
}(configFile.Uploaders[i], bar) }(configFile.Uploaders[id], bar)
} }
barPool.Start() return nil
wg.Wait()
barPool.Stop()
} }

View file

@ -13,7 +13,7 @@ import (
type ( type (
// File represents the contents of a configuration file // File represents the contents of a configuration file
File struct { File struct {
Uploaders []UploaderConfig `yaml:"uploaders"` Uploaders map[string]UploaderConfig `yaml:"uploaders"`
} }
// UploaderConfig defines the settings for each uploader // UploaderConfig defines the settings for each uploader
@ -26,7 +26,7 @@ type (
// Load parses the configuration file from disk // Load parses the configuration file from disk
func Load(filename string) (config File, err error) { func Load(filename string) (config File, err error) {
f, err := os.Open(filename) f, err := os.Open(filename) //#nosec G304 // Intended to open user-given config
if err != nil { if err != nil {
return config, fmt.Errorf("opening config file: %w", err) return config, fmt.Errorf("opening config file: %w", err)
} }

View file

@ -11,13 +11,22 @@ import (
) )
type ( type (
// Uploader defines the interface to implement on new uploaders
Uploader interface { Uploader interface {
Prepare(ctx context.Context, opts UploaderOpts) error // Prepare is used to check authorizations, re-authorize uploaders
UploadFile(ctx context.Context, opts UploaderOpts) error // and do preflight-checks in before the real upload is started
Prepare(ctx context.Context, opts Opts) error
// UploadFile is executed after Prepare and ValidateConfig and
// can concentrate on uploading the file to the respective target
UploadFile(ctx context.Context, opts Opts) error
// ValidateConfig is called before Prepare and UploadFile and
// must make sure the settings given are complete and valid
ValidateConfig(config *fieldcollection.FieldCollection) error ValidateConfig(config *fieldcollection.FieldCollection) error
} }
UploaderOpts struct { // Opts contains the require arguments for an Uploader
// to do its job
Opts struct {
Name string Name string
Config *fieldcollection.FieldCollection Config *fieldcollection.FieldCollection

View file

@ -1,3 +1,4 @@
// Package sftp contains the implementation for an SFTP uploader
package sftp package sftp
import ( import (
@ -9,6 +10,7 @@ import (
"git.luzifer.io/luzifer/publish-vod/pkg/uploader" "git.luzifer.io/luzifer/publish-vod/pkg/uploader"
"github.com/Luzifer/go_helpers/v2/fieldcollection" "github.com/Luzifer/go_helpers/v2/fieldcollection"
"github.com/sirupsen/logrus"
pkgSFTP "github.com/pkg/sftp" pkgSFTP "github.com/pkg/sftp"
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
@ -16,18 +18,15 @@ import (
) )
type ( type (
Uploader struct{} impl struct{}
) )
var ( // New returns a SFTP Uploader
ptrStringEmpty = func(v string) *string { return &v }("") func New() uploader.Uploader { return impl{} }
_ uploader.Uploader = Uploader{} func (impl) Prepare(context.Context, uploader.Opts) error { return nil }
)
func (Uploader) Prepare(context.Context, uploader.UploaderOpts) error { return nil } func (impl) UploadFile(_ context.Context, opts uploader.Opts) error {
func (Uploader) UploadFile(ctx context.Context, opts uploader.UploaderOpts) error {
socket := os.Getenv("SSH_AUTH_SOCK") socket := os.Getenv("SSH_AUTH_SOCK")
conn, err := net.Dial("unix", socket) conn, err := net.Dial("unix", socket)
if err != nil { if err != nil {
@ -42,38 +41,52 @@ func (Uploader) UploadFile(ctx context.Context, opts uploader.UploaderOpts) erro
// agent once the remote server wants it. // agent once the remote server wants it.
ssh.PublicKeysCallback(agentClient.Signers), ssh.PublicKeysCallback(agentClient.Signers),
}, },
HostKeyCallback: ssh.InsecureIgnoreHostKey(), HostKeyCallback: ssh.InsecureIgnoreHostKey(), //#nosec G106
} }
client, err := ssh.Dial("tcp", opts.Config.MustString("host", nil), config) client, err := ssh.Dial("tcp", opts.Config.MustString("host", nil), config)
if err != nil { if err != nil {
return fmt.Errorf("dialing SSH server: %w", err) return fmt.Errorf("dialing SSH server: %w", err)
} }
defer client.Close() defer func() {
if err := client.Close(); err != nil {
logrus.WithError(err).Error("closing SSH client (leaked fd)")
}
}()
sftpClient, err := pkgSFTP.NewClient(client) sftpClient, err := pkgSFTP.NewClient(client)
if err != nil { if err != nil {
return fmt.Errorf("creating sftp client: %w", err) return fmt.Errorf("creating sftp client: %w", err)
} }
defer sftpClient.Close() defer func() {
if err := sftpClient.Close(); err != nil {
logrus.WithError(err).Error("closing SFTP client (leaked fd)")
}
}()
remoteFile := path.Join(opts.Config.MustString("path", nil), path.Base(opts.Filename)) remoteFile := path.Join(opts.Config.MustString("path", nil), path.Base(opts.Filename))
f, err := sftpClient.OpenFile(remoteFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC) f, err := sftpClient.OpenFile(remoteFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC)
if err != nil { if err != nil {
return fmt.Errorf("opening remote file: %w", err) return fmt.Errorf("opening remote file: %w", err)
} }
defer f.Close() defer func() {
if err := f.Close(); err != nil {
logrus.WithError(err).Error("closing SFTP file (leaked fd)")
}
}()
progressReader := opts.ProgressBar.NewProxyReader(opts.Content) progressReader := opts.ProgressBar.NewProxyReader(opts.Content)
f.ReadFrom(progressReader) if _, err = f.ReadFrom(progressReader); err != nil {
return fmt.Errorf("uploading video: %w", err)
}
opts.FinalMessage("Video uploaded to %s", remoteFile) opts.FinalMessage("Video uploaded to %s", remoteFile)
return nil return nil
} }
func (Uploader) ValidateConfig(config *fieldcollection.FieldCollection) error { func (impl) ValidateConfig(config *fieldcollection.FieldCollection) error {
for _, key := range []string{"host", "path", "user"} { for _, key := range []string{"host", "path", "user"} {
if v, err := config.String(key); err != nil || v == "" { if v, err := config.String(key); err != nil || v == "" {
return fmt.Errorf("key %q must be non-empty string", key) return fmt.Errorf("key %q must be non-empty string", key)

View file

@ -1,3 +1,5 @@
// Package youtube contains an uploader to push VoDs to a Youtube
// channel
package youtube package youtube
import ( import (
@ -18,16 +20,15 @@ import (
) )
type ( type (
Uploader struct{} impl struct{}
) )
var ( var ptrStringEmpty = func(v string) *string { return &v }("")
ptrStringEmpty = func(v string) *string { return &v }("")
_ uploader.Uploader = Uploader{} // New creates a new Youtube Uploader
) func New() uploader.Uploader { return impl{} }
func (Uploader) Prepare(ctx context.Context, opts uploader.UploaderOpts) error { func (impl) Prepare(ctx context.Context, opts uploader.Opts) error {
_, ts, err := oauth4youtube.GetOAuth2Client(ctx, opts.Config.MustString("vaultKey", nil)) _, ts, err := oauth4youtube.GetOAuth2Client(ctx, opts.Config.MustString("vaultKey", nil))
if err != nil { if err != nil {
return fmt.Errorf("getting oauth credentials: %w", err) return fmt.Errorf("getting oauth credentials: %w", err)
@ -45,7 +46,7 @@ func (Uploader) Prepare(ctx context.Context, opts uploader.UploaderOpts) error {
return nil return nil
} }
func (Uploader) UploadFile(ctx context.Context, opts uploader.UploaderOpts) error { func (impl) UploadFile(ctx context.Context, opts uploader.Opts) error {
client, ts, err := oauth4youtube.GetOAuth2Client(ctx, opts.Config.MustString("vaultKey", nil)) client, ts, err := oauth4youtube.GetOAuth2Client(ctx, opts.Config.MustString("vaultKey", nil))
if err != nil { if err != nil {
return fmt.Errorf("getting oauth credentials: %w", err) return fmt.Errorf("getting oauth credentials: %w", err)
@ -103,7 +104,7 @@ func (Uploader) UploadFile(ctx context.Context, opts uploader.UploaderOpts) erro
return nil return nil
} }
func (Uploader) ValidateConfig(config *fieldcollection.FieldCollection) error { func (impl) ValidateConfig(config *fieldcollection.FieldCollection) error {
for _, key := range []string{"channel", "vaultKey"} { for _, key := range []string{"channel", "vaultKey"} {
if v, err := config.String(key); err != nil || v == "" { if v, err := config.String(key); err != nil || v == "" {
return fmt.Errorf("key %q must be non-empty string", key) return fmt.Errorf("key %q must be non-empty string", key)

View file

@ -1,3 +1,5 @@
// Package vaultoauth2 supplies helper functions to store and update
// oauth2 client credentials and tokens in a Vault key
package vaultoauth2 package vaultoauth2
import ( import (

View file

@ -1,3 +1,5 @@
// Package youtube uses the vaultoauth2 package to create a Youtube
// client and authorize it
package youtube package youtube
import ( import (
@ -37,14 +39,18 @@ func GetOAuth2Client(ctx context.Context, vaultKey string) (client *http.Client,
codeChan := make(chan string, 1) codeChan := make(chan string, 1)
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { codeChan <- r.URL.Query().Get("code") }) mux.HandleFunc("/", func(_ http.ResponseWriter, r *http.Request) { codeChan <- r.URL.Query().Get("code") })
server := &http.Server{ server := &http.Server{
Addr: ":65285", Addr: ":65285",
Handler: mux, Handler: mux,
ReadHeaderTimeout: time.Second, ReadHeaderTimeout: time.Second,
} }
go server.ListenAndServe() go func() {
if err := server.ListenAndServe(); err != nil {
logrus.WithError(err).Fatal("creating listener for Youtube auth")
}
}()
logrus.Warnf("Youtube is not authorized, please do so by visiting %s", c.AuthCodeURL("offline", oauth2.AccessTypeOffline)) logrus.Warnf("Youtube is not authorized, please do so by visiting %s", c.AuthCodeURL("offline", oauth2.AccessTypeOffline))

View file

@ -9,10 +9,10 @@ import (
func uploaderByName(name string) uploader.Uploader { func uploaderByName(name string) uploader.Uploader {
switch name { switch name {
case "sftp": case "sftp":
return sftp.Uploader{} return sftp.New()
case "youtube": case "youtube":
return youtube.Uploader{} return youtube.New()
default: default:
return nil return nil