[eventsub] Make topic subscriptions more dynamic

Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
Knut Ahlers 2023-12-13 23:14:23 +01:00
parent 091dac235b
commit e1f11a6c98
Signed by: luzifer
GPG key ID: D91C3E91E4CAD6F5
3 changed files with 112 additions and 30 deletions

View file

@ -13,6 +13,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/Luzifer/go_helpers/v2/backoff"
"github.com/Luzifer/twitch-bot/v3/internal/helpers" "github.com/Luzifer/twitch-bot/v3/internal/helpers"
) )
@ -21,6 +22,10 @@ const (
socketConnectTimeout = 15 * time.Second socketConnectTimeout = 15 * time.Second
socketInitialTimeout = 30 * time.Second socketInitialTimeout = 30 * time.Second
socketTimeoutGraceMultiplier = 1.5 socketTimeoutGraceMultiplier = 1.5
retrySubscribeMaxTotal = 30 * time.Minute
retrySubscribeMaxWait = 5 * time.Minute
retrySubscribeMinWait = 30 * time.Second
) )
const ( const (
@ -73,6 +78,7 @@ type (
Event, Version string Event, Version string
Condition EventSubCondition Condition EventSubCondition
Callback func(json.RawMessage) error Callback func(json.RawMessage) error
BackgroundRetry bool
} }
eventSubSocketPayloadNotification struct { eventSubSocketPayloadNotification struct {
@ -136,11 +142,7 @@ func WithLogger(logger *logrus.Entry) EventSubSocketClientOpt {
return func(e *EventSubSocketClient) { e.logger = logger } return func(e *EventSubSocketClient) { e.logger = logger }
} }
func WithSocketURL(url string) EventSubSocketClientOpt { func WithMustSubscribe(event, version string, condition EventSubCondition, callback func(json.RawMessage) error) EventSubSocketClientOpt {
return func(e *EventSubSocketClient) { e.socketDest = url }
}
func WithSubscription(event, version string, condition EventSubCondition, callback func(json.RawMessage) error) EventSubSocketClientOpt {
if version == "" { if version == "" {
version = EventSubTopicVersion1 version = EventSubTopicVersion1
} }
@ -155,6 +157,26 @@ func WithSubscription(event, version string, condition EventSubCondition, callba
} }
} }
func WithRetryBackgroundSubscribe(event, version string, condition EventSubCondition, callback func(json.RawMessage) error) EventSubSocketClientOpt {
if version == "" {
version = EventSubTopicVersion1
}
return func(e *EventSubSocketClient) {
e.subscriptionTypes = append(e.subscriptionTypes, eventSubSocketSubscriptionType{
Event: event,
Version: version,
Condition: condition,
Callback: callback,
BackgroundRetry: true,
})
}
}
func WithSocketURL(url string) EventSubSocketClientOpt {
return func(e *EventSubSocketClient) { e.socketDest = url }
}
func WithTwitchClient(c *Client) EventSubSocketClientOpt { func WithTwitchClient(c *Client) EventSubSocketClientOpt {
return func(e *EventSubSocketClient) { e.twitch = c } return func(e *EventSubSocketClient) { e.twitch = c }
} }
@ -382,7 +404,7 @@ func (e *EventSubSocketClient) handleWelcomeMessage(msg eventSubSocketMessage) (
// connection after something broke) // connection after something broke)
if e.socketID != payload.Session.ID { if e.socketID != payload.Session.ID {
e.socketID = payload.Session.ID e.socketID = payload.Session.ID
if err := e.subscribe(); err != nil { if err := e.subscribeAll(); err != nil {
return socketInitialTimeout, errors.Wrap(err, "subscribing to topics") return socketInitialTimeout, errors.Wrap(err, "subscribing to topics")
} }
} }
@ -393,8 +415,25 @@ func (e *EventSubSocketClient) handleWelcomeMessage(msg eventSubSocketMessage) (
return time.Duration(float64(payload.Session.KeepaliveTimeoutSeconds)*socketTimeoutGraceMultiplier) * time.Second, nil return time.Duration(float64(payload.Session.KeepaliveTimeoutSeconds)*socketTimeoutGraceMultiplier) * time.Second, nil
} }
func (e *EventSubSocketClient) subscribe() error { func (e *EventSubSocketClient) retryBackgroundSubscribe(st eventSubSocketSubscriptionType) {
for _, st := range e.subscriptionTypes { err := backoff.NewBackoff().
WithMaxIterationTime(retrySubscribeMaxWait).
WithMaxTotalTime(retrySubscribeMaxTotal).
WithMinIterationTime(retrySubscribeMinWait).
Retry(func() error {
return e.subscribe(st)
})
if err != nil {
e.logger.
WithField("topic", strings.Join([]string{st.Event, st.Version}, "/")).
Error("gave up retrying to subscribe")
}
}
func (e *EventSubSocketClient) subscribe(st eventSubSocketSubscriptionType) error {
logger := e.logger.
WithField("topic", strings.Join([]string{st.Event, st.Version}, "/"))
if _, err := e.twitch.createEventSubSubscriptionWebsocket(context.Background(), eventSubSubscription{ if _, err := e.twitch.createEventSubSubscriptionWebsocket(context.Background(), eventSubSubscription{
Type: st.Event, Type: st.Event,
Version: st.Version, Version: st.Version,
@ -404,10 +443,28 @@ func (e *EventSubSocketClient) subscribe() error {
SessionID: e.socketID, SessionID: e.socketID,
}, },
}); err != nil { }); err != nil {
logger.WithError(err).Debug("subscribing to topic")
return errors.Wrapf(err, "subscribing to %s/%s", st.Event, st.Version) return errors.Wrapf(err, "subscribing to %s/%s", st.Event, st.Version)
} }
e.logger.WithField("topic", strings.Join([]string{st.Event, st.Version}, "/")).Debug("subscribed to topic") logger.
WithField("topic", strings.Join([]string{st.Event, st.Version}, "/")).
Debug("subscribed to topic")
return nil
}
func (e *EventSubSocketClient) subscribeAll() (err error) {
for i := range e.subscriptionTypes {
st := e.subscriptionTypes[i]
if st.BackgroundRetry {
go e.retryBackgroundSubscribe(st)
continue
}
if err = e.subscribe(st); err != nil {
return err
}
} }
return nil return nil

View file

@ -101,17 +101,22 @@ type (
// before in order to have the response body available in the returned // before in order to have the response body available in the returned
// HTTPError // HTTPError
func ValidateStatus(opts ClientRequestOpts, resp *http.Response) error { func ValidateStatus(opts ClientRequestOpts, resp *http.Response) error {
if resp.StatusCode == http.StatusTooManyRequests {
// Twitch doesn't want to hear any more of this
return backoff.NewErrCannotRetry(newHTTPError(resp.StatusCode, nil, nil))
}
if opts.OKStatus != 0 && resp.StatusCode != opts.OKStatus { if opts.OKStatus != 0 && resp.StatusCode != opts.OKStatus {
// We shall not accept this!
var ret error
body, err := io.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
return newHTTPError(resp.StatusCode, nil, err) ret = newHTTPError(resp.StatusCode, nil, err)
} else {
ret = newHTTPError(resp.StatusCode, body, nil)
} }
return newHTTPError(resp.StatusCode, body, nil)
if resp.StatusCode == http.StatusTooManyRequests {
// Twitch doesn't want to hear any more of this
return backoff.NewErrCannotRetry(ret)
}
return ret
} }
return nil return nil

View file

@ -21,6 +21,7 @@ type (
AnyScope bool AnyScope bool
Hook func(json.RawMessage) error Hook func(json.RawMessage) error
Version string Version string
Optional bool
} }
twitchChannelState struct { twitchChannelState struct {
@ -118,6 +119,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration
Condition: twitch.EventSubCondition{BroadcasterUserID: userID}, Condition: twitch.EventSubCondition{BroadcasterUserID: userID},
RequiredScopes: []string{twitch.ScopeChannelReadAds}, RequiredScopes: []string{twitch.ScopeChannelReadAds},
Hook: t.handleEventSubChannelAdBreakBegin, Hook: t.handleEventSubChannelAdBreakBegin,
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeChannelFollow, Topic: twitch.EventSubEventTypeChannelFollow,
@ -125,6 +127,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration
Condition: twitch.EventSubCondition{BroadcasterUserID: userID, ModeratorUserID: userID}, Condition: twitch.EventSubCondition{BroadcasterUserID: userID, ModeratorUserID: userID},
RequiredScopes: []string{twitch.ScopeModeratorReadFollowers}, RequiredScopes: []string{twitch.ScopeModeratorReadFollowers},
Hook: t.handleEventSubChannelFollow, Hook: t.handleEventSubChannelFollow,
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeChannelPointCustomRewardRedemptionAdd, Topic: twitch.EventSubEventTypeChannelPointCustomRewardRedemptionAdd,
@ -132,6 +135,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration
RequiredScopes: []string{twitch.ScopeChannelReadRedemptions, twitch.ScopeChannelManageRedemptions}, RequiredScopes: []string{twitch.ScopeChannelReadRedemptions, twitch.ScopeChannelManageRedemptions},
AnyScope: true, AnyScope: true,
Hook: t.handleEventSubChannelPointCustomRewardRedemptionAdd, Hook: t.handleEventSubChannelPointCustomRewardRedemptionAdd,
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeChannelPollBegin, Topic: twitch.EventSubEventTypeChannelPollBegin,
@ -139,6 +143,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration
RequiredScopes: []string{twitch.ScopeChannelReadPolls, twitch.ScopeChannelManagePolls}, RequiredScopes: []string{twitch.ScopeChannelReadPolls, twitch.ScopeChannelManagePolls},
AnyScope: true, AnyScope: true,
Hook: t.handleEventSubChannelPollChange(eventTypePollBegin), Hook: t.handleEventSubChannelPollChange(eventTypePollBegin),
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeChannelPollEnd, Topic: twitch.EventSubEventTypeChannelPollEnd,
@ -146,6 +151,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration
RequiredScopes: []string{twitch.ScopeChannelReadPolls, twitch.ScopeChannelManagePolls}, RequiredScopes: []string{twitch.ScopeChannelReadPolls, twitch.ScopeChannelManagePolls},
AnyScope: true, AnyScope: true,
Hook: t.handleEventSubChannelPollChange(eventTypePollEnd), Hook: t.handleEventSubChannelPollChange(eventTypePollEnd),
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeChannelPollProgress, Topic: twitch.EventSubEventTypeChannelPollProgress,
@ -153,12 +159,14 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration
RequiredScopes: []string{twitch.ScopeChannelReadPolls, twitch.ScopeChannelManagePolls}, RequiredScopes: []string{twitch.ScopeChannelReadPolls, twitch.ScopeChannelManagePolls},
AnyScope: true, AnyScope: true,
Hook: t.handleEventSubChannelPollChange(eventTypePollProgress), Hook: t.handleEventSubChannelPollChange(eventTypePollProgress),
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeChannelRaid, Topic: twitch.EventSubEventTypeChannelRaid,
Condition: twitch.EventSubCondition{FromBroadcasterUserID: userID}, Condition: twitch.EventSubCondition{FromBroadcasterUserID: userID},
RequiredScopes: nil, RequiredScopes: nil,
Hook: t.handleEventSubChannelOutboundRaid, Hook: t.handleEventSubChannelOutboundRaid,
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeChannelShoutoutCreate, Topic: twitch.EventSubEventTypeChannelShoutoutCreate,
@ -166,6 +174,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration
RequiredScopes: []string{twitch.ScopeModeratorManageShoutouts, twitch.ScopeModeratorReadShoutouts}, RequiredScopes: []string{twitch.ScopeModeratorManageShoutouts, twitch.ScopeModeratorReadShoutouts},
AnyScope: true, AnyScope: true,
Hook: t.handleEventSubShoutoutCreated, Hook: t.handleEventSubShoutoutCreated,
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeChannelShoutoutReceive, Topic: twitch.EventSubEventTypeChannelShoutoutReceive,
@ -173,6 +182,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration
RequiredScopes: []string{twitch.ScopeModeratorManageShoutouts, twitch.ScopeModeratorReadShoutouts}, RequiredScopes: []string{twitch.ScopeModeratorManageShoutouts, twitch.ScopeModeratorReadShoutouts},
AnyScope: true, AnyScope: true,
Hook: t.handleEventSubShoutoutReceived, Hook: t.handleEventSubShoutoutReceived,
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeChannelUpdate, Topic: twitch.EventSubEventTypeChannelUpdate,
@ -180,18 +190,21 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration
Condition: twitch.EventSubCondition{BroadcasterUserID: userID}, Condition: twitch.EventSubCondition{BroadcasterUserID: userID},
RequiredScopes: nil, RequiredScopes: nil,
Hook: t.handleEventSubChannelUpdate, Hook: t.handleEventSubChannelUpdate,
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeStreamOffline, Topic: twitch.EventSubEventTypeStreamOffline,
Condition: twitch.EventSubCondition{BroadcasterUserID: userID}, Condition: twitch.EventSubCondition{BroadcasterUserID: userID},
RequiredScopes: nil, RequiredScopes: nil,
Hook: t.handleEventSubStreamOnOff(false), Hook: t.handleEventSubStreamOnOff(false),
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeStreamOnline, Topic: twitch.EventSubEventTypeStreamOnline,
Condition: twitch.EventSubCondition{BroadcasterUserID: userID}, Condition: twitch.EventSubCondition{BroadcasterUserID: userID},
RequiredScopes: nil, RequiredScopes: nil,
Hook: t.handleEventSubStreamOnOff(true), Hook: t.handleEventSubStreamOnOff(true),
Optional: true,
}, },
} }
} }
@ -477,7 +490,14 @@ func (t *twitchWatcher) registerEventSubCallbacks(channel string) (*twitch.Event
} }
} }
topicOpts = append(topicOpts, twitch.WithSubscription(tr.Topic, tr.Version, tr.Condition, tr.Hook)) var opt twitch.EventSubSocketClientOpt
if tr.Optional {
opt = twitch.WithRetryBackgroundSubscribe(tr.Topic, tr.Version, tr.Condition, tr.Hook)
} else {
opt = twitch.WithMustSubscribe(tr.Topic, tr.Version, tr.Condition, tr.Hook)
}
topicOpts = append(topicOpts, opt)
} }
esClient, err := twitch.NewEventSubSocketClient(append( esClient, err := twitch.NewEventSubSocketClient(append(