twitch-bot/twitchWatcher.go

377 lines
9.9 KiB
Go
Raw Normal View History

package main
import (
"encoding/json"
"sync"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/Luzifer/twitch-bot/pkg/twitch"
"github.com/Luzifer/twitch-bot/plugins"
)
type (
twitchChannelState struct {
Category string
IsLive bool
Title string
isInitialized bool
unregisterFunc func()
}
twitchWatcher struct {
ChannelStatus map[string]*twitchChannelState
lock sync.RWMutex
}
)
func (t twitchChannelState) Equals(c twitchChannelState) bool {
return t.Category == c.Category &&
t.IsLive == c.IsLive &&
t.Title == c.Title
}
func (t *twitchChannelState) Update(c twitchChannelState) {
t.Category = c.Category
t.IsLive = c.IsLive
t.Title = c.Title
}
func newTwitchWatcher() *twitchWatcher {
return &twitchWatcher{
ChannelStatus: make(map[string]*twitchChannelState),
}
}
func (t *twitchWatcher) AddChannel(channel string) error {
t.lock.RLock()
_, ok := t.ChannelStatus[channel]
t.lock.RUnlock()
if ok {
return nil
}
// Initialize for check loop
t.lock.Lock()
t.ChannelStatus[channel] = &twitchChannelState{}
t.lock.Unlock()
return t.updateChannelFromAPI(channel)
}
func (t *twitchWatcher) Check() {
var channels []string
t.lock.RLock()
for c := range t.ChannelStatus {
if t.ChannelStatus[c].unregisterFunc != nil {
continue
}
channels = append(channels, c)
}
t.lock.RUnlock()
for _, ch := range channels {
if err := t.updateChannelFromAPI(ch); err != nil {
log.WithError(err).WithField("channel", ch).Error("Unable to update channel status")
}
}
}
func (t *twitchWatcher) RemoveChannel(channel string) error {
t.lock.Lock()
defer t.lock.Unlock()
if f := t.ChannelStatus[channel].unregisterFunc; f != nil {
f()
}
delete(t.ChannelStatus, channel)
return nil
}
func (t *twitchWatcher) handleEventSubChannelFollow(m json.RawMessage) error {
var payload twitch.EventSubEventFollow
if err := json.Unmarshal(m, &payload); err != nil {
return errors.Wrap(err, "unmarshalling event")
}
fields := plugins.FieldCollectionFromData(map[string]interface{}{
"channel": "#" + payload.BroadcasterUserLogin,
"followed_at": payload.FollowedAt,
"user_id": payload.UserID,
"user": payload.UserLogin,
})
log.WithFields(log.Fields(fields.Data())).Info("User followed")
go handleMessage(ircHdl.Client(), nil, eventTypeFollow, fields)
return nil
}
func (t *twitchWatcher) handleEventSubChannelPointCustomRewardRedemptionAdd(m json.RawMessage) error {
var payload twitch.EventSubEventChannelPointCustomRewardRedemptionAdd
if err := json.Unmarshal(m, &payload); err != nil {
return errors.Wrap(err, "unmarshalling event")
}
fields := plugins.FieldCollectionFromData(map[string]interface{}{
"channel": "#" + payload.BroadcasterUserLogin,
"reward_cost": payload.Reward.Cost,
"reward_id": payload.Reward.ID,
"reward_title": payload.Reward.Title,
"status": payload.Status,
"user_id": payload.UserID,
"user_input": payload.UserInput,
"user": payload.UserLogin,
})
log.WithFields(log.Fields(fields.Data())).Info("ChannelPoint reward was redeemed")
go handleMessage(ircHdl.Client(), nil, eventTypeChannelPointRedeem, fields)
return nil
}
func (t *twitchWatcher) handleEventSubChannelUpdate(m json.RawMessage) error {
var payload twitch.EventSubEventChannelUpdate
if err := json.Unmarshal(m, &payload); err != nil {
return errors.Wrap(err, "unmarshalling event")
}
t.triggerUpdate(payload.BroadcasterUserLogin, &payload.Title, &payload.CategoryName, nil)
return nil
}
func (t *twitchWatcher) handleEventSubStreamOnOff(isOnline bool) func(json.RawMessage) error {
return func(m json.RawMessage) error {
var payload twitch.EventSubEventFollow
if err := json.Unmarshal(m, &payload); err != nil {
return errors.Wrap(err, "unmarshalling event")
}
t.triggerUpdate(payload.BroadcasterUserLogin, nil, nil, &isOnline)
return nil
}
}
func (t *twitchWatcher) handleEventUserAuthRevoke(m json.RawMessage) error {
var payload twitch.EventSubEventUserAuthorizationRevoke
if err := json.Unmarshal(m, &payload); err != nil {
return errors.Wrap(err, "unmarshalling event")
}
if payload.ClientID != cfg.TwitchClient {
// We got an revoke for a different ID: Shouldn't happen but whatever.
return nil
}
return errors.Wrap(
accessService.RemoveExendedTwitchCredentials(payload.UserLogin),
"deleting granted scopes",
)
}
func (t *twitchWatcher) updateChannelFromAPI(channel string) error {
t.lock.Lock()
defer t.lock.Unlock()
var (
err error
status twitchChannelState
storedStatus = t.ChannelStatus[channel]
)
status.IsLive, err = twitchClient.HasLiveStream(channel)
if err != nil {
return errors.Wrap(err, "getting live status")
}
status.Category, status.Title, err = twitchClient.GetRecentStreamInfo(channel)
if err != nil {
return errors.Wrap(err, "getting stream info")
}
if storedStatus == nil {
storedStatus = &twitchChannelState{}
t.ChannelStatus[channel] = storedStatus
}
if storedStatus.isInitialized && !storedStatus.Equals(status) {
// Send updates only when we do have an update
t.triggerUpdate(channel, &status.Title, &status.Category, &status.IsLive)
}
storedStatus.Update(status)
storedStatus.isInitialized = true
if storedStatus.unregisterFunc != nil {
// Do not register twice
return nil
}
if storedStatus.unregisterFunc, err = t.registerEventSubCallbacks(channel); err != nil {
return errors.Wrap(err, "registering eventsub callbacks")
}
return nil
}
func (t *twitchWatcher) registerEventSubCallbacks(channel string) (func(), error) {
if twitchEventSubClient == nil {
// We don't have eventsub functionality
return nil, nil
}
userID, err := twitchClient.GetIDForUsername(channel)
if err != nil {
return nil, errors.Wrap(err, "resolving channel to user-id")
}
var (
topicRegistrations = []struct {
Topic string
Condition twitch.EventSubCondition
RequiredScopes []string
AnyScope bool
Hook func(json.RawMessage) error
}{
{
Topic: twitch.EventSubEventTypeChannelUpdate,
Condition: twitch.EventSubCondition{BroadcasterUserID: userID},
RequiredScopes: nil,
Hook: t.handleEventSubChannelUpdate,
},
{
Topic: twitch.EventSubEventTypeStreamOffline,
Condition: twitch.EventSubCondition{BroadcasterUserID: userID},
RequiredScopes: nil,
Hook: t.handleEventSubStreamOnOff(false),
},
{
Topic: twitch.EventSubEventTypeStreamOnline,
Condition: twitch.EventSubCondition{BroadcasterUserID: userID},
RequiredScopes: nil,
Hook: t.handleEventSubStreamOnOff(true),
},
{
Topic: twitch.EventSubEventTypeChannelFollow,
Condition: twitch.EventSubCondition{BroadcasterUserID: userID},
RequiredScopes: nil,
Hook: t.handleEventSubChannelFollow,
},
{
Topic: twitch.EventSubEventTypeChannelPointCustomRewardRedemptionAdd,
Condition: twitch.EventSubCondition{BroadcasterUserID: userID},
RequiredScopes: []string{twitch.ScopeChannelReadRedemptions, twitch.ScopeChannelManageRedemptions},
AnyScope: true,
Hook: t.handleEventSubChannelPointCustomRewardRedemptionAdd,
},
}
unsubHandlers []func()
)
for _, tr := range topicRegistrations {
logger := log.WithFields(log.Fields{
"any": tr.AnyScope,
"channel": channel,
"scopes": tr.RequiredScopes,
"topic": tr.Topic,
})
if len(tr.RequiredScopes) > 0 {
fn := accessService.HasPermissionsForChannel
if tr.AnyScope {
fn = accessService.HasAnyPermissionForChannel
}
hasScopes, err := fn(channel, tr.RequiredScopes...)
if err != nil {
return nil, errors.Wrap(err, "checking granted scopes")
}
if !hasScopes {
logger.Debug("Missing scopes for eventsub topic")
continue
}
}
uf, err := twitchEventSubClient.RegisterEventSubHooks(tr.Topic, tr.Condition, tr.Hook)
if err != nil {
logger.WithError(err).Error("Unable to register topic")
for _, f := range unsubHandlers {
// Error will cause unsub handlers not to be stored, therefore we unsub them now
f()
}
return nil, errors.Wrap(err, "registering topic")
}
unsubHandlers = append(unsubHandlers, uf)
}
return func() {
for _, f := range unsubHandlers {
f()
}
}, nil
}
func (t *twitchWatcher) registerGlobalHooks() error {
_, err := twitchEventSubClient.RegisterEventSubHooks(
twitch.EventSubEventTypeUserAuthorizationRevoke,
twitch.EventSubCondition{ClientID: cfg.TwitchClient},
t.handleEventUserAuthRevoke,
)
return errors.Wrap(err, "registering user auth hook")
}
func (t *twitchWatcher) triggerUpdate(channel string, title, category *string, online *bool) {
if category != nil && t.ChannelStatus[channel].Category != *category {
t.ChannelStatus[channel].Category = *category
log.WithFields(log.Fields{
"channel": channel,
"category": *category,
}).Debug("Twitch metadata changed")
go handleMessage(ircHdl.Client(), nil, eventTypeTwitchCategoryUpdate, plugins.FieldCollectionFromData(map[string]interface{}{
"channel": "#" + channel,
"category": *category,
}))
}
if title != nil && t.ChannelStatus[channel].Title != *title {
t.ChannelStatus[channel].Title = *title
log.WithFields(log.Fields{
"channel": channel,
"title": *title,
}).Debug("Twitch metadata changed")
go handleMessage(ircHdl.Client(), nil, eventTypeTwitchTitleUpdate, plugins.FieldCollectionFromData(map[string]interface{}{
"channel": "#" + channel,
"title": *title,
}))
}
if online != nil && t.ChannelStatus[channel].IsLive != *online {
t.ChannelStatus[channel].IsLive = *online
log.WithFields(log.Fields{
"channel": channel,
"isLive": *online,
}).Debug("Twitch metadata changed")
evt := eventTypeTwitchStreamOnline
if !*online {
evt = eventTypeTwitchStreamOffline
}
go handleMessage(ircHdl.Client(), nil, evt, plugins.FieldCollectionFromData(map[string]interface{}{
"channel": "#" + channel,
}))
}
}