Add polling for live-streams (#1)

This commit is contained in:
Knut Ahlers 2021-08-01 02:13:05 +02:00 committed by GitHub
parent cd0496a359
commit 22c13bca00
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 127 additions and 53 deletions

View file

@ -2,6 +2,7 @@ package main
import ( import (
"fmt" "fmt"
"strconv"
"strings" "strings"
"time" "time"
@ -31,6 +32,17 @@ func (m moduleAttributeStore) Expect(keys ...string) error {
return nil return nil
} }
func (m moduleAttributeStore) MustBool(name string, defVal *bool) bool {
v, err := m.Bool(name)
if err != nil {
if defVal != nil {
return *defVal
}
panic(err)
}
return v
}
func (m moduleAttributeStore) MustDuration(name string, defVal *time.Duration) time.Duration { func (m moduleAttributeStore) MustDuration(name string, defVal *time.Duration) time.Duration {
v, err := m.Duration(name) v, err := m.Duration(name)
if err != nil { if err != nil {
@ -64,6 +76,23 @@ func (m moduleAttributeStore) MustString(name string, defVal *string) string {
return v return v
} }
func (m moduleAttributeStore) Bool(name string) (bool, error) {
v, ok := m[name]
if !ok {
return false, errValueNotSet
}
switch v := v.(type) {
case bool:
return v, nil
case string:
bv, err := strconv.ParseBool(v)
return bv, errors.Wrap(err, "parsing string to bool")
}
return false, errValueMismatch
}
func (m moduleAttributeStore) Duration(name string) (time.Duration, error) { func (m moduleAttributeStore) Duration(name string) (time.Duration, error) {
v, err := m.String(name) v, err := m.String(name)
if err != nil { if err != nil {

View file

@ -3,10 +3,12 @@ package main
import "time" import "time"
var ( var (
ptrBoolFalse = ptrBool(false)
ptrInt64Zero = ptrInt64(0) ptrInt64Zero = ptrInt64(0)
ptrStringEmpty = ptrString("") ptrStringEmpty = ptrString("")
) )
func ptrBool(v bool) *bool { return &v }
func ptrDuration(v time.Duration) *time.Duration { return &v } func ptrDuration(v time.Duration) *time.Duration { return &v }
func ptrInt64(v int64) *int64 { return &v } func ptrInt64(v int64) *int64 { return &v }
func ptrString(v string) *string { return &v } func ptrString(v string) *string { return &v }

View file

@ -45,19 +45,101 @@ func (m *modLivePosting) Initialize(crontab *cron.Cron, discord *discordgo.Sessi
if err := attrs.Expect( if err := attrs.Expect(
"discord_channel_id", "discord_channel_id",
"post_text", "post_text",
"whitelisted_role",
"twitch_client_id", "twitch_client_id",
"twitch_client_secret", "twitch_client_secret",
); err != nil { ); err != nil {
return errors.Wrap(err, "validating attributes") return errors.Wrap(err, "validating attributes")
} }
// @attr disable_presence optional bool "false" Disable posting live-postings for discord presence changes
if !attrs.MustBool("disable_presence", ptrBoolFalse) {
discord.AddHandler(m.handlePresenceUpdate) discord.AddHandler(m.handlePresenceUpdate)
}
// @attr cron optional string "*/5 * * * *" Fetch live status of `poll_usernames` (set to empty string to disable): keep this below `stream_freshness` or you might miss streams
if cronDirective := attrs.MustString("cron", ptrString("*/5 * * * *")); cronDirective != "" {
if _, err := crontab.AddFunc(cronDirective, m.cronFetchChannelStatus); err != nil {
return errors.Wrap(err, "adding cron function")
}
}
return nil
}
func (m modLivePosting) cronFetchChannelStatus() {
// @attr poll_usernames optional []string "[]" Check these usernames for active streams when executing the `cron` (at most 100 users can be checked)
usernames, err := m.attrs.StringSlice("poll_usernames")
switch err {
case nil:
// We got a list of users
case errValueNotSet:
// There is no list of users
return
default:
log.WithError(err).Error("Unable to get poll_usernames list")
return
}
log.WithField("entries", len(usernames)).Trace("Fetching streams for users (cron)")
if err = m.fetchAndPostForUsername(usernames...); err != nil {
log.WithError(err).Error("Unable to post status for users")
}
}
func (m modLivePosting) fetchAndPostForUsername(usernames ...string) error {
twitch := newTwitchAdapter(
// @attr twitch_client_id required string "" Twitch client ID the token was issued for
m.attrs.MustString("twitch_client_id", nil),
// @attr twitch_client_secret required string "" Secret for the Twitch app identified with twitch_client_id
m.attrs.MustString("twitch_client_secret", nil),
"", // No User-Token used
)
users, err := twitch.GetUserByUsername(context.Background(), usernames...)
if err != nil {
return errors.Wrap(err, "fetching twitch user details")
}
streams, err := twitch.GetStreamsForUser(context.Background(), usernames...)
if err != nil {
return errors.Wrap(err, "fetching streams for user")
}
log.WithFields(log.Fields{
"streams": len(streams.Data),
"users": len(users.Data),
}).Trace("Found active streams from users")
for _, stream := range streams.Data {
for _, user := range users.Data {
if user.ID != stream.UserID {
continue
}
// @attr stream_freshness optional duration "5m" How long after stream start to post shoutout
streamFreshness := m.attrs.MustDuration("stream_freshness", ptrDuration(livePostingDefaultStreamFreshness))
if time.Since(stream.StartedAt) > streamFreshness {
// Stream is too old, don't annoounce
return nil
}
if err = m.sendLivePost(
user.Login,
user.DisplayName,
stream.Title,
stream.GameName,
stream.ThumbnailURL,
user.ProfileImageURL,
); err != nil {
return errors.Wrap(err, "sending post")
}
}
}
return nil return nil
} }
//nolint: gocyclo // One directive too many, makes no sense to split
func (m modLivePosting) handlePresenceUpdate(d *discordgo.Session, p *discordgo.PresenceUpdate) { func (m modLivePosting) handlePresenceUpdate(d *discordgo.Session, p *discordgo.PresenceUpdate) {
if p.User == nil { if p.User == nil {
// The frick? Non-user presence? // The frick? Non-user presence?
@ -108,52 +190,8 @@ func (m modLivePosting) handlePresenceUpdate(d *discordgo.Session, p *discordgo.
twitchUsername := strings.TrimLeft(u.Path, "/") twitchUsername := strings.TrimLeft(u.Path, "/")
twitch := newTwitchAdapter( if err = m.fetchAndPostForUsername(twitchUsername); err != nil {
// @attr twitch_client_id required string "" Twitch client ID the token was issued for logger.WithError(err).WithField("url", activity.URL).Error("Unable to fetch info / post live posting")
m.attrs.MustString("twitch_client_id", nil),
// @attr twitch_client_secret required string "" Secret for the Twitch app identified with twitch_client_id
m.attrs.MustString("twitch_client_secret", nil),
"", // No User-Token used
)
users, err := twitch.GetUserByUsername(context.Background(), twitchUsername)
if err != nil {
logger.WithError(err).WithField("user", twitchUsername).Warning("Unable to fetch details for user")
return
}
if l := len(users.Data); l != 1 {
logger.WithError(err).WithField("url", activity.URL).Warning("Unable to fetch user for login")
return
}
streams, err := twitch.GetStreamsForUser(context.Background(), twitchUsername)
if err != nil {
logger.WithError(err).WithField("user", twitchUsername).Debug("Unable to fetch streams for user")
return
}
if l := len(streams.Data); l != 1 {
logger.WithError(err).WithField("url", activity.URL).Debug("Unable to fetch streams for login")
return
}
// @attr stream_freshness optional duration "5m" How long after stream start to post shoutout
ignoreTime := m.attrs.MustDuration("stream_freshness", ptrDuration(livePostingDefaultStreamFreshness))
if streams.Data[0].StartedAt.Add(ignoreTime).Before(time.Now()) {
// Stream is too old, don't annoounce
return
}
if err = m.sendLivePost(
users.Data[0].Login,
users.Data[0].DisplayName,
streams.Data[0].Title,
streams.Data[0].GameName,
streams.Data[0].ThumbnailURL,
users.Data[0].ProfileImageURL,
); err != nil {
logger.WithError(err).WithField("url", activity.URL).Error("Unable to send post")
return return
} }
} }

View file

@ -119,11 +119,12 @@ func (t twitchAdapter) GetChannelStreamSchedule(ctx context.Context, broadcaster
}) })
} }
func (t twitchAdapter) GetStreamsForUser(ctx context.Context, userName string) (*twitchStreamListing, error) { func (t twitchAdapter) GetStreamsForUser(ctx context.Context, userNames ...string) (*twitchStreamListing, error) {
out := &twitchStreamListing{} out := &twitchStreamListing{}
params := make(url.Values) params := make(url.Values)
params.Set("user_login", strings.ToLower(userName)) params.Set("first", "100")
params["user_login"] = userNames
return out, backoff.NewBackoff(). return out, backoff.NewBackoff().
WithMaxIterations(twitchAPIRequestLimit). WithMaxIterations(twitchAPIRequestLimit).
@ -135,11 +136,12 @@ func (t twitchAdapter) GetStreamsForUser(ctx context.Context, userName string) (
}) })
} }
func (t twitchAdapter) GetUserByUsername(ctx context.Context, userName string) (*twitchUserListing, error) { func (t twitchAdapter) GetUserByUsername(ctx context.Context, userNames ...string) (*twitchUserListing, error) {
out := &twitchUserListing{} out := &twitchUserListing{}
params := make(url.Values) params := make(url.Values)
params.Set("login", strings.ToLower(userName)) params.Set("first", "100")
params["login"] = userNames
return out, backoff.NewBackoff(). return out, backoff.NewBackoff().
WithMaxIterations(twitchAPIRequestLimit). WithMaxIterations(twitchAPIRequestLimit).

View file

@ -75,6 +75,9 @@ Announces stream live status based on Discord streaming status
| `post_text` | ✅ | string | | Message to post to channel use `${displayname}` and `${username}` as placeholders | | `post_text` | ✅ | string | | Message to post to channel use `${displayname}` and `${username}` as placeholders |
| `twitch_client_id` | ✅ | string | | Twitch client ID the token was issued for | | `twitch_client_id` | ✅ | string | | Twitch client ID the token was issued for |
| `twitch_client_secret` | ✅ | string | | Secret for the Twitch app identified with twitch_client_id | | `twitch_client_secret` | ✅ | string | | Secret for the Twitch app identified with twitch_client_id |
| `cron` | | string | `*/5 * * * *` | Fetch live status of `poll_usernames` (set to empty string to disable): keep this below `stream_freshness` or you might miss streams |
| `disable_presence` | | bool | `false` | Disable posting live-postings for discord presence changes |
| `poll_usernames` | | []string | `[]` | Check these usernames for active streams when executing the `cron` (at most 100 users can be checked) |
| `stream_freshness` | | duration | `5m` | How long after stream start to post shoutout | | `stream_freshness` | | duration | `5m` | How long after stream start to post shoutout |
| `whitelisted_role` | | string | | Only post for members of this role ID | | `whitelisted_role` | | string | | Only post for members of this role ID |