[eventsub] Switch to Websocket transport (#46)

Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
Knut Ahlers 2023-05-18 15:05:43 +02:00
parent e9ba4d2c88
commit 0e7af1cfc8
Signed by: luzifer
GPG key ID: D91C3E91E4CAD6F5
6 changed files with 739 additions and 439 deletions

View file

@ -17,7 +17,7 @@ Please see the [Wiki](https://github.com/Luzifer/twitch-bot/wiki) for documentat
```console
# twitch-bot --help
Usage of twitch-bot:
--base-url string External URL of the config-editor interface (set to enable EventSub support)
--base-url string External URL of the config-editor interface (used to generate auth-urls)
--command-timeout duration Timeout for command execution (default 30s)
-c, --config string Location of configuration file (default "./config.yaml")
--log-level string Log level (debug, info, warn, error, fatal) (default "info")

114
main.go
View file

@ -1,15 +1,10 @@
package main
import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"math"
"net"
"net/http"
"net/url"
"os"
"strings"
"sync"
@ -23,7 +18,6 @@ import (
"github.com/robfig/cron/v3"
log "github.com/sirupsen/logrus"
"github.com/Luzifer/go_helpers/v2/backoff"
"github.com/Luzifer/go_helpers/v2/str"
"github.com/Luzifer/rconfig/v2"
"github.com/Luzifer/twitch-bot/v3/internal/service/access"
@ -40,14 +34,11 @@ const (
maxIRCRetryBackoff = time.Minute
httpReadHeaderTimeout = 5 * time.Second
coreMetaKeyEventSubSecret = "event_sub_secret"
eventSubSecretLength = 32
)
var (
cfg = struct {
BaseURL string `flag:"base-url" default:"" description:"External URL of the config-editor interface (set to enable EventSub support)"`
BaseURL string `flag:"base-url" default:"" description:"External URL of the config-editor interface (used to generate auth-urls)"`
CommandTimeout time.Duration `flag:"command-timeout" default:"30s" description:"Timeout for command execution"`
Config string `flag:"config,c" default:"./config.yaml" description:"Location of configuration file"`
IRCRateLimit time.Duration `flag:"rate-limit" default:"1500ms" description:"How often to send a message (default: 20/30s=1500ms, if your bot is mod everywhere: 100/30s=300ms, different for known/verified bots)"`
@ -74,14 +65,12 @@ var (
router = mux.NewRouter()
runID = uuid.Must(uuid.NewV4()).String()
externalHTTPAvailable bool
db database.Connector
accessService *access.Service
timerService *timer.Service
twitchClient *twitch.Client
twitchEventSubClient *twitch.EventSubClient
version = "dev"
)
@ -126,35 +115,6 @@ func initApp() error {
return nil
}
func getEventSubSecret() (secret, handle string, err error) {
var eventSubSecret string
err = db.ReadEncryptedCoreMeta(coreMetaKeyEventSubSecret, &eventSubSecret)
switch {
case errors.Is(err, nil):
return eventSubSecret, eventSubSecret[:5], nil
case errors.Is(err, database.ErrCoreMetaNotFound):
// We need to generate a new secret below
default:
return "", "", errors.Wrap(err, "reading secret from database")
}
key := make([]byte, eventSubSecretLength)
n, err := rand.Read(key)
if err != nil {
return "", "", errors.Wrap(err, "generating random secret")
}
if n != eventSubSecretLength {
return "", "", errors.Errorf("read only %d of %d byte", n, eventSubSecretLength)
}
eventSubSecret = hex.EncodeToString(key)
return eventSubSecret, eventSubSecret[:5], errors.Wrap(db.StoreEncryptedCoreMeta(coreMetaKeyEventSubSecret, eventSubSecret), "storing secret to database")
}
//nolint:funlen,gocognit,gocyclo // Complexity is a little too high but makes no sense to split
func main() {
var err error
@ -287,30 +247,6 @@ func main() {
go server.Serve(listener)
log.WithField("address", listener.Addr().String()).Info("HTTP server started")
checkExternalHTTP()
if externalHTTPAvailable && cfg.TwitchClient != "" && cfg.TwitchClientSecret != "" {
secret, handle, err := getEventSubSecret()
if err != nil {
log.WithError(err).Fatal("Unable to get or create eventsub secret")
}
twitchEventSubClient, err = twitch.NewEventSubClient(twitchClient, strings.Join([]string{
strings.TrimRight(cfg.BaseURL, "/"),
"eventsub",
}, "/"), secret, handle)
if err != nil {
log.WithError(err).Fatal("Unable to create eventsub client")
}
if err := twitchWatch.registerGlobalHooks(); err != nil {
log.WithError(err).Fatal("Unable to register global eventsub hooks")
}
router.HandleFunc("/eventsub/{keyhandle}", twitchEventSubClient.HandleEventsubPush).Methods(http.MethodPost)
}
}
for _, c := range config.Channels {
@ -404,54 +340,6 @@ func main() {
}
}
func checkExternalHTTP() {
base, err := url.Parse(cfg.BaseURL)
if err != nil {
log.WithError(err).Error("Unable to parse BaseURL")
return
}
if base.String() == "" {
log.Debug("No BaseURL set, disabling EventSub support")
return
}
base.Path = strings.Join([]string{
strings.TrimRight(base.Path, "/"),
"selfcheck",
}, "/")
var data []byte
if err = backoff.NewBackoff().WithMaxTotalTime(cfg.WaitForSelfcheck).Retry(func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, base.String(), nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return errors.Wrap(err, "requesting self-check URL")
}
defer resp.Body.Close()
data, err = io.ReadAll(resp.Body)
if err != nil {
return errors.Wrap(err, "reading self-check response")
}
if strings.TrimSpace(string(data)) != runID {
return errors.New("found unexpected run-id")
}
return nil
}); err != nil {
log.WithError(err).Error("executing self-check")
return
}
externalHTTPAvailable = true
log.Debug("Self-Check successful, EventSub support is available")
}
func startCheck() error {
var errs []string

View file

@ -3,46 +3,19 @@ package twitch
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/gofrs/uuid/v3"
"github.com/gorilla/mux"
"github.com/mitchellh/hashstructure/v2"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/Luzifer/go_helpers/v2/str"
)
const (
eventSubHeaderMessageID = "Twitch-Eventsub-Message-Id"
eventSubHeaderMessageType = "Twitch-Eventsub-Message-Type"
eventSubHeaderMessageSignature = "Twitch-Eventsub-Message-Signature"
eventSubHeaderMessageTimestamp = "Twitch-Eventsub-Message-Timestamp"
// eventSubHeaderMessageRetry = "Twitch-Eventsub-Message-Retry"
// eventSubHeaderSubscriptionType = "Twitch-Eventsub-Subscription-Type"
// eventSubHeaderSubscriptionVersion = "Twitch-Eventsub-Subscription-Version"
eventSubMessageTypeVerification = "webhook_callback_verification"
eventSubMessageTypeRevokation = "revocation"
// eventSubMessageTypeNotification = "notification"
eventSubStatusEnabled = "enabled"
eventSubStatusVerificationPending = "webhook_callback_verification_pending"
// eventSubStatusAuthorizationRevoked = "authorization_revoked"
// eventSubStatusFailuresExceeded = "notification_failures_exceeded"
// eventSubStatusUserRemoved = "user_removed"
// eventSubStatusVerificationFailed = "webhook_callback_verification_failed"
EventSubEventTypeChannelFollow = "channel.follow"
EventSubEventTypeChannelPointCustomRewardRedemptionAdd = "channel.channel_points_custom_reward_redemption.add"
EventSubEventTypeChannelRaid = "channel.raid"
@ -59,17 +32,6 @@ const (
)
type (
EventSubClient struct {
apiURL string
secret string
secretHandle string
twitchClient *Client
subscriptions map[string]*registeredSubscription
subscriptionsLock sync.RWMutex
}
EventSubCondition struct {
BroadcasterUserID string `json:"broadcaster_user_id,omitempty"`
CampaignID string `json:"campaign_id,omitempty"`
@ -204,6 +166,7 @@ type (
Method string `json:"method"`
Callback string `json:"callback"`
Secret string `json:"secret"`
SessionID string `json:"session_id"`
}
registeredSubscription struct {
@ -222,234 +185,15 @@ func (e EventSubCondition) Hash() (string, error) {
return fmt.Sprintf("%x", h), nil
}
func NewEventSubClient(twitchClient *Client, apiURL, secret, secretHandle string) (*EventSubClient, error) {
c := &EventSubClient{
apiURL: apiURL,
secret: secret,
secretHandle: secretHandle,
twitchClient: twitchClient,
subscriptions: map[string]*registeredSubscription{},
}
return c, c.PreFetchSubscriptions(context.Background())
func (c *Client) createEventSubSubscriptionWebhook(ctx context.Context, sub eventSubSubscription) (*eventSubSubscription, error) {
return c.createEventSubSubscription(ctx, authTypeAppAccessToken, sub)
}
func (e *EventSubClient) HandleEventsubPush(w http.ResponseWriter, r *http.Request) {
var (
body = new(bytes.Buffer)
keyHandle = mux.Vars(r)["keyhandle"]
message eventSubPostMessage
signature = r.Header.Get(eventSubHeaderMessageSignature)
)
if keyHandle != e.secretHandle {
http.Error(w, "deprecated callback used", http.StatusNotFound)
return
}
// Copy body for duplicate processing
if _, err := io.Copy(body, r.Body); err != nil {
log.WithError(err).Error("Unable to read hook body")
return
}
// Verify signature
mac := hmac.New(sha256.New, []byte(e.secret))
fmt.Fprintf(mac, "%s%s%s", r.Header.Get(eventSubHeaderMessageID), r.Header.Get(eventSubHeaderMessageTimestamp), body.Bytes())
if cSig := fmt.Sprintf("sha256=%x", mac.Sum(nil)); cSig != signature {
log.Errorf("Got message signature %s, expected %s", signature, cSig)
http.Error(w, "Signature verification failed", http.StatusUnauthorized)
return
}
// Read message
if err := json.NewDecoder(body).Decode(&message); err != nil {
log.WithError(err).Errorf("Unable to decode eventsub message")
http.Error(w, errors.Wrap(err, "parsing message").Error(), http.StatusBadRequest)
return
}
logger := log.WithField("type", message.Subscription.Type)
// If we got a verification request, respond with the challenge
switch r.Header.Get(eventSubHeaderMessageType) {
case eventSubMessageTypeRevokation:
w.WriteHeader(http.StatusNoContent)
return
case eventSubMessageTypeVerification:
logger.Debug("Confirming eventsub subscription")
w.Write([]byte(message.Challenge))
return
}
logger.Debug("Received notification")
condHash, err := message.Subscription.Condition.Hash()
if err != nil {
logger.WithError(err).Errorf("Unable to hash condition of push")
http.Error(w, errors.Wrap(err, "hashing condition").Error(), http.StatusBadRequest)
return
}
e.subscriptionsLock.RLock()
defer e.subscriptionsLock.RUnlock()
cacheKey := strings.Join([]string{message.Subscription.Type, message.Subscription.Version, condHash}, "::")
reg, ok := e.subscriptions[cacheKey]
if !ok {
http.Error(w, "no subscription available", http.StatusBadRequest)
return
}
for _, cb := range reg.Callbacks {
if err = cb(message.Event); err != nil {
logger.WithError(err).Error("Handler callback caused error")
}
}
func (c *Client) createEventSubSubscriptionWebsocket(ctx context.Context, sub eventSubSubscription) (*eventSubSubscription, error) {
return c.createEventSubSubscription(ctx, authTypeBearerToken, sub)
}
func (e *EventSubClient) PreFetchSubscriptions(ctx context.Context) error {
e.subscriptionsLock.Lock()
defer e.subscriptionsLock.Unlock()
subList, err := e.twitchClient.getEventSubSubscriptions(ctx)
if err != nil {
return errors.Wrap(err, "listing existing subscriptions")
}
for i := range subList {
sub := subList[i]
switch {
case !str.StringInSlice(sub.Status, []string{eventSubStatusEnabled, eventSubStatusVerificationPending}):
// Is not an active hook, we don't need to care: It will be
// confirmed later or will expire but should not be counted
continue
case strings.HasPrefix(sub.Transport.Callback, e.apiURL) && sub.Transport.Callback != e.fullAPIurl():
// Uses the same API URL but with another secret handle: Must
// have been registered by another instance with another secret
// so we should be able to deregister it without causing any
// trouble
logger := log.WithFields(log.Fields{
"id": sub.ID,
"topic": sub.Type,
"version": sub.Version,
})
logger.Debug("Removing deprecated EventSub subscription")
if err = e.twitchClient.deleteEventSubSubscription(ctx, sub.ID); err != nil {
logger.WithError(err).Error("Unable to deregister deprecated EventSub subscription")
}
continue
case sub.Transport.Callback != e.fullAPIurl():
// Different callback URL: We don't care, it's probably another
// bot instance with the same client ID
continue
}
condHash, err := sub.Condition.Hash()
if err != nil {
return errors.Wrap(err, "hashing condition")
}
log.WithFields(log.Fields{
"condition": sub.Condition,
"type": sub.Type,
"version": sub.Version,
}).Debug("found existing eventsub subscription")
cacheKey := strings.Join([]string{sub.Type, sub.Version, condHash}, "::")
e.subscriptions[cacheKey] = &registeredSubscription{
Type: sub.Type,
Callbacks: map[string]func(json.RawMessage) error{},
Subscription: sub,
}
}
return nil
}
func (e *EventSubClient) RegisterEventSubHooks(event, version string, condition EventSubCondition, callback func(json.RawMessage) error) (func(), error) {
if version == "" {
version = EventSubTopicVersion1
}
condHash, err := condition.Hash()
if err != nil {
return nil, errors.Wrap(err, "hashing condition")
}
var (
cacheKey = strings.Join([]string{event, version, condHash}, "::")
logger = log.WithFields(log.Fields{
"condition": condition,
"type": event,
"version": version,
})
)
e.subscriptionsLock.RLock()
_, ok := e.subscriptions[cacheKey]
e.subscriptionsLock.RUnlock()
if ok {
// Subscription already exists
e.subscriptionsLock.Lock()
defer e.subscriptionsLock.Unlock()
logger.Debug("Adding callback to known subscription")
cbKey := uuid.Must(uuid.NewV4()).String()
e.subscriptions[cacheKey].Callbacks[cbKey] = callback
return func() { e.unregisterCallback(cacheKey, cbKey) }, nil
}
logger.Debug("registering new eventsub subscription")
// Register subscriptions
ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout)
defer cancel()
newSub, err := e.twitchClient.createEventSubSubscription(ctx, eventSubSubscription{
Type: event,
Version: version,
Condition: condition,
Transport: eventSubTransport{
Method: "webhook",
Callback: e.fullAPIurl(),
Secret: e.secret,
},
})
if err != nil {
return nil, errors.Wrap(err, "creating subscription")
}
e.subscriptionsLock.Lock()
defer e.subscriptionsLock.Unlock()
logger.Debug("Registered new hook")
cbKey := uuid.Must(uuid.NewV4()).String()
e.subscriptions[cacheKey] = &registeredSubscription{
Type: event,
Callbacks: map[string]func(json.RawMessage) error{
cbKey: callback,
},
Subscription: *newSub,
}
logger.Debug("Registered eventsub subscription")
return func() { e.unregisterCallback(cacheKey, cbKey) }, nil
}
func (c *Client) createEventSubSubscription(ctx context.Context, sub eventSubSubscription) (*eventSubSubscription, error) {
func (c *Client) createEventSubSubscription(ctx context.Context, auth authType, sub eventSubSubscription) (*eventSubSubscription, error) {
var (
buf = new(bytes.Buffer)
resp struct {
@ -466,7 +210,7 @@ func (c *Client) createEventSubSubscription(ctx context.Context, sub eventSubSub
}
if err := c.request(clientRequestOpts{
AuthType: authTypeAppAccessToken,
AuthType: auth,
Body: buf,
Context: ctx,
Method: http.MethodPost,

View file

@ -0,0 +1,277 @@
package twitch
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"sync"
"github.com/gofrs/uuid/v3"
"github.com/gorilla/mux"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/Luzifer/go_helpers/v2/str"
)
const (
eventSubHeaderMessageID = "Twitch-Eventsub-Message-Id"
eventSubHeaderMessageType = "Twitch-Eventsub-Message-Type"
eventSubHeaderMessageSignature = "Twitch-Eventsub-Message-Signature"
eventSubHeaderMessageTimestamp = "Twitch-Eventsub-Message-Timestamp"
eventSubMessageTypeVerification = "webhook_callback_verification"
eventSubMessageTypeRevokation = "revocation"
eventSubStatusEnabled = "enabled"
eventSubStatusVerificationPending = "webhook_callback_verification_pending"
)
type (
// Deprecated: This client should no longer be used and will not be
// maintained afterwards. Replace with EventSubSocketClient.
EventSubClient struct {
apiURL string
secret string
secretHandle string
twitchClient *Client
subscriptions map[string]*registeredSubscription
subscriptionsLock sync.RWMutex
}
)
// Deprecated: See deprecation notice of EventSubClient
func NewEventSubClient(twitchClient *Client, apiURL, secret, secretHandle string) (*EventSubClient, error) {
c := &EventSubClient{
apiURL: apiURL,
secret: secret,
secretHandle: secretHandle,
twitchClient: twitchClient,
subscriptions: map[string]*registeredSubscription{},
}
return c, c.PreFetchSubscriptions(context.Background())
}
func (e *EventSubClient) HandleEventsubPush(w http.ResponseWriter, r *http.Request) {
var (
body = new(bytes.Buffer)
keyHandle = mux.Vars(r)["keyhandle"]
message eventSubPostMessage
signature = r.Header.Get(eventSubHeaderMessageSignature)
)
if keyHandle != e.secretHandle {
http.Error(w, "deprecated callback used", http.StatusNotFound)
return
}
// Copy body for duplicate processing
if _, err := io.Copy(body, r.Body); err != nil {
log.WithError(err).Error("Unable to read hook body")
return
}
// Verify signature
mac := hmac.New(sha256.New, []byte(e.secret))
fmt.Fprintf(mac, "%s%s%s", r.Header.Get(eventSubHeaderMessageID), r.Header.Get(eventSubHeaderMessageTimestamp), body.Bytes())
if cSig := fmt.Sprintf("sha256=%x", mac.Sum(nil)); cSig != signature {
log.Errorf("Got message signature %s, expected %s", signature, cSig)
http.Error(w, "Signature verification failed", http.StatusUnauthorized)
return
}
// Read message
if err := json.NewDecoder(body).Decode(&message); err != nil {
log.WithError(err).Errorf("Unable to decode eventsub message")
http.Error(w, errors.Wrap(err, "parsing message").Error(), http.StatusBadRequest)
return
}
logger := log.WithField("type", message.Subscription.Type)
// If we got a verification request, respond with the challenge
switch r.Header.Get(eventSubHeaderMessageType) {
case eventSubMessageTypeRevokation:
w.WriteHeader(http.StatusNoContent)
return
case eventSubMessageTypeVerification:
logger.Debug("Confirming eventsub subscription")
w.Write([]byte(message.Challenge))
return
}
logger.Debug("Received notification")
condHash, err := message.Subscription.Condition.Hash()
if err != nil {
logger.WithError(err).Errorf("Unable to hash condition of push")
http.Error(w, errors.Wrap(err, "hashing condition").Error(), http.StatusBadRequest)
return
}
e.subscriptionsLock.RLock()
defer e.subscriptionsLock.RUnlock()
cacheKey := strings.Join([]string{message.Subscription.Type, message.Subscription.Version, condHash}, "::")
reg, ok := e.subscriptions[cacheKey]
if !ok {
http.Error(w, "no subscription available", http.StatusBadRequest)
return
}
for _, cb := range reg.Callbacks {
if err = cb(message.Event); err != nil {
logger.WithError(err).Error("Handler callback caused error")
}
}
}
func (e *EventSubClient) PreFetchSubscriptions(ctx context.Context) error {
e.subscriptionsLock.Lock()
defer e.subscriptionsLock.Unlock()
subList, err := e.twitchClient.getEventSubSubscriptions(ctx)
if err != nil {
return errors.Wrap(err, "listing existing subscriptions")
}
for i := range subList {
sub := subList[i]
switch {
case !str.StringInSlice(sub.Status, []string{eventSubStatusEnabled, eventSubStatusVerificationPending}):
// Is not an active hook, we don't need to care: It will be
// confirmed later or will expire but should not be counted
continue
case strings.HasPrefix(sub.Transport.Callback, e.apiURL) && sub.Transport.Callback != e.fullAPIurl():
// Uses the same API URL but with another secret handle: Must
// have been registered by another instance with another secret
// so we should be able to deregister it without causing any
// trouble
logger := log.WithFields(log.Fields{
"id": sub.ID,
"topic": sub.Type,
"version": sub.Version,
})
logger.Debug("Removing deprecated EventSub subscription")
if err = e.twitchClient.deleteEventSubSubscription(ctx, sub.ID); err != nil {
logger.WithError(err).Error("Unable to deregister deprecated EventSub subscription")
}
continue
case sub.Transport.Callback != e.fullAPIurl():
// Different callback URL: We don't care, it's probably another
// bot instance with the same client ID
continue
}
condHash, err := sub.Condition.Hash()
if err != nil {
return errors.Wrap(err, "hashing condition")
}
log.WithFields(log.Fields{
"condition": sub.Condition,
"type": sub.Type,
"version": sub.Version,
}).Debug("found existing eventsub subscription")
cacheKey := strings.Join([]string{sub.Type, sub.Version, condHash}, "::")
e.subscriptions[cacheKey] = &registeredSubscription{
Type: sub.Type,
Callbacks: map[string]func(json.RawMessage) error{},
Subscription: sub,
}
}
return nil
}
func (e *EventSubClient) RegisterEventSubHooks(event, version string, condition EventSubCondition, callback func(json.RawMessage) error) (func(), error) {
if version == "" {
version = EventSubTopicVersion1
}
condHash, err := condition.Hash()
if err != nil {
return nil, errors.Wrap(err, "hashing condition")
}
var (
cacheKey = strings.Join([]string{event, version, condHash}, "::")
logger = log.WithFields(log.Fields{
"condition": condition,
"type": event,
"version": version,
})
)
e.subscriptionsLock.RLock()
_, ok := e.subscriptions[cacheKey]
e.subscriptionsLock.RUnlock()
if ok {
// Subscription already exists
e.subscriptionsLock.Lock()
defer e.subscriptionsLock.Unlock()
logger.Debug("Adding callback to known subscription")
cbKey := uuid.Must(uuid.NewV4()).String()
e.subscriptions[cacheKey].Callbacks[cbKey] = callback
return func() { e.unregisterCallback(cacheKey, cbKey) }, nil
}
logger.Debug("registering new eventsub subscription")
// Register subscriptions
ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout)
defer cancel()
newSub, err := e.twitchClient.createEventSubSubscriptionWebhook(ctx, eventSubSubscription{
Type: event,
Version: version,
Condition: condition,
Transport: eventSubTransport{
Method: "webhook",
Callback: e.fullAPIurl(),
Secret: e.secret,
},
})
if err != nil {
return nil, errors.Wrap(err, "creating subscription")
}
e.subscriptionsLock.Lock()
defer e.subscriptionsLock.Unlock()
logger.Debug("Registered new hook")
cbKey := uuid.Must(uuid.NewV4()).String()
e.subscriptions[cacheKey] = &registeredSubscription{
Type: event,
Callbacks: map[string]func(json.RawMessage) error{
cbKey: callback,
},
Subscription: *newSub,
}
logger.Debug("Registered eventsub subscription")
return func() { e.unregisterCallback(cacheKey, cbKey) }, nil
}

View file

@ -0,0 +1,403 @@
package twitch
import (
"context"
"encoding/json"
"io"
"net"
"reflect"
"time"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const (
eventsubLiveSocketDest = "wss://eventsub.wss.twitch.tv/ws"
socketInitialTimeout = 30 * time.Second
socketTimeoutGraceMultiplier = 1.5
)
const (
eventsubSocketMessageTypeKeepalive = "session_keepalive"
eventsubSocketMessageTypeNotification = "notification"
eventsubSocketMessageTypeReconnect = "session_reconnect"
eventsubSocketMessageTypeWelcome = "session_welcome"
)
const (
eventsubCloseCodeInternalServerError = 4000
eventsubCloseCodeClientSentTraffic = 4001
eventsubCloseCodeClientFailedPingPong = 4002
eventsubCloseCodeConnectionUnused = 4003
eventsubCloseCodeReconnectGraceExpire = 4004
eventsubCloseCodeNetworkTimeout = 4005
eventsubCloseCodeNetworkError = 4006
eventsubCloseCodeInvalidReconnect = 4007
)
type (
EventSubSocketClient struct {
logger *logrus.Entry
socketDest string
socketID string
subscriptionTypes []eventSubSocketSubscriptionType
twitch *Client
conn *websocket.Conn
newconn *websocket.Conn
runCtx context.Context
runCtxCancel context.CancelFunc
}
EventSubSocketClientOpt func(*EventSubSocketClient)
eventSubSocketMessage struct {
Metadata struct {
MessageID string `json:"message_id"`
MessageType string `json:"message_type"`
MessageTimestamp time.Time `json:"message_timestamp"`
SubscriptionType string `json:"subscription_type"`
SubscriptionVersion string `json:"subscription_version"`
} `json:"metadata"`
Payload json.RawMessage `json:"payload"`
}
eventSubSocketSubscriptionType struct {
Event, Version string
Condition EventSubCondition
Callback func(json.RawMessage) error
}
eventSubSocketPayloadNotification struct {
Event json.RawMessage `json:"event"`
Subscription struct {
ID string `json:"id"`
Status string `json:"status"`
Type string `json:"type"`
Version string `json:"version"`
Cost int64 `json:"cost"`
Condition EventSubCondition `json:"condition"`
Transport struct {
Method string `json:"method"`
SessionID string `json:"session_id"`
} `json:"transport"`
CreatedAt time.Time `json:"created_at"`
} `json:"subscription"`
}
eventSubSocketPayloadSession struct {
Session struct {
ID string `json:"id"`
Status string `json:"status"`
ConnectedAt time.Time `json:"connected_at"`
KeepaliveTimeoutSeconds int64 `json:"keepalive_timeout_seconds"`
ReconnectURL *string `json:"reconnect_url"`
} `json:"session"`
}
)
func NewEventSubSocketClient(opts ...EventSubSocketClientOpt) (*EventSubSocketClient, error) {
ctx, cancel := context.WithCancel(context.Background())
c := &EventSubSocketClient{
runCtx: ctx,
runCtxCancel: cancel,
}
for _, opt := range opts {
opt(c)
}
if c.socketDest == "" {
c.socketDest = eventsubLiveSocketDest
}
if c.logger == nil {
discardLogger := logrus.New()
discardLogger.SetOutput(io.Discard)
c.logger = logrus.NewEntry(discardLogger)
}
if c.twitch == nil {
return nil, errors.New("no twitch-client configured")
}
return c, nil
}
func WithLogger(logger *logrus.Entry) EventSubSocketClientOpt {
return func(e *EventSubSocketClient) { e.logger = logger }
}
func WithSocketURL(url string) EventSubSocketClientOpt {
return func(e *EventSubSocketClient) { e.socketDest = url }
}
func WithSubscription(event, version string, condition EventSubCondition, callback func(json.RawMessage) error) EventSubSocketClientOpt {
if version == "" {
version = EventSubTopicVersion1
}
return func(e *EventSubSocketClient) {
e.subscriptionTypes = append(e.subscriptionTypes, eventSubSocketSubscriptionType{
Event: event,
Version: version,
Condition: condition,
Callback: callback,
})
}
}
func WithTwitchClient(c *Client) EventSubSocketClientOpt {
return func(e *EventSubSocketClient) { e.twitch = c }
}
func (e *EventSubSocketClient) Close() { e.runCtxCancel() }
//nolint:gocyclo // Makes no sense to split further
func (e *EventSubSocketClient) Run() error {
var (
errC = make(chan error, 1)
keepaliveTimeout = socketInitialTimeout
msgC = make(chan eventSubSocketMessage, 1)
socketTimeout = time.NewTimer(keepaliveTimeout)
)
if err := e.connect(e.socketDest, msgC, errC, "client init"); err != nil {
return errors.Wrap(err, "establishing initial connection")
}
defer func() {
if err := e.conn.Close(); err != nil {
e.logger.WithError(err).Error("finally closing socket")
}
}()
for {
select {
case err := <-errC:
// Something went wrong
if err = e.handleSocketError(err, msgC, errC); err != nil {
return err
}
case <-socketTimeout.C:
// No message received, deeming connection dead
socketTimeout.Reset(socketInitialTimeout)
if err := e.connect(e.socketDest, msgC, errC, "socket timeout"); err != nil {
errC <- errors.Wrap(err, "re-connecting after timeout")
continue
}
case msg := <-msgC:
// The keepalive timer is reset with each notification or
// keepalive message.
socketTimeout.Reset(keepaliveTimeout)
switch msg.Metadata.MessageType {
case eventsubSocketMessageTypeKeepalive:
// Handle only for debug, timer reset is done above
e.logger.Trace("keepalive received")
case eventsubSocketMessageTypeNotification:
// We got mail! Yay!
if err := e.handleNotificationMessage(msg); err != nil {
errC <- err
}
case eventsubSocketMessageTypeReconnect:
// Twitch politely asked us to reconnect
if err := e.handleReconnectMessage(msg, msgC, errC); err != nil {
errC <- err
}
case eventsubSocketMessageTypeWelcome:
var err error
if keepaliveTimeout, err = e.handleWelcomeMessage(msg); err != nil {
errC <- err
}
default:
e.logger.WithField("type", msg.Metadata.MessageType).Error("unknown message type received")
}
case <-e.runCtx.Done():
return nil
}
}
}
func (e *EventSubSocketClient) connect(url string, msgC chan eventSubSocketMessage, errC chan error, reason string) error {
e.logger.WithField("reason", reason).Debug("(re-)connecting websocket")
conn, _, err := websocket.DefaultDialer.Dial(url, nil) //nolint:bodyclose // Close is implemented at other place
if err != nil {
return errors.Wrap(err, "dialing websocket")
}
go func() {
for {
var msg eventSubSocketMessage
if err = conn.ReadJSON(&msg); err != nil {
errC <- errors.Wrap(err, "reading message")
return
}
msgC <- msg
}
}()
e.newconn = conn
return nil
}
func (e *EventSubSocketClient) handleNotificationMessage(msg eventSubSocketMessage) error {
var payload eventSubSocketPayloadNotification
if err := msg.Unmarshal(&payload); err != nil {
return errors.Wrap(err, "unmarshalling notification")
}
for _, st := range e.subscriptionTypes {
if st.Event != payload.Subscription.Type || st.Version != payload.Subscription.Version || !reflect.DeepEqual(st.Condition, payload.Subscription.Condition) {
continue
}
if err := st.Callback(payload.Event); err != nil {
e.logger.WithError(err).WithFields(logrus.Fields{
"condition": st.Condition,
"event": st.Event,
"version": st.Version,
}).Error("callback caused error")
}
}
return nil
}
func (e *EventSubSocketClient) handleReconnectMessage(msg eventSubSocketMessage, msgC chan eventSubSocketMessage, errC chan error) error {
e.logger.Debug("socket ask for reconnect")
var payload eventSubSocketPayloadSession
if err := msg.Unmarshal(&payload); err != nil {
return errors.Wrap(err, "unmarshalling reconnect message")
}
if payload.Session.ReconnectURL == nil {
return errors.New("reconnect message did not contain reconnect_url")
}
if err := e.connect(*payload.Session.ReconnectURL, msgC, errC, "reconnect requested"); err != nil {
return errors.Wrap(err, "re-connecting after reconnect message")
}
return nil
}
func (e *EventSubSocketClient) handleSocketError(err error, msgC chan eventSubSocketMessage, errC chan error) error {
var closeErr *websocket.CloseError
if errors.As(err, &closeErr) {
switch closeErr.Code {
case eventsubCloseCodeInternalServerError:
e.logger.Warn("websocket reported internal server error")
return errors.Wrap(e.connect(e.socketDest, msgC, errC, "internal-server-error"), "re-connecting after internal-server-error")
case eventsubCloseCodeClientSentTraffic:
e.logger.Error("wrong usage of websocket (client-sent-traffic)")
case eventsubCloseCodeClientFailedPingPong:
e.logger.Error("wrong usage of websocket (missing-ping-pong)")
case eventsubCloseCodeConnectionUnused:
e.logger.Error("wrong usage of websocket (no-topics-subscribed)")
case eventsubCloseCodeReconnectGraceExpire:
e.logger.Error("wrong usage of websocket (no-reconnect-in-time)")
case eventsubCloseCodeNetworkTimeout:
e.logger.Warn("websocket reported network timeout")
return errors.Wrap(e.connect(e.socketDest, msgC, errC, "network-timeout"), "re-connecting after network-timeout")
case eventsubCloseCodeNetworkError:
e.logger.Warn("websocket reported network error")
return errors.Wrap(e.connect(e.socketDest, msgC, errC, "network-error"), "re-connecting after network-error")
case eventsubCloseCodeInvalidReconnect:
e.logger.Warn("websocket reported invalid reconnect url")
case websocket.CloseNormalClosure:
// We don't take action here as a graceful close should
// be initiated by us after establishing a new conn
e.logger.Debug("websocket was closed normally")
return nil
default:
// Some non-twitch close code we did not expect
e.logger.WithError(closeErr).Error("websocket reported unexpected error code")
}
}
if errors.Is(err, net.ErrClosed) {
// This isn't nice but might happen, in this case the socket is
// already gone but the read didn't notice that until this error
return nil
}
return err
}
func (e *EventSubSocketClient) handleWelcomeMessage(msg eventSubSocketMessage) (time.Duration, error) {
var payload eventSubSocketPayloadSession
if err := msg.Unmarshal(&payload); err != nil {
return socketInitialTimeout, errors.Wrap(err, "unmarshalling welcome message")
}
// Close old connection if present
if e.conn != nil {
if err := e.conn.Close(); err != nil {
e.logger.WithError(err).Error("closing old websocket")
}
}
// Promote new connection to existing conn
e.conn, e.newconn = e.newconn, nil
// Subscribe to topics if the socket ID changed (should only
// happen on first connect or if we established a new
// connection after something broke)
if e.socketID != payload.Session.ID {
e.socketID = payload.Session.ID
if err := e.subscribe(); err != nil {
return socketInitialTimeout, errors.Wrap(err, "subscribing to topics")
}
}
e.logger.WithField("id", e.socketID).Debug("websocket connected successfully")
// Configure proper keepalive
return time.Duration(float64(payload.Session.KeepaliveTimeoutSeconds)*socketTimeoutGraceMultiplier) * time.Second, nil
}
func (e *EventSubSocketClient) subscribe() error {
for _, st := range e.subscriptionTypes {
if _, err := e.twitch.createEventSubSubscriptionWebsocket(context.Background(), eventSubSubscription{
Type: st.Event,
Version: st.Version,
Condition: st.Condition,
Transport: eventSubTransport{
Method: "websocket",
SessionID: e.socketID,
},
}); err != nil {
return errors.Wrapf(err, "subscribing to %s/%s", st.Event, st.Version)
}
}
return nil
}
func (e eventSubSocketMessage) Unmarshal(dest any) error {
return errors.Wrap(json.Unmarshal(e.Payload, dest), "unmarshalling payload")
}

View file

@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/Luzifer/twitch-bot/v3/internal/service/access"
"github.com/Luzifer/twitch-bot/v3/pkg/twitch"
"github.com/Luzifer/twitch-bot/v3/plugins"
)
@ -27,7 +28,7 @@ type (
Title string
isInitialized bool
unregisterFunc func()
esc *twitch.EventSubSocketClient
}
twitchWatcher struct {
@ -37,6 +38,11 @@ type (
}
)
func (t *twitchChannelState) CloseESC() {
t.esc.Close()
t.esc = nil
}
func (t twitchChannelState) Equals(c twitchChannelState) bool {
return t.Category == c.Category &&
t.IsLive == c.IsLive &&
@ -76,7 +82,7 @@ func (t *twitchWatcher) Check() {
var channels []string
t.lock.RLock()
for c := range t.ChannelStatus {
if t.ChannelStatus[c].unregisterFunc != nil {
if t.ChannelStatus[c].esc != nil {
continue
}
@ -95,8 +101,8 @@ func (t *twitchWatcher) RemoveChannel(channel string) error {
t.lock.Lock()
defer t.lock.Unlock()
if f := t.ChannelStatus[channel].unregisterFunc; f != nil {
f()
if t.ChannelStatus[channel].esc != nil {
t.ChannelStatus[channel].esc.Close()
}
delete(t.ChannelStatus, channel)
@ -282,23 +288,6 @@ func (t *twitchWatcher) handleEventSubStreamOnOff(isOnline bool) func(json.RawMe
}
}
func (t *twitchWatcher) handleEventUserAuthRevoke(m json.RawMessage) error {
var payload twitch.EventSubEventUserAuthorizationRevoke
if err := json.Unmarshal(m, &payload); err != nil {
return errors.Wrap(err, "unmarshalling event")
}
if payload.ClientID != cfg.TwitchClient {
// We got an revoke for a different ID: Shouldn't happen but whatever.
return nil
}
return errors.Wrap(
accessService.RemoveExendedTwitchCredentials(payload.UserLogin),
"deleting granted scopes",
)
}
func (t *twitchWatcher) updateChannelFromAPI(channel string) error {
t.lock.Lock()
defer t.lock.Unlock()
@ -332,24 +321,41 @@ func (t *twitchWatcher) updateChannelFromAPI(channel string) error {
storedStatus.Update(status)
storedStatus.isInitialized = true
if storedStatus.unregisterFunc != nil {
if storedStatus.esc != nil {
// Do not register twice
return nil
}
if storedStatus.unregisterFunc, err = t.registerEventSubCallbacks(channel); err != nil {
if storedStatus.esc, err = t.registerEventSubCallbacks(channel); err != nil {
return errors.Wrap(err, "registering eventsub callbacks")
}
if storedStatus.esc != nil {
log.WithField("channel", channel).Info("watching for eventsub events")
go func(storedStatus *twitchChannelState) {
if err := storedStatus.esc.Run(); err != nil {
log.WithField("channel", channel).WithError(err).Error("eventsub client caused error")
}
storedStatus.CloseESC()
}(storedStatus)
}
return nil
}
func (t *twitchWatcher) registerEventSubCallbacks(channel string) (func(), error) {
if twitchEventSubClient == nil {
// We don't have eventsub functionality
func (t *twitchWatcher) registerEventSubCallbacks(channel string) (*twitch.EventSubSocketClient, error) {
tc, err := accessService.GetTwitchClientForChannel(channel, access.ClientConfig{
TwitchClient: cfg.TwitchClient,
TwitchClientSecret: cfg.TwitchClientSecret,
})
if err != nil {
if errors.Is(err, access.ErrChannelNotAuthorized) {
return nil, nil
}
return nil, errors.Wrap(err, "getting twitch client for channel")
}
userID, err := twitchClient.GetIDForUsername(channel)
if err != nil {
return nil, errors.Wrap(err, "resolving channel to user-id")
@ -357,7 +363,7 @@ func (t *twitchWatcher) registerEventSubCallbacks(channel string) (func(), error
var (
topicRegistrations = t.getTopicRegistrations(userID)
unsubHandlers []func()
topicOpts []twitch.EventSubSocketClientOpt
)
for _, tr := range topicRegistrations {
@ -385,37 +391,19 @@ func (t *twitchWatcher) registerEventSubCallbacks(channel string) (func(), error
}
}
uf, err := twitchEventSubClient.RegisterEventSubHooks(tr.Topic, tr.Version, tr.Condition, tr.Hook)
topicOpts = append(topicOpts, twitch.WithSubscription(tr.Topic, tr.Version, tr.Condition, tr.Hook))
}
esClient, err := twitch.NewEventSubSocketClient(append(
topicOpts,
twitch.WithLogger(log.WithField("channel", channel)),
twitch.WithTwitchClient(tc),
)...)
if err != nil {
logger.WithError(err).Error("Unable to register topic")
for _, f := range unsubHandlers {
// Error will cause unsub handlers not to be stored, therefore we unsub them now
f()
return nil, errors.Wrap(err, "getting eventsub client for channel")
}
return nil, errors.Wrap(err, "registering topic")
}
unsubHandlers = append(unsubHandlers, uf)
}
return func() {
for _, f := range unsubHandlers {
f()
}
}, nil
}
func (t *twitchWatcher) registerGlobalHooks() error {
_, err := twitchEventSubClient.RegisterEventSubHooks(
twitch.EventSubEventTypeUserAuthorizationRevoke,
twitch.EventSubTopicVersion1,
twitch.EventSubCondition{ClientID: cfg.TwitchClient},
t.handleEventUserAuthRevoke,
)
return errors.Wrap(err, "registering user auth hook")
return esClient, nil
}
func (t *twitchWatcher) triggerUpdate(channel string, title, category *string, online *bool) {