From 932e6907dac75d1866b6243fafa370a8b2f4921e Mon Sep 17 00:00:00 2001 From: Knut Ahlers Date: Sun, 29 Oct 2023 16:54:11 +0100 Subject: [PATCH] [eventsub] Replace keepalive timer as `time.Timer` wasn't really suited for how it was used Signed-off-by: Knut Ahlers --- pkg/twitch/eventsubWebsocketClient.go | 14 +++++++--- pkg/twitch/keepalive.go | 39 +++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 4 deletions(-) create mode 100644 pkg/twitch/keepalive.go diff --git a/pkg/twitch/eventsubWebsocketClient.go b/pkg/twitch/eventsubWebsocketClient.go index 54989b2..2347154 100644 --- a/pkg/twitch/eventsubWebsocketClient.go +++ b/pkg/twitch/eventsubWebsocketClient.go @@ -166,7 +166,8 @@ func (e *EventSubSocketClient) Run() error { errC = make(chan error, 1) keepaliveTimeout = socketInitialTimeout msgC = make(chan eventSubSocketMessage, 1) - socketTimeout = time.NewTimer(keepaliveTimeout) + timeoutC = make(chan struct{}, 1) + socketTimeout = newKeepaliveTracker(timeoutC, keepaliveTimeout) ) if err := e.connect(e.socketDest, msgC, errC, "client init"); err != nil { @@ -187,9 +188,14 @@ func (e *EventSubSocketClient) Run() error { return err } - case <-socketTimeout.C: + case <-timeoutC: // No message received, deeming connection dead - socketTimeout.Reset(socketInitialTimeout) + e.logger.WithFields(logrus.Fields{ + "expired": socketTimeout.ExpiresAt(), + "last_event": socketTimeout.LastRenew(), + }).Warn("eventsub socket missed keepalive") + + socketTimeout = newKeepaliveTracker(timeoutC, socketInitialTimeout) if err := e.connect(e.socketDest, msgC, errC, "socket timeout"); err != nil { errC <- errors.Wrap(err, "re-connecting after timeout") continue @@ -198,7 +204,7 @@ func (e *EventSubSocketClient) Run() error { case msg := <-msgC: // The keepalive timer is reset with each notification or // keepalive message. - socketTimeout.Reset(keepaliveTimeout) + socketTimeout.Renew(keepaliveTimeout) switch msg.Metadata.MessageType { case eventsubSocketMessageTypeKeepalive: diff --git a/pkg/twitch/keepalive.go b/pkg/twitch/keepalive.go new file mode 100644 index 0000000..dc8dc02 --- /dev/null +++ b/pkg/twitch/keepalive.go @@ -0,0 +1,39 @@ +package twitch + +import "time" + +const keepaliveTrackerCheckInterval = 100 * time.Millisecond + +type ( + keepaliveTracker struct { + c chan<- struct{} + expires time.Time + renewed time.Time + } +) + +func newKeepaliveTracker(timeout chan<- struct{}, d time.Duration) *keepaliveTracker { + t := &keepaliveTracker{ + c: timeout, + expires: time.Now().Add(d), + } + + go t.run() + + return t +} + +func (t keepaliveTracker) ExpiresAt() time.Time { return t.expires } +func (t keepaliveTracker) LastRenew() time.Time { return t.renewed } + +func (t *keepaliveTracker) Renew(d time.Duration) { + t.expires = time.Now().Add(d) + t.renewed = time.Now() +} + +func (t *keepaliveTracker) run() { + for t.expires.After(time.Now()) { + time.Sleep(keepaliveTrackerCheckInterval) + } + t.c <- struct{}{} +}