diff --git a/Makefile b/Makefile index 71d35b1..fbb905d 100644 --- a/Makefile +++ b/Makefile @@ -56,7 +56,7 @@ trivy: # -- Documentation Site -- -docs: actor_docs template_docs +docs: actor_docs eventclient_docs template_docs actor_docs: go run . --storage-conn-string $(shell mktemp).db actor-docs >docs/content/configuration/actors.md diff --git a/docs/content/configuration/actors.md b/docs/content/configuration/actors.md index db5e5f7..4b29946 100644 --- a/docs/content/configuration/actors.md +++ b/docs/content/configuration/actors.md @@ -502,7 +502,7 @@ Scans for links in the message and adds the "links" field to the event data ```yaml - type: linkdetector attributes: - # Enable heuristic scans to find links with spaces or other means of obfuscation in them + # Enable heuristic scans to find links with spaces or other means of obfuscation in them (quite slow and will detect MANY false-positive links, only use for blacklisting links!) # Optional: true # Type: bool heuristic: false diff --git a/docs/content/configuration/templating.md b/docs/content/configuration/templating.md index d0f8ad4..40205ca 100644 --- a/docs/content/configuration/templating.md +++ b/docs/content/configuration/templating.md @@ -441,7 +441,7 @@ Example: ``` # Your int this hour: {{ printf "%.0f" (mulf (seededRandom (list "int" .username (now | date "2006-01-02 15") | join ":")) 100) }}% -< Your int this hour: 73% +< Your int this hour: 66% ``` ### `streamUptime` diff --git a/docs/content/overlays/eventclient.md b/docs/content/overlays/eventclient.md index 49afa12..8e779b7 100644 --- a/docs/content/overlays/eventclient.md +++ b/docs/content/overlays/eventclient.md @@ -17,6 +17,9 @@ weight: 10000
Options : Object

Options to pass to the EventClient constructor

+
SocketMessage : Object
+

SocketMessage received for every event and passed to the new (eventObj) => { ... } handlers

