twitch-bot-streak/query.go
2024-03-24 02:57:46 +01:00

164 lines
4.4 KiB
Go

package main
import (
"database/sql"
"errors"
"fmt"
"time"
"github.com/jmoiron/sqlx"
)
// streamOfflineGrace is compared in startStream against the time elapsed since
// the last recorded "stream_offline" timestamp: a stream that comes back online
// within this window is treated as a short break and leaves all streaks as
// they are.
const streamOfflineGrace = 30 * time.Minute
// countStreak counts the current stream for the user identified by twitchID
// and returns the resulting streak record.
//
// The user row is loaded inside a transaction; when no row exists yet a fresh
// record (using username, with a broken streak) is taken as the starting
// point. What happens next depends on the stored streak status:
//   - statusActive: the stream was already counted, the user is returned
//     unchanged and nothing is written.
//   - statusBroken: a new streak of 1 is started.
//   - statusPending: the current streak is extended by one.
//
// For every status other than statusActive the stream counter is incremented,
// the streak is marked active, the max streak is raised if needed and the
// record is upserted.
//
// On error the returned user holds whatever intermediate state was reached and
// should not be relied upon.
func countStreak(db *sqlx.DB, twitchID uint64, username string) (user streakUser, err error) {
	// Same bytes as the query previously passed inline.
	const upsertUser = "INSERT INTO streak_users VALUES (:twitch_id, :username, :streams_count, :current_streak, :max_streak, :streak_status)\n" +
		"ON DUPLICATE KEY UPDATE username=:username, streams_count=:streams_count, current_streak=:current_streak, max_streak=:max_streak, streak_status=:streak_status"

	if err = withTx(db, func(tx *sqlx.Tx) (err error) {
		if err = tx.Get(&user, "SELECT * FROM streak_users WHERE twitch_id = ?", twitchID); err != nil {
			if !errors.Is(err, sql.ErrNoRows) {
				return fmt.Errorf("getting user: %w", err)
			}

			// User was not yet inserted
			user = streakUser{
				TwitchID:      twitchID,
				Username:      username,
				StreamsCount:  0,
				CurrentStreak: 0,
				MaxStreak:     0,
				StreakStatus:  statusBroken,
			}
		}

		// NOTE(review): for an existing user the username argument is not
		// applied, so the upsert below writes back the stored name - confirm
		// whether renames are supposed to be picked up here.

		switch user.StreakStatus {
		case statusActive:
			// User has an active streak, do nothing
			return nil

		case statusBroken:
			// User needs a new streak
			user.CurrentStreak = 1

		case statusPending:
			// User can prolong their streak
			user.CurrentStreak++
		}

		// In any case set the streak active and count the current stream
		user.StreamsCount++
		user.StreakStatus = statusActive
		if user.CurrentStreak > user.MaxStreak {
			user.MaxStreak = user.CurrentStreak
		}

		// The write must go through tx (not db): otherwise it runs on a
		// different pooled connection, is not atomic with the read above and
		// is not undone when the transaction is rolled back.
		if _, err = tx.NamedExec(upsertUser, user); err != nil {
			return fmt.Errorf("updating user streak status: %w", err)
		}

		return nil
	}); err != nil {
		return user, fmt.Errorf("counting streak for user: %w", err)
	}

	return user, nil
}
// getTimeFromMeta reads the value stored under key in the streak_meta table
// and parses it as an RFC 3339 (nanosecond precision) timestamp.
//
// When no row exists for key, the Unix epoch (1970-01-01T00:00:00Z) is
// returned instead. Any other query error, or a stored value that fails to
// parse, is returned wrapped.
func getTimeFromMeta(tx *sqlx.Tx, key string) (t time.Time, err error) {
	var raw string

	switch err = tx.Get(&raw, "SELECT value FROM streak_meta WHERE `key` = ?", key); {
	case err == nil:
		// Stored value found, it is parsed below.

	case errors.Is(err, sql.ErrNoRows):
		// Nothing stored yet: fall back to the epoch, routed through the
		// same textual format a stored value would have.
		raw = time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano)

	default:
		return t, fmt.Errorf("getting last %s time: %w", key, err)
	}

	if t, err = time.Parse(time.RFC3339Nano, raw); err != nil {
		return t, fmt.Errorf("parsing offline time: %w", err)
	}

	return t, nil
}
// setStreamOffline stores the current time under the "stream_offline" meta
// key, inside its own transaction.
func setStreamOffline(db *sqlx.DB) (err error) {
	recordOffline := func(tx *sqlx.Tx) error {
		return storeTimeToMeta(tx, "stream_offline", time.Now())
	}

	return withTx(db, recordOffline)
}
// startStream records that the stream went online and, when this is a
// genuinely new stream, advances the streak status of all users.
//
// Within one transaction it reads the last "stream_offline" and
// "stream_online" timestamps, stores the current time as the new
// "stream_online" value and then decides whether streaks need to move:
// nothing further happens when the stream was offline for less than
// streamOfflineGrace or when the last online timestamp is newer than the last
// offline timestamp (the stream was already started). Otherwise users still
// in statusPending have their streak broken and reset to 0, and users in
// statusActive are moved to statusPending.
func startStream(db *sqlx.DB) (err error) {
	if err = withTx(db, func(tx *sqlx.Tx) (err error) {
		lastOffline, err := getTimeFromMeta(tx, "stream_offline")
		if err != nil {
			return fmt.Errorf("getting offline time: %w", err)
		}

		lastOnline, err := getTimeFromMeta(tx, "stream_online")
		if err != nil {
			return fmt.Errorf("getting online time: %w", err)
		}

		// The online time is refreshed even when the checks below decide
		// that streaks are left untouched.
		if err = storeTimeToMeta(tx, "stream_online", time.Now()); err != nil {
			return fmt.Errorf("storing stream start: %w", err)
		}

		if time.Since(lastOffline) < streamOfflineGrace || lastOnline.After(lastOffline) {
			// We only had a short break or the stream was already started
			return nil
		}

		// Order matters: pending users must be broken before active users
		// are moved to pending, otherwise the freshly pending users would be
		// broken as well.
		if _, err = tx.Exec("UPDATE streak_users SET streak_status = ?, current_streak = 0 WHERE streak_status = ?", statusBroken, statusPending); err != nil {
			return fmt.Errorf("breaking streaks for pending users: %w", err)
		}

		if _, err = tx.Exec("UPDATE streak_users SET streak_status = ? WHERE streak_status = ?", statusPending, statusActive); err != nil {
			return fmt.Errorf("marking active streaks as pending: %w", err)
		}

		return nil
	}); err != nil {
		return fmt.Errorf("starting stream: %w", err)
	}

	return nil
}
// storeTimeToMeta upserts t, formatted as an RFC 3339 timestamp with
// nanosecond precision, under key in the streak_meta table using the given
// transaction.
func storeTimeToMeta(tx *sqlx.Tx, key string, t time.Time) (err error) {
	const upsertMeta = "INSERT INTO streak_meta VALUES (:key, :value)\n" +
		"ON DUPLICATE KEY UPDATE value = :value"

	args := map[string]any{
		"key":   key,
		"value": t.Format(time.RFC3339Nano),
	}

	_, err = tx.NamedExec(upsertMeta, args)
	if err != nil {
		return fmt.Errorf("updating stream meta: %w", err)
	}

	return nil
}
// withTx runs fn inside a database transaction.
//
// A transaction is started on db and handed to fn. When fn returns nil the
// transaction is committed; when fn returns an error the transaction is
// rolled back and the error is returned wrapped. If the rollback itself
// fails, the returned error wraps the rollback error and carries the message
// of the error from fn, so the root cause is not lost.
func withTx(db *sqlx.DB, fn func(*sqlx.Tx) error) error {
	tx, err := db.Beginx()
	if err != nil {
		return fmt.Errorf("starting transaction: %w", err)
	}

	if err = fn(tx); err != nil {
		if rerr := tx.Rollback(); rerr != nil {
			// Keep wrapping the rollback error (as before) but also report
			// what triggered the rollback in the first place.
			return fmt.Errorf("rolling back after error (%v): %w", err, rerr)
		}

		return fmt.Errorf("executing transaction (rolled back): %w", err)
	}

	if err = tx.Commit(); err != nil {
		return fmt.Errorf("committing transaction: %w", err)
	}

	return nil
}