Compare commits

..

7 commits

Author SHA1 Message Date
01390583b2
prepare release v3.22.0 2023-12-14 19:25:57 +01:00
3ac5284583
[core] Fix: Properly handle channels without credentials
Signed-off-by: Knut Ahlers <knut@ahlers.me>
2023-12-14 16:49:33 +01:00
eec4966b82
[eventsub] Fix: Clean IPs from eventsub-socket read errors
Signed-off-by: Knut Ahlers <knut@ahlers.me>
2023-12-14 00:19:29 +01:00
5d0a5322a5
[editor] Display clear warning when ext perms are missing
Signed-off-by: Knut Ahlers <knut@ahlers.me>
2023-12-14 00:02:14 +01:00
1515536746
[twitch] Fix: Log correct error when wiping token fails
Signed-off-by: Knut Ahlers <knut@ahlers.me>
2023-12-13 23:31:06 +01:00
e1f11a6c98
[eventsub] Make topic subscriptions more dynamic
Signed-off-by: Knut Ahlers <knut@ahlers.me>
2023-12-13 23:14:50 +01:00
091dac235b
[eventsub] Update field naming for ad-break, use V1 event
Signed-off-by: Knut Ahlers <knut@ahlers.me>
2023-12-12 16:00:08 +01:00
11 changed files with 241 additions and 86 deletions

View file

@ -1,3 +1,15 @@
# 3.22.0 / 2023-12-14
* Improvements
* [editor] Display clear warning when ext perms are missing
* [eventsub] Make topic subscriptions more dynamic
* Bugfixes
* [core] Fix: Properly handle channels without credentials
* [eventsub] Fix: Clean IPs from eventsub-socket read errors
* [eventsub] Update field naming for ad-break, use V1 event
* [twitch] Fix: Log correct error when wiping token fails
# 3.21.0 / 2023-12-09 # 3.21.0 / 2023-12-09
* Improvements * Improvements

View file

