From dea42ee207de0066dc90ad1ab3e93d855f888983 Mon Sep 17 00:00:00 2001 From: Knut Ahlers Date: Fri, 20 Nov 2020 22:51:10 +0100 Subject: [PATCH] Initial version with followers overlay --- .gitignore | 3 + api.go | 117 ++++++++++++++++++++++++++++++++++++++ app.js | 93 +++++++++++++++++++++++++++++++ go.mod | 13 +++++ go.sum | 31 +++++++++++ main.go | 98 ++++++++++++++++++++++++++++++++ overlay.html | 62 +++++++++++++++++++++ stats.go | 83 +++++++++++++++++++++++++++ storage.go | 79 ++++++++++++++++++++++++++ webhook.go | 155 +++++++++++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 734 insertions(+) create mode 100644 .gitignore create mode 100644 api.go create mode 100644 app.js create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go create mode 100644 overlay.html create mode 100644 stats.go create mode 100644 storage.go create mode 100644 webhook.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..391192f --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.env +store.json.gz +twitch-manager diff --git a/api.go b/api.go new file mode 100644 index 0000000..533aa47 --- /dev/null +++ b/api.go @@ -0,0 +1,117 @@ +package main + +import ( + "net/http" + "sync" + "time" + + "github.com/gofrs/uuid" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + + "github.com/gorilla/mux" + "github.com/gorilla/websocket" +) + +var ( + socketSubscriptions = map[string]func(msg interface{}) error{} + socketSubscriptionsLock = new(sync.RWMutex) +) + +func sendAllSockets(msg interface{}) error { + socketSubscriptionsLock.RLock() + defer socketSubscriptionsLock.RUnlock() + + for _, hdl := range socketSubscriptions { + if err := hdl(msg); err != nil { + return errors.Wrap(err, "submit message") + } + } + + return nil +} + +func subscribeSocket(id string, hdl func(interface{}) error) { + socketSubscriptionsLock.Lock() + defer socketSubscriptionsLock.Unlock() + + socketSubscriptions[id] = hdl +} + +func unsubscribeSocket(id string) { + socketSubscriptionsLock.Lock() + defer socketSubscriptionsLock.Unlock() + + delete(socketSubscriptions, id) +} + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +func registerAPI(r *mux.Router) { + r.HandleFunc("/api/subscribe", handleUpdateSocket) + r.HandleFunc("/api/webhook/{type}", handleWebHookPush) +} + +func handleUpdateSocket(w http.ResponseWriter, r *http.Request) { + // Upgrade connection to socket + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.WithError(err).Error("Unable to upgrade socket") + return + } + defer conn.Close() + + // Register listener + id := uuid.Must(uuid.NewV4()).String() + subscribeSocket(id, conn.WriteJSON) + defer unsubscribeSocket(id) + + keepAlive := time.NewTicker(5 * time.Second) + defer keepAlive.Stop() + go func() { + for range keepAlive.C { + if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { + log.WithError(err).Error("Unable to send ping message") + conn.Close() + } + } + }() + + if err := conn.WriteJSON(store); err != nil { + log.WithError(err).Error("Unable to send initial state") + return + } + + // Handle socket + for { + messageType, p, err := conn.ReadMessage() + if err != nil { + log.WithError(err).Error("Unable to read from socket") + return + } + + switch messageType { + case websocket.TextMessage: + // This is fine and expected + + case websocket.BinaryMessage: + // Wat? + log.Warn("Got binary message from socket, disconnecting...") + return + + case websocket.CloseMessage: + // They want to go? Fine, have it that way! + return + + default: + log.Debug("Got unhandled message from socket: %d", messageType) + continue + } + + // FIXME: Do we need this? + _ = p + } +} diff --git a/app.js b/app.js new file mode 100644 index 0000000..5ac72e1 --- /dev/null +++ b/app.js @@ -0,0 +1,93 @@ +Vue.config.devtools = true +const app = new Vue({ + computed: { + clock() { + return moment(this.time).format('HH:mm:ss') + }, + + icons() { + const icons = [] + + if (!this.conn.avail) { + icons.push({ class: 'fas fa-ethernet text-warning' }) + } + + return icons + return [ + // { class: 'fab fa-discord' }, + // { class: 'fab fa-spotify' }, + // { class: 'fab fa-teamspeak' }, + ] + }, + + nextFollowers() { + return Math.ceil((this.store.followers.count + 1) / 25) * 25 + }, + + nextSubs() { + return Math.ceil((this.store.subs.count + 1) / 5) * 5 + }, + }, + + created() { + window.setInterval(() => { this.time = new Date() }, 1000) + this.startSocket() + }, + + data: { + conn: { + avail: false, + backoff: 100, + }, + store: {}, + socket: null, + time: new Date(), + }, + + el: '#app', + + methods: { + showAlert(title, text) { + this.$bvToast.toast(text, { + title, + toaster: 'b-toaster-top-right', + variant: 'primary', + }) + }, + + startSocket() { + if (this.socket) { + // Dispose old socket + this.socket.close() + this.socket = null + } + + let socketAddr = `${window.location.origin.replace(/^http/, 'ws')}/api/subscribe` + + this.socket = new WebSocket(socketAddr) + this.socket.onclose = () => { + this.conn.avail = false + this.conn.backoff = Math.min(this.conn.backoff * 1.25, 10000) + window.setTimeout(this.startSocket, this.conn.backoff) // Restart socket + } + this.socket.onmessage = evt => { + const data = JSON.parse(evt.data) + this.store = data + } + this.socket.onopen = evt => { + this.conn.avail = true + this.conn.backoff = 100 + } + }, + }, + + watch: { + 'store.followers.last'(to, from) { + if (!from || !to) { + // Initial load + return + } + this.showAlert('New Follower', `${to} just followed`) + }, + }, +}) diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..68c0bc2 --- /dev/null +++ b/go.mod @@ -0,0 +1,13 @@ +module github.com/Luzifer/twitch-manager + +go 1.15 + +require ( + github.com/Luzifer/go_helpers/v2 v2.11.0 + github.com/Luzifer/rconfig/v2 v2.2.1 + github.com/gofrs/uuid v3.3.0+incompatible + github.com/gorilla/mux v1.8.0 + github.com/gorilla/websocket v1.4.2 + github.com/pkg/errors v0.9.1 + github.com/sirupsen/logrus v1.7.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e068fb4 --- /dev/null +++ b/go.sum @@ -0,0 +1,31 @@ +github.com/Luzifer/go_helpers v1.4.0 h1:Pmm058SbYewfnpP1CHda/zERoAqYoZFiBHF4l8k03Ko= +github.com/Luzifer/go_helpers/v2 v2.11.0 h1:IEVuDEAq2st1sjQNaaTX8TxZ2LsXP0qGeqb2uzYZCIo= +github.com/Luzifer/go_helpers/v2 v2.11.0/go.mod h1:ZnWxPjyCdQ4rZP3kNiMSUW/7FigU1X9Rz8XopdJ5ZCU= +github.com/Luzifer/rconfig v1.2.0 h1:waD1sqasGVSQSrExpLrQ9Q1JmMaltrS391VdOjWXP/I= +github.com/Luzifer/rconfig/v2 v2.2.1 h1:zcDdLQlnlzwcBJ8E0WFzOkQE1pCMn3EbX0dFYkeTczg= +github.com/Luzifer/rconfig/v2 v2.2.1/go.mod h1:OKIX0/JRZrPJ/ZXXWklQEFXA6tBfWaljZbW37w+sqBw= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gofrs/uuid v1.2.0 h1:coDhrjgyJaglxSjxuJdqQSSdUpG3w6p1OwN2od6frBU= +github.com/gofrs/uuid v3.3.0+incompatible h1:8K4tyRfvU1CYPgJsveYFQMhpFd/wXNM7iK6rR7UHz84= +github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf/go.mod h1:hyb9oH7vZsitZCiBt0ZvifOrB+qc8PS5IiilCIb87rg= +github.com/leekchan/gtf v0.0.0-20190214083521-5fba33c5b00b/go.mod h1:thNruaSwydMhkQ8dXzapABF9Sc1Tz08ZBcDdgott9RA= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM= +github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19 h1:WB265cn5OpO+hK3pikC9hpP1zI/KTwmyMFKloW9eOVc= +gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19/go.mod h1:o4V0GXN9/CAmCsvJ0oXYZvrZOe7syiDZSN1GWGZTGzc= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go new file mode 100644 index 0000000..6110dbb --- /dev/null +++ b/main.go @@ -0,0 +1,98 @@ +package main + +import ( + "fmt" + "net/http" + "os" + "path" + "time" + + "github.com/gofrs/uuid" + "github.com/gorilla/mux" + log "github.com/sirupsen/logrus" + + "github.com/Luzifer/rconfig/v2" +) + +var ( + cfg = struct { + AssetDir string `flag:"asset-dir" default:"." description:"Directory containing assets"` + BaseURL string `flag:"base-url" default:"" description:"Base URL of this service" validate:"nonzero"` + ForceSyncInterval time.Duration `flag:"force-sync-interval" default:"1m" description:"How often to force a sync without updates"` + Listen string `flag:"listen" default:":3000" description:"Port/IP to listen on"` + LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"` + StoreFile string `flag:"store-file" default:"store.json.gz" description:"File to store the state to"` + TwitchClient string `flag:"twitch-client" default:"" description:"Client ID to act as" validate:"nonzero"` + TwitchID string `flag:"twitch-id" default:"" description:"ID of the user of the overlay" validate:"nonzero"` + TwitchToken string `flag:"twitch-token" default:"" description:"OAuth token valid for client"` + UpdateFromAPIInterval time.Duration `flag:"update-from-api-interval" default:"10m" description:"How often to ask the API for real values"` + VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"` + WebHookTimeout time.Duration `flag:"webhook-timeout" default:"15m" description:"When to re-register the webhooks"` + }{} + + version = "dev" + store *storage + webhookSecret = uuid.Must(uuid.NewV4()).String() +) + +func init() { + rconfig.AutoEnv(true) + if err := rconfig.ParseAndValidate(&cfg); err != nil { + log.Fatalf("Unable to parse commandline options: %s", err) + } + + if cfg.VersionAndExit { + fmt.Printf("twitch-manager %s\n", version) + os.Exit(0) + } + + if l, err := log.ParseLevel(cfg.LogLevel); err != nil { + log.WithError(err).Fatal("Unable to parse log level") + } else { + log.SetLevel(l) + } +} + +func main() { + store = newStorage() + if err := store.Load(cfg.StoreFile); err != nil && !os.IsNotExist(err) { + log.WithError(err).Fatal("Unable to load store") + } + + router := mux.NewRouter() + registerAPI(router) + + router.HandleFunc("/{file:(?:app.js|overlay.html)}", func(w http.ResponseWriter, r *http.Request) { + http.ServeFile(w, r, path.Join(cfg.AssetDir, mux.Vars(r)["file"])) + }) + + go func() { + if err := http.ListenAndServe(cfg.Listen, router); err != nil { + log.WithError(err).Fatal("HTTP server ended unexpectedly") + } + }() + + if err := registerWebHooks(); err != nil { + log.WithError(err).Fatal("Unable to register webhooks") + } + + for { + select { + case <-time.NewTicker(cfg.WebHookTimeout).C: + if err := registerWebHooks(); err != nil { + log.WithError(err).Fatal("Unable to re-register webhooks") + } + + case <-time.NewTicker(cfg.UpdateFromAPIInterval).C: + if err := updateStats(); err != nil { + log.WithError(err).Error("Unable to update statistics from API") + } + + case <-time.NewTicker(cfg.ForceSyncInterval).C: + if err := sendAllSockets(store); err != nil { + log.WithError(err).Error("Unable to send store to all sockets") + } + + } + } +} diff --git a/overlay.html b/overlay.html new file mode 100644 index 0000000..5e59caf --- /dev/null +++ b/overlay.html @@ -0,0 +1,62 @@ + + + + + + + +
+ + + + Luziferus + + + {{ store.followers.last }} + + + {{ store.followers.count }} / {{ nextFollowers }} + + + {{ store.subs.last }} (x{{ store.subs.lastDuration }}) + + + {{ store.subs.count }} / {{ nextSubs }} + + + {{ store.donation.last_donator }}: {{ store.donation.last_amount.toFixed(2) }} € + + + + + + + ​ + + {{ clock }} + + + + +
+ + + + + + + diff --git a/stats.go b/stats.go new file mode 100644 index 0000000..599548b --- /dev/null +++ b/stats.go @@ -0,0 +1,83 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "time" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" +) + +func updateStats() error { + log.Debug("Updating statistics from API") + for _, fn := range []func() error{ + updateFollowers, + } { + if err := fn(); err != nil { + return errors.Wrap(err, "update statistics module") + } + } + + return nil +} + +func updateFollowers() error { + log.Debug("Updating followers from API") + ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("https://api.twitch.tv/helix/users/follows?to_id=%s", cfg.TwitchID), nil) + if err != nil { + return errors.Wrap(err, "assemble follower count request") + } + req.Header.Set("Client-Id", cfg.TwitchClient) + req.Header.Set("Authorization", "Bearer "+cfg.TwitchToken) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return errors.Wrap(err, "requesting subscribe") + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return errors.Wrapf(err, "unexpected status %d, unable to read body", resp.StatusCode) + } + return errors.Errorf("unexpected status %d: %s", resp.StatusCode, body) + } + + payload := struct { + Total int64 `json:"total"` + Data []struct { + FromName string `json:"from_name"` + FollowedAt time.Time `json:"followed_at"` + } `json:"data"` + // Contains more but I don't care. + }{} + + if err = json.NewDecoder(resp.Body).Decode(&payload); err != nil { + return errors.Wrap(err, "decode json response") + } + + var seen []string + for _, f := range payload.Data { + seen = append(seen, f.FromName) + } + + store.Followers.Count = payload.Total + store.Followers.Seen = seen + + if err = store.Save(cfg.StoreFile); err != nil { + return errors.Wrap(err, "save store") + } + + return errors.Wrap( + sendAllSockets(store), + "update all sockets", + ) +} diff --git a/storage.go b/storage.go new file mode 100644 index 0000000..647a345 --- /dev/null +++ b/storage.go @@ -0,0 +1,79 @@ +package main + +import ( + "compress/gzip" + "encoding/json" + "os" + "sync" + + "github.com/pkg/errors" +) + +const storeMaxFollowers = 25 + +type storage struct { + Donations struct { + LastDonator *string `json:"last_donator"` + LastAmount float64 `json:"last_amount"` + TotalAmount float64 `json:"total_amount"` + } `json:"donations"` + Followers struct { + Last *string `json:"last"` + Seen []string `json:"seen"` + Count int64 `json:"count"` + } `json:"followers"` + Subs struct { + Last *string `json:"last"` + LastDuration int64 `json:"last_duration"` + Count int64 `json:"count"` + } `json:"subs"` + + saveLock sync.Mutex +} + +func newStorage() *storage { return &storage{} } + +func (s *storage) Load(from string) error { + f, err := os.Open(from) + if err != nil { + if os.IsNotExist(err) { + return err + } + return errors.Wrap(err, "opening storage file") + } + defer f.Close() + + gf, err := gzip.NewReader(f) + if err != nil { + return errors.Wrap(err, "create gzip reader") + } + defer gf.Close() + + return errors.Wrap( + json.NewDecoder(gf).Decode(s), + "decode json", + ) +} + +func (s *storage) Save(to string) error { + s.saveLock.Lock() + defer s.saveLock.Unlock() + + if len(s.Followers.Seen) > storeMaxFollowers { + s.Followers.Seen = s.Followers.Seen[:storeMaxFollowers] + } + + f, err := os.Create(to) + if err != nil { + return errors.Wrap(err, "create file") + } + defer f.Close() + + gf := gzip.NewWriter(f) + defer gf.Close() + + return errors.Wrap( + json.NewEncoder(gf).Encode(s), + "encode json", + ) +} diff --git a/webhook.go b/webhook.go new file mode 100644 index 0000000..7cddb84 --- /dev/null +++ b/webhook.go @@ -0,0 +1,155 @@ +package main + +import ( + "bytes" + "context" + "crypto/hmac" + "crypto/sha256" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "sort" + "strings" + "time" + + "github.com/gorilla/mux" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + + "github.com/Luzifer/go_helpers/v2/str" +) + +const twitchRequestTimeout = 2 * time.Second + +func handleWebHookPush(w http.ResponseWriter, r *http.Request) { + var ( + vars = mux.Vars(r) + hookType = vars["type"] + + logger = log.WithField("type", hookType) + ) + + // When asked for a confirmation, just confirm it + if challengeToken := r.URL.Query().Get("hub.challenge"); challengeToken != "" { + logger.WithField("challenge", challengeToken).Debug("Confirming webhook subscription") + w.Write([]byte(challengeToken)) + return + } + + // We're getting a reason for a denied subscription + if reason := r.URL.Query().Get("hub.reason"); reason != "" { + logger.WithField("reason", reason).Error("Webhook subscription was denied") + return + } + + var ( + body = new(bytes.Buffer) + signature = r.Header.Get("X-Hub-Signature") + ) + + if _, err := io.Copy(body, r.Body); err != nil { + logger.WithError(err).Error("Unable to read hook body") + return + } + + mac := hmac.New(sha256.New, []byte(webhookSecret)) + mac.Write(body.Bytes()) + if cSig := fmt.Sprintf("sha256=%x", mac.Sum(nil)); cSig != signature { + log.Errorf("Got message signature %s, expected %s", signature, cSig) + http.Error(w, "Signature verification failed", http.StatusUnauthorized) + return + } + + switch hookType { + case "follow": + var payload struct { + Data []struct { + FromName string `json:"from_name"` + FollowedAt time.Time `json:"followed_at"` + } `json:"data"` + } + + if err := json.NewDecoder(body).Decode(&payload); err != nil { + logger.WithError(err).Error("Unable to decode payload") + return + } + + sort.Slice(payload.Data, func(i, j int) bool { return payload.Data[i].FollowedAt.Before(payload.Data[j].FollowedAt) }) + for _, f := range payload.Data { + if str.StringInSlice(f.FromName, store.Followers.Seen) { + continue + } + + logger.WithField("name", f.FromName).Info("New follower announced") + store.Followers.Last = &f.FromName + store.Followers.Count++ + store.Followers.Seen = append([]string{f.FromName}, store.Followers.Seen...) + } + + default: + log.WithField("type", hookType).Warn("Received unexpected webhook request") + return + } + + if err := store.Save(cfg.StoreFile); err != nil { + logger.WithError(err).Error("Unable to update persistent store") + } + + if err := sendAllSockets(store); err != nil { + logger.WithError(err).Error("Unable to send update to all sockets") + } +} + +func registerWebHooks() error { + hookURL := func(hookType string) string { + return strings.Join([]string{ + strings.TrimRight(cfg.BaseURL, "/"), + "api", "webhook", + hookType, + }, "/") + } + + for uri, topic := range map[string]string{ + hookURL("follow"): fmt.Sprintf("https://api.twitch.tv/helix/users/follows?first=1&to_id=%s", cfg.TwitchID), + } { + ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout) + defer cancel() + + buf := new(bytes.Buffer) + if err := json.NewEncoder(buf).Encode(map[string]interface{}{ + "hub.callback": uri, + "hub.mode": "subscribe", + "hub.topic": topic, + "hub.lease_seconds": int64((cfg.WebHookTimeout + twitchRequestTimeout) / time.Second), + "hub.secret": webhookSecret, + }); err != nil { + return errors.Wrap(err, "assemble subscribe payload") + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://api.twitch.tv/helix/webhooks/hub", buf) + if err != nil { + return errors.Wrap(err, "assemble subscribe request") + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Client-Id", cfg.TwitchClient) + req.Header.Set("Authorization", "Bearer "+cfg.TwitchToken) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return errors.Wrap(err, "requesting subscribe") + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusAccepted { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return errors.Wrapf(err, "unexpected status %d, unable to read body", resp.StatusCode) + } + return errors.Errorf("unexpected status %d: %s", resp.StatusCode, body) + } + } + + return nil +}