From e1f11a6c98d9dad0bc73a9f26ca989de2355d954 Mon Sep 17 00:00:00 2001 From: Knut Ahlers Date: Wed, 13 Dec 2023 23:14:23 +0100 Subject: [PATCH] [eventsub] Make topic subscriptions more dynamic Signed-off-by: Knut Ahlers --- pkg/twitch/eventsubWebsocketClient.go | 101 ++++++++++++++++++++------ pkg/twitch/twitch.go | 19 +++-- twitchWatcher.go | 22 +++++- 3 files changed, 112 insertions(+), 30 deletions(-) diff --git a/pkg/twitch/eventsubWebsocketClient.go b/pkg/twitch/eventsubWebsocketClient.go index 838b70c..0424740 100644 --- a/pkg/twitch/eventsubWebsocketClient.go +++ b/pkg/twitch/eventsubWebsocketClient.go @@ -13,6 +13,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/Luzifer/go_helpers/v2/backoff" "github.com/Luzifer/twitch-bot/v3/internal/helpers" ) @@ -21,6 +22,10 @@ const ( socketConnectTimeout = 15 * time.Second socketInitialTimeout = 30 * time.Second socketTimeoutGraceMultiplier = 1.5 + + retrySubscribeMaxTotal = 30 * time.Minute + retrySubscribeMaxWait = 5 * time.Minute + retrySubscribeMinWait = 30 * time.Second ) const ( @@ -70,9 +75,10 @@ type ( } eventSubSocketSubscriptionType struct { - Event, Version string - Condition EventSubCondition - Callback func(json.RawMessage) error + Event, Version string + Condition EventSubCondition + Callback func(json.RawMessage) error + BackgroundRetry bool } eventSubSocketPayloadNotification struct { @@ -136,11 +142,7 @@ func WithLogger(logger *logrus.Entry) EventSubSocketClientOpt { return func(e *EventSubSocketClient) { e.logger = logger } } -func WithSocketURL(url string) EventSubSocketClientOpt { - return func(e *EventSubSocketClient) { e.socketDest = url } -} - -func WithSubscription(event, version string, condition EventSubCondition, callback func(json.RawMessage) error) EventSubSocketClientOpt { +func WithMustSubscribe(event, version string, condition EventSubCondition, callback func(json.RawMessage) error) EventSubSocketClientOpt { if version == "" { version = EventSubTopicVersion1 } @@ -155,6 +157,26 @@ func WithSubscription(event, version string, condition EventSubCondition, callba } } +func WithRetryBackgroundSubscribe(event, version string, condition EventSubCondition, callback func(json.RawMessage) error) EventSubSocketClientOpt { + if version == "" { + version = EventSubTopicVersion1 + } + + return func(e *EventSubSocketClient) { + e.subscriptionTypes = append(e.subscriptionTypes, eventSubSocketSubscriptionType{ + Event: event, + Version: version, + Condition: condition, + Callback: callback, + BackgroundRetry: true, + }) + } +} + +func WithSocketURL(url string) EventSubSocketClientOpt { + return func(e *EventSubSocketClient) { e.socketDest = url } +} + func WithTwitchClient(c *Client) EventSubSocketClientOpt { return func(e *EventSubSocketClient) { e.twitch = c } } @@ -382,7 +404,7 @@ func (e *EventSubSocketClient) handleWelcomeMessage(msg eventSubSocketMessage) ( // connection after something broke) if e.socketID != payload.Session.ID { e.socketID = payload.Session.ID - if err := e.subscribe(); err != nil { + if err := e.subscribeAll(); err != nil { return socketInitialTimeout, errors.Wrap(err, "subscribing to topics") } } @@ -393,21 +415,56 @@ func (e *EventSubSocketClient) handleWelcomeMessage(msg eventSubSocketMessage) ( return time.Duration(float64(payload.Session.KeepaliveTimeoutSeconds)*socketTimeoutGraceMultiplier) * time.Second, nil } -func (e *EventSubSocketClient) subscribe() error { - for _, st := range e.subscriptionTypes { - if _, err := e.twitch.createEventSubSubscriptionWebsocket(context.Background(), eventSubSubscription{ - Type: st.Event, - Version: st.Version, - Condition: st.Condition, - Transport: eventSubTransport{ - Method: "websocket", - SessionID: e.socketID, - }, - }); err != nil { - return errors.Wrapf(err, "subscribing to %s/%s", st.Event, st.Version) +func (e *EventSubSocketClient) retryBackgroundSubscribe(st eventSubSocketSubscriptionType) { + err := backoff.NewBackoff(). + WithMaxIterationTime(retrySubscribeMaxWait). + WithMaxTotalTime(retrySubscribeMaxTotal). + WithMinIterationTime(retrySubscribeMinWait). + Retry(func() error { + return e.subscribe(st) + }) + if err != nil { + e.logger. + WithField("topic", strings.Join([]string{st.Event, st.Version}, "/")). + Error("gave up retrying to subscribe") + } +} + +func (e *EventSubSocketClient) subscribe(st eventSubSocketSubscriptionType) error { + logger := e.logger. + WithField("topic", strings.Join([]string{st.Event, st.Version}, "/")) + + if _, err := e.twitch.createEventSubSubscriptionWebsocket(context.Background(), eventSubSubscription{ + Type: st.Event, + Version: st.Version, + Condition: st.Condition, + Transport: eventSubTransport{ + Method: "websocket", + SessionID: e.socketID, + }, + }); err != nil { + logger.WithError(err).Debug("subscribing to topic") + return errors.Wrapf(err, "subscribing to %s/%s", st.Event, st.Version) + } + + logger. + WithField("topic", strings.Join([]string{st.Event, st.Version}, "/")). + Debug("subscribed to topic") + return nil +} + +func (e *EventSubSocketClient) subscribeAll() (err error) { + for i := range e.subscriptionTypes { + st := e.subscriptionTypes[i] + + if st.BackgroundRetry { + go e.retryBackgroundSubscribe(st) + continue } - e.logger.WithField("topic", strings.Join([]string{st.Event, st.Version}, "/")).Debug("subscribed to topic") + if err = e.subscribe(st); err != nil { + return err + } } return nil diff --git a/pkg/twitch/twitch.go b/pkg/twitch/twitch.go index 31da2fe..e7bf7fc 100644 --- a/pkg/twitch/twitch.go +++ b/pkg/twitch/twitch.go @@ -101,17 +101,22 @@ type ( // before in order to have the response body available in the returned // HTTPError func ValidateStatus(opts ClientRequestOpts, resp *http.Response) error { - if resp.StatusCode == http.StatusTooManyRequests { - // Twitch doesn't want to hear any more of this - return backoff.NewErrCannotRetry(newHTTPError(resp.StatusCode, nil, nil)) - } - if opts.OKStatus != 0 && resp.StatusCode != opts.OKStatus { + // We shall not accept this! + var ret error + body, err := io.ReadAll(resp.Body) if err != nil { - return newHTTPError(resp.StatusCode, nil, err) + ret = newHTTPError(resp.StatusCode, nil, err) + } else { + ret = newHTTPError(resp.StatusCode, body, nil) } - return newHTTPError(resp.StatusCode, body, nil) + + if resp.StatusCode == http.StatusTooManyRequests { + // Twitch doesn't want to hear any more of this + return backoff.NewErrCannotRetry(ret) + } + return ret } return nil diff --git a/twitchWatcher.go b/twitchWatcher.go index 2644f89..2c2da11 100644 --- a/twitchWatcher.go +++ b/twitchWatcher.go @@ -21,6 +21,7 @@ type ( AnyScope bool Hook func(json.RawMessage) error Version string + Optional bool } twitchChannelState struct { @@ -118,6 +119,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration Condition: twitch.EventSubCondition{BroadcasterUserID: userID}, RequiredScopes: []string{twitch.ScopeChannelReadAds}, Hook: t.handleEventSubChannelAdBreakBegin, + Optional: true, }, { Topic: twitch.EventSubEventTypeChannelFollow, @@ -125,6 +127,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration Condition: twitch.EventSubCondition{BroadcasterUserID: userID, ModeratorUserID: userID}, RequiredScopes: []string{twitch.ScopeModeratorReadFollowers}, Hook: t.handleEventSubChannelFollow, + Optional: true, }, { Topic: twitch.EventSubEventTypeChannelPointCustomRewardRedemptionAdd, @@ -132,6 +135,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration RequiredScopes: []string{twitch.ScopeChannelReadRedemptions, twitch.ScopeChannelManageRedemptions}, AnyScope: true, Hook: t.handleEventSubChannelPointCustomRewardRedemptionAdd, + Optional: true, }, { Topic: twitch.EventSubEventTypeChannelPollBegin, @@ -139,6 +143,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration RequiredScopes: []string{twitch.ScopeChannelReadPolls, twitch.ScopeChannelManagePolls}, AnyScope: true, Hook: t.handleEventSubChannelPollChange(eventTypePollBegin), + Optional: true, }, { Topic: twitch.EventSubEventTypeChannelPollEnd, @@ -146,6 +151,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration RequiredScopes: []string{twitch.ScopeChannelReadPolls, twitch.ScopeChannelManagePolls}, AnyScope: true, Hook: t.handleEventSubChannelPollChange(eventTypePollEnd), + Optional: true, }, { Topic: twitch.EventSubEventTypeChannelPollProgress, @@ -153,12 +159,14 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration RequiredScopes: []string{twitch.ScopeChannelReadPolls, twitch.ScopeChannelManagePolls}, AnyScope: true, Hook: t.handleEventSubChannelPollChange(eventTypePollProgress), + Optional: true, }, { Topic: twitch.EventSubEventTypeChannelRaid, Condition: twitch.EventSubCondition{FromBroadcasterUserID: userID}, RequiredScopes: nil, Hook: t.handleEventSubChannelOutboundRaid, + Optional: true, }, { Topic: twitch.EventSubEventTypeChannelShoutoutCreate, @@ -166,6 +174,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration RequiredScopes: []string{twitch.ScopeModeratorManageShoutouts, twitch.ScopeModeratorReadShoutouts}, AnyScope: true, Hook: t.handleEventSubShoutoutCreated, + Optional: true, }, { Topic: twitch.EventSubEventTypeChannelShoutoutReceive, @@ -173,6 +182,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration RequiredScopes: []string{twitch.ScopeModeratorManageShoutouts, twitch.ScopeModeratorReadShoutouts}, AnyScope: true, Hook: t.handleEventSubShoutoutReceived, + Optional: true, }, { Topic: twitch.EventSubEventTypeChannelUpdate, @@ -180,18 +190,21 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration Condition: twitch.EventSubCondition{BroadcasterUserID: userID}, RequiredScopes: nil, Hook: t.handleEventSubChannelUpdate, + Optional: true, }, { Topic: twitch.EventSubEventTypeStreamOffline, Condition: twitch.EventSubCondition{BroadcasterUserID: userID}, RequiredScopes: nil, Hook: t.handleEventSubStreamOnOff(false), + Optional: true, }, { Topic: twitch.EventSubEventTypeStreamOnline, Condition: twitch.EventSubCondition{BroadcasterUserID: userID}, RequiredScopes: nil, Hook: t.handleEventSubStreamOnOff(true), + Optional: true, }, } } @@ -477,7 +490,14 @@ func (t *twitchWatcher) registerEventSubCallbacks(channel string) (*twitch.Event } } - topicOpts = append(topicOpts, twitch.WithSubscription(tr.Topic, tr.Version, tr.Condition, tr.Hook)) + var opt twitch.EventSubSocketClientOpt + if tr.Optional { + opt = twitch.WithRetryBackgroundSubscribe(tr.Topic, tr.Version, tr.Condition, tr.Hook) + } else { + opt = twitch.WithMustSubscribe(tr.Topic, tr.Version, tr.Condition, tr.Hook) + } + + topicOpts = append(topicOpts, opt) } esClient, err := twitch.NewEventSubSocketClient(append(