Add storing / replaying events

Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
Knut Ahlers 2021-05-09 18:39:30 +02:00
parent 0d0e1ca3e5
commit e0117c7cd1
Signed by: luzifer
GPG key ID: 0066F03ED215AD7D
7 changed files with 82 additions and 26 deletions

76
api.go
View file

@ -28,12 +28,16 @@ const (
msgTypeStore string = "store" msgTypeStore string = "store"
msgTypeSub string = "sub" msgTypeSub string = "sub"
msgTypeSubGift string = "subgift" msgTypeSubGift string = "subgift"
msgTypeReplay string = "replay"
) )
var subscriptions = newSubscriptionStore() var subscriptions = newSubscriptionStore()
type socketMessage struct { type socketMessage struct {
Payload interface{} `json:"payload"` Payload interface{} `json:"payload"`
Replay bool `json:"replay"`
Time *time.Time `json:"time,omitempty"`
Type string `json:"type"` Type string `json:"type"`
Version string `json:"version"` Version string `json:"version"`
} }
@ -50,19 +54,40 @@ func newSubscriptionStore() *subcriptionStore {
} }
} }
func (s subcriptionStore) SendAllSockets(msgType string, msg interface{}) error { func (s subcriptionStore) SendAllSockets(msgType string, msg interface{}, replay, storeEvent bool) error {
s.socketSubscriptionsLock.RLock() s.socketSubscriptionsLock.RLock()
defer s.socketSubscriptionsLock.RUnlock() defer s.socketSubscriptionsLock.RUnlock()
for _, hdl := range s.socketSubscriptions { for _, hdl := range s.socketSubscriptions {
if err := hdl(compileSocketMessage(msgType, msg)); err != nil { if err := hdl(compileSocketMessage(msgType, msg, replay, nil)); err != nil {
return errors.Wrap(err, "submit message") return errors.Wrap(err, "submit message")
} }
} }
if replay || !storeEvent {
return nil return nil
} }
if err := store.WithModLock(func() error {
data, err := json.Marshal(msg)
if err != nil {
return errors.Wrap(err, "marshalling message")
}
store.Events = append(store.Events, storedEvent{
Time: time.Now(),
Type: msgType,
Message: data,
})
return nil
}); err != nil {
return errors.Wrap(err, "storing event")
}
return errors.Wrap(store.Save(cfg.StoreFile), "saving store")
}
func (s *subcriptionStore) SubscribeSocket(id string, hdl func(socketMessage) error) { func (s *subcriptionStore) SubscribeSocket(id string, hdl func(socketMessage) error) {
s.socketSubscriptionsLock.Lock() s.socketSubscriptionsLock.Lock()
defer s.socketSubscriptionsLock.Unlock() defer s.socketSubscriptionsLock.Unlock()
@ -77,7 +102,7 @@ func (s *subcriptionStore) UnsubscribeSocket(id string) {
delete(s.socketSubscriptions, id) delete(s.socketSubscriptions, id)
} }
func compileSocketMessage(msgType string, msg interface{}) socketMessage { func compileSocketMessage(msgType string, msg interface{}, replay bool, overrideTime *time.Time) socketMessage {
versionParts := []string{version} versionParts := []string{version}
for _, asset := range assetVersions.Keys() { for _, asset := range assetVersions.Keys() {
versionParts = append(versionParts, assetVersions.Get(asset)) versionParts = append(versionParts, assetVersions.Get(asset))
@ -88,11 +113,18 @@ func compileSocketMessage(msgType string, msg interface{}) socketMessage {
ver := fmt.Sprintf("%x", hash.Sum(nil)) ver := fmt.Sprintf("%x", hash.Sum(nil))
return socketMessage{ out := socketMessage{
Payload: msg, Payload: msg,
Replay: replay,
Type: msgType, Type: msgType,
Version: ver, Version: ver,
} }
if overrideTime != nil {
out.Time = overrideTime
}
return out
} }
var upgrader = websocket.Upgrader{ var upgrader = websocket.Upgrader{
@ -128,7 +160,7 @@ func handleCustomAlert(w http.ResponseWriter, r *http.Request) {
return return
} }
if err := subscriptions.SendAllSockets(msgTypeAlert, alert); err != nil { if err := subscriptions.SendAllSockets(msgTypeAlert, alert, false, true); err != nil {
http.Error(w, errors.Wrap(err, "send to sockets").Error(), http.StatusInternalServerError) http.Error(w, errors.Wrap(err, "send to sockets").Error(), http.StatusInternalServerError)
return return
} }
@ -146,7 +178,7 @@ func handleCustomEvent(w http.ResponseWriter, r *http.Request) {
return return
} }
if err := subscriptions.SendAllSockets(msgTypeCustom, event); err != nil { if err := subscriptions.SendAllSockets(msgTypeCustom, event, false, true); err != nil {
http.Error(w, errors.Wrap(err, "send to sockets").Error(), http.StatusInternalServerError) http.Error(w, errors.Wrap(err, "send to sockets").Error(), http.StatusInternalServerError)
return return
} }
@ -167,7 +199,7 @@ func handleSetLastFollower(w http.ResponseWriter, r *http.Request) {
log.WithError(err).Error("Unable to update persistent store") log.WithError(err).Error("Unable to update persistent store")
} }
if err := subscriptions.SendAllSockets(msgTypeStore, store); err != nil { if err := subscriptions.SendAllSockets(msgTypeStore, store, false, false); err != nil {
log.WithError(err).Error("Unable to send update to all sockets") log.WithError(err).Error("Unable to send update to all sockets")
} }
@ -214,7 +246,7 @@ func handleUpdateSocket(w http.ResponseWriter, r *http.Request) {
}() }()
connLock.Lock() connLock.Lock()
if err := conn.WriteJSON(compileSocketMessage(msgTypeStore, store)); err != nil { if err := conn.WriteJSON(compileSocketMessage(msgTypeStore, store, false, nil)); err != nil {
log.WithError(err).Error("Unable to send initial state") log.WithError(err).Error("Unable to send initial state")
return return
} }
@ -246,7 +278,31 @@ func handleUpdateSocket(w http.ResponseWriter, r *http.Request) {
continue continue
} }
// FIXME: Do we need this? var recvMsg socketMessage
_ = p if err = json.Unmarshal(p, &recvMsg); err != nil {
log.Warn("Got unreadable message from socket, disconnecting...")
return
}
switch recvMsg.Type {
case msgTypeReplay:
if err = store.WithModRLock(func() error {
connLock.Lock()
defer connLock.Unlock()
for _, evt := range store.Events {
if err := conn.WriteJSON(compileSocketMessage(evt.Type, evt.Message, true, &evt.Time)); err != nil {
return errors.Wrap(err, "sending replay message")
}
}
return nil
}); err != nil {
log.WithError(err).Error("Unable to replay messages")
}
default:
log.WithField("type", recvMsg.Type).Warn("Got unexpected message type from frontend")
}
} }
} }

View file

@ -87,7 +87,7 @@ func handleDemoAlert(w http.ResponseWriter, r *http.Request) {
return return
} }
if err := subscriptions.SendAllSockets(event, data); err != nil { if err := subscriptions.SendAllSockets(event, data, false, false); err != nil {
http.Error(w, errors.Wrap(err, "send to sockets").Error(), http.StatusInternalServerError) http.Error(w, errors.Wrap(err, "send to sockets").Error(), http.StatusInternalServerError)
return return
} }

16
irc.go
View file

@ -176,7 +176,7 @@ func (ircHandler) handleTwitchPrivmsg(m *irc.Message) {
subscriptions.SendAllSockets(msgTypeHost, map[string]interface{}{ subscriptions.SendAllSockets(msgTypeHost, map[string]interface{}{
"from": matches[1], "from": matches[1],
"viewerCount": matches[2], "viewerCount": matches[2],
}) }, false, true)
} }
// Handle bit-messages // Handle bit-messages
@ -213,14 +213,14 @@ func (ircHandler) handleTwitchPrivmsg(m *irc.Message) {
// Send update to sockets // Send update to sockets
log.WithFields(log.Fields(fields)).Info("Bit donation") log.WithFields(log.Fields(fields)).Info("Bit donation")
subscriptions.SendAllSockets(msgTypeBits, fields) subscriptions.SendAllSockets(msgTypeBits, fields, false, true)
// Execute store save // Execute store save
if err := store.Save(cfg.StoreFile); err != nil { if err := store.Save(cfg.StoreFile); err != nil {
log.WithError(err).Error("Unable to update persistent store") log.WithError(err).Error("Unable to update persistent store")
} }
if err := store.WithModRLock(func() error { return subscriptions.SendAllSockets(msgTypeStore, store) }); err != nil { if err := store.WithModRLock(func() error { return subscriptions.SendAllSockets(msgTypeStore, store, false, false) }); err != nil {
log.WithError(err).Error("Unable to send update to all sockets") log.WithError(err).Error("Unable to send update to all sockets")
} }
} }
@ -249,7 +249,7 @@ func (ircHandler) handleTwitchUsernotice(m *irc.Message) {
} }
log.WithFields(log.Fields(fields)).Info("Incoming raid") log.WithFields(log.Fields(fields)).Info("Incoming raid")
subscriptions.SendAllSockets(msgTypeRaid, fields) subscriptions.SendAllSockets(msgTypeRaid, fields, false, true)
case "sub", "resub": case "sub", "resub":
fields := map[string]interface{}{ fields := map[string]interface{}{
@ -281,14 +281,14 @@ func (ircHandler) handleTwitchUsernotice(m *irc.Message) {
// Send update to sockets // Send update to sockets
log.WithFields(log.Fields(fields)).Info("New subscriber") log.WithFields(log.Fields(fields)).Info("New subscriber")
subscriptions.SendAllSockets(msgTypeSub, fields) subscriptions.SendAllSockets(msgTypeSub, fields, false, true)
// Execute store save // Execute store save
if err := store.Save(cfg.StoreFile); err != nil { if err := store.Save(cfg.StoreFile); err != nil {
log.WithError(err).Error("Unable to update persistent store") log.WithError(err).Error("Unable to update persistent store")
} }
if err := store.WithModRLock(func() error { return subscriptions.SendAllSockets(msgTypeStore, store) }); err != nil { if err := store.WithModRLock(func() error { return subscriptions.SendAllSockets(msgTypeStore, store, false, false) }); err != nil {
log.WithError(err).Error("Unable to send update to all sockets") log.WithError(err).Error("Unable to send update to all sockets")
} }
@ -328,14 +328,14 @@ func (ircHandler) handleTwitchUsernotice(m *irc.Message) {
// Send update to sockets // Send update to sockets
log.WithFields(log.Fields(fields)).Info("New sub-gift") log.WithFields(log.Fields(fields)).Info("New sub-gift")
subscriptions.SendAllSockets(msgTypeSubGift, fields) subscriptions.SendAllSockets(msgTypeSubGift, fields, false, true)
// Execute store save // Execute store save
if err := store.Save(cfg.StoreFile); err != nil { if err := store.Save(cfg.StoreFile); err != nil {
log.WithError(err).Error("Unable to update persistent store") log.WithError(err).Error("Unable to update persistent store")
} }
if err := store.WithModRLock(func() error { return subscriptions.SendAllSockets(msgTypeStore, store) }); err != nil { if err := store.WithModRLock(func() error { return subscriptions.SendAllSockets(msgTypeStore, store, false, false) }); err != nil {
log.WithError(err).Error("Unable to send update to all sockets") log.WithError(err).Error("Unable to send update to all sockets")
} }

View file

@ -135,7 +135,7 @@ func main() {
} }
case <-timerForceSync.C: case <-timerForceSync.C:
if err := subscriptions.SendAllSockets(msgTypeStore, store); err != nil { if err := subscriptions.SendAllSockets(msgTypeStore, store, false, false); err != nil {
log.WithError(err).Error("Unable to send store to all sockets") log.WithError(err).Error("Unable to send store to all sockets")
} }

View file

@ -18,7 +18,7 @@ func updateStats() error {
for _, fn := range []func() error{ for _, fn := range []func() error{
updateFollowers, updateFollowers,
updateSubscriberCount, updateSubscriberCount,
func() error { return subscriptions.SendAllSockets(msgTypeStore, store) }, func() error { return subscriptions.SendAllSockets(msgTypeStore, store, false, false) },
} { } {
if err := fn(); err != nil { if err := fn(); err != nil {
return errors.Wrap(err, "update statistics module") return errors.Wrap(err, "update statistics module")

View file

@ -21,7 +21,7 @@ type subscriber struct {
type storedEvent struct { type storedEvent struct {
Time time.Time Time time.Time
Type string Type string
Message map[string]interface{} Message json.RawMessage
} }
type storage struct { type storage struct {

View file

@ -81,7 +81,7 @@ func handleWebHookPush(w http.ResponseWriter, r *http.Request) {
"message": payload.Message, "message": payload.Message,
} }
if err := subscriptions.SendAllSockets(msgTypeDonation, fields); err != nil { if err := subscriptions.SendAllSockets(msgTypeDonation, fields, false, true); err != nil {
log.WithError(err).Error("Unable to send update to all sockets") log.WithError(err).Error("Unable to send update to all sockets")
} }
@ -124,7 +124,7 @@ func handleWebHookPush(w http.ResponseWriter, r *http.Request) {
"followed_at": f.FollowedAt, "followed_at": f.FollowedAt,
} }
if err := subscriptions.SendAllSockets(msgTypeFollow, fields); err != nil { if err := subscriptions.SendAllSockets(msgTypeFollow, fields, false, true); err != nil {
log.WithError(err).Error("Unable to send update to all sockets") log.WithError(err).Error("Unable to send update to all sockets")
} }
@ -147,7 +147,7 @@ func handleWebHookPush(w http.ResponseWriter, r *http.Request) {
logger.WithError(err).Error("Unable to update persistent store") logger.WithError(err).Error("Unable to update persistent store")
} }
if err := store.WithModRLock(func() error { return subscriptions.SendAllSockets(msgTypeStore, store) }); err != nil { if err := store.WithModRLock(func() error { return subscriptions.SendAllSockets(msgTypeStore, store, false, false) }); err != nil {
logger.WithError(err).Error("Unable to send update to all sockets") logger.WithError(err).Error("Unable to send update to all sockets")
} }
} }