Add subs, increase concurrency safety

Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
Knut Ahlers 2020-12-30 16:40:23 +01:00
parent 5293fc8ea2
commit 54224d2118
Signed by: luzifer
GPG key ID: 0066F03ED215AD7D
5 changed files with 161 additions and 31 deletions

9
api.go
View file

@ -18,10 +18,11 @@ import (
)
const (
msgTypeHost string = "host"
msgTypeRaid string = "raid"
msgTypeStore string = "store"
msgTypeSub string = "sub"
msgTypeHost string = "host"
msgTypeRaid string = "raid"
msgTypeStore string = "store"
msgTypeSub string = "sub"
msgTypeSubGift string = "subgift"
)
var subscriptions = newSubscriptionStore()

32
irc.go
View file

@ -222,8 +222,16 @@ func (ircHandler) handleTwitchUsernotice(m *irc.Message) {
duration = v
}
store.Subs.Last = &strDisplayName
store.Subs.LastDuration = duration
store.WithModLock(func() error {
store.Subs.Last = &strDisplayName
store.Subs.LastDuration = duration
store.Subs.Recent = append([]subscriber{{
Name: strDisplayName,
Months: duration,
}}, store.Subs.Recent...)
return nil
})
// Send update to sockets
log.WithFields(log.Fields(fields)).Info("New subscriber")
@ -234,7 +242,7 @@ func (ircHandler) handleTwitchUsernotice(m *irc.Message) {
log.WithError(err).Error("Unable to update persistent store")
}
if err := subscriptions.SendAllSockets(msgTypeStore, store); err != nil {
if err := store.WithModRLock(func() error { return subscriptions.SendAllSockets(msgTypeStore, store) }); err != nil {
log.WithError(err).Error("Unable to send update to all sockets")
}
@ -255,25 +263,33 @@ func (ircHandler) handleTwitchUsernotice(m *irc.Message) {
}
// Update store
strDisplayName := string(displayName)
strDisplayName := string(toName)
var duration int64
if v, err := strconv.ParseInt(string(m.Tags["msg-param-months"]), 10, 64); err == nil {
duration = v
}
store.Subs.Last = &strDisplayName
store.Subs.LastDuration = duration
store.WithModLock(func() error {
store.Subs.Last = &strDisplayName
store.Subs.LastDuration = duration
store.Subs.Recent = append([]subscriber{{
Name: strDisplayName,
Months: duration,
}}, store.Subs.Recent...)
return nil
})
// Send update to sockets
log.WithFields(log.Fields(fields)).Info("New sub-gift")
subscriptions.SendAllSockets(msgTypeSub, fields)
subscriptions.SendAllSockets(msgTypeSubGift, fields)
// Execute store save
if err := store.Save(cfg.StoreFile); err != nil {
log.WithError(err).Error("Unable to update persistent store")
}
if err := subscriptions.SendAllSockets(msgTypeStore, store); err != nil {
if err := store.WithModRLock(func() error { return subscriptions.SendAllSockets(msgTypeStore, store) }); err != nil {
log.WithError(err).Error("Unable to send update to all sockets")
}

View file

@ -6,6 +6,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/pkg/errors"
@ -16,6 +17,8 @@ func updateStats() error {
log.Debug("Updating statistics from API")
for _, fn := range []func() error{
updateFollowers,
updateSubscriberCount,
func() error { return subscriptions.SendAllSockets(msgTypeStore, store) },
} {
if err := fn(); err != nil {
return errors.Wrap(err, "update statistics module")
@ -69,15 +72,86 @@ func updateFollowers() error {
seen = append(seen, f.FromName)
}
store.Followers.Count = payload.Total
store.Followers.Seen = seen
store.WithModLock(func() error {
store.Followers.Count = payload.Total
store.Followers.Seen = seen
if err = store.Save(cfg.StoreFile); err != nil {
return errors.Wrap(err, "save store")
return nil
})
return errors.Wrap(store.Save(cfg.StoreFile), "save store")
}
func updateSubscriberCount() error {
log.Debug("Updating subscriber count from API")
var (
params = url.Values{"broadcaster_id": []string{cfg.TwitchID}}
subCount int64
)
for {
ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("https://api.twitch.tv/helix/subscriptions?%s", params.Encode()), nil)
if err != nil {
return errors.Wrap(err, "assemble subscriber request")
}
req.Header.Set("Client-Id", cfg.TwitchClient)
req.Header.Set("Authorization", "Bearer "+cfg.TwitchToken)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return errors.Wrap(err, "requesting subscribe")
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return errors.Wrapf(err, "unexpected status %d, unable to read body", resp.StatusCode)
}
return errors.Errorf("unexpected status %d: %s", resp.StatusCode, body)
}
payload := struct {
Pagination struct {
Cursor string `json:"cursor"`
} `json:"pagination"`
Data []struct {
BroadcasterID string `json:"broadcaster_id"`
Tier string `json:"tier"`
UserID string `json:"user_id"`
} `json:"data"`
// Contains more but I don't care.
}{}
if err = json.NewDecoder(resp.Body).Decode(&payload); err != nil {
return errors.Wrap(err, "decode json response")
}
if len(payload.Data) == 0 {
break
}
for _, sub := range payload.Data {
if sub.UserID == sub.BroadcasterID {
// Don't count self
continue
}
subCount++
}
params.Set("after", payload.Pagination.Cursor)
}
return errors.Wrap(
subscriptions.SendAllSockets(msgTypeStore, store),
"update all sockets",
)
store.WithModLock(func() error {
store.Subs.Count = subCount
return nil
})
return errors.Wrap(store.Save(cfg.StoreFile), "save store")
}

View file

@ -9,7 +9,12 @@ import (
"github.com/pkg/errors"
)
const storeMaxFollowers = 25
const storeMaxRecent = 25
type subscriber struct {
Name string `json:"name"`
Months int64 `json:"months"`
}
type storage struct {
Donations struct {
@ -23,11 +28,13 @@ type storage struct {
Count int64 `json:"count"`
} `json:"followers"`
Subs struct {
Last *string `json:"last"`
LastDuration int64 `json:"last_duration"`
Count int64 `json:"count"`
Last *string `json:"last"`
LastDuration int64 `json:"last_duration"`
Count int64 `json:"count"`
Recent []subscriber `json:"recent"`
} `json:"subs"`
modLock sync.RWMutex
saveLock sync.Mutex
}
@ -56,11 +63,18 @@ func (s *storage) Load(from string) error {
}
func (s *storage) Save(to string) error {
s.modLock.RLock()
defer s.modLock.RUnlock()
s.saveLock.Lock()
defer s.saveLock.Unlock()
if len(s.Followers.Seen) > storeMaxFollowers {
s.Followers.Seen = s.Followers.Seen[:storeMaxFollowers]
if len(s.Followers.Seen) > storeMaxRecent {
s.Followers.Seen = s.Followers.Seen[:storeMaxRecent]
}
if len(s.Subs.Recent) > storeMaxRecent {
s.Subs.Recent = s.Subs.Recent[:storeMaxRecent]
}
f, err := os.Create(to)
@ -77,3 +91,17 @@ func (s *storage) Save(to string) error {
"encode json",
)
}
func (s *storage) WithModLock(fn func() error) error {
s.modLock.Lock()
defer s.modLock.Unlock()
return fn()
}
func (s *storage) WithModRLock(fn func() error) error {
s.modLock.RLock()
defer s.modLock.RUnlock()
return fn()
}

View file

@ -78,15 +78,26 @@ func handleWebHookPush(w http.ResponseWriter, r *http.Request) {
sort.Slice(payload.Data, func(i, j int) bool { return payload.Data[i].FollowedAt.Before(payload.Data[j].FollowedAt) })
for _, f := range payload.Data {
if str.StringInSlice(f.FromName, store.Followers.Seen) {
var isKnown bool
store.WithModRLock(func() error {
isKnown = str.StringInSlice(f.FromName, store.Followers.Seen)
return nil
})
if isKnown {
logger.WithField("name", f.FromName).Debug("New follower already known, skipping")
continue
}
logger.WithField("name", f.FromName).Info("New follower announced")
store.Followers.Last = &f.FromName
store.Followers.Count++
store.Followers.Seen = append([]string{f.FromName}, store.Followers.Seen...)
store.WithModLock(func() error {
store.Followers.Last = &f.FromName
store.Followers.Count++
store.Followers.Seen = append([]string{f.FromName}, store.Followers.Seen...)
return nil
})
}
default:
@ -98,7 +109,7 @@ func handleWebHookPush(w http.ResponseWriter, r *http.Request) {
logger.WithError(err).Error("Unable to update persistent store")
}
if err := subscriptions.SendAllSockets(msgTypeStore, store); err != nil {
if err := store.WithModRLock(func() error { return subscriptions.SendAllSockets(msgTypeStore, store) }); err != nil {
logger.WithError(err).Error("Unable to send update to all sockets")
}
}