+
@@ -31,6 +34,7 @@ EventClient abstracts the connection to the bot websocket for events * [.apiBase()](#EventClient+apiBase) ⇒ string * [.paramOptionFallback(key, [fallback])](#EventClient+paramOptionFallback) ⇒ \* * [.renderTemplate(template)](#EventClient+renderTemplate) ⇒ Promise + * [.replayEvent(eventId)](#EventClient+replayEvent) ⇒ Promise @@ -74,6 +78,18 @@ Renders a given template using the bots msgformat API (supports all templating y | --- | --- | --- | | template | string | The template to render | + + +### eventClient.replayEvent(eventId) ⇒ Promise +Triggers a replay of the given event to all overlays currently listening for events. This event will have the `is_live` flag set to `false`. + +**Kind**: instance method of [EventClient](#EventClient) +**Returns**: Promise - Promise of the fetch request + +| Param | Type | Description | +| --- | --- | --- | +| eventId | Number | The ID of the event received through the SocketMessage object | + ## Options : Object @@ -84,9 +100,26 @@ Options to pass to the EventClient constructor | Name | Type | Default | Description | | --- | --- | --- | --- | -| [channel] | string | | Filter for specific channel events (format: `#channel`) | -| [handlers] | Object | {} | Map event types to callback functions `(event, fields, time, live) => {...}` | -| [maxReplayAge] | number | -1 | Number of hours to replay the events for (-1 = infinite) | -| [replay] | boolean | false | Request a replay at connect (requires channel to be set to a channel name) | -| [token] | string | | API access token to use to connect to the WebSocket (if not set, must be provided through URL hash) | +| [channel] | String | | Filter for specific channel events (format: `#channel`) | +| [handlers] | Object | {} | Map event types to callback functions `(eventObj) => { ... }` (new) or `(event, fields, time, live) => {...}` (old) | +| [maxReplayAge] | Number | -1 | Number of hours to replay the events for (-1 = infinite) | +| [replay] | Boolean | false | Request a replay at connect (requires channel to be set to a channel name) | +| [token] | String | | API access token to use to connect to the WebSocket (if not set, must be provided through URL hash) | + + + +## SocketMessage : Object +SocketMessage received for every event and passed to the new `(eventObj) => { ... }` handlers + +**Kind**: global typedef +**Properties** + +| Name | Type | Description | +| --- | --- | --- | +| [event_id] | Number | UID of the event used to re-trigger an event | +| [is_live] | Boolean | Whether the event was sent through a replay (false) or occurred live (true) | +| [reason] | String | Reason of this message (one of `bulk-replay`, `live-event`, `single-replay`) | +| [time] | String | RFC3339 timestamp of the event | +| [type] | String | Event type (i.e. `raid`, `sub`, ...) | +| [fields] | Object | string->any mapping of fields available for the event | diff --git a/internal/apimodules/overlays/database.go b/internal/apimodules/overlays/database.go index fa49657..45925ff 100644 --- a/internal/apimodules/overlays/database.go +++ b/internal/apimodules/overlays/database.go @@ -9,6 +9,7 @@ import ( "github.com/pkg/errors" "gorm.io/gorm" + "github.com/Luzifer/go_helpers/v2/backoff" "github.com/Luzifer/twitch-bot/v3/internal/helpers" "github.com/Luzifer/twitch-bot/v3/pkg/database" "github.com/Luzifer/twitch-bot/v3/plugins" @@ -24,23 +25,26 @@ type ( } ) -func AddChannelEvent(db database.Connector, channel string, evt SocketMessage) error { +func AddChannelEvent(db database.Connector, channel string, evt SocketMessage) (evtID uint64, err error) { buf := new(bytes.Buffer) if err := json.NewEncoder(buf).Encode(evt.Fields); err != nil { - return errors.Wrap(err, "encoding fields") + return 0, errors.Wrap(err, "encoding fields") } - return errors.Wrap( - helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error { - return tx.Create(&overlaysEvent{ - Channel: channel, - CreatedAt: evt.Time.UTC(), - EventType: evt.Type, - Fields: strings.TrimSpace(buf.String()), - }).Error - }), - "storing event to database", - ) + storEvt := &overlaysEvent{ + Channel: channel, + CreatedAt: evt.Time.UTC(), + EventType: evt.Type, + Fields: strings.TrimSpace(buf.String()), + } + + if err = helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error { + return tx.Create(storEvt).Error + }); err != nil { + return 0, errors.Wrap(err, "storing event to database") + } + + return storEvt.ID, nil } func GetChannelEvents(db database.Connector, channel string) ([]SocketMessage, error) { @@ -54,18 +58,44 @@ func GetChannelEvents(db database.Connector, channel string) ([]SocketMessage, e var out []SocketMessage for _, e := range evts { - fields := new(plugins.FieldCollection) - if err := json.NewDecoder(strings.NewReader(e.Fields)).Decode(fields); err != nil { - return nil, errors.Wrap(err, "decoding fields") + sm, err := e.ToSocketMessage() + if err != nil { + return nil, errors.Wrap(err, "transforming event") } - out = append(out, SocketMessage{ - IsLive: false, - Time: e.CreatedAt, - Type: e.EventType, - Fields: fields, - }) + out = append(out, sm) } return out, nil } + +func GetEventByID(db database.Connector, eventID uint64) (SocketMessage, error) { + var evt overlaysEvent + + if err := helpers.Retry(func() (err error) { + err = db.DB().Where("id = ?", eventID).First(&evt).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + return backoff.NewErrCannotRetry(err) + } + return err + }); err != nil { + return SocketMessage{}, errors.Wrap(err, "fetching event") + } + + return evt.ToSocketMessage() +} + +func (o overlaysEvent) ToSocketMessage() (SocketMessage, error) { + fields := new(plugins.FieldCollection) + if err := json.NewDecoder(strings.NewReader(o.Fields)).Decode(fields); err != nil { + return SocketMessage{}, errors.Wrap(err, "decoding fields") + } + + return SocketMessage{ + EventID: o.ID, + IsLive: false, + Time: o.CreatedAt, + Type: o.EventType, + Fields: fields, + }, nil +} diff --git a/internal/apimodules/overlays/database_test.go b/internal/apimodules/overlays/database_test.go index 84521dd..0b2ec84 100644 --- a/internal/apimodules/overlays/database_test.go +++ b/internal/apimodules/overlays/database_test.go @@ -17,7 +17,8 @@ func TestEventDatabaseRoundtrip(t *testing.T) { var ( channel = "#test" - tEvent1 = time.Now() + evtID uint64 + tEvent1 = time.Now().UTC() tEvent2 = tEvent1.Add(time.Second) ) @@ -25,30 +26,46 @@ func TestEventDatabaseRoundtrip(t *testing.T) { assert.NoError(t, err, "getting events on empty db") assert.Zero(t, evts, "expect no events on empty db") - assert.NoError(t, AddChannelEvent(dbc, channel, SocketMessage{ + evtID, err = AddChannelEvent(dbc, channel, SocketMessage{ IsLive: true, Time: tEvent2, Type: "event 2", Fields: plugins.FieldCollectionFromData(map[string]any{"foo": "bar"}), - }), "adding second event") + }) + assert.Equal(t, uint64(1), evtID) + assert.NoError(t, err, "adding second event") - assert.NoError(t, AddChannelEvent(dbc, channel, SocketMessage{ + evtID, err = AddChannelEvent(dbc, channel, SocketMessage{ IsLive: true, Time: tEvent1, Type: "event 1", Fields: plugins.FieldCollectionFromData(map[string]any{"foo": "bar"}), - }), "adding first event") + }) + assert.Equal(t, uint64(2), evtID) + assert.NoError(t, err, "adding first event") - assert.NoError(t, AddChannelEvent(dbc, "#otherchannel", SocketMessage{ + evtID, err = AddChannelEvent(dbc, "#otherchannel", SocketMessage{ IsLive: true, Time: tEvent1, Type: "event", Fields: plugins.FieldCollectionFromData(map[string]any{"foo": "bar"}), - }), "adding other channel event") + }) + assert.Equal(t, uint64(3), evtID) + assert.NoError(t, err, "adding other channel event") evts, err = GetChannelEvents(dbc, channel) assert.NoError(t, err, "getting events") assert.Len(t, evts, 2, "expect 2 events") assert.Less(t, evts[0].Time, evts[1].Time, "expect sorting") + + evt, err := GetEventByID(dbc, 2) + assert.NoError(t, err) + assert.Equal(t, SocketMessage{ + EventID: 2, + IsLive: false, + Time: tEvent1, + Type: "event 1", + Fields: plugins.FieldCollectionFromData(map[string]any{"foo": "bar"}), + }, evt) } diff --git a/internal/apimodules/overlays/default/debug.html b/internal/apimodules/overlays/default/debug.html index 6263018..8fd39c2 100644 --- a/internal/apimodules/overlays/default/debug.html +++ b/internal/apimodules/overlays/default/debug.html @@ -30,9 +30,10 @@
- + +
TimeEventFields
TimeReasonEventFields
{{ moment(event.time).format('YYYY-MM-DD HH:mm:ss') }}{{ event.reason }} {{ event.event }} { - if (window.botClient.paramOptionFallback('hide', '').split(',').includes(evt)) { + _: ({ fields, reason, time, type }) => { + if (window.botClient.paramOptionFallback('hide', '').split(',').includes(type)) { return } this.events = [ - { event: evt, fields: data, time }, + { event: type, fields, reason, time }, ...this.events, ] }, diff --git a/internal/apimodules/overlays/default/eventclient.js b/internal/apimodules/overlays/default/eventclient.js index 709a25b..724658d 100644 --- a/internal/apimodules/overlays/default/eventclient.js +++ b/internal/apimodules/overlays/default/eventclient.js @@ -1,11 +1,22 @@ /** * Options to pass to the EventClient constructor * @typedef {Object} Options - * @prop {string} [channel] - Filter for specific channel events (format: `#channel`) - * @prop {Object} [handlers={}] - Map event types to callback functions `(event, fields, time, live) => {...}` - * @prop {number} [maxReplayAge=-1] - Number of hours to replay the events for (-1 = infinite) - * @prop {boolean} [replay=false] - Request a replay at connect (requires channel to be set to a channel name) - * @prop {string} [token] - API access token to use to connect to the WebSocket (if not set, must be provided through URL hash) + * @prop {String} [channel] - Filter for specific channel events (format: `#channel`) + * @prop {Object} [handlers={}] - Map event types to callback functions `(eventObj) => { ... }` (new) or `(event, fields, time, live) => {...}` (old) + * @prop {Number} [maxReplayAge=-1] - Number of hours to replay the events for (-1 = infinite) + * @prop {Boolean} [replay=false] - Request a replay at connect (requires channel to be set to a channel name) + * @prop {String} [token] - API access token to use to connect to the WebSocket (if not set, must be provided through URL hash) + */ + +/** + * SocketMessage received for every event and passed to the new `(eventObj) => { ... }` handlers + * @typedef {Object} SocketMessage + * @prop {Number} [event_id] - UID of the event used to re-trigger an event + * @prop {Boolean} [is_live] - Whether the event was sent through a replay (false) or occurred live (true) + * @prop {String} [reason] - Reason of this message (one of `bulk-replay`, `live-event`, `single-replay`) + * @prop {String} [time] - RFC3339 timestamp of the event + * @prop {String} [type] - Event type (i.e. `raid`, `sub`, ...) + * @prop {Object} [fields] - string->any mapping of fields available for the event */ const HOUR = 3600 * 1000 @@ -24,7 +35,7 @@ class EventClient { * @param {Options} opts Options for the EventClient */ constructor(opts) { - this.params = new URLSearchParams(window.location.hash.substr(1)) + this.params = new URLSearchParams(window.location.hash.substring(1)) this.handlers = { ...opts.handlers || {} } this.options = { ...opts } @@ -52,7 +63,7 @@ class EventClient { * @returns {string} API base URL */ apiBase() { - return window.location.href.substr(0, window.location.href.indexOf('/overlays/')) + return window.location.href.substring(0, window.location.href.indexOf('/overlays/')) } /** @@ -88,7 +99,7 @@ class EventClient { } for (const fn of [this.handlers[data.type], this.handlers._].filter(fn => fn)) { - fn(data.type, data.fields, new Date(data.time), data.is_live) + fn.length === 1 ? fn({ ...data, time: new Date(data.time) }) : fn(data.type, data.fields, new Date(data.time), data.is_live) } } @@ -125,7 +136,7 @@ class EventClient { for (const msg of data) { for (const fn of [this.handlers[msg.type], this.handlers._].filter(fn => fn)) { - handlers.push(fn(msg.type, msg.fields, new Date(msg.time), msg.is_live)) + handlers.push(fn.length === 1 ? fn({ ...msg, time: new Date(msg.time) }) : fn(msg.type, msg.fields, new Date(msg.time), msg.is_live)) } } @@ -159,6 +170,21 @@ class EventClient { .then(resp => resp.text()) } + /** + * Triggers a replay of the given event to all overlays currently listening for events. This event will have the `is_live` flag set to `false`. + * + * @param {Number} eventId The ID of the event received through the SocketMessage object + * @returns {Promise} Promise of the fetch request + */ + replayEvent(eventId) { + return fetch(`${this.apiBase()}/overlays/event/${eventId}/replay`, { + headers: { + authorization: this.paramOptionFallback('token'), + }, + method: 'PUT', + }) + } + /** * Modifies the overlay address to the websocket address the bot listens to * diff --git a/internal/apimodules/overlays/default/sounds.js b/internal/apimodules/overlays/default/sounds.js index 9020dc9..03ea0ac 100644 --- a/internal/apimodules/overlays/default/sounds.js +++ b/internal/apimodules/overlays/default/sounds.js @@ -8,11 +8,8 @@ new Vue({ new EventClient({ handlers: { - custom: (evt, data, time, live) => this.handleCustom(evt, data, time, live), + custom: ({ fields }) => this.handleCustom(fields), }, - - maxReplayAge: 720, - replay: true, }) }, @@ -64,14 +61,9 @@ new Vue({ source.connect(preGainNode) }, - handleCustom(evt, data, time, live) { + handleCustom(data) { switch (data.type) { case 'soundalert': - if (!live) { - // Not a live event, do not issue alerts - return - } - this.queueAlert({ soundUrl: data.soundUrl, }) diff --git a/internal/apimodules/overlays/overlays.go b/internal/apimodules/overlays/overlays.go index e36fc8e..2559a02 100644 --- a/internal/apimodules/overlays/overlays.go +++ b/internal/apimodules/overlays/overlays.go @@ -6,6 +6,7 @@ import ( "net/http" "os" "sort" + "strconv" "strings" "sync" "time" @@ -31,14 +32,24 @@ const ( ) type ( + SendReason string + SocketMessage struct { - IsLive bool `json:"is_live"` - Time time.Time `json:"time"` - Type string `json:"type"` - Fields *plugins.FieldCollection `json:"fields"` + EventID uint64 `json:"event_id"` + IsLive bool `json:"is_live"` + Reason SendReason `json:"reason"` + Time time.Time `json:"time"` + Type string `json:"type"` + Fields *plugins.FieldCollection `json:"fields"` } ) +const ( + SendReasonLive SendReason = "live-event" + SendReasonBulkReplay SendReason = "bulk-replay" + SendReasonSingleReplay SendReason = "single-replay" +) + var ( //go:embed default/** embeddedOverlays embed.FS @@ -53,7 +64,7 @@ var ( "join", "part", // Those make no sense for replay } - subscribers = map[string]func(event string, eventData *plugins.FieldCollection){} + subscribers = map[string]func(SocketMessage){} subscribersLock sync.RWMutex upgrader = websocket.Upgrader{ @@ -64,6 +75,7 @@ var ( validateToken plugins.ValidateTokenFunc ) +//nolint:funlen func Register(args plugins.RegistrationArguments) error { db = args.GetDatabaseConnector() if err := db.DB().AutoMigrate(&overlaysEvent{}); err != nil { @@ -76,6 +88,22 @@ func Register(args plugins.RegistrationArguments) error { validateToken = args.ValidateToken + args.RegisterAPIRoute(plugins.HTTPRouteRegistrationArgs{ + Description: "Trigger a re-distribution of an event to all subscribed overlays", + HandlerFunc: handleSingleEventReplay, + Method: http.MethodPut, + Module: "overlays", + Name: "Replay Single Event", + Path: "/event/{event_id}/replay", + ResponseType: plugins.HTTPRouteResponseTypeNo200, + RouteParams: []plugins.HTTPRouteParamDocumentation{ + { + Description: "Event ID to replay (unique ID in database)", + Name: "event_id", + }, + }, + }) + args.RegisterAPIRoute(plugins.HTTPRouteRegistrationArgs{ Description: "Websocket subscriber for bot events", HandlerFunc: handleSocketSubscription, @@ -121,27 +149,36 @@ func Register(args plugins.RegistrationArguments) error { SkipDocumentation: true, }) - args.RegisterEventHandler(func(event string, eventData *plugins.FieldCollection) error { + args.RegisterEventHandler(func(event string, eventData *plugins.FieldCollection) (err error) { subscribersLock.RLock() defer subscribersLock.RUnlock() + msg := SocketMessage{ + IsLive: true, + Reason: SendReasonLive, + Time: time.Now(), + Type: event, + Fields: eventData, + } + + if msg.EventID, err = AddChannelEvent(db, plugins.DeriveChannel(nil, eventData), SocketMessage{ + IsLive: false, + Time: time.Now(), + Type: event, + Fields: eventData, + }); err != nil { + return errors.Wrap(err, "storing event") + } + for _, fn := range subscribers { - fn(event, eventData) + fn(msg) } if str.StringInSlice(event, storeExemption) { return nil } - return errors.Wrap( - AddChannelEvent(db, plugins.DeriveChannel(nil, eventData), SocketMessage{ - IsLive: false, - Time: time.Now(), - Type: event, - Fields: eventData, - }), - "storing event", - ) + return nil }) fsStack = httpFSStack{ @@ -180,6 +217,7 @@ func handleEventsReplay(w http.ResponseWriter, r *http.Request) { continue } + msg.Reason = SendReasonBulkReplay msgs = append(msgs, msg) } @@ -195,6 +233,29 @@ func handleServeOverlayAsset(w http.ResponseWriter, r *http.Request) { http.StripPrefix("/overlays", http.FileServer(fsStack)).ServeHTTP(w, r) } +func handleSingleEventReplay(w http.ResponseWriter, r *http.Request) { + eventID, err := strconv.ParseUint(mux.Vars(r)["event_id"], 10, 64) + if err != nil { + http.Error(w, errors.Wrap(err, "parsing event_id").Error(), http.StatusBadRequest) + return + } + + evt, err := GetEventByID(db, eventID) + if err != nil { + http.Error(w, errors.Wrap(err, "fetching event").Error(), http.StatusInternalServerError) + return + } + + evt.Reason = SendReasonSingleReplay + + subscribersLock.RLock() + defer subscribersLock.RUnlock() + + for _, fn := range subscribers { + fn(evt) + } +} + //nolint:funlen,gocognit,gocyclo // Not split in order to keep the socket logic in one place func handleSocketSubscription(w http.ResponseWriter, r *http.Request) { var ( @@ -219,14 +280,7 @@ func handleSocketSubscription(w http.ResponseWriter, r *http.Request) { ) // Register listener - unsub := subscribeSocket(func(event string, eventData *plugins.FieldCollection) { - sendMsgC <- SocketMessage{ - IsLive: true, - Time: time.Now(), - Type: event, - Fields: eventData, - } - }) + unsub := subscribeSocket(func(msg SocketMessage) { sendMsgC <- msg }) defer unsub() keepAlive := time.NewTicker(socketKeepAlive) @@ -345,7 +399,7 @@ func handleSocketSubscription(w http.ResponseWriter, r *http.Request) { } } -func subscribeSocket(fn func(event string, eventData *plugins.FieldCollection)) func() { +func subscribeSocket(fn func(SocketMessage)) func() { id := uuid.Must(uuid.NewV4()).String() subscribersLock.Lock()