[overlays] Move replay into own HTTP request

as replaying through the socket takes ages for longer time ranges

Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
Knut Ahlers 2022-04-03 15:53:18 +02:00
parent 91846e300f
commit c51a0751f0
Signed by: luzifer
GPG key ID: 0066F03ED215AD7D
2 changed files with 98 additions and 28 deletions

View file

@ -8,6 +8,8 @@
* @prop {string} [token] - API access token to use to connect to the WebSocket * @prop {string} [token] - API access token to use to connect to the WebSocket
*/ */
const HOUR = 3600 * 1000
const initialSocketBackoff = 500 const initialSocketBackoff = 500
const maxSocketBackoff = 10000 const maxSocketBackoff = 10000
const socketBackoffMultiplier = 1.25 const socketBackoffMultiplier = 1.25
@ -34,6 +36,14 @@ export default class EventClient {
this.socketBackoff = initialSocketBackoff this.socketBackoff = initialSocketBackoff
this.connect() this.connect()
// If reply is enabled and channel is provided, fetch the replay
if (this.paramOptionFallback('replay', false) && this.paramOptionFallback('channel')) {
this.fetchReplayForChannel(
this.paramOptionFallback('channel'),
Number(this.paramOptionFallback('maxReplayAge', -1)),
)
}
} }
/** /**
@ -69,18 +79,6 @@ export default class EventClient {
if (data.type === '_auth') { if (data.type === '_auth') {
// Special handling for auth confirmation // Special handling for auth confirmation
this.socketBackoff = initialSocketBackoff this.socketBackoff = initialSocketBackoff
// Auth was confirmed, request replay if wanted by client
if (this.paramOptionFallback('replay', false) && this.paramOptionFallback('channel')) {
this.socket.send(JSON.stringify({
fields: {
channel: this.paramOptionFallback('channel'),
maxage: Number(this.paramOptionFallback('maxReplayAge', -1)),
},
type: '_replay',
}))
}
return return
} }
@ -102,6 +100,38 @@ export default class EventClient {
} }
} }
/*
* Requests past events from the API and feed them through the registered handlers
*
* @params {string} channel The channel to fetch the events for
* @params {number} hours The amount of hours to fetch into the past (-1 = infinite)
* @returns {Promise} Can be listened for failures using `.catch`
*/
fetchReplayForChannel(channel, hours = -1) {
const params = new URLSearchParams()
if (hours > -1) {
params.set('since', new Date(new Date().getTime() - hours * HOUR).toISOString())
}
return fetch(`${this.apiBase()}/overlays/events/${encodeURIComponent(channel)}?${params.toString()}`, {
headers: {
authorization: this.paramOptionFallback('token'),
},
})
.then(resp => resp.json())
.then(data => {
const handlers = []
for (const msg of data) {
for (const fn of [this.handlers[msg.type], this.handlers._].filter(fn => fn)) {
handlers.push(fn(msg.type, msg.fields, new Date(msg.time), msg.is_live))
}
}
return Promise.all(handlers)
})
}
/** /**
* Resolves the given key through url hash parameters with fallback to constructor options * Resolves the given key through url hash parameters with fallback to constructor options
* *

View file

@ -5,10 +5,13 @@ import (
"encoding/json" "encoding/json"
"net/http" "net/http"
"os" "os"
"sort"
"strings"
"sync" "sync"
"time" "time"
"github.com/gofrs/uuid" "github.com/gofrs/uuid"
"github.com/gorilla/mux"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -24,8 +27,7 @@ const (
moduleUUID = "f9ca2b3a-baf6-45ea-a347-c626168665e8" moduleUUID = "f9ca2b3a-baf6-45ea-a347-c626168665e8"
msgTypeRequestAuth = "_auth" msgTypeRequestAuth = "_auth"
msgTypeRequestReplay = "_replay"
) )
type ( type (
@ -49,7 +51,6 @@ var (
fsStack httpFSStack fsStack httpFSStack
ptrIntMinusOne = func(v int64) *int64 { return &v }(-1)
ptrStringEmpty = func(v string) *string { return &v }("") ptrStringEmpty = func(v string) *string { return &v }("")
store plugins.StorageManager store plugins.StorageManager
@ -83,6 +84,31 @@ func Register(args plugins.RegistrationArguments) error {
ResponseType: plugins.HTTPRouteResponseTypeMultiple, ResponseType: plugins.HTTPRouteResponseTypeMultiple,
}) })
args.RegisterAPIRoute(plugins.HTTPRouteRegistrationArgs{
Description: "Fetch past events for the given channel",
HandlerFunc: handleEventsReplay,
Method: http.MethodGet,
Module: "overlays",
Name: "Replay",
Path: "/events/{channel}",
QueryParams: []plugins.HTTPRouteParamDocumentation{
{
Description: "ISO / RFC3339 timestamp to fetch the events after",
Name: "since",
Required: false,
Type: "string",
},
},
RequiresWriteAuth: true,
ResponseType: plugins.HTTPRouteResponseTypeJSON,
RouteParams: []plugins.HTTPRouteParamDocumentation{
{
Description: "Channel to fetch the events from",
Name: "channel",
},
},
})
args.RegisterAPIRoute(plugins.HTTPRouteRegistrationArgs{ args.RegisterAPIRoute(plugins.HTTPRouteRegistrationArgs{
HandlerFunc: handleServeOverlayAsset, HandlerFunc: handleServeOverlayAsset,
IsPrefix: true, IsPrefix: true,
@ -135,6 +161,33 @@ func Register(args plugins.RegistrationArguments) error {
) )
} }
func handleEventsReplay(w http.ResponseWriter, r *http.Request) {
var (
channel = mux.Vars(r)["channel"]
msgs []socketMessage
since = time.Time{}
)
if s, err := time.Parse(time.RFC3339, r.URL.Query().Get("since")); err == nil {
since = s
}
for _, msg := range storedObject.GetChannelEvents("#" + strings.TrimLeft(channel, "#")) {
if msg.Time.Before(since) {
continue
}
msgs = append(msgs, msg)
}
sort.Slice(msgs, func(i, j int) bool { return msgs[i].Time.Before(msgs[j].Time) })
if err := json.NewEncoder(w).Encode(msgs); err != nil {
http.Error(w, errors.Wrap(err, "encoding response").Error(), http.StatusInternalServerError)
return
}
}
func handleServeOverlayAsset(w http.ResponseWriter, r *http.Request) { func handleServeOverlayAsset(w http.ResponseWriter, r *http.Request) {
http.StripPrefix("/overlays", http.FileServer(fsStack)).ServeHTTP(w, r) http.StripPrefix("/overlays", http.FileServer(fsStack)).ServeHTTP(w, r)
} }
@ -245,19 +298,6 @@ func handleSocketSubscription(w http.ResponseWriter, r *http.Request) {
Type: msgTypeRequestAuth, Type: msgTypeRequestAuth,
} }
case msgTypeRequestReplay:
go func() {
maxAge := time.Duration(recvMsg.Fields.MustInt64("maxage", ptrIntMinusOne)) * time.Hour
log.Errorf("DEBUG %s %T", maxAge, recvMsg.Fields.Data()["maxage"])
for _, msg := range storedObject.GetChannelEvents(recvMsg.Fields.MustString("channel", ptrStringEmpty)) {
if maxAge > 0 && time.Since(msg.Time) > maxAge {
continue
}
sendMsgC <- msg
}
}()
default: default:
logger.WithField("type", recvMsg.Type).Warn("Got unexpected message type from frontend") logger.WithField("type", recvMsg.Type).Warn("Got unexpected message type from frontend")
} }