twitch-manager/api.go

194 lines
4.3 KiB
Go
Raw Normal View History

2020-11-20 21:51:10 +00:00
package main
import (
2020-11-20 23:33:02 +00:00
"crypto/sha256"
"fmt"
2020-11-20 21:51:10 +00:00
"net/http"
2020-11-20 23:33:02 +00:00
"strings"
2020-11-20 21:51:10 +00:00
"sync"
"time"
"github.com/gofrs/uuid"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)
2020-11-23 08:54:02 +00:00
// msgTypeStore is the socketMessage type used in this file whenever the
// message payload is the store.
const msgTypeStore string = "store"
2020-11-20 23:01:55 +00:00
2020-11-23 08:48:53 +00:00
// subscriptions is the package-level registry of socket handlers; sockets
// register themselves in handleUpdateSocket and are notified through
// SendAllSockets.
var subscriptions = newSubscriptionStore()
2020-11-23 08:54:02 +00:00
// socketMessage is the JSON envelope written to websocket clients.
type socketMessage struct {
// Payload is the message content; it is serialized as-is under "payload".
Payload interface{} `json:"payload"`
// Type names the kind of payload (for example msgTypeStore).
Type string `json:"type"`
// Version is filled by compileSocketMessage with a hex-encoded SHA-256
// digest derived from the application version and the asset versions.
Version string `json:"version"`
}
2020-11-23 08:48:53 +00:00
type subcriptionStore struct {
2020-11-23 08:54:02 +00:00
socketSubscriptions map[string]func(socketMessage) error
2020-11-23 08:48:53 +00:00
socketSubscriptionsLock *sync.RWMutex
}
func newSubscriptionStore() *subcriptionStore {
return &subcriptionStore{
2020-11-23 08:54:02 +00:00
socketSubscriptions: map[string]func(socketMessage) error{},
2020-11-23 08:48:53 +00:00
socketSubscriptionsLock: new(sync.RWMutex),
}
}
2020-11-20 21:51:10 +00:00
2020-11-23 08:48:53 +00:00
func (s subcriptionStore) SendAllSockets(msgType string, msg interface{}) error {
s.socketSubscriptionsLock.RLock()
defer s.socketSubscriptionsLock.RUnlock()
2020-11-20 21:51:10 +00:00
2020-11-23 08:48:53 +00:00
for _, hdl := range s.socketSubscriptions {
2020-11-20 23:33:02 +00:00
if err := hdl(compileSocketMessage(msgType, msg)); err != nil {
2020-11-20 21:51:10 +00:00
return errors.Wrap(err, "submit message")
}
}
return nil
}
2020-11-23 08:54:02 +00:00
func (s *subcriptionStore) SubscribeSocket(id string, hdl func(socketMessage) error) {
2020-11-23 08:48:53 +00:00
s.socketSubscriptionsLock.Lock()
defer s.socketSubscriptionsLock.Unlock()
2020-11-20 21:51:10 +00:00
2020-11-23 08:48:53 +00:00
s.socketSubscriptions[id] = hdl
2020-11-20 21:51:10 +00:00
}
2020-11-23 08:48:53 +00:00
func (s *subcriptionStore) UnsubscribeSocket(id string) {
s.socketSubscriptionsLock.Lock()
defer s.socketSubscriptionsLock.Unlock()
2020-11-20 21:51:10 +00:00
2020-11-23 08:48:53 +00:00
delete(s.socketSubscriptions, id)
2020-11-20 21:51:10 +00:00
}
2020-11-23 08:54:02 +00:00
func compileSocketMessage(msgType string, msg interface{}) socketMessage {
2020-11-20 23:33:02 +00:00
versionParts := []string{version}
2020-11-23 09:57:32 +00:00
for _, asset := range assetVersions.Keys() {
versionParts = append(versionParts, assetVersions.Get(asset))
2020-11-20 23:33:02 +00:00
}
hash := sha256.New()
hash.Write([]byte(strings.Join(versionParts, "/")))
ver := fmt.Sprintf("%x", hash.Sum(nil))
2020-11-23 08:54:02 +00:00
return socketMessage{
Payload: msg,
Type: msgType,
Version: ver,
2020-11-20 23:33:02 +00:00
}
}
2020-11-20 21:51:10 +00:00
// upgrader is used by handleUpdateSocket to turn an incoming HTTP request
// into a websocket connection. Read and write buffer sizes are both set to
// 1024; all other options are left at their zero value.
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
func registerAPI(r *mux.Router) {
r.HandleFunc("/api/follows/clear-last", handleSetLastFollower)
r.HandleFunc("/api/follows/set-last/{name}", handleSetLastFollower)
2020-11-20 21:51:10 +00:00
r.HandleFunc("/api/subscribe", handleUpdateSocket)
r.HandleFunc("/api/webhook/{type}", handleWebHookPush)
}
func handleSetLastFollower(w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
if name == "" {
store.Followers.Last = nil
} else {
store.Followers.Last = &name
}
2020-11-23 09:02:08 +00:00
if err := store.Save(cfg.StoreFile); err != nil {
log.WithError(err).Error("Unable to update persistent store")
}
if err := subscriptions.SendAllSockets(msgTypeStore, store); err != nil {
log.WithError(err).Error("Unable to send update to all sockets")
}
w.WriteHeader(http.StatusAccepted)
}
2020-11-20 21:51:10 +00:00
func handleUpdateSocket(w http.ResponseWriter, r *http.Request) {
// Upgrade connection to socket
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.WithError(err).Error("Unable to upgrade socket")
return
}
defer conn.Close()
// Register listener
2020-11-22 07:45:33 +00:00
var (
connLock = new(sync.Mutex)
id = uuid.Must(uuid.NewV4()).String()
)
2020-11-23 08:54:02 +00:00
subscriptions.SubscribeSocket(id, func(msg socketMessage) error {
2020-11-22 07:45:33 +00:00
connLock.Lock()
defer connLock.Unlock()
return conn.WriteJSON(msg)
})
2020-11-23 08:48:53 +00:00
defer subscriptions.UnsubscribeSocket(id)
2020-11-20 21:51:10 +00:00
keepAlive := time.NewTicker(5 * time.Second)
defer keepAlive.Stop()
go func() {
for range keepAlive.C {
2020-11-22 07:45:33 +00:00
connLock.Lock()
2020-11-20 21:51:10 +00:00
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
log.WithError(err).Error("Unable to send ping message")
2020-11-22 07:45:33 +00:00
connLock.Unlock()
2020-11-20 21:51:10 +00:00
conn.Close()
2020-11-22 07:45:33 +00:00
return
2020-11-20 21:51:10 +00:00
}
2020-11-22 07:45:33 +00:00
connLock.Unlock()
2020-11-20 21:51:10 +00:00
}
}()
2020-11-22 07:45:33 +00:00
connLock.Lock()
2020-11-20 23:33:02 +00:00
if err := conn.WriteJSON(compileSocketMessage(msgTypeStore, store)); err != nil {
2020-11-20 21:51:10 +00:00
log.WithError(err).Error("Unable to send initial state")
return
}
2020-11-22 07:45:33 +00:00
connLock.Unlock()
2020-11-20 21:51:10 +00:00
// Handle socket
for {
messageType, p, err := conn.ReadMessage()
if err != nil {
log.WithError(err).Error("Unable to read from socket")
return
}
switch messageType {
case websocket.TextMessage:
// This is fine and expected
case websocket.BinaryMessage:
// Wat?
log.Warn("Got binary message from socket, disconnecting...")
return
case websocket.CloseMessage:
// They want to go? Fine, have it that way!
return
default:
log.Debug("Got unhandled message from socket: %d", messageType)
continue
}
// FIXME: Do we need this?
_ = p
}
}