[eventsub] Replace keepalive timer

as `time.Timer` wasn't really suited for how it was used

Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
Knut Ahlers 2023-10-29 16:54:11 +01:00
parent 1a76b75968
commit 932e6907da
Signed by: luzifer
GPG key ID: D91C3E91E4CAD6F5
2 changed files with 49 additions and 4 deletions

View file

@ -166,7 +166,8 @@ func (e *EventSubSocketClient) Run() error {
errC = make(chan error, 1) errC = make(chan error, 1)
keepaliveTimeout = socketInitialTimeout keepaliveTimeout = socketInitialTimeout
msgC = make(chan eventSubSocketMessage, 1) msgC = make(chan eventSubSocketMessage, 1)
socketTimeout = time.NewTimer(keepaliveTimeout) timeoutC = make(chan struct{}, 1)
socketTimeout = newKeepaliveTracker(timeoutC, keepaliveTimeout)
) )
if err := e.connect(e.socketDest, msgC, errC, "client init"); err != nil { if err := e.connect(e.socketDest, msgC, errC, "client init"); err != nil {
@ -187,9 +188,14 @@ func (e *EventSubSocketClient) Run() error {
return err return err
} }
case <-socketTimeout.C: case <-timeoutC:
// No message received, deeming connection dead // No message received, deeming connection dead
socketTimeout.Reset(socketInitialTimeout) e.logger.WithFields(logrus.Fields{
"expired": socketTimeout.ExpiresAt(),
"last_event": socketTimeout.LastRenew(),
}).Warn("eventsub socket missed keepalive")
socketTimeout = newKeepaliveTracker(timeoutC, socketInitialTimeout)
if err := e.connect(e.socketDest, msgC, errC, "socket timeout"); err != nil { if err := e.connect(e.socketDest, msgC, errC, "socket timeout"); err != nil {
errC <- errors.Wrap(err, "re-connecting after timeout") errC <- errors.Wrap(err, "re-connecting after timeout")
continue continue
@ -198,7 +204,7 @@ func (e *EventSubSocketClient) Run() error {
case msg := <-msgC: case msg := <-msgC:
// The keepalive timer is reset with each notification or // The keepalive timer is reset with each notification or
// keepalive message. // keepalive message.
socketTimeout.Reset(keepaliveTimeout) socketTimeout.Renew(keepaliveTimeout)
switch msg.Metadata.MessageType { switch msg.Metadata.MessageType {
case eventsubSocketMessageTypeKeepalive: case eventsubSocketMessageTypeKeepalive:

39
pkg/twitch/keepalive.go Normal file
View file

@ -0,0 +1,39 @@
package twitch
import "time"
const keepaliveTrackerCheckInterval = 100 * time.Millisecond
type (
keepaliveTracker struct {
c chan<- struct{}
expires time.Time
renewed time.Time
}
)
func newKeepaliveTracker(timeout chan<- struct{}, d time.Duration) *keepaliveTracker {
t := &keepaliveTracker{
c: timeout,
expires: time.Now().Add(d),
}
go t.run()
return t
}
func (t keepaliveTracker) ExpiresAt() time.Time { return t.expires }
func (t keepaliveTracker) LastRenew() time.Time { return t.renewed }
func (t *keepaliveTracker) Renew(d time.Duration) {
t.expires = time.Now().Add(d)
t.renewed = time.Now()
}
func (t *keepaliveTracker) run() {
for t.expires.After(time.Now()) {
time.Sleep(keepaliveTrackerCheckInterval)
}
t.c <- struct{}{}
}