diff --git a/README.md b/README.md index 49f0c77..b19c7ce 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ Please see the [Wiki](https://github.com/Luzifer/twitch-bot/wiki) for documentat ```console # twitch-bot --help Usage of twitch-bot: - --base-url string External URL of the config-editor interface (set to enable EventSub support) + --base-url string External URL of the config-editor interface (used to generate auth-urls) --command-timeout duration Timeout for command execution (default 30s) -c, --config string Location of configuration file (default "./config.yaml") --log-level string Log level (debug, info, warn, error, fatal) (default "info") diff --git a/main.go b/main.go index c685a08..2e9e056 100644 --- a/main.go +++ b/main.go @@ -1,15 +1,10 @@ package main import ( - "context" - "crypto/rand" - "encoding/hex" "fmt" - "io" "math" "net" "net/http" - "net/url" "os" "strings" "sync" @@ -23,7 +18,6 @@ import ( "github.com/robfig/cron/v3" log "github.com/sirupsen/logrus" - "github.com/Luzifer/go_helpers/v2/backoff" "github.com/Luzifer/go_helpers/v2/str" "github.com/Luzifer/rconfig/v2" "github.com/Luzifer/twitch-bot/v3/internal/service/access" @@ -40,14 +34,11 @@ const ( maxIRCRetryBackoff = time.Minute httpReadHeaderTimeout = 5 * time.Second - - coreMetaKeyEventSubSecret = "event_sub_secret" - eventSubSecretLength = 32 ) var ( cfg = struct { - BaseURL string `flag:"base-url" default:"" description:"External URL of the config-editor interface (set to enable EventSub support)"` + BaseURL string `flag:"base-url" default:"" description:"External URL of the config-editor interface (used to generate auth-urls)"` CommandTimeout time.Duration `flag:"command-timeout" default:"30s" description:"Timeout for command execution"` Config string `flag:"config,c" default:"./config.yaml" description:"Location of configuration file"` IRCRateLimit time.Duration `flag:"rate-limit" default:"1500ms" description:"How often to send a message (default: 20/30s=1500ms, if your bot is mod everywhere: 100/30s=300ms, different for known/verified bots)"` @@ -73,15 +64,13 @@ var ( ircHdl *ircHandler router = mux.NewRouter() - runID = uuid.Must(uuid.NewV4()).String() - externalHTTPAvailable bool + runID = uuid.Must(uuid.NewV4()).String() db database.Connector accessService *access.Service timerService *timer.Service - twitchClient *twitch.Client - twitchEventSubClient *twitch.EventSubClient + twitchClient *twitch.Client version = "dev" ) @@ -126,35 +115,6 @@ func initApp() error { return nil } -func getEventSubSecret() (secret, handle string, err error) { - var eventSubSecret string - - err = db.ReadEncryptedCoreMeta(coreMetaKeyEventSubSecret, &eventSubSecret) - switch { - case errors.Is(err, nil): - return eventSubSecret, eventSubSecret[:5], nil - - case errors.Is(err, database.ErrCoreMetaNotFound): - // We need to generate a new secret below - - default: - return "", "", errors.Wrap(err, "reading secret from database") - } - - key := make([]byte, eventSubSecretLength) - n, err := rand.Read(key) - if err != nil { - return "", "", errors.Wrap(err, "generating random secret") - } - if n != eventSubSecretLength { - return "", "", errors.Errorf("read only %d of %d byte", n, eventSubSecretLength) - } - - eventSubSecret = hex.EncodeToString(key) - - return eventSubSecret, eventSubSecret[:5], errors.Wrap(db.StoreEncryptedCoreMeta(coreMetaKeyEventSubSecret, eventSubSecret), "storing secret to database") -} - //nolint:funlen,gocognit,gocyclo // Complexity is a little too high but makes no sense to split func main() { var err error @@ -287,30 +247,6 @@ func main() { go server.Serve(listener) log.WithField("address", listener.Addr().String()).Info("HTTP server started") - - checkExternalHTTP() - - if externalHTTPAvailable && cfg.TwitchClient != "" && cfg.TwitchClientSecret != "" { - secret, handle, err := getEventSubSecret() - if err != nil { - log.WithError(err).Fatal("Unable to get or create eventsub secret") - } - - twitchEventSubClient, err = twitch.NewEventSubClient(twitchClient, strings.Join([]string{ - strings.TrimRight(cfg.BaseURL, "/"), - "eventsub", - }, "/"), secret, handle) - - if err != nil { - log.WithError(err).Fatal("Unable to create eventsub client") - } - - if err := twitchWatch.registerGlobalHooks(); err != nil { - log.WithError(err).Fatal("Unable to register global eventsub hooks") - } - - router.HandleFunc("/eventsub/{keyhandle}", twitchEventSubClient.HandleEventsubPush).Methods(http.MethodPost) - } } for _, c := range config.Channels { @@ -404,54 +340,6 @@ func main() { } } -func checkExternalHTTP() { - base, err := url.Parse(cfg.BaseURL) - if err != nil { - log.WithError(err).Error("Unable to parse BaseURL") - return - } - - if base.String() == "" { - log.Debug("No BaseURL set, disabling EventSub support") - return - } - - base.Path = strings.Join([]string{ - strings.TrimRight(base.Path, "/"), - "selfcheck", - }, "/") - - var data []byte - if err = backoff.NewBackoff().WithMaxTotalTime(cfg.WaitForSelfcheck).Retry(func() error { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - - req, _ := http.NewRequestWithContext(ctx, http.MethodGet, base.String(), nil) - resp, err := http.DefaultClient.Do(req) - if err != nil { - return errors.Wrap(err, "requesting self-check URL") - } - defer resp.Body.Close() - - data, err = io.ReadAll(resp.Body) - if err != nil { - return errors.Wrap(err, "reading self-check response") - } - - if strings.TrimSpace(string(data)) != runID { - return errors.New("found unexpected run-id") - } - - return nil - }); err != nil { - log.WithError(err).Error("executing self-check") - return - } - - externalHTTPAvailable = true - log.Debug("Self-Check successful, EventSub support is available") -} - func startCheck() error { var errs []string diff --git a/pkg/twitch/eventsub.go b/pkg/twitch/eventsub.go index bed568f..544c062 100644 --- a/pkg/twitch/eventsub.go +++ b/pkg/twitch/eventsub.go @@ -3,46 +3,19 @@ package twitch import ( "bytes" "context" - "crypto/hmac" - "crypto/sha256" "encoding/json" "fmt" - "io" "net/http" "net/url" "strings" - "sync" "time" - "github.com/gofrs/uuid/v3" - "github.com/gorilla/mux" "github.com/mitchellh/hashstructure/v2" "github.com/pkg/errors" log "github.com/sirupsen/logrus" - - "github.com/Luzifer/go_helpers/v2/str" ) const ( - eventSubHeaderMessageID = "Twitch-Eventsub-Message-Id" - eventSubHeaderMessageType = "Twitch-Eventsub-Message-Type" - eventSubHeaderMessageSignature = "Twitch-Eventsub-Message-Signature" - eventSubHeaderMessageTimestamp = "Twitch-Eventsub-Message-Timestamp" - // eventSubHeaderMessageRetry = "Twitch-Eventsub-Message-Retry" - // eventSubHeaderSubscriptionType = "Twitch-Eventsub-Subscription-Type" - // eventSubHeaderSubscriptionVersion = "Twitch-Eventsub-Subscription-Version" - - eventSubMessageTypeVerification = "webhook_callback_verification" - eventSubMessageTypeRevokation = "revocation" - // eventSubMessageTypeNotification = "notification" - - eventSubStatusEnabled = "enabled" - eventSubStatusVerificationPending = "webhook_callback_verification_pending" - // eventSubStatusAuthorizationRevoked = "authorization_revoked" - // eventSubStatusFailuresExceeded = "notification_failures_exceeded" - // eventSubStatusUserRemoved = "user_removed" - // eventSubStatusVerificationFailed = "webhook_callback_verification_failed" - EventSubEventTypeChannelFollow = "channel.follow" EventSubEventTypeChannelPointCustomRewardRedemptionAdd = "channel.channel_points_custom_reward_redemption.add" EventSubEventTypeChannelRaid = "channel.raid" @@ -59,17 +32,6 @@ const ( ) type ( - EventSubClient struct { - apiURL string - secret string - secretHandle string - - twitchClient *Client - - subscriptions map[string]*registeredSubscription - subscriptionsLock sync.RWMutex - } - EventSubCondition struct { BroadcasterUserID string `json:"broadcaster_user_id,omitempty"` CampaignID string `json:"campaign_id,omitempty"` @@ -201,9 +163,10 @@ type ( } eventSubTransport struct { - Method string `json:"method"` - Callback string `json:"callback"` - Secret string `json:"secret"` + Method string `json:"method"` + Callback string `json:"callback"` + Secret string `json:"secret"` + SessionID string `json:"session_id"` } registeredSubscription struct { @@ -222,234 +185,15 @@ func (e EventSubCondition) Hash() (string, error) { return fmt.Sprintf("%x", h), nil } -func NewEventSubClient(twitchClient *Client, apiURL, secret, secretHandle string) (*EventSubClient, error) { - c := &EventSubClient{ - apiURL: apiURL, - secret: secret, - secretHandle: secretHandle, - - twitchClient: twitchClient, - - subscriptions: map[string]*registeredSubscription{}, - } - - return c, c.PreFetchSubscriptions(context.Background()) +func (c *Client) createEventSubSubscriptionWebhook(ctx context.Context, sub eventSubSubscription) (*eventSubSubscription, error) { + return c.createEventSubSubscription(ctx, authTypeAppAccessToken, sub) } -func (e *EventSubClient) HandleEventsubPush(w http.ResponseWriter, r *http.Request) { - var ( - body = new(bytes.Buffer) - keyHandle = mux.Vars(r)["keyhandle"] - message eventSubPostMessage - signature = r.Header.Get(eventSubHeaderMessageSignature) - ) - - if keyHandle != e.secretHandle { - http.Error(w, "deprecated callback used", http.StatusNotFound) - return - } - - // Copy body for duplicate processing - if _, err := io.Copy(body, r.Body); err != nil { - log.WithError(err).Error("Unable to read hook body") - return - } - - // Verify signature - mac := hmac.New(sha256.New, []byte(e.secret)) - fmt.Fprintf(mac, "%s%s%s", r.Header.Get(eventSubHeaderMessageID), r.Header.Get(eventSubHeaderMessageTimestamp), body.Bytes()) - if cSig := fmt.Sprintf("sha256=%x", mac.Sum(nil)); cSig != signature { - log.Errorf("Got message signature %s, expected %s", signature, cSig) - http.Error(w, "Signature verification failed", http.StatusUnauthorized) - return - } - - // Read message - if err := json.NewDecoder(body).Decode(&message); err != nil { - log.WithError(err).Errorf("Unable to decode eventsub message") - http.Error(w, errors.Wrap(err, "parsing message").Error(), http.StatusBadRequest) - return - } - - logger := log.WithField("type", message.Subscription.Type) - - // If we got a verification request, respond with the challenge - switch r.Header.Get(eventSubHeaderMessageType) { - case eventSubMessageTypeRevokation: - w.WriteHeader(http.StatusNoContent) - return - - case eventSubMessageTypeVerification: - logger.Debug("Confirming eventsub subscription") - w.Write([]byte(message.Challenge)) - return - } - - logger.Debug("Received notification") - - condHash, err := message.Subscription.Condition.Hash() - if err != nil { - logger.WithError(err).Errorf("Unable to hash condition of push") - http.Error(w, errors.Wrap(err, "hashing condition").Error(), http.StatusBadRequest) - return - } - - e.subscriptionsLock.RLock() - defer e.subscriptionsLock.RUnlock() - - cacheKey := strings.Join([]string{message.Subscription.Type, message.Subscription.Version, condHash}, "::") - - reg, ok := e.subscriptions[cacheKey] - if !ok { - http.Error(w, "no subscription available", http.StatusBadRequest) - return - } - - for _, cb := range reg.Callbacks { - if err = cb(message.Event); err != nil { - logger.WithError(err).Error("Handler callback caused error") - } - } +func (c *Client) createEventSubSubscriptionWebsocket(ctx context.Context, sub eventSubSubscription) (*eventSubSubscription, error) { + return c.createEventSubSubscription(ctx, authTypeBearerToken, sub) } -func (e *EventSubClient) PreFetchSubscriptions(ctx context.Context) error { - e.subscriptionsLock.Lock() - defer e.subscriptionsLock.Unlock() - - subList, err := e.twitchClient.getEventSubSubscriptions(ctx) - if err != nil { - return errors.Wrap(err, "listing existing subscriptions") - } - - for i := range subList { - sub := subList[i] - - switch { - case !str.StringInSlice(sub.Status, []string{eventSubStatusEnabled, eventSubStatusVerificationPending}): - // Is not an active hook, we don't need to care: It will be - // confirmed later or will expire but should not be counted - continue - - case strings.HasPrefix(sub.Transport.Callback, e.apiURL) && sub.Transport.Callback != e.fullAPIurl(): - // Uses the same API URL but with another secret handle: Must - // have been registered by another instance with another secret - // so we should be able to deregister it without causing any - // trouble - logger := log.WithFields(log.Fields{ - "id": sub.ID, - "topic": sub.Type, - "version": sub.Version, - }) - logger.Debug("Removing deprecated EventSub subscription") - if err = e.twitchClient.deleteEventSubSubscription(ctx, sub.ID); err != nil { - logger.WithError(err).Error("Unable to deregister deprecated EventSub subscription") - } - continue - - case sub.Transport.Callback != e.fullAPIurl(): - // Different callback URL: We don't care, it's probably another - // bot instance with the same client ID - continue - } - - condHash, err := sub.Condition.Hash() - if err != nil { - return errors.Wrap(err, "hashing condition") - } - - log.WithFields(log.Fields{ - "condition": sub.Condition, - "type": sub.Type, - "version": sub.Version, - }).Debug("found existing eventsub subscription") - - cacheKey := strings.Join([]string{sub.Type, sub.Version, condHash}, "::") - e.subscriptions[cacheKey] = ®isteredSubscription{ - Type: sub.Type, - Callbacks: map[string]func(json.RawMessage) error{}, - Subscription: sub, - } - } - - return nil -} - -func (e *EventSubClient) RegisterEventSubHooks(event, version string, condition EventSubCondition, callback func(json.RawMessage) error) (func(), error) { - if version == "" { - version = EventSubTopicVersion1 - } - - condHash, err := condition.Hash() - if err != nil { - return nil, errors.Wrap(err, "hashing condition") - } - - var ( - cacheKey = strings.Join([]string{event, version, condHash}, "::") - logger = log.WithFields(log.Fields{ - "condition": condition, - "type": event, - "version": version, - }) - ) - - e.subscriptionsLock.RLock() - _, ok := e.subscriptions[cacheKey] - e.subscriptionsLock.RUnlock() - - if ok { - // Subscription already exists - e.subscriptionsLock.Lock() - defer e.subscriptionsLock.Unlock() - - logger.Debug("Adding callback to known subscription") - - cbKey := uuid.Must(uuid.NewV4()).String() - - e.subscriptions[cacheKey].Callbacks[cbKey] = callback - return func() { e.unregisterCallback(cacheKey, cbKey) }, nil - } - - logger.Debug("registering new eventsub subscription") - - // Register subscriptions - ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout) - defer cancel() - - newSub, err := e.twitchClient.createEventSubSubscription(ctx, eventSubSubscription{ - Type: event, - Version: version, - Condition: condition, - Transport: eventSubTransport{ - Method: "webhook", - Callback: e.fullAPIurl(), - Secret: e.secret, - }, - }) - if err != nil { - return nil, errors.Wrap(err, "creating subscription") - } - - e.subscriptionsLock.Lock() - defer e.subscriptionsLock.Unlock() - - logger.Debug("Registered new hook") - - cbKey := uuid.Must(uuid.NewV4()).String() - e.subscriptions[cacheKey] = ®isteredSubscription{ - Type: event, - Callbacks: map[string]func(json.RawMessage) error{ - cbKey: callback, - }, - Subscription: *newSub, - } - - logger.Debug("Registered eventsub subscription") - - return func() { e.unregisterCallback(cacheKey, cbKey) }, nil -} - -func (c *Client) createEventSubSubscription(ctx context.Context, sub eventSubSubscription) (*eventSubSubscription, error) { +func (c *Client) createEventSubSubscription(ctx context.Context, auth authType, sub eventSubSubscription) (*eventSubSubscription, error) { var ( buf = new(bytes.Buffer) resp struct { @@ -466,7 +210,7 @@ func (c *Client) createEventSubSubscription(ctx context.Context, sub eventSubSub } if err := c.request(clientRequestOpts{ - AuthType: authTypeAppAccessToken, + AuthType: auth, Body: buf, Context: ctx, Method: http.MethodPost, diff --git a/pkg/twitch/eventsubWebhookClient.go b/pkg/twitch/eventsubWebhookClient.go new file mode 100644 index 0000000..062a1da --- /dev/null +++ b/pkg/twitch/eventsubWebhookClient.go @@ -0,0 +1,277 @@ +package twitch + +import ( + "bytes" + "context" + "crypto/hmac" + "crypto/sha256" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "sync" + + "github.com/gofrs/uuid/v3" + "github.com/gorilla/mux" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + + "github.com/Luzifer/go_helpers/v2/str" +) + +const ( + eventSubHeaderMessageID = "Twitch-Eventsub-Message-Id" + eventSubHeaderMessageType = "Twitch-Eventsub-Message-Type" + eventSubHeaderMessageSignature = "Twitch-Eventsub-Message-Signature" + eventSubHeaderMessageTimestamp = "Twitch-Eventsub-Message-Timestamp" + + eventSubMessageTypeVerification = "webhook_callback_verification" + eventSubMessageTypeRevokation = "revocation" + + eventSubStatusEnabled = "enabled" + eventSubStatusVerificationPending = "webhook_callback_verification_pending" +) + +type ( + // Deprecated: This client should no longer be used and will not be + // maintained afterwards. Replace with EventSubSocketClient. + EventSubClient struct { + apiURL string + secret string + secretHandle string + + twitchClient *Client + + subscriptions map[string]*registeredSubscription + subscriptionsLock sync.RWMutex + } +) + +// Deprecated: See deprecation notice of EventSubClient +func NewEventSubClient(twitchClient *Client, apiURL, secret, secretHandle string) (*EventSubClient, error) { + c := &EventSubClient{ + apiURL: apiURL, + secret: secret, + secretHandle: secretHandle, + + twitchClient: twitchClient, + + subscriptions: map[string]*registeredSubscription{}, + } + + return c, c.PreFetchSubscriptions(context.Background()) +} + +func (e *EventSubClient) HandleEventsubPush(w http.ResponseWriter, r *http.Request) { + var ( + body = new(bytes.Buffer) + keyHandle = mux.Vars(r)["keyhandle"] + message eventSubPostMessage + signature = r.Header.Get(eventSubHeaderMessageSignature) + ) + + if keyHandle != e.secretHandle { + http.Error(w, "deprecated callback used", http.StatusNotFound) + return + } + + // Copy body for duplicate processing + if _, err := io.Copy(body, r.Body); err != nil { + log.WithError(err).Error("Unable to read hook body") + return + } + + // Verify signature + mac := hmac.New(sha256.New, []byte(e.secret)) + fmt.Fprintf(mac, "%s%s%s", r.Header.Get(eventSubHeaderMessageID), r.Header.Get(eventSubHeaderMessageTimestamp), body.Bytes()) + if cSig := fmt.Sprintf("sha256=%x", mac.Sum(nil)); cSig != signature { + log.Errorf("Got message signature %s, expected %s", signature, cSig) + http.Error(w, "Signature verification failed", http.StatusUnauthorized) + return + } + + // Read message + if err := json.NewDecoder(body).Decode(&message); err != nil { + log.WithError(err).Errorf("Unable to decode eventsub message") + http.Error(w, errors.Wrap(err, "parsing message").Error(), http.StatusBadRequest) + return + } + + logger := log.WithField("type", message.Subscription.Type) + + // If we got a verification request, respond with the challenge + switch r.Header.Get(eventSubHeaderMessageType) { + case eventSubMessageTypeRevokation: + w.WriteHeader(http.StatusNoContent) + return + + case eventSubMessageTypeVerification: + logger.Debug("Confirming eventsub subscription") + w.Write([]byte(message.Challenge)) + return + } + + logger.Debug("Received notification") + + condHash, err := message.Subscription.Condition.Hash() + if err != nil { + logger.WithError(err).Errorf("Unable to hash condition of push") + http.Error(w, errors.Wrap(err, "hashing condition").Error(), http.StatusBadRequest) + return + } + + e.subscriptionsLock.RLock() + defer e.subscriptionsLock.RUnlock() + + cacheKey := strings.Join([]string{message.Subscription.Type, message.Subscription.Version, condHash}, "::") + + reg, ok := e.subscriptions[cacheKey] + if !ok { + http.Error(w, "no subscription available", http.StatusBadRequest) + return + } + + for _, cb := range reg.Callbacks { + if err = cb(message.Event); err != nil { + logger.WithError(err).Error("Handler callback caused error") + } + } +} + +func (e *EventSubClient) PreFetchSubscriptions(ctx context.Context) error { + e.subscriptionsLock.Lock() + defer e.subscriptionsLock.Unlock() + + subList, err := e.twitchClient.getEventSubSubscriptions(ctx) + if err != nil { + return errors.Wrap(err, "listing existing subscriptions") + } + + for i := range subList { + sub := subList[i] + + switch { + case !str.StringInSlice(sub.Status, []string{eventSubStatusEnabled, eventSubStatusVerificationPending}): + // Is not an active hook, we don't need to care: It will be + // confirmed later or will expire but should not be counted + continue + + case strings.HasPrefix(sub.Transport.Callback, e.apiURL) && sub.Transport.Callback != e.fullAPIurl(): + // Uses the same API URL but with another secret handle: Must + // have been registered by another instance with another secret + // so we should be able to deregister it without causing any + // trouble + logger := log.WithFields(log.Fields{ + "id": sub.ID, + "topic": sub.Type, + "version": sub.Version, + }) + logger.Debug("Removing deprecated EventSub subscription") + if err = e.twitchClient.deleteEventSubSubscription(ctx, sub.ID); err != nil { + logger.WithError(err).Error("Unable to deregister deprecated EventSub subscription") + } + continue + + case sub.Transport.Callback != e.fullAPIurl(): + // Different callback URL: We don't care, it's probably another + // bot instance with the same client ID + continue + } + + condHash, err := sub.Condition.Hash() + if err != nil { + return errors.Wrap(err, "hashing condition") + } + + log.WithFields(log.Fields{ + "condition": sub.Condition, + "type": sub.Type, + "version": sub.Version, + }).Debug("found existing eventsub subscription") + + cacheKey := strings.Join([]string{sub.Type, sub.Version, condHash}, "::") + e.subscriptions[cacheKey] = ®isteredSubscription{ + Type: sub.Type, + Callbacks: map[string]func(json.RawMessage) error{}, + Subscription: sub, + } + } + + return nil +} + +func (e *EventSubClient) RegisterEventSubHooks(event, version string, condition EventSubCondition, callback func(json.RawMessage) error) (func(), error) { + if version == "" { + version = EventSubTopicVersion1 + } + + condHash, err := condition.Hash() + if err != nil { + return nil, errors.Wrap(err, "hashing condition") + } + + var ( + cacheKey = strings.Join([]string{event, version, condHash}, "::") + logger = log.WithFields(log.Fields{ + "condition": condition, + "type": event, + "version": version, + }) + ) + + e.subscriptionsLock.RLock() + _, ok := e.subscriptions[cacheKey] + e.subscriptionsLock.RUnlock() + + if ok { + // Subscription already exists + e.subscriptionsLock.Lock() + defer e.subscriptionsLock.Unlock() + + logger.Debug("Adding callback to known subscription") + + cbKey := uuid.Must(uuid.NewV4()).String() + + e.subscriptions[cacheKey].Callbacks[cbKey] = callback + return func() { e.unregisterCallback(cacheKey, cbKey) }, nil + } + + logger.Debug("registering new eventsub subscription") + + // Register subscriptions + ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout) + defer cancel() + + newSub, err := e.twitchClient.createEventSubSubscriptionWebhook(ctx, eventSubSubscription{ + Type: event, + Version: version, + Condition: condition, + Transport: eventSubTransport{ + Method: "webhook", + Callback: e.fullAPIurl(), + Secret: e.secret, + }, + }) + if err != nil { + return nil, errors.Wrap(err, "creating subscription") + } + + e.subscriptionsLock.Lock() + defer e.subscriptionsLock.Unlock() + + logger.Debug("Registered new hook") + + cbKey := uuid.Must(uuid.NewV4()).String() + e.subscriptions[cacheKey] = ®isteredSubscription{ + Type: event, + Callbacks: map[string]func(json.RawMessage) error{ + cbKey: callback, + }, + Subscription: *newSub, + } + + logger.Debug("Registered eventsub subscription") + + return func() { e.unregisterCallback(cacheKey, cbKey) }, nil +} diff --git a/pkg/twitch/eventsubWebsocketClient.go b/pkg/twitch/eventsubWebsocketClient.go new file mode 100644 index 0000000..85f5aa7 --- /dev/null +++ b/pkg/twitch/eventsubWebsocketClient.go @@ -0,0 +1,403 @@ +package twitch + +import ( + "context" + "encoding/json" + "io" + "net" + "reflect" + "time" + + "github.com/gorilla/websocket" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +const ( + eventsubLiveSocketDest = "wss://eventsub.wss.twitch.tv/ws" + socketInitialTimeout = 30 * time.Second + socketTimeoutGraceMultiplier = 1.5 +) + +const ( + eventsubSocketMessageTypeKeepalive = "session_keepalive" + eventsubSocketMessageTypeNotification = "notification" + eventsubSocketMessageTypeReconnect = "session_reconnect" + eventsubSocketMessageTypeWelcome = "session_welcome" +) + +const ( + eventsubCloseCodeInternalServerError = 4000 + eventsubCloseCodeClientSentTraffic = 4001 + eventsubCloseCodeClientFailedPingPong = 4002 + eventsubCloseCodeConnectionUnused = 4003 + eventsubCloseCodeReconnectGraceExpire = 4004 + eventsubCloseCodeNetworkTimeout = 4005 + eventsubCloseCodeNetworkError = 4006 + eventsubCloseCodeInvalidReconnect = 4007 +) + +type ( + EventSubSocketClient struct { + logger *logrus.Entry + socketDest string + socketID string + subscriptionTypes []eventSubSocketSubscriptionType + twitch *Client + + conn *websocket.Conn + newconn *websocket.Conn + + runCtx context.Context + runCtxCancel context.CancelFunc + } + + EventSubSocketClientOpt func(*EventSubSocketClient) + + eventSubSocketMessage struct { + Metadata struct { + MessageID string `json:"message_id"` + MessageType string `json:"message_type"` + MessageTimestamp time.Time `json:"message_timestamp"` + SubscriptionType string `json:"subscription_type"` + SubscriptionVersion string `json:"subscription_version"` + } `json:"metadata"` + Payload json.RawMessage `json:"payload"` + } + + eventSubSocketSubscriptionType struct { + Event, Version string + Condition EventSubCondition + Callback func(json.RawMessage) error + } + + eventSubSocketPayloadNotification struct { + Event json.RawMessage `json:"event"` + Subscription struct { + ID string `json:"id"` + Status string `json:"status"` + Type string `json:"type"` + Version string `json:"version"` + Cost int64 `json:"cost"` + Condition EventSubCondition `json:"condition"` + Transport struct { + Method string `json:"method"` + SessionID string `json:"session_id"` + } `json:"transport"` + CreatedAt time.Time `json:"created_at"` + } `json:"subscription"` + } + + eventSubSocketPayloadSession struct { + Session struct { + ID string `json:"id"` + Status string `json:"status"` + ConnectedAt time.Time `json:"connected_at"` + KeepaliveTimeoutSeconds int64 `json:"keepalive_timeout_seconds"` + ReconnectURL *string `json:"reconnect_url"` + } `json:"session"` + } +) + +func NewEventSubSocketClient(opts ...EventSubSocketClientOpt) (*EventSubSocketClient, error) { + ctx, cancel := context.WithCancel(context.Background()) + + c := &EventSubSocketClient{ + runCtx: ctx, + runCtxCancel: cancel, + } + + for _, opt := range opts { + opt(c) + } + + if c.socketDest == "" { + c.socketDest = eventsubLiveSocketDest + } + + if c.logger == nil { + discardLogger := logrus.New() + discardLogger.SetOutput(io.Discard) + c.logger = logrus.NewEntry(discardLogger) + } + + if c.twitch == nil { + return nil, errors.New("no twitch-client configured") + } + + return c, nil +} + +func WithLogger(logger *logrus.Entry) EventSubSocketClientOpt { + return func(e *EventSubSocketClient) { e.logger = logger } +} + +func WithSocketURL(url string) EventSubSocketClientOpt { + return func(e *EventSubSocketClient) { e.socketDest = url } +} + +func WithSubscription(event, version string, condition EventSubCondition, callback func(json.RawMessage) error) EventSubSocketClientOpt { + if version == "" { + version = EventSubTopicVersion1 + } + + return func(e *EventSubSocketClient) { + e.subscriptionTypes = append(e.subscriptionTypes, eventSubSocketSubscriptionType{ + Event: event, + Version: version, + Condition: condition, + Callback: callback, + }) + } +} + +func WithTwitchClient(c *Client) EventSubSocketClientOpt { + return func(e *EventSubSocketClient) { e.twitch = c } +} + +func (e *EventSubSocketClient) Close() { e.runCtxCancel() } + +//nolint:gocyclo // Makes no sense to split further +func (e *EventSubSocketClient) Run() error { + var ( + errC = make(chan error, 1) + keepaliveTimeout = socketInitialTimeout + msgC = make(chan eventSubSocketMessage, 1) + socketTimeout = time.NewTimer(keepaliveTimeout) + ) + + if err := e.connect(e.socketDest, msgC, errC, "client init"); err != nil { + return errors.Wrap(err, "establishing initial connection") + } + + defer func() { + if err := e.conn.Close(); err != nil { + e.logger.WithError(err).Error("finally closing socket") + } + }() + + for { + select { + case err := <-errC: + // Something went wrong + if err = e.handleSocketError(err, msgC, errC); err != nil { + return err + } + + case <-socketTimeout.C: + // No message received, deeming connection dead + socketTimeout.Reset(socketInitialTimeout) + if err := e.connect(e.socketDest, msgC, errC, "socket timeout"); err != nil { + errC <- errors.Wrap(err, "re-connecting after timeout") + continue + } + + case msg := <-msgC: + // The keepalive timer is reset with each notification or + // keepalive message. + socketTimeout.Reset(keepaliveTimeout) + + switch msg.Metadata.MessageType { + case eventsubSocketMessageTypeKeepalive: + // Handle only for debug, timer reset is done above + e.logger.Trace("keepalive received") + + case eventsubSocketMessageTypeNotification: + // We got mail! Yay! + if err := e.handleNotificationMessage(msg); err != nil { + errC <- err + } + + case eventsubSocketMessageTypeReconnect: + // Twitch politely asked us to reconnect + if err := e.handleReconnectMessage(msg, msgC, errC); err != nil { + errC <- err + } + + case eventsubSocketMessageTypeWelcome: + var err error + if keepaliveTimeout, err = e.handleWelcomeMessage(msg); err != nil { + errC <- err + } + + default: + e.logger.WithField("type", msg.Metadata.MessageType).Error("unknown message type received") + } + + case <-e.runCtx.Done(): + return nil + } + } +} + +func (e *EventSubSocketClient) connect(url string, msgC chan eventSubSocketMessage, errC chan error, reason string) error { + e.logger.WithField("reason", reason).Debug("(re-)connecting websocket") + + conn, _, err := websocket.DefaultDialer.Dial(url, nil) //nolint:bodyclose // Close is implemented at other place + if err != nil { + return errors.Wrap(err, "dialing websocket") + } + + go func() { + for { + var msg eventSubSocketMessage + if err = conn.ReadJSON(&msg); err != nil { + errC <- errors.Wrap(err, "reading message") + return + } + + msgC <- msg + } + }() + + e.newconn = conn + return nil +} + +func (e *EventSubSocketClient) handleNotificationMessage(msg eventSubSocketMessage) error { + var payload eventSubSocketPayloadNotification + if err := msg.Unmarshal(&payload); err != nil { + return errors.Wrap(err, "unmarshalling notification") + } + + for _, st := range e.subscriptionTypes { + if st.Event != payload.Subscription.Type || st.Version != payload.Subscription.Version || !reflect.DeepEqual(st.Condition, payload.Subscription.Condition) { + continue + } + + if err := st.Callback(payload.Event); err != nil { + e.logger.WithError(err).WithFields(logrus.Fields{ + "condition": st.Condition, + "event": st.Event, + "version": st.Version, + }).Error("callback caused error") + } + } + + return nil +} + +func (e *EventSubSocketClient) handleReconnectMessage(msg eventSubSocketMessage, msgC chan eventSubSocketMessage, errC chan error) error { + e.logger.Debug("socket ask for reconnect") + + var payload eventSubSocketPayloadSession + if err := msg.Unmarshal(&payload); err != nil { + return errors.Wrap(err, "unmarshalling reconnect message") + } + + if payload.Session.ReconnectURL == nil { + return errors.New("reconnect message did not contain reconnect_url") + } + + if err := e.connect(*payload.Session.ReconnectURL, msgC, errC, "reconnect requested"); err != nil { + return errors.Wrap(err, "re-connecting after reconnect message") + } + + return nil +} + +func (e *EventSubSocketClient) handleSocketError(err error, msgC chan eventSubSocketMessage, errC chan error) error { + var closeErr *websocket.CloseError + if errors.As(err, &closeErr) { + switch closeErr.Code { + case eventsubCloseCodeInternalServerError: + e.logger.Warn("websocket reported internal server error") + return errors.Wrap(e.connect(e.socketDest, msgC, errC, "internal-server-error"), "re-connecting after internal-server-error") + + case eventsubCloseCodeClientSentTraffic: + e.logger.Error("wrong usage of websocket (client-sent-traffic)") + + case eventsubCloseCodeClientFailedPingPong: + e.logger.Error("wrong usage of websocket (missing-ping-pong)") + + case eventsubCloseCodeConnectionUnused: + e.logger.Error("wrong usage of websocket (no-topics-subscribed)") + + case eventsubCloseCodeReconnectGraceExpire: + e.logger.Error("wrong usage of websocket (no-reconnect-in-time)") + + case eventsubCloseCodeNetworkTimeout: + e.logger.Warn("websocket reported network timeout") + return errors.Wrap(e.connect(e.socketDest, msgC, errC, "network-timeout"), "re-connecting after network-timeout") + + case eventsubCloseCodeNetworkError: + e.logger.Warn("websocket reported network error") + return errors.Wrap(e.connect(e.socketDest, msgC, errC, "network-error"), "re-connecting after network-error") + + case eventsubCloseCodeInvalidReconnect: + e.logger.Warn("websocket reported invalid reconnect url") + + case websocket.CloseNormalClosure: + // We don't take action here as a graceful close should + // be initiated by us after establishing a new conn + e.logger.Debug("websocket was closed normally") + return nil + + default: + // Some non-twitch close code we did not expect + e.logger.WithError(closeErr).Error("websocket reported unexpected error code") + } + } + + if errors.Is(err, net.ErrClosed) { + // This isn't nice but might happen, in this case the socket is + // already gone but the read didn't notice that until this error + return nil + } + + return err +} + +func (e *EventSubSocketClient) handleWelcomeMessage(msg eventSubSocketMessage) (time.Duration, error) { + var payload eventSubSocketPayloadSession + if err := msg.Unmarshal(&payload); err != nil { + return socketInitialTimeout, errors.Wrap(err, "unmarshalling welcome message") + } + + // Close old connection if present + if e.conn != nil { + if err := e.conn.Close(); err != nil { + e.logger.WithError(err).Error("closing old websocket") + } + } + + // Promote new connection to existing conn + e.conn, e.newconn = e.newconn, nil + + // Subscribe to topics if the socket ID changed (should only + // happen on first connect or if we established a new + // connection after something broke) + if e.socketID != payload.Session.ID { + e.socketID = payload.Session.ID + if err := e.subscribe(); err != nil { + return socketInitialTimeout, errors.Wrap(err, "subscribing to topics") + } + } + + e.logger.WithField("id", e.socketID).Debug("websocket connected successfully") + + // Configure proper keepalive + return time.Duration(float64(payload.Session.KeepaliveTimeoutSeconds)*socketTimeoutGraceMultiplier) * time.Second, nil +} + +func (e *EventSubSocketClient) subscribe() error { + for _, st := range e.subscriptionTypes { + if _, err := e.twitch.createEventSubSubscriptionWebsocket(context.Background(), eventSubSubscription{ + Type: st.Event, + Version: st.Version, + Condition: st.Condition, + Transport: eventSubTransport{ + Method: "websocket", + SessionID: e.socketID, + }, + }); err != nil { + return errors.Wrapf(err, "subscribing to %s/%s", st.Event, st.Version) + } + } + + return nil +} + +func (e eventSubSocketMessage) Unmarshal(dest any) error { + return errors.Wrap(json.Unmarshal(e.Payload, dest), "unmarshalling payload") +} diff --git a/twitchWatcher.go b/twitchWatcher.go index 60b4e73..a74654d 100644 --- a/twitchWatcher.go +++ b/twitchWatcher.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/Luzifer/twitch-bot/v3/internal/service/access" "github.com/Luzifer/twitch-bot/v3/pkg/twitch" "github.com/Luzifer/twitch-bot/v3/plugins" ) @@ -26,8 +27,8 @@ type ( IsLive bool Title string - isInitialized bool - unregisterFunc func() + isInitialized bool + esc *twitch.EventSubSocketClient } twitchWatcher struct { @@ -37,6 +38,11 @@ type ( } ) +func (t *twitchChannelState) CloseESC() { + t.esc.Close() + t.esc = nil +} + func (t twitchChannelState) Equals(c twitchChannelState) bool { return t.Category == c.Category && t.IsLive == c.IsLive && @@ -76,7 +82,7 @@ func (t *twitchWatcher) Check() { var channels []string t.lock.RLock() for c := range t.ChannelStatus { - if t.ChannelStatus[c].unregisterFunc != nil { + if t.ChannelStatus[c].esc != nil { continue } @@ -95,8 +101,8 @@ func (t *twitchWatcher) RemoveChannel(channel string) error { t.lock.Lock() defer t.lock.Unlock() - if f := t.ChannelStatus[channel].unregisterFunc; f != nil { - f() + if t.ChannelStatus[channel].esc != nil { + t.ChannelStatus[channel].esc.Close() } delete(t.ChannelStatus, channel) @@ -282,23 +288,6 @@ func (t *twitchWatcher) handleEventSubStreamOnOff(isOnline bool) func(json.RawMe } } -func (t *twitchWatcher) handleEventUserAuthRevoke(m json.RawMessage) error { - var payload twitch.EventSubEventUserAuthorizationRevoke - if err := json.Unmarshal(m, &payload); err != nil { - return errors.Wrap(err, "unmarshalling event") - } - - if payload.ClientID != cfg.TwitchClient { - // We got an revoke for a different ID: Shouldn't happen but whatever. - return nil - } - - return errors.Wrap( - accessService.RemoveExendedTwitchCredentials(payload.UserLogin), - "deleting granted scopes", - ) -} - func (t *twitchWatcher) updateChannelFromAPI(channel string) error { t.lock.Lock() defer t.lock.Unlock() @@ -332,22 +321,39 @@ func (t *twitchWatcher) updateChannelFromAPI(channel string) error { storedStatus.Update(status) storedStatus.isInitialized = true - if storedStatus.unregisterFunc != nil { + if storedStatus.esc != nil { // Do not register twice return nil } - if storedStatus.unregisterFunc, err = t.registerEventSubCallbacks(channel); err != nil { + if storedStatus.esc, err = t.registerEventSubCallbacks(channel); err != nil { return errors.Wrap(err, "registering eventsub callbacks") } + if storedStatus.esc != nil { + log.WithField("channel", channel).Info("watching for eventsub events") + go func(storedStatus *twitchChannelState) { + if err := storedStatus.esc.Run(); err != nil { + log.WithField("channel", channel).WithError(err).Error("eventsub client caused error") + } + storedStatus.CloseESC() + }(storedStatus) + } + return nil } -func (t *twitchWatcher) registerEventSubCallbacks(channel string) (func(), error) { - if twitchEventSubClient == nil { - // We don't have eventsub functionality - return nil, nil +func (t *twitchWatcher) registerEventSubCallbacks(channel string) (*twitch.EventSubSocketClient, error) { + tc, err := accessService.GetTwitchClientForChannel(channel, access.ClientConfig{ + TwitchClient: cfg.TwitchClient, + TwitchClientSecret: cfg.TwitchClientSecret, + }) + if err != nil { + if errors.Is(err, access.ErrChannelNotAuthorized) { + return nil, nil + } + + return nil, errors.Wrap(err, "getting twitch client for channel") } userID, err := twitchClient.GetIDForUsername(channel) @@ -357,7 +363,7 @@ func (t *twitchWatcher) registerEventSubCallbacks(channel string) (func(), error var ( topicRegistrations = t.getTopicRegistrations(userID) - unsubHandlers []func() + topicOpts []twitch.EventSubSocketClientOpt ) for _, tr := range topicRegistrations { @@ -385,37 +391,19 @@ func (t *twitchWatcher) registerEventSubCallbacks(channel string) (func(), error } } - uf, err := twitchEventSubClient.RegisterEventSubHooks(tr.Topic, tr.Version, tr.Condition, tr.Hook) - if err != nil { - logger.WithError(err).Error("Unable to register topic") - - for _, f := range unsubHandlers { - // Error will cause unsub handlers not to be stored, therefore we unsub them now - f() - } - - return nil, errors.Wrap(err, "registering topic") - } - - unsubHandlers = append(unsubHandlers, uf) + topicOpts = append(topicOpts, twitch.WithSubscription(tr.Topic, tr.Version, tr.Condition, tr.Hook)) } - return func() { - for _, f := range unsubHandlers { - f() - } - }, nil -} + esClient, err := twitch.NewEventSubSocketClient(append( + topicOpts, + twitch.WithLogger(log.WithField("channel", channel)), + twitch.WithTwitchClient(tc), + )...) + if err != nil { + return nil, errors.Wrap(err, "getting eventsub client for channel") + } -func (t *twitchWatcher) registerGlobalHooks() error { - _, err := twitchEventSubClient.RegisterEventSubHooks( - twitch.EventSubEventTypeUserAuthorizationRevoke, - twitch.EventSubTopicVersion1, - twitch.EventSubCondition{ClientID: cfg.TwitchClient}, - t.handleEventUserAuthRevoke, - ) - - return errors.Wrap(err, "registering user auth hook") + return esClient, nil } func (t *twitchWatcher) triggerUpdate(channel string, title, category *string, online *bool) {