From c47c80a9748c7754425bca2b7aa2e6f5d38d5294 Mon Sep 17 00:00:00 2001 From: Knut Ahlers Date: Sat, 21 May 2022 15:21:51 +0200 Subject: [PATCH] Initial version --- .gitignore | 1 + Dockerfile | 32 ++++++++++++++++ README.md | 9 +++++ go.mod | 19 ++++++++++ go.sum | 28 ++++++++++++++ main.go | 99 ++++++++++++++++++++++++++++++++++++++++++++++++++ namedLocker.go | 36 ++++++++++++++++++ socketPool.go | 90 +++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 314 insertions(+) create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 README.md create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go create mode 100644 namedLocker.go create mode 100644 socketPool.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..712f236 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +ws-relay diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..eee1ea9 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,32 @@ +FROM golang:alpine as builder + +COPY . /go/src/github.com/Luzifer/ws-relay +WORKDIR /go/src/github.com/Luzifer/ws-relay + +RUN set -ex \ + && apk add --update \ + build-base \ + git \ + && go install \ + -ldflags "-X main.version=$(git describe --tags --always || echo dev)" \ + -mod=readonly \ + -modcacherw \ + -trimpath + + +FROM alpine:latest + +LABEL maintainer "Knut Ahlers " + +RUN set -ex \ + && apk --no-cache add \ + ca-certificates + +COPY --from=builder /go/bin/ws-relay /usr/local/bin/ws-relay + +EXPOSE 3000 + +ENTRYPOINT ["/usr/local/bin/ws-relay"] +CMD ["--"] + +# vim: set ft=Dockerfile: diff --git a/README.md b/README.md new file mode 100644 index 0000000..60625cc --- /dev/null +++ b/README.md @@ -0,0 +1,9 @@ +[![Go Report Card](https://goreportcard.com/badge/github.com/Luzifer/ws-relay)](https://goreportcard.com/report/github.com/Luzifer/ws-relay) +![](https://badges.fyi/github/license/Luzifer/ws-relay) +![](https://badges.fyi/github/downloads/Luzifer/ws-relay) +![](https://badges.fyi/github/latest-release/Luzifer/ws-relay) +![](https://knut.in/project-status/ws-relay) + +# Luzifer / ws-relay + +This project is a very simple WebSocket relay service: No auth, no message parsing, just 1-n clients connecting to the same socket name receiving all messages sent to the socket. diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..3ace533 --- /dev/null +++ b/go.mod @@ -0,0 +1,19 @@ +module github.com/Luzifer/ws-relay + +go 1.18 + +require ( + github.com/Luzifer/rconfig/v2 v2.4.0 + github.com/gofrs/uuid v4.2.0+incompatible + github.com/gorilla/mux v1.8.0 + github.com/gorilla/websocket v1.5.0 + github.com/pkg/errors v0.9.1 + github.com/sirupsen/logrus v1.8.1 +) + +require ( + github.com/spf13/pflag v1.0.5 // indirect + golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 // indirect + gopkg.in/validator.v2 v2.0.0-20210331031555-b37d688a7fb0 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..379d6fe --- /dev/null +++ b/go.sum @@ -0,0 +1,28 @@ +github.com/Luzifer/rconfig/v2 v2.4.0 h1:MAdymTlExAZ8mx5VG8xOFAtFQSpWBipKYQHPOmYTn9o= +github.com/Luzifer/rconfig/v2 v2.4.0/go.mod h1:hWF3ZVSusbYlg5bEvCwalEyUSY+0JPJWUiIu7rBmav8= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gofrs/uuid v4.2.0+incompatible h1:yyYWMnhkhrKwwr8gAOcOCYxOOscHgDS9yZgBrnJfGa0= +github.com/gofrs/uuid v4.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= +github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/validator.v2 v2.0.0-20210331031555-b37d688a7fb0 h1:EFLtLCwd8tGN+r/ePz3cvRtdsfYNhDEdt/vp6qsT+0A= +gopkg.in/validator.v2 v2.0.0-20210331031555-b37d688a7fb0/go.mod h1:o4V0GXN9/CAmCsvJ0oXYZvrZOe7syiDZSN1GWGZTGzc= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/main.go b/main.go new file mode 100644 index 0000000..7ab93fd --- /dev/null +++ b/main.go @@ -0,0 +1,99 @@ +package main + +import ( + "fmt" + "net/http" + "os" + "strings" + + "github.com/gorilla/mux" + "github.com/gorilla/websocket" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + "github.com/Luzifer/rconfig/v2" +) + +var ( + cfg = struct { + Listen string `flag:"listen" default:":3000" description:"Port/IP to listen on"` + LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"` + VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"` + }{} + + upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, + ReadBufferSize: 1024, + WriteBufferSize: 1024, + } + + version = "dev" +) + +func initApp() error { + rconfig.AutoEnv(true) + if err := rconfig.ParseAndValidate(&cfg); err != nil { + return errors.Wrap(err, "parsing cli options") + } + + if cfg.VersionAndExit { + fmt.Printf("ws-relay %s\n", version) + os.Exit(0) + } + + l, err := logrus.ParseLevel(cfg.LogLevel) + if err != nil { + return errors.Wrap(err, "parsing log-level") + } + logrus.SetLevel(l) + + return nil +} + +func main() { + var err error + if err = initApp(); err != nil { + logrus.WithError(err).Fatal("initializing app") + } + + logrus.WithField("version", version).Info("ws-relay started") + + router := mux.NewRouter() + router.HandleFunc("/{socket}", handleSocketRelay) + + if err := http.ListenAndServe(cfg.Listen, router); err != nil { + logrus.WithError(err).Fatal("http server errored") + } +} + +func handleSocketRelay(w http.ResponseWriter, r *http.Request) { + if !strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") { + // That's no socket request, don't spam the logs + http.Error(w, "this is a socket", http.StatusBadRequest) + return + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + logrus.WithError(err).Error("upgrading socket") + return + } + defer conn.Close() + + var ( + socketName = mux.Vars(r)["socket"] + connID, unregister = pool.Register(socketName, conn) + logger = logrus.WithFields(logrus.Fields{"id": connID, "socket": socketName}) + ) + defer unregister() + + for { + msgType, msg, err := conn.ReadMessage() + if err != nil { + logger.WithError(err).Error("reading from connection") + return + } + + pool.Send(socketName, msgType, msg) + } +} diff --git a/namedLocker.go b/namedLocker.go new file mode 100644 index 0000000..b78cd5f --- /dev/null +++ b/namedLocker.go @@ -0,0 +1,36 @@ +package main + +import "sync" + +type ( + namedLocker struct { + lockers map[string]*sync.Mutex + self *sync.Mutex + } +) + +func newNamedLocker() *namedLocker { + return &namedLocker{ + lockers: make(map[string]*sync.Mutex), + self: new(sync.Mutex), + } +} + +func (n *namedLocker) Lock(name string) { + n.getLocker(name).Lock() +} + +func (n *namedLocker) Unlock(name string) { + n.getLocker(name).Unlock() +} + +func (n *namedLocker) getLocker(name string) *sync.Mutex { + n.self.Lock() + defer n.self.Unlock() + + if n.lockers[name] == nil { + n.lockers[name] = new(sync.Mutex) + } + + return n.lockers[name] +} diff --git a/socketPool.go b/socketPool.go new file mode 100644 index 0000000..cbe7566 --- /dev/null +++ b/socketPool.go @@ -0,0 +1,90 @@ +package main + +import ( + "path" + "sync" + + "github.com/gofrs/uuid" + "github.com/gorilla/websocket" + "github.com/sirupsen/logrus" +) + +var pool = newSocketPool() + +type ( + socketPool struct { + lock sync.RWMutex + pool map[string]map[string]*websocket.Conn + sendQueue *namedLocker + } +) + +func newSocketPool() *socketPool { + return &socketPool{ + pool: make(map[string]map[string]*websocket.Conn), + sendQueue: newNamedLocker(), + } +} + +func (s *socketPool) Register(name string, conn *websocket.Conn) (string, func()) { + s.lock.Lock() + defer s.lock.Unlock() + + connID := uuid.Must(uuid.NewV4()).String() + + if s.pool[name] == nil { + s.pool[name] = map[string]*websocket.Conn{} + } + + s.pool[name][connID] = conn + logrus. + WithFields(logrus.Fields{"id": connID, "socket": name}). + Info("registered socket") + + return connID, func() { s.Unregister(name, connID) } +} + +func (s *socketPool) Send(name string, msgType int, msg []byte) { + s.lock.RLock() + defer s.lock.RUnlock() + + wg := new(sync.WaitGroup) + + for id := range s.pool[name] { + wg.Add(1) + go s.SendLocked(wg, name, id, msgType, msg) + } + + wg.Wait() +} + +func (s *socketPool) SendLocked(wg *sync.WaitGroup, name, id string, msgType int, msg []byte) { + defer wg.Done() + + s.sendQueue.Lock(path.Join(name, id)) + defer s.sendQueue.Unlock(path.Join(name, id)) + + if err := s.pool[name][id].WriteMessage(msgType, msg); err != nil { + logrus. + WithError(err). + WithFields(logrus.Fields{"id": id, "socket": name}). + Error("delivering to socket") + s.Unregister(name, id) + } +} + +func (s *socketPool) Unregister(name, connID string) { + s.lock.Lock() + defer s.lock.Unlock() + + if s.pool[name] == nil || s.pool[name][connID] == nil { + return + } + + s.pool[name][connID].Close() + delete(s.pool[name], connID) + + logrus. + WithFields(logrus.Fields{"id": connID, "socket": name}). + Info("unregistered socket") +}