commit dea42ee207de0066dc90ad1ab3e93d855f888983 Author: Knut Ahlers Date: Fri Nov 20 22:51:10 2020 +0100 Initial version with followers overlay diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..391192f --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.env +store.json.gz +twitch-manager diff --git a/api.go b/api.go new file mode 100644 index 0000000..533aa47 --- /dev/null +++ b/api.go @@ -0,0 +1,117 @@ +package main + +import ( + "net/http" + "sync" + "time" + + "github.com/gofrs/uuid" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + + "github.com/gorilla/mux" + "github.com/gorilla/websocket" +) + +var ( + socketSubscriptions = map[string]func(msg interface{}) error{} + socketSubscriptionsLock = new(sync.RWMutex) +) + +func sendAllSockets(msg interface{}) error { + socketSubscriptionsLock.RLock() + defer socketSubscriptionsLock.RUnlock() + + for _, hdl := range socketSubscriptions { + if err := hdl(msg); err != nil { + return errors.Wrap(err, "submit message") + } + } + + return nil +} + +func subscribeSocket(id string, hdl func(interface{}) error) { + socketSubscriptionsLock.Lock() + defer socketSubscriptionsLock.Unlock() + + socketSubscriptions[id] = hdl +} + +func unsubscribeSocket(id string) { + socketSubscriptionsLock.Lock() + defer socketSubscriptionsLock.Unlock() + + delete(socketSubscriptions, id) +} + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +func registerAPI(r *mux.Router) { + r.HandleFunc("/api/subscribe", handleUpdateSocket) + r.HandleFunc("/api/webhook/{type}", handleWebHookPush) +} + +func handleUpdateSocket(w http.ResponseWriter, r *http.Request) { + // Upgrade connection to socket + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.WithError(err).Error("Unable to upgrade socket") + return + } + defer conn.Close() + + // Register listener + id := uuid.Must(uuid.NewV4()).String() + subscribeSocket(id, conn.WriteJSON) + defer unsubscribeSocket(id) + + keepAlive := time.NewTicker(5 * time.Second) + defer keepAlive.Stop() + go func() { + for range keepAlive.C { + if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { + log.WithError(err).Error("Unable to send ping message") + conn.Close() + } + } + }() + + if err := conn.WriteJSON(store); err != nil { + log.WithError(err).Error("Unable to send initial state") + return + } + + // Handle socket + for { + messageType, p, err := conn.ReadMessage() + if err != nil { + log.WithError(err).Error("Unable to read from socket") + return + } + + switch messageType { + case websocket.TextMessage: + // This is fine and expected + + case websocket.BinaryMessage: + // Wat? + log.Warn("Got binary message from socket, disconnecting...") + return + + case websocket.CloseMessage: + // They want to go? Fine, have it that way! + return + + default: + log.Debug("Got unhandled message from socket: %d", messageType) + continue + } + + // FIXME: Do we need this? + _ = p + } +} diff --git a/app.js b/app.js new file mode 100644 index 0000000..5ac72e1 --- /dev/null +++ b/app.js @@ -0,0 +1,93 @@ +Vue.config.devtools = true +const app = new Vue({ + computed: { + clock() { + return moment(this.time).format('HH:mm:ss') + }, + + icons() { + const icons = [] + + if (!this.conn.avail) { + icons.push({ class: 'fas fa-ethernet text-warning' }) + } + + return icons + return [ + // { class: 'fab fa-discord' }, + // { class: 'fab fa-spotify' }, + // { class: 'fab fa-teamspeak' }, + ] + }, + + nextFollowers() { + return Math.ceil((this.store.followers.count + 1) / 25) * 25 + }, + + nextSubs() { + return Math.ceil((this.store.subs.count + 1) / 5) * 5 + }, + }, + + created() { + window.setInterval(() => { this.time = new Date() }, 1000) + this.startSocket() + }, + + data: { + conn: { + avail: false, + backoff: 100, + }, + store: {}, + socket: null, + time: new Date(), + }, + + el: '#app', + + methods: { + showAlert(title, text) { + this.$bvToast.toast(text, { + title, + toaster: 'b-toaster-top-right', + variant: 'primary', + }) + }, + + startSocket() { + if (this.socket) { + // Dispose old socket + this.socket.close() + this.socket = null + } + + let socketAddr = `${window.location.origin.replace(/^http/, 'ws')}/api/subscribe` + + this.socket = new WebSocket(socketAddr) + this.socket.onclose = () => { + this.conn.avail = false + this.conn.backoff = Math.min(this.conn.backoff * 1.25, 10000) + window.setTimeout(this.startSocket, this.conn.backoff) // Restart socket + } + this.socket.onmessage = evt => { + const data = JSON.parse(evt.data) + this.store = data + } + this.socket.onopen = evt => { + this.conn.avail = true + this.conn.backoff = 100 + } + }, + }, + + watch: { + 'store.followers.last'(to, from) { + if (!from || !to) { + // Initial load + return + } + this.showAlert('New Follower', `${to} just followed`) + }, + }, +}) diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..68c0bc2 --- /dev/null +++ b/go.mod @@ -0,0 +1,13 @@ +module github.com/Luzifer/twitch-manager + +go 1.15 + +require ( + github.com/Luzifer/go_helpers/v2 v2.11.0 + github.com/Luzifer/rconfig/v2 v2.2.1 + github.com/gofrs/uuid v3.3.0+incompatible + github.com/gorilla/mux v1.8.0 + github.com/gorilla/websocket v1.4.2 + github.com/pkg/errors v0.9.1 + github.com/sirupsen/logrus v1.7.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e068fb4 --- /dev/null +++ b/go.sum @@ -0,0 +1,31 @@ +github.com/Luzifer/go_helpers v1.4.0 h1:Pmm058SbYewfnpP1CHda/zERoAqYoZFiBHF4l8k03Ko= +github.com/Luzifer/go_helpers/v2 v2.11.0 h1:IEVuDEAq2st1sjQNaaTX8TxZ2LsXP0qGeqb2uzYZCIo= +github.com/Luzifer/go_helpers/v2 v2.11.0/go.mod h1:ZnWxPjyCdQ4rZP3kNiMSUW/7FigU1X9Rz8XopdJ5ZCU= +github.com/Luzifer/rconfig v1.2.0 h1:waD1sqasGVSQSrExpLrQ9Q1JmMaltrS391VdOjWXP/I= +github.com/Luzifer/rconfig/v2 v2.2.1 h1:zcDdLQlnlzwcBJ8E0WFzOkQE1pCMn3EbX0dFYkeTczg= +github.com/Luzifer/rconfig/v2 v2.2.1/go.mod h1:OKIX0/JRZrPJ/ZXXWklQEFXA6tBfWaljZbW37w+sqBw= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gofrs/uuid v1.2.0 h1:coDhrjgyJaglxSjxuJdqQSSdUpG3w6p1OwN2od6frBU= +github.com/gofrs/uuid v3.3.0+incompatible h1:8K4tyRfvU1CYPgJsveYFQMhpFd/wXNM7iK6rR7UHz84= +github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf/go.mod h1:hyb9oH7vZsitZCiBt0ZvifOrB+qc8PS5IiilCIb87rg= +github.com/leekchan/gtf v0.0.0-20190214083521-5fba33c5b00b/go.mod h1:thNruaSwydMhkQ8dXzapABF9Sc1Tz08ZBcDdgott9RA= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM= +github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19 h1:WB265cn5OpO+hK3pikC9hpP1zI/KTwmyMFKloW9eOVc= +gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19/go.mod h1:o4V0GXN9/CAmCsvJ0oXYZvrZOe7syiDZSN1GWGZTGzc= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go new file mode 100644 index 0000000..6110dbb --- /dev/null +++ b/main.go @@ -0,0 +1,98 @@ +package main + +import ( + "fmt" + "net/http" + "os" + "path" + "time" + + "github.com/gofrs/uuid" + "github.com/gorilla/mux" + log "github.com/sirupsen/logrus" + + "github.com/Luzifer/rconfig/v2" +) + +var ( + cfg = struct { + AssetDir string `flag:"asset-dir" default:"." description:"Directory containing assets"` + BaseURL string `flag:"base-url" default:"" description:"Base URL of this service" validate:"nonzero"` + ForceSyncInterval time.Duration `flag:"force-sync-interval" default:"1m" description:"How often to force a sync without updates"` + Listen string `flag:"listen" default:":3000" description:"Port/IP to listen on"` + LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"` + StoreFile string `flag:"store-file" default:"store.json.gz" description:"File to store the state to"` + TwitchClient string `flag:"twitch-client" default:"" description:"Client ID to act as" validate:"nonzero"` + TwitchID string `flag:"twitch-id" default:"" description:"ID of the user of the overlay" validate:"nonzero"` + TwitchToken string `flag:"twitch-token" default:"" description:"OAuth token valid for client"` + UpdateFromAPIInterval time.Duration `flag:"update-from-api-interval" default:"10m" description:"How often to ask the API for real values"` + VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"` + WebHookTimeout time.Duration `flag:"webhook-timeout" default:"15m" description:"When to re-register the webhooks"` + }{} + + version = "dev" + store *storage + webhookSecret = uuid.Must(uuid.NewV4()).String() +) + +func init() { + rconfig.AutoEnv(true) + if err := rconfig.ParseAndValidate(&cfg); err != nil { + log.Fatalf("Unable to parse commandline options: %s", err) + } + + if cfg.VersionAndExit { + fmt.Printf("twitch-manager %s\n", version) + os.Exit(0) + } + + if l, err := log.ParseLevel(cfg.LogLevel); err != nil { + log.WithError(err).Fatal("Unable to parse log level") + } else { + log.SetLevel(l) + } +} + +func main() { + store = newStorage() + if err := store.Load(cfg.StoreFile); err != nil && !os.IsNotExist(err) { + log.WithError(err).Fatal("Unable to load store") + } + + router := mux.NewRouter() + registerAPI(router) + + router.HandleFunc("/{file:(?:app.js|overlay.html)}", func(w http.ResponseWriter, r *http.Request) { + http.ServeFile(w, r, path.Join(cfg.AssetDir, mux.Vars(r)["file"])) + }) + + go func() { + if err := http.ListenAndServe(cfg.Listen, router); err != nil { + log.WithError(err).Fatal("HTTP server ended unexpectedly") + } + }() + + if err := registerWebHooks(); err != nil { + log.WithError(err).Fatal("Unable to register webhooks") + } + + for { + select { + case <-time.NewTicker(cfg.WebHookTimeout).C: + if err := registerWebHooks(); err != nil { + log.WithError(err).Fatal("Unable to re-register webhooks") + } + + case <-time.NewTicker(cfg.UpdateFromAPIInterval).C: + if err := updateStats(); err != nil { + log.WithError(err).Error("Unable to update statistics from API") + } + + case <-time.NewTicker(cfg.ForceSyncInterval).C: + if err := sendAllSockets(store); err != nil { + log.WithError(err).Error("Unable to send store to all sockets") + } + + } + } +} diff --git a/overlay.html b/overlay.html new file mode 100644 index 0000000..5e59caf --- /dev/null +++ b/overlay.html @@ -0,0 +1,62 @@ + + + + + + + +
+ + + + Luziferus + + + {{ store.followers.last }} + + + {{ store.followers.count }} / {{ nextFollowers }} + + + {{ store.subs.last }} (x{{ store.subs.lastDuration }}) + + + {{ store.subs.count }} / {{ nextSubs }} + + + {{ store.donation.last_donator }}: {{ store.donation.last_amount.toFixed(2) }} € + + + + + + + ​ + + {{ clock }} + + + + +
+ + + + + + + diff --git a/stats.go b/stats.go new file mode 100644 index 0000000..599548b --- /dev/null +++ b/stats.go @@ -0,0 +1,83 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "time" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" +) + +func updateStats() error { + log.Debug("Updating statistics from API") + for _, fn := range []func() error{ + updateFollowers, + } { + if err := fn(); err != nil { + return errors.Wrap(err, "update statistics module") + } + } + + return nil +} + +func updateFollowers() error { + log.Debug("Updating followers from API") + ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("https://api.twitch.tv/helix/users/follows?to_id=%s", cfg.TwitchID), nil) + if err != nil { + return errors.Wrap(err, "assemble follower count request") + } + req.Header.Set("Client-Id", cfg.TwitchClient) + req.Header.Set("Authorization", "Bearer "+cfg.TwitchToken) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return errors.Wrap(err, "requesting subscribe") + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return errors.Wrapf(err, "unexpected status %d, unable to read body", resp.StatusCode) + } + return errors.Errorf("unexpected status %d: %s", resp.StatusCode, body) + } + + payload := struct { + Total int64 `json:"total"` + Data []struct { + FromName string `json:"from_name"` + FollowedAt time.Time `json:"followed_at"` + } `json:"data"` + // Contains more but I don't care. + }{} + + if err = json.NewDecoder(resp.Body).Decode(&payload); err != nil { + return errors.Wrap(err, "decode json response") + } + + var seen []string + for _, f := range payload.Data { + seen = append(seen, f.FromName) + } + + store.Followers.Count = payload.Total + store.Followers.Seen = seen + + if err = store.Save(cfg.StoreFile); err != nil { + return errors.Wrap(err, "save store") + } + + return errors.Wrap( + sendAllSockets(store), + "update all sockets", + ) +} diff --git a/storage.go b/storage.go new file mode 100644 index 0000000..647a345 --- /dev/null +++ b/storage.go @@ -0,0 +1,79 @@ +package main + +import ( + "compress/gzip" + "encoding/json" + "os" + "sync" + + "github.com/pkg/errors" +) + +const storeMaxFollowers = 25 + +type storage struct { + Donations struct { + LastDonator *string `json:"last_donator"` + LastAmount float64 `json:"last_amount"` + TotalAmount float64 `json:"total_amount"` + } `json:"donations"` + Followers struct { + Last *string `json:"last"` + Seen []string `json:"seen"` + Count int64 `json:"count"` + } `json:"followers"` + Subs struct { + Last *string `json:"last"` + LastDuration int64 `json:"last_duration"` + Count int64 `json:"count"` + } `json:"subs"` + + saveLock sync.Mutex +} + +func newStorage() *storage { return &storage{} } + +func (s *storage) Load(from string) error { + f, err := os.Open(from) + if err != nil { + if os.IsNotExist(err) { + return err + } + return errors.Wrap(err, "opening storage file") + } + defer f.Close() + + gf, err := gzip.NewReader(f) + if err != nil { + return errors.Wrap(err, "create gzip reader") + } + defer gf.Close() + + return errors.Wrap( + json.NewDecoder(gf).Decode(s), + "decode json", + ) +} + +func (s *storage) Save(to string) error { + s.saveLock.Lock() + defer s.saveLock.Unlock() + + if len(s.Followers.Seen) > storeMaxFollowers { + s.Followers.Seen = s.Followers.Seen[:storeMaxFollowers] + } + + f, err := os.Create(to) + if err != nil { + return errors.Wrap(err, "create file") + } + defer f.Close() + + gf := gzip.NewWriter(f) + defer gf.Close() + + return errors.Wrap( + json.NewEncoder(gf).Encode(s), + "encode json", + ) +} diff --git a/webhook.go b/webhook.go new file mode 100644 index 0000000..7cddb84 --- /dev/null +++ b/webhook.go @@ -0,0 +1,155 @@ +package main + +import ( + "bytes" + "context" + "crypto/hmac" + "crypto/sha256" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "sort" + "strings" + "time" + + "github.com/gorilla/mux" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + + "github.com/Luzifer/go_helpers/v2/str" +) + +const twitchRequestTimeout = 2 * time.Second + +func handleWebHookPush(w http.ResponseWriter, r *http.Request) { + var ( + vars = mux.Vars(r) + hookType = vars["type"] + + logger = log.WithField("type", hookType) + ) + + // When asked for a confirmation, just confirm it + if challengeToken := r.URL.Query().Get("hub.challenge"); challengeToken != "" { + logger.WithField("challenge", challengeToken).Debug("Confirming webhook subscription") + w.Write([]byte(challengeToken)) + return + } + + // We're getting a reason for a denied subscription + if reason := r.URL.Query().Get("hub.reason"); reason != "" { + logger.WithField("reason", reason).Error("Webhook subscription was denied") + return + } + + var ( + body = new(bytes.Buffer) + signature = r.Header.Get("X-Hub-Signature") + ) + + if _, err := io.Copy(body, r.Body); err != nil { + logger.WithError(err).Error("Unable to read hook body") + return + } + + mac := hmac.New(sha256.New, []byte(webhookSecret)) + mac.Write(body.Bytes()) + if cSig := fmt.Sprintf("sha256=%x", mac.Sum(nil)); cSig != signature { + log.Errorf("Got message signature %s, expected %s", signature, cSig) + http.Error(w, "Signature verification failed", http.StatusUnauthorized) + return + } + + switch hookType { + case "follow": + var payload struct { + Data []struct { + FromName string `json:"from_name"` + FollowedAt time.Time `json:"followed_at"` + } `json:"data"` + } + + if err := json.NewDecoder(body).Decode(&payload); err != nil { + logger.WithError(err).Error("Unable to decode payload") + return + } + + sort.Slice(payload.Data, func(i, j int) bool { return payload.Data[i].FollowedAt.Before(payload.Data[j].FollowedAt) }) + for _, f := range payload.Data { + if str.StringInSlice(f.FromName, store.Followers.Seen) { + continue + } + + logger.WithField("name", f.FromName).Info("New follower announced") + store.Followers.Last = &f.FromName + store.Followers.Count++ + store.Followers.Seen = append([]string{f.FromName}, store.Followers.Seen...) + } + + default: + log.WithField("type", hookType).Warn("Received unexpected webhook request") + return + } + + if err := store.Save(cfg.StoreFile); err != nil { + logger.WithError(err).Error("Unable to update persistent store") + } + + if err := sendAllSockets(store); err != nil { + logger.WithError(err).Error("Unable to send update to all sockets") + } +} + +func registerWebHooks() error { + hookURL := func(hookType string) string { + return strings.Join([]string{ + strings.TrimRight(cfg.BaseURL, "/"), + "api", "webhook", + hookType, + }, "/") + } + + for uri, topic := range map[string]string{ + hookURL("follow"): fmt.Sprintf("https://api.twitch.tv/helix/users/follows?first=1&to_id=%s", cfg.TwitchID), + } { + ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout) + defer cancel() + + buf := new(bytes.Buffer) + if err := json.NewEncoder(buf).Encode(map[string]interface{}{ + "hub.callback": uri, + "hub.mode": "subscribe", + "hub.topic": topic, + "hub.lease_seconds": int64((cfg.WebHookTimeout + twitchRequestTimeout) / time.Second), + "hub.secret": webhookSecret, + }); err != nil { + return errors.Wrap(err, "assemble subscribe payload") + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://api.twitch.tv/helix/webhooks/hub", buf) + if err != nil { + return errors.Wrap(err, "assemble subscribe request") + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Client-Id", cfg.TwitchClient) + req.Header.Set("Authorization", "Bearer "+cfg.TwitchToken) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return errors.Wrap(err, "requesting subscribe") + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusAccepted { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return errors.Wrapf(err, "unexpected status %d, unable to read body", resp.StatusCode) + } + return errors.Errorf("unexpected status %d: %s", resp.StatusCode, body) + } + } + + return nil +}