diff --git a/internal/apimodules/overlays/default/eventclient.mjs b/internal/apimodules/overlays/default/eventclient.mjs index f73ed23..f3bb7bf 100644 --- a/internal/apimodules/overlays/default/eventclient.mjs +++ b/internal/apimodules/overlays/default/eventclient.mjs @@ -8,6 +8,8 @@ * @prop {string} [token] - API access token to use to connect to the WebSocket */ +const HOUR = 3600 * 1000 + const initialSocketBackoff = 500 const maxSocketBackoff = 10000 const socketBackoffMultiplier = 1.25 @@ -34,6 +36,14 @@ export default class EventClient { this.socketBackoff = initialSocketBackoff this.connect() + + // If reply is enabled and channel is provided, fetch the replay + if (this.paramOptionFallback('replay', false) && this.paramOptionFallback('channel')) { + this.fetchReplayForChannel( + this.paramOptionFallback('channel'), + Number(this.paramOptionFallback('maxReplayAge', -1)), + ) + } } /** @@ -69,18 +79,6 @@ export default class EventClient { if (data.type === '_auth') { // Special handling for auth confirmation this.socketBackoff = initialSocketBackoff - - // Auth was confirmed, request replay if wanted by client - if (this.paramOptionFallback('replay', false) && this.paramOptionFallback('channel')) { - this.socket.send(JSON.stringify({ - fields: { - channel: this.paramOptionFallback('channel'), - maxage: Number(this.paramOptionFallback('maxReplayAge', -1)), - }, - type: '_replay', - })) - } - return } @@ -102,6 +100,38 @@ export default class EventClient { } } + /* + * Requests past events from the API and feed them through the registered handlers + * + * @params {string} channel The channel to fetch the events for + * @params {number} hours The amount of hours to fetch into the past (-1 = infinite) + * @returns {Promise} Can be listened for failures using `.catch` + */ + fetchReplayForChannel(channel, hours = -1) { + const params = new URLSearchParams() + if (hours > -1) { + params.set('since', new Date(new Date().getTime() - hours * HOUR).toISOString()) + } + + return fetch(`${this.apiBase()}/overlays/events/${encodeURIComponent(channel)}?${params.toString()}`, { + headers: { + authorization: this.paramOptionFallback('token'), + }, + }) + .then(resp => resp.json()) + .then(data => { + const handlers = [] + + for (const msg of data) { + for (const fn of [this.handlers[msg.type], this.handlers._].filter(fn => fn)) { + handlers.push(fn(msg.type, msg.fields, new Date(msg.time), msg.is_live)) + } + } + + return Promise.all(handlers) + }) + } + /** * Resolves the given key through url hash parameters with fallback to constructor options * diff --git a/internal/apimodules/overlays/overlays.go b/internal/apimodules/overlays/overlays.go index bbd8b1c..fb47965 100644 --- a/internal/apimodules/overlays/overlays.go +++ b/internal/apimodules/overlays/overlays.go @@ -5,10 +5,13 @@ import ( "encoding/json" "net/http" "os" + "sort" + "strings" "sync" "time" "github.com/gofrs/uuid" + "github.com/gorilla/mux" "github.com/gorilla/websocket" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -24,8 +27,7 @@ const ( moduleUUID = "f9ca2b3a-baf6-45ea-a347-c626168665e8" - msgTypeRequestAuth = "_auth" - msgTypeRequestReplay = "_replay" + msgTypeRequestAuth = "_auth" ) type ( @@ -49,7 +51,6 @@ var ( fsStack httpFSStack - ptrIntMinusOne = func(v int64) *int64 { return &v }(-1) ptrStringEmpty = func(v string) *string { return &v }("") store plugins.StorageManager @@ -83,6 +84,31 @@ func Register(args plugins.RegistrationArguments) error { ResponseType: plugins.HTTPRouteResponseTypeMultiple, }) + args.RegisterAPIRoute(plugins.HTTPRouteRegistrationArgs{ + Description: "Fetch past events for the given channel", + HandlerFunc: handleEventsReplay, + Method: http.MethodGet, + Module: "overlays", + Name: "Replay", + Path: "/events/{channel}", + QueryParams: []plugins.HTTPRouteParamDocumentation{ + { + Description: "ISO / RFC3339 timestamp to fetch the events after", + Name: "since", + Required: false, + Type: "string", + }, + }, + RequiresWriteAuth: true, + ResponseType: plugins.HTTPRouteResponseTypeJSON, + RouteParams: []plugins.HTTPRouteParamDocumentation{ + { + Description: "Channel to fetch the events from", + Name: "channel", + }, + }, + }) + args.RegisterAPIRoute(plugins.HTTPRouteRegistrationArgs{ HandlerFunc: handleServeOverlayAsset, IsPrefix: true, @@ -135,6 +161,33 @@ func Register(args plugins.RegistrationArguments) error { ) } +func handleEventsReplay(w http.ResponseWriter, r *http.Request) { + var ( + channel = mux.Vars(r)["channel"] + msgs []socketMessage + since = time.Time{} + ) + + if s, err := time.Parse(time.RFC3339, r.URL.Query().Get("since")); err == nil { + since = s + } + + for _, msg := range storedObject.GetChannelEvents("#" + strings.TrimLeft(channel, "#")) { + if msg.Time.Before(since) { + continue + } + + msgs = append(msgs, msg) + } + + sort.Slice(msgs, func(i, j int) bool { return msgs[i].Time.Before(msgs[j].Time) }) + + if err := json.NewEncoder(w).Encode(msgs); err != nil { + http.Error(w, errors.Wrap(err, "encoding response").Error(), http.StatusInternalServerError) + return + } +} + func handleServeOverlayAsset(w http.ResponseWriter, r *http.Request) { http.StripPrefix("/overlays", http.FileServer(fsStack)).ServeHTTP(w, r) } @@ -245,19 +298,6 @@ func handleSocketSubscription(w http.ResponseWriter, r *http.Request) { Type: msgTypeRequestAuth, } - case msgTypeRequestReplay: - go func() { - maxAge := time.Duration(recvMsg.Fields.MustInt64("maxage", ptrIntMinusOne)) * time.Hour - log.Errorf("DEBUG %s %T", maxAge, recvMsg.Fields.Data()["maxage"]) - for _, msg := range storedObject.GetChannelEvents(recvMsg.Fields.MustString("channel", ptrStringEmpty)) { - if maxAge > 0 && time.Since(msg.Time) > maxAge { - continue - } - - sendMsgC <- msg - } - }() - default: logger.WithField("type", recvMsg.Type).Warn("Got unexpected message type from frontend") }