package main import ( "database/sql" "errors" "fmt" "time" "github.com/jmoiron/sqlx" ) const streamOfflineGrace = 30 * time.Minute func countStreak(db *sqlx.DB, twitchID uint64, username string) (user streakUser, err error) { if err = withTx(db, func(tx *sqlx.Tx) (err error) { if err = tx.Get(&user, "SELECT * FROM streak_users WHERE twitch_id = ?", twitchID); err != nil { if !errors.Is(err, sql.ErrNoRows) { return fmt.Errorf("getting user: %w", err) } // User was not yet inserted user = streakUser{ TwitchID: twitchID, Username: username, StreamsCount: 0, CurrentStreak: 0, MaxStreak: 0, StreakStatus: statusBroken, } } switch user.StreakStatus { case statusActive: // User has an active streak, do nothing return nil case statusBroken: // User needs a new streak user.CurrentStreak = 1 case statusPending: // User can prolong their streak user.CurrentStreak += 1 } // In any case set the streak active and count the current stream user.StreamsCount++ user.StreakStatus = statusActive if user.CurrentStreak > user.MaxStreak { user.MaxStreak = user.CurrentStreak } if _, err = db.NamedExec( `INSERT INTO streak_users VALUES (:twitch_id, :username, :streams_count, :current_streak, :max_streak, :streak_status) ON DUPLICATE KEY UPDATE username=:username, streams_count=:streams_count, current_streak=:current_streak, max_streak=:max_streak, streak_status=:streak_status`, user, ); err != nil { return fmt.Errorf("updating user streak status: %w", err) } return nil }); err != nil { return user, fmt.Errorf("counting streak for user: %w", err) } return user, nil } func getTimeFromMeta(tx *sqlx.Tx, key string) (t time.Time, err error) { var lastOfflineStr string if err = tx.Get(&lastOfflineStr, "SELECT value FROM streak_meta WHERE `key` = ?", key); err != nil { if !errors.Is(err, sql.ErrNoRows) { return t, fmt.Errorf("getting last %s time: %w", key, err) } lastOfflineStr = time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano) } t, err = time.Parse(time.RFC3339Nano, lastOfflineStr) if err != nil { return t, fmt.Errorf("parsing offline time: %w", err) } return t, nil } func setStreamOffline(db *sqlx.DB) (err error) { return withTx(db, func(tx *sqlx.Tx) error { return storeTimeToMeta(tx, "stream_offline", time.Now()) }) } func startStream(db *sqlx.DB) (err error) { if err = withTx(db, func(tx *sqlx.Tx) (err error) { lastOffline, err := getTimeFromMeta(tx, "stream_offline") if err != nil { return fmt.Errorf("getting offline time: %w", err) } lastOnline, err := getTimeFromMeta(tx, "stream_online") if err != nil { return fmt.Errorf("getting online time: %w", err) } if err = storeTimeToMeta(tx, "stream_online", time.Now()); err != nil { return fmt.Errorf("storing stream start: %w", err) } if time.Since(lastOffline) < streamOfflineGrace || lastOnline.After(lastOffline) { // We only had a short break or the stream was already started return nil } if _, err = tx.Exec("UPDATE streak_users SET streak_status = ?, current_streak = 0 WHERE streak_status = ?", statusBroken, statusPending); err != nil { return fmt.Errorf("breaking streaks for pending users: %w", err) } if _, err = tx.Exec("UPDATE streak_users SET streak_status = ? WHERE streak_status = ?", statusPending, statusActive); err != nil { return fmt.Errorf("breaking streaks for pending users: %w", err) } return nil }); err != nil { return fmt.Errorf("starting stream: %w", err) } return nil } func storeTimeToMeta(tx *sqlx.Tx, key string, t time.Time) (err error) { if _, err = tx.NamedExec( `INSERT INTO streak_meta VALUES (:key, :value) ON DUPLICATE KEY UPDATE value = :value`, map[string]any{ "key": key, "value": t.Format(time.RFC3339Nano), }, ); err != nil { return fmt.Errorf("updating stream meta: %w", err) } return nil } func withTx(db *sqlx.DB, fn func(*sqlx.Tx) error) error { tx, err := db.Beginx() if err != nil { return fmt.Errorf("starting transaction: %w", err) } if err = fn(tx); err != nil { if rerr := tx.Rollback(); rerr != nil { return fmt.Errorf("rolling back after error: %w", rerr) } return fmt.Errorf("executing transaction (rolled back): %w", err) } if err = tx.Commit(); err != nil { return fmt.Errorf("committing transaction: %w", err) } return nil }