Initial version

This commit is contained in:
Knut Ahlers 2022-05-21 15:21:51 +02:00
commit c47c80a974
Signed by: luzifer
GPG Key ID: 0066F03ED215AD7D
8 changed files with 314 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
ws-relay

32
Dockerfile Normal file
View File

@ -0,0 +1,32 @@
FROM golang:alpine as builder
COPY . /go/src/github.com/Luzifer/ws-relay
WORKDIR /go/src/github.com/Luzifer/ws-relay
RUN set -ex \
&& apk add --update \
build-base \
git \
&& go install \
-ldflags "-X main.version=$(git describe --tags --always || echo dev)" \
-mod=readonly \
-modcacherw \
-trimpath
FROM alpine:latest
LABEL maintainer "Knut Ahlers <knut@ahlers.me>"
RUN set -ex \
&& apk --no-cache add \
ca-certificates
COPY --from=builder /go/bin/ws-relay /usr/local/bin/ws-relay
EXPOSE 3000
ENTRYPOINT ["/usr/local/bin/ws-relay"]
CMD ["--"]
# vim: set ft=Dockerfile:

9
README.md Normal file
View File

@ -0,0 +1,9 @@
[![Go Report Card](https://goreportcard.com/badge/github.com/Luzifer/ws-relay)](https://goreportcard.com/report/github.com/Luzifer/ws-relay)
![](https://badges.fyi/github/license/Luzifer/ws-relay)
![](https://badges.fyi/github/downloads/Luzifer/ws-relay)
![](https://badges.fyi/github/latest-release/Luzifer/ws-relay)
![](https://knut.in/project-status/ws-relay)
# Luzifer / ws-relay
This project is a very simple WebSocket relay service: No auth, no message parsing, just 1-n clients connecting to the same socket name receiving all messages sent to the socket.

19
go.mod Normal file
View File

@ -0,0 +1,19 @@
module github.com/Luzifer/ws-relay
go 1.18
require (
github.com/Luzifer/rconfig/v2 v2.4.0
github.com/gofrs/uuid v4.2.0+incompatible
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.8.1
)
require (
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 // indirect
gopkg.in/validator.v2 v2.0.0-20210331031555-b37d688a7fb0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

28
go.sum Normal file
View File

@ -0,0 +1,28 @@
github.com/Luzifer/rconfig/v2 v2.4.0 h1:MAdymTlExAZ8mx5VG8xOFAtFQSpWBipKYQHPOmYTn9o=
github.com/Luzifer/rconfig/v2 v2.4.0/go.mod h1:hWF3ZVSusbYlg5bEvCwalEyUSY+0JPJWUiIu7rBmav8=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gofrs/uuid v4.2.0+incompatible h1:yyYWMnhkhrKwwr8gAOcOCYxOOscHgDS9yZgBrnJfGa0=
github.com/gofrs/uuid v4.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/validator.v2 v2.0.0-20210331031555-b37d688a7fb0 h1:EFLtLCwd8tGN+r/ePz3cvRtdsfYNhDEdt/vp6qsT+0A=
gopkg.in/validator.v2 v2.0.0-20210331031555-b37d688a7fb0/go.mod h1:o4V0GXN9/CAmCsvJ0oXYZvrZOe7syiDZSN1GWGZTGzc=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=

99
main.go Normal file
View File

@ -0,0 +1,99 @@
package main
import (
"fmt"
"net/http"
"os"
"strings"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/Luzifer/rconfig/v2"
)
var (
cfg = struct {
Listen string `flag:"listen" default:":3000" description:"Port/IP to listen on"`
LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"`
VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"`
}{}
upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
version = "dev"
)
func initApp() error {
rconfig.AutoEnv(true)
if err := rconfig.ParseAndValidate(&cfg); err != nil {
return errors.Wrap(err, "parsing cli options")
}
if cfg.VersionAndExit {
fmt.Printf("ws-relay %s\n", version)
os.Exit(0)
}
l, err := logrus.ParseLevel(cfg.LogLevel)
if err != nil {
return errors.Wrap(err, "parsing log-level")
}
logrus.SetLevel(l)
return nil
}
func main() {
var err error
if err = initApp(); err != nil {
logrus.WithError(err).Fatal("initializing app")
}
logrus.WithField("version", version).Info("ws-relay started")
router := mux.NewRouter()
router.HandleFunc("/{socket}", handleSocketRelay)
if err := http.ListenAndServe(cfg.Listen, router); err != nil {
logrus.WithError(err).Fatal("http server errored")
}
}
func handleSocketRelay(w http.ResponseWriter, r *http.Request) {
if !strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") {
// That's no socket request, don't spam the logs
http.Error(w, "this is a socket", http.StatusBadRequest)
return
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logrus.WithError(err).Error("upgrading socket")
return
}
defer conn.Close()
var (
socketName = mux.Vars(r)["socket"]
connID, unregister = pool.Register(socketName, conn)
logger = logrus.WithFields(logrus.Fields{"id": connID, "socket": socketName})
)
defer unregister()
for {
msgType, msg, err := conn.ReadMessage()
if err != nil {
logger.WithError(err).Error("reading from connection")
return
}
pool.Send(socketName, msgType, msg)
}
}

36
namedLocker.go Normal file
View File

@ -0,0 +1,36 @@
package main
import "sync"
type (
namedLocker struct {
lockers map[string]*sync.Mutex
self *sync.Mutex
}
)
func newNamedLocker() *namedLocker {
return &namedLocker{
lockers: make(map[string]*sync.Mutex),
self: new(sync.Mutex),
}
}
func (n *namedLocker) Lock(name string) {
n.getLocker(name).Lock()
}
func (n *namedLocker) Unlock(name string) {
n.getLocker(name).Unlock()
}
func (n *namedLocker) getLocker(name string) *sync.Mutex {
n.self.Lock()
defer n.self.Unlock()
if n.lockers[name] == nil {
n.lockers[name] = new(sync.Mutex)
}
return n.lockers[name]
}

90
socketPool.go Normal file
View File

@ -0,0 +1,90 @@
package main
import (
"path"
"sync"
"github.com/gofrs/uuid"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
)
var pool = newSocketPool()
type (
socketPool struct {
lock sync.RWMutex
pool map[string]map[string]*websocket.Conn
sendQueue *namedLocker
}
)
func newSocketPool() *socketPool {
return &socketPool{
pool: make(map[string]map[string]*websocket.Conn),
sendQueue: newNamedLocker(),
}
}
func (s *socketPool) Register(name string, conn *websocket.Conn) (string, func()) {
s.lock.Lock()
defer s.lock.Unlock()
connID := uuid.Must(uuid.NewV4()).String()
if s.pool[name] == nil {
s.pool[name] = map[string]*websocket.Conn{}
}
s.pool[name][connID] = conn
logrus.
WithFields(logrus.Fields{"id": connID, "socket": name}).
Info("registered socket")
return connID, func() { s.Unregister(name, connID) }
}
func (s *socketPool) Send(name string, msgType int, msg []byte) {
s.lock.RLock()
defer s.lock.RUnlock()
wg := new(sync.WaitGroup)
for id := range s.pool[name] {
wg.Add(1)
go s.SendLocked(wg, name, id, msgType, msg)
}
wg.Wait()
}
func (s *socketPool) SendLocked(wg *sync.WaitGroup, name, id string, msgType int, msg []byte) {
defer wg.Done()
s.sendQueue.Lock(path.Join(name, id))
defer s.sendQueue.Unlock(path.Join(name, id))
if err := s.pool[name][id].WriteMessage(msgType, msg); err != nil {
logrus.
WithError(err).
WithFields(logrus.Fields{"id": id, "socket": name}).
Error("delivering to socket")
s.Unregister(name, id)
}
}
func (s *socketPool) Unregister(name, connID string) {
s.lock.Lock()
defer s.lock.Unlock()
if s.pool[name] == nil || s.pool[name][connID] == nil {
return
}
s.pool[name][connID].Close()
delete(s.pool[name], connID)
logrus.
WithFields(logrus.Fields{"id": connID, "socket": name}).
Info("unregistered socket")
}