Encapsulate socket subscriptions

This commit is contained in:
Knut Ahlers 2020-11-23 09:48:53 +01:00
parent dce43ffd99
commit c902fbe230
Signed by: luzifer
GPG key ID: 0066F03ED215AD7D
4 changed files with 30 additions and 21 deletions

45
api.go
View file

@ -20,16 +20,25 @@ const (
msgTypeStore string = "store" msgTypeStore string = "store"
) )
var ( var subscriptions = newSubscriptionStore()
socketSubscriptions = map[string]func(msg interface{}) error{}
socketSubscriptionsLock = new(sync.RWMutex)
)
func sendAllSockets(msgType string, msg interface{}) error { type subcriptionStore struct {
socketSubscriptionsLock.RLock() socketSubscriptions map[string]func(interface{}) error
defer socketSubscriptionsLock.RUnlock() socketSubscriptionsLock *sync.RWMutex
}
for _, hdl := range socketSubscriptions { func newSubscriptionStore() *subcriptionStore {
return &subcriptionStore{
socketSubscriptions: map[string]func(msg interface{}) error{},
socketSubscriptionsLock: new(sync.RWMutex),
}
}
func (s subcriptionStore) SendAllSockets(msgType string, msg interface{}) error {
s.socketSubscriptionsLock.RLock()
defer s.socketSubscriptionsLock.RUnlock()
for _, hdl := range s.socketSubscriptions {
if err := hdl(compileSocketMessage(msgType, msg)); err != nil { if err := hdl(compileSocketMessage(msgType, msg)); err != nil {
return errors.Wrap(err, "submit message") return errors.Wrap(err, "submit message")
} }
@ -38,18 +47,18 @@ func sendAllSockets(msgType string, msg interface{}) error {
return nil return nil
} }
func subscribeSocket(id string, hdl func(interface{}) error) { func (s *subcriptionStore) SubscribeSocket(id string, hdl func(interface{}) error) {
socketSubscriptionsLock.Lock() s.socketSubscriptionsLock.Lock()
defer socketSubscriptionsLock.Unlock() defer s.socketSubscriptionsLock.Unlock()
socketSubscriptions[id] = hdl s.socketSubscriptions[id] = hdl
} }
func unsubscribeSocket(id string) { func (s *subcriptionStore) UnsubscribeSocket(id string) {
socketSubscriptionsLock.Lock() s.socketSubscriptionsLock.Lock()
defer socketSubscriptionsLock.Unlock() defer s.socketSubscriptionsLock.Unlock()
delete(socketSubscriptions, id) delete(s.socketSubscriptions, id)
} }
func compileSocketMessage(msgType string, msg interface{}) map[string]interface{} { func compileSocketMessage(msgType string, msg interface{}) map[string]interface{} {
@ -97,13 +106,13 @@ func handleUpdateSocket(w http.ResponseWriter, r *http.Request) {
connLock = new(sync.Mutex) connLock = new(sync.Mutex)
id = uuid.Must(uuid.NewV4()).String() id = uuid.Must(uuid.NewV4()).String()
) )
subscribeSocket(id, func(msg interface{}) error { subscriptions.SubscribeSocket(id, func(msg interface{}) error {
connLock.Lock() connLock.Lock()
defer connLock.Unlock() defer connLock.Unlock()
return conn.WriteJSON(msg) return conn.WriteJSON(msg)
}) })
defer unsubscribeSocket(id) defer subscriptions.UnsubscribeSocket(id)
keepAlive := time.NewTicker(5 * time.Second) keepAlive := time.NewTicker(5 * time.Second)
defer keepAlive.Stop() defer keepAlive.Stop()

View file

@ -110,7 +110,7 @@ func main() {
} }
case <-timerForceSync.C: case <-timerForceSync.C:
if err := sendAllSockets(msgTypeStore, store); err != nil { if err := subscriptions.SendAllSockets(msgTypeStore, store); err != nil {
log.WithError(err).Error("Unable to send store to all sockets") log.WithError(err).Error("Unable to send store to all sockets")
} }

View file

@ -77,7 +77,7 @@ func updateFollowers() error {
} }
return errors.Wrap( return errors.Wrap(
sendAllSockets(msgTypeStore, store), subscriptions.SendAllSockets(msgTypeStore, store),
"update all sockets", "update all sockets",
) )
} }

View file

@ -98,7 +98,7 @@ func handleWebHookPush(w http.ResponseWriter, r *http.Request) {
logger.WithError(err).Error("Unable to update persistent store") logger.WithError(err).Error("Unable to update persistent store")
} }
if err := sendAllSockets(msgTypeStore, store); err != nil { if err := subscriptions.SendAllSockets(msgTypeStore, store); err != nil {
logger.WithError(err).Error("Unable to send update to all sockets") logger.WithError(err).Error("Unable to send update to all sockets")
} }
} }