[core] Add EventSub subscription prefetching

Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
Knut Ahlers 2021-12-24 21:39:38 +01:00
parent f70382876a
commit 2ba462fedc
Signed by: luzifer
GPG key ID: 0066F03ED215AD7D
2 changed files with 44 additions and 60 deletions

View file

@ -219,12 +219,16 @@ func main() {
log.WithError(err).Fatal("Unable to get or create eventsub secret")
}
twitchEventSubClient = twitch.NewEventSubClient(twitchClient, strings.Join([]string{
twitchEventSubClient, err = twitch.NewEventSubClient(twitchClient, strings.Join([]string{
strings.TrimRight(cfg.BaseURL, "/"),
"eventsub",
handle,
}, "/"), secret, handle)
if err != nil {
log.WithError(err).Fatal("Unable to create eventsub client")
}
router.HandleFunc("/eventsub/{keyhandle}", twitchEventSubClient.HandleEventsubPush).Methods(http.MethodPost)
}
}

View file

@ -148,8 +148,8 @@ func (e EventSubCondition) Hash() (string, error) {
return fmt.Sprintf("%x", h), nil
}
func NewEventSubClient(twitchClient *Client, apiURL, secret, secretHandle string) *EventSubClient {
return &EventSubClient{
func NewEventSubClient(twitchClient *Client, apiURL, secret, secretHandle string) (*EventSubClient, error) {
c := &EventSubClient{
apiURL: apiURL,
secret: secret,
secretHandle: secretHandle,
@ -158,6 +158,8 @@ func NewEventSubClient(twitchClient *Client, apiURL, secret, secretHandle string
subscriptions: map[string]*registeredSubscription{},
}
return c, c.PreFetchSubscriptions(context.Background())
}
func (e *EventSubClient) HandleEventsubPush(w http.ResponseWriter, r *http.Request) {
@ -236,7 +238,39 @@ func (e *EventSubClient) HandleEventsubPush(w http.ResponseWriter, r *http.Reque
}
}
//nolint:funlen // Not splitting to keep logic unit
func (e *EventSubClient) PreFetchSubscriptions(ctx context.Context) error {
e.subscriptionsLock.Lock()
defer e.subscriptionsLock.Unlock()
subList, err := e.twitchClient.getEventSubSubscriptions(ctx)
if err != nil {
return errors.Wrap(err, "listing existing subscriptions")
}
for i := range subList {
sub := subList[i]
if !str.StringInSlice(sub.Status, []string{eventSubStatusEnabled, eventSubStatusVerificationPending}) || sub.Transport.Callback != e.apiURL {
// Not our callback or not active
continue
}
condHash, err := sub.Condition.Hash()
if err != nil {
return errors.Wrap(err, "hashing condition")
}
cacheKey := strings.Join([]string{sub.Type, condHash}, "::")
e.subscriptions[cacheKey] = &registeredSubscription{
Type: sub.Type,
Callbacks: map[string]func(json.RawMessage) error{},
Subscription: sub,
}
}
return nil
}
func (e *EventSubClient) RegisterEventSubHooks(event string, condition EventSubCondition, callback func(json.RawMessage) error) (func(), error) {
condHash, err := condition.Hash()
if err != nil {
@ -257,7 +291,7 @@ func (e *EventSubClient) RegisterEventSubHooks(event string, condition EventSubC
e.subscriptionsLock.Lock()
defer e.subscriptionsLock.Unlock()
logger.Debug("Adding callback to existing callback")
logger.Debug("Adding callback to known subscription")
cbKey := uuid.Must(uuid.NewV4()).String()
@ -265,62 +299,8 @@ func (e *EventSubClient) RegisterEventSubHooks(event string, condition EventSubC
return func() { e.unregisterCallback(cacheKey, cbKey) }, nil
}
ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout)
defer cancel()
// List existing subscriptions
subList, err := e.twitchClient.getEventSubSubscriptions(ctx)
if err != nil {
return nil, errors.Wrap(err, "listing existing subscriptions")
}
// Register subscriptions
var (
existingSub *eventSubSubscription
)
for i, sub := range subList {
existingConditionHash, err := sub.Condition.Hash()
if err != nil {
return nil, errors.Wrap(err, "hashing existing condition")
}
newConditionHash, err := condition.Hash()
if err != nil {
return nil, errors.Wrap(err, "hashing new condition")
}
if str.StringInSlice(sub.Status, []string{eventSubStatusEnabled, eventSubStatusVerificationPending}) &&
sub.Transport.Callback == e.apiURL &&
existingConditionHash == newConditionHash &&
sub.Type == event {
logger = logger.WithFields(log.Fields{
"id": sub.ID,
"status": sub.Status,
})
existingSub = &subList[i]
}
}
if existingSub != nil {
logger.WithField("event", event).Debug("Not registering hook, already active")
e.subscriptionsLock.Lock()
defer e.subscriptionsLock.Unlock()
logger.Debug("Found existing hook, registering and adding callback")
cbKey := uuid.Must(uuid.NewV4()).String()
e.subscriptions[cacheKey] = &registeredSubscription{
Type: event,
Callbacks: map[string]func(json.RawMessage) error{
cbKey: callback,
},
Subscription: *existingSub,
}
return func() { e.unregisterCallback(cacheKey, cbKey) }, nil
}
ctx, cancel = context.WithTimeout(context.Background(), twitchRequestTimeout)
ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout)
defer cancel()
newSub, err := e.twitchClient.createEventSubSubscription(ctx, eventSubSubscription{