mirror of
https://github.com/Luzifer/twitch-manager.git
synced 2024-12-20 20:01:18 +00:00
Switch to eventsub after webhook deprecation
Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
parent
10a98e4e70
commit
dd4203c187
5 changed files with 336 additions and 129 deletions
1
api.go
1
api.go
|
@ -140,6 +140,7 @@ func registerAPI(r *mux.Router) {
|
|||
r.HandleFunc("/api/follows/set-last/{name}", handleSetLastFollower).Methods(http.MethodPut)
|
||||
r.HandleFunc("/api/subscribe", handleUpdateSocket).Methods(http.MethodGet)
|
||||
r.HandleFunc("/api/webhook/{type}", handleWebHookPush)
|
||||
r.HandleFunc("/api/eventsys", handleEventsubPush)
|
||||
}
|
||||
|
||||
func handleCustomAlert(w http.ResponseWriter, r *http.Request) {
|
||||
|
|
281
eventsub.go
Normal file
281
eventsub.go
Normal file
|
@ -0,0 +1,281 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/hmac"
|
||||
"crypto/sha256"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Luzifer/go_helpers/v2/str"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
eventSubHeaderMessageID = "Twitch-Eventsub-Message-Id"
|
||||
eventSubHeaderMessageRetry = "Twitch-Eventsub-Message-Retry"
|
||||
eventSubHeaderMessageType = "Twitch-Eventsub-Message-Type"
|
||||
eventSubHeaderMessageSignature = "Twitch-Eventsub-Message-Signature"
|
||||
eventSubHeaderMessageTimestamp = "Twitch-Eventsub-Message-Timestamp"
|
||||
eventSubHeaderSubscriptionType = "Twitch-Eventsub-Subscription-Type"
|
||||
eventSubHeaderSubscriptionVersion = "Twitch-Eventsub-Subscription-Version"
|
||||
|
||||
eventSubMessageTypeNotification = "notification"
|
||||
eventSubMessageTypeVerification = "webhook_callback_verification"
|
||||
eventSubMessageTypeRevokation = "revocation"
|
||||
|
||||
eventSubStatusAuthorizationRevoked = "authorization_revoked"
|
||||
eventSubStatusEnabled = "enabled"
|
||||
eventSubStatusFailuresExceeded = "notification_failures_exceeded"
|
||||
eventSubStatusUserRemoved = "user_removed"
|
||||
eventSubStatusVerificationFailed = "webhook_callback_verification_failed"
|
||||
eventSubStatusVerificationPending = "webhook_callback_verification_pending"
|
||||
)
|
||||
|
||||
type (
|
||||
eventSubCondition struct {
|
||||
BroadcasterUserID string `json:"broadcaster_user_id,omitempty"`
|
||||
}
|
||||
eventSubEventFollow struct {
|
||||
UserID string `json:"user_id"`
|
||||
UserLogin string `json:"user_login"`
|
||||
UserName string `json:"user_name"`
|
||||
BroadcasterUserID string `json:"broadcaster_user_id"`
|
||||
BroadcasterUserLogin string `json:"broadcaster_user_login"`
|
||||
BroadcasterUserName string `json:"broadcaster_user_name"`
|
||||
FollowedAt time.Time `json:"followed_at"`
|
||||
}
|
||||
eventSubPostMessage struct {
|
||||
Challenge string `json:"challenge"`
|
||||
Subscription eventSubSubscription `json:"subscription"`
|
||||
Event json.RawMessage `json:"event"`
|
||||
}
|
||||
eventSubSubscription struct {
|
||||
ID string `json:"id,omitempty"` // READONLY
|
||||
Status string `json:"status,omitempty"` // READONLY
|
||||
Type string `json:"type"`
|
||||
Version string `json:"version"`
|
||||
Cost int64 `json:"cost,omitempty"` // READONLY
|
||||
Condition eventSubCondition `json:"condition"`
|
||||
Transport eventSubTransport `json:"transport"`
|
||||
CreatedAt time.Time `json:"created_at,omitempty"` // READONLY
|
||||
}
|
||||
eventSubTransport struct {
|
||||
Method string `json:"method"`
|
||||
Callback string `json:"callback"`
|
||||
Secret string `json:"secret"`
|
||||
}
|
||||
)
|
||||
|
||||
func handleEventsubPush(w http.ResponseWriter, r *http.Request) {
|
||||
var (
|
||||
body = new(bytes.Buffer)
|
||||
message eventSubPostMessage
|
||||
signature = r.Header.Get(eventSubHeaderMessageSignature)
|
||||
)
|
||||
|
||||
// Copy body for duplicate processing
|
||||
if _, err := io.Copy(body, r.Body); err != nil {
|
||||
log.WithError(err).Error("Unable to read hook body")
|
||||
return
|
||||
}
|
||||
|
||||
// Verify signature
|
||||
mac := hmac.New(sha256.New, []byte(cfg.WebHookSecret))
|
||||
fmt.Fprintf(mac, "%s%s%s", r.Header.Get(eventSubHeaderMessageID), r.Header.Get(eventSubHeaderMessageTimestamp), body.Bytes())
|
||||
if cSig := fmt.Sprintf("sha256=%x", mac.Sum(nil)); cSig != signature {
|
||||
log.Errorf("Got message signature %s, expected %s", signature, cSig)
|
||||
http.Error(w, "Signature verification failed", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
// Read message
|
||||
if err := json.NewDecoder(body).Decode(&message); err != nil {
|
||||
log.WithError(err).Errorf("Unable to decode eventsys message")
|
||||
http.Error(w, errors.Wrap(err, "parsing message").Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
logger := log.WithField("type", message.Subscription.Type)
|
||||
|
||||
// If we got a verification request, respond with the challenge
|
||||
switch r.Header.Get(eventSubHeaderMessageType) {
|
||||
case eventSubMessageTypeRevokation:
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
|
||||
case eventSubMessageTypeVerification:
|
||||
log.WithFields(log.Fields{
|
||||
"type": message.Subscription.Type,
|
||||
}).Debug("Confirming eventsub subscription")
|
||||
w.Write([]byte(message.Challenge))
|
||||
return
|
||||
}
|
||||
|
||||
switch message.Subscription.Type {
|
||||
case "channel.follow":
|
||||
var evt eventSubEventFollow
|
||||
if err := json.Unmarshal(message.Event, &evt); err != nil {
|
||||
log.WithError(err).Errorf("Unable to decode eventsys event payload")
|
||||
http.Error(w, errors.Wrap(err, "parsing message").Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
logger = logger.WithField("name", evt.UserLogin)
|
||||
|
||||
var isKnown bool
|
||||
store.WithModRLock(func() error {
|
||||
isKnown = str.StringInSlice(evt.UserLogin, store.Followers.Seen)
|
||||
return nil
|
||||
})
|
||||
|
||||
if isKnown {
|
||||
logger.WithField("name", evt.UserLogin).Debug("New follower already known, skipping")
|
||||
return
|
||||
}
|
||||
|
||||
fields := map[string]interface{}{
|
||||
"from": evt.UserLogin,
|
||||
"followed_at": evt.FollowedAt,
|
||||
}
|
||||
|
||||
if err := subscriptions.SendAllSockets(msgTypeFollow, fields, false, true); err != nil {
|
||||
log.WithError(err).Error("Unable to send update to all sockets")
|
||||
}
|
||||
|
||||
logger.Info("New follower announced")
|
||||
store.WithModLock(func() error {
|
||||
store.Followers.Last = &evt.UserLogin
|
||||
store.Followers.Count++
|
||||
store.Followers.Seen = append([]string{evt.UserLogin}, store.Followers.Seen...)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
default:
|
||||
logger.Warn("Received unexpected webhook request")
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
if err := store.Save(cfg.StoreFile); err != nil {
|
||||
logger.WithError(err).Error("Unable to update persistent store")
|
||||
}
|
||||
|
||||
if err := store.WithModRLock(func() error { return subscriptions.SendAllSockets(msgTypeStore, store, false, false) }); err != nil {
|
||||
logger.WithError(err).Error("Unable to send update to all sockets")
|
||||
}
|
||||
}
|
||||
|
||||
func registerEventSubHooks() error {
|
||||
hookURL := strings.Join([]string{
|
||||
strings.TrimRight(cfg.BaseURL, "/"),
|
||||
"api", "eventsys",
|
||||
}, "/")
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout)
|
||||
defer cancel()
|
||||
accessToken, err := getTwitchAppAccessToken(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "getting app-access-token")
|
||||
}
|
||||
|
||||
// List existing subscriptions
|
||||
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "https://api.twitch.tv/helix/eventsub/subscriptions", nil)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Client-Id", cfg.TwitchClient)
|
||||
req.Header.Set("Authorization", "Bearer "+accessToken)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "requesting subscribscriptions")
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var subscriptionList struct {
|
||||
Data []eventSubSubscription
|
||||
}
|
||||
|
||||
if err = json.NewDecoder(resp.Body).Decode(&subscriptionList); err != nil {
|
||||
return errors.Wrap(err, "decoding subscription list")
|
||||
}
|
||||
|
||||
// Register subscriptions
|
||||
for _, event := range []string{
|
||||
"channel.follow",
|
||||
} {
|
||||
var (
|
||||
logger = log.WithField("event", event)
|
||||
subscriptionExists bool
|
||||
)
|
||||
|
||||
for _, sub := range subscriptionList.Data {
|
||||
if str.StringInSlice(sub.Status, []string{eventSubStatusEnabled, eventSubStatusVerificationPending}) && sub.Transport.Callback == hookURL && sub.Type == event {
|
||||
logger = logger.WithFields(log.Fields{
|
||||
"id": sub.ID,
|
||||
"status": sub.Status,
|
||||
})
|
||||
subscriptionExists = true
|
||||
}
|
||||
}
|
||||
|
||||
if subscriptionExists {
|
||||
logger.WithField("event", event).Debug("Not registering hook, already active")
|
||||
continue
|
||||
}
|
||||
|
||||
payload := eventSubSubscription{
|
||||
Type: event,
|
||||
Version: "1",
|
||||
Condition: eventSubCondition{
|
||||
BroadcasterUserID: cfg.TwitchID,
|
||||
},
|
||||
Transport: eventSubTransport{
|
||||
Method: "webhook",
|
||||
Callback: hookURL,
|
||||
Secret: cfg.WebHookSecret,
|
||||
},
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
if err := json.NewEncoder(buf).Encode(payload); err != nil {
|
||||
return errors.Wrap(err, "assemble subscribe payload")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://api.twitch.tv/helix/eventsub/subscriptions", buf)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "creating subscribe request")
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Client-Id", cfg.TwitchClient)
|
||||
req.Header.Set("Authorization", "Bearer "+accessToken)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "requesting subscribe")
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusAccepted {
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "unexpected status %d, unable to read body", resp.StatusCode)
|
||||
}
|
||||
return errors.Errorf("unexpected status %d: %s", resp.StatusCode, body)
|
||||
}
|
||||
|
||||
logger.Debug("Registered eventsys subscription")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
10
main.go
10
main.go
|
@ -24,12 +24,12 @@ var (
|
|||
LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"`
|
||||
StoreFile string `flag:"store-file" default:"store.json.gz" description:"File to store the state to"`
|
||||
TwitchClient string `flag:"twitch-client" default:"" description:"Client ID to act as" validate:"nonzero"`
|
||||
TwitchSecret string `flag:"twitch-secret" default:"" description:"Secret to the given Client ID" validate:"nonzero"`
|
||||
TwitchID string `flag:"twitch-id" default:"" description:"ID of the user of the overlay" validate:"nonzero"`
|
||||
TwitchToken string `flag:"twitch-token" default:"" description:"OAuth token valid for client"`
|
||||
UpdateFromAPIInterval time.Duration `flag:"update-from-api-interval" default:"10m" description:"How often to ask the API for real values"`
|
||||
VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"`
|
||||
WebHookSecret string `flag:"webhook-secret" default:"" description:"Secret to use for HMAC hashing of webhook payload"`
|
||||
WebHookTimeout time.Duration `flag:"webhook-timeout" default:"15m" description:"When to re-register the webhooks"`
|
||||
}{}
|
||||
|
||||
store *storage
|
||||
|
@ -94,7 +94,7 @@ func main() {
|
|||
}
|
||||
}()
|
||||
|
||||
if err = registerWebHooks(); err != nil {
|
||||
if err = registerEventSubHooks(); err != nil {
|
||||
log.WithError(err).Fatal("Unable to register webhooks")
|
||||
}
|
||||
|
||||
|
@ -105,7 +105,6 @@ func main() {
|
|||
timerAssetCheck = time.NewTicker(cfg.AssetCheckInterval)
|
||||
timerForceSync = time.NewTicker(cfg.ForceSyncInterval)
|
||||
timerUpdateFromAPI = time.NewTicker(cfg.UpdateFromAPIInterval)
|
||||
timerWebhookRegister = time.NewTicker(cfg.WebHookTimeout)
|
||||
)
|
||||
|
||||
ircDisconnected <- struct{}{}
|
||||
|
@ -144,11 +143,6 @@ func main() {
|
|||
log.WithError(err).Error("Unable to update statistics from API")
|
||||
}
|
||||
|
||||
case <-timerWebhookRegister.C:
|
||||
if err := registerWebHooks(); err != nil {
|
||||
log.WithError(err).Fatal("Unable to re-register webhooks")
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
49
twitch.go
Normal file
49
twitch.go
Normal file
|
@ -0,0 +1,49 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func getTwitchAppAccessToken(ctx context.Context) (string, error) {
|
||||
var rData struct {
|
||||
AccessToken string `json:"access_token"`
|
||||
RefreshToken string `json:"refresh_token"`
|
||||
ExpiresIn int `json:"expires_in"`
|
||||
Scope []interface{} `json:"scope"`
|
||||
TokenType string `json:"token_type"`
|
||||
}
|
||||
|
||||
params := make(url.Values)
|
||||
params.Set("client_id", cfg.TwitchClient)
|
||||
params.Set("client_secret", cfg.TwitchSecret)
|
||||
params.Set("grant_type", "client_credentials")
|
||||
|
||||
u, _ := url.Parse("https://id.twitch.tv/oauth2/token")
|
||||
u.RawQuery = params.Encode()
|
||||
|
||||
req, _ := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), nil)
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "fetching response")
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return "", errors.Wrapf(err, "unexpected status %d and cannot read body", resp.StatusCode)
|
||||
}
|
||||
return "", errors.Errorf("unexpected status %d: %s", resp.StatusCode, body)
|
||||
}
|
||||
|
||||
return rData.AccessToken, errors.Wrap(
|
||||
json.NewDecoder(resp.Body).Decode(&rData),
|
||||
"decoding response",
|
||||
)
|
||||
}
|
118
webhook.go
118
webhook.go
|
@ -2,23 +2,16 @@ package main
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/hmac"
|
||||
"crypto/sha256"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/Luzifer/go_helpers/v2/str"
|
||||
)
|
||||
|
||||
const twitchRequestTimeout = 2 * time.Second
|
||||
|
@ -31,19 +24,6 @@ func handleWebHookPush(w http.ResponseWriter, r *http.Request) {
|
|||
logger = log.WithField("type", hookType)
|
||||
)
|
||||
|
||||
// When asked for a confirmation, just confirm it
|
||||
if challengeToken := r.URL.Query().Get("hub.challenge"); challengeToken != "" {
|
||||
logger.Debug("Confirming webhook subscription")
|
||||
w.Write([]byte(challengeToken))
|
||||
return
|
||||
}
|
||||
|
||||
// We're getting a reason for a denied subscription
|
||||
if reason := r.URL.Query().Get("hub.reason"); reason != "" {
|
||||
logger.WithField("reason", reason).Error("Webhook subscription was denied")
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
body = new(bytes.Buffer)
|
||||
signature = r.Header.Get("X-Hub-Signature")
|
||||
|
@ -92,52 +72,6 @@ func handleWebHookPush(w http.ResponseWriter, r *http.Request) {
|
|||
return nil
|
||||
})
|
||||
|
||||
case "follow":
|
||||
var payload struct {
|
||||
Data []struct {
|
||||
FromName string `json:"from_name"`
|
||||
FollowedAt time.Time `json:"followed_at"`
|
||||
} `json:"data"`
|
||||
}
|
||||
|
||||
if err := json.NewDecoder(body).Decode(&payload); err != nil {
|
||||
logger.WithError(err).Error("Unable to decode payload")
|
||||
return
|
||||
}
|
||||
|
||||
sort.Slice(payload.Data, func(i, j int) bool { return payload.Data[i].FollowedAt.Before(payload.Data[j].FollowedAt) })
|
||||
for _, f := range payload.Data {
|
||||
var isKnown bool
|
||||
|
||||
store.WithModRLock(func() error {
|
||||
isKnown = str.StringInSlice(f.FromName, store.Followers.Seen)
|
||||
return nil
|
||||
})
|
||||
|
||||
if isKnown {
|
||||
logger.WithField("name", f.FromName).Debug("New follower already known, skipping")
|
||||
continue
|
||||
}
|
||||
|
||||
fields := map[string]interface{}{
|
||||
"from": f.FromName,
|
||||
"followed_at": f.FollowedAt,
|
||||
}
|
||||
|
||||
if err := subscriptions.SendAllSockets(msgTypeFollow, fields, false, true); err != nil {
|
||||
log.WithError(err).Error("Unable to send update to all sockets")
|
||||
}
|
||||
|
||||
logger.WithField("name", f.FromName).Info("New follower announced")
|
||||
store.WithModLock(func() error {
|
||||
store.Followers.Last = &f.FromName
|
||||
store.Followers.Count++
|
||||
store.Followers.Seen = append([]string{f.FromName}, store.Followers.Seen...)
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
default:
|
||||
log.WithField("type", hookType).Warn("Received unexpected webhook request")
|
||||
return
|
||||
|
@ -151,55 +85,3 @@ func handleWebHookPush(w http.ResponseWriter, r *http.Request) {
|
|||
logger.WithError(err).Error("Unable to send update to all sockets")
|
||||
}
|
||||
}
|
||||
|
||||
func registerWebHooks() error {
|
||||
hookURL := func(hookType string) string {
|
||||
return strings.Join([]string{
|
||||
strings.TrimRight(cfg.BaseURL, "/"),
|
||||
"api", "webhook",
|
||||
hookType,
|
||||
}, "/")
|
||||
}
|
||||
|
||||
for uri, topic := range map[string]string{
|
||||
hookURL("follow"): fmt.Sprintf("https://api.twitch.tv/helix/users/follows?first=1&to_id=%s", cfg.TwitchID),
|
||||
} {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
if err := json.NewEncoder(buf).Encode(map[string]interface{}{
|
||||
"hub.callback": uri,
|
||||
"hub.mode": "subscribe",
|
||||
"hub.topic": topic,
|
||||
"hub.lease_seconds": int64((cfg.WebHookTimeout + twitchRequestTimeout) / time.Second),
|
||||
"hub.secret": cfg.WebHookSecret,
|
||||
}); err != nil {
|
||||
return errors.Wrap(err, "assemble subscribe payload")
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://api.twitch.tv/helix/webhooks/hub", buf)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "assemble subscribe request")
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Client-Id", cfg.TwitchClient)
|
||||
req.Header.Set("Authorization", "Bearer "+cfg.TwitchToken)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "requesting subscribe")
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusAccepted {
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "unexpected status %d, unable to read body", resp.StatusCode)
|
||||
}
|
||||
return errors.Errorf("unexpected status %d: %s", resp.StatusCode, body)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue