package twitch import ( "bytes" "context" "crypto/hmac" "crypto/sha256" "encoding/json" "fmt" "io" "io/ioutil" "net/http" "net/url" "strings" "sync" "time" "github.com/gofrs/uuid/v3" "github.com/gorilla/mux" "github.com/mitchellh/hashstructure/v2" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/Luzifer/go_helpers/v2/str" ) const ( eventSubHeaderMessageID = "Twitch-Eventsub-Message-Id" eventSubHeaderMessageType = "Twitch-Eventsub-Message-Type" eventSubHeaderMessageSignature = "Twitch-Eventsub-Message-Signature" eventSubHeaderMessageTimestamp = "Twitch-Eventsub-Message-Timestamp" // eventSubHeaderMessageRetry = "Twitch-Eventsub-Message-Retry" // eventSubHeaderSubscriptionType = "Twitch-Eventsub-Subscription-Type" // eventSubHeaderSubscriptionVersion = "Twitch-Eventsub-Subscription-Version" eventSubMessageTypeVerification = "webhook_callback_verification" eventSubMessageTypeRevokation = "revocation" // eventSubMessageTypeNotification = "notification" eventSubStatusEnabled = "enabled" eventSubStatusVerificationPending = "webhook_callback_verification_pending" // eventSubStatusAuthorizationRevoked = "authorization_revoked" // eventSubStatusFailuresExceeded = "notification_failures_exceeded" // eventSubStatusUserRemoved = "user_removed" // eventSubStatusVerificationFailed = "webhook_callback_verification_failed" EventSubEventTypeChannelUpdate = "channel.update" EventSubEventTypeStreamOffline = "stream.offline" EventSubEventTypeStreamOnline = "stream.online" ) type ( EventSubClient struct { apiURL string secret string secretHandle string twitchClientID string twitchClientSecret string twitchAccessToken string subscriptions map[string]*registeredSubscription subscriptionsLock sync.RWMutex } EventSubCondition struct { BroadcasterUserID string `json:"broadcaster_user_id,omitempty"` CampaignID string `json:"campaign_id,omitempty"` CategoryID string `json:"category_id,omitempty"` ClientID string `json:"client_id,omitempty"` ExtensionClientID string `json:"extension_client_id,omitempty"` FromBroadcasterUserID string `json:"from_broadcaster_user_id,omitempty"` OrganizationID string `json:"organization_id,omitempty"` RewardID string `json:"reward_id,omitempty"` ToBroadcasterUserID string `json:"to_broadcaster_user_id,omitempty"` UserID string `json:"user_id,omitempty"` } EventSubEventChannelUpdate struct { BroadcasterUserID string `json:"broadcaster_user_id"` BroadcasterUserLogin string `json:"broadcaster_user_login"` BroadcasterUserName string `json:"broadcaster_user_name"` Title string `json:"title"` Language string `json:"language"` CategoryID string `json:"category_id"` CategoryName string `json:"category_name"` IsMature bool `json:"is_mature"` } EventSubEventFollow struct { UserID string `json:"user_id"` UserLogin string `json:"user_login"` UserName string `json:"user_name"` BroadcasterUserID string `json:"broadcaster_user_id"` BroadcasterUserLogin string `json:"broadcaster_user_login"` BroadcasterUserName string `json:"broadcaster_user_name"` FollowedAt time.Time `json:"followed_at"` } EventSubEventStreamOffline struct { BroadcasterUserID string `json:"broadcaster_user_id"` BroadcasterUserLogin string `json:"broadcaster_user_login"` BroadcasterUserName string `json:"broadcaster_user_name"` } EventSubEventStreamOnline struct { ID string `json:"id"` BroadcasterUserID string `json:"broadcaster_user_id"` BroadcasterUserLogin string `json:"broadcaster_user_login"` BroadcasterUserName string `json:"broadcaster_user_name"` Type string `json:"type"` StartedAt time.Time `json:"started_at"` } eventSubPostMessage struct { Challenge string `json:"challenge"` Subscription eventSubSubscription `json:"subscription"` Event json.RawMessage `json:"event"` } eventSubSubscription struct { ID string `json:"id,omitempty"` // READONLY Status string `json:"status,omitempty"` // READONLY Type string `json:"type"` Version string `json:"version"` Cost int64 `json:"cost,omitempty"` // READONLY Condition EventSubCondition `json:"condition"` Transport eventSubTransport `json:"transport"` CreatedAt time.Time `json:"created_at,omitempty"` // READONLY } eventSubTransport struct { Method string `json:"method"` Callback string `json:"callback"` Secret string `json:"secret"` } registeredSubscription struct { Type string Callbacks map[string]func(json.RawMessage) error Subscription eventSubSubscription } ) func (e EventSubCondition) Hash() (string, error) { h, err := hashstructure.Hash(e, hashstructure.FormatV2, &hashstructure.HashOptions{TagName: "json"}) if err != nil { return "", errors.Wrap(err, "hashing struct") } return fmt.Sprintf("%x", h), nil } func NewEventSubClient(apiURL, secret, secretHandle string) *EventSubClient { return &EventSubClient{ apiURL: apiURL, secret: secret, secretHandle: secretHandle, subscriptions: map[string]*registeredSubscription{}, } } func (e *EventSubClient) Authorize(clientID, clientSecret string) error { e.twitchClientID = clientID e.twitchClientSecret = clientSecret _, err := e.getTwitchAppAccessToken() return errors.Wrap(err, "fetching app access token") } func (e *EventSubClient) getTwitchAppAccessToken() (string, error) { if e.twitchAccessToken != "" { return e.twitchAccessToken, nil } var rData struct { AccessToken string `json:"access_token"` RefreshToken string `json:"refresh_token"` ExpiresIn int `json:"expires_in"` Scope []interface{} `json:"scope"` TokenType string `json:"token_type"` } params := make(url.Values) params.Set("client_id", e.twitchClientID) params.Set("client_secret", e.twitchClientSecret) params.Set("grant_type", "client_credentials") u, _ := url.Parse("https://id.twitch.tv/oauth2/token") u.RawQuery = params.Encode() ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout) defer cancel() req, _ := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), nil) resp, err := http.DefaultClient.Do(req) if err != nil { return "", errors.Wrap(err, "fetching response") } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, err := ioutil.ReadAll(resp.Body) if err != nil { return "", errors.Wrapf(err, "unexpected status %d and cannot read body", resp.StatusCode) } return "", errors.Errorf("unexpected status %d: %s", resp.StatusCode, body) } e.twitchAccessToken = rData.AccessToken return rData.AccessToken, errors.Wrap( json.NewDecoder(resp.Body).Decode(&rData), "decoding response", ) } func (e *EventSubClient) HandleEventsubPush(w http.ResponseWriter, r *http.Request) { var ( body = new(bytes.Buffer) keyHandle = mux.Vars(r)["keyhandle"] message eventSubPostMessage signature = r.Header.Get(eventSubHeaderMessageSignature) ) if keyHandle != e.secretHandle { http.Error(w, "deprecated callback used", http.StatusNotFound) return } // Copy body for duplicate processing if _, err := io.Copy(body, r.Body); err != nil { log.WithError(err).Error("Unable to read hook body") return } // Verify signature mac := hmac.New(sha256.New, []byte(e.secret)) fmt.Fprintf(mac, "%s%s%s", r.Header.Get(eventSubHeaderMessageID), r.Header.Get(eventSubHeaderMessageTimestamp), body.Bytes()) if cSig := fmt.Sprintf("sha256=%x", mac.Sum(nil)); cSig != signature { log.Errorf("Got message signature %s, expected %s", signature, cSig) http.Error(w, "Signature verification failed", http.StatusUnauthorized) return } // Read message if err := json.NewDecoder(body).Decode(&message); err != nil { log.WithError(err).Errorf("Unable to decode eventsub message") http.Error(w, errors.Wrap(err, "parsing message").Error(), http.StatusBadRequest) return } logger := log.WithField("type", message.Subscription.Type) // If we got a verification request, respond with the challenge switch r.Header.Get(eventSubHeaderMessageType) { case eventSubMessageTypeRevokation: w.WriteHeader(http.StatusNoContent) return case eventSubMessageTypeVerification: logger.Debug("Confirming eventsub subscription") w.Write([]byte(message.Challenge)) return } logger.Debug("Received notification") condHash, err := message.Subscription.Condition.Hash() if err != nil { logger.WithError(err).Errorf("Unable to hash condition of push") http.Error(w, errors.Wrap(err, "hashing condition").Error(), http.StatusBadRequest) return } e.subscriptionsLock.RLock() defer e.subscriptionsLock.RUnlock() cacheKey := strings.Join([]string{message.Subscription.Type, condHash}, "::") reg, ok := e.subscriptions[cacheKey] if !ok { http.Error(w, "no subscription available", http.StatusBadRequest) return } for _, cb := range reg.Callbacks { if err = cb(message.Event); err != nil { logger.WithError(err).Error("Handler callback caused error") } } } //nolint:funlen,gocyclo // Not splitting to keep logic unit func (e *EventSubClient) RegisterEventSubHooks(event string, condition EventSubCondition, callback func(json.RawMessage) error) (func(), error) { condHash, err := condition.Hash() if err != nil { return nil, errors.Wrap(err, "hashing condition") } var ( cacheKey = strings.Join([]string{event, condHash}, "::") logger = log.WithField("event", event) ) e.subscriptionsLock.RLock() _, ok := e.subscriptions[cacheKey] e.subscriptionsLock.RUnlock() if ok { // Subscription already exists e.subscriptionsLock.Lock() defer e.subscriptionsLock.Unlock() logger.Debug("Adding callback to existing callback") cbKey := uuid.Must(uuid.NewV4()).String() e.subscriptions[cacheKey].Callbacks[cbKey] = callback return func() { e.unregisterCallback(cacheKey, cbKey) }, nil } accessToken, err := e.getTwitchAppAccessToken() if err != nil { return nil, errors.Wrap(err, "getting app-access-token") } ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout) defer cancel() // List existing subscriptions req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "https://api.twitch.tv/helix/eventsub/subscriptions", nil) req.Header.Set("Content-Type", "application/json") req.Header.Set("Client-Id", e.twitchClientID) req.Header.Set("Authorization", "Bearer "+accessToken) resp, err := http.DefaultClient.Do(req) if err != nil { return nil, errors.Wrap(err, "requesting subscribscriptions") } defer resp.Body.Close() var subscriptionList struct { Data []eventSubSubscription } if err = json.NewDecoder(resp.Body).Decode(&subscriptionList); err != nil { return nil, errors.Wrap(err, "decoding subscription list") } // Register subscriptions var ( existingSub *eventSubSubscription ) for i, sub := range subscriptionList.Data { existingConditionHash, err := sub.Condition.Hash() if err != nil { return nil, errors.Wrap(err, "hashing existing condition") } newConditionHash, err := condition.Hash() if err != nil { return nil, errors.Wrap(err, "hashing new condition") } if str.StringInSlice(sub.Status, []string{eventSubStatusEnabled, eventSubStatusVerificationPending}) && sub.Transport.Callback == e.apiURL && existingConditionHash == newConditionHash && sub.Type == event { logger = logger.WithFields(log.Fields{ "id": sub.ID, "status": sub.Status, }) existingSub = &subscriptionList.Data[i] } } if existingSub != nil { logger.WithField("event", event).Debug("Not registering hook, already active") e.subscriptionsLock.Lock() defer e.subscriptionsLock.Unlock() logger.Debug("Found existing hook, registering and adding callback") cbKey := uuid.Must(uuid.NewV4()).String() e.subscriptions[cacheKey] = ®isteredSubscription{ Type: event, Callbacks: map[string]func(json.RawMessage) error{ cbKey: callback, }, Subscription: *existingSub, } return func() { e.unregisterCallback(cacheKey, cbKey) }, nil } payload := eventSubSubscription{ Type: event, Version: "1", Condition: condition, Transport: eventSubTransport{ Method: "webhook", Callback: e.apiURL, Secret: e.secret, }, } buf := new(bytes.Buffer) if err := json.NewEncoder(buf).Encode(payload); err != nil { return nil, errors.Wrap(err, "assemble subscribe payload") } ctx, cancel = context.WithTimeout(context.Background(), twitchRequestTimeout) defer cancel() req, err = http.NewRequestWithContext(ctx, http.MethodPost, "https://api.twitch.tv/helix/eventsub/subscriptions", buf) if err != nil { return nil, errors.Wrap(err, "creating subscribe request") } req.Header.Set("Content-Type", "application/json") req.Header.Set("Client-Id", e.twitchClientID) req.Header.Set("Authorization", "Bearer "+accessToken) resp, err = http.DefaultClient.Do(req) if err != nil { return nil, errors.Wrap(err, "requesting subscribe") } defer resp.Body.Close() if resp.StatusCode != http.StatusAccepted { body, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, errors.Wrapf(err, "unexpected status %d, unable to read body", resp.StatusCode) } return nil, errors.Errorf("unexpected status %d: %s", resp.StatusCode, body) } var response struct { Data []eventSubSubscription `json:"data"` } if err = json.NewDecoder(resp.Body).Decode(&response); err != nil { return nil, errors.Wrap(err, "reading eventsub sub response") } e.subscriptionsLock.Lock() defer e.subscriptionsLock.Unlock() logger.Debug("Registered new hook") cbKey := uuid.Must(uuid.NewV4()).String() e.subscriptions[cacheKey] = ®isteredSubscription{ Type: event, Callbacks: map[string]func(json.RawMessage) error{ cbKey: callback, }, Subscription: response.Data[0], } logger.Debug("Registered eventsub subscription") return func() { e.unregisterCallback(cacheKey, cbKey) }, nil } func (e *EventSubClient) unregisterCallback(cacheKey, cbKey string) { e.subscriptionsLock.RLock() regSub, ok := e.subscriptions[cacheKey] e.subscriptionsLock.RUnlock() if !ok { // That subscription does not exist log.WithField("cache_key", cacheKey).Debug("Subscription does not exist, not unregistering") return } if _, ok = regSub.Callbacks[cbKey]; !ok { // That callback does not exist log.WithFields(log.Fields{ "cache_key": cacheKey, "callback": cbKey, }).Debug("Callback does not exist, not unregistering") return } logger := log.WithField("event", regSub.Type) delete(regSub.Callbacks, cbKey) if len(regSub.Callbacks) > 0 { // Still callbacks registered, not removing the subscription return } accessToken, err := e.getTwitchAppAccessToken() if err != nil { log.WithError(err).Error("Unable to get access token") return } ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout) defer cancel() req, err := http.NewRequestWithContext(ctx, http.MethodDelete, fmt.Sprintf("https://api.twitch.tv/helix/eventsub/subscriptions?id=%s", regSub.Subscription.ID), nil) if err != nil { log.WithError(err).Error("Unable to create delete subscription request") return } req.Header.Set("Content-Type", "application/json") req.Header.Set("Client-Id", e.twitchClientID) req.Header.Set("Authorization", "Bearer "+accessToken) resp, err := http.DefaultClient.Do(req) if err != nil { log.WithError(err).Error("Unable to execute delete subscription request") return } defer resp.Body.Close() e.subscriptionsLock.Lock() defer e.subscriptionsLock.Unlock() logger.Debug("Unregistered hook") delete(e.subscriptions, cacheKey) }