diff --git a/README.md b/README.md index deb3a89..30f6796 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ Please see the [Wiki](https://github.com/Luzifer/twitch-bot/wiki) for documentat ```console # twitch-bot --help Usage of twitch-bot: + --base-url string External URL of the config-editor interface (set to enable EventSub support) --command-timeout duration Timeout for command execution (default 30s) -c, --config string Location of configuration file (default "./config.yaml") --log-level string Log level (debug, info, warn, error, fatal) (default "info") diff --git a/main.go b/main.go index 6908acf..4067f0a 100644 --- a/main.go +++ b/main.go @@ -2,9 +2,12 @@ package main import ( "bytes" + "context" "fmt" + "io/ioutil" "net" "net/http" + "net/url" "os" "strings" "sync" @@ -27,16 +30,18 @@ const ircReconnectDelay = 100 * time.Millisecond var ( cfg = struct { - CommandTimeout time.Duration `flag:"command-timeout" default:"30s" description:"Timeout for command execution"` - Config string `flag:"config,c" default:"./config.yaml" description:"Location of configuration file"` - IRCRateLimit time.Duration `flag:"rate-limit" default:"1500ms" description:"How often to send a message (default: 20/30s=1500ms, if your bot is mod everywhere: 100/30s=300ms, different for known/verified bots)"` - LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"` - PluginDir string `flag:"plugin-dir" default:"/usr/lib/twitch-bot" description:"Where to find and load plugins"` - StorageFile string `flag:"storage-file" default:"./storage.json.gz" description:"Where to store the data"` - TwitchClient string `flag:"twitch-client" default:"" description:"Client ID to act as"` - TwitchToken string `flag:"twitch-token" default:"" description:"OAuth token valid for client"` - ValidateConfig bool `flag:"validate-config,v" default:"false" description:"Loads the config, logs any errors and quits with status 0 on success"` - VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"` + BaseURL string `flag:"base-url" default:"" description:"External URL of the config-editor interface (set to enable EventSub support)"` + CommandTimeout time.Duration `flag:"command-timeout" default:"30s" description:"Timeout for command execution"` + Config string `flag:"config,c" default:"./config.yaml" description:"Location of configuration file"` + IRCRateLimit time.Duration `flag:"rate-limit" default:"1500ms" description:"How often to send a message (default: 20/30s=1500ms, if your bot is mod everywhere: 100/30s=300ms, different for known/verified bots)"` + LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"` + PluginDir string `flag:"plugin-dir" default:"/usr/lib/twitch-bot" description:"Where to find and load plugins"` + StorageFile string `flag:"storage-file" default:"./storage.json.gz" description:"Where to store the data"` + TwitchClient string `flag:"twitch-client" default:"" description:"Client ID to act as"` + TwitchClientSecret string `flag:"twitch-client-secret" default:"" description:"Secret for the Client ID"` + TwitchToken string `flag:"twitch-token" default:"" description:"OAuth token valid for client"` + ValidateConfig bool `flag:"validate-config,v" default:"false" description:"Loads the config, logs any errors and quits with status 0 on success"` + VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"` }{} config *configFile @@ -47,10 +52,14 @@ var ( ircHdl *ircHandler router = mux.NewRouter() + runID = uuid.Must(uuid.NewV4()).String() + externalHTTPAvailable bool + sendMessage func(m *irc.Message) error - store = newStorageFile(false) - twitchClient *twitch.Client + store = newStorageFile(false) + twitchClient *twitch.Client + twitchEventSubClient *twitch.EventSubClient version = "dev" ) @@ -142,6 +151,7 @@ func main() { router.Use(corsMiddleware) router.HandleFunc("/openapi.html", handleSwaggerHTML) router.HandleFunc("/openapi.json", handleSwaggerRequest) + router.HandleFunc("/selfcheck", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte(runID)) }) if err = store.Load(); err != nil { log.WithError(err).Fatal("Unable to load storage file") @@ -180,12 +190,6 @@ func main() { return } - for _, c := range config.Channels { - if err := twitchWatch.AddChannel(c); err != nil { - log.WithError(err).WithField("channel", c).Error("Unable to add channel to watcher") - } - } - if err = startCheck(); err != nil { log.WithError(err).Fatal("Missing required parameters") } @@ -209,6 +213,33 @@ func main() { go http.Serve(listener, router) log.WithField("address", listener.Addr().String()).Info("HTTP server started") + + checkExternalHTTP() + + if externalHTTPAvailable && cfg.TwitchClient != "" && cfg.TwitchClientSecret != "" { + secret, handle, err := store.GetOrGenerateEventSubSecret() + if err != nil { + log.WithError(err).Fatal("Unable to get or create eventsub secret") + } + + twitchEventSubClient = twitch.NewEventSubClient(strings.Join([]string{ + strings.TrimRight(cfg.BaseURL, "/"), + "eventsub", + handle, + }, "/"), secret, handle) + + if err = twitchEventSubClient.Authorize(cfg.TwitchClient, cfg.TwitchClientSecret); err != nil { + log.WithError(err).Fatal("Unable to authorize Twitch EventSub client") + } + + router.HandleFunc("/eventsub/{keyhandle}", twitchEventSubClient.HandleEventsubPush).Methods(http.MethodPost) + } + } + + for _, c := range config.Channels { + if err := twitchWatch.AddChannel(c); err != nil { + log.WithError(err).WithField("channel", c).Error("Unable to add channel to watcher") + } } ircDisconnected <- struct{}{} @@ -291,6 +322,49 @@ func main() { } } +func checkExternalHTTP() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + base, err := url.Parse(cfg.BaseURL) + if err != nil { + log.WithError(err).Error("Unable to parse BaseURL") + return + } + + if base.String() == "" { + log.Debug("No BaseURL set, disabling EventSub support") + return + } + + base.Path = strings.Join([]string{ + strings.TrimRight(base.Path, "/"), + "selfcheck", + }, "/") + + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, base.String(), nil) + resp, err := http.DefaultClient.Do(req) + if err != nil { + log.WithError(err).Error("Unable to fetch selfcheck") + return + } + defer resp.Body.Close() + + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.WithError(err).Error("Unable to read selfcheck response") + return + } + + if strings.TrimSpace(string(data)) == runID { + externalHTTPAvailable = true + log.Debug("Self-Check successful, EventSub support is available") + } else { + externalHTTPAvailable = false + log.Debug("Self-Check failed, EventSub support is not available") + } +} + func startCheck() error { var errs []string diff --git a/store.go b/store.go index e53564e..de792bc 100644 --- a/store.go +++ b/store.go @@ -2,6 +2,8 @@ package main import ( "compress/gzip" + "crypto/rand" + "encoding/hex" "encoding/json" "os" "sync" @@ -11,6 +13,8 @@ import ( "github.com/pkg/errors" ) +const eventSubSecretLength = 32 + type storageFile struct { Counters map[string]int64 `json:"counters"` Timers map[string]plugins.TimerEntry `json:"timers"` @@ -18,6 +22,8 @@ type storageFile struct { ModuleStorage map[string]json.RawMessage `json:"module_storage"` + EventSubSecret string `json:"event_sub_secret,omitempty"` + inMem bool lock *sync.RWMutex } @@ -51,6 +57,28 @@ func (s *storageFile) GetCounterValue(counter string) int64 { return s.Counters[counter] } +func (s *storageFile) GetOrGenerateEventSubSecret() (string, string, error) { + s.lock.Lock() + defer s.lock.Unlock() + + if s.EventSubSecret != "" { + return s.EventSubSecret, s.EventSubSecret[:5], nil + } + + key := make([]byte, eventSubSecretLength) + n, err := rand.Read(key) + if err != nil { + return "", "", errors.Wrap(err, "generating random secret") + } + if n != eventSubSecretLength { + return "", "", errors.Errorf("read only %d of %d byte", n, eventSubSecretLength) + } + + s.EventSubSecret = hex.EncodeToString(key) + + return s.EventSubSecret, s.EventSubSecret[:5], errors.Wrap(s.Save(), "saving store") +} + func (s *storageFile) GetModuleStore(moduleUUID string, storedObject plugins.StorageUnmarshaller) error { s.lock.RLock() defer s.lock.RUnlock() diff --git a/twitch/eventsub.go b/twitch/eventsub.go new file mode 100644 index 0000000..19afa53 --- /dev/null +++ b/twitch/eventsub.go @@ -0,0 +1,524 @@ +package twitch + +import ( + "bytes" + "context" + "crypto/hmac" + "crypto/sha256" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "strings" + "sync" + "time" + + "github.com/Luzifer/go_helpers/v2/str" + "github.com/gofrs/uuid/v3" + "github.com/gorilla/mux" + "github.com/mitchellh/hashstructure/v2" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" +) + +const ( + eventSubHeaderMessageID = "Twitch-Eventsub-Message-Id" + eventSubHeaderMessageType = "Twitch-Eventsub-Message-Type" + eventSubHeaderMessageSignature = "Twitch-Eventsub-Message-Signature" + eventSubHeaderMessageTimestamp = "Twitch-Eventsub-Message-Timestamp" + // eventSubHeaderMessageRetry = "Twitch-Eventsub-Message-Retry" + // eventSubHeaderSubscriptionType = "Twitch-Eventsub-Subscription-Type" + // eventSubHeaderSubscriptionVersion = "Twitch-Eventsub-Subscription-Version" + + eventSubMessageTypeVerification = "webhook_callback_verification" + eventSubMessageTypeRevokation = "revocation" + // eventSubMessageTypeNotification = "notification" + + eventSubStatusEnabled = "enabled" + eventSubStatusVerificationPending = "webhook_callback_verification_pending" + // eventSubStatusAuthorizationRevoked = "authorization_revoked" + // eventSubStatusFailuresExceeded = "notification_failures_exceeded" + // eventSubStatusUserRemoved = "user_removed" + // eventSubStatusVerificationFailed = "webhook_callback_verification_failed" + + EventSubEventTypeChannelUpdate = "channel.update" + EventSubEventTypeStreamOffline = "stream.offline" + EventSubEventTypeStreamOnline = "stream.online" +) + +type ( + EventSubClient struct { + apiURL string + secret string + secretHandle string + + twitchClientID string + twitchClientSecret string + twitchAccessToken string + + subscriptions map[string]*registeredSubscription + subscriptionsLock sync.RWMutex + } + + EventSubCondition struct { + BroadcasterUserID string `json:"broadcaster_user_id,omitempty"` + CampaignID string `json:"campaign_id,omitempty"` + CategoryID string `json:"category_id,omitempty"` + ClientID string `json:"client_id,omitempty"` + ExtensionClientID string `json:"extension_client_id,omitempty"` + FromBroadcasterUserID string `json:"from_broadcaster_user_id,omitempty"` + OrganizationID string `json:"organization_id,omitempty"` + RewardID string `json:"reward_id,omitempty"` + ToBroadcasterUserID string `json:"to_broadcaster_user_id,omitempty"` + UserID string `json:"user_id,omitempty"` + } + + EventSubEventChannelUpdate struct { + BroadcasterUserID string `json:"broadcaster_user_id"` + BroadcasterUserLogin string `json:"broadcaster_user_login"` + BroadcasterUserName string `json:"broadcaster_user_name"` + Title string `json:"title"` + Language string `json:"language"` + CategoryID string `json:"category_id"` + CategoryName string `json:"category_name"` + IsMature bool `json:"is_mature"` + } + + EventSubEventFollow struct { + UserID string `json:"user_id"` + UserLogin string `json:"user_login"` + UserName string `json:"user_name"` + BroadcasterUserID string `json:"broadcaster_user_id"` + BroadcasterUserLogin string `json:"broadcaster_user_login"` + BroadcasterUserName string `json:"broadcaster_user_name"` + FollowedAt time.Time `json:"followed_at"` + } + + EventSubEventStreamOffline struct { + BroadcasterUserID string `json:"broadcaster_user_id"` + BroadcasterUserLogin string `json:"broadcaster_user_login"` + BroadcasterUserName string `json:"broadcaster_user_name"` + } + + EventSubEventStreamOnline struct { + ID string `json:"id"` + BroadcasterUserID string `json:"broadcaster_user_id"` + BroadcasterUserLogin string `json:"broadcaster_user_login"` + BroadcasterUserName string `json:"broadcaster_user_name"` + Type string `json:"type"` + StartedAt time.Time `json:"started_at"` + } + + eventSubPostMessage struct { + Challenge string `json:"challenge"` + Subscription eventSubSubscription `json:"subscription"` + Event json.RawMessage `json:"event"` + } + + eventSubSubscription struct { + ID string `json:"id,omitempty"` // READONLY + Status string `json:"status,omitempty"` // READONLY + Type string `json:"type"` + Version string `json:"version"` + Cost int64 `json:"cost,omitempty"` // READONLY + Condition EventSubCondition `json:"condition"` + Transport eventSubTransport `json:"transport"` + CreatedAt time.Time `json:"created_at,omitempty"` // READONLY + } + + eventSubTransport struct { + Method string `json:"method"` + Callback string `json:"callback"` + Secret string `json:"secret"` + } + + registeredSubscription struct { + Type string + Callbacks map[string]func(json.RawMessage) error + Subscription eventSubSubscription + } +) + +func (e EventSubCondition) Hash() (string, error) { + h, err := hashstructure.Hash(e, hashstructure.FormatV2, &hashstructure.HashOptions{TagName: "json"}) + if err != nil { + return "", errors.Wrap(err, "hashing struct") + } + + return fmt.Sprintf("%x", h), nil +} + +func NewEventSubClient(apiURL, secret, secretHandle string) *EventSubClient { + return &EventSubClient{ + apiURL: apiURL, + secret: secret, + secretHandle: secretHandle, + + subscriptions: map[string]*registeredSubscription{}, + } +} + +func (e *EventSubClient) Authorize(clientID, clientSecret string) error { + e.twitchClientID = clientID + e.twitchClientSecret = clientSecret + + _, err := e.getTwitchAppAccessToken() + return errors.Wrap(err, "fetching app access token") +} + +func (e *EventSubClient) getTwitchAppAccessToken() (string, error) { + if e.twitchAccessToken != "" { + return e.twitchAccessToken, nil + } + + var rData struct { + AccessToken string `json:"access_token"` + RefreshToken string `json:"refresh_token"` + ExpiresIn int `json:"expires_in"` + Scope []interface{} `json:"scope"` + TokenType string `json:"token_type"` + } + + params := make(url.Values) + params.Set("client_id", e.twitchClientID) + params.Set("client_secret", e.twitchClientSecret) + params.Set("grant_type", "client_credentials") + + u, _ := url.Parse("https://id.twitch.tv/oauth2/token") + u.RawQuery = params.Encode() + + ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout) + defer cancel() + + req, _ := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), nil) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "", errors.Wrap(err, "fetching response") + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", errors.Wrapf(err, "unexpected status %d and cannot read body", resp.StatusCode) + } + return "", errors.Errorf("unexpected status %d: %s", resp.StatusCode, body) + } + + e.twitchAccessToken = rData.AccessToken + + return rData.AccessToken, errors.Wrap( + json.NewDecoder(resp.Body).Decode(&rData), + "decoding response", + ) +} + +func (e *EventSubClient) HandleEventsubPush(w http.ResponseWriter, r *http.Request) { + var ( + body = new(bytes.Buffer) + keyHandle = mux.Vars(r)["keyhandle"] + message eventSubPostMessage + signature = r.Header.Get(eventSubHeaderMessageSignature) + ) + + if keyHandle != e.secretHandle { + http.Error(w, "deprecated callback used", http.StatusNotFound) + return + } + + // Copy body for duplicate processing + if _, err := io.Copy(body, r.Body); err != nil { + log.WithError(err).Error("Unable to read hook body") + return + } + + // Verify signature + mac := hmac.New(sha256.New, []byte(e.secret)) + fmt.Fprintf(mac, "%s%s%s", r.Header.Get(eventSubHeaderMessageID), r.Header.Get(eventSubHeaderMessageTimestamp), body.Bytes()) + if cSig := fmt.Sprintf("sha256=%x", mac.Sum(nil)); cSig != signature { + log.Errorf("Got message signature %s, expected %s", signature, cSig) + http.Error(w, "Signature verification failed", http.StatusUnauthorized) + return + } + + // Read message + if err := json.NewDecoder(body).Decode(&message); err != nil { + log.WithError(err).Errorf("Unable to decode eventsub message") + http.Error(w, errors.Wrap(err, "parsing message").Error(), http.StatusBadRequest) + return + } + + logger := log.WithField("type", message.Subscription.Type) + + // If we got a verification request, respond with the challenge + switch r.Header.Get(eventSubHeaderMessageType) { + case eventSubMessageTypeRevokation: + w.WriteHeader(http.StatusNoContent) + return + + case eventSubMessageTypeVerification: + logger.Debug("Confirming eventsub subscription") + w.Write([]byte(message.Challenge)) + return + } + + logger.Debug("Received notification") + + condHash, err := message.Subscription.Condition.Hash() + if err != nil { + logger.WithError(err).Errorf("Unable to hash condition of push") + http.Error(w, errors.Wrap(err, "hashing condition").Error(), http.StatusBadRequest) + return + } + + e.subscriptionsLock.RLock() + defer e.subscriptionsLock.RUnlock() + + cacheKey := strings.Join([]string{message.Subscription.Type, condHash}, "::") + + reg, ok := e.subscriptions[cacheKey] + if !ok { + http.Error(w, "no subscription available", http.StatusBadRequest) + return + } + + for _, cb := range reg.Callbacks { + if err = cb(message.Event); err != nil { + logger.WithError(err).Error("Handler callback caused error") + } + } +} + +//nolint:funlen,gocyclo // Not splitting to keep logic unit +func (e *EventSubClient) RegisterEventSubHooks(event string, condition EventSubCondition, callback func(json.RawMessage) error) (func(), error) { + condHash, err := condition.Hash() + if err != nil { + return nil, errors.Wrap(err, "hashing condition") + } + + var ( + cacheKey = strings.Join([]string{event, condHash}, "::") + logger = log.WithField("event", event) + ) + + e.subscriptionsLock.RLock() + _, ok := e.subscriptions[cacheKey] + e.subscriptionsLock.RUnlock() + + if ok { + // Subscription already exists + e.subscriptionsLock.Lock() + defer e.subscriptionsLock.Unlock() + + logger.Debug("Adding callback to existing callback") + + cbKey := uuid.Must(uuid.NewV4()).String() + + e.subscriptions[cacheKey].Callbacks[cbKey] = callback + return func() { e.unregisterCallback(cacheKey, cbKey) }, nil + } + + accessToken, err := e.getTwitchAppAccessToken() + if err != nil { + return nil, errors.Wrap(err, "getting app-access-token") + } + + ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout) + defer cancel() + + // List existing subscriptions + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "https://api.twitch.tv/helix/eventsub/subscriptions", nil) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Client-Id", e.twitchClientID) + req.Header.Set("Authorization", "Bearer "+accessToken) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, errors.Wrap(err, "requesting subscribscriptions") + } + defer resp.Body.Close() + + var subscriptionList struct { + Data []eventSubSubscription + } + + if err = json.NewDecoder(resp.Body).Decode(&subscriptionList); err != nil { + return nil, errors.Wrap(err, "decoding subscription list") + } + + // Register subscriptions + var ( + existingSub *eventSubSubscription + ) + for i, sub := range subscriptionList.Data { + existingConditionHash, err := sub.Condition.Hash() + if err != nil { + return nil, errors.Wrap(err, "hashing existing condition") + } + newConditionHash, err := condition.Hash() + if err != nil { + return nil, errors.Wrap(err, "hashing new condition") + } + + if str.StringInSlice(sub.Status, []string{eventSubStatusEnabled, eventSubStatusVerificationPending}) && + sub.Transport.Callback == e.apiURL && + existingConditionHash == newConditionHash && + sub.Type == event { + logger = logger.WithFields(log.Fields{ + "id": sub.ID, + "status": sub.Status, + }) + existingSub = &subscriptionList.Data[i] + } + } + + if existingSub != nil { + logger.WithField("event", event).Debug("Not registering hook, already active") + + e.subscriptionsLock.Lock() + defer e.subscriptionsLock.Unlock() + + logger.Debug("Found existing hook, registering and adding callback") + + cbKey := uuid.Must(uuid.NewV4()).String() + e.subscriptions[cacheKey] = ®isteredSubscription{ + Type: event, + Callbacks: map[string]func(json.RawMessage) error{ + cbKey: callback, + }, + Subscription: *existingSub, + } + + return func() { e.unregisterCallback(cacheKey, cbKey) }, nil + } + + payload := eventSubSubscription{ + Type: event, + Version: "1", + Condition: condition, + Transport: eventSubTransport{ + Method: "webhook", + Callback: e.apiURL, + Secret: e.secret, + }, + } + + buf := new(bytes.Buffer) + if err := json.NewEncoder(buf).Encode(payload); err != nil { + return nil, errors.Wrap(err, "assemble subscribe payload") + } + + ctx, cancel = context.WithTimeout(context.Background(), twitchRequestTimeout) + defer cancel() + + req, err = http.NewRequestWithContext(ctx, http.MethodPost, "https://api.twitch.tv/helix/eventsub/subscriptions", buf) + if err != nil { + return nil, errors.Wrap(err, "creating subscribe request") + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Client-Id", e.twitchClientID) + req.Header.Set("Authorization", "Bearer "+accessToken) + + resp, err = http.DefaultClient.Do(req) + if err != nil { + return nil, errors.Wrap(err, "requesting subscribe") + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusAccepted { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, errors.Wrapf(err, "unexpected status %d, unable to read body", resp.StatusCode) + } + return nil, errors.Errorf("unexpected status %d: %s", resp.StatusCode, body) + } + + var response struct { + Data []eventSubSubscription `json:"data"` + } + if err = json.NewDecoder(resp.Body).Decode(&response); err != nil { + return nil, errors.Wrap(err, "reading eventsub sub response") + } + + e.subscriptionsLock.Lock() + defer e.subscriptionsLock.Unlock() + + logger.Debug("Registered new hook") + + cbKey := uuid.Must(uuid.NewV4()).String() + e.subscriptions[cacheKey] = ®isteredSubscription{ + Type: event, + Callbacks: map[string]func(json.RawMessage) error{ + cbKey: callback, + }, + Subscription: response.Data[0], + } + + logger.Debug("Registered eventsub subscription") + + return func() { e.unregisterCallback(cacheKey, cbKey) }, nil +} + +func (e *EventSubClient) unregisterCallback(cacheKey, cbKey string) { + e.subscriptionsLock.RLock() + regSub, ok := e.subscriptions[cacheKey] + e.subscriptionsLock.RUnlock() + + if !ok { + // That subscription does not exist + log.WithField("cache_key", cacheKey).Debug("Subscription does not exist, not unregistering") + return + } + + if _, ok = regSub.Callbacks[cbKey]; !ok { + // That callback does not exist + log.WithFields(log.Fields{ + "cache_key": cacheKey, + "callback": cbKey, + }).Debug("Callback does not exist, not unregistering") + return + } + + logger := log.WithField("event", regSub.Type) + + delete(regSub.Callbacks, cbKey) + + if len(regSub.Callbacks) > 0 { + // Still callbacks registered, not removing the subscription + return + } + + accessToken, err := e.getTwitchAppAccessToken() + if err != nil { + log.WithError(err).Error("Unable to get access token") + return + } + + ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, fmt.Sprintf("https://api.twitch.tv/helix/eventsub/subscriptions?id=%s", regSub.Subscription.ID), nil) + if err != nil { + log.WithError(err).Error("Unable to create delete subscription request") + return + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Client-Id", e.twitchClientID) + req.Header.Set("Authorization", "Bearer "+accessToken) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + log.WithError(err).Error("Unable to execute delete subscription request") + return + } + defer resp.Body.Close() + + e.subscriptionsLock.Lock() + defer e.subscriptionsLock.Unlock() + + logger.Debug("Unregistered hook") + + delete(e.subscriptions, cacheKey) +} diff --git a/twitchWatcher.go b/twitchWatcher.go index d2f6dd1..04ad558 100644 --- a/twitchWatcher.go +++ b/twitchWatcher.go @@ -1,9 +1,11 @@ package main import ( + "encoding/json" "sync" "github.com/Luzifer/twitch-bot/plugins" + "github.com/Luzifer/twitch-bot/twitch" "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) @@ -13,6 +15,8 @@ type ( Category string IsLive bool Title string + + unregisterFunc func() } twitchWatcher struct { @@ -34,41 +38,50 @@ func newTwitchWatcher() *twitchWatcher { } } -func (r *twitchWatcher) AddChannel(channel string) error { - r.lock.RLock() - if _, ok := r.ChannelStatus[channel]; ok { - r.lock.RUnlock() +func (t *twitchWatcher) AddChannel(channel string) error { + t.lock.RLock() + _, ok := t.ChannelStatus[channel] + t.lock.RUnlock() + + if ok { return nil } - r.lock.RUnlock() - return r.updateChannelFromAPI(channel, false) + return t.updateChannelFromAPI(channel, false) } -func (r *twitchWatcher) Check() { +func (t *twitchWatcher) Check() { var channels []string - r.lock.RLock() - for c := range r.ChannelStatus { + t.lock.RLock() + for c := range t.ChannelStatus { + if t.ChannelStatus[c].unregisterFunc != nil { + continue + } + channels = append(channels, c) } - r.lock.RUnlock() + t.lock.RUnlock() for _, ch := range channels { - if err := r.updateChannelFromAPI(ch, true); err != nil { + if err := t.updateChannelFromAPI(ch, true); err != nil { log.WithError(err).WithField("channel", ch).Error("Unable to update channel status") } } } -func (r *twitchWatcher) RemoveChannel(channel string) error { - r.lock.Lock() - defer r.lock.Unlock() +func (t *twitchWatcher) RemoveChannel(channel string) error { + t.lock.Lock() + defer t.lock.Unlock() - delete(r.ChannelStatus, channel) + if f := t.ChannelStatus[channel].unregisterFunc; f != nil { + f() + } + + delete(t.ChannelStatus, channel) return nil } -func (r *twitchWatcher) updateChannelFromAPI(channel string, sendUpdate bool) error { +func (t *twitchWatcher) updateChannelFromAPI(channel string, sendUpdate bool) error { var ( err error status twitchChannelState @@ -84,53 +97,137 @@ func (r *twitchWatcher) updateChannelFromAPI(channel string, sendUpdate bool) er return errors.Wrap(err, "getting stream info") } - r.lock.Lock() - defer r.lock.Unlock() + t.lock.Lock() + defer t.lock.Unlock() - if r.ChannelStatus[channel] != nil && r.ChannelStatus[channel].Equals(status) { + if t.ChannelStatus[channel] != nil && t.ChannelStatus[channel].Equals(status) { return nil } - if sendUpdate && r.ChannelStatus[channel] != nil { - if r.ChannelStatus[channel].Category != status.Category { - log.WithFields(log.Fields{ - "channel": channel, - "category": status.Category, - }).Debug("Twitch metadata changed") - go handleMessage(ircHdl.Client(), nil, eventTypeTwitchCategoryUpdate, plugins.FieldCollection{ - "channel": channel, - "category": status.Category, - }) - } - - if r.ChannelStatus[channel].Title != status.Title { - log.WithFields(log.Fields{ - "channel": channel, - "title": status.Title, - }).Debug("Twitch metadata changed") - go handleMessage(ircHdl.Client(), nil, eventTypeTwitchTitleUpdate, plugins.FieldCollection{ - "channel": channel, - "title": status.Title, - }) - } - - if r.ChannelStatus[channel].IsLive != status.IsLive { - log.WithFields(log.Fields{ - "channel": channel, - "isLive": status.IsLive, - }).Debug("Twitch metadata changed") - - evt := eventTypeTwitchStreamOnline - if !status.IsLive { - evt = eventTypeTwitchStreamOffline - } - - go handleMessage(ircHdl.Client(), nil, evt, plugins.FieldCollection{ - "channel": channel, - }) - } + if sendUpdate && t.ChannelStatus[channel] != nil { + t.triggerUpdate(channel, &status.Title, &status.Category, &status.IsLive) + return nil } - r.ChannelStatus[channel] = &status + if status.unregisterFunc, err = t.registerEventSubCallbacks(channel); err != nil { + return errors.Wrap(err, "registering eventsub callbacks") + } + + t.ChannelStatus[channel] = &status return nil } + +func (t *twitchWatcher) registerEventSubCallbacks(channel string) (func(), error) { + if twitchEventSubClient == nil { + // We don't have eventsub functionality + return nil, nil + } + + userID, err := twitchClient.GetIDForUsername(channel) + if err != nil { + return nil, errors.Wrap(err, "resolving channel to user-id") + } + + unsubCU, err := twitchEventSubClient.RegisterEventSubHooks( + twitch.EventSubEventTypeChannelUpdate, + twitch.EventSubCondition{BroadcasterUserID: userID}, + func(m json.RawMessage) error { + var payload twitch.EventSubEventChannelUpdate + if err := json.Unmarshal(m, &payload); err != nil { + return errors.Wrap(err, "unmarshalling event") + } + + t.triggerUpdate(channel, &payload.Title, &payload.CategoryName, nil) + + return nil + }, + ) + if err != nil { + return nil, errors.Wrap(err, "registering channel-update eventsub") + } + + unsubSOff, err := twitchEventSubClient.RegisterEventSubHooks( + twitch.EventSubEventTypeStreamOffline, + twitch.EventSubCondition{BroadcasterUserID: userID}, + func(m json.RawMessage) error { + var payload twitch.EventSubEventStreamOffline + if err := json.Unmarshal(m, &payload); err != nil { + return errors.Wrap(err, "unmarshalling event") + } + + t.triggerUpdate(channel, nil, nil, func(v bool) *bool { return &v }(false)) + + return nil + }, + ) + if err != nil { + return nil, errors.Wrap(err, "registering channel-update eventsub") + } + + unsubSOn, err := twitchEventSubClient.RegisterEventSubHooks( + twitch.EventSubEventTypeStreamOnline, + twitch.EventSubCondition{BroadcasterUserID: userID}, + func(m json.RawMessage) error { + var payload twitch.EventSubEventStreamOnline + if err := json.Unmarshal(m, &payload); err != nil { + return errors.Wrap(err, "unmarshalling event") + } + + t.triggerUpdate(channel, nil, nil, func(v bool) *bool { return &v }(true)) + + return nil + }, + ) + if err != nil { + return nil, errors.Wrap(err, "registering channel-update eventsub") + } + + return func() { + unsubCU() + unsubSOff() + unsubSOn() + }, nil +} + +func (t *twitchWatcher) triggerUpdate(channel string, title, category *string, online *bool) { + if category != nil && t.ChannelStatus[channel].Category != *category { + t.ChannelStatus[channel].Category = *category + log.WithFields(log.Fields{ + "channel": channel, + "category": *category, + }).Debug("Twitch metadata changed") + go handleMessage(ircHdl.Client(), nil, eventTypeTwitchCategoryUpdate, plugins.FieldCollection{ + "channel": channel, + "category": *category, + }) + } + + if title != nil && t.ChannelStatus[channel].Title != *title { + t.ChannelStatus[channel].Title = *title + log.WithFields(log.Fields{ + "channel": channel, + "title": *title, + }).Debug("Twitch metadata changed") + go handleMessage(ircHdl.Client(), nil, eventTypeTwitchTitleUpdate, plugins.FieldCollection{ + "channel": channel, + "title": *title, + }) + } + + if online != nil && t.ChannelStatus[channel].IsLive != *online { + t.ChannelStatus[channel].IsLive = *online + log.WithFields(log.Fields{ + "channel": channel, + "isLive": *online, + }).Debug("Twitch metadata changed") + + evt := eventTypeTwitchStreamOnline + if !*online { + evt = eventTypeTwitchStreamOffline + } + + go handleMessage(ircHdl.Client(), nil, evt, plugins.FieldCollection{ + "channel": channel, + }) + } +}