[customevent] Add scheduled custom events

Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
Knut Ahlers 2022-10-31 15:20:41 +01:00
parent 19f54d910d
commit ffd6c9bd09
Signed by: luzifer
GPG key ID: D91C3E91E4CAD6F5
7 changed files with 229 additions and 3 deletions

View file

@ -2,6 +2,7 @@ package customevent
import ( import (
"strings" "strings"
"time"
"github.com/go-irc/irc" "github.com/go-irc/irc"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -23,6 +24,24 @@ func (a actor) Execute(c *irc.Client, m *irc.Message, r *plugins.Rule, eventData
return false, errors.New("fields template evaluated to empty string") return false, errors.New("fields template evaluated to empty string")
} }
delayRaw, err := formatMessage(attrs.MustString("schedule_in", ptrStringEmpty), m, r, eventData)
if err != nil {
return false, errors.Wrap(err, "executing schedule_in template")
}
if delay, err := time.ParseDuration(delayRaw); err == nil && delay > 0 {
fields, err := parseEvent(plugins.DeriveChannel(m, eventData), strings.NewReader(fd))
if err != nil {
return false, errors.Wrap(err, "parsing fields data")
}
if err = storeEvent(db, time.Now().Add(delay).UTC(), plugins.DeriveChannel(m, eventData), fields); err != nil {
return false, errors.Wrap(err, "storing event")
}
return false, errors.Wrap(mc.Refresh(), "refreshing memory cache")
}
return false, errors.Wrap( return false, errors.Wrap(
triggerEvent(plugins.DeriveChannel(m, eventData), strings.NewReader(fd)), triggerEvent(plugins.DeriveChannel(m, eventData), strings.NewReader(fd)),
"triggering event", "triggering event",

View file

@ -2,6 +2,7 @@ package customevent
import ( import (
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"net/http" "net/http"
"strings" "strings"
@ -9,17 +10,27 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/Luzifer/twitch-bot/v2/pkg/database"
"github.com/Luzifer/twitch-bot/v2/plugins" "github.com/Luzifer/twitch-bot/v2/plugins"
) )
const actorName = "customevent" const actorName = "customevent"
var ( var (
db database.Connector
eventCreatorFunc plugins.EventHandlerFunc eventCreatorFunc plugins.EventHandlerFunc
formatMessage plugins.MsgFormatter formatMessage plugins.MsgFormatter
mc *memoryCache
) )
func Register(args plugins.RegistrationArguments) error { func Register(args plugins.RegistrationArguments) error {
db = args.GetDatabaseConnector()
if err := db.DB().AutoMigrate(&storedCustomEvent{}); err != nil {
return errors.Wrap(err, "applying schema migration")
}
mc = &memoryCache{dbc: db}
eventCreatorFunc = args.CreateEvent eventCreatorFunc = args.CreateEvent
formatMessage = args.FormatMessage formatMessage = args.FormatMessage
@ -40,6 +51,15 @@ func Register(args plugins.RegistrationArguments) error {
SupportTemplate: true, SupportTemplate: true,
Type: plugins.ActionDocumentationFieldTypeString, Type: plugins.ActionDocumentationFieldTypeString,
}, },
{
Default: "",
Description: "Time until the event is triggered (must be valid duration like 1h, 1h1m, 10s, ...)",
Key: "schedule_in",
Name: "Schedule In",
Optional: true,
SupportTemplate: true,
Type: plugins.ActionDocumentationFieldTypeString,
},
}, },
}) })
@ -60,6 +80,15 @@ func Register(args plugins.RegistrationArguments) error {
}, },
}) })
for schedule, fn := range map[string]func(){
fmt.Sprintf("@every %s", cleanupTimeout): scheduleCleanup,
"* * * * * *": scheduleSend,
} {
if _, err := args.RegisterCron(schedule, fn); err != nil {
return errors.Wrap(err, "registering cron function")
}
}
return nil return nil
} }
@ -80,16 +109,25 @@ func handleCreateEvent(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
} }
func triggerEvent(channel string, fieldData io.Reader) error { func parseEvent(channel string, fieldData io.Reader) (*plugins.FieldCollection, error) {
payload := make(map[string]any) payload := make(map[string]any)
if err := json.NewDecoder(fieldData).Decode(&payload); err != nil { if err := json.NewDecoder(fieldData).Decode(&payload); err != nil {
return errors.Wrap(err, "parsing event payload") return nil, errors.Wrap(err, "parsing event payload")
} }
fields := plugins.FieldCollectionFromData(payload) fields := plugins.FieldCollectionFromData(payload)
fields.Set("channel", "#"+strings.TrimLeft(channel, "#")) fields.Set("channel", "#"+strings.TrimLeft(channel, "#"))
return fields, nil
}
func triggerEvent(channel string, fieldData io.Reader) error {
fields, err := parseEvent(channel, fieldData)
if err != nil {
return errors.Wrap(err, "parsing fields")
}
if err := eventCreatorFunc("custom", fields); err != nil { if err := eventCreatorFunc("custom", fields); err != nil {
return errors.Wrap(err, "creating event") return errors.Wrap(err, "creating event")
} }

View file

@ -0,0 +1,61 @@
package customevent
import (
"bytes"
"encoding/json"
"time"
"github.com/gofrs/uuid/v3"
"github.com/pkg/errors"
"github.com/Luzifer/twitch-bot/v2/pkg/database"
"github.com/Luzifer/twitch-bot/v2/plugins"
)
const cleanupTimeout = 15 * time.Minute
type (
storedCustomEvent struct {
ID string `gorm:"primaryKey"`
Channel string
Fields string
ScheduledAt time.Time
}
)
func cleanupStoredEvents(db database.Connector) error {
return errors.Wrap(
db.DB().
Where("scheduled_at < ?", time.Now().Add(cleanupTimeout*-1).UTC()).
Delete(&storedCustomEvent{}).
Error,
"deleting past events",
)
}
func getFutureEvents(db database.Connector) (out []storedCustomEvent, err error) {
return out, errors.Wrap(
db.DB().
Where("scheduled_at >= ?", time.Now().UTC()).
Find(&out).
Error,
"getting events from database",
)
}
func storeEvent(db database.Connector, scheduleAt time.Time, channel string, fields *plugins.FieldCollection) error {
fieldBuf := new(bytes.Buffer)
if err := json.NewEncoder(fieldBuf).Encode(fields); err != nil {
return errors.Wrap(err, "marshalling fields")
}
return errors.Wrap(
db.DB().Create(storedCustomEvent{
ID: uuid.Must(uuid.NewV4()).String(),
Channel: channel,
Fields: fieldBuf.String(),
ScheduledAt: scheduleAt,
}).Error,
"storing event",
)
}

View file

@ -0,0 +1,68 @@
package customevent
import (
"sync"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/Luzifer/twitch-bot/v2/pkg/database"
)
const memoryCacheRefreshInterval = 5 * time.Minute
type (
memoryCache struct {
events []storedCustomEvent
validUntil time.Time
dbc database.Connector
lock sync.Mutex
}
)
func (m *memoryCache) PopEventsToExecute() ([]storedCustomEvent, error) {
m.lock.Lock()
defer m.lock.Unlock()
if m.validUntil.Before(time.Now()) {
if err := m.refresh(); err != nil {
return nil, errors.Wrap(err, "refreshing stale cache")
}
}
var (
execEvents, storeEvents []storedCustomEvent
now = time.Now()
)
for i := range m.events {
evt := m.events[i]
if evt.ScheduledAt.After(now) {
storeEvents = append(storeEvents, evt)
continue
}
execEvents = append(execEvents, evt)
}
m.events = storeEvents
return execEvents, nil
}
func (m *memoryCache) Refresh() (err error) {
m.lock.Lock()
defer m.lock.Unlock()
return m.refresh()
}
func (m *memoryCache) refresh() (err error) {
if m.events, err = getFutureEvents(m.dbc); err != nil {
return errors.Wrap(err, "fetching events from database")
}
m.validUntil = time.Now().Add(memoryCacheRefreshInterval)
logrus.WithField("event_count", len(m.events)).Trace("loaded stored events from database")
return nil
}

View file

@ -0,0 +1,36 @@
package customevent
import (
"strings"
"github.com/sirupsen/logrus"
)
func scheduleCleanup() {
if err := cleanupStoredEvents(db); err != nil {
logrus.WithError(err).Error("executing custom event database cleanup")
}
}
func scheduleSend() {
evts, err := mc.PopEventsToExecute()
if err != nil {
logrus.WithError(err).Error("collecting scheduled custom events for sending")
return
}
for i := range evts {
go func(evt storedCustomEvent) {
evtData, err := parseEvent(evt.Channel, strings.NewReader(evt.Fields))
if err != nil {
logrus.WithError(err).Error("parsing fields in stored event")
return
}
if err = eventCreatorFunc("custom", evtData); err != nil {
logrus.WithError(err).Error("triggering stored event")
return
}
}(evts[i])
}
}

View file

@ -220,7 +220,7 @@ func main() {
log.WithError(err).Fatal("Unable to apply timer migration") log.WithError(err).Fatal("Unable to apply timer migration")
} }
cronService = cron.New() cronService = cron.New(cron.WithSeconds())
if twitchClient, err = accessService.GetBotTwitchClient(access.ClientConfig{ if twitchClient, err = accessService.GetBotTwitchClient(access.ClientConfig{
TwitchClient: cfg.TwitchClient, TwitchClient: cfg.TwitchClient,
TwitchClientSecret: cfg.TwitchClientSecret, TwitchClientSecret: cfg.TwitchClientSecret,

View file

@ -25,6 +25,10 @@ Create a custom event
# Optional: false # Optional: false
# Type: string (Supports Templating) # Type: string (Supports Templating)
fields: "{}" fields: "{}"
# Time until the event is triggered (must be valid duration like 1h, 1h1m, 10s, ...)
# Optional: true
# Type: string (Supports Templating)
schedule_in: ""
``` ```
## Delay ## Delay