diff --git a/api.go b/api.go index 4b405e9..020240f 100644 --- a/api.go +++ b/api.go @@ -18,10 +18,11 @@ import ( ) const ( - msgTypeHost string = "host" - msgTypeRaid string = "raid" - msgTypeStore string = "store" - msgTypeSub string = "sub" + msgTypeHost string = "host" + msgTypeRaid string = "raid" + msgTypeStore string = "store" + msgTypeSub string = "sub" + msgTypeSubGift string = "subgift" ) var subscriptions = newSubscriptionStore() diff --git a/irc.go b/irc.go index 71ade2b..23eac0e 100644 --- a/irc.go +++ b/irc.go @@ -222,8 +222,16 @@ func (ircHandler) handleTwitchUsernotice(m *irc.Message) { duration = v } - store.Subs.Last = &strDisplayName - store.Subs.LastDuration = duration + store.WithModLock(func() error { + store.Subs.Last = &strDisplayName + store.Subs.LastDuration = duration + store.Subs.Recent = append([]subscriber{{ + Name: strDisplayName, + Months: duration, + }}, store.Subs.Recent...) + + return nil + }) // Send update to sockets log.WithFields(log.Fields(fields)).Info("New subscriber") @@ -234,7 +242,7 @@ func (ircHandler) handleTwitchUsernotice(m *irc.Message) { log.WithError(err).Error("Unable to update persistent store") } - if err := subscriptions.SendAllSockets(msgTypeStore, store); err != nil { + if err := store.WithModRLock(func() error { return subscriptions.SendAllSockets(msgTypeStore, store) }); err != nil { log.WithError(err).Error("Unable to send update to all sockets") } @@ -255,25 +263,33 @@ func (ircHandler) handleTwitchUsernotice(m *irc.Message) { } // Update store - strDisplayName := string(displayName) + strDisplayName := string(toName) var duration int64 if v, err := strconv.ParseInt(string(m.Tags["msg-param-months"]), 10, 64); err == nil { duration = v } - store.Subs.Last = &strDisplayName - store.Subs.LastDuration = duration + store.WithModLock(func() error { + store.Subs.Last = &strDisplayName + store.Subs.LastDuration = duration + store.Subs.Recent = append([]subscriber{{ + Name: strDisplayName, + Months: duration, + }}, store.Subs.Recent...) + + return nil + }) // Send update to sockets log.WithFields(log.Fields(fields)).Info("New sub-gift") - subscriptions.SendAllSockets(msgTypeSub, fields) + subscriptions.SendAllSockets(msgTypeSubGift, fields) // Execute store save if err := store.Save(cfg.StoreFile); err != nil { log.WithError(err).Error("Unable to update persistent store") } - if err := subscriptions.SendAllSockets(msgTypeStore, store); err != nil { + if err := store.WithModRLock(func() error { return subscriptions.SendAllSockets(msgTypeStore, store) }); err != nil { log.WithError(err).Error("Unable to send update to all sockets") } diff --git a/stats.go b/stats.go index a38ffab..cd3b9d6 100644 --- a/stats.go +++ b/stats.go @@ -6,6 +6,7 @@ import ( "fmt" "io/ioutil" "net/http" + "net/url" "time" "github.com/pkg/errors" @@ -16,6 +17,8 @@ func updateStats() error { log.Debug("Updating statistics from API") for _, fn := range []func() error{ updateFollowers, + updateSubscriberCount, + func() error { return subscriptions.SendAllSockets(msgTypeStore, store) }, } { if err := fn(); err != nil { return errors.Wrap(err, "update statistics module") @@ -69,15 +72,86 @@ func updateFollowers() error { seen = append(seen, f.FromName) } - store.Followers.Count = payload.Total - store.Followers.Seen = seen + store.WithModLock(func() error { + store.Followers.Count = payload.Total + store.Followers.Seen = seen - if err = store.Save(cfg.StoreFile); err != nil { - return errors.Wrap(err, "save store") + return nil + }) + + return errors.Wrap(store.Save(cfg.StoreFile), "save store") +} + +func updateSubscriberCount() error { + log.Debug("Updating subscriber count from API") + + var ( + params = url.Values{"broadcaster_id": []string{cfg.TwitchID}} + subCount int64 + ) + + for { + ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("https://api.twitch.tv/helix/subscriptions?%s", params.Encode()), nil) + if err != nil { + return errors.Wrap(err, "assemble subscriber request") + } + req.Header.Set("Client-Id", cfg.TwitchClient) + req.Header.Set("Authorization", "Bearer "+cfg.TwitchToken) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return errors.Wrap(err, "requesting subscribe") + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return errors.Wrapf(err, "unexpected status %d, unable to read body", resp.StatusCode) + } + return errors.Errorf("unexpected status %d: %s", resp.StatusCode, body) + } + + payload := struct { + Pagination struct { + Cursor string `json:"cursor"` + } `json:"pagination"` + Data []struct { + BroadcasterID string `json:"broadcaster_id"` + Tier string `json:"tier"` + UserID string `json:"user_id"` + } `json:"data"` + // Contains more but I don't care. + }{} + + if err = json.NewDecoder(resp.Body).Decode(&payload); err != nil { + return errors.Wrap(err, "decode json response") + } + + if len(payload.Data) == 0 { + break + } + + for _, sub := range payload.Data { + if sub.UserID == sub.BroadcasterID { + // Don't count self + continue + } + + subCount++ + } + + params.Set("after", payload.Pagination.Cursor) } - return errors.Wrap( - subscriptions.SendAllSockets(msgTypeStore, store), - "update all sockets", - ) + store.WithModLock(func() error { + store.Subs.Count = subCount + + return nil + }) + + return errors.Wrap(store.Save(cfg.StoreFile), "save store") } diff --git a/storage.go b/storage.go index 647a345..0e4ef74 100644 --- a/storage.go +++ b/storage.go @@ -9,7 +9,12 @@ import ( "github.com/pkg/errors" ) -const storeMaxFollowers = 25 +const storeMaxRecent = 25 + +type subscriber struct { + Name string `json:"name"` + Months int64 `json:"months"` +} type storage struct { Donations struct { @@ -23,11 +28,13 @@ type storage struct { Count int64 `json:"count"` } `json:"followers"` Subs struct { - Last *string `json:"last"` - LastDuration int64 `json:"last_duration"` - Count int64 `json:"count"` + Last *string `json:"last"` + LastDuration int64 `json:"last_duration"` + Count int64 `json:"count"` + Recent []subscriber `json:"recent"` } `json:"subs"` + modLock sync.RWMutex saveLock sync.Mutex } @@ -56,11 +63,18 @@ func (s *storage) Load(from string) error { } func (s *storage) Save(to string) error { + s.modLock.RLock() + defer s.modLock.RUnlock() + s.saveLock.Lock() defer s.saveLock.Unlock() - if len(s.Followers.Seen) > storeMaxFollowers { - s.Followers.Seen = s.Followers.Seen[:storeMaxFollowers] + if len(s.Followers.Seen) > storeMaxRecent { + s.Followers.Seen = s.Followers.Seen[:storeMaxRecent] + } + + if len(s.Subs.Recent) > storeMaxRecent { + s.Subs.Recent = s.Subs.Recent[:storeMaxRecent] } f, err := os.Create(to) @@ -77,3 +91,17 @@ func (s *storage) Save(to string) error { "encode json", ) } + +func (s *storage) WithModLock(fn func() error) error { + s.modLock.Lock() + defer s.modLock.Unlock() + + return fn() +} + +func (s *storage) WithModRLock(fn func() error) error { + s.modLock.RLock() + defer s.modLock.RUnlock() + + return fn() +} diff --git a/webhook.go b/webhook.go index 7dbfe48..0a4f798 100644 --- a/webhook.go +++ b/webhook.go @@ -78,15 +78,26 @@ func handleWebHookPush(w http.ResponseWriter, r *http.Request) { sort.Slice(payload.Data, func(i, j int) bool { return payload.Data[i].FollowedAt.Before(payload.Data[j].FollowedAt) }) for _, f := range payload.Data { - if str.StringInSlice(f.FromName, store.Followers.Seen) { + var isKnown bool + + store.WithModRLock(func() error { + isKnown = str.StringInSlice(f.FromName, store.Followers.Seen) + return nil + }) + + if isKnown { logger.WithField("name", f.FromName).Debug("New follower already known, skipping") continue } logger.WithField("name", f.FromName).Info("New follower announced") - store.Followers.Last = &f.FromName - store.Followers.Count++ - store.Followers.Seen = append([]string{f.FromName}, store.Followers.Seen...) + store.WithModLock(func() error { + store.Followers.Last = &f.FromName + store.Followers.Count++ + store.Followers.Seen = append([]string{f.FromName}, store.Followers.Seen...) + + return nil + }) } default: @@ -98,7 +109,7 @@ func handleWebHookPush(w http.ResponseWriter, r *http.Request) { logger.WithError(err).Error("Unable to update persistent store") } - if err := subscriptions.SendAllSockets(msgTypeStore, store); err != nil { + if err := store.WithModRLock(func() error { return subscriptions.SendAllSockets(msgTypeStore, store) }); err != nil { logger.WithError(err).Error("Unable to send update to all sockets") } }