@ -17,10 +17,11 @@ import (
type ( type (
configEditorGeneralConfig struct { configEditorGeneralConfig struct {
BotEditors []string `json:"bot_editors"` BotEditors []string `json:"bot_editors"`
BotName *string `json:"bot_name,omitempty"` BotName *string `json:"bot_name,omitempty"`
Channels []string `json:"channels"` Channels []string `json:"channels"`
ChannelScopes map[string][]string `json:"channel_scopes"` ChannelScopes map[string][]string `json:"channel_scopes"`
ChannelHasToken map[string]bool `json:"channel_has_token"`
} }
) )
@ -186,7 +187,12 @@ func configEditorHandleGeneralDeleteAuthToken(w http.ResponseWriter, r *http.Req
} }
func configEditorHandleGeneralGet(w http.ResponseWriter, _ *http.Request) { func configEditorHandleGeneralGet(w http.ResponseWriter, _ *http.Request) {
channelScopes := make(map[string][]string) resp := configEditorGeneralConfig{
BotEditors: config.BotEditors,
Channels: config.Channels,
ChannelHasToken: make(map[string]bool),
ChannelScopes: make(map[string][]string),
}
channels, err := accessService.ListPermittedChannels() channels, err := accessService.ListPermittedChannels()
if err != nil { if err != nil {
@ -195,7 +201,12 @@ func configEditorHandleGeneralGet(w http.ResponseWriter, _ *http.Request) {
} }
for _, ch := range channels { for _, ch := range channels {
if channelScopes[ch], err = accessService.GetChannelPermissions(ch); err != nil { if resp.ChannelScopes[ch], err = accessService.GetChannelPermissions(ch); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if resp.ChannelHasToken[ch], err = accessService.HasTokensForChannel(ch); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
@ -206,13 +217,9 @@ func configEditorHandleGeneralGet(w http.ResponseWriter, _ *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
resp.BotName = &uName
if err = json.NewEncoder(w).Encode(configEditorGeneralConfig{ if err = json.NewEncoder(w).Encode(resp); err != nil {
BotEditors: config.BotEditors,
BotName: &uName,
Channels: config.Channels,
ChannelScopes: channelScopes,
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
} }
} }

View file

@ -0,0 +1,33 @@
package helpers
import (
"errors"
"net"
"regexp"
)
var networkArrowErrorPart = regexp.MustCompile(` (?:(?:[0-9]+\.){3}[0-9]+:[0-9]+(?:->)?)+`)
// CleanNetworkAddressFromError checks whether an IP:Port->IP:port
// information is contained in the error. This is checked by explicitly
// sanitizing *net.OpError instances or by returning a sanitized error
// string without the stack previously present.
//
// As of the loss of information this is only intended to clean up
// logging and not be used in error returns.
func CleanNetworkAddressFromError(err error) error {
if opE, ok := err.(*net.OpError); ok {
// Error in the outmost position is an OpError, lets just patch it
opE.Source = nil
opE.Addr = nil
return opE
}
if networkArrowErrorPart.FindStringIndex(err.Error()) == nil {
// There is no network address somewhere inside, keep the error as is
return err
}
// Patch out IP information and create an new error with its message
return errors.New(networkArrowErrorPart.ReplaceAllString(err.Error(), ""))
}

View file

@ -1,35 +0,0 @@
package helpers
import (
"errors"
"net"
"regexp"
)
// CleanOpError checks whether a *net.OpError is included in the error
// and if so removes the included address information. This can happen
// in two ways: If the passed error is indeed an OpError the address
// info is just patched out. If the OpError is buried deeper inside
// the wrapped error stack, a new error with patched message is created
// sacrificing the wrapping and possible included stacktrace.
//
// As of the loss of information this is only intended to clean up
// logging and not be used in error returns.
func CleanOpError(err error) error {
if opE, ok := err.(*net.OpError); ok {
// Error in the outmost position is an OpError, lets just patch it
opE.Source = nil
opE.Addr = nil
return opE
}
var opE *net.OpError
if !errors.As(err, &opE) {
// There is no OpError somewhere inside, keep the error as is
return err
}
// Patch out IP information and create an new error with its message
return errors.New(regexp.MustCompile(` (?:(?:[0-9]+\.){3}[0-9]+:[0-9]+(?:->)?)+`).
ReplaceAllString(err.Error(), ""))
}

View file

@ -172,6 +172,13 @@ func (s Service) GetTwitchClientForChannel(channel string, cfg ClientConfig) (*t
return nil, errors.Wrap(err, "decrypting refresh token") return nil, errors.Wrap(err, "decrypting refresh token")
} }
if perm.AccessToken == "" && perm.RefreshToken == "" {
// We have no tokens but an entry in the permission table: Means
// we still can't do stuff on behalf of that channel so we treat
// that as an unauthorized channel
return nil, ErrChannelNotAuthorized
}
scopes := strings.Split(perm.Scopes, " ") scopes := strings.Split(perm.Scopes, " ")
tc := twitch.New(cfg.TwitchClient, cfg.TwitchClientSecret, perm.AccessToken, perm.RefreshToken) tc := twitch.New(cfg.TwitchClient, cfg.TwitchClientSecret, perm.AccessToken, perm.RefreshToken)
@ -212,6 +219,40 @@ func (s Service) HasPermissionsForChannel(channel string, scopes ...string) (boo
return true, nil return true, nil
} }
// HasTokensForChannel retrieves and decrypts stored access- and
// refresh-tokens to evaluate whether tokens are available. Those
// tokens are NOT validated in this request, it's just checked whether
// they are present
func (s Service) HasTokensForChannel(channel string) (bool, error) {
var (
err error
perm extendedPermission
)
if err = helpers.Retry(func() error {
err = s.db.DB().First(&perm, "channel = ?", strings.TrimLeft(channel, "#")).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return backoff.NewErrCannotRetry(ErrChannelNotAuthorized)
}
return errors.Wrap(err, "getting twitch credential from database")
}); err != nil {
if errors.Is(err, ErrChannelNotAuthorized) {
return false, nil
}
return false, err
}
if perm.AccessToken, err = s.db.DecryptField(perm.AccessToken); err != nil {
return false, errors.Wrap(err, "decrypting access token")
}
if perm.RefreshToken, err = s.db.DecryptField(perm.RefreshToken); err != nil {
return false, errors.Wrap(err, "decrypting refresh token")
}
return perm.AccessToken != "" && perm.RefreshToken != "", nil
}
func (s Service) ListPermittedChannels() (out []string, err error) { func (s Service) ListPermittedChannels() (out []string, err error) {
var perms []extendedPermission var perms []extendedPermission
if err = helpers.Retry(func() error { if err = helpers.Retry(func() error {

View file

@ -307,7 +307,7 @@ func main() {
go func() { go func() {
log.Info("(re-)connecting IRC client") log.Info("(re-)connecting IRC client")
if err := ircHdl.Run(); err != nil { if err := ircHdl.Run(); err != nil {
log.WithError(helpers.CleanOpError(err)).Error("IRC run exited unexpectedly") log.WithError(helpers.CleanNetworkAddressFromError(err)).Error("IRC run exited unexpectedly")
} }
time.Sleep(ircReconnectDelay) time.Sleep(ircReconnectDelay)
ircDisconnected <- struct{}{} ircDisconnected <- struct{}{}

View file

@ -51,13 +51,15 @@ type (
} }
EventSubEventAdBreakBegin struct { EventSubEventAdBreakBegin struct {
Duration int64 `json:"duration"` Duration int64 `json:"duration_seconds"`
Timestamp time.Time `json:"timestamp"` Timestamp time.Time `json:"timestamp"`
IsAutomatic bool `json:"is_automatic"` IsAutomatic bool `json:"is_automatic"`
BroadcasterUserID string `json:"broadcaster_user_id"` BroadcasterUserID string `json:"broadcaster_user_id"`
BroadcasterUserLogin string `json:"broadcaster_user_login"` BroadcasterUserLogin string `json:"broadcaster_user_login"`
BroadcasterUserName string `json:"broadcaster_user_name"` BroadcasterUserName string `json:"broadcaster_user_name"`
RequesterUserID string `json:"requester_user_id"` RequesterUserID string `json:"requester_user_id"`
RequesterUserLogin string `json:"requester_user_login"`
RequesterUserName string `json:"requester_user_name"`
} }
EventSubEventChannelPointCustomRewardRedemptionAdd struct { EventSubEventChannelPointCustomRewardRedemptionAdd struct {

View file

@ -13,6 +13,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/Luzifer/go_helpers/v2/backoff"
"github.com/Luzifer/twitch-bot/v3/internal/helpers" "github.com/Luzifer/twitch-bot/v3/internal/helpers"
) )
@ -21,6 +22,10 @@ const (
socketConnectTimeout = 15 * time.Second socketConnectTimeout = 15 * time.Second
socketInitialTimeout = 30 * time.Second socketInitialTimeout = 30 * time.Second
socketTimeoutGraceMultiplier = 1.5 socketTimeoutGraceMultiplier = 1.5
retrySubscribeMaxTotal = 30 * time.Minute
retrySubscribeMaxWait = 5 * time.Minute
retrySubscribeMinWait = 30 * time.Second
) )
const ( const (
@ -70,9 +75,10 @@ type (
} }
eventSubSocketSubscriptionType struct { eventSubSocketSubscriptionType struct {
Event, Version string Event, Version string
Condition EventSubCondition Condition EventSubCondition
Callback func(json.RawMessage) error Callback func(json.RawMessage) error
BackgroundRetry bool
} }
eventSubSocketPayloadNotification struct { eventSubSocketPayloadNotification struct {
@ -136,11 +142,7 @@ func WithLogger(logger *logrus.Entry) EventSubSocketClientOpt {
return func(e *EventSubSocketClient) { e.logger = logger } return func(e *EventSubSocketClient) { e.logger = logger }
} }
func WithSocketURL(url string) EventSubSocketClientOpt { func WithMustSubscribe(event, version string, condition EventSubCondition, callback func(json.RawMessage) error) EventSubSocketClientOpt {
return func(e *EventSubSocketClient) { e.socketDest = url }
}
func WithSubscription(event, version string, condition EventSubCondition, callback func(json.RawMessage) error) EventSubSocketClientOpt {
if version == "" { if version == "" {
version = EventSubTopicVersion1 version = EventSubTopicVersion1
} }
@ -155,6 +157,26 @@ func WithSubscription(event, version string, condition EventSubCondition, callba
} }
} }
func WithRetryBackgroundSubscribe(event, version string, condition EventSubCondition, callback func(json.RawMessage) error) EventSubSocketClientOpt {
if version == "" {
version = EventSubTopicVersion1
}
return func(e *EventSubSocketClient) {
e.subscriptionTypes = append(e.subscriptionTypes, eventSubSocketSubscriptionType{
Event: event,
Version: version,
Condition: condition,
Callback: callback,
BackgroundRetry: true,
})
}
}
func WithSocketURL(url string) EventSubSocketClientOpt {
return func(e *EventSubSocketClient) { e.socketDest = url }
}
func WithTwitchClient(c *Client) EventSubSocketClientOpt { func WithTwitchClient(c *Client) EventSubSocketClientOpt {
return func(e *EventSubSocketClient) { e.twitch = c } return func(e *EventSubSocketClient) { e.twitch = c }
} }
@ -177,7 +199,7 @@ func (e *EventSubSocketClient) Run() error {
defer func() { defer func() {
if err := e.conn.Close(); err != nil { if err := e.conn.Close(); err != nil {
e.logger.WithError(helpers.CleanOpError(err)).Error("finally closing socket") e.logger.WithError(helpers.CleanNetworkAddressFromError(err)).Error("finally closing socket")
} }
}() }()
@ -382,7 +404,7 @@ func (e *EventSubSocketClient) handleWelcomeMessage(msg eventSubSocketMessage) (
// connection after something broke) // connection after something broke)
if e.socketID != payload.Session.ID { if e.socketID != payload.Session.ID {
e.socketID = payload.Session.ID e.socketID = payload.Session.ID
if err := e.subscribe(); err != nil { if err := e.subscribeAll(); err != nil {
return socketInitialTimeout, errors.Wrap(err, "subscribing to topics") return socketInitialTimeout, errors.Wrap(err, "subscribing to topics")
} }
} }
@ -393,21 +415,56 @@ func (e *EventSubSocketClient) handleWelcomeMessage(msg eventSubSocketMessage) (
return time.Duration(float64(payload.Session.KeepaliveTimeoutSeconds)*socketTimeoutGraceMultiplier) * time.Second, nil return time.Duration(float64(payload.Session.KeepaliveTimeoutSeconds)*socketTimeoutGraceMultiplier) * time.Second, nil
} }
func (e *EventSubSocketClient) subscribe() error { func (e *EventSubSocketClient) retryBackgroundSubscribe(st eventSubSocketSubscriptionType) {
for _, st := range e.subscriptionTypes { err := backoff.NewBackoff().
if _, err := e.twitch.createEventSubSubscriptionWebsocket(context.Background(), eventSubSubscription{ WithMaxIterationTime(retrySubscribeMaxWait).
Type: st.Event, WithMaxTotalTime(retrySubscribeMaxTotal).
Version: st.Version, WithMinIterationTime(retrySubscribeMinWait).
Condition: st.Condition, Retry(func() error {
Transport: eventSubTransport{ return e.subscribe(st)
Method: "websocket", })
SessionID: e.socketID, if err != nil {
}, e.logger.
}); err != nil { WithField("topic", strings.Join([]string{st.Event, st.Version}, "/")).
return errors.Wrapf(err, "subscribing to %s/%s", st.Event, st.Version) Error("gave up retrying to subscribe")
}
}
func (e *EventSubSocketClient) subscribe(st eventSubSocketSubscriptionType) error {
logger := e.logger.
WithField("topic", strings.Join([]string{st.Event, st.Version}, "/"))
if _, err := e.twitch.createEventSubSubscriptionWebsocket(context.Background(), eventSubSubscription{
Type: st.Event,
Version: st.Version,
Condition: st.Condition,
Transport: eventSubTransport{
Method: "websocket",
SessionID: e.socketID,
},
}); err != nil {
logger.WithError(err).Debug("subscribing to topic")
return errors.Wrapf(err, "subscribing to %s/%s", st.Event, st.Version)
}
logger.
WithField("topic", strings.Join([]string{st.Event, st.Version}, "/")).
Debug("subscribed to topic")
return nil
}
func (e *EventSubSocketClient) subscribeAll() (err error) {
for i := range e.subscriptionTypes {
st := e.subscriptionTypes[i]
if st.BackgroundRetry {
go e.retryBackgroundSubscribe(st)
continue
} }
e.logger.WithField("topic", strings.Join([]string{st.Event, st.Version}, "/")).Debug("subscribed to topic") if err = e.subscribe(st); err != nil {
return err
}
} }
return nil return nil

View file

@ -101,17 +101,22 @@ type (
// before in order to have the response body available in the returned // before in order to have the response body available in the returned
// HTTPError // HTTPError
func ValidateStatus(opts ClientRequestOpts, resp *http.Response) error { func ValidateStatus(opts ClientRequestOpts, resp *http.Response) error {
if resp.StatusCode == http.StatusTooManyRequests {
// Twitch doesn't want to hear any more of this
return backoff.NewErrCannotRetry(newHTTPError(resp.StatusCode, nil, nil))
}
if opts.OKStatus != 0 && resp.StatusCode != opts.OKStatus { if opts.OKStatus != 0 && resp.StatusCode != opts.OKStatus {
// We shall not accept this!
var ret error
body, err := io.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
return newHTTPError(resp.StatusCode, nil, err) ret = newHTTPError(resp.StatusCode, nil, err)
} else {
ret = newHTTPError(resp.StatusCode, body, nil)
} }
return newHTTPError(resp.StatusCode, body, nil)
if resp.StatusCode == http.StatusTooManyRequests {
// Twitch doesn't want to hear any more of this
return backoff.NewErrCannotRetry(ret)
}
return ret
} }
return nil return nil
@ -174,7 +179,7 @@ func (c *Client) RefreshToken() error {
c.UpdateToken("", "") c.UpdateToken("", "")
if c.tokenUpdateHook != nil { if c.tokenUpdateHook != nil {
if herr := c.tokenUpdateHook("", ""); herr != nil { if herr := c.tokenUpdateHook("", ""); herr != nil {
log.WithError(err).Error("Unable to store token wipe after refresh failure") log.WithError(herr).Error("Unable to store token wipe after refresh failure")
} }
} }
return errors.Wrap(err, "executing request") return errors.Wrap(err, "executing request")

View file

@ -25,7 +25,14 @@
{{ channel }} {{ channel }}
<span class="ml-auto mr-2"> <span class="ml-auto mr-2">
<font-awesome-icon <font-awesome-icon
v-if="!hasAllExtendedScopes(channel)" v-if="!generalConfig.channel_has_token[channel]"
:id="`channelPublicWarn${channel}`"
fixed-width
class="ml-1 text-danger"
:icon="['fas', 'exclamation-triangle']"
/>
<font-awesome-icon
v-else-if="!hasAllExtendedScopes(channel)"
:id="`channelPublicWarn${channel}`" :id="`channelPublicWarn${channel}`"
fixed-width fixed-width
class="ml-1 text-warning" class="ml-1 text-warning"
@ -35,8 +42,14 @@
:target="`channelPublicWarn${channel}`" :target="`channelPublicWarn${channel}`"
triggers="hover" triggers="hover"
> >
Channel is missing {{ missingExtendedScopes(channel).length }} extended permissions. <template v-if="!generalConfig.channel_has_token[channel]">
Click pencil to change granted permissions. Bot is not authorized to access Twitch on behalf of this channels owner (tokens are missing).
Click pencil to grant permissions.
</template>
<template v-else>
Channel is missing {{ missingExtendedScopes(channel).length }} extended permissions.
Click pencil to change granted permissions.
</template>
</b-tooltip> </b-tooltip>
</span> </span>
<b-button-group size="sm"> <b-button-group size="sm">

View file

@ -21,6 +21,7 @@ type (
AnyScope bool AnyScope bool
Hook func(json.RawMessage) error Hook func(json.RawMessage) error
Version string Version string
Optional bool
} }
twitchChannelState struct { twitchChannelState struct {
@ -114,10 +115,11 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration
return []topicRegistration{ return []topicRegistration{
{ {
Topic: twitch.EventSubEventTypeChannelAdBreakBegin, Topic: twitch.EventSubEventTypeChannelAdBreakBegin,
Version: twitch.EventSubTopicVersionBeta, Version: twitch.EventSubTopicVersion1,
Condition: twitch.EventSubCondition{BroadcasterUserID: userID}, Condition: twitch.EventSubCondition{BroadcasterUserID: userID},
RequiredScopes: []string{twitch.ScopeChannelReadAds}, RequiredScopes: []string{twitch.ScopeChannelReadAds},
Hook: t.handleEventSubChannelAdBreakBegin, Hook: t.handleEventSubChannelAdBreakBegin,
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeChannelFollow, Topic: twitch.EventSubEventTypeChannelFollow,
@ -125,6 +127,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration
Condition: twitch.EventSubCondition{BroadcasterUserID: userID, ModeratorUserID: userID}, Condition: twitch.EventSubCondition{BroadcasterUserID: userID, ModeratorUserID: userID},
RequiredScopes: []string{twitch.ScopeModeratorReadFollowers}, RequiredScopes: []string{twitch.ScopeModeratorReadFollowers},
Hook: t.handleEventSubChannelFollow, Hook: t.handleEventSubChannelFollow,
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeChannelPointCustomRewardRedemptionAdd, Topic: twitch.EventSubEventTypeChannelPointCustomRewardRedemptionAdd,
@ -132,6 +135,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration
RequiredScopes: []string{twitch.ScopeChannelReadRedemptions, twitch.ScopeChannelManageRedemptions}, RequiredScopes: []string{twitch.ScopeChannelReadRedemptions, twitch.ScopeChannelManageRedemptions},
AnyScope: true, AnyScope: true,
Hook: t.handleEventSubChannelPointCustomRewardRedemptionAdd, Hook: t.handleEventSubChannelPointCustomRewardRedemptionAdd,
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeChannelPollBegin, Topic: twitch.EventSubEventTypeChannelPollBegin,
@ -139,6 +143,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration
RequiredScopes: []string{twitch.ScopeChannelReadPolls, twitch.ScopeChannelManagePolls}, RequiredScopes: []string{twitch.ScopeChannelReadPolls, twitch.ScopeChannelManagePolls},
AnyScope: true, AnyScope: true,
Hook: t.handleEventSubChannelPollChange(eventTypePollBegin), Hook: t.handleEventSubChannelPollChange(eventTypePollBegin),
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeChannelPollEnd, Topic: twitch.EventSubEventTypeChannelPollEnd,
@ -146,6 +151,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration
RequiredScopes: []string{twitch.ScopeChannelReadPolls, twitch.ScopeChannelManagePolls}, RequiredScopes: []string{twitch.ScopeChannelReadPolls, twitch.ScopeChannelManagePolls},
AnyScope: true, AnyScope: true,
Hook: t.handleEventSubChannelPollChange(eventTypePollEnd), Hook: t.handleEventSubChannelPollChange(eventTypePollEnd),
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeChannelPollProgress, Topic: twitch.EventSubEventTypeChannelPollProgress,
@ -153,12 +159,14 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration
RequiredScopes: []string{twitch.ScopeChannelReadPolls, twitch.ScopeChannelManagePolls}, RequiredScopes: []string{twitch.ScopeChannelReadPolls, twitch.ScopeChannelManagePolls},
AnyScope: true, AnyScope: true,
Hook: t.handleEventSubChannelPollChange(eventTypePollProgress), Hook: t.handleEventSubChannelPollChange(eventTypePollProgress),
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeChannelRaid, Topic: twitch.EventSubEventTypeChannelRaid,
Condition: twitch.EventSubCondition{FromBroadcasterUserID: userID}, Condition: twitch.EventSubCondition{FromBroadcasterUserID: userID},
RequiredScopes: nil, RequiredScopes: nil,
Hook: t.handleEventSubChannelOutboundRaid, Hook: t.handleEventSubChannelOutboundRaid,
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeChannelShoutoutCreate, Topic: twitch.EventSubEventTypeChannelShoutoutCreate,
@ -166,6 +174,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration
RequiredScopes: []string{twitch.ScopeModeratorManageShoutouts, twitch.ScopeModeratorReadShoutouts}, RequiredScopes: []string{twitch.ScopeModeratorManageShoutouts, twitch.ScopeModeratorReadShoutouts},
AnyScope: true, AnyScope: true,
Hook: t.handleEventSubShoutoutCreated, Hook: t.handleEventSubShoutoutCreated,
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeChannelShoutoutReceive, Topic: twitch.EventSubEventTypeChannelShoutoutReceive,
@ -173,6 +182,7 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration
RequiredScopes: []string{twitch.ScopeModeratorManageShoutouts, twitch.ScopeModeratorReadShoutouts}, RequiredScopes: []string{twitch.ScopeModeratorManageShoutouts, twitch.ScopeModeratorReadShoutouts},
AnyScope: true, AnyScope: true,
Hook: t.handleEventSubShoutoutReceived, Hook: t.handleEventSubShoutoutReceived,
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeChannelUpdate, Topic: twitch.EventSubEventTypeChannelUpdate,
@ -180,18 +190,21 @@ func (t *twitchWatcher) getTopicRegistrations(userID string) []topicRegistration
Condition: twitch.EventSubCondition{BroadcasterUserID: userID}, Condition: twitch.EventSubCondition{BroadcasterUserID: userID},
RequiredScopes: nil, RequiredScopes: nil,
Hook: t.handleEventSubChannelUpdate, Hook: t.handleEventSubChannelUpdate,
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeStreamOffline, Topic: twitch.EventSubEventTypeStreamOffline,
Condition: twitch.EventSubCondition{BroadcasterUserID: userID}, Condition: twitch.EventSubCondition{BroadcasterUserID: userID},
RequiredScopes: nil, RequiredScopes: nil,
Hook: t.handleEventSubStreamOnOff(false), Hook: t.handleEventSubStreamOnOff(false),
Optional: true,
}, },
{ {
Topic: twitch.EventSubEventTypeStreamOnline, Topic: twitch.EventSubEventTypeStreamOnline,
Condition: twitch.EventSubCondition{BroadcasterUserID: userID}, Condition: twitch.EventSubCondition{BroadcasterUserID: userID},
RequiredScopes: nil, RequiredScopes: nil,
Hook: t.handleEventSubStreamOnOff(true), Hook: t.handleEventSubStreamOnOff(true),
Optional: true,
}, },
} }
} }
@ -420,7 +433,7 @@ func (t *twitchWatcher) updateChannelFromAPI(channel string) error {
log.WithField("channel", channel).Info("watching for eventsub events") log.WithField("channel", channel).Info("watching for eventsub events")
go func(storedStatus *twitchChannelState) { go func(storedStatus *twitchChannelState) {
if err := storedStatus.esc.Run(); err != nil { if err := storedStatus.esc.Run(); err != nil {
log.WithField("channel", channel).WithError(helpers.CleanOpError(err)).Error("eventsub client caused error") log.WithField("channel", channel).WithError(helpers.CleanNetworkAddressFromError(err)).Error("eventsub client caused error")
} }
storedStatus.CloseESC() storedStatus.CloseESC()
}(storedStatus) }(storedStatus)
@ -477,7 +490,14 @@ func (t *twitchWatcher) registerEventSubCallbacks(channel string) (*twitch.Event
} }
} }
topicOpts = append(topicOpts, twitch.WithSubscription(tr.Topic, tr.Version, tr.Condition, tr.Hook)) var opt twitch.EventSubSocketClientOpt
if tr.Optional {
opt = twitch.WithRetryBackgroundSubscribe(tr.Topic, tr.Version, tr.Condition, tr.Hook)
} else {
opt = twitch.WithMustSubscribe(tr.Topic, tr.Version, tr.Condition, tr.Hook)
}
topicOpts = append(topicOpts, opt)
} }
esClient, err := twitch.NewEventSubSocketClient(append( esClient, err := twitch.NewEventSubSocketClient(append(