publish-vod/main.go
Knut Ahlers 2b991407d7
Fix: Do not close the file before uploads finish
Signed-off-by: Knut Ahlers <knut@ahlers.me>
2024-04-02 13:19:36 +02:00

200 lines
4.7 KiB
Go

package main
import (
"context"
"fmt"
"io"
"os"
"sync"
"git.luzifer.io/luzifer/publish-vod/pkg/config"
"git.luzifer.io/luzifer/publish-vod/pkg/uploader"
"github.com/cheggaaa/pb/v3"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/Luzifer/go_helpers/v2/str"
"github.com/Luzifer/rconfig/v2"
)
var (
cfg = struct {
Config string `flag:"config,c" default:"config.yaml" description:"Configuration for the uploaders"`
Limit []string `flag:"limit,l" description:"Limit to certain uploader IDs"`
LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"`
ValidateOnly bool `flag:"validate-only" default:"false" description:"Only execute validation and prepare functions, do not upload"`
VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"`
}{}
version = "dev"
)
func initApp() error {
rconfig.AutoEnv(true)
if err := rconfig.ParseAndValidate(&cfg); err != nil {
return errors.Wrap(err, "parsing cli options")
}
if len(cfg.Limit) == 1 && cfg.Limit[0] == "" || len(cfg.Limit) == 0 {
cfg.Limit = nil
}
l, err := logrus.ParseLevel(cfg.LogLevel)
if err != nil {
return errors.Wrap(err, "parsing log-level")
}
logrus.SetLevel(l)
return nil
}
func main() {
var err error
if err = initApp(); err != nil {
logrus.WithError(err).Fatal("initializing app")
}
if cfg.VersionAndExit {
logrus.WithField("version", version).Info("publish-vod")
os.Exit(0)
}
configFile, err := config.Load(cfg.Config)
if err != nil {
logrus.WithError(err).Fatal("loading config")
}
var (
ctx = context.Background()
wg = new(sync.WaitGroup)
)
var (
barPool = pb.NewPool()
longestPrefix int
)
if longestPrefix, err = preflight(ctx, configFile); err != nil {
logrus.WithError(err).Fatal("executing preflight checks")
}
if cfg.ValidateOnly {
return
}
if len(rconfig.Args()) < 2 { //nolint:gomnd
logrus.Fatal("Usage: publish-vod <filename>")
}
fileName := rconfig.Args()[1]
f, err := os.Open(fileName) //#nosec G304 // Intended to open and upload arbitrary file
if err != nil {
logrus.WithError(err).Fatal("opening VoD")
}
defer f.Close() //nolint:errcheck // File is closed by process exit
stat, err := f.Stat()
if err != nil {
logrus.WithError(err).Fatal("getting VoD stat")
}
if err = startUploads(
ctx, wg, barPool, longestPrefix, configFile,
fileName, f, stat.Size(),
); err != nil {
logrus.WithError(err).Fatal("starting uploads")
}
if err = barPool.Start(); err != nil {
logrus.WithError(err).Fatal("starting progressbars")
}
wg.Wait()
if err = barPool.Stop(); err != nil {
logrus.WithError(err).Fatal("stopping progressbars")
}
}
func preflight(ctx context.Context, configFile config.File) (longestPrefix int, err error) {
for id, c := range configFile.Uploaders {
if cfg.Limit != nil && !str.StringInSlice(id, cfg.Limit) {
continue
}
logrus.WithField("id", id).Info("validating config...")
u := uploaderByName(c.Type)
if u == nil {
return longestPrefix, fmt.Errorf("unknown uploader %q", c.Type)
}
if err = u.ValidateConfig(c.Settings); err != nil {
return longestPrefix, fmt.Errorf("validating config entry %q: %w", c.Name, err)
}
if err = u.Prepare(ctx, uploader.Opts{
Name: c.Name,
Config: c.Settings,
}); err != nil {
return longestPrefix, fmt.Errorf("preparing uploader %q: %w", c.Name, err)
}
if l := len(c.Name); l > longestPrefix {
longestPrefix = l
}
}
return longestPrefix, nil
}
func startUploads(
ctx context.Context,
wg *sync.WaitGroup,
barPool *pb.Pool,
longestPrefix int,
configFile config.File,
fileName string,
fileReader io.ReaderAt,
fileSize int64,
) (err error) {
for id := range configFile.Uploaders {
if cfg.Limit != nil && !str.StringInSlice(id, cfg.Limit) {
continue
}
logrus.WithField("id", id).Info("starting upload...")
wg.Add(1)
bar := pb.New64(fileSize)
bar.SetTemplate(pb.Full)
barPool.Add(bar)
go func(c config.UploaderConfig, bar *pb.ProgressBar) {
defer wg.Done()
bar.Set("prefix", fmt.Sprintf(fmt.Sprintf("%%-%ds", longestPrefix), c.Name))
if err = uploaderByName(c.Type).UploadFile(ctx, uploader.Opts{
Name: c.Name,
Config: c.Settings,
Filename: fileName,
Content: io.NewSectionReader(fileReader, 0, fileSize),
Size: fileSize,
ProgressBar: bar,
FinalMessage: func(format string, opts ...any) {
bar.SetTemplate(pb.ProgressBarTemplate(fmt.Sprintf(`{{ string . "prefix" }} %s`, fmt.Sprintf(format, opts...))))
},
}); err != nil {
bar.SetTemplate(pb.ProgressBarTemplate(fmt.Sprintf(`{{ string . "prefix" }} ERR: %s`, err)))
}
}(configFile.Uploaders[id], bar)
}
return nil
}