diff --git a/api.go b/api.go index f090cdb..63d8630 100644 --- a/api.go +++ b/api.go @@ -140,6 +140,7 @@ func registerAPI(r *mux.Router) { r.HandleFunc("/api/follows/set-last/{name}", handleSetLastFollower).Methods(http.MethodPut) r.HandleFunc("/api/subscribe", handleUpdateSocket).Methods(http.MethodGet) r.HandleFunc("/api/webhook/{type}", handleWebHookPush) + r.HandleFunc("/api/eventsys", handleEventsubPush) } func handleCustomAlert(w http.ResponseWriter, r *http.Request) { diff --git a/eventsub.go b/eventsub.go new file mode 100644 index 0000000..b0a8d53 --- /dev/null +++ b/eventsub.go @@ -0,0 +1,281 @@ +package main + +import ( + "bytes" + "context" + "crypto/hmac" + "crypto/sha256" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "strings" + "time" + + "github.com/Luzifer/go_helpers/v2/str" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" +) + +const ( + eventSubHeaderMessageID = "Twitch-Eventsub-Message-Id" + eventSubHeaderMessageRetry = "Twitch-Eventsub-Message-Retry" + eventSubHeaderMessageType = "Twitch-Eventsub-Message-Type" + eventSubHeaderMessageSignature = "Twitch-Eventsub-Message-Signature" + eventSubHeaderMessageTimestamp = "Twitch-Eventsub-Message-Timestamp" + eventSubHeaderSubscriptionType = "Twitch-Eventsub-Subscription-Type" + eventSubHeaderSubscriptionVersion = "Twitch-Eventsub-Subscription-Version" + + eventSubMessageTypeNotification = "notification" + eventSubMessageTypeVerification = "webhook_callback_verification" + eventSubMessageTypeRevokation = "revocation" + + eventSubStatusAuthorizationRevoked = "authorization_revoked" + eventSubStatusEnabled = "enabled" + eventSubStatusFailuresExceeded = "notification_failures_exceeded" + eventSubStatusUserRemoved = "user_removed" + eventSubStatusVerificationFailed = "webhook_callback_verification_failed" + eventSubStatusVerificationPending = "webhook_callback_verification_pending" +) + +type ( + eventSubCondition struct { + BroadcasterUserID string `json:"broadcaster_user_id,omitempty"` + } + eventSubEventFollow struct { + UserID string `json:"user_id"` + UserLogin string `json:"user_login"` + UserName string `json:"user_name"` + BroadcasterUserID string `json:"broadcaster_user_id"` + BroadcasterUserLogin string `json:"broadcaster_user_login"` + BroadcasterUserName string `json:"broadcaster_user_name"` + FollowedAt time.Time `json:"followed_at"` + } + eventSubPostMessage struct { + Challenge string `json:"challenge"` + Subscription eventSubSubscription `json:"subscription"` + Event json.RawMessage `json:"event"` + } + eventSubSubscription struct { + ID string `json:"id,omitempty"` // READONLY + Status string `json:"status,omitempty"` // READONLY + Type string `json:"type"` + Version string `json:"version"` + Cost int64 `json:"cost,omitempty"` // READONLY + Condition eventSubCondition `json:"condition"` + Transport eventSubTransport `json:"transport"` + CreatedAt time.Time `json:"created_at,omitempty"` // READONLY + } + eventSubTransport struct { + Method string `json:"method"` + Callback string `json:"callback"` + Secret string `json:"secret"` + } +) + +func handleEventsubPush(w http.ResponseWriter, r *http.Request) { + var ( + body = new(bytes.Buffer) + message eventSubPostMessage + signature = r.Header.Get(eventSubHeaderMessageSignature) + ) + + // Copy body for duplicate processing + if _, err := io.Copy(body, r.Body); err != nil { + log.WithError(err).Error("Unable to read hook body") + return + } + + // Verify signature + mac := hmac.New(sha256.New, []byte(cfg.WebHookSecret)) + fmt.Fprintf(mac, "%s%s%s", r.Header.Get(eventSubHeaderMessageID), r.Header.Get(eventSubHeaderMessageTimestamp), body.Bytes()) + if cSig := fmt.Sprintf("sha256=%x", mac.Sum(nil)); cSig != signature { + log.Errorf("Got message signature %s, expected %s", signature, cSig) + http.Error(w, "Signature verification failed", http.StatusUnauthorized) + return + } + + // Read message + if err := json.NewDecoder(body).Decode(&message); err != nil { + log.WithError(err).Errorf("Unable to decode eventsys message") + http.Error(w, errors.Wrap(err, "parsing message").Error(), http.StatusBadRequest) + return + } + + logger := log.WithField("type", message.Subscription.Type) + + // If we got a verification request, respond with the challenge + switch r.Header.Get(eventSubHeaderMessageType) { + case eventSubMessageTypeRevokation: + w.WriteHeader(http.StatusNoContent) + return + + case eventSubMessageTypeVerification: + log.WithFields(log.Fields{ + "type": message.Subscription.Type, + }).Debug("Confirming eventsub subscription") + w.Write([]byte(message.Challenge)) + return + } + + switch message.Subscription.Type { + case "channel.follow": + var evt eventSubEventFollow + if err := json.Unmarshal(message.Event, &evt); err != nil { + log.WithError(err).Errorf("Unable to decode eventsys event payload") + http.Error(w, errors.Wrap(err, "parsing message").Error(), http.StatusBadRequest) + return + } + + logger = logger.WithField("name", evt.UserLogin) + + var isKnown bool + store.WithModRLock(func() error { + isKnown = str.StringInSlice(evt.UserLogin, store.Followers.Seen) + return nil + }) + + if isKnown { + logger.WithField("name", evt.UserLogin).Debug("New follower already known, skipping") + return + } + + fields := map[string]interface{}{ + "from": evt.UserLogin, + "followed_at": evt.FollowedAt, + } + + if err := subscriptions.SendAllSockets(msgTypeFollow, fields, false, true); err != nil { + log.WithError(err).Error("Unable to send update to all sockets") + } + + logger.Info("New follower announced") + store.WithModLock(func() error { + store.Followers.Last = &evt.UserLogin + store.Followers.Count++ + store.Followers.Seen = append([]string{evt.UserLogin}, store.Followers.Seen...) + + return nil + }) + + default: + logger.Warn("Received unexpected webhook request") + return + + } + + if err := store.Save(cfg.StoreFile); err != nil { + logger.WithError(err).Error("Unable to update persistent store") + } + + if err := store.WithModRLock(func() error { return subscriptions.SendAllSockets(msgTypeStore, store, false, false) }); err != nil { + logger.WithError(err).Error("Unable to send update to all sockets") + } +} + +func registerEventSubHooks() error { + hookURL := strings.Join([]string{ + strings.TrimRight(cfg.BaseURL, "/"), + "api", "eventsys", + }, "/") + + ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout) + defer cancel() + accessToken, err := getTwitchAppAccessToken(ctx) + if err != nil { + return errors.Wrap(err, "getting app-access-token") + } + + // List existing subscriptions + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "https://api.twitch.tv/helix/eventsub/subscriptions", nil) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Client-Id", cfg.TwitchClient) + req.Header.Set("Authorization", "Bearer "+accessToken) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return errors.Wrap(err, "requesting subscribscriptions") + } + defer resp.Body.Close() + + var subscriptionList struct { + Data []eventSubSubscription + } + + if err = json.NewDecoder(resp.Body).Decode(&subscriptionList); err != nil { + return errors.Wrap(err, "decoding subscription list") + } + + // Register subscriptions + for _, event := range []string{ + "channel.follow", + } { + var ( + logger = log.WithField("event", event) + subscriptionExists bool + ) + + for _, sub := range subscriptionList.Data { + if str.StringInSlice(sub.Status, []string{eventSubStatusEnabled, eventSubStatusVerificationPending}) && sub.Transport.Callback == hookURL && sub.Type == event { + logger = logger.WithFields(log.Fields{ + "id": sub.ID, + "status": sub.Status, + }) + subscriptionExists = true + } + } + + if subscriptionExists { + logger.WithField("event", event).Debug("Not registering hook, already active") + continue + } + + payload := eventSubSubscription{ + Type: event, + Version: "1", + Condition: eventSubCondition{ + BroadcasterUserID: cfg.TwitchID, + }, + Transport: eventSubTransport{ + Method: "webhook", + Callback: hookURL, + Secret: cfg.WebHookSecret, + }, + } + + buf := new(bytes.Buffer) + if err := json.NewEncoder(buf).Encode(payload); err != nil { + return errors.Wrap(err, "assemble subscribe payload") + } + + ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://api.twitch.tv/helix/eventsub/subscriptions", buf) + if err != nil { + return errors.Wrap(err, "creating subscribe request") + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Client-Id", cfg.TwitchClient) + req.Header.Set("Authorization", "Bearer "+accessToken) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return errors.Wrap(err, "requesting subscribe") + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusAccepted { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return errors.Wrapf(err, "unexpected status %d, unable to read body", resp.StatusCode) + } + return errors.Errorf("unexpected status %d: %s", resp.StatusCode, body) + } + + logger.Debug("Registered eventsys subscription") + } + + return nil +} diff --git a/main.go b/main.go index dcfa81b..f7a3471 100644 --- a/main.go +++ b/main.go @@ -24,12 +24,12 @@ var ( LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"` StoreFile string `flag:"store-file" default:"store.json.gz" description:"File to store the state to"` TwitchClient string `flag:"twitch-client" default:"" description:"Client ID to act as" validate:"nonzero"` + TwitchSecret string `flag:"twitch-secret" default:"" description:"Secret to the given Client ID" validate:"nonzero"` TwitchID string `flag:"twitch-id" default:"" description:"ID of the user of the overlay" validate:"nonzero"` TwitchToken string `flag:"twitch-token" default:"" description:"OAuth token valid for client"` UpdateFromAPIInterval time.Duration `flag:"update-from-api-interval" default:"10m" description:"How often to ask the API for real values"` VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"` WebHookSecret string `flag:"webhook-secret" default:"" description:"Secret to use for HMAC hashing of webhook payload"` - WebHookTimeout time.Duration `flag:"webhook-timeout" default:"15m" description:"When to re-register the webhooks"` }{} store *storage @@ -94,7 +94,7 @@ func main() { } }() - if err = registerWebHooks(); err != nil { + if err = registerEventSubHooks(); err != nil { log.WithError(err).Fatal("Unable to register webhooks") } @@ -102,10 +102,9 @@ func main() { irc *ircHandler ircDisconnected = make(chan struct{}, 1) - timerAssetCheck = time.NewTicker(cfg.AssetCheckInterval) - timerForceSync = time.NewTicker(cfg.ForceSyncInterval) - timerUpdateFromAPI = time.NewTicker(cfg.UpdateFromAPIInterval) - timerWebhookRegister = time.NewTicker(cfg.WebHookTimeout) + timerAssetCheck = time.NewTicker(cfg.AssetCheckInterval) + timerForceSync = time.NewTicker(cfg.ForceSyncInterval) + timerUpdateFromAPI = time.NewTicker(cfg.UpdateFromAPIInterval) ) ircDisconnected <- struct{}{} @@ -144,11 +143,6 @@ func main() { log.WithError(err).Error("Unable to update statistics from API") } - case <-timerWebhookRegister.C: - if err := registerWebHooks(); err != nil { - log.WithError(err).Fatal("Unable to re-register webhooks") - } - } } } diff --git a/twitch.go b/twitch.go new file mode 100644 index 0000000..5e98ee4 --- /dev/null +++ b/twitch.go @@ -0,0 +1,49 @@ +package main + +import ( + "context" + "encoding/json" + "io/ioutil" + "net/http" + "net/url" + + "github.com/pkg/errors" +) + +func getTwitchAppAccessToken(ctx context.Context) (string, error) { + var rData struct { + AccessToken string `json:"access_token"` + RefreshToken string `json:"refresh_token"` + ExpiresIn int `json:"expires_in"` + Scope []interface{} `json:"scope"` + TokenType string `json:"token_type"` + } + + params := make(url.Values) + params.Set("client_id", cfg.TwitchClient) + params.Set("client_secret", cfg.TwitchSecret) + params.Set("grant_type", "client_credentials") + + u, _ := url.Parse("https://id.twitch.tv/oauth2/token") + u.RawQuery = params.Encode() + + req, _ := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), nil) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "", errors.Wrap(err, "fetching response") + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", errors.Wrapf(err, "unexpected status %d and cannot read body", resp.StatusCode) + } + return "", errors.Errorf("unexpected status %d: %s", resp.StatusCode, body) + } + + return rData.AccessToken, errors.Wrap( + json.NewDecoder(resp.Body).Decode(&rData), + "decoding response", + ) +} diff --git a/webhook.go b/webhook.go index 51faf1a..e61a77d 100644 --- a/webhook.go +++ b/webhook.go @@ -2,23 +2,16 @@ package main import ( "bytes" - "context" "crypto/hmac" "crypto/sha256" "encoding/json" "fmt" "io" - "io/ioutil" "net/http" - "sort" - "strings" "time" "github.com/gorilla/mux" - "github.com/pkg/errors" log "github.com/sirupsen/logrus" - - "github.com/Luzifer/go_helpers/v2/str" ) const twitchRequestTimeout = 2 * time.Second @@ -31,19 +24,6 @@ func handleWebHookPush(w http.ResponseWriter, r *http.Request) { logger = log.WithField("type", hookType) ) - // When asked for a confirmation, just confirm it - if challengeToken := r.URL.Query().Get("hub.challenge"); challengeToken != "" { - logger.Debug("Confirming webhook subscription") - w.Write([]byte(challengeToken)) - return - } - - // We're getting a reason for a denied subscription - if reason := r.URL.Query().Get("hub.reason"); reason != "" { - logger.WithField("reason", reason).Error("Webhook subscription was denied") - return - } - var ( body = new(bytes.Buffer) signature = r.Header.Get("X-Hub-Signature") @@ -92,52 +72,6 @@ func handleWebHookPush(w http.ResponseWriter, r *http.Request) { return nil }) - case "follow": - var payload struct { - Data []struct { - FromName string `json:"from_name"` - FollowedAt time.Time `json:"followed_at"` - } `json:"data"` - } - - if err := json.NewDecoder(body).Decode(&payload); err != nil { - logger.WithError(err).Error("Unable to decode payload") - return - } - - sort.Slice(payload.Data, func(i, j int) bool { return payload.Data[i].FollowedAt.Before(payload.Data[j].FollowedAt) }) - for _, f := range payload.Data { - var isKnown bool - - store.WithModRLock(func() error { - isKnown = str.StringInSlice(f.FromName, store.Followers.Seen) - return nil - }) - - if isKnown { - logger.WithField("name", f.FromName).Debug("New follower already known, skipping") - continue - } - - fields := map[string]interface{}{ - "from": f.FromName, - "followed_at": f.FollowedAt, - } - - if err := subscriptions.SendAllSockets(msgTypeFollow, fields, false, true); err != nil { - log.WithError(err).Error("Unable to send update to all sockets") - } - - logger.WithField("name", f.FromName).Info("New follower announced") - store.WithModLock(func() error { - store.Followers.Last = &f.FromName - store.Followers.Count++ - store.Followers.Seen = append([]string{f.FromName}, store.Followers.Seen...) - - return nil - }) - } - default: log.WithField("type", hookType).Warn("Received unexpected webhook request") return @@ -151,55 +85,3 @@ func handleWebHookPush(w http.ResponseWriter, r *http.Request) { logger.WithError(err).Error("Unable to send update to all sockets") } } - -func registerWebHooks() error { - hookURL := func(hookType string) string { - return strings.Join([]string{ - strings.TrimRight(cfg.BaseURL, "/"), - "api", "webhook", - hookType, - }, "/") - } - - for uri, topic := range map[string]string{ - hookURL("follow"): fmt.Sprintf("https://api.twitch.tv/helix/users/follows?first=1&to_id=%s", cfg.TwitchID), - } { - ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout) - defer cancel() - - buf := new(bytes.Buffer) - if err := json.NewEncoder(buf).Encode(map[string]interface{}{ - "hub.callback": uri, - "hub.mode": "subscribe", - "hub.topic": topic, - "hub.lease_seconds": int64((cfg.WebHookTimeout + twitchRequestTimeout) / time.Second), - "hub.secret": cfg.WebHookSecret, - }); err != nil { - return errors.Wrap(err, "assemble subscribe payload") - } - - req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://api.twitch.tv/helix/webhooks/hub", buf) - if err != nil { - return errors.Wrap(err, "assemble subscribe request") - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Client-Id", cfg.TwitchClient) - req.Header.Set("Authorization", "Bearer "+cfg.TwitchToken) - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return errors.Wrap(err, "requesting subscribe") - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusAccepted { - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return errors.Wrapf(err, "unexpected status %d, unable to read body", resp.StatusCode) - } - return errors.Errorf("unexpected status %d: %s", resp.StatusCode, body) - } - } - - return nil -}