From ffd6c9bd098713f14503ca0cac72993a0a6c1251 Mon Sep 17 00:00:00 2001 From: Knut Ahlers Date: Mon, 31 Oct 2022 15:20:41 +0100 Subject: [PATCH] [customevent] Add scheduled custom events Signed-off-by: Knut Ahlers --- internal/apimodules/customevent/actor.go | 19 ++++++ .../apimodules/customevent/customevent.go | 42 +++++++++++- internal/apimodules/customevent/database.go | 61 +++++++++++++++++ .../apimodules/customevent/memoryCache.go | 68 +++++++++++++++++++ internal/apimodules/customevent/scheduler.go | 36 ++++++++++ main.go | 2 +- wiki/Actors.md | 4 ++ 7 files changed, 229 insertions(+), 3 deletions(-) create mode 100644 internal/apimodules/customevent/database.go create mode 100644 internal/apimodules/customevent/memoryCache.go create mode 100644 internal/apimodules/customevent/scheduler.go diff --git a/internal/apimodules/customevent/actor.go b/internal/apimodules/customevent/actor.go index ca67a6c..423cecb 100644 --- a/internal/apimodules/customevent/actor.go +++ b/internal/apimodules/customevent/actor.go @@ -2,6 +2,7 @@ package customevent import ( "strings" + "time" "github.com/go-irc/irc" "github.com/pkg/errors" @@ -23,6 +24,24 @@ func (a actor) Execute(c *irc.Client, m *irc.Message, r *plugins.Rule, eventData return false, errors.New("fields template evaluated to empty string") } + delayRaw, err := formatMessage(attrs.MustString("schedule_in", ptrStringEmpty), m, r, eventData) + if err != nil { + return false, errors.Wrap(err, "executing schedule_in template") + } + + if delay, err := time.ParseDuration(delayRaw); err == nil && delay > 0 { + fields, err := parseEvent(plugins.DeriveChannel(m, eventData), strings.NewReader(fd)) + if err != nil { + return false, errors.Wrap(err, "parsing fields data") + } + + if err = storeEvent(db, time.Now().Add(delay).UTC(), plugins.DeriveChannel(m, eventData), fields); err != nil { + return false, errors.Wrap(err, "storing event") + } + + return false, errors.Wrap(mc.Refresh(), "refreshing memory cache") + } + return false, errors.Wrap( triggerEvent(plugins.DeriveChannel(m, eventData), strings.NewReader(fd)), "triggering event", diff --git a/internal/apimodules/customevent/customevent.go b/internal/apimodules/customevent/customevent.go index 526078e..68dde81 100644 --- a/internal/apimodules/customevent/customevent.go +++ b/internal/apimodules/customevent/customevent.go @@ -2,6 +2,7 @@ package customevent import ( "encoding/json" + "fmt" "io" "net/http" "strings" @@ -9,17 +10,27 @@ import ( "github.com/gorilla/mux" "github.com/pkg/errors" + "github.com/Luzifer/twitch-bot/v2/pkg/database" "github.com/Luzifer/twitch-bot/v2/plugins" ) const actorName = "customevent" var ( + db database.Connector eventCreatorFunc plugins.EventHandlerFunc formatMessage plugins.MsgFormatter + mc *memoryCache ) func Register(args plugins.RegistrationArguments) error { + db = args.GetDatabaseConnector() + if err := db.DB().AutoMigrate(&storedCustomEvent{}); err != nil { + return errors.Wrap(err, "applying schema migration") + } + + mc = &memoryCache{dbc: db} + eventCreatorFunc = args.CreateEvent formatMessage = args.FormatMessage @@ -40,6 +51,15 @@ func Register(args plugins.RegistrationArguments) error { SupportTemplate: true, Type: plugins.ActionDocumentationFieldTypeString, }, + { + Default: "", + Description: "Time until the event is triggered (must be valid duration like 1h, 1h1m, 10s, ...)", + Key: "schedule_in", + Name: "Schedule In", + Optional: true, + SupportTemplate: true, + Type: plugins.ActionDocumentationFieldTypeString, + }, }, }) @@ -60,6 +80,15 @@ func Register(args plugins.RegistrationArguments) error { }, }) + for schedule, fn := range map[string]func(){ + fmt.Sprintf("@every %s", cleanupTimeout): scheduleCleanup, + "* * * * * *": scheduleSend, + } { + if _, err := args.RegisterCron(schedule, fn); err != nil { + return errors.Wrap(err, "registering cron function") + } + } + return nil } @@ -80,16 +109,25 @@ func handleCreateEvent(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } -func triggerEvent(channel string, fieldData io.Reader) error { +func parseEvent(channel string, fieldData io.Reader) (*plugins.FieldCollection, error) { payload := make(map[string]any) if err := json.NewDecoder(fieldData).Decode(&payload); err != nil { - return errors.Wrap(err, "parsing event payload") + return nil, errors.Wrap(err, "parsing event payload") } fields := plugins.FieldCollectionFromData(payload) fields.Set("channel", "#"+strings.TrimLeft(channel, "#")) + return fields, nil +} + +func triggerEvent(channel string, fieldData io.Reader) error { + fields, err := parseEvent(channel, fieldData) + if err != nil { + return errors.Wrap(err, "parsing fields") + } + if err := eventCreatorFunc("custom", fields); err != nil { return errors.Wrap(err, "creating event") } diff --git a/internal/apimodules/customevent/database.go b/internal/apimodules/customevent/database.go new file mode 100644 index 0000000..1279298 --- /dev/null +++ b/internal/apimodules/customevent/database.go @@ -0,0 +1,61 @@ +package customevent + +import ( + "bytes" + "encoding/json" + "time" + + "github.com/gofrs/uuid/v3" + "github.com/pkg/errors" + + "github.com/Luzifer/twitch-bot/v2/pkg/database" + "github.com/Luzifer/twitch-bot/v2/plugins" +) + +const cleanupTimeout = 15 * time.Minute + +type ( + storedCustomEvent struct { + ID string `gorm:"primaryKey"` + Channel string + Fields string + ScheduledAt time.Time + } +) + +func cleanupStoredEvents(db database.Connector) error { + return errors.Wrap( + db.DB(). + Where("scheduled_at < ?", time.Now().Add(cleanupTimeout*-1).UTC()). + Delete(&storedCustomEvent{}). + Error, + "deleting past events", + ) +} + +func getFutureEvents(db database.Connector) (out []storedCustomEvent, err error) { + return out, errors.Wrap( + db.DB(). + Where("scheduled_at >= ?", time.Now().UTC()). + Find(&out). + Error, + "getting events from database", + ) +} + +func storeEvent(db database.Connector, scheduleAt time.Time, channel string, fields *plugins.FieldCollection) error { + fieldBuf := new(bytes.Buffer) + if err := json.NewEncoder(fieldBuf).Encode(fields); err != nil { + return errors.Wrap(err, "marshalling fields") + } + + return errors.Wrap( + db.DB().Create(storedCustomEvent{ + ID: uuid.Must(uuid.NewV4()).String(), + Channel: channel, + Fields: fieldBuf.String(), + ScheduledAt: scheduleAt, + }).Error, + "storing event", + ) +} diff --git a/internal/apimodules/customevent/memoryCache.go b/internal/apimodules/customevent/memoryCache.go new file mode 100644 index 0000000..5a722d0 --- /dev/null +++ b/internal/apimodules/customevent/memoryCache.go @@ -0,0 +1,68 @@ +package customevent + +import ( + "sync" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + "github.com/Luzifer/twitch-bot/v2/pkg/database" +) + +const memoryCacheRefreshInterval = 5 * time.Minute + +type ( + memoryCache struct { + events []storedCustomEvent + validUntil time.Time + + dbc database.Connector + lock sync.Mutex + } +) + +func (m *memoryCache) PopEventsToExecute() ([]storedCustomEvent, error) { + m.lock.Lock() + defer m.lock.Unlock() + + if m.validUntil.Before(time.Now()) { + if err := m.refresh(); err != nil { + return nil, errors.Wrap(err, "refreshing stale cache") + } + } + + var ( + execEvents, storeEvents []storedCustomEvent + now = time.Now() + ) + for i := range m.events { + evt := m.events[i] + if evt.ScheduledAt.After(now) { + storeEvents = append(storeEvents, evt) + continue + } + + execEvents = append(execEvents, evt) + } + + m.events = storeEvents + return execEvents, nil +} + +func (m *memoryCache) Refresh() (err error) { + m.lock.Lock() + defer m.lock.Unlock() + + return m.refresh() +} + +func (m *memoryCache) refresh() (err error) { + if m.events, err = getFutureEvents(m.dbc); err != nil { + return errors.Wrap(err, "fetching events from database") + } + + m.validUntil = time.Now().Add(memoryCacheRefreshInterval) + logrus.WithField("event_count", len(m.events)).Trace("loaded stored events from database") + return nil +} diff --git a/internal/apimodules/customevent/scheduler.go b/internal/apimodules/customevent/scheduler.go new file mode 100644 index 0000000..caebc1c --- /dev/null +++ b/internal/apimodules/customevent/scheduler.go @@ -0,0 +1,36 @@ +package customevent + +import ( + "strings" + + "github.com/sirupsen/logrus" +) + +func scheduleCleanup() { + if err := cleanupStoredEvents(db); err != nil { + logrus.WithError(err).Error("executing custom event database cleanup") + } +} + +func scheduleSend() { + evts, err := mc.PopEventsToExecute() + if err != nil { + logrus.WithError(err).Error("collecting scheduled custom events for sending") + return + } + + for i := range evts { + go func(evt storedCustomEvent) { + evtData, err := parseEvent(evt.Channel, strings.NewReader(evt.Fields)) + if err != nil { + logrus.WithError(err).Error("parsing fields in stored event") + return + } + + if err = eventCreatorFunc("custom", evtData); err != nil { + logrus.WithError(err).Error("triggering stored event") + return + } + }(evts[i]) + } +} diff --git a/main.go b/main.go index fb8aa31..a6225f9 100644 --- a/main.go +++ b/main.go @@ -220,7 +220,7 @@ func main() { log.WithError(err).Fatal("Unable to apply timer migration") } - cronService = cron.New() + cronService = cron.New(cron.WithSeconds()) if twitchClient, err = accessService.GetBotTwitchClient(access.ClientConfig{ TwitchClient: cfg.TwitchClient, TwitchClientSecret: cfg.TwitchClientSecret, diff --git a/wiki/Actors.md b/wiki/Actors.md index 8cea0ee..5102a4a 100644 --- a/wiki/Actors.md +++ b/wiki/Actors.md @@ -25,6 +25,10 @@ Create a custom event # Optional: false # Type: string (Supports Templating) fields: "{}" + # Time until the event is triggered (must be valid duration like 1h, 1h1m, 10s, ...) + # Optional: true + # Type: string (Supports Templating) + schedule_in: "" ``` ## Delay