From 0d10b5165f5723fa6224a8a6e806e6d449719c44 Mon Sep 17 00:00:00 2001 From: Knut Ahlers Date: Tue, 28 Nov 2023 00:09:27 +0100 Subject: [PATCH] [core] Add retries for database access methods to compensate for database temporarily not being available. This is not suitable for longer database outages as 5 retries with a 1.5 multiplier will not give much time to recover but should cover for cluster changes and short network hickups. Signed-off-by: Knut Ahlers --- internal/actors/counter/database.go | 51 ++--- internal/actors/punish/database.go | 36 ++-- internal/actors/quotedb/database.go | 56 +++--- internal/actors/variables/database.go | 24 ++- internal/apimodules/customevent/database.go | 35 ++-- internal/apimodules/overlays/database.go | 20 +- internal/apimodules/raffle/database.go | 201 ++++++++++++-------- internal/helpers/retry.go | 25 +++ internal/service/access/access.go | 63 +++--- internal/service/timer/timer.go | 30 ++- pkg/database/connector.go | 1 - pkg/database/coreKV.go | 24 ++- 12 files changed, 359 insertions(+), 207 deletions(-) create mode 100644 internal/helpers/retry.go diff --git a/internal/actors/counter/database.go b/internal/actors/counter/database.go index 1793ca5..a0d915b 100644 --- a/internal/actors/counter/database.go +++ b/internal/actors/counter/database.go @@ -5,6 +5,7 @@ import ( "gorm.io/gorm" "gorm.io/gorm/clause" + "github.com/Luzifer/twitch-bot/v3/internal/helpers" "github.com/Luzifer/twitch-bot/v3/pkg/database" ) @@ -18,17 +19,16 @@ type ( func GetCounterValue(db database.Connector, counterName string) (int64, error) { var c Counter - err := db.DB().First(&c, "name = ?", counterName).Error - switch { - case err == nil: - return c.Value, nil + err := helpers.Retry(func() error { + err := db.DB().First(&c, "name = ?", counterName).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil + } - case errors.Is(err, gorm.ErrRecordNotFound): - return 0, nil + return err + }) - default: - return 0, errors.Wrap(err, "querying counter") - } + return c.Value, errors.Wrap(err, "querying counter") } func UpdateCounter(db database.Connector, counterName string, value int64, absolute bool) error { @@ -42,10 +42,12 @@ func UpdateCounter(db database.Connector, counterName string, value int64, absol } return errors.Wrap( - db.DB().Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "name"}}, - DoUpdates: clause.AssignmentColumns([]string{"value"}), - }).Create(Counter{Name: counterName, Value: value}).Error, + helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error { + return tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "name"}}, + DoUpdates: clause.AssignmentColumns([]string{"value"}), + }).Create(Counter{Name: counterName, Value: value}).Error + }), "storing counter value", ) } @@ -53,11 +55,12 @@ func UpdateCounter(db database.Connector, counterName string, value int64, absol func getCounterRank(db database.Connector, prefix, name string) (rank, count int64, err error) { var cc []Counter - err = db.DB(). - Order("value DESC"). - Find(&cc, "name LIKE ?", prefix+"%"). - Error - if err != nil { + if err = helpers.Retry(func() error { + return db.DB(). + Order("value DESC"). + Find(&cc, "name LIKE ?", prefix+"%"). + Error + }); err != nil { return 0, 0, errors.Wrap(err, "querying counters") } @@ -74,11 +77,13 @@ func getCounterRank(db database.Connector, prefix, name string) (rank, count int func getCounterTopList(db database.Connector, prefix string, n int) ([]Counter, error) { var cc []Counter - err := db.DB(). - Order("value DESC"). - Limit(n). - Find(&cc, "name LIKE ?", prefix+"%"). - Error + err := helpers.Retry(func() error { + return db.DB(). + Order("value DESC"). + Limit(n). + Find(&cc, "name LIKE ?", prefix+"%"). + Error + }) return cc, errors.Wrap(err, "querying counters") } diff --git a/internal/actors/punish/database.go b/internal/actors/punish/database.go index eff807f..e77f1ba 100644 --- a/internal/actors/punish/database.go +++ b/internal/actors/punish/database.go @@ -8,6 +8,8 @@ import ( "gorm.io/gorm" "gorm.io/gorm/clause" + "github.com/Luzifer/go_helpers/v2/backoff" + "github.com/Luzifer/twitch-bot/v3/internal/helpers" "github.com/Luzifer/twitch-bot/v3/pkg/database" ) @@ -23,7 +25,7 @@ type ( func calculateCurrentPunishments(db database.Connector) (err error) { var ps []punishLevel - if err = db.DB().Find(&ps).Error; err != nil { + if err = helpers.Retry(func() error { return db.DB().Find(&ps).Error }); err != nil { return errors.Wrap(err, "querying punish_levels") } @@ -72,7 +74,9 @@ func deletePunishment(db database.Connector, channel, user, uuid string) error { func deletePunishmentForKey(db database.Connector, key string) error { return errors.Wrap( - db.DB().Delete(&punishLevel{}, "key = ?", key).Error, + helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error { + return tx.Delete(&punishLevel{}, "key = ?", key).Error + }), "deleting punishment info", ) } @@ -87,7 +91,13 @@ func getPunishment(db database.Connector, channel, user, uuid string) (*levelCon p punishLevel ) - err := db.DB().First(&p, "key = ?", getDBKey(channel, user, uuid)).Error + err := helpers.Retry(func() error { + err := db.DB().First(&p, "key = ?", getDBKey(channel, user, uuid)).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + return backoff.NewErrCannotRetry(err) + } + return err + }) switch { case err == nil: return &levelConfig{ @@ -114,15 +124,17 @@ func setPunishmentForKey(db database.Connector, key string, lc *levelConfig) err } return errors.Wrap( - db.DB().Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "key"}}, - UpdateAll: true, - }).Create(punishLevel{ - Key: key, - LastLevel: lc.LastLevel, - Executed: lc.Executed, - Cooldown: lc.Cooldown, - }).Error, + helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error { + return tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "key"}}, + UpdateAll: true, + }).Create(punishLevel{ + Key: key, + LastLevel: lc.LastLevel, + Executed: lc.Executed, + Cooldown: lc.Cooldown, + }).Error + }), "updating punishment info", ) } diff --git a/internal/actors/quotedb/database.go b/internal/actors/quotedb/database.go index cb96079..be11595 100644 --- a/internal/actors/quotedb/database.go +++ b/internal/actors/quotedb/database.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" "gorm.io/gorm" + "github.com/Luzifer/twitch-bot/v3/internal/helpers" "github.com/Luzifer/twitch-bot/v3/pkg/database" ) @@ -20,11 +21,13 @@ type ( func AddQuote(db database.Connector, channel, quoteStr string) error { return errors.Wrap( - db.DB().Create(quote{ - Channel: channel, - CreatedAt: time.Now().UnixNano(), - Quote: quoteStr, - }).Error, + helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error { + return tx.Create(quote{ + Channel: channel, + CreatedAt: time.Now().UnixNano(), + Quote: quoteStr, + }).Error + }), "adding quote to database", ) } @@ -36,14 +39,18 @@ func DelQuote(db database.Connector, channel string, quoteIdx int) error { } return errors.Wrap( - db.DB().Delete("e{}, "channel = ? AND created_at = ?", channel, createdAt).Error, + helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error { + return tx.Delete("e{}, "channel = ? AND created_at = ?", channel, createdAt).Error + }), "deleting quote", ) } func GetChannelQuotes(db database.Connector, channel string) ([]string, error) { var qs []quote - if err := db.DB().Where("channel = ?", channel).Order("created_at").Find(&qs).Error; err != nil { + if err := helpers.Retry(func() error { + return db.DB().Where("channel = ?", channel).Order("created_at").Find(&qs).Error + }); err != nil { return nil, errors.Wrap(err, "querying quotes") } @@ -57,11 +64,13 @@ func GetChannelQuotes(db database.Connector, channel string) ([]string, error) { func GetMaxQuoteIdx(db database.Connector, channel string) (int, error) { var count int64 - if err := db.DB(). - Model("e{}). - Where("channel = ?", channel). - Count(&count). - Error; err != nil { + if err := helpers.Retry(func() error { + return db.DB(). + Model("e{}). + Where("channel = ?", channel). + Count(&count). + Error + }); err != nil { return 0, errors.Wrap(err, "getting quote count") } @@ -83,11 +92,13 @@ func GetQuoteRaw(db database.Connector, channel string, quoteIdx int) (int, int6 } var q quote - err := db.DB(). - Where("channel = ?", channel). - Limit(1). - Offset(quoteIdx - 1). - First(&q).Error + err := helpers.Retry(func() error { + return db.DB(). + Where("channel = ?", channel). + Limit(1). + Offset(quoteIdx - 1). + First(&q).Error + }) switch { case err == nil: @@ -103,7 +114,7 @@ func GetQuoteRaw(db database.Connector, channel string, quoteIdx int) (int, int6 func SetQuotes(db database.Connector, channel string, quotes []string) error { return errors.Wrap( - db.DB().Transaction(func(tx *gorm.DB) error { + helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error { if err := tx.Where("channel = ?", channel).Delete("e{}).Error; err != nil { return errors.Wrap(err, "deleting quotes for channel") } @@ -134,10 +145,11 @@ func UpdateQuote(db database.Connector, channel string, idx int, quoteStr string } return errors.Wrap( - db.DB(). - Where("channel = ? AND created_at = ?", channel, createdAt). - Update("quote", quoteStr). - Error, + helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error { + return tx.Where("channel = ? AND created_at = ?", channel, createdAt). + Update("quote", quoteStr). + Error + }), "updating quote", ) } diff --git a/internal/actors/variables/database.go b/internal/actors/variables/database.go index 015baba..b83cb36 100644 --- a/internal/actors/variables/database.go +++ b/internal/actors/variables/database.go @@ -5,6 +5,8 @@ import ( "gorm.io/gorm" "gorm.io/gorm/clause" + "github.com/Luzifer/go_helpers/v2/backoff" + "github.com/Luzifer/twitch-bot/v3/internal/helpers" "github.com/Luzifer/twitch-bot/v3/pkg/database" ) @@ -17,7 +19,13 @@ type ( func GetVariable(db database.Connector, key string) (string, error) { var v variable - err := db.DB().First(&v, "name = ?", key).Error + err := helpers.Retry(func() error { + err := db.DB().First(&v, "name = ?", key).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + return backoff.NewErrCannotRetry(err) + } + return err + }) switch { case err == nil: return v.Value, nil @@ -32,17 +40,21 @@ func GetVariable(db database.Connector, key string) (string, error) { func SetVariable(db database.Connector, key, value string) error { return errors.Wrap( - db.DB().Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "name"}}, - DoUpdates: clause.AssignmentColumns([]string{"value"}), - }).Create(variable{Name: key, Value: value}).Error, + helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error { + return tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "name"}}, + DoUpdates: clause.AssignmentColumns([]string{"value"}), + }).Create(variable{Name: key, Value: value}).Error + }), "updating value in database", ) } func RemoveVariable(db database.Connector, key string) error { return errors.Wrap( - db.DB().Delete(&variable{}, "name = ?", key).Error, + helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error { + return tx.Delete(&variable{}, "name = ?", key).Error + }), "deleting value in database", ) } diff --git a/internal/apimodules/customevent/database.go b/internal/apimodules/customevent/database.go index a7f8cf2..02bd3e4 100644 --- a/internal/apimodules/customevent/database.go +++ b/internal/apimodules/customevent/database.go @@ -7,7 +7,9 @@ import ( "github.com/gofrs/uuid/v3" "github.com/pkg/errors" + "gorm.io/gorm" + "github.com/Luzifer/twitch-bot/v3/internal/helpers" "github.com/Luzifer/twitch-bot/v3/pkg/database" "github.com/Luzifer/twitch-bot/v3/plugins" ) @@ -25,20 +27,23 @@ type ( func cleanupStoredEvents(db database.Connector) error { return errors.Wrap( - db.DB(). - Where("scheduled_at < ?", time.Now().Add(cleanupTimeout*-1).UTC()). - Delete(&storedCustomEvent{}). - Error, + helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error { + return tx.Where("scheduled_at < ?", time.Now().Add(cleanupTimeout*-1).UTC()). + Delete(&storedCustomEvent{}). + Error + }), "deleting past events", ) } func getFutureEvents(db database.Connector) (out []storedCustomEvent, err error) { return out, errors.Wrap( - db.DB(). - Where("scheduled_at >= ?", time.Now().UTC()). - Find(&out). - Error, + helpers.Retry(func() error { + return db.DB(). + Where("scheduled_at >= ?", time.Now().UTC()). + Find(&out). + Error + }), "getting events from database", ) } @@ -50,12 +55,14 @@ func storeEvent(db database.Connector, scheduleAt time.Time, channel string, fie } return errors.Wrap( - db.DB().Create(storedCustomEvent{ - ID: uuid.Must(uuid.NewV4()).String(), - Channel: channel, - Fields: fieldBuf.String(), - ScheduledAt: scheduleAt, - }).Error, + helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error { + return tx.Create(storedCustomEvent{ + ID: uuid.Must(uuid.NewV4()).String(), + Channel: channel, + Fields: fieldBuf.String(), + ScheduledAt: scheduleAt, + }).Error + }), "storing event", ) } diff --git a/internal/apimodules/overlays/database.go b/internal/apimodules/overlays/database.go index 701616c..fa49657 100644 --- a/internal/apimodules/overlays/database.go +++ b/internal/apimodules/overlays/database.go @@ -7,7 +7,9 @@ import ( "time" "github.com/pkg/errors" + "gorm.io/gorm" + "github.com/Luzifer/twitch-bot/v3/internal/helpers" "github.com/Luzifer/twitch-bot/v3/pkg/database" "github.com/Luzifer/twitch-bot/v3/plugins" ) @@ -29,12 +31,14 @@ func AddChannelEvent(db database.Connector, channel string, evt SocketMessage) e } return errors.Wrap( - db.DB().Create(&overlaysEvent{ - Channel: channel, - CreatedAt: evt.Time.UTC(), - EventType: evt.Type, - Fields: strings.TrimSpace(buf.String()), - }).Error, + helpers.RetryTransaction(db.DB(), func(tx *gorm.DB) error { + return tx.Create(&overlaysEvent{ + Channel: channel, + CreatedAt: evt.Time.UTC(), + EventType: evt.Type, + Fields: strings.TrimSpace(buf.String()), + }).Error + }), "storing event to database", ) } @@ -42,7 +46,9 @@ func AddChannelEvent(db database.Connector, channel string, evt SocketMessage) e func GetChannelEvents(db database.Connector, channel string) ([]SocketMessage, error) { var evts []overlaysEvent - if err := db.DB().Where("channel = ?", channel).Order("created_at").Find(&evts).Error; err != nil { + if err := helpers.Retry(func() error { + return db.DB().Where("channel = ?", channel).Order("created_at").Find(&evts).Error + }); err != nil { return nil, errors.Wrap(err, "querying channel events") } diff --git a/internal/apimodules/raffle/database.go b/internal/apimodules/raffle/database.go index 711f31b..ff717bc 100644 --- a/internal/apimodules/raffle/database.go +++ b/internal/apimodules/raffle/database.go @@ -7,7 +7,9 @@ import ( "github.com/pkg/errors" "gopkg.in/irc.v4" + "gorm.io/gorm" + "github.com/Luzifer/twitch-bot/v3/internal/helpers" "github.com/Luzifer/twitch-bot/v3/pkg/database" "github.com/Luzifer/twitch-bot/v3/plugins" ) @@ -121,10 +123,12 @@ func newDBClient(db database.Connector) *dbClient { func (d *dbClient) AutoCloseExpired() (err error) { var rr []raffle - if err = d.db.DB(). - Where("status = ? AND close_at IS NOT NULL AND close_at < ?", raffleStatusActive, time.Now().UTC()). - Find(&rr). - Error; err != nil { + if err = helpers.Retry(func() error { + return d.db.DB(). + Where("status = ? AND close_at IS NOT NULL AND close_at < ?", raffleStatusActive, time.Now().UTC()). + Find(&rr). + Error + }); err != nil { return errors.Wrap(err, "fetching raffles to close") } @@ -142,10 +146,12 @@ func (d *dbClient) AutoCloseExpired() (err error) { func (d *dbClient) AutoSendReminders() (err error) { var rr []raffle - if err = d.db.DB(). - Where("status = ? AND text_reminder_post = ? AND (text_reminder_next_send IS NULL OR text_reminder_next_send < ?)", raffleStatusActive, true, time.Now().UTC()). - Find(&rr). - Error; err != nil { + if err = helpers.Retry(func() error { + return d.db.DB(). + Where("status = ? AND text_reminder_post = ? AND (text_reminder_next_send IS NULL OR text_reminder_next_send < ?)", raffleStatusActive, true, time.Now().UTC()). + Find(&rr). + Error + }); err != nil { return errors.Wrap(err, "fetching raffles to send reminders") } @@ -162,10 +168,12 @@ func (d *dbClient) AutoSendReminders() (err error) { func (d *dbClient) AutoStart() (err error) { var rr []raffle - if err = d.db.DB(). - Where("status = ? AND auto_start_at IS NOT NULL AND auto_start_at < ?", raffleStatusPlanned, time.Now().UTC()). - Find(&rr). - Error; err != nil { + if err = helpers.Retry(func() error { + return d.db.DB(). + Where("status = ? AND auto_start_at IS NOT NULL AND auto_start_at < ?", raffleStatusPlanned, time.Now().UTC()). + Find(&rr). + Error + }); err != nil { return errors.Wrap(err, "fetching raffles to start") } @@ -208,10 +216,12 @@ func (d *dbClient) Close(raffleID uint64) error { return errors.Wrap(err, "getting raffle") } - if err = d.db.DB().Model(&raffle{}). - Where("id = ?", raffleID). - Update("status", raffleStatusEnded). - Error; err != nil { + if err = helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error { + return tx.Model(&raffle{}). + Where("id = ?", raffleID). + Update("status", raffleStatusEnded). + Error + }); err != nil { return errors.Wrap(err, "setting status closed") } @@ -231,7 +241,9 @@ func (d *dbClient) Close(raffleID uint64) error { // the database without modification and therefore need to be filled // before calling this function func (d *dbClient) Create(r raffle) error { - if err := d.db.DB().Create(&r).Error; err != nil { + if err := helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error { + return tx.Create(&r).Error + }); err != nil { return errors.Wrap(err, "creating database record") } @@ -242,17 +254,23 @@ func (d *dbClient) Create(r raffle) error { // Delete removes all entries for the given raffle and afterwards // deletes the raffle itself func (d *dbClient) Delete(raffleID uint64) (err error) { - if err = d.db.DB(). - Where("raffle_id = ?", raffleID). - Delete(&raffleEntry{}). - Error; err != nil { - return errors.Wrap(err, "deleting raffle entries") - } + if err = helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error { + if err = tx. + Where("raffle_id = ?", raffleID). + Delete(&raffleEntry{}). + Error; err != nil { + return errors.Wrap(err, "deleting raffle entries") + } - if err = d.db.DB(). - Where("id = ?", raffleID). - Delete(&raffle{}).Error; err != nil { - return errors.Wrap(err, "creating database record") + if err = tx. + Where("id = ?", raffleID). + Delete(&raffle{}).Error; err != nil { + return errors.Wrap(err, "creating database record") + } + + return nil + }); err != nil { + return errors.Wrap(err, "deleting raffle") } frontendNotify(frontendNotifyEventRaffleChange) @@ -263,7 +281,7 @@ func (d *dbClient) Delete(raffleID uint64) (err error) { // the database without modification and therefore need to be filled // before calling this function func (d *dbClient) Enter(re raffleEntry) error { - if err := d.db.DB().Create(&re).Error; err != nil { + if err := helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error { return tx.Create(&re).Error }); err != nil { return errors.Wrap(err, "creating database record") } @@ -274,11 +292,13 @@ func (d *dbClient) Enter(re raffleEntry) error { // Get retrieves a raffle from the database func (d *dbClient) Get(raffleID uint64) (out raffle, err error) { return out, errors.Wrap( - d.db.DB(). - Where("raffles.id = ?", raffleID). - Preload("Entries"). - First(&out). - Error, + helpers.Retry(func() error { + return d.db.DB(). + Where("raffles.id = ?", raffleID). + Preload("Entries"). + First(&out). + Error + }), "getting raffle from database", ) } @@ -302,10 +322,12 @@ func (d *dbClient) GetByChannelAndKeyword(channel, keyword string) (raffle, erro // List returns a list of all known raffles func (d *dbClient) List() (raffles []raffle, _ error) { return raffles, errors.Wrap( - d.db.DB().Model(&raffle{}). - Order("id DESC"). - Find(&raffles). - Error, + helpers.Retry(func() error { + return d.db.DB().Model(&raffle{}). + Order("id DESC"). + Find(&raffles). + Error + }), "updating column", ) } @@ -314,10 +336,12 @@ func (d *dbClient) List() (raffles []raffle, _ error) { // sent for the given raffle ID. No other fields are modified func (d *dbClient) PatchNextReminderSend(raffleID uint64, next time.Time) error { return errors.Wrap( - d.db.DB().Model(&raffle{}). - Where("id = ?", raffleID). - Update("text_reminder_next_send", next). - Error, + helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error { + return tx.Model(&raffle{}). + Where("id = ?", raffleID). + Update("text_reminder_next_send", next). + Error + }), "updating column", ) } @@ -336,10 +360,12 @@ func (d *dbClient) PickWinner(raffleID uint64) error { } speakUpUntil := time.Now().UTC().Add(r.WaitForResponse) - if err = d.db.DB().Model(&raffleEntry{}). - Where("id = ?", winner.ID). - Updates(map[string]any{"was_picked": true, "speak_up_until": speakUpUntil}). - Error; err != nil { + if err = helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error { + return tx.Model(&raffleEntry{}). + Where("id = ?", winner.ID). + Updates(map[string]any{"was_picked": true, "speak_up_until": speakUpUntil}). + Error + }); err != nil { return errors.Wrap(err, "updating winner") } @@ -364,10 +390,12 @@ func (d *dbClient) PickWinner(raffleID uint64) error { // RedrawWinner marks the previous winner as redrawn (and therefore // crossed out as winner in the interface) and picks a new one func (d *dbClient) RedrawWinner(raffleID, winnerID uint64) error { - if err := d.db.DB().Model(&raffleEntry{}). - Where("id = ?", winnerID). - Update("was_redrawn", true). - Error; err != nil { + if err := helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error { + return tx.Model(&raffleEntry{}). + Where("id = ?", winnerID). + Update("was_redrawn", true). + Error + }); err != nil { return errors.Wrap(err, "updating previous winner") } @@ -385,10 +413,12 @@ func (d *dbClient) RefreshActiveRaffles() error { tmp = map[string]uint64{} ) - if err := d.db.DB(). - Where("status = ?", raffleStatusActive). - Find(&actives). - Error; err != nil { + if err := helpers.Retry(func() error { + return d.db.DB(). + Where("status = ?", raffleStatusActive). + Find(&actives). + Error + }); err != nil { return errors.Wrap(err, "fetching active raffles") } @@ -411,19 +441,23 @@ func (d *dbClient) RefreshSpeakUp() error { tmp = map[string]*speakUpWait{} ) - if err := d.db.DB().Debug(). - Where("speak_up_until IS NOT NULL AND speak_up_until > ?", time.Now().UTC()). - Find(&res). - Error; err != nil { + if err := helpers.Retry(func() error { + return d.db.DB().Debug(). + Where("speak_up_until IS NOT NULL AND speak_up_until > ?", time.Now().UTC()). + Find(&res). + Error + }); err != nil { return errors.Wrap(err, "querying active entries") } for _, e := range res { var r raffle - if err := d.db.DB(). - Where("id = ?", e.RaffleID). - First(&r). - Error; err != nil { + if err := helpers.Retry(func() error { + return d.db.DB(). + Where("id = ?", e.RaffleID). + First(&r). + Error + }); err != nil { return errors.Wrap(err, "fetching raffle for entry") } tmp[strings.Join([]string{r.Channel, e.UserLogin}, ":")] = &speakUpWait{RaffleEntryID: e.ID, Until: *e.SpeakUpUntil} @@ -445,14 +479,15 @@ func (d *dbClient) RegisterSpeakUp(channel, user, message string) error { return nil } - if err := d.db.DB(). - Model(&raffleEntry{}). - Where("id = ?", w.RaffleEntryID). - Updates(map[string]any{ - "DrawResponse": message, - "SpeakUpUntil": nil, - }). - Error; err != nil { + if err := helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error { + return tx.Model(&raffleEntry{}). + Where("id = ?", w.RaffleEntryID). + Updates(map[string]any{ + "DrawResponse": message, + "SpeakUpUntil": nil, + }). + Error + }); err != nil { return errors.Wrap(err, "registering speak-up") } @@ -472,14 +507,15 @@ func (d *dbClient) Reopen(raffleID uint64, duration time.Duration) error { return errors.Wrap(err, "getting specified raffle") } - if err = d.db.DB(). - Model(&raffle{}). - Where("id = ?", raffleID). - Updates(map[string]any{ - "CloseAt": time.Now().UTC().Add(duration), - "status": raffleStatusActive, - }). - Error; err != nil { + if err = helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error { + return tx.Model(&raffle{}). + Where("id = ?", raffleID). + Updates(map[string]any{ + "CloseAt": time.Now().UTC().Add(duration), + "status": raffleStatusActive, + }). + Error + }); err != nil { return errors.Wrap(err, "updating raffle") } @@ -557,11 +593,12 @@ func (d *dbClient) Update(r raffle) error { r.Entries = nil r.TextReminderNextSend = old.TextReminderNextSend - if err := d.db.DB(). - Model(&raffle{}). - Where("id = ?", r.ID). - Updates(&r). - Error; err != nil { + if err := helpers.RetryTransaction(d.db.DB(), func(tx *gorm.DB) error { + return tx.Model(&raffle{}). + Where("id = ?", r.ID). + Updates(&r). + Error + }); err != nil { return errors.Wrap(err, "updating raffle") } diff --git a/internal/helpers/retry.go b/internal/helpers/retry.go new file mode 100644 index 0000000..94b3bb4 --- /dev/null +++ b/internal/helpers/retry.go @@ -0,0 +1,25 @@ +package helpers + +import ( + "github.com/Luzifer/go_helpers/v2/backoff" + "gorm.io/gorm" +) + +const ( + maxRetries = 5 +) + +// Retry contains a standard set of configuration parameters for an +// exponential backoff to be used throughout the bot +func Retry(fn func() error) error { + return backoff.NewBackoff(). + WithMaxIterations(maxRetries). + Retry(fn) +} + +// RetryTransaction takes a database object and a function acting on +// the database. The function will be run in a transaction on the +// database and will be retried as if executed using Retry +func RetryTransaction(db *gorm.DB, fn func(tx *gorm.DB) error) error { + return Retry(func() error { return db.Transaction(fn) }) +} diff --git a/internal/service/access/access.go b/internal/service/access/access.go index 5d8807f..dfd9560 100644 --- a/internal/service/access/access.go +++ b/internal/service/access/access.go @@ -7,7 +7,9 @@ import ( "gorm.io/gorm" "gorm.io/gorm/clause" + "github.com/Luzifer/go_helpers/v2/backoff" "github.com/Luzifer/go_helpers/v2/str" + "github.com/Luzifer/twitch-bot/v3/internal/helpers" "github.com/Luzifer/twitch-bot/v3/pkg/database" "github.com/Luzifer/twitch-bot/v3/pkg/twitch" ) @@ -51,9 +53,8 @@ func (s *Service) CopyDatabase(src, target *gorm.DB) error { return database.CopyObjects(src, target, &extendedPermission{}) } -func (s Service) GetBotUsername() (string, error) { - var botUsername string - err := s.db.ReadCoreMeta(coreMetaKeyBotUsername, &botUsername) +func (s Service) GetBotUsername() (botUsername string, err error) { + err = s.db.ReadCoreMeta(coreMetaKeyBotUsername, &botUsername) return botUsername, errors.Wrap(err, "reading bot username") } @@ -63,11 +64,15 @@ func (s Service) GetChannelPermissions(channel string) ([]string, error) { perm extendedPermission ) - if err = s.db.DB().First(&perm, "channel = ?", strings.TrimLeft(channel, "#")).Error; err != nil { + if err = helpers.Retry(func() error { + err = s.db.DB().First(&perm, "channel = ?", strings.TrimLeft(channel, "#")).Error if errors.Is(err, gorm.ErrRecordNotFound) { - return nil, nil + return nil } - return nil, errors.Wrap(err, "getting twitch credential from database") + + return errors.Wrap(err, "getting twitch credential from database") + }); err != nil { + return nil, err } return strings.Split(perm.Scopes, " "), nil @@ -149,11 +154,14 @@ func (s Service) GetTwitchClientForChannel(channel string, cfg ClientConfig) (*t perm extendedPermission ) - if err = s.db.DB().First(&perm, "channel = ?", strings.TrimLeft(channel, "#")).Error; err != nil { + if err = helpers.Retry(func() error { + err = s.db.DB().First(&perm, "channel = ?", strings.TrimLeft(channel, "#")).Error if errors.Is(err, gorm.ErrRecordNotFound) { - return nil, ErrChannelNotAuthorized + return backoff.NewErrCannotRetry(ErrChannelNotAuthorized) } - return nil, errors.Wrap(err, "getting twitch credential from database") + return errors.Wrap(err, "getting twitch credential from database") + }); err != nil { + return nil, err } if perm.AccessToken, err = s.db.DecryptField(perm.AccessToken); err != nil { @@ -204,13 +212,14 @@ func (s Service) HasPermissionsForChannel(channel string, scopes ...string) (boo return true, nil } -func (s Service) ListPermittedChannels() ([]string, error) { +func (s Service) ListPermittedChannels() (out []string, err error) { var perms []extendedPermission - if err := s.db.DB().Find(&perms).Error; err != nil { - return nil, errors.Wrap(err, "listing permissions") + if err = helpers.Retry(func() error { + return errors.Wrap(s.db.DB().Find(&perms).Error, "listing permissions") + }); err != nil { + return nil, err } - var out []string for _, perm := range perms { out = append(out, perm.Channel) } @@ -220,14 +229,18 @@ func (s Service) ListPermittedChannels() ([]string, error) { func (s Service) RemoveAllExtendedTwitchCredentials() error { return errors.Wrap( - s.db.DB().Delete(&extendedPermission{}, "1 = 1").Error, + helpers.RetryTransaction(s.db.DB(), func(tx *gorm.DB) error { + return tx.Delete(&extendedPermission{}, "1 = 1").Error + }), "deleting data from table", ) } func (s Service) RemoveExendedTwitchCredentials(channel string) error { return errors.Wrap( - s.db.DB().Delete(&extendedPermission{}, "channel = ?", strings.TrimLeft(channel, "#")).Error, + helpers.RetryTransaction(s.db.DB(), func(tx *gorm.DB) error { + return tx.Delete(&extendedPermission{}, "channel = ?", strings.TrimLeft(channel, "#")).Error + }), "deleting data from table", ) } @@ -249,15 +262,17 @@ func (s Service) SetExtendedTwitchCredentials(channel, accessToken, refreshToken } return errors.Wrap( - s.db.DB().Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "channel"}}, - DoUpdates: clause.AssignmentColumns([]string{"access_token", "refresh_token", "scopes"}), - }).Create(extendedPermission{ - Channel: strings.TrimLeft(channel, "#"), - AccessToken: accessToken, - RefreshToken: refreshToken, - Scopes: strings.Join(scope, " "), - }).Error, + helpers.RetryTransaction(s.db.DB(), func(tx *gorm.DB) error { + return tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "channel"}}, + DoUpdates: clause.AssignmentColumns([]string{"access_token", "refresh_token", "scopes"}), + }).Create(extendedPermission{ + Channel: strings.TrimLeft(channel, "#"), + AccessToken: accessToken, + RefreshToken: refreshToken, + Scopes: strings.Join(scope, " "), + }).Error + }), "inserting data into table", ) } diff --git a/internal/service/timer/timer.go b/internal/service/timer/timer.go index 5191117..7ecba83 100644 --- a/internal/service/timer/timer.go +++ b/internal/service/timer/timer.go @@ -12,6 +12,8 @@ import ( "gorm.io/gorm" "gorm.io/gorm/clause" + "github.com/Luzifer/go_helpers/v2/backoff" + "github.com/Luzifer/twitch-bot/v3/internal/helpers" "github.com/Luzifer/twitch-bot/v3/pkg/database" "github.com/Luzifer/twitch-bot/v3/plugins" ) @@ -88,7 +90,13 @@ func (Service) getPermitTimerKey(channel, username string) string { func (s Service) HasTimer(id string) (bool, error) { var t timer - err := s.db.DB().First(&t, "id = ? AND expires_at >= ?", id, time.Now().UTC()).Error + err := helpers.Retry(func() error { + err := s.db.DB().First(&t, "id = ? AND expires_at >= ?", id, time.Now().UTC()).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + return backoff.NewErrCannotRetry(err) + } + return err + }) switch { case err == nil: return true, nil @@ -103,19 +111,23 @@ func (s Service) HasTimer(id string) (bool, error) { func (s Service) SetTimer(id string, expiry time.Time) error { return errors.Wrap( - s.db.DB().Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "id"}}, - DoUpdates: clause.AssignmentColumns([]string{"expires_at"}), - }).Create(timer{ - ID: id, - ExpiresAt: expiry.UTC(), - }).Error, + helpers.RetryTransaction(s.db.DB(), func(tx *gorm.DB) error { + return tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + DoUpdates: clause.AssignmentColumns([]string{"expires_at"}), + }).Create(timer{ + ID: id, + ExpiresAt: expiry.UTC(), + }).Error + }), "storing counter in database", ) } func (s Service) cleanupTimers() { - if err := s.db.DB().Delete(&timer{}, "expires_at < ?", time.Now().UTC()).Error; err != nil { + if err := helpers.RetryTransaction(s.db.DB(), func(tx *gorm.DB) error { + return tx.Delete(&timer{}, "expires_at < ?", time.Now().UTC()).Error + }); err != nil { logrus.WithError(err).Error("cleaning up expired timers") } } diff --git a/pkg/database/connector.go b/pkg/database/connector.go index d711c8e..b135d3f 100644 --- a/pkg/database/connector.go +++ b/pkg/database/connector.go @@ -89,7 +89,6 @@ func New(driverName, connString, encryptionSecret string) (Connector, error) { } func (c connector) Close() error { - // return errors.Wrap(c.db.Close(), "closing database") return nil } diff --git a/pkg/database/coreKV.go b/pkg/database/coreKV.go index 94e9018..526b3ac 100644 --- a/pkg/database/coreKV.go +++ b/pkg/database/coreKV.go @@ -13,6 +13,7 @@ import ( "gorm.io/gorm/clause" "github.com/Luzifer/go_helpers/v2/backoff" + "github.com/Luzifer/twitch-bot/v3/internal/helpers" ) const ( @@ -31,7 +32,9 @@ type ( // DeleteCoreMeta removes a core_kv table entry func (c connector) DeleteCoreMeta(key string) error { return errors.Wrap( - c.db.Delete(&coreKV{}, "name = ?", key).Error, + helpers.RetryTransaction(c.db, func(tx *gorm.DB) error { + return tx.Delete(&coreKV{}, "name = ?", key).Error + }), "deleting key from database", ) } @@ -61,7 +64,9 @@ func (c connector) ReadEncryptedCoreMeta(key string, value any) error { // ResetEncryptedCoreMeta removes all CoreKV entries from the database func (c connector) ResetEncryptedCoreMeta() error { return errors.Wrap( - c.db.Delete(&coreKV{}, "value LIKE ?", "U2FsdGVkX1%").Error, + helpers.RetryTransaction(c.db, func(tx *gorm.DB) error { + return tx.Delete(&coreKV{}, "value LIKE ?", "U2FsdGVkX1%").Error + }), "removing encrypted meta entries", ) } @@ -110,11 +115,14 @@ func (c connector) ValidateEncryption() error { func (c connector) readCoreMeta(key string, value any, processor func(string) (string, error)) (err error) { var data coreKV - if err = c.db.First(&data, "name = ?", key).Error; err != nil { + if err = helpers.Retry(func() error { + err = c.db.First(&data, "name = ?", key).Error if errors.Is(err, gorm.ErrRecordNotFound) { return ErrCoreMetaNotFound } return errors.Wrap(err, "querying core meta table") + }); err != nil { + return err } if data.Value == "" { @@ -149,10 +157,12 @@ func (c connector) storeCoreMeta(key string, value any, processor func(string) ( data := coreKV{Name: key, Value: encValue} return errors.Wrap( - c.db.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "name"}}, - DoUpdates: clause.AssignmentColumns([]string{"value"}), - }).Create(data).Error, + helpers.RetryTransaction(c.db, func(tx *gorm.DB) error { + return tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "name"}}, + DoUpdates: clause.AssignmentColumns([]string{"value"}), + }).Create(data).Error + }), "upserting core meta value", ) }