164 lines
4.4 KiB
Go
164 lines
4.4 KiB
Go
|
package main
|
||
|
|
||
|
import (
|
||
|
"database/sql"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"time"
|
||
|
|
||
|
"github.com/jmoiron/sqlx"
|
||
|
)
|
||
|
|
||
|
// streamOfflineGrace is the window after the last recorded "stream_offline"
// time during which startStream treats a new start as a continuation of the
// previous stream and leaves the users' streak statuses untouched.
const streamOfflineGrace = 30 * time.Minute
|
||
|
|
||
|
func countStreak(db *sqlx.DB, twitchID uint64, username string) (user streakUser, err error) {
|
||
|
if err = withTx(db, func(tx *sqlx.Tx) (err error) {
|
||
|
if err = tx.Get(&user, "SELECT * FROM streak_users WHERE twitch_id = ?", twitchID); err != nil {
|
||
|
if !errors.Is(err, sql.ErrNoRows) {
|
||
|
return fmt.Errorf("getting user: %w", err)
|
||
|
}
|
||
|
|
||
|
// User was not yet inserted
|
||
|
user = streakUser{
|
||
|
TwitchID: twitchID,
|
||
|
Username: username,
|
||
|
StreamsCount: 0,
|
||
|
CurrentStreak: 0,
|
||
|
MaxStreak: 0,
|
||
|
StreakStatus: statusBroken,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
switch user.StreakStatus {
|
||
|
case statusActive:
|
||
|
// User has an active streak, do nothing
|
||
|
return nil
|
||
|
|
||
|
case statusBroken:
|
||
|
// User needs a new streak
|
||
|
user.CurrentStreak = 1
|
||
|
|
||
|
case statusPending:
|
||
|
// User can prolong their streak
|
||
|
user.CurrentStreak += 1
|
||
|
}
|
||
|
|
||
|
// In any case set the streak active and count the current stream
|
||
|
user.StreamsCount++
|
||
|
user.StreakStatus = statusActive
|
||
|
if user.CurrentStreak > user.MaxStreak {
|
||
|
user.MaxStreak = user.CurrentStreak
|
||
|
}
|
||
|
|
||
|
if _, err = db.NamedExec(
|
||
|
`INSERT INTO streak_users VALUES (:twitch_id, :username, :streams_count, :current_streak, :max_streak, :streak_status)
|
||
|
ON DUPLICATE KEY UPDATE username=:username, streams_count=:streams_count, current_streak=:current_streak, max_streak=:max_streak, streak_status=:streak_status`,
|
||
|
user,
|
||
|
); err != nil {
|
||
|
return fmt.Errorf("updating user streak status: %w", err)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}); err != nil {
|
||
|
return user, fmt.Errorf("counting streak for user: %w", err)
|
||
|
}
|
||
|
|
||
|
return user, nil
|
||
|
}
|
||
|
|
||
|
func getTimeFromMeta(tx *sqlx.Tx, key string) (t time.Time, err error) {
|
||
|
var lastOfflineStr string
|
||
|
if err = tx.Get(&lastOfflineStr, "SELECT value FROM streak_meta WHERE `key` = ?", key); err != nil {
|
||
|
if !errors.Is(err, sql.ErrNoRows) {
|
||
|
return t, fmt.Errorf("getting last %s time: %w", key, err)
|
||
|
}
|
||
|
|
||
|
lastOfflineStr = time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano)
|
||
|
}
|
||
|
|
||
|
t, err = time.Parse(time.RFC3339Nano, lastOfflineStr)
|
||
|
if err != nil {
|
||
|
return t, fmt.Errorf("parsing offline time: %w", err)
|
||
|
}
|
||
|
|
||
|
return t, nil
|
||
|
}
|
||
|
|
||
|
func setStreamOffline(db *sqlx.DB) (err error) {
|
||
|
return withTx(db, func(tx *sqlx.Tx) error {
|
||
|
return storeTimeToMeta(tx, "stream_offline", time.Now())
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func startStream(db *sqlx.DB) (err error) {
|
||
|
if err = withTx(db, func(tx *sqlx.Tx) (err error) {
|
||
|
lastOffline, err := getTimeFromMeta(tx, "stream_offline")
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("getting offline time: %w", err)
|
||
|
}
|
||
|
|
||
|
lastOnline, err := getTimeFromMeta(tx, "stream_online")
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("getting online time: %w", err)
|
||
|
}
|
||
|
|
||
|
if err = storeTimeToMeta(tx, "stream_online", time.Now()); err != nil {
|
||
|
return fmt.Errorf("storing stream start: %w", err)
|
||
|
}
|
||
|
|
||
|
if time.Since(lastOffline) < streamOfflineGrace || lastOnline.After(lastOffline) {
|
||
|
// We only had a short break or the stream was already started
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
if _, err = tx.Exec("UPDATE streak_users SET streak_status = ?, current_streak = 0 WHERE streak_status = ?", statusBroken, statusPending); err != nil {
|
||
|
return fmt.Errorf("breaking streaks for pending users: %w", err)
|
||
|
}
|
||
|
|
||
|
if _, err = tx.Exec("UPDATE streak_users SET streak_status = ? WHERE streak_status = ?", statusPending, statusActive); err != nil {
|
||
|
return fmt.Errorf("breaking streaks for pending users: %w", err)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}); err != nil {
|
||
|
return fmt.Errorf("starting stream: %w", err)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func storeTimeToMeta(tx *sqlx.Tx, key string, t time.Time) (err error) {
|
||
|
if _, err = tx.NamedExec(
|
||
|
`INSERT INTO streak_meta VALUES (:key, :value)
|
||
|
ON DUPLICATE KEY UPDATE value = :value`,
|
||
|
map[string]any{
|
||
|
"key": key,
|
||
|
"value": t.Format(time.RFC3339Nano),
|
||
|
},
|
||
|
); err != nil {
|
||
|
return fmt.Errorf("updating stream meta: %w", err)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func withTx(db *sqlx.DB, fn func(*sqlx.Tx) error) error {
|
||
|
tx, err := db.Beginx()
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("starting transaction: %w", err)
|
||
|
}
|
||
|
|
||
|
if err = fn(tx); err != nil {
|
||
|
if rerr := tx.Rollback(); rerr != nil {
|
||
|
return fmt.Errorf("rolling back after error: %w", rerr)
|
||
|
}
|
||
|
return fmt.Errorf("executing transaction (rolled back): %w", err)
|
||
|
}
|
||
|
|
||
|
if err = tx.Commit(); err != nil {
|
||
|
return fmt.Errorf("committing transaction: %w", err)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|