mirror of
https://github.com/Luzifer/twitch-bot.git
synced 2024-11-09 16:50:01 +00:00
[overlays] Add overlays server capability (#14)
This commit is contained in:
parent
a4b6036da0
commit
8a8347401e
13 changed files with 695 additions and 25 deletions
|
@ -83,4 +83,9 @@ func handleMessage(c *irc.Client, m *irc.Message, event *string, eventData *plug
|
||||||
r.SetCooldown(timerStore, m, eventData)
|
r.SetCooldown(timerStore, m, eventData)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send events to registered handlers
|
||||||
|
if event != nil {
|
||||||
|
notifyEventHandlers(*event, eventData)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
32
events.go
32
events.go
|
@ -1,7 +1,20 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/Luzifer/twitch-bot/plugins"
|
||||||
|
)
|
||||||
|
|
||||||
func ptrStr(s string) *string { return &s }
|
func ptrStr(s string) *string { return &s }
|
||||||
|
|
||||||
|
var (
|
||||||
|
registeredEventHandlers []plugins.EventHandlerFunc
|
||||||
|
registeredEventHandlersLock sync.Mutex
|
||||||
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
eventTypeBan = ptrStr("ban")
|
eventTypeBan = ptrStr("ban")
|
||||||
eventTypeBits = ptrStr("bits")
|
eventTypeBits = ptrStr("bits")
|
||||||
|
@ -51,3 +64,22 @@ var (
|
||||||
eventTypeTwitchTitleUpdate,
|
eventTypeTwitchTitleUpdate,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func notifyEventHandlers(event string, eventData *plugins.FieldCollection) {
|
||||||
|
registeredEventHandlersLock.Lock()
|
||||||
|
defer registeredEventHandlersLock.Unlock()
|
||||||
|
|
||||||
|
for _, fn := range registeredEventHandlers {
|
||||||
|
if err := fn(event, eventData); err != nil {
|
||||||
|
log.WithError(err).Error("EventHandler caused error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func registerEventHandlers(eh plugins.EventHandlerFunc) error {
|
||||||
|
registeredEventHandlersLock.Lock()
|
||||||
|
defer registeredEventHandlersLock.Unlock()
|
||||||
|
|
||||||
|
registeredEventHandlers = append(registeredEventHandlers, eh)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -8,6 +8,7 @@ require (
|
||||||
github.com/Luzifer/korvike/functions v0.6.1
|
github.com/Luzifer/korvike/functions v0.6.1
|
||||||
github.com/Luzifer/rconfig/v2 v2.3.0
|
github.com/Luzifer/rconfig/v2 v2.3.0
|
||||||
github.com/go-irc/irc v2.1.0+incompatible
|
github.com/go-irc/irc v2.1.0+incompatible
|
||||||
|
github.com/gofrs/uuid v4.2.0+incompatible
|
||||||
github.com/gofrs/uuid/v3 v3.1.2
|
github.com/gofrs/uuid/v3 v3.1.2
|
||||||
github.com/gorilla/mux v1.7.4
|
github.com/gorilla/mux v1.7.4
|
||||||
github.com/gorilla/websocket v1.4.2
|
github.com/gorilla/websocket v1.4.2
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -88,6 +88,8 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me
|
||||||
github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
|
github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
|
||||||
github.com/go-test/deep v1.0.2/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
|
github.com/go-test/deep v1.0.2/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
|
||||||
github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
|
github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
|
||||||
|
github.com/gofrs/uuid v4.2.0+incompatible h1:yyYWMnhkhrKwwr8gAOcOCYxOOscHgDS9yZgBrnJfGa0=
|
||||||
|
github.com/gofrs/uuid v4.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
|
||||||
github.com/gofrs/uuid/v3 v3.1.2 h1:V3IBv1oU82x6YIr5txe3azVHgmOKYdyKQTowm9moBlY=
|
github.com/gofrs/uuid/v3 v3.1.2 h1:V3IBv1oU82x6YIr5txe3azVHgmOKYdyKQTowm9moBlY=
|
||||||
github.com/gofrs/uuid/v3 v3.1.2/go.mod h1:xPwMqoocQ1L5G6pXX5BcE7N5jlzn2o19oqAKxwZW/kI=
|
github.com/gofrs/uuid/v3 v3.1.2/go.mod h1:xPwMqoocQ1L5G6pXX5BcE7N5jlzn2o19oqAKxwZW/kI=
|
||||||
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
|
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
|
||||||
|
|
88
internal/apimodules/overlays/default/debug.html
Normal file
88
internal/apimodules/overlays/default/debug.html
Normal file
|
@ -0,0 +1,88 @@
|
||||||
|
<html>
|
||||||
|
<style>
|
||||||
|
[v-cloak] { display: none; }
|
||||||
|
html {
|
||||||
|
background-color: #333;
|
||||||
|
color: #fff;
|
||||||
|
font-family: monospace;
|
||||||
|
}
|
||||||
|
span.event {
|
||||||
|
background-color: #e3e3ff3f;
|
||||||
|
border-radius: 0.5rem;
|
||||||
|
display: inline-block;
|
||||||
|
margin-bottom: 0.5rem;
|
||||||
|
margin-right: 5px;
|
||||||
|
padding: 0.1rem 0.5rem;
|
||||||
|
white-space: pre;
|
||||||
|
}
|
||||||
|
table {
|
||||||
|
border-spacing: 10px;
|
||||||
|
margin: 0 auto;
|
||||||
|
max-width: 1200px;
|
||||||
|
}
|
||||||
|
td {
|
||||||
|
vertical-align: top;
|
||||||
|
}
|
||||||
|
th {
|
||||||
|
text-align: left;
|
||||||
|
}
|
||||||
|
</style>
|
||||||
|
|
||||||
|
<div id="app" v-cloak>
|
||||||
|
<table>
|
||||||
|
<tr><th>Time</th><th>Event</th><th>Fields</th></tr>
|
||||||
|
<tr v-for="event in events">
|
||||||
|
<td>{{ moment(event.time).format('YYYY-MM-DD HH:mm:ss') }}</td>
|
||||||
|
<td>{{ event.event }}</td>
|
||||||
|
<td>
|
||||||
|
<span
|
||||||
|
class="event"
|
||||||
|
v-for="field in formattedFields(event.fields)"
|
||||||
|
>{{ field }}</span>
|
||||||
|
</td>
|
||||||
|
</tr>
|
||||||
|
</table>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<script src="https://cdn.jsdelivr.net/combine/npm/vue@2,npm/moment@2"></script>
|
||||||
|
<script type="module">
|
||||||
|
import EventClient from './eventclient.mjs'
|
||||||
|
|
||||||
|
new Vue({
|
||||||
|
computed: {
|
||||||
|
maxEventLen() {
|
||||||
|
return this.events
|
||||||
|
.map(evt => evt.event.length)
|
||||||
|
.reduce((ml, cl) => cl > ml ? cl : ml, 0)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
|
||||||
|
data: {
|
||||||
|
events: [],
|
||||||
|
},
|
||||||
|
|
||||||
|
el: '#app',
|
||||||
|
|
||||||
|
methods: {
|
||||||
|
formattedFields(fields) {
|
||||||
|
return Object.entries(fields).map(el => `${el[0]}="${el[1]}"`).sort()
|
||||||
|
},
|
||||||
|
moment,
|
||||||
|
},
|
||||||
|
|
||||||
|
mounted() {
|
||||||
|
new EventClient({
|
||||||
|
handlers: {
|
||||||
|
_: (evt, data, time, live) => {
|
||||||
|
this.events = [
|
||||||
|
{ event: evt, fields: data, time },
|
||||||
|
...this.events,
|
||||||
|
]
|
||||||
|
},
|
||||||
|
},
|
||||||
|
replay: true,
|
||||||
|
})
|
||||||
|
},
|
||||||
|
})
|
||||||
|
</script>
|
||||||
|
</html>
|
113
internal/apimodules/overlays/default/eventclient.mjs
Normal file
113
internal/apimodules/overlays/default/eventclient.mjs
Normal file
|
@ -0,0 +1,113 @@
|
||||||
|
/**
|
||||||
|
* Options to pass to the EventClient constructor
|
||||||
|
* @typedef {Object} EventClient~Options
|
||||||
|
* @prop {string} [channel] - Filter for specific channel events (format: `#channel`)
|
||||||
|
* @prop {Object} handlers - Map event types to callback functions `(event, fields, time, live) => {...}`
|
||||||
|
* @prop {boolean} replay - Request a replay at connect (requires channel to be set to a channel name)
|
||||||
|
* @prop {string} [token] - API access token to use to connect to the WebSocket
|
||||||
|
*/
|
||||||
|
|
||||||
|
const initialSocketBackoff = 500
|
||||||
|
const maxSocketBackoff = 10000
|
||||||
|
const socketBackoffMultiplier = 1.25
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @class EventClient abstracts the connection to the bot websocket for events
|
||||||
|
*/
|
||||||
|
export default class EventClient {
|
||||||
|
/**
|
||||||
|
* Creates, initializes and connects the EventClient
|
||||||
|
*
|
||||||
|
* @param {EventClient~Options} opts {@link EventClient~Options} for the EventClient
|
||||||
|
*/
|
||||||
|
constructor(opts) {
|
||||||
|
this.params = new URLSearchParams(window.location.hash.substr(1))
|
||||||
|
this.handlers = { ...opts.handlers || {} }
|
||||||
|
this.options = { ...opts }
|
||||||
|
|
||||||
|
this.token = this.paramOptionFallback('token')
|
||||||
|
if (!this.token) {
|
||||||
|
throw new Error('token for socket not present in hash or opts')
|
||||||
|
}
|
||||||
|
|
||||||
|
this.socketBackoff = initialSocketBackoff
|
||||||
|
|
||||||
|
this.connect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Connects the EventClient to the socket
|
||||||
|
*
|
||||||
|
* @private
|
||||||
|
*/
|
||||||
|
connect() {
|
||||||
|
if (this.socket) {
|
||||||
|
this.socket.close()
|
||||||
|
this.socket = null
|
||||||
|
}
|
||||||
|
|
||||||
|
this.socket = new WebSocket(this.socketAddr())
|
||||||
|
|
||||||
|
this.socket.onclose = () => {
|
||||||
|
this.socketBackoff = Math.min(this.socketBackoff * socketBackoffMultiplier, maxSocketBackoff)
|
||||||
|
window.setTimeout(() => this.connect(), this.socketBackoff)
|
||||||
|
}
|
||||||
|
|
||||||
|
this.socket.onmessage = evt => {
|
||||||
|
const data = JSON.parse(evt.data)
|
||||||
|
|
||||||
|
if (data.type === '_auth') {
|
||||||
|
// Special handling for auth confirmation
|
||||||
|
this.socketBackoff = initialSocketBackoff
|
||||||
|
|
||||||
|
// Auth was confirmed, request replay if wanted by client
|
||||||
|
if (this.paramOptionFallback('replay', false) && this.paramOptionFallback('channel')) {
|
||||||
|
this.socket.send(JSON.stringify({
|
||||||
|
fields: { channel: this.paramOptionFallback('channel') },
|
||||||
|
type: '_replay',
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.paramOptionFallback('channel') && !data.fields?.channel?.match(this.paramOptionFallback('channel'))) {
|
||||||
|
// Channel filter is active and channel does not match
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const fn of [this.handlers[data.type], this.handlers._].filter(fn => fn)) {
|
||||||
|
fn(data.type, data.fields, new Date(data.time), data.is_live)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.socket.onopen = () => {
|
||||||
|
this.socket.send(JSON.stringify({
|
||||||
|
fields: { token: this.token },
|
||||||
|
type: '_auth',
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resolves the given key through url hash parameters with fallback to constructor options
|
||||||
|
*
|
||||||
|
* @params {string} key The key to resolve
|
||||||
|
* @params {*=} fallback=null Fallback to return if neither params nor options contained that key
|
||||||
|
* @returns {*} Value of the key or null
|
||||||
|
*/
|
||||||
|
paramOptionFallback(key, fallback = null) {
|
||||||
|
return this.params.get(key) || this.options[key] || fallback
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Modifies the overlay address to the websocket address the bot listens to
|
||||||
|
*
|
||||||
|
* @private
|
||||||
|
* @returns {string} Websocket address in form ws://... or wss://...
|
||||||
|
*/
|
||||||
|
socketAddr() {
|
||||||
|
const base = window.location.href.substr(0, window.location.href.indexOf('/overlays/') + '/overlays/'.length)
|
||||||
|
return `${base.replace(/^http/, 'ws')}events.sock`
|
||||||
|
}
|
||||||
|
}
|
38
internal/apimodules/overlays/fsstack.go
Normal file
38
internal/apimodules/overlays/fsstack.go
Normal file
|
@ -0,0 +1,38 @@
|
||||||
|
package overlays
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io/fs"
|
||||||
|
"net/http"
|
||||||
|
"path"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Compile-time assertion
|
||||||
|
var _ http.FileSystem = httpFSStack{}
|
||||||
|
|
||||||
|
type httpFSStack []http.FileSystem
|
||||||
|
|
||||||
|
func (h httpFSStack) Open(name string) (http.File, error) {
|
||||||
|
for _, fs := range h {
|
||||||
|
if f, err := fs.Open(name); err == nil {
|
||||||
|
return f, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fs.ErrNotExist
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compile-time assertion
|
||||||
|
var _ http.FileSystem = prefixedFS{}
|
||||||
|
|
||||||
|
type prefixedFS struct {
|
||||||
|
originFS http.FileSystem
|
||||||
|
prefix string
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPrefixedFS(prefix string, originFS http.FileSystem) *prefixedFS {
|
||||||
|
return &prefixedFS{originFS: originFS, prefix: prefix}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p prefixedFS) Open(name string) (http.File, error) {
|
||||||
|
return p.originFS.Open(path.Join(p.prefix, name))
|
||||||
|
}
|
348
internal/apimodules/overlays/overlays.go
Normal file
348
internal/apimodules/overlays/overlays.go
Normal file
|
@ -0,0 +1,348 @@
|
||||||
|
package overlays
|
||||||
|
|
||||||
|
import (
|
||||||
|
"embed"
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gofrs/uuid"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/Luzifer/go_helpers/v2/str"
|
||||||
|
"github.com/Luzifer/twitch-bot/plugins"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
authTimeout = 10 * time.Second
|
||||||
|
bufferSizeByte = 1024
|
||||||
|
socketKeepAlive = 5 * time.Second
|
||||||
|
|
||||||
|
moduleUUID = "f9ca2b3a-baf6-45ea-a347-c626168665e8"
|
||||||
|
|
||||||
|
msgTypeRequestAuth = "_auth"
|
||||||
|
msgTypeRequestReplay = "_replay"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
storage struct {
|
||||||
|
ChannelEvents map[string][]socketMessage `json:"channel_events"`
|
||||||
|
|
||||||
|
lock sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
socketMessage struct {
|
||||||
|
IsLive bool `json:"is_live"`
|
||||||
|
Time time.Time `json:"time"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
Fields *plugins.FieldCollection `json:"fields"`
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
//go:embed default/**
|
||||||
|
embeddedOverlays embed.FS
|
||||||
|
|
||||||
|
fsStack httpFSStack
|
||||||
|
|
||||||
|
ptrStringEmpty = func(v string) *string { return &v }("")
|
||||||
|
|
||||||
|
store plugins.StorageManager
|
||||||
|
storeExemption = []string{
|
||||||
|
"join", "part", // Those make no sense for replay
|
||||||
|
}
|
||||||
|
storedObject = newStorage()
|
||||||
|
|
||||||
|
subscribers = map[string]func(event string, eventData *plugins.FieldCollection){}
|
||||||
|
subscribersLock sync.RWMutex
|
||||||
|
|
||||||
|
upgrader = websocket.Upgrader{
|
||||||
|
ReadBufferSize: bufferSizeByte,
|
||||||
|
WriteBufferSize: bufferSizeByte,
|
||||||
|
}
|
||||||
|
|
||||||
|
validateToken plugins.ValidateTokenFunc
|
||||||
|
)
|
||||||
|
|
||||||
|
func Register(args plugins.RegistrationArguments) error {
|
||||||
|
store = args.GetStorageManager()
|
||||||
|
validateToken = args.ValidateToken
|
||||||
|
|
||||||
|
args.RegisterAPIRoute(plugins.HTTPRouteRegistrationArgs{
|
||||||
|
Description: "Websocket subscriber for bot events",
|
||||||
|
HandlerFunc: handleSocketSubscription,
|
||||||
|
Method: http.MethodGet,
|
||||||
|
Module: "overlays",
|
||||||
|
Name: "Websocket",
|
||||||
|
Path: "/events.sock",
|
||||||
|
ResponseType: plugins.HTTPRouteResponseTypeMultiple,
|
||||||
|
})
|
||||||
|
|
||||||
|
args.RegisterAPIRoute(plugins.HTTPRouteRegistrationArgs{
|
||||||
|
HandlerFunc: handleServeOverlayAsset,
|
||||||
|
IsPrefix: true,
|
||||||
|
Method: http.MethodGet,
|
||||||
|
Module: "overlays",
|
||||||
|
Path: "/",
|
||||||
|
ResponseType: plugins.HTTPRouteResponseTypeMultiple,
|
||||||
|
SkipDocumentation: true,
|
||||||
|
})
|
||||||
|
|
||||||
|
args.RegisterEventHandler(func(event string, eventData *plugins.FieldCollection) error {
|
||||||
|
subscribersLock.RLock()
|
||||||
|
defer subscribersLock.RUnlock()
|
||||||
|
|
||||||
|
for _, fn := range subscribers {
|
||||||
|
fn(event, eventData)
|
||||||
|
}
|
||||||
|
|
||||||
|
if str.StringInSlice(event, storeExemption) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
storedObject.AddEvent(plugins.DeriveChannel(nil, eventData), socketMessage{
|
||||||
|
IsLive: false,
|
||||||
|
Time: time.Now(),
|
||||||
|
Type: event,
|
||||||
|
Fields: eventData,
|
||||||
|
})
|
||||||
|
|
||||||
|
return errors.Wrap(
|
||||||
|
store.SetModuleStore(moduleUUID, storedObject),
|
||||||
|
"storing events database",
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
fsStack = httpFSStack{
|
||||||
|
newPrefixedFS("default", http.FS(embeddedOverlays)),
|
||||||
|
}
|
||||||
|
|
||||||
|
overlaysDir := os.Getenv("OVERLAYS_DIR")
|
||||||
|
if ds, err := os.Stat(overlaysDir); err != nil || overlaysDir == "" || !ds.IsDir() {
|
||||||
|
log.WithField("dir", overlaysDir).Warn("Overlays dir not specified, no dir or non existent")
|
||||||
|
} else {
|
||||||
|
fsStack = append(httpFSStack{http.Dir(overlaysDir)}, fsStack...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return errors.Wrap(
|
||||||
|
store.GetModuleStore(moduleUUID, storedObject),
|
||||||
|
"loading module storage",
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleServeOverlayAsset(w http.ResponseWriter, r *http.Request) {
|
||||||
|
http.StripPrefix("/overlays", http.FileServer(fsStack)).ServeHTTP(w, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
//nolint:funlen,gocognit,gocyclo // Not split in order to keep the socket logic in one place
|
||||||
|
func handleSocketSubscription(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var (
|
||||||
|
connID = uuid.Must(uuid.NewV4()).String()
|
||||||
|
logger = log.WithField("conn_id", connID)
|
||||||
|
)
|
||||||
|
|
||||||
|
// Upgrade connection to socket
|
||||||
|
conn, err := upgrader.Upgrade(w, r, nil)
|
||||||
|
if err != nil {
|
||||||
|
logger.WithError(err).Error("Unable to upgrade socket")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
var (
|
||||||
|
authTimeout = time.NewTimer(authTimeout)
|
||||||
|
connLock = new(sync.Mutex)
|
||||||
|
errC = make(chan error, 1)
|
||||||
|
isAuthorized bool
|
||||||
|
sendMsgC = make(chan socketMessage, 1)
|
||||||
|
)
|
||||||
|
|
||||||
|
// Register listener
|
||||||
|
unsub := subscribeSocket(func(event string, eventData *plugins.FieldCollection) {
|
||||||
|
sendMsgC <- socketMessage{
|
||||||
|
IsLive: true,
|
||||||
|
Time: time.Now(),
|
||||||
|
Type: event,
|
||||||
|
Fields: eventData,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
defer unsub()
|
||||||
|
|
||||||
|
keepAlive := time.NewTicker(socketKeepAlive)
|
||||||
|
defer keepAlive.Stop()
|
||||||
|
go func() {
|
||||||
|
for range keepAlive.C {
|
||||||
|
connLock.Lock()
|
||||||
|
|
||||||
|
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
||||||
|
logger.WithError(err).Error("Unable to send ping message")
|
||||||
|
connLock.Unlock()
|
||||||
|
conn.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
connLock.Unlock()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
// Handle socket
|
||||||
|
for {
|
||||||
|
messageType, p, err := conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
errC <- errors.Wrap(err, "reading from socket")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch messageType {
|
||||||
|
case websocket.TextMessage:
|
||||||
|
// This is fine and expected
|
||||||
|
|
||||||
|
case websocket.BinaryMessage:
|
||||||
|
// Wat?
|
||||||
|
errC <- errors.New("binary message received")
|
||||||
|
return
|
||||||
|
|
||||||
|
case websocket.CloseMessage:
|
||||||
|
// They want to go? Fine, have it that way!
|
||||||
|
errC <- nil
|
||||||
|
return
|
||||||
|
|
||||||
|
default:
|
||||||
|
logger.Debugf("Got unhandled message from socket: %d", messageType)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var recvMsg socketMessage
|
||||||
|
if err = json.Unmarshal(p, &recvMsg); err != nil {
|
||||||
|
errC <- errors.Wrap(err, "decoding message")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if !isAuthorized && recvMsg.Type != msgTypeRequestAuth {
|
||||||
|
// Socket is requesting stuff but is not authorized, we don't want them to be here!
|
||||||
|
errC <- nil
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch recvMsg.Type {
|
||||||
|
case msgTypeRequestAuth:
|
||||||
|
if err := validateToken(recvMsg.Fields.MustString("token", ptrStringEmpty), "overlays"); err != nil {
|
||||||
|
errC <- errors.Wrap(err, "validating auth token")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
authTimeout.Stop()
|
||||||
|
isAuthorized = true
|
||||||
|
sendMsgC <- socketMessage{
|
||||||
|
IsLive: true,
|
||||||
|
Time: time.Now(),
|
||||||
|
Type: msgTypeRequestAuth,
|
||||||
|
}
|
||||||
|
|
||||||
|
case msgTypeRequestReplay:
|
||||||
|
go func() {
|
||||||
|
for _, msg := range storedObject.GetChannelEvents(recvMsg.Fields.MustString("channel", ptrStringEmpty)) {
|
||||||
|
sendMsgC <- msg
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
default:
|
||||||
|
logger.WithField("type", recvMsg.Type).Warn("Got unexpected message type from frontend")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-authTimeout.C:
|
||||||
|
// Timeout was not stopped, no auth was done
|
||||||
|
logger.Warn("socket failed to auth")
|
||||||
|
return
|
||||||
|
|
||||||
|
case err := <-errC:
|
||||||
|
if err != nil {
|
||||||
|
logger.WithError(err).Error("Message processing caused error")
|
||||||
|
}
|
||||||
|
return // We use nil-error to close the connection
|
||||||
|
|
||||||
|
case msg := <-sendMsgC:
|
||||||
|
if !isAuthorized {
|
||||||
|
// Not authorized, we're silently dropping messages
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
connLock.Lock()
|
||||||
|
if err := conn.WriteJSON(msg); err != nil {
|
||||||
|
logger.WithError(err).Error("Unable to send socket message")
|
||||||
|
connLock.Unlock()
|
||||||
|
conn.Close()
|
||||||
|
}
|
||||||
|
connLock.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func subscribeSocket(fn func(event string, eventData *plugins.FieldCollection)) func() {
|
||||||
|
id := uuid.Must(uuid.NewV4()).String()
|
||||||
|
|
||||||
|
subscribersLock.Lock()
|
||||||
|
defer subscribersLock.Unlock()
|
||||||
|
|
||||||
|
subscribers[id] = fn
|
||||||
|
|
||||||
|
return func() { unsubscribeSocket(id) }
|
||||||
|
}
|
||||||
|
|
||||||
|
func unsubscribeSocket(id string) {
|
||||||
|
subscribersLock.Lock()
|
||||||
|
defer subscribersLock.Unlock()
|
||||||
|
|
||||||
|
delete(subscribers, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Storage
|
||||||
|
|
||||||
|
func newStorage() *storage {
|
||||||
|
return &storage{
|
||||||
|
ChannelEvents: make(map[string][]socketMessage),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *storage) AddEvent(channel string, evt socketMessage) {
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
|
s.ChannelEvents[channel] = append(s.ChannelEvents[channel], evt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *storage) GetChannelEvents(channel string) []socketMessage {
|
||||||
|
s.lock.RLock()
|
||||||
|
defer s.lock.RUnlock()
|
||||||
|
|
||||||
|
return s.ChannelEvents[channel]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implement marshaller interfaces
|
||||||
|
func (s *storage) MarshalStoredObject() ([]byte, error) {
|
||||||
|
s.lock.RLock()
|
||||||
|
defer s.lock.RUnlock()
|
||||||
|
|
||||||
|
return json.Marshal(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *storage) UnmarshalStoredObject(data []byte) error {
|
||||||
|
if data == nil {
|
||||||
|
// No data set yet, don't try to unmarshal
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
|
return json.Unmarshal(data, s)
|
||||||
|
}
|
29
irc.go
29
irc.go
|
@ -242,11 +242,19 @@ func (i ircHandler) handleClearChat(m *irc.Message) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i ircHandler) handleJoin(m *irc.Message) {
|
func (i ircHandler) handleJoin(m *irc.Message) {
|
||||||
go handleMessage(i.c, m, eventTypeJoin, nil)
|
fields := plugins.FieldCollectionFromData(map[string]interface{}{
|
||||||
|
"channel": i.getChannel(m), // Compatibility to plugins.DeriveChannel
|
||||||
|
"user": m.User, // Compatibility to plugins.DeriveUser
|
||||||
|
})
|
||||||
|
go handleMessage(i.c, m, eventTypeJoin, fields)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i ircHandler) handlePart(m *irc.Message) {
|
func (i ircHandler) handlePart(m *irc.Message) {
|
||||||
go handleMessage(i.c, m, eventTypePart, nil)
|
fields := plugins.FieldCollectionFromData(map[string]interface{}{
|
||||||
|
"channel": i.getChannel(m), // Compatibility to plugins.DeriveChannel
|
||||||
|
"user": m.User, // Compatibility to plugins.DeriveUser
|
||||||
|
})
|
||||||
|
go handleMessage(i.c, m, eventTypePart, fields)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i ircHandler) handlePermit(m *irc.Message) {
|
func (i ircHandler) handlePermit(m *irc.Message) {
|
||||||
|
@ -263,10 +271,17 @@ func (i ircHandler) handlePermit(m *irc.Message) {
|
||||||
|
|
||||||
username := msgParts[1]
|
username := msgParts[1]
|
||||||
|
|
||||||
log.WithField("user", username).Debug("Added permit")
|
fields := plugins.FieldCollectionFromData(map[string]interface{}{
|
||||||
|
"channel": i.getChannel(m), // Compatibility to plugins.DeriveChannel
|
||||||
|
"user": m.User, // Compatibility to plugins.DeriveUser
|
||||||
|
"username": username, // DEPRECATED but kept for comapatibility
|
||||||
|
"to": username,
|
||||||
|
})
|
||||||
|
|
||||||
|
log.WithFields(fields.Data()).Debug("Added permit")
|
||||||
timerStore.AddPermit(m.Params[0], username)
|
timerStore.AddPermit(m.Params[0], username)
|
||||||
|
|
||||||
go handleMessage(i.c, m, eventTypePermit, plugins.FieldCollectionFromData(map[string]interface{}{"username": username}))
|
go handleMessage(i.c, m, eventTypePermit, fields)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i ircHandler) handleTwitchNotice(m *irc.Message) {
|
func (i ircHandler) handleTwitchNotice(m *irc.Message) {
|
||||||
|
@ -284,7 +299,11 @@ func (i ircHandler) handleTwitchNotice(m *irc.Message) {
|
||||||
case "host_success", "host_success_viewers":
|
case "host_success", "host_success_viewers":
|
||||||
log.WithField("trailing", m.Trailing()).Warn("Incoming host")
|
log.WithField("trailing", m.Trailing()).Warn("Incoming host")
|
||||||
|
|
||||||
go handleMessage(i.c, m, eventTypeHost, nil)
|
fields := plugins.FieldCollectionFromData(map[string]interface{}{
|
||||||
|
"channel": i.getChannel(m), // Compatibility to plugins.DeriveChannel
|
||||||
|
"user": m.User, // Compatibility to plugins.DeriveUser
|
||||||
|
})
|
||||||
|
go handleMessage(i.c, m, eventTypeHost, fields)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,9 @@ type (
|
||||||
|
|
||||||
CronRegistrationFunc func(spec string, cmd func()) (cron.EntryID, error)
|
CronRegistrationFunc func(spec string, cmd func()) (cron.EntryID, error)
|
||||||
|
|
||||||
|
EventHandlerFunc func(evt string, eventData *FieldCollection) error
|
||||||
|
EventHandlerRegisterFunc func(EventHandlerFunc) error
|
||||||
|
|
||||||
LoggerCreationFunc func(moduleName string) *log.Entry
|
LoggerCreationFunc func(moduleName string) *log.Entry
|
||||||
|
|
||||||
MsgFormatter func(tplString string, m *irc.Message, r *Rule, fields *FieldCollection) (string, error)
|
MsgFormatter func(tplString string, m *irc.Message, r *Rule, fields *FieldCollection) (string, error)
|
||||||
|
@ -60,12 +63,16 @@ type (
|
||||||
RegisterAPIRoute HTTPRouteRegistrationFunc
|
RegisterAPIRoute HTTPRouteRegistrationFunc
|
||||||
// RegisterCron is a method to register cron functions in the global cron instance
|
// RegisterCron is a method to register cron functions in the global cron instance
|
||||||
RegisterCron CronRegistrationFunc
|
RegisterCron CronRegistrationFunc
|
||||||
|
// RegisterEventHandler is a method to register a handler function receiving ALL events
|
||||||
|
RegisterEventHandler EventHandlerRegisterFunc
|
||||||
// RegisterRawMessageHandler is a method to register an handler to receive ALL messages received
|
// RegisterRawMessageHandler is a method to register an handler to receive ALL messages received
|
||||||
RegisterRawMessageHandler RawMessageHandlerRegisterFunc
|
RegisterRawMessageHandler RawMessageHandlerRegisterFunc
|
||||||
// RegisterTemplateFunction can be used to register a new template functions
|
// RegisterTemplateFunction can be used to register a new template functions
|
||||||
RegisterTemplateFunction TemplateFuncRegister
|
RegisterTemplateFunction TemplateFuncRegister
|
||||||
// SendMessage can be used to send a message not triggered by an event
|
// SendMessage can be used to send a message not triggered by an event
|
||||||
SendMessage SendMessageFunc
|
SendMessage SendMessageFunc
|
||||||
|
// ValidateToken offers a way to validate a token and determine whether it has permissions on a given module
|
||||||
|
ValidateToken ValidateTokenFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
SendMessageFunc func(*irc.Message) error
|
SendMessageFunc func(*irc.Message) error
|
||||||
|
@ -86,6 +93,8 @@ type (
|
||||||
|
|
||||||
TemplateFuncGetter func(*irc.Message, *Rule, *FieldCollection) interface{}
|
TemplateFuncGetter func(*irc.Message, *Rule, *FieldCollection) interface{}
|
||||||
TemplateFuncRegister func(name string, fg TemplateFuncGetter)
|
TemplateFuncRegister func(name string, fg TemplateFuncGetter)
|
||||||
|
|
||||||
|
ValidateTokenFunc func(token string, modules ...string) error
|
||||||
)
|
)
|
||||||
|
|
||||||
func GenericTemplateFunctionGetter(f interface{}) TemplateFuncGetter {
|
func GenericTemplateFunctionGetter(f interface{}) TemplateFuncGetter {
|
||||||
|
|
|
@ -22,6 +22,7 @@ import (
|
||||||
"github.com/Luzifer/twitch-bot/internal/actors/timeout"
|
"github.com/Luzifer/twitch-bot/internal/actors/timeout"
|
||||||
"github.com/Luzifer/twitch-bot/internal/actors/whisper"
|
"github.com/Luzifer/twitch-bot/internal/actors/whisper"
|
||||||
"github.com/Luzifer/twitch-bot/internal/apimodules/msgformat"
|
"github.com/Luzifer/twitch-bot/internal/apimodules/msgformat"
|
||||||
|
"github.com/Luzifer/twitch-bot/internal/apimodules/overlays"
|
||||||
"github.com/Luzifer/twitch-bot/internal/template/numeric"
|
"github.com/Luzifer/twitch-bot/internal/template/numeric"
|
||||||
"github.com/Luzifer/twitch-bot/internal/template/random"
|
"github.com/Luzifer/twitch-bot/internal/template/random"
|
||||||
"github.com/Luzifer/twitch-bot/plugins"
|
"github.com/Luzifer/twitch-bot/plugins"
|
||||||
|
@ -51,6 +52,7 @@ var (
|
||||||
|
|
||||||
// API-only modules
|
// API-only modules
|
||||||
msgformat.Register,
|
msgformat.Register,
|
||||||
|
overlays.Register,
|
||||||
}
|
}
|
||||||
knownModules []string
|
knownModules []string
|
||||||
)
|
)
|
||||||
|
@ -108,9 +110,11 @@ func getRegistrationArguments() plugins.RegistrationArguments {
|
||||||
RegisterActorDocumentation: registerActorDocumentation,
|
RegisterActorDocumentation: registerActorDocumentation,
|
||||||
RegisterAPIRoute: registerRoute,
|
RegisterAPIRoute: registerRoute,
|
||||||
RegisterCron: cronService.AddFunc,
|
RegisterCron: cronService.AddFunc,
|
||||||
|
RegisterEventHandler: registerEventHandlers,
|
||||||
RegisterRawMessageHandler: registerRawMessageHandler,
|
RegisterRawMessageHandler: registerRawMessageHandler,
|
||||||
RegisterTemplateFunction: tplFuncs.Register,
|
RegisterTemplateFunction: tplFuncs.Register,
|
||||||
SendMessage: sendMessage,
|
SendMessage: sendMessage,
|
||||||
|
ValidateToken: validateAuthToken,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -84,7 +84,7 @@ User joined the channel-chat. This is **NOT** an indicator they are viewing, the
|
||||||
Fields:
|
Fields:
|
||||||
|
|
||||||
- `channel` - The channel the event occurred in
|
- `channel` - The channel the event occurred in
|
||||||
- `username` - The login-name of the user who joined
|
- `user` - The login-name of the user who joined
|
||||||
|
|
||||||
## `part`
|
## `part`
|
||||||
|
|
||||||
|
@ -93,7 +93,7 @@ User left the channel-chat. This is **NOT** an indicator they are no longer view
|
||||||
Fields:
|
Fields:
|
||||||
|
|
||||||
- `channel` - The channel the event occurred in
|
- `channel` - The channel the event occurred in
|
||||||
- `username` - The login-name of the user who left
|
- `user` - The login-name of the user who left
|
||||||
|
|
||||||
## `permit`
|
## `permit`
|
||||||
|
|
||||||
|
@ -102,7 +102,8 @@ User received a permit, which means they are no longer affected by rules which a
|
||||||
Fields:
|
Fields:
|
||||||
|
|
||||||
- `channel` - The channel the event occurred in
|
- `channel` - The channel the event occurred in
|
||||||
- `username` - The login-name of the user
|
- `user` - The login-name of the user who **gave** the permit
|
||||||
|
- `to` - The username who got the permit
|
||||||
|
|
||||||
## `raid`
|
## `raid`
|
||||||
|
|
||||||
|
|
22
writeAuth.go
22
writeAuth.go
|
@ -33,6 +33,16 @@ func writeAuthMiddleware(h http.Handler, module string) http.Handler {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := validateAuthToken(token, module); err != nil {
|
||||||
|
http.Error(w, "auth not successful", http.StatusForbidden)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
h.ServeHTTP(w, r)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateAuthToken(token string, modules ...string) error {
|
||||||
for _, auth := range config.AuthTokens {
|
for _, auth := range config.AuthTokens {
|
||||||
rawHash, err := hex.DecodeString(auth.Hash)
|
rawHash, err := hex.DecodeString(auth.Hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -44,14 +54,14 @@ func writeAuthMiddleware(h http.Handler, module string) http.Handler {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if !str.StringInSlice(module, auth.Modules) && !str.StringInSlice("*", auth.Modules) {
|
for _, reqMod := range modules {
|
||||||
continue
|
if !str.StringInSlice(reqMod, auth.Modules) && !str.StringInSlice("*", auth.Modules) {
|
||||||
|
return errors.New("missing module in auth")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
h.ServeHTTP(w, r)
|
return nil // We found a matching token and it has all required tokens
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
http.Error(w, "auth not successful", http.StatusForbidden)
|
return errors.New("no matching token")
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue