[overlays] Add parameter to limit replay message age

Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
Knut Ahlers 2022-02-20 13:36:36 +01:00
parent 135404862a
commit ddc52d8920
Signed by: luzifer
GPG key ID: 0066F03ED215AD7D
2 changed files with 12 additions and 1 deletions

View file

@ -3,6 +3,7 @@
* @typedef {Object} EventClient~Options * @typedef {Object} EventClient~Options
* @prop {string} [channel] - Filter for specific channel events (format: `#channel`) * @prop {string} [channel] - Filter for specific channel events (format: `#channel`)
* @prop {Object} handlers - Map event types to callback functions `(event, fields, time, live) => {...}` * @prop {Object} handlers - Map event types to callback functions `(event, fields, time, live) => {...}`
* @prop {number} [maxReplayAge=-1] - Number of hours to replay the events for (-1 = infinite)
* @prop {boolean} replay - Request a replay at connect (requires channel to be set to a channel name) * @prop {boolean} replay - Request a replay at connect (requires channel to be set to a channel name)
* @prop {string} [token] - API access token to use to connect to the WebSocket * @prop {string} [token] - API access token to use to connect to the WebSocket
*/ */
@ -63,7 +64,10 @@ export default class EventClient {
// Auth was confirmed, request replay if wanted by client // Auth was confirmed, request replay if wanted by client
if (this.paramOptionFallback('replay', false) && this.paramOptionFallback('channel')) { if (this.paramOptionFallback('replay', false) && this.paramOptionFallback('channel')) {
this.socket.send(JSON.stringify({ this.socket.send(JSON.stringify({
fields: { channel: this.paramOptionFallback('channel') }, fields: {
channel: this.paramOptionFallback('channel'),
maxage: Number(this.paramOptionFallback('maxReplayAge', -1)),
},
type: '_replay', type: '_replay',
})) }))
} }

View file

@ -49,6 +49,7 @@ var (
fsStack httpFSStack fsStack httpFSStack
ptrIntMinusOne = func(v int64) *int64 { return &v }(-1)
ptrStringEmpty = func(v string) *string { return &v }("") ptrStringEmpty = func(v string) *string { return &v }("")
store plugins.StorageManager store plugins.StorageManager
@ -246,7 +247,13 @@ func handleSocketSubscription(w http.ResponseWriter, r *http.Request) {
case msgTypeRequestReplay: case msgTypeRequestReplay:
go func() { go func() {
maxAge := time.Duration(recvMsg.Fields.MustInt64("maxage", ptrIntMinusOne)) * time.Hour
log.Errorf("DEBUG %s %T", maxAge, recvMsg.Fields.Data()["maxage"])
for _, msg := range storedObject.GetChannelEvents(recvMsg.Fields.MustString("channel", ptrStringEmpty)) { for _, msg := range storedObject.GetChannelEvents(recvMsg.Fields.MustString("channel", ptrStringEmpty)) {
if maxAge > 0 && time.Since(msg.Time) > maxAge {
continue
}
sendMsgC <- msg sendMsgC <- msg
} }
}() }()