[core] Add retries for database access methods

to compensate for database temporarily not being available. This is not
suitable for longer database outages as 5 retries with a 1.5 multiplier
will not give much time to recover but should cover for cluster changes
and short network hickups.

Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
Knut Ahlers 2023-11-28 00:09:27 +01:00
parent a1fa9972a8
commit 0d10b5165f
Signed by: luzifer
GPG key ID: D91C3E91E4CAD6F5
12 changed files with 359 additions and 207 deletions

View file

@ -5,6 +5,7 @@ import (
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/clause" "gorm.io/gorm/clause"
"github.com/Luzifer/twitch-bot/v3/internal/helpers"
"github.com/Luzifer/twitch-bot/v3/pkg/database" "github.com/Luzifer/twitch-bot/v3/pkg/database"
) )
@ -18,17 +19,16 @@ type (
func GetCounterValue(db database.Connector, counterName string) (int64, error) { func GetCounterValue(db database.Connector, counterName string) (int64, error) {
var c Counter var c Counter
err := db.DB().First(&c, "name = ?", counterName).Error err := helpers.Retry(func() error {
switch { err := db.DB().First(&c, "name = ?", counterName).Error
case err == nil: if errors.Is(err, gorm.ErrRecordNotFound) {
return c.Value, nil return nil
}
case errors.Is(err, gorm.ErrRecordNotFound): return err
return 0, nil })
default: return c.Value, errors.Wrap(err, "querying counter")
return 0, errors.Wrap(err, "querying counter")
}
} }
func UpdateCounter(db database.Connector, counterName string, value int64, absolute bool) error { func UpdateCounter(db database.Connector, counterName string, value int64, absolute bool) error {
@ -42,10 +42,12 @@ func UpdateCounter(db database.Connector, counterName string, value int64, absol
} }
return errors.Wrap( return errors.Wrap(
db.DB().Clauses(clause.OnConflict{ helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error {
Columns: []clause.Column{{Name: "name"}}, return tx.Clauses(clause.OnConflict{
DoUpdates: clause.AssignmentColumns([]string{"value"}), Columns: []clause.Column{{Name: "name"}},
}).Create(Counter{Name: counterName, Value: value}).Error, DoUpdates: clause.AssignmentColumns([]string{"value"}),
}).Create(Counter{Name: counterName, Value: value}).Error
}),
"storing counter value", "storing counter value",
) )
} }
@ -53,11 +55,12 @@ func UpdateCounter(db database.Connector, counterName string, value int64, absol
func getCounterRank(db database.Connector, prefix, name string) (rank, count int64, err error) { func getCounterRank(db database.Connector, prefix, name string) (rank, count int64, err error) {
var cc []Counter var cc []Counter
err = db.DB(). if err = helpers.Retry(func() error {
Order("value DESC"). return db.DB().
Find(&cc, "name LIKE ?", prefix+"%"). Order("value DESC").
Error Find(&cc, "name LIKE ?", prefix+"%").
if err != nil { Error
}); err != nil {
return 0, 0, errors.Wrap(err, "querying counters") return 0, 0, errors.Wrap(err, "querying counters")
} }
@ -74,11 +77,13 @@ func getCounterRank(db database.Connector, prefix, name string) (rank, count int
func getCounterTopList(db database.Connector, prefix string, n int) ([]Counter, error) { func getCounterTopList(db database.Connector, prefix string, n int) ([]Counter, error) {
var cc []Counter var cc []Counter
err := db.DB(). err := helpers.Retry(func() error {
Order("value DESC"). return db.DB().
Limit(n). Order("value DESC").
Find(&cc, "name LIKE ?", prefix+"%"). Limit(n).
Error Find(&cc, "name LIKE ?", prefix+"%").
Error
})
return cc, errors.Wrap(err, "querying counters") return cc, errors.Wrap(err, "querying counters")
} }

View file

@ -8,6 +8,8 @@ import (
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/clause" "gorm.io/gorm/clause"
"github.com/Luzifer/go_helpers/v2/backoff"
"github.com/Luzifer/twitch-bot/v3/internal/helpers"
"github.com/Luzifer/twitch-bot/v3/pkg/database" "github.com/Luzifer/twitch-bot/v3/pkg/database"
) )
@ -23,7 +25,7 @@ type (
func calculateCurrentPunishments(db database.Connector) (err error) { func calculateCurrentPunishments(db database.Connector) (err error) {
var ps []punishLevel var ps []punishLevel
if err = db.DB().Find(&ps).Error; err != nil { if err = helpers.Retry(func() error { return db.DB().Find(&ps).Error }); err != nil {
return errors.Wrap(err, "querying punish_levels") return errors.Wrap(err, "querying punish_levels")
} }
@ -72,7 +74,9 @@ func deletePunishment(db database.Connector, channel, user, uuid string) error {
func deletePunishmentForKey(db database.Connector, key string) error { func deletePunishmentForKey(db database.Connector, key string) error {
return errors.Wrap( return errors.Wrap(
db.DB().Delete(&punishLevel{}, "key = ?", key).Error, helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error {
return tx.Delete(&punishLevel{}, "key = ?", key).Error
}),
"deleting punishment info", "deleting punishment info",
) )
} }
@ -87,7 +91,13 @@ func getPunishment(db database.Connector, channel, user, uuid string) (*levelCon
p punishLevel p punishLevel
) )
err := db.DB().First(&p, "key = ?", getDBKey(channel, user, uuid)).Error err := helpers.Retry(func() error {
err := db.DB().First(&p, "key = ?", getDBKey(channel, user, uuid)).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return backoff.NewErrCannotRetry(err)
}
return err
})
switch { switch {
case err == nil: case err == nil:
return &levelConfig{ return &levelConfig{
@ -114,15 +124,17 @@ func setPunishmentForKey(db database.Connector, key string, lc *levelConfig) err
} }
return errors.Wrap( return errors.Wrap(
db.DB().Clauses(clause.OnConflict{ helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error {
Columns: []clause.Column{{Name: "key"}}, return tx.Clauses(clause.OnConflict{
UpdateAll: true, Columns: []clause.Column{{Name: "key"}},
}).Create(punishLevel{ UpdateAll: true,
Key: key, }).Create(punishLevel{
LastLevel: lc.LastLevel, Key: key,
Executed: lc.Executed, LastLevel: lc.LastLevel,
Cooldown: lc.Cooldown, Executed: lc.Executed,
}).Error, Cooldown: lc.Cooldown,
}).Error
}),
"updating punishment info", "updating punishment info",
) )
} }

View file

@ -7,6 +7,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"gorm.io/gorm" "gorm.io/gorm"
"github.com/Luzifer/twitch-bot/v3/internal/helpers"
"github.com/Luzifer/twitch-bot/v3/pkg/database" "github.com/Luzifer/twitch-bot/v3/pkg/database"
) )
@ -20,11 +21,13 @@ type (
func AddQuote(db database.Connector, channel, quoteStr string) error { func AddQuote(db database.Connector, channel, quoteStr string) error {
return errors.Wrap( return errors.Wrap(
db.DB().Create(quote{ helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error {
Channel: channel, return tx.Create(quote{
CreatedAt: time.Now().UnixNano(), Channel: channel,
Quote: quoteStr, CreatedAt: time.Now().UnixNano(),
}).Error, Quote: quoteStr,
}).Error
}),
"adding quote to database", "adding quote to database",
) )
} }
@ -36,14 +39,18 @@ func DelQuote(db database.Connector, channel string, quoteIdx int) error {
} }
return errors.Wrap( return errors.Wrap(
db.DB().Delete(&quote{}, "channel = ? AND created_at = ?", channel, createdAt).Error, helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error {
return tx.Delete(&quote{}, "channel = ? AND created_at = ?", channel, createdAt).Error
}),
"deleting quote", "deleting quote",
) )
} }
func GetChannelQuotes(db database.Connector, channel string) ([]string, error) { func GetChannelQuotes(db database.Connector, channel string) ([]string, error) {
var qs []quote var qs []quote
if err := db.DB().Where("channel = ?", channel).Order("created_at").Find(&qs).Error; err != nil { if err := helpers.Retry(func() error {
return db.DB().Where("channel = ?", channel).Order("created_at").Find(&qs).Error
}); err != nil {
return nil, errors.Wrap(err, "querying quotes") return nil, errors.Wrap(err, "querying quotes")
} }
@ -57,11 +64,13 @@ func GetChannelQuotes(db database.Connector, channel string) ([]string, error) {
func GetMaxQuoteIdx(db database.Connector, channel string) (int, error) { func GetMaxQuoteIdx(db database.Connector, channel string) (int, error) {
var count int64 var count int64
if err := db.DB(). if err := helpers.Retry(func() error {
Model(&quote{}). return db.DB().
Where("channel = ?", channel). Model(&quote{}).
Count(&count). Where("channel = ?", channel).
Error; err != nil { Count(&count).
Error
}); err != nil {
return 0, errors.Wrap(err, "getting quote count") return 0, errors.Wrap(err, "getting quote count")
} }
@ -83,11 +92,13 @@ func GetQuoteRaw(db database.Connector, channel string, quoteIdx int) (int, int6
} }
var q quote var q quote
err := db.DB(). err := helpers.Retry(func() error {
Where("channel = ?", channel). return db.DB().
Limit(1). Where("channel = ?", channel).
Offset(quoteIdx - 1). Limit(1).
First(&q).Error Offset(quoteIdx - 1).
First(&q).Error
})
switch { switch {
case err == nil: case err == nil:
@ -103,7 +114,7 @@ func GetQuoteRaw(db database.Connector, channel string, quoteIdx int) (int, int6
func SetQuotes(db database.Connector, channel string, quotes []string) error { func SetQuotes(db database.Connector, channel string, quotes []string) error {
return errors.Wrap( return errors.Wrap(
db.DB().Transaction(func(tx *gorm.DB) error { helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error {
if err := tx.Where("channel = ?", channel).Delete(&quote{}).Error; err != nil { if err := tx.Where("channel = ?", channel).Delete(&quote{}).Error; err != nil {
return errors.Wrap(err, "deleting quotes for channel") return errors.Wrap(err, "deleting quotes for channel")
} }
@ -134,10 +145,11 @@ func UpdateQuote(db database.Connector, channel string, idx int, quoteStr string
} }
return errors.Wrap( return errors.Wrap(
db.DB(). helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error {
Where("channel = ? AND created_at = ?", channel, createdAt). return tx.Where("channel = ? AND created_at = ?", channel, createdAt).
Update("quote", quoteStr). Update("quote", quoteStr).
Error, Error
}),
"updating quote", "updating quote",
) )
} }

View file

@ -5,6 +5,8 @@ import (
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/clause" "gorm.io/gorm/clause"
"github.com/Luzifer/go_helpers/v2/backoff"
"github.com/Luzifer/twitch-bot/v3/internal/helpers"
"github.com/Luzifer/twitch-bot/v3/pkg/database" "github.com/Luzifer/twitch-bot/v3/pkg/database"
) )
@ -17,7 +19,13 @@ type (
func GetVariable(db database.Connector, key string) (string, error) { func GetVariable(db database.Connector, key string) (string, error) {
var v variable var v variable
err := db.DB().First(&v, "name = ?", key).Error err := helpers.Retry(func() error {
err := db.DB().First(&v, "name = ?", key).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return backoff.NewErrCannotRetry(err)
}
return err
})
switch { switch {
case err == nil: case err == nil:
return v.Value, nil return v.Value, nil
@ -32,17 +40,21 @@ func GetVariable(db database.Connector, key string) (string, error) {
func SetVariable(db database.Connector, key, value string) error { func SetVariable(db database.Connector, key, value string) error {
return errors.Wrap( return errors.Wrap(
db.DB().Clauses(clause.OnConflict{ helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error {
Columns: []clause.Column{{Name: "name"}}, return tx.Clauses(clause.OnConflict{
DoUpdates: clause.AssignmentColumns([]string{"value"}), Columns: []clause.Column{{Name: "name"}},
}).Create(variable{Name: key, Value: value}).Error, DoUpdates: clause.AssignmentColumns([]string{"value"}),
}).Create(variable{Name: key, Value: value}).Error
}),
"updating value in database", "updating value in database",
) )
} }
func RemoveVariable(db database.Connector, key string) error { func RemoveVariable(db database.Connector, key string) error {
return errors.Wrap( return errors.Wrap(
db.DB().Delete(&variable{}, "name = ?", key).Error, helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error {
return tx.Delete(&variable{}, "name = ?", key).Error
}),
"deleting value in database", "deleting value in database",
) )
} }

View file

@ -7,7 +7,9 @@ import (
"github.com/gofrs/uuid/v3" "github.com/gofrs/uuid/v3"
"github.com/pkg/errors" "github.com/pkg/errors"
"gorm.io/gorm"
"github.com/Luzifer/twitch-bot/v3/internal/helpers"
"github.com/Luzifer/twitch-bot/v3/pkg/database" "github.com/Luzifer/twitch-bot/v3/pkg/database"
"github.com/Luzifer/twitch-bot/v3/plugins" "github.com/Luzifer/twitch-bot/v3/plugins"
) )
@ -25,20 +27,23 @@ type (
func cleanupStoredEvents(db database.Connector) error { func cleanupStoredEvents(db database.Connector) error {
return errors.Wrap( return errors.Wrap(
db.DB(). helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error {
Where("scheduled_at < ?", time.Now().Add(cleanupTimeout*-1).UTC()). return tx.Where("scheduled_at < ?", time.Now().Add(cleanupTimeout*-1).UTC()).
Delete(&storedCustomEvent{}). Delete(&storedCustomEvent{}).
Error, Error
}),
"deleting past events", "deleting past events",
) )
} }
func getFutureEvents(db database.Connector) (out []storedCustomEvent, err error) { func getFutureEvents(db database.Connector) (out []storedCustomEvent, err error) {
return out, errors.Wrap( return out, errors.Wrap(
db.DB(). helpers.Retry(func() error {
Where("scheduled_at >= ?", time.Now().UTC()). return db.DB().
Find(&out). Where("scheduled_at >= ?", time.Now().UTC()).
Error, Find(&out).
Error
}),
"getting events from database", "getting events from database",
) )
} }
@ -50,12 +55,14 @@ func storeEvent(db database.Connector, scheduleAt time.Time, channel string, fie
} }
return errors.Wrap( return errors.Wrap(
db.DB().Create(storedCustomEvent{ helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error {
ID: uuid.Must(uuid.NewV4()).String(), return tx.Create(storedCustomEvent{
Channel: channel, ID: uuid.Must(uuid.NewV4()).String(),
Fields: fieldBuf.String(), Channel: channel,
ScheduledAt: scheduleAt, Fields: fieldBuf.String(),
}).Error, ScheduledAt: scheduleAt,
}).Error
}),
"storing event", "storing event",
) )
} }

View file

@ -7,7 +7,9 @@ import (
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
"gorm.io/gorm"
"github.com/Luzifer/twitch-bot/v3/internal/helpers"
"github.com/Luzifer/twitch-bot/v3/pkg/database" "github.com/Luzifer/twitch-bot/v3/pkg/database"
"github.com/Luzifer/twitch-bot/v3/plugins" "github.com/Luzifer/twitch-bot/v3/plugins"
) )
@ -29,12 +31,14 @@ func AddChannelEvent(db database.Connector, channel string, evt SocketMessage) e
} }
return errors.Wrap( return errors.Wrap(
db.DB().Create(&overlaysEvent{ helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error {
Channel: channel, return tx.Create(&overlaysEvent{
CreatedAt: evt.Time.UTC(), Channel: channel,
EventType: evt.Type, CreatedAt: evt.Time.UTC(),
Fields: strings.TrimSpace(buf.String()), EventType: evt.Type,
}).Error, Fields: strings.TrimSpace(buf.String()),
}).Error
}),
"storing event to database", "storing event to database",
) )
} }
@ -42,7 +46,9 @@ func AddChannelEvent(db database.Connector, channel string, evt SocketMessage) e
func GetChannelEvents(db database.Connector, channel string) ([]SocketMessage, error) { func GetChannelEvents(db database.Connector, channel string) ([]SocketMessage, error) {
var evts []overlaysEvent var evts []overlaysEvent
if err := db.DB().Where("channel = ?", channel).Order("created_at").Find(&evts).Error; err != nil { if err := helpers.Retry(func() error {
return db.DB().Where("channel = ?", channel).Order("created_at").Find(&evts).Error
}); err != nil {
return nil, errors.Wrap(err, "querying channel events") return nil, errors.Wrap(err, "querying channel events")
} }

View file

@ -7,7 +7,9 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"gopkg.in/irc.v4" "gopkg.in/irc.v4"
"gorm.io/gorm"
"github.com/Luzifer/twitch-bot/v3/internal/helpers"
"github.com/Luzifer/twitch-bot/v3/pkg/database" "github.com/Luzifer/twitch-bot/v3/pkg/database"
"github.com/Luzifer/twitch-bot/v3/plugins" "github.com/Luzifer/twitch-bot/v3/plugins"
) )
@ -121,10 +123,12 @@ func newDBClient(db database.Connector) *dbClient {
func (d *dbClient) AutoCloseExpired() (err error) { func (d *dbClient) AutoCloseExpired() (err error) {
var rr []raffle var rr []raffle
if err = d.db.DB(). if err = helpers.Retry(func() error {
Where("status = ? AND close_at IS NOT NULL AND close_at < ?", raffleStatusActive, time.Now().UTC()). return d.db.DB().
Find(&rr). Where("status = ? AND close_at IS NOT NULL AND close_at < ?", raffleStatusActive, time.Now().UTC()).
Error; err != nil { Find(&rr).
Error
}); err != nil {
return errors.Wrap(err, "fetching raffles to close") return errors.Wrap(err, "fetching raffles to close")
} }
@ -142,10 +146,12 @@ func (d *dbClient) AutoCloseExpired() (err error) {
func (d *dbClient) AutoSendReminders() (err error) { func (d *dbClient) AutoSendReminders() (err error) {
var rr []raffle var rr []raffle
if err = d.db.DB(). if err = helpers.Retry(func() error {
Where("status = ? AND text_reminder_post = ? AND (text_reminder_next_send IS NULL OR text_reminder_next_send < ?)", raffleStatusActive, true, time.Now().UTC()). return d.db.DB().
Find(&rr). Where("status = ? AND text_reminder_post = ? AND (text_reminder_next_send IS NULL OR text_reminder_next_send < ?)", raffleStatusActive, true, time.Now().UTC()).
Error; err != nil { Find(&rr).
Error
}); err != nil {
return errors.Wrap(err, "fetching raffles to send reminders") return errors.Wrap(err, "fetching raffles to send reminders")
} }
@ -162,10 +168,12 @@ func (d *dbClient) AutoSendReminders() (err error) {
func (d *dbClient) AutoStart() (err error) { func (d *dbClient) AutoStart() (err error) {
var rr []raffle var rr []raffle
if err = d.db.DB(). if err = helpers.Retry(func() error {
Where("status = ? AND auto_start_at IS NOT NULL AND auto_start_at < ?", raffleStatusPlanned, time.Now().UTC()). return d.db.DB().
Find(&rr). Where("status = ? AND auto_start_at IS NOT NULL AND auto_start_at < ?", raffleStatusPlanned, time.Now().UTC()).
Error; err != nil { Find(&rr).
Error
}); err != nil {
return errors.Wrap(err, "fetching raffles to start") return errors.Wrap(err, "fetching raffles to start")
} }
@ -208,10 +216,12 @@ func (d *dbClient) Close(raffleID uint64) error {
return errors.Wrap(err, "getting raffle") return errors.Wrap(err, "getting raffle")
} }
if err = d.db.DB().Model(&raffle{}). if err = helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error {
Where("id = ?", raffleID). return tx.Model(&raffle{}).
Update("status", raffleStatusEnded). Where("id = ?", raffleID).
Error; err != nil { Update("status", raffleStatusEnded).
Error
}); err != nil {
return errors.Wrap(err, "setting status closed") return errors.Wrap(err, "setting status closed")
} }
@ -231,7 +241,9 @@ func (d *dbClient) Close(raffleID uint64) error {
// the database without modification and therefore need to be filled // the database without modification and therefore need to be filled
// before calling this function // before calling this function
func (d *dbClient) Create(r raffle) error { func (d *dbClient) Create(r raffle) error {
if err := d.db.DB().Create(&r).Error; err != nil { if err := helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error {
return tx.Create(&r).Error
}); err != nil {
return errors.Wrap(err, "creating database record") return errors.Wrap(err, "creating database record")
} }
@ -242,17 +254,23 @@ func (d *dbClient) Create(r raffle) error {
// Delete removes all entries for the given raffle and afterwards // Delete removes all entries for the given raffle and afterwards
// deletes the raffle itself // deletes the raffle itself
func (d *dbClient) Delete(raffleID uint64) (err error) { func (d *dbClient) Delete(raffleID uint64) (err error) {
if err = d.db.DB(). if err = helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error {
Where("raffle_id = ?", raffleID). if err = tx.
Delete(&raffleEntry{}). Where("raffle_id = ?", raffleID).
Error; err != nil { Delete(&raffleEntry{}).
return errors.Wrap(err, "deleting raffle entries") Error; err != nil {
} return errors.Wrap(err, "deleting raffle entries")
}
if err = d.db.DB(). if err = tx.
Where("id = ?", raffleID). Where("id = ?", raffleID).
Delete(&raffle{}).Error; err != nil { Delete(&raffle{}).Error; err != nil {
return errors.Wrap(err, "creating database record") return errors.Wrap(err, "creating database record")
}
return nil
}); err != nil {
return errors.Wrap(err, "deleting raffle")
} }
frontendNotify(frontendNotifyEventRaffleChange) frontendNotify(frontendNotifyEventRaffleChange)
@ -263,7 +281,7 @@ func (d *dbClient) Delete(raffleID uint64) (err error) {
// the database without modification and therefore need to be filled // the database without modification and therefore need to be filled
// before calling this function // before calling this function
func (d *dbClient) Enter(re raffleEntry) error { func (d *dbClient) Enter(re raffleEntry) error {
if err := d.db.DB().Create(&re).Error; err != nil { if err := helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error { return tx.Create(&re).Error }); err != nil {
return errors.Wrap(err, "creating database record") return errors.Wrap(err, "creating database record")
} }
@ -274,11 +292,13 @@ func (d *dbClient) Enter(re raffleEntry) error {
// Get retrieves a raffle from the database // Get retrieves a raffle from the database
func (d *dbClient) Get(raffleID uint64) (out raffle, err error) { func (d *dbClient) Get(raffleID uint64) (out raffle, err error) {
return out, errors.Wrap( return out, errors.Wrap(
d.db.DB(). helpers.Retry(func() error {
Where("raffles.id = ?", raffleID). return d.db.DB().
Preload("Entries"). Where("raffles.id = ?", raffleID).
First(&out). Preload("Entries").
Error, First(&out).
Error
}),
"getting raffle from database", "getting raffle from database",
) )
} }
@ -302,10 +322,12 @@ func (d *dbClient) GetByChannelAndKeyword(channel, keyword string) (raffle, erro
// List returns a list of all known raffles // List returns a list of all known raffles
func (d *dbClient) List() (raffles []raffle, _ error) { func (d *dbClient) List() (raffles []raffle, _ error) {
return raffles, errors.Wrap( return raffles, errors.Wrap(
d.db.DB().Model(&raffle{}). helpers.Retry(func() error {
Order("id DESC"). return d.db.DB().Model(&raffle{}).
Find(&raffles). Order("id DESC").
Error, Find(&raffles).
Error
}),
"updating column", "updating column",
) )
} }
@ -314,10 +336,12 @@ func (d *dbClient) List() (raffles []raffle, _ error) {
// sent for the given raffle ID. No other fields are modified // sent for the given raffle ID. No other fields are modified
func (d *dbClient) PatchNextReminderSend(raffleID uint64, next time.Time) error { func (d *dbClient) PatchNextReminderSend(raffleID uint64, next time.Time) error {
return errors.Wrap( return errors.Wrap(
d.db.DB().Model(&raffle{}). helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error {
Where("id = ?", raffleID). return tx.Model(&raffle{}).
Update("text_reminder_next_send", next). Where("id = ?", raffleID).
Error, Update("text_reminder_next_send", next).
Error
}),
"updating column", "updating column",
) )
} }
@ -336,10 +360,12 @@ func (d *dbClient) PickWinner(raffleID uint64) error {
} }
speakUpUntil := time.Now().UTC().Add(r.WaitForResponse) speakUpUntil := time.Now().UTC().Add(r.WaitForResponse)
if err = d.db.DB().Model(&raffleEntry{}). if err = helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error {
Where("id = ?", winner.ID). return tx.Model(&raffleEntry{}).
Updates(map[string]any{"was_picked": true, "speak_up_until": speakUpUntil}). Where("id = ?", winner.ID).
Error; err != nil { Updates(map[string]any{"was_picked": true, "speak_up_until": speakUpUntil}).
Error
}); err != nil {
return errors.Wrap(err, "updating winner") return errors.Wrap(err, "updating winner")
} }
@ -364,10 +390,12 @@ func (d *dbClient) PickWinner(raffleID uint64) error {
// RedrawWinner marks the previous winner as redrawn (and therefore // RedrawWinner marks the previous winner as redrawn (and therefore
// crossed out as winner in the interface) and picks a new one // crossed out as winner in the interface) and picks a new one
func (d *dbClient) RedrawWinner(raffleID, winnerID uint64) error { func (d *dbClient) RedrawWinner(raffleID, winnerID uint64) error {
if err := d.db.DB().Model(&raffleEntry{}). if err := helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error {
Where("id = ?", winnerID). return tx.Model(&raffleEntry{}).
Update("was_redrawn", true). Where("id = ?", winnerID).
Error; err != nil { Update("was_redrawn", true).
Error
}); err != nil {
return errors.Wrap(err, "updating previous winner") return errors.Wrap(err, "updating previous winner")
} }
@ -385,10 +413,12 @@ func (d *dbClient) RefreshActiveRaffles() error {
tmp = map[string]uint64{} tmp = map[string]uint64{}
) )
if err := d.db.DB(). if err := helpers.Retry(func() error {
Where("status = ?", raffleStatusActive). return d.db.DB().
Find(&actives). Where("status = ?", raffleStatusActive).
Error; err != nil { Find(&actives).
Error
}); err != nil {
return errors.Wrap(err, "fetching active raffles") return errors.Wrap(err, "fetching active raffles")
} }
@ -411,19 +441,23 @@ func (d *dbClient) RefreshSpeakUp() error {
tmp = map[string]*speakUpWait{} tmp = map[string]*speakUpWait{}
) )
if err := d.db.DB().Debug(). if err := helpers.Retry(func() error {
Where("speak_up_until IS NOT NULL AND speak_up_until > ?", time.Now().UTC()). return d.db.DB().Debug().
Find(&res). Where("speak_up_until IS NOT NULL AND speak_up_until > ?", time.Now().UTC()).
Error; err != nil { Find(&res).
Error
}); err != nil {
return errors.Wrap(err, "querying active entries") return errors.Wrap(err, "querying active entries")
} }
for _, e := range res { for _, e := range res {
var r raffle var r raffle
if err := d.db.DB(). if err := helpers.Retry(func() error {
Where("id = ?", e.RaffleID). return d.db.DB().
First(&r). Where("id = ?", e.RaffleID).
Error; err != nil { First(&r).
Error
}); err != nil {
return errors.Wrap(err, "fetching raffle for entry") return errors.Wrap(err, "fetching raffle for entry")
} }
tmp[strings.Join([]string{r.Channel, e.UserLogin}, ":")] = &speakUpWait{RaffleEntryID: e.ID, Until: *e.SpeakUpUntil} tmp[strings.Join([]string{r.Channel, e.UserLogin}, ":")] = &speakUpWait{RaffleEntryID: e.ID, Until: *e.SpeakUpUntil}
@ -445,14 +479,15 @@ func (d *dbClient) RegisterSpeakUp(channel, user, message string) error {
return nil return nil
} }
if err := d.db.DB(). if err := helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error {
Model(&raffleEntry{}). return tx.Model(&raffleEntry{}).
Where("id = ?", w.RaffleEntryID). Where("id = ?", w.RaffleEntryID).
Updates(map[string]any{ Updates(map[string]any{
"DrawResponse": message, "DrawResponse": message,
"SpeakUpUntil": nil, "SpeakUpUntil": nil,
}). }).
Error; err != nil { Error
}); err != nil {
return errors.Wrap(err, "registering speak-up") return errors.Wrap(err, "registering speak-up")
} }
@ -472,14 +507,15 @@ func (d *dbClient) Reopen(raffleID uint64, duration time.Duration) error {
return errors.Wrap(err, "getting specified raffle") return errors.Wrap(err, "getting specified raffle")
} }
if err = d.db.DB(). if err = helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error {
Model(&raffle{}). return tx.Model(&raffle{}).
Where("id = ?", raffleID). Where("id = ?", raffleID).
Updates(map[string]any{ Updates(map[string]any{
"CloseAt": time.Now().UTC().Add(duration), "CloseAt": time.Now().UTC().Add(duration),
"status": raffleStatusActive, "status": raffleStatusActive,
}). }).
Error; err != nil { Error
}); err != nil {
return errors.Wrap(err, "updating raffle") return errors.Wrap(err, "updating raffle")
} }
@ -557,11 +593,12 @@ func (d *dbClient) Update(r raffle) error {
r.Entries = nil r.Entries = nil
r.TextReminderNextSend = old.TextReminderNextSend r.TextReminderNextSend = old.TextReminderNextSend
if err := d.db.DB(). if err := helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error {
Model(&raffle{}). return tx.Model(&raffle{}).
Where("id = ?", r.ID). Where("id = ?", r.ID).
Updates(&r). Updates(&r).
Error; err != nil { Error
}); err != nil {
return errors.Wrap(err, "updating raffle") return errors.Wrap(err, "updating raffle")
} }

25
internal/helpers/retry.go Normal file
View file

@ -0,0 +1,25 @@
package helpers
import (
"github.com/Luzifer/go_helpers/v2/backoff"
"gorm.io/gorm"
)
const (
maxRetries = 5
)
// Retry contains a standard set of configuration parameters for an
// exponential backoff to be used throughout the bot
func Retry(fn func() error) error {
return backoff.NewBackoff().
WithMaxIterations(maxRetries).
Retry(fn)
}
// RetryTransaction takes a database object and a function acting on
// the database. The function will be run in a transaction on the
// database and will be retried as if executed using Retry
func RetryTransaction(db *gorm.DB, fn func(tx *gorm.DB) error) error {
return Retry(func() error { return db.Transaction(fn) })
}

View file

@ -7,7 +7,9 @@ import (
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/clause" "gorm.io/gorm/clause"
"github.com/Luzifer/go_helpers/v2/backoff"
"github.com/Luzifer/go_helpers/v2/str" "github.com/Luzifer/go_helpers/v2/str"
"github.com/Luzifer/twitch-bot/v3/internal/helpers"
"github.com/Luzifer/twitch-bot/v3/pkg/database" "github.com/Luzifer/twitch-bot/v3/pkg/database"
"github.com/Luzifer/twitch-bot/v3/pkg/twitch" "github.com/Luzifer/twitch-bot/v3/pkg/twitch"
) )
@ -51,9 +53,8 @@ func (s *Service) CopyDatabase(src, target *gorm.DB) error {
return database.CopyObjects(src, target, &extendedPermission{}) return database.CopyObjects(src, target, &extendedPermission{})
} }
func (s Service) GetBotUsername() (string, error) { func (s Service) GetBotUsername() (botUsername string, err error) {
var botUsername string err = s.db.ReadCoreMeta(coreMetaKeyBotUsername, &botUsername)
err := s.db.ReadCoreMeta(coreMetaKeyBotUsername, &botUsername)
return botUsername, errors.Wrap(err, "reading bot username") return botUsername, errors.Wrap(err, "reading bot username")
} }
@ -63,11 +64,15 @@ func (s Service) GetChannelPermissions(channel string) ([]string, error) {
perm extendedPermission perm extendedPermission
) )
if err = s.db.DB().First(&perm, "channel = ?", strings.TrimLeft(channel, "#")).Error; err != nil { if err = helpers.Retry(func() error {
err = s.db.DB().First(&perm, "channel = ?", strings.TrimLeft(channel, "#")).Error
if errors.Is(err, gorm.ErrRecordNotFound) { if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil return nil
} }
return nil, errors.Wrap(err, "getting twitch credential from database")
return errors.Wrap(err, "getting twitch credential from database")
}); err != nil {
return nil, err
} }
return strings.Split(perm.Scopes, " "), nil return strings.Split(perm.Scopes, " "), nil
@ -149,11 +154,14 @@ func (s Service) GetTwitchClientForChannel(channel string, cfg ClientConfig) (*t
perm extendedPermission perm extendedPermission
) )
if err = s.db.DB().First(&perm, "channel = ?", strings.TrimLeft(channel, "#")).Error; err != nil { if err = helpers.Retry(func() error {
err = s.db.DB().First(&perm, "channel = ?", strings.TrimLeft(channel, "#")).Error
if errors.Is(err, gorm.ErrRecordNotFound) { if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, ErrChannelNotAuthorized return backoff.NewErrCannotRetry(ErrChannelNotAuthorized)
} }
return nil, errors.Wrap(err, "getting twitch credential from database") return errors.Wrap(err, "getting twitch credential from database")
}); err != nil {
return nil, err
} }
if perm.AccessToken, err = s.db.DecryptField(perm.AccessToken); err != nil { if perm.AccessToken, err = s.db.DecryptField(perm.AccessToken); err != nil {
@ -204,13 +212,14 @@ func (s Service) HasPermissionsForChannel(channel string, scopes ...string) (boo
return true, nil return true, nil
} }
func (s Service) ListPermittedChannels() ([]string, error) { func (s Service) ListPermittedChannels() (out []string, err error) {
var perms []extendedPermission var perms []extendedPermission
if err := s.db.DB().Find(&perms).Error; err != nil { if err = helpers.Retry(func() error {
return nil, errors.Wrap(err, "listing permissions") return errors.Wrap(s.db.DB().Find(&perms).Error, "listing permissions")
}); err != nil {
return nil, err
} }
var out []string
for _, perm := range perms { for _, perm := range perms {
out = append(out, perm.Channel) out = append(out, perm.Channel)
} }
@ -220,14 +229,18 @@ func (s Service) ListPermittedChannels() ([]string, error) {
func (s Service) RemoveAllExtendedTwitchCredentials() error { func (s Service) RemoveAllExtendedTwitchCredentials() error {
return errors.Wrap( return errors.Wrap(
s.db.DB().Delete(&extendedPermission{}, "1 = 1").Error, helpers.RetryTransaction(s.db.DB(), func(tx *gorm.DB) error {
return tx.Delete(&extendedPermission{}, "1 = 1").Error
}),
"deleting data from table", "deleting data from table",
) )
} }
func (s Service) RemoveExendedTwitchCredentials(channel string) error { func (s Service) RemoveExendedTwitchCredentials(channel string) error {
return errors.Wrap( return errors.Wrap(
s.db.DB().Delete(&extendedPermission{}, "channel = ?", strings.TrimLeft(channel, "#")).Error, helpers.RetryTransaction(s.db.DB(), func(tx *gorm.DB) error {
return tx.Delete(&extendedPermission{}, "channel = ?", strings.TrimLeft(channel, "#")).Error
}),
"deleting data from table", "deleting data from table",
) )
} }
@ -249,15 +262,17 @@ func (s Service) SetExtendedTwitchCredentials(channel, accessToken, refreshToken
} }
return errors.Wrap( return errors.Wrap(
s.db.DB().Clauses(clause.OnConflict{ helpers.RetryTransaction(s.db.DB(), func(tx *gorm.DB) error {
Columns: []clause.Column{{Name: "channel"}}, return tx.Clauses(clause.OnConflict{
DoUpdates: clause.AssignmentColumns([]string{"access_token", "refresh_token", "scopes"}), Columns: []clause.Column{{Name: "channel"}},
}).Create(extendedPermission{ DoUpdates: clause.AssignmentColumns([]string{"access_token", "refresh_token", "scopes"}),
Channel: strings.TrimLeft(channel, "#"), }).Create(extendedPermission{
AccessToken: accessToken, Channel: strings.TrimLeft(channel, "#"),
RefreshToken: refreshToken, AccessToken: accessToken,
Scopes: strings.Join(scope, " "), RefreshToken: refreshToken,
}).Error, Scopes: strings.Join(scope, " "),
}).Error
}),
"inserting data into table", "inserting data into table",
) )
} }

View file

@ -12,6 +12,8 @@ import (
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/clause" "gorm.io/gorm/clause"
"github.com/Luzifer/go_helpers/v2/backoff"
"github.com/Luzifer/twitch-bot/v3/internal/helpers"
"github.com/Luzifer/twitch-bot/v3/pkg/database" "github.com/Luzifer/twitch-bot/v3/pkg/database"
"github.com/Luzifer/twitch-bot/v3/plugins" "github.com/Luzifer/twitch-bot/v3/plugins"
) )
@ -88,7 +90,13 @@ func (Service) getPermitTimerKey(channel, username string) string {
func (s Service) HasTimer(id string) (bool, error) { func (s Service) HasTimer(id string) (bool, error) {
var t timer var t timer
err := s.db.DB().First(&t, "id = ? AND expires_at >= ?", id, time.Now().UTC()).Error err := helpers.Retry(func() error {
err := s.db.DB().First(&t, "id = ? AND expires_at >= ?", id, time.Now().UTC()).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return backoff.NewErrCannotRetry(err)
}
return err
})
switch { switch {
case err == nil: case err == nil:
return true, nil return true, nil
@ -103,19 +111,23 @@ func (s Service) HasTimer(id string) (bool, error) {
func (s Service) SetTimer(id string, expiry time.Time) error { func (s Service) SetTimer(id string, expiry time.Time) error {
return errors.Wrap( return errors.Wrap(
s.db.DB().Clauses(clause.OnConflict{ helpers.RetryTransaction(s.db.DB(), func(tx *gorm.DB) error {
Columns: []clause.Column{{Name: "id"}}, return tx.Clauses(clause.OnConflict{
DoUpdates: clause.AssignmentColumns([]string{"expires_at"}), Columns: []clause.Column{{Name: "id"}},
}).Create(timer{ DoUpdates: clause.AssignmentColumns([]string{"expires_at"}),
ID: id, }).Create(timer{
ExpiresAt: expiry.UTC(), ID: id,
}).Error, ExpiresAt: expiry.UTC(),
}).Error
}),
"storing counter in database", "storing counter in database",
) )
} }
func (s Service) cleanupTimers() { func (s Service) cleanupTimers() {
if err := s.db.DB().Delete(&timer{}, "expires_at < ?", time.Now().UTC()).Error; err != nil { if err := helpers.RetryTransaction(s.db.DB(), func(tx *gorm.DB) error {
return tx.Delete(&timer{}, "expires_at < ?", time.Now().UTC()).Error
}); err != nil {
logrus.WithError(err).Error("cleaning up expired timers") logrus.WithError(err).Error("cleaning up expired timers")
} }
} }

View file

@ -89,7 +89,6 @@ func New(driverName, connString, encryptionSecret string) (Connector, error) {
} }
func (c connector) Close() error { func (c connector) Close() error {
// return errors.Wrap(c.db.Close(), "closing database")
return nil return nil
} }

View file

@ -13,6 +13,7 @@ import (
"gorm.io/gorm/clause" "gorm.io/gorm/clause"
"github.com/Luzifer/go_helpers/v2/backoff" "github.com/Luzifer/go_helpers/v2/backoff"
"github.com/Luzifer/twitch-bot/v3/internal/helpers"
) )
const ( const (
@ -31,7 +32,9 @@ type (
// DeleteCoreMeta removes a core_kv table entry // DeleteCoreMeta removes a core_kv table entry
func (c connector) DeleteCoreMeta(key string) error { func (c connector) DeleteCoreMeta(key string) error {
return errors.Wrap( return errors.Wrap(
c.db.Delete(&coreKV{}, "name = ?", key).Error, helpers.RetryTransaction(c.db, func(tx *gorm.DB) error {
return tx.Delete(&coreKV{}, "name = ?", key).Error
}),
"deleting key from database", "deleting key from database",
) )
} }
@ -61,7 +64,9 @@ func (c connector) ReadEncryptedCoreMeta(key string, value any) error {
// ResetEncryptedCoreMeta removes all CoreKV entries from the database // ResetEncryptedCoreMeta removes all CoreKV entries from the database
func (c connector) ResetEncryptedCoreMeta() error { func (c connector) ResetEncryptedCoreMeta() error {
return errors.Wrap( return errors.Wrap(
c.db.Delete(&coreKV{}, "value LIKE ?", "U2FsdGVkX1%").Error, helpers.RetryTransaction(c.db, func(tx *gorm.DB) error {
return tx.Delete(&coreKV{}, "value LIKE ?", "U2FsdGVkX1%").Error
}),
"removing encrypted meta entries", "removing encrypted meta entries",
) )
} }
@ -110,11 +115,14 @@ func (c connector) ValidateEncryption() error {
func (c connector) readCoreMeta(key string, value any, processor func(string) (string, error)) (err error) { func (c connector) readCoreMeta(key string, value any, processor func(string) (string, error)) (err error) {
var data coreKV var data coreKV
if err = c.db.First(&data, "name = ?", key).Error; err != nil { if err = helpers.Retry(func() error {
err = c.db.First(&data, "name = ?", key).Error
if errors.Is(err, gorm.ErrRecordNotFound) { if errors.Is(err, gorm.ErrRecordNotFound) {
return ErrCoreMetaNotFound return ErrCoreMetaNotFound
} }
return errors.Wrap(err, "querying core meta table") return errors.Wrap(err, "querying core meta table")
}); err != nil {
return err
} }
if data.Value == "" { if data.Value == "" {
@ -149,10 +157,12 @@ func (c connector) storeCoreMeta(key string, value any, processor func(string) (
data := coreKV{Name: key, Value: encValue} data := coreKV{Name: key, Value: encValue}
return errors.Wrap( return errors.Wrap(
c.db.Clauses(clause.OnConflict{ helpers.RetryTransaction(c.db, func(tx *gorm.DB) error {
Columns: []clause.Column{{Name: "name"}}, return tx.Clauses(clause.OnConflict{
DoUpdates: clause.AssignmentColumns([]string{"value"}), Columns: []clause.Column{{Name: "name"}},
}).Create(data).Error, DoUpdates: clause.AssignmentColumns([]string{"value"}),
}).Create(data).Error
}),
"upserting core meta value", "upserting core meta value",
) )
} }