mirror of
https://github.com/Luzifer/twitch-bot.git
synced 2024-11-08 08:10:08 +00:00
[core] Add EventSub support for Twitch-Events (#10)
This commit is contained in:
parent
4e7da1c4cf
commit
d75a719781
5 changed files with 800 additions and 76 deletions
|
@ -17,6 +17,7 @@ Please see the [Wiki](https://github.com/Luzifer/twitch-bot/wiki) for documentat
|
|||
```console
|
||||
# twitch-bot --help
|
||||
Usage of twitch-bot:
|
||||
--base-url string External URL of the config-editor interface (set to enable EventSub support)
|
||||
--command-timeout duration Timeout for command execution (default 30s)
|
||||
-c, --config string Location of configuration file (default "./config.yaml")
|
||||
--log-level string Log level (debug, info, warn, error, fatal) (default "info")
|
||||
|
|
110
main.go
110
main.go
|
@ -2,9 +2,12 @@ package main
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -27,16 +30,18 @@ const ircReconnectDelay = 100 * time.Millisecond
|
|||
|
||||
var (
|
||||
cfg = struct {
|
||||
CommandTimeout time.Duration `flag:"command-timeout" default:"30s" description:"Timeout for command execution"`
|
||||
Config string `flag:"config,c" default:"./config.yaml" description:"Location of configuration file"`
|
||||
IRCRateLimit time.Duration `flag:"rate-limit" default:"1500ms" description:"How often to send a message (default: 20/30s=1500ms, if your bot is mod everywhere: 100/30s=300ms, different for known/verified bots)"`
|
||||
LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"`
|
||||
PluginDir string `flag:"plugin-dir" default:"/usr/lib/twitch-bot" description:"Where to find and load plugins"`
|
||||
StorageFile string `flag:"storage-file" default:"./storage.json.gz" description:"Where to store the data"`
|
||||
TwitchClient string `flag:"twitch-client" default:"" description:"Client ID to act as"`
|
||||
TwitchToken string `flag:"twitch-token" default:"" description:"OAuth token valid for client"`
|
||||
ValidateConfig bool `flag:"validate-config,v" default:"false" description:"Loads the config, logs any errors and quits with status 0 on success"`
|
||||
VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"`
|
||||
BaseURL string `flag:"base-url" default:"" description:"External URL of the config-editor interface (set to enable EventSub support)"`
|
||||
CommandTimeout time.Duration `flag:"command-timeout" default:"30s" description:"Timeout for command execution"`
|
||||
Config string `flag:"config,c" default:"./config.yaml" description:"Location of configuration file"`
|
||||
IRCRateLimit time.Duration `flag:"rate-limit" default:"1500ms" description:"How often to send a message (default: 20/30s=1500ms, if your bot is mod everywhere: 100/30s=300ms, different for known/verified bots)"`
|
||||
LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"`
|
||||
PluginDir string `flag:"plugin-dir" default:"/usr/lib/twitch-bot" description:"Where to find and load plugins"`
|
||||
StorageFile string `flag:"storage-file" default:"./storage.json.gz" description:"Where to store the data"`
|
||||
TwitchClient string `flag:"twitch-client" default:"" description:"Client ID to act as"`
|
||||
TwitchClientSecret string `flag:"twitch-client-secret" default:"" description:"Secret for the Client ID"`
|
||||
TwitchToken string `flag:"twitch-token" default:"" description:"OAuth token valid for client"`
|
||||
ValidateConfig bool `flag:"validate-config,v" default:"false" description:"Loads the config, logs any errors and quits with status 0 on success"`
|
||||
VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"`
|
||||
}{}
|
||||
|
||||
config *configFile
|
||||
|
@ -47,10 +52,14 @@ var (
|
|||
ircHdl *ircHandler
|
||||
router = mux.NewRouter()
|
||||
|
||||
runID = uuid.Must(uuid.NewV4()).String()
|
||||
externalHTTPAvailable bool
|
||||
|
||||
sendMessage func(m *irc.Message) error
|
||||
|
||||
store = newStorageFile(false)
|
||||
twitchClient *twitch.Client
|
||||
store = newStorageFile(false)
|
||||
twitchClient *twitch.Client
|
||||
twitchEventSubClient *twitch.EventSubClient
|
||||
|
||||
version = "dev"
|
||||
)
|
||||
|
@ -142,6 +151,7 @@ func main() {
|
|||
router.Use(corsMiddleware)
|
||||
router.HandleFunc("/openapi.html", handleSwaggerHTML)
|
||||
router.HandleFunc("/openapi.json", handleSwaggerRequest)
|
||||
router.HandleFunc("/selfcheck", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte(runID)) })
|
||||
|
||||
if err = store.Load(); err != nil {
|
||||
log.WithError(err).Fatal("Unable to load storage file")
|
||||
|
@ -180,12 +190,6 @@ func main() {
|
|||
return
|
||||
}
|
||||
|
||||
for _, c := range config.Channels {
|
||||
if err := twitchWatch.AddChannel(c); err != nil {
|
||||
log.WithError(err).WithField("channel", c).Error("Unable to add channel to watcher")
|
||||
}
|
||||
}
|
||||
|
||||
if err = startCheck(); err != nil {
|
||||
log.WithError(err).Fatal("Missing required parameters")
|
||||
}
|
||||
|
@ -209,6 +213,33 @@ func main() {
|
|||
|
||||
go http.Serve(listener, router)
|
||||
log.WithField("address", listener.Addr().String()).Info("HTTP server started")
|
||||
|
||||
checkExternalHTTP()
|
||||
|
||||
if externalHTTPAvailable && cfg.TwitchClient != "" && cfg.TwitchClientSecret != "" {
|
||||
secret, handle, err := store.GetOrGenerateEventSubSecret()
|
||||
if err != nil {
|
||||
log.WithError(err).Fatal("Unable to get or create eventsub secret")
|
||||
}
|
||||
|
||||
twitchEventSubClient = twitch.NewEventSubClient(strings.Join([]string{
|
||||
strings.TrimRight(cfg.BaseURL, "/"),
|
||||
"eventsub",
|
||||
handle,
|
||||
}, "/"), secret, handle)
|
||||
|
||||
if err = twitchEventSubClient.Authorize(cfg.TwitchClient, cfg.TwitchClientSecret); err != nil {
|
||||
log.WithError(err).Fatal("Unable to authorize Twitch EventSub client")
|
||||
}
|
||||
|
||||
router.HandleFunc("/eventsub/{keyhandle}", twitchEventSubClient.HandleEventsubPush).Methods(http.MethodPost)
|
||||
}
|
||||
}
|
||||
|
||||
for _, c := range config.Channels {
|
||||
if err := twitchWatch.AddChannel(c); err != nil {
|
||||
log.WithError(err).WithField("channel", c).Error("Unable to add channel to watcher")
|
||||
}
|
||||
}
|
||||
|
||||
ircDisconnected <- struct{}{}
|
||||
|
@ -291,6 +322,49 @@ func main() {
|
|||
}
|
||||
}
|
||||
|
||||
func checkExternalHTTP() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
|
||||
base, err := url.Parse(cfg.BaseURL)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Unable to parse BaseURL")
|
||||
return
|
||||
}
|
||||
|
||||
if base.String() == "" {
|
||||
log.Debug("No BaseURL set, disabling EventSub support")
|
||||
return
|
||||
}
|
||||
|
||||
base.Path = strings.Join([]string{
|
||||
strings.TrimRight(base.Path, "/"),
|
||||
"selfcheck",
|
||||
}, "/")
|
||||
|
||||
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, base.String(), nil)
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Unable to fetch selfcheck")
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
data, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Unable to read selfcheck response")
|
||||
return
|
||||
}
|
||||
|
||||
if strings.TrimSpace(string(data)) == runID {
|
||||
externalHTTPAvailable = true
|
||||
log.Debug("Self-Check successful, EventSub support is available")
|
||||
} else {
|
||||
externalHTTPAvailable = false
|
||||
log.Debug("Self-Check failed, EventSub support is not available")
|
||||
}
|
||||
}
|
||||
|
||||
func startCheck() error {
|
||||
var errs []string
|
||||
|
||||
|
|
28
store.go
28
store.go
|
@ -2,6 +2,8 @@ package main
|
|||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"sync"
|
||||
|
@ -11,6 +13,8 @@ import (
|
|||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const eventSubSecretLength = 32
|
||||
|
||||
type storageFile struct {
|
||||
Counters map[string]int64 `json:"counters"`
|
||||
Timers map[string]plugins.TimerEntry `json:"timers"`
|
||||
|
@ -18,6 +22,8 @@ type storageFile struct {
|
|||
|
||||
ModuleStorage map[string]json.RawMessage `json:"module_storage"`
|
||||
|
||||
EventSubSecret string `json:"event_sub_secret,omitempty"`
|
||||
|
||||
inMem bool
|
||||
lock *sync.RWMutex
|
||||
}
|
||||
|
@ -51,6 +57,28 @@ func (s *storageFile) GetCounterValue(counter string) int64 {
|
|||
return s.Counters[counter]
|
||||
}
|
||||
|
||||
func (s *storageFile) GetOrGenerateEventSubSecret() (string, string, error) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
if s.EventSubSecret != "" {
|
||||
return s.EventSubSecret, s.EventSubSecret[:5], nil
|
||||
}
|
||||
|
||||
key := make([]byte, eventSubSecretLength)
|
||||
n, err := rand.Read(key)
|
||||
if err != nil {
|
||||
return "", "", errors.Wrap(err, "generating random secret")
|
||||
}
|
||||
if n != eventSubSecretLength {
|
||||
return "", "", errors.Errorf("read only %d of %d byte", n, eventSubSecretLength)
|
||||
}
|
||||
|
||||
s.EventSubSecret = hex.EncodeToString(key)
|
||||
|
||||
return s.EventSubSecret, s.EventSubSecret[:5], errors.Wrap(s.Save(), "saving store")
|
||||
}
|
||||
|
||||
func (s *storageFile) GetModuleStore(moduleUUID string, storedObject plugins.StorageUnmarshaller) error {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
|
524
twitch/eventsub.go
Normal file
524
twitch/eventsub.go
Normal file
|
@ -0,0 +1,524 @@
|
|||
package twitch
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/hmac"
|
||||
"crypto/sha256"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Luzifer/go_helpers/v2/str"
|
||||
"github.com/gofrs/uuid/v3"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/mitchellh/hashstructure/v2"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
eventSubHeaderMessageID = "Twitch-Eventsub-Message-Id"
|
||||
eventSubHeaderMessageType = "Twitch-Eventsub-Message-Type"
|
||||
eventSubHeaderMessageSignature = "Twitch-Eventsub-Message-Signature"
|
||||
eventSubHeaderMessageTimestamp = "Twitch-Eventsub-Message-Timestamp"
|
||||
// eventSubHeaderMessageRetry = "Twitch-Eventsub-Message-Retry"
|
||||
// eventSubHeaderSubscriptionType = "Twitch-Eventsub-Subscription-Type"
|
||||
// eventSubHeaderSubscriptionVersion = "Twitch-Eventsub-Subscription-Version"
|
||||
|
||||
eventSubMessageTypeVerification = "webhook_callback_verification"
|
||||
eventSubMessageTypeRevokation = "revocation"
|
||||
// eventSubMessageTypeNotification = "notification"
|
||||
|
||||
eventSubStatusEnabled = "enabled"
|
||||
eventSubStatusVerificationPending = "webhook_callback_verification_pending"
|
||||
// eventSubStatusAuthorizationRevoked = "authorization_revoked"
|
||||
// eventSubStatusFailuresExceeded = "notification_failures_exceeded"
|
||||
// eventSubStatusUserRemoved = "user_removed"
|
||||
// eventSubStatusVerificationFailed = "webhook_callback_verification_failed"
|
||||
|
||||
EventSubEventTypeChannelUpdate = "channel.update"
|
||||
EventSubEventTypeStreamOffline = "stream.offline"
|
||||
EventSubEventTypeStreamOnline = "stream.online"
|
||||
)
|
||||
|
||||
type (
|
||||
EventSubClient struct {
|
||||
apiURL string
|
||||
secret string
|
||||
secretHandle string
|
||||
|
||||
twitchClientID string
|
||||
twitchClientSecret string
|
||||
twitchAccessToken string
|
||||
|
||||
subscriptions map[string]*registeredSubscription
|
||||
subscriptionsLock sync.RWMutex
|
||||
}
|
||||
|
||||
EventSubCondition struct {
|
||||
BroadcasterUserID string `json:"broadcaster_user_id,omitempty"`
|
||||
CampaignID string `json:"campaign_id,omitempty"`
|
||||
CategoryID string `json:"category_id,omitempty"`
|
||||
ClientID string `json:"client_id,omitempty"`
|
||||
ExtensionClientID string `json:"extension_client_id,omitempty"`
|
||||
FromBroadcasterUserID string `json:"from_broadcaster_user_id,omitempty"`
|
||||
OrganizationID string `json:"organization_id,omitempty"`
|
||||
RewardID string `json:"reward_id,omitempty"`
|
||||
ToBroadcasterUserID string `json:"to_broadcaster_user_id,omitempty"`
|
||||
UserID string `json:"user_id,omitempty"`
|
||||
}
|
||||
|
||||
EventSubEventChannelUpdate struct {
|
||||
BroadcasterUserID string `json:"broadcaster_user_id"`
|
||||
BroadcasterUserLogin string `json:"broadcaster_user_login"`
|
||||
BroadcasterUserName string `json:"broadcaster_user_name"`
|
||||
Title string `json:"title"`
|
||||
Language string `json:"language"`
|
||||
CategoryID string `json:"category_id"`
|
||||
CategoryName string `json:"category_name"`
|
||||
IsMature bool `json:"is_mature"`
|
||||
}
|
||||
|
||||
EventSubEventFollow struct {
|
||||
UserID string `json:"user_id"`
|
||||
UserLogin string `json:"user_login"`
|
||||
UserName string `json:"user_name"`
|
||||
BroadcasterUserID string `json:"broadcaster_user_id"`
|
||||
BroadcasterUserLogin string `json:"broadcaster_user_login"`
|
||||
BroadcasterUserName string `json:"broadcaster_user_name"`
|
||||
FollowedAt time.Time `json:"followed_at"`
|
||||
}
|
||||
|
||||
EventSubEventStreamOffline struct {
|
||||
BroadcasterUserID string `json:"broadcaster_user_id"`
|
||||
BroadcasterUserLogin string `json:"broadcaster_user_login"`
|
||||
BroadcasterUserName string `json:"broadcaster_user_name"`
|
||||
}
|
||||
|
||||
EventSubEventStreamOnline struct {
|
||||
ID string `json:"id"`
|
||||
BroadcasterUserID string `json:"broadcaster_user_id"`
|
||||
BroadcasterUserLogin string `json:"broadcaster_user_login"`
|
||||
BroadcasterUserName string `json:"broadcaster_user_name"`
|
||||
Type string `json:"type"`
|
||||
StartedAt time.Time `json:"started_at"`
|
||||
}
|
||||
|
||||
eventSubPostMessage struct {
|
||||
Challenge string `json:"challenge"`
|
||||
Subscription eventSubSubscription `json:"subscription"`
|
||||
Event json.RawMessage `json:"event"`
|
||||
}
|
||||
|
||||
eventSubSubscription struct {
|
||||
ID string `json:"id,omitempty"` // READONLY
|
||||
Status string `json:"status,omitempty"` // READONLY
|
||||
Type string `json:"type"`
|
||||
Version string `json:"version"`
|
||||
Cost int64 `json:"cost,omitempty"` // READONLY
|
||||
Condition EventSubCondition `json:"condition"`
|
||||
Transport eventSubTransport `json:"transport"`
|
||||
CreatedAt time.Time `json:"created_at,omitempty"` // READONLY
|
||||
}
|
||||
|
||||
eventSubTransport struct {
|
||||
Method string `json:"method"`
|
||||
Callback string `json:"callback"`
|
||||
Secret string `json:"secret"`
|
||||
}
|
||||
|
||||
registeredSubscription struct {
|
||||
Type string
|
||||
Callbacks map[string]func(json.RawMessage) error
|
||||
Subscription eventSubSubscription
|
||||
}
|
||||
)
|
||||
|
||||
func (e EventSubCondition) Hash() (string, error) {
|
||||
h, err := hashstructure.Hash(e, hashstructure.FormatV2, &hashstructure.HashOptions{TagName: "json"})
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "hashing struct")
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%x", h), nil
|
||||
}
|
||||
|
||||
func NewEventSubClient(apiURL, secret, secretHandle string) *EventSubClient {
|
||||
return &EventSubClient{
|
||||
apiURL: apiURL,
|
||||
secret: secret,
|
||||
secretHandle: secretHandle,
|
||||
|
||||
subscriptions: map[string]*registeredSubscription{},
|
||||
}
|
||||
}
|
||||
|
||||
func (e *EventSubClient) Authorize(clientID, clientSecret string) error {
|
||||
e.twitchClientID = clientID
|
||||
e.twitchClientSecret = clientSecret
|
||||
|
||||
_, err := e.getTwitchAppAccessToken()
|
||||
return errors.Wrap(err, "fetching app access token")
|
||||
}
|
||||
|
||||
func (e *EventSubClient) getTwitchAppAccessToken() (string, error) {
|
||||
if e.twitchAccessToken != "" {
|
||||
return e.twitchAccessToken, nil
|
||||
}
|
||||
|
||||
var rData struct {
|
||||
AccessToken string `json:"access_token"`
|
||||
RefreshToken string `json:"refresh_token"`
|
||||
ExpiresIn int `json:"expires_in"`
|
||||
Scope []interface{} `json:"scope"`
|
||||
TokenType string `json:"token_type"`
|
||||
}
|
||||
|
||||
params := make(url.Values)
|
||||
params.Set("client_id", e.twitchClientID)
|
||||
params.Set("client_secret", e.twitchClientSecret)
|
||||
params.Set("grant_type", "client_credentials")
|
||||
|
||||
u, _ := url.Parse("https://id.twitch.tv/oauth2/token")
|
||||
u.RawQuery = params.Encode()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
req, _ := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), nil)
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "fetching response")
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return "", errors.Wrapf(err, "unexpected status %d and cannot read body", resp.StatusCode)
|
||||
}
|
||||
return "", errors.Errorf("unexpected status %d: %s", resp.StatusCode, body)
|
||||
}
|
||||
|
||||
e.twitchAccessToken = rData.AccessToken
|
||||
|
||||
return rData.AccessToken, errors.Wrap(
|
||||
json.NewDecoder(resp.Body).Decode(&rData),
|
||||
"decoding response",
|
||||
)
|
||||
}
|
||||
|
||||
func (e *EventSubClient) HandleEventsubPush(w http.ResponseWriter, r *http.Request) {
|
||||
var (
|
||||
body = new(bytes.Buffer)
|
||||
keyHandle = mux.Vars(r)["keyhandle"]
|
||||
message eventSubPostMessage
|
||||
signature = r.Header.Get(eventSubHeaderMessageSignature)
|
||||
)
|
||||
|
||||
if keyHandle != e.secretHandle {
|
||||
http.Error(w, "deprecated callback used", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
// Copy body for duplicate processing
|
||||
if _, err := io.Copy(body, r.Body); err != nil {
|
||||
log.WithError(err).Error("Unable to read hook body")
|
||||
return
|
||||
}
|
||||
|
||||
// Verify signature
|
||||
mac := hmac.New(sha256.New, []byte(e.secret))
|
||||
fmt.Fprintf(mac, "%s%s%s", r.Header.Get(eventSubHeaderMessageID), r.Header.Get(eventSubHeaderMessageTimestamp), body.Bytes())
|
||||
if cSig := fmt.Sprintf("sha256=%x", mac.Sum(nil)); cSig != signature {
|
||||
log.Errorf("Got message signature %s, expected %s", signature, cSig)
|
||||
http.Error(w, "Signature verification failed", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
// Read message
|
||||
if err := json.NewDecoder(body).Decode(&message); err != nil {
|
||||
log.WithError(err).Errorf("Unable to decode eventsub message")
|
||||
http.Error(w, errors.Wrap(err, "parsing message").Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
logger := log.WithField("type", message.Subscription.Type)
|
||||
|
||||
// If we got a verification request, respond with the challenge
|
||||
switch r.Header.Get(eventSubHeaderMessageType) {
|
||||
case eventSubMessageTypeRevokation:
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
|
||||
case eventSubMessageTypeVerification:
|
||||
logger.Debug("Confirming eventsub subscription")
|
||||
w.Write([]byte(message.Challenge))
|
||||
return
|
||||
}
|
||||
|
||||
logger.Debug("Received notification")
|
||||
|
||||
condHash, err := message.Subscription.Condition.Hash()
|
||||
if err != nil {
|
||||
logger.WithError(err).Errorf("Unable to hash condition of push")
|
||||
http.Error(w, errors.Wrap(err, "hashing condition").Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
e.subscriptionsLock.RLock()
|
||||
defer e.subscriptionsLock.RUnlock()
|
||||
|
||||
cacheKey := strings.Join([]string{message.Subscription.Type, condHash}, "::")
|
||||
|
||||
reg, ok := e.subscriptions[cacheKey]
|
||||
if !ok {
|
||||
http.Error(w, "no subscription available", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
for _, cb := range reg.Callbacks {
|
||||
if err = cb(message.Event); err != nil {
|
||||
logger.WithError(err).Error("Handler callback caused error")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//nolint:funlen,gocyclo // Not splitting to keep logic unit
|
||||
func (e *EventSubClient) RegisterEventSubHooks(event string, condition EventSubCondition, callback func(json.RawMessage) error) (func(), error) {
|
||||
condHash, err := condition.Hash()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "hashing condition")
|
||||
}
|
||||
|
||||
var (
|
||||
cacheKey = strings.Join([]string{event, condHash}, "::")
|
||||
logger = log.WithField("event", event)
|
||||
)
|
||||
|
||||
e.subscriptionsLock.RLock()
|
||||
_, ok := e.subscriptions[cacheKey]
|
||||
e.subscriptionsLock.RUnlock()
|
||||
|
||||
if ok {
|
||||
// Subscription already exists
|
||||
e.subscriptionsLock.Lock()
|
||||
defer e.subscriptionsLock.Unlock()
|
||||
|
||||
logger.Debug("Adding callback to existing callback")
|
||||
|
||||
cbKey := uuid.Must(uuid.NewV4()).String()
|
||||
|
||||
e.subscriptions[cacheKey].Callbacks[cbKey] = callback
|
||||
return func() { e.unregisterCallback(cacheKey, cbKey) }, nil
|
||||
}
|
||||
|
||||
accessToken, err := e.getTwitchAppAccessToken()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "getting app-access-token")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
// List existing subscriptions
|
||||
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "https://api.twitch.tv/helix/eventsub/subscriptions", nil)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Client-Id", e.twitchClientID)
|
||||
req.Header.Set("Authorization", "Bearer "+accessToken)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "requesting subscribscriptions")
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var subscriptionList struct {
|
||||
Data []eventSubSubscription
|
||||
}
|
||||
|
||||
if err = json.NewDecoder(resp.Body).Decode(&subscriptionList); err != nil {
|
||||
return nil, errors.Wrap(err, "decoding subscription list")
|
||||
}
|
||||
|
||||
// Register subscriptions
|
||||
var (
|
||||
existingSub *eventSubSubscription
|
||||
)
|
||||
for i, sub := range subscriptionList.Data {
|
||||
existingConditionHash, err := sub.Condition.Hash()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "hashing existing condition")
|
||||
}
|
||||
newConditionHash, err := condition.Hash()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "hashing new condition")
|
||||
}
|
||||
|
||||
if str.StringInSlice(sub.Status, []string{eventSubStatusEnabled, eventSubStatusVerificationPending}) &&
|
||||
sub.Transport.Callback == e.apiURL &&
|
||||
existingConditionHash == newConditionHash &&
|
||||
sub.Type == event {
|
||||
logger = logger.WithFields(log.Fields{
|
||||
"id": sub.ID,
|
||||
"status": sub.Status,
|
||||
})
|
||||
existingSub = &subscriptionList.Data[i]
|
||||
}
|
||||
}
|
||||
|
||||
if existingSub != nil {
|
||||
logger.WithField("event", event).Debug("Not registering hook, already active")
|
||||
|
||||
e.subscriptionsLock.Lock()
|
||||
defer e.subscriptionsLock.Unlock()
|
||||
|
||||
logger.Debug("Found existing hook, registering and adding callback")
|
||||
|
||||
cbKey := uuid.Must(uuid.NewV4()).String()
|
||||
e.subscriptions[cacheKey] = ®isteredSubscription{
|
||||
Type: event,
|
||||
Callbacks: map[string]func(json.RawMessage) error{
|
||||
cbKey: callback,
|
||||
},
|
||||
Subscription: *existingSub,
|
||||
}
|
||||
|
||||
return func() { e.unregisterCallback(cacheKey, cbKey) }, nil
|
||||
}
|
||||
|
||||
payload := eventSubSubscription{
|
||||
Type: event,
|
||||
Version: "1",
|
||||
Condition: condition,
|
||||
Transport: eventSubTransport{
|
||||
Method: "webhook",
|
||||
Callback: e.apiURL,
|
||||
Secret: e.secret,
|
||||
},
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
if err := json.NewEncoder(buf).Encode(payload); err != nil {
|
||||
return nil, errors.Wrap(err, "assemble subscribe payload")
|
||||
}
|
||||
|
||||
ctx, cancel = context.WithTimeout(context.Background(), twitchRequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
req, err = http.NewRequestWithContext(ctx, http.MethodPost, "https://api.twitch.tv/helix/eventsub/subscriptions", buf)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "creating subscribe request")
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Client-Id", e.twitchClientID)
|
||||
req.Header.Set("Authorization", "Bearer "+accessToken)
|
||||
|
||||
resp, err = http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "requesting subscribe")
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusAccepted {
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "unexpected status %d, unable to read body", resp.StatusCode)
|
||||
}
|
||||
return nil, errors.Errorf("unexpected status %d: %s", resp.StatusCode, body)
|
||||
}
|
||||
|
||||
var response struct {
|
||||
Data []eventSubSubscription `json:"data"`
|
||||
}
|
||||
if err = json.NewDecoder(resp.Body).Decode(&response); err != nil {
|
||||
return nil, errors.Wrap(err, "reading eventsub sub response")
|
||||
}
|
||||
|
||||
e.subscriptionsLock.Lock()
|
||||
defer e.subscriptionsLock.Unlock()
|
||||
|
||||
logger.Debug("Registered new hook")
|
||||
|
||||
cbKey := uuid.Must(uuid.NewV4()).String()
|
||||
e.subscriptions[cacheKey] = ®isteredSubscription{
|
||||
Type: event,
|
||||
Callbacks: map[string]func(json.RawMessage) error{
|
||||
cbKey: callback,
|
||||
},
|
||||
Subscription: response.Data[0],
|
||||
}
|
||||
|
||||
logger.Debug("Registered eventsub subscription")
|
||||
|
||||
return func() { e.unregisterCallback(cacheKey, cbKey) }, nil
|
||||
}
|
||||
|
||||
func (e *EventSubClient) unregisterCallback(cacheKey, cbKey string) {
|
||||
e.subscriptionsLock.RLock()
|
||||
regSub, ok := e.subscriptions[cacheKey]
|
||||
e.subscriptionsLock.RUnlock()
|
||||
|
||||
if !ok {
|
||||
// That subscription does not exist
|
||||
log.WithField("cache_key", cacheKey).Debug("Subscription does not exist, not unregistering")
|
||||
return
|
||||
}
|
||||
|
||||
if _, ok = regSub.Callbacks[cbKey]; !ok {
|
||||
// That callback does not exist
|
||||
log.WithFields(log.Fields{
|
||||
"cache_key": cacheKey,
|
||||
"callback": cbKey,
|
||||
}).Debug("Callback does not exist, not unregistering")
|
||||
return
|
||||
}
|
||||
|
||||
logger := log.WithField("event", regSub.Type)
|
||||
|
||||
delete(regSub.Callbacks, cbKey)
|
||||
|
||||
if len(regSub.Callbacks) > 0 {
|
||||
// Still callbacks registered, not removing the subscription
|
||||
return
|
||||
}
|
||||
|
||||
accessToken, err := e.getTwitchAppAccessToken()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Unable to get access token")
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, fmt.Sprintf("https://api.twitch.tv/helix/eventsub/subscriptions?id=%s", regSub.Subscription.ID), nil)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Unable to create delete subscription request")
|
||||
return
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Client-Id", e.twitchClientID)
|
||||
req.Header.Set("Authorization", "Bearer "+accessToken)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Unable to execute delete subscription request")
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
e.subscriptionsLock.Lock()
|
||||
defer e.subscriptionsLock.Unlock()
|
||||
|
||||
logger.Debug("Unregistered hook")
|
||||
|
||||
delete(e.subscriptions, cacheKey)
|
||||
}
|
213
twitchWatcher.go
213
twitchWatcher.go
|
@ -1,9 +1,11 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"sync"
|
||||
|
||||
"github.com/Luzifer/twitch-bot/plugins"
|
||||
"github.com/Luzifer/twitch-bot/twitch"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
@ -13,6 +15,8 @@ type (
|
|||
Category string
|
||||
IsLive bool
|
||||
Title string
|
||||
|
||||
unregisterFunc func()
|
||||
}
|
||||
|
||||
twitchWatcher struct {
|
||||
|
@ -34,41 +38,50 @@ func newTwitchWatcher() *twitchWatcher {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *twitchWatcher) AddChannel(channel string) error {
|
||||
r.lock.RLock()
|
||||
if _, ok := r.ChannelStatus[channel]; ok {
|
||||
r.lock.RUnlock()
|
||||
func (t *twitchWatcher) AddChannel(channel string) error {
|
||||
t.lock.RLock()
|
||||
_, ok := t.ChannelStatus[channel]
|
||||
t.lock.RUnlock()
|
||||
|
||||
if ok {
|
||||
return nil
|
||||
}
|
||||
r.lock.RUnlock()
|
||||
|
||||
return r.updateChannelFromAPI(channel, false)
|
||||
return t.updateChannelFromAPI(channel, false)
|
||||
}
|
||||
|
||||
func (r *twitchWatcher) Check() {
|
||||
func (t *twitchWatcher) Check() {
|
||||
var channels []string
|
||||
r.lock.RLock()
|
||||
for c := range r.ChannelStatus {
|
||||
t.lock.RLock()
|
||||
for c := range t.ChannelStatus {
|
||||
if t.ChannelStatus[c].unregisterFunc != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
channels = append(channels, c)
|
||||
}
|
||||
r.lock.RUnlock()
|
||||
t.lock.RUnlock()
|
||||
|
||||
for _, ch := range channels {
|
||||
if err := r.updateChannelFromAPI(ch, true); err != nil {
|
||||
if err := t.updateChannelFromAPI(ch, true); err != nil {
|
||||
log.WithError(err).WithField("channel", ch).Error("Unable to update channel status")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *twitchWatcher) RemoveChannel(channel string) error {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
func (t *twitchWatcher) RemoveChannel(channel string) error {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
delete(r.ChannelStatus, channel)
|
||||
if f := t.ChannelStatus[channel].unregisterFunc; f != nil {
|
||||
f()
|
||||
}
|
||||
|
||||
delete(t.ChannelStatus, channel)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *twitchWatcher) updateChannelFromAPI(channel string, sendUpdate bool) error {
|
||||
func (t *twitchWatcher) updateChannelFromAPI(channel string, sendUpdate bool) error {
|
||||
var (
|
||||
err error
|
||||
status twitchChannelState
|
||||
|
@ -84,53 +97,137 @@ func (r *twitchWatcher) updateChannelFromAPI(channel string, sendUpdate bool) er
|
|||
return errors.Wrap(err, "getting stream info")
|
||||
}
|
||||
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
if r.ChannelStatus[channel] != nil && r.ChannelStatus[channel].Equals(status) {
|
||||
if t.ChannelStatus[channel] != nil && t.ChannelStatus[channel].Equals(status) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if sendUpdate && r.ChannelStatus[channel] != nil {
|
||||
if r.ChannelStatus[channel].Category != status.Category {
|
||||
log.WithFields(log.Fields{
|
||||
"channel": channel,
|
||||
"category": status.Category,
|
||||
}).Debug("Twitch metadata changed")
|
||||
go handleMessage(ircHdl.Client(), nil, eventTypeTwitchCategoryUpdate, plugins.FieldCollection{
|
||||
"channel": channel,
|
||||
"category": status.Category,
|
||||
})
|
||||
}
|
||||
|
||||
if r.ChannelStatus[channel].Title != status.Title {
|
||||
log.WithFields(log.Fields{
|
||||
"channel": channel,
|
||||
"title": status.Title,
|
||||
}).Debug("Twitch metadata changed")
|
||||
go handleMessage(ircHdl.Client(), nil, eventTypeTwitchTitleUpdate, plugins.FieldCollection{
|
||||
"channel": channel,
|
||||
"title": status.Title,
|
||||
})
|
||||
}
|
||||
|
||||
if r.ChannelStatus[channel].IsLive != status.IsLive {
|
||||
log.WithFields(log.Fields{
|
||||
"channel": channel,
|
||||
"isLive": status.IsLive,
|
||||
}).Debug("Twitch metadata changed")
|
||||
|
||||
evt := eventTypeTwitchStreamOnline
|
||||
if !status.IsLive {
|
||||
evt = eventTypeTwitchStreamOffline
|
||||
}
|
||||
|
||||
go handleMessage(ircHdl.Client(), nil, evt, plugins.FieldCollection{
|
||||
"channel": channel,
|
||||
})
|
||||
}
|
||||
if sendUpdate && t.ChannelStatus[channel] != nil {
|
||||
t.triggerUpdate(channel, &status.Title, &status.Category, &status.IsLive)
|
||||
return nil
|
||||
}
|
||||
|
||||
r.ChannelStatus[channel] = &status
|
||||
if status.unregisterFunc, err = t.registerEventSubCallbacks(channel); err != nil {
|
||||
return errors.Wrap(err, "registering eventsub callbacks")
|
||||
}
|
||||
|
||||
t.ChannelStatus[channel] = &status
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *twitchWatcher) registerEventSubCallbacks(channel string) (func(), error) {
|
||||
if twitchEventSubClient == nil {
|
||||
// We don't have eventsub functionality
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
userID, err := twitchClient.GetIDForUsername(channel)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "resolving channel to user-id")
|
||||
}
|
||||
|
||||
unsubCU, err := twitchEventSubClient.RegisterEventSubHooks(
|
||||
twitch.EventSubEventTypeChannelUpdate,
|
||||
twitch.EventSubCondition{BroadcasterUserID: userID},
|
||||
func(m json.RawMessage) error {
|
||||
var payload twitch.EventSubEventChannelUpdate
|
||||
if err := json.Unmarshal(m, &payload); err != nil {
|
||||
return errors.Wrap(err, "unmarshalling event")
|
||||
}
|
||||
|
||||
t.triggerUpdate(channel, &payload.Title, &payload.CategoryName, nil)
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "registering channel-update eventsub")
|
||||
}
|
||||
|
||||
unsubSOff, err := twitchEventSubClient.RegisterEventSubHooks(
|
||||
twitch.EventSubEventTypeStreamOffline,
|
||||
twitch.EventSubCondition{BroadcasterUserID: userID},
|
||||
func(m json.RawMessage) error {
|
||||
var payload twitch.EventSubEventStreamOffline
|
||||
if err := json.Unmarshal(m, &payload); err != nil {
|
||||
return errors.Wrap(err, "unmarshalling event")
|
||||
}
|
||||
|
||||
t.triggerUpdate(channel, nil, nil, func(v bool) *bool { return &v }(false))
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "registering channel-update eventsub")
|
||||
}
|
||||
|
||||
unsubSOn, err := twitchEventSubClient.RegisterEventSubHooks(
|
||||
twitch.EventSubEventTypeStreamOnline,
|
||||
twitch.EventSubCondition{BroadcasterUserID: userID},
|
||||
func(m json.RawMessage) error {
|
||||
var payload twitch.EventSubEventStreamOnline
|
||||
if err := json.Unmarshal(m, &payload); err != nil {
|
||||
return errors.Wrap(err, "unmarshalling event")
|
||||
}
|
||||
|
||||
t.triggerUpdate(channel, nil, nil, func(v bool) *bool { return &v }(true))
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "registering channel-update eventsub")
|
||||
}
|
||||
|
||||
return func() {
|
||||
unsubCU()
|
||||
unsubSOff()
|
||||
unsubSOn()
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (t *twitchWatcher) triggerUpdate(channel string, title, category *string, online *bool) {
|
||||
if category != nil && t.ChannelStatus[channel].Category != *category {
|
||||
t.ChannelStatus[channel].Category = *category
|
||||
log.WithFields(log.Fields{
|
||||
"channel": channel,
|
||||
"category": *category,
|
||||
}).Debug("Twitch metadata changed")
|
||||
go handleMessage(ircHdl.Client(), nil, eventTypeTwitchCategoryUpdate, plugins.FieldCollection{
|
||||
"channel": channel,
|
||||
"category": *category,
|
||||
})
|
||||
}
|
||||
|
||||
if title != nil && t.ChannelStatus[channel].Title != *title {
|
||||
t.ChannelStatus[channel].Title = *title
|
||||
log.WithFields(log.Fields{
|
||||
"channel": channel,
|
||||
"title": *title,
|
||||
}).Debug("Twitch metadata changed")
|
||||
go handleMessage(ircHdl.Client(), nil, eventTypeTwitchTitleUpdate, plugins.FieldCollection{
|
||||
"channel": channel,
|
||||
"title": *title,
|
||||
})
|
||||
}
|
||||
|
||||
if online != nil && t.ChannelStatus[channel].IsLive != *online {
|
||||
t.ChannelStatus[channel].IsLive = *online
|
||||
log.WithFields(log.Fields{
|
||||
"channel": channel,
|
||||
"isLive": *online,
|
||||
}).Debug("Twitch metadata changed")
|
||||
|
||||
evt := eventTypeTwitchStreamOnline
|
||||
if !*online {
|
||||
evt = eventTypeTwitchStreamOffline
|
||||
}
|
||||
|
||||
go handleMessage(ircHdl.Client(), nil, evt, plugins.FieldCollection{
|
||||
"channel": channel,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue