From 2ba462fedcee26c99f75f66bc3c00c1e4e834b19 Mon Sep 17 00:00:00 2001 From: Knut Ahlers Date: Fri, 24 Dec 2021 21:39:38 +0100 Subject: [PATCH] [core] Add EventSub subscription prefetching Signed-off-by: Knut Ahlers --- main.go | 6 ++- twitch/eventsub.go | 98 ++++++++++++++++++---------------------------- 2 files changed, 44 insertions(+), 60 deletions(-) diff --git a/main.go b/main.go index aa3ea2d..ee66334 100644 --- a/main.go +++ b/main.go @@ -219,12 +219,16 @@ func main() { log.WithError(err).Fatal("Unable to get or create eventsub secret") } - twitchEventSubClient = twitch.NewEventSubClient(twitchClient, strings.Join([]string{ + twitchEventSubClient, err = twitch.NewEventSubClient(twitchClient, strings.Join([]string{ strings.TrimRight(cfg.BaseURL, "/"), "eventsub", handle, }, "/"), secret, handle) + if err != nil { + log.WithError(err).Fatal("Unable to create eventsub client") + } + router.HandleFunc("/eventsub/{keyhandle}", twitchEventSubClient.HandleEventsubPush).Methods(http.MethodPost) } } diff --git a/twitch/eventsub.go b/twitch/eventsub.go index 50aa309..3768412 100644 --- a/twitch/eventsub.go +++ b/twitch/eventsub.go @@ -148,8 +148,8 @@ func (e EventSubCondition) Hash() (string, error) { return fmt.Sprintf("%x", h), nil } -func NewEventSubClient(twitchClient *Client, apiURL, secret, secretHandle string) *EventSubClient { - return &EventSubClient{ +func NewEventSubClient(twitchClient *Client, apiURL, secret, secretHandle string) (*EventSubClient, error) { + c := &EventSubClient{ apiURL: apiURL, secret: secret, secretHandle: secretHandle, @@ -158,6 +158,8 @@ func NewEventSubClient(twitchClient *Client, apiURL, secret, secretHandle string subscriptions: map[string]*registeredSubscription{}, } + + return c, c.PreFetchSubscriptions(context.Background()) } func (e *EventSubClient) HandleEventsubPush(w http.ResponseWriter, r *http.Request) { @@ -236,7 +238,39 @@ func (e *EventSubClient) HandleEventsubPush(w http.ResponseWriter, r *http.Reque } } -//nolint:funlen // Not splitting to keep logic unit +func (e *EventSubClient) PreFetchSubscriptions(ctx context.Context) error { + e.subscriptionsLock.Lock() + defer e.subscriptionsLock.Unlock() + + subList, err := e.twitchClient.getEventSubSubscriptions(ctx) + if err != nil { + return errors.Wrap(err, "listing existing subscriptions") + } + + for i := range subList { + sub := subList[i] + + if !str.StringInSlice(sub.Status, []string{eventSubStatusEnabled, eventSubStatusVerificationPending}) || sub.Transport.Callback != e.apiURL { + // Not our callback or not active + continue + } + + condHash, err := sub.Condition.Hash() + if err != nil { + return errors.Wrap(err, "hashing condition") + } + + cacheKey := strings.Join([]string{sub.Type, condHash}, "::") + e.subscriptions[cacheKey] = ®isteredSubscription{ + Type: sub.Type, + Callbacks: map[string]func(json.RawMessage) error{}, + Subscription: sub, + } + } + + return nil +} + func (e *EventSubClient) RegisterEventSubHooks(event string, condition EventSubCondition, callback func(json.RawMessage) error) (func(), error) { condHash, err := condition.Hash() if err != nil { @@ -257,7 +291,7 @@ func (e *EventSubClient) RegisterEventSubHooks(event string, condition EventSubC e.subscriptionsLock.Lock() defer e.subscriptionsLock.Unlock() - logger.Debug("Adding callback to existing callback") + logger.Debug("Adding callback to known subscription") cbKey := uuid.Must(uuid.NewV4()).String() @@ -265,62 +299,8 @@ func (e *EventSubClient) RegisterEventSubHooks(event string, condition EventSubC return func() { e.unregisterCallback(cacheKey, cbKey) }, nil } - ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout) - defer cancel() - - // List existing subscriptions - subList, err := e.twitchClient.getEventSubSubscriptions(ctx) - if err != nil { - return nil, errors.Wrap(err, "listing existing subscriptions") - } - // Register subscriptions - var ( - existingSub *eventSubSubscription - ) - for i, sub := range subList { - existingConditionHash, err := sub.Condition.Hash() - if err != nil { - return nil, errors.Wrap(err, "hashing existing condition") - } - newConditionHash, err := condition.Hash() - if err != nil { - return nil, errors.Wrap(err, "hashing new condition") - } - - if str.StringInSlice(sub.Status, []string{eventSubStatusEnabled, eventSubStatusVerificationPending}) && - sub.Transport.Callback == e.apiURL && - existingConditionHash == newConditionHash && - sub.Type == event { - logger = logger.WithFields(log.Fields{ - "id": sub.ID, - "status": sub.Status, - }) - existingSub = &subList[i] - } - } - - if existingSub != nil { - logger.WithField("event", event).Debug("Not registering hook, already active") - - e.subscriptionsLock.Lock() - defer e.subscriptionsLock.Unlock() - - logger.Debug("Found existing hook, registering and adding callback") - - cbKey := uuid.Must(uuid.NewV4()).String() - e.subscriptions[cacheKey] = ®isteredSubscription{ - Type: event, - Callbacks: map[string]func(json.RawMessage) error{ - cbKey: callback, - }, - Subscription: *existingSub, - } - - return func() { e.unregisterCallback(cacheKey, cbKey) }, nil - } - - ctx, cancel = context.WithTimeout(context.Background(), twitchRequestTimeout) + ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout) defer cancel() newSub, err := e.twitchClient.createEventSubSubscription(ctx, eventSubSubscription{