[core] Improve EventSub API request design

Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
Knut Ahlers 2021-12-24 19:59:20 +01:00
parent 9cfcdaaf25
commit 246bb2811d
Signed by: luzifer
GPG key ID: 0066F03ED215AD7D
5 changed files with 282 additions and 221 deletions

View file

@ -15,7 +15,7 @@ func getAuthorizationFromRequest(r *http.Request) (string, *twitch.Client, error
return "", nil, errors.New("no authorization provided")
}
tc := twitch.New(cfg.TwitchClient, token)
tc := twitch.New(cfg.TwitchClient, cfg.TwitchClientSecret, token)
user, err := tc.GetAuthorizedUsername()
return user, tc, errors.Wrap(err, "getting authorized user")

View file

@ -140,7 +140,7 @@ func main() {
var err error
cronService = cron.New()
twitchClient = twitch.New(cfg.TwitchClient, cfg.TwitchToken)
twitchClient = twitch.New(cfg.TwitchClient, cfg.TwitchClientSecret, cfg.TwitchToken)
twitchWatch := newTwitchWatcher()
cronService.AddFunc("@every 10s", twitchWatch.Check) // Query may run that often as the twitchClient has an internal cache
@ -219,16 +219,12 @@ func main() {
log.WithError(err).Fatal("Unable to get or create eventsub secret")
}
twitchEventSubClient = twitch.NewEventSubClient(strings.Join([]string{
twitchEventSubClient = twitch.NewEventSubClient(twitchClient, strings.Join([]string{
strings.TrimRight(cfg.BaseURL, "/"),
"eventsub",
handle,
}, "/"), secret, handle)
if err = twitchEventSubClient.Authorize(cfg.TwitchClient, cfg.TwitchClientSecret); err != nil {
log.WithError(err).Fatal("Unable to authorize Twitch EventSub client")
}
router.HandleFunc("/eventsub/{keyhandle}", twitchEventSubClient.HandleEventsubPush).Methods(http.MethodPost)
}
}

View file

@ -75,7 +75,7 @@ func TestAllowExecuteDisableOnOffline(t *testing.T) {
r := &Rule{DisableOnOffline: testPtrBool(true)}
// Fake cache entries to prevent calling the real Twitch API
r.twitchClient = twitch.New("", "")
r.twitchClient = twitch.New("", "", "")
r.twitchClient.APICache().Set([]string{"hasLiveStream", "channel1"}, time.Minute, true)
r.twitchClient.APICache().Set([]string{"hasLiveStream", "channel2"}, time.Minute, false)

View file

@ -8,9 +8,7 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"sync"
"time"
@ -55,9 +53,7 @@ type (
secret string
secretHandle string
twitchClientID string
twitchClientSecret string
twitchAccessToken string
twitchClient *Client
subscriptions map[string]*registeredSubscription
subscriptionsLock sync.RWMutex
@ -151,71 +147,18 @@ func (e EventSubCondition) Hash() (string, error) {
return fmt.Sprintf("%x", h), nil
}
func NewEventSubClient(apiURL, secret, secretHandle string) *EventSubClient {
func NewEventSubClient(twitchClient *Client, apiURL, secret, secretHandle string) *EventSubClient {
return &EventSubClient{
apiURL: apiURL,
secret: secret,
secretHandle: secretHandle,
twitchClient: twitchClient,
subscriptions: map[string]*registeredSubscription{},
}
}
func (e *EventSubClient) Authorize(clientID, clientSecret string) error {
e.twitchClientID = clientID
e.twitchClientSecret = clientSecret
_, err := e.getTwitchAppAccessToken()
return errors.Wrap(err, "fetching app access token")
}
func (e *EventSubClient) getTwitchAppAccessToken() (string, error) {
if e.twitchAccessToken != "" {
return e.twitchAccessToken, nil
}
var rData struct {
AccessToken string `json:"access_token"`
RefreshToken string `json:"refresh_token"`
ExpiresIn int `json:"expires_in"`
Scope []interface{} `json:"scope"`
TokenType string `json:"token_type"`
}
params := make(url.Values)
params.Set("client_id", e.twitchClientID)
params.Set("client_secret", e.twitchClientSecret)
params.Set("grant_type", "client_credentials")
u, _ := url.Parse("https://id.twitch.tv/oauth2/token")
u.RawQuery = params.Encode()
ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout)
defer cancel()
req, _ := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", errors.Wrap(err, "fetching response")
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", errors.Wrapf(err, "unexpected status %d and cannot read body", resp.StatusCode)
}
return "", errors.Errorf("unexpected status %d: %s", resp.StatusCode, body)
}
e.twitchAccessToken = rData.AccessToken
return rData.AccessToken, errors.Wrap(
json.NewDecoder(resp.Body).Decode(&rData),
"decoding response",
)
}
func (e *EventSubClient) HandleEventsubPush(w http.ResponseWriter, r *http.Request) {
var (
body = new(bytes.Buffer)
@ -292,7 +235,7 @@ func (e *EventSubClient) HandleEventsubPush(w http.ResponseWriter, r *http.Reque
}
}
//nolint:funlen,gocyclo // Not splitting to keep logic unit
//nolint:funlen // Not splitting to keep logic unit
func (e *EventSubClient) RegisterEventSubHooks(event string, condition EventSubCondition, callback func(json.RawMessage) error) (func(), error) {
condHash, err := condition.Hash()
if err != nil {
@ -321,39 +264,20 @@ func (e *EventSubClient) RegisterEventSubHooks(event string, condition EventSubC
return func() { e.unregisterCallback(cacheKey, cbKey) }, nil
}
accessToken, err := e.getTwitchAppAccessToken()
if err != nil {
return nil, errors.Wrap(err, "getting app-access-token")
}
ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout)
defer cancel()
// List existing subscriptions
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "https://api.twitch.tv/helix/eventsub/subscriptions", nil)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Client-Id", e.twitchClientID)
req.Header.Set("Authorization", "Bearer "+accessToken)
resp, err := http.DefaultClient.Do(req)
subList, err := e.twitchClient.getEventSubSubscriptions(ctx)
if err != nil {
return nil, errors.Wrap(err, "requesting subscribscriptions")
}
defer resp.Body.Close()
var subscriptionList struct {
Data []eventSubSubscription
}
if err = json.NewDecoder(resp.Body).Decode(&subscriptionList); err != nil {
return nil, errors.Wrap(err, "decoding subscription list")
return nil, errors.Wrap(err, "listing existing subscriptions")
}
// Register subscriptions
var (
existingSub *eventSubSubscription
)
for i, sub := range subscriptionList.Data {
for i, sub := range subList {
existingConditionHash, err := sub.Condition.Hash()
if err != nil {
return nil, errors.Wrap(err, "hashing existing condition")
@ -371,7 +295,7 @@ func (e *EventSubClient) RegisterEventSubHooks(event string, condition EventSubC
"id": sub.ID,
"status": sub.Status,
})
existingSub = &subscriptionList.Data[i]
existingSub = &subList[i]
}
}
@ -395,7 +319,10 @@ func (e *EventSubClient) RegisterEventSubHooks(event string, condition EventSubC
return func() { e.unregisterCallback(cacheKey, cbKey) }, nil
}
payload := eventSubSubscription{
ctx, cancel = context.WithTimeout(context.Background(), twitchRequestTimeout)
defer cancel()
newSub, err := e.twitchClient.createEventSubSubscription(ctx, eventSubSubscription{
Type: event,
Version: "1",
Condition: condition,
@ -404,43 +331,9 @@ func (e *EventSubClient) RegisterEventSubHooks(event string, condition EventSubC
Callback: e.apiURL,
Secret: e.secret,
},
}
buf := new(bytes.Buffer)
if err := json.NewEncoder(buf).Encode(payload); err != nil {
return nil, errors.Wrap(err, "assemble subscribe payload")
}
ctx, cancel = context.WithTimeout(context.Background(), twitchRequestTimeout)
defer cancel()
req, err = http.NewRequestWithContext(ctx, http.MethodPost, "https://api.twitch.tv/helix/eventsub/subscriptions", buf)
})
if err != nil {
return nil, errors.Wrap(err, "creating subscribe request")
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Client-Id", e.twitchClientID)
req.Header.Set("Authorization", "Bearer "+accessToken)
resp, err = http.DefaultClient.Do(req)
if err != nil {
return nil, errors.Wrap(err, "requesting subscribe")
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Wrapf(err, "unexpected status %d, unable to read body", resp.StatusCode)
}
return nil, errors.Errorf("unexpected status %d: %s", resp.StatusCode, body)
}
var response struct {
Data []eventSubSubscription `json:"data"`
}
if err = json.NewDecoder(resp.Body).Decode(&response); err != nil {
return nil, errors.Wrap(err, "reading eventsub sub response")
return nil, errors.Wrap(err, "creating subscription")
}
e.subscriptionsLock.Lock()
@ -454,7 +347,7 @@ func (e *EventSubClient) RegisterEventSubHooks(event string, condition EventSubC
Callbacks: map[string]func(json.RawMessage) error{
cbKey: callback,
},
Subscription: response.Data[0],
Subscription: *newSub,
}
logger.Debug("Registered eventsub subscription")
@ -491,30 +384,13 @@ func (e *EventSubClient) unregisterCallback(cacheKey, cbKey string) {
return
}
accessToken, err := e.getTwitchAppAccessToken()
if err != nil {
log.WithError(err).Error("Unable to get access token")
return
}
ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, fmt.Sprintf("https://api.twitch.tv/helix/eventsub/subscriptions?id=%s", regSub.Subscription.ID), nil)
if err != nil {
log.WithError(err).Error("Unable to create delete subscription request")
return
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Client-Id", e.twitchClientID)
req.Header.Set("Authorization", "Bearer "+accessToken)
resp, err := http.DefaultClient.Do(req)
if err != nil {
if err := e.twitchClient.deleteEventSubSubscription(ctx, regSub.Subscription.ID); err != nil {
log.WithError(err).Error("Unable to execute delete subscription request")
return
}
defer resp.Body.Close()
e.subscriptionsLock.Lock()
defer e.subscriptionsLock.Unlock()

View file

@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
@ -26,6 +27,12 @@ const (
twitchRequestTimeout = 2 * time.Second
)
const (
authTypeUnauthorized authType = iota
authTypeAppAccessToken
authTypeBearerToken
)
type (
Category struct {
BoxArtURL string `json:"box_art_url"`
@ -35,8 +42,11 @@ type (
Client struct {
clientID string
clientSecret string
token string
appAccessToken string
apiCache *APICache
}
@ -63,11 +73,24 @@ type (
Login string `json:"login"`
ProfileImageURL string `json:"profile_image_url"`
}
authType uint8
clientRequestOpts struct {
AuthType authType
Body io.Reader
Context context.Context
Method string
OKStatus int
Out interface{}
URL string
}
)
func New(clientID, token string) *Client {
func New(clientID, clientSecret, token string) *Client {
return &Client{
clientID: clientID,
clientSecret: clientSecret,
token: token,
apiCache: newTwitchAPICache(),
@ -81,13 +104,14 @@ func (c Client) GetAuthorizedUsername() (string, error) {
Data []User `json:"data"`
}
if err := c.request(
context.Background(),
http.MethodGet,
"https://api.twitch.tv/helix/users",
nil,
&payload,
); err != nil {
if err := c.request(clientRequestOpts{
AuthType: authTypeBearerToken,
Context: context.Background(),
Method: http.MethodGet,
OKStatus: http.StatusOK,
Out: &payload,
URL: "https://api.twitch.tv/helix/users",
}); err != nil {
return "", errors.Wrap(err, "request channel info")
}
@ -108,13 +132,13 @@ func (c Client) GetDisplayNameForUser(username string) (string, error) {
Data []User `json:"data"`
}
if err := c.request(
context.Background(),
http.MethodGet,
fmt.Sprintf("https://api.twitch.tv/helix/users?login=%s", username),
nil,
&payload,
); err != nil {
if err := c.request(clientRequestOpts{
AuthType: authTypeBearerToken,
Context: context.Background(),
Method: http.MethodGet,
Out: &payload,
URL: fmt.Sprintf("https://api.twitch.tv/helix/users?login=%s", username),
}); err != nil {
return "", errors.Wrap(err, "request channel info")
}
@ -149,13 +173,14 @@ func (c Client) GetFollowDate(from, to string) (time.Time, error) {
} `json:"data"`
}
if err := c.request(
context.Background(),
http.MethodGet,
fmt.Sprintf("https://api.twitch.tv/helix/users/follows?to_id=%s&from_id=%s", toID, fromID),
nil,
&payload,
); err != nil {
if err := c.request(clientRequestOpts{
AuthType: authTypeBearerToken,
Context: context.Background(),
Method: http.MethodGet,
OKStatus: http.StatusOK,
Out: &payload,
URL: fmt.Sprintf("https://api.twitch.tv/helix/users/follows?to_id=%s&from_id=%s", toID, fromID),
}); err != nil {
return time.Time{}, errors.Wrap(err, "request follow info")
}
@ -188,13 +213,14 @@ func (c Client) GetUserInformation(user string) (*User, error) {
param = "id"
}
if err := c.request(
context.Background(),
http.MethodGet,
fmt.Sprintf("https://api.twitch.tv/helix/users?%s=%s", param, user),
nil,
&payload,
); err != nil {
if err := c.request(clientRequestOpts{
AuthType: authTypeBearerToken,
Context: context.Background(),
Method: http.MethodGet,
OKStatus: http.StatusOK,
Out: &payload,
URL: fmt.Sprintf("https://api.twitch.tv/helix/users?%s=%s", param, user),
}); err != nil {
return nil, errors.Wrap(err, "request user info")
}
@ -224,7 +250,14 @@ func (c Client) SearchCategories(ctx context.Context, name string) ([]Category,
}
for {
if err := c.request(ctx, http.MethodGet, fmt.Sprintf("https://api.twitch.tv/helix/search/categories?%s", params.Encode()), nil, &resp); err != nil {
if err := c.request(clientRequestOpts{
AuthType: authTypeBearerToken,
Context: ctx,
Method: http.MethodGet,
OKStatus: http.StatusOK,
Out: &resp,
URL: fmt.Sprintf("https://api.twitch.tv/helix/search/categories?%s", params.Encode()),
}); err != nil {
return nil, errors.Wrap(err, "executing request")
}
@ -255,13 +288,14 @@ func (c Client) HasLiveStream(username string) (bool, error) {
} `json:"data"`
}
if err := c.request(
context.Background(),
http.MethodGet,
fmt.Sprintf("https://api.twitch.tv/helix/streams?user_login=%s", username),
nil,
&payload,
); err != nil {
if err := c.request(clientRequestOpts{
AuthType: authTypeBearerToken,
Context: context.Background(),
Method: http.MethodGet,
OKStatus: http.StatusOK,
Out: &payload,
URL: fmt.Sprintf("https://api.twitch.tv/helix/streams?user_login=%s", username),
}); err != nil {
return false, errors.Wrap(err, "request stream info")
}
@ -286,13 +320,14 @@ func (c Client) GetCurrentStreamInfo(username string) (*StreamInfo, error) {
Data []*StreamInfo `json:"data"`
}
if err := c.request(
context.Background(),
http.MethodGet,
fmt.Sprintf("https://api.twitch.tv/helix/streams?user_id=%s", id),
nil,
&payload,
); err != nil {
if err := c.request(clientRequestOpts{
AuthType: authTypeBearerToken,
Context: context.Background(),
Method: http.MethodGet,
OKStatus: http.StatusOK,
Out: &payload,
URL: fmt.Sprintf("https://api.twitch.tv/helix/streams?user_id=%s", id),
}); err != nil {
return nil, errors.Wrap(err, "request channel info")
}
@ -316,13 +351,14 @@ func (c Client) GetIDForUsername(username string) (string, error) {
Data []User `json:"data"`
}
if err := c.request(
context.Background(),
http.MethodGet,
fmt.Sprintf("https://api.twitch.tv/helix/users?login=%s", username),
nil,
&payload,
); err != nil {
if err := c.request(clientRequestOpts{
AuthType: authTypeBearerToken,
Context: context.Background(),
Method: http.MethodGet,
OKStatus: http.StatusOK,
Out: &payload,
URL: fmt.Sprintf("https://api.twitch.tv/helix/users?login=%s", username),
}); err != nil {
return "", errors.Wrap(err, "request channel info")
}
@ -356,13 +392,14 @@ func (c Client) GetRecentStreamInfo(username string) (string, string, error) {
} `json:"data"`
}
if err := c.request(
context.Background(),
http.MethodGet,
fmt.Sprintf("https://api.twitch.tv/helix/channels?broadcaster_id=%s", id),
nil,
&payload,
); err != nil {
if err := c.request(clientRequestOpts{
AuthType: authTypeBearerToken,
Context: context.Background(),
Method: http.MethodGet,
OKStatus: http.StatusOK,
Out: &payload,
URL: fmt.Sprintf("https://api.twitch.tv/helix/channels?broadcaster_id=%s", id),
}); err != nil {
return "", "", errors.Wrap(err, "request channel info")
}
@ -429,28 +466,172 @@ func (c Client) ModifyChannelInformation(ctx context.Context, broadcasterName st
}
return errors.Wrap(
c.request(ctx, http.MethodPatch, fmt.Sprintf("https://api.twitch.tv/helix/channels?broadcaster_id=%s", broadcaster), body, nil),
c.request(clientRequestOpts{
AuthType: authTypeBearerToken,
Body: body,
Context: ctx,
Method: http.MethodPatch,
OKStatus: http.StatusOK,
URL: fmt.Sprintf("https://api.twitch.tv/helix/channels?broadcaster_id=%s", broadcaster),
}),
"executing request",
)
}
func (c Client) request(ctx context.Context, method, url string, body io.Reader, out interface{}) error {
func (c *Client) createEventSubSubscription(ctx context.Context, sub eventSubSubscription) (*eventSubSubscription, error) {
var (
buf = new(bytes.Buffer)
resp struct {
Total int64 `json:"total"`
Data []eventSubSubscription `json:"data"`
Pagination struct {
Cursor string `json:"cursor"`
} `json:"pagination"`
}
)
if err := json.NewEncoder(buf).Encode(sub); err != nil {
return nil, errors.Wrap(err, "assemble subscribe payload")
}
if err := c.request(clientRequestOpts{
AuthType: authTypeAppAccessToken,
Body: buf,
Context: ctx,
Method: http.MethodPost,
OKStatus: http.StatusAccepted,
Out: &resp,
URL: "https://api.twitch.tv/helix/eventsub/subscriptions",
}); err != nil {
return nil, errors.Wrap(err, "executing request")
}
return &resp.Data[0], nil
}
func (c *Client) deleteEventSubSubscription(ctx context.Context, id string) error {
return errors.Wrap(c.request(clientRequestOpts{
AuthType: authTypeAppAccessToken,
Context: ctx,
Method: http.MethodDelete,
OKStatus: http.StatusNoContent,
URL: fmt.Sprintf("https://api.twitch.tv/helix/eventsub/subscriptions?id=%s", id),
}), "executing request")
}
func (c *Client) getEventSubSubscriptions(ctx context.Context) ([]eventSubSubscription, error) {
var (
out []eventSubSubscription
params = make(url.Values)
resp struct {
Total int64 `json:"total"`
Data []eventSubSubscription `json:"data"`
Pagination struct {
Cursor string `json:"cursor"`
} `json:"pagination"`
}
)
for {
if err := c.request(clientRequestOpts{
AuthType: authTypeAppAccessToken,
Context: ctx,
Method: http.MethodGet,
OKStatus: http.StatusOK,
Out: &resp,
URL: fmt.Sprintf("https://api.twitch.tv/helix/eventsub/subscriptions?%s", params.Encode()),
}); err != nil {
return nil, errors.Wrap(err, "executing request")
}
out = append(out, resp.Data...)
if resp.Pagination.Cursor == "" {
break
}
params.Set("after", resp.Pagination.Cursor)
resp.Pagination.Cursor = "" // Clear from struct as struct is reused
}
return out, nil
}
func (c *Client) getTwitchAppAccessToken() (string, error) {
if c.appAccessToken != "" {
return c.appAccessToken, nil
}
var rData struct {
AccessToken string `json:"access_token"`
RefreshToken string `json:"refresh_token"`
ExpiresIn int `json:"expires_in"`
Scope []interface{} `json:"scope"`
TokenType string `json:"token_type"`
}
params := make(url.Values)
params.Set("client_id", c.clientID)
params.Set("client_secret", c.clientSecret)
params.Set("grant_type", "client_credentials")
u, _ := url.Parse("https://id.twitch.tv/oauth2/token")
u.RawQuery = params.Encode()
ctx, cancel := context.WithTimeout(context.Background(), twitchRequestTimeout)
defer cancel()
if err := c.request(clientRequestOpts{
AuthType: authTypeUnauthorized,
Context: ctx,
Method: http.MethodPost,
OKStatus: http.StatusOK,
Out: &rData,
URL: u.String(),
}); err != nil {
return "", errors.Wrap(err, "fetching token response")
}
c.appAccessToken = rData.AccessToken
return rData.AccessToken, nil
}
func (c *Client) request(opts clientRequestOpts) error {
log.WithFields(log.Fields{
"method": method,
"url": url,
"method": opts.Method,
"url": opts.URL,
}).Trace("Execute Twitch API request")
return backoff.NewBackoff().WithMaxIterations(twitchRequestRetries).Retry(func() error {
reqCtx, cancel := context.WithTimeout(ctx, twitchRequestTimeout)
reqCtx, cancel := context.WithTimeout(opts.Context, twitchRequestTimeout)
defer cancel()
req, err := http.NewRequestWithContext(reqCtx, method, url, body)
req, err := http.NewRequestWithContext(reqCtx, opts.Method, opts.URL, opts.Body)
if err != nil {
return errors.Wrap(err, "assemble request")
}
req.Header.Set("Content-Type", "application/json")
switch opts.AuthType {
case authTypeUnauthorized:
// Nothing to do
case authTypeAppAccessToken:
accessToken, err := c.getTwitchAppAccessToken()
if err != nil {
return errors.Wrap(err, "getting app-access-token")
}
req.Header.Set("Authorization", "Bearer "+accessToken)
req.Header.Set("Client-Id", c.clientID)
case authTypeBearerToken:
req.Header.Set("Authorization", "Bearer "+c.token)
req.Header.Set("Client-Id", c.clientID)
default:
return errors.New("invalid auth type specified")
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
@ -458,12 +639,20 @@ func (c Client) request(ctx context.Context, method, url string, body io.Reader,
}
defer resp.Body.Close()
if out == nil {
if opts.OKStatus != 0 && resp.StatusCode != opts.OKStatus {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return errors.Wrapf(err, "unexpected status %d and cannot read body", resp.StatusCode)
}
return errors.Errorf("unexpected status %d: %s", resp.StatusCode, body)
}
if opts.Out == nil {
return nil
}
return errors.Wrap(
json.NewDecoder(resp.Body).Decode(out),
json.NewDecoder(resp.Body).Decode(opts.Out),
"parse user info",
)
})