commit c47c80a9748c7754425bca2b7aa2e6f5d38d5294 Author: Knut Ahlers Date: Sat May 21 15:21:51 2022 +0200 Initial version diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..712f236 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +ws-relay diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..eee1ea9 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,32 @@ +FROM golang:alpine as builder + +COPY . /go/src/github.com/Luzifer/ws-relay +WORKDIR /go/src/github.com/Luzifer/ws-relay + +RUN set -ex \ + && apk add --update \ + build-base \ + git \ + && go install \ + -ldflags "-X main.version=$(git describe --tags --always || echo dev)" \ + -mod=readonly \ + -modcacherw \ + -trimpath + + +FROM alpine:latest + +LABEL maintainer "Knut Ahlers " + +RUN set -ex \ + && apk --no-cache add \ + ca-certificates + +COPY --from=builder /go/bin/ws-relay /usr/local/bin/ws-relay + +EXPOSE 3000 + +ENTRYPOINT ["/usr/local/bin/ws-relay"] +CMD ["--"] + +# vim: set ft=Dockerfile: diff --git a/README.md b/README.md new file mode 100644 index 0000000..60625cc --- /dev/null +++ b/README.md @@ -0,0 +1,9 @@ +[![Go Report Card](https://goreportcard.com/badge/github.com/Luzifer/ws-relay)](https://goreportcard.com/report/github.com/Luzifer/ws-relay) +![](https://badges.fyi/github/license/Luzifer/ws-relay) +![](https://badges.fyi/github/downloads/Luzifer/ws-relay) +![](https://badges.fyi/github/latest-release/Luzifer/ws-relay) +![](https://knut.in/project-status/ws-relay) + +# Luzifer / ws-relay + +This project is a very simple WebSocket relay service: No auth, no message parsing, just 1-n clients connecting to the same socket name receiving all messages sent to the socket. diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..3ace533 --- /dev/null +++ b/go.mod @@ -0,0 +1,19 @@ +module github.com/Luzifer/ws-relay + +go 1.18 + +require ( + github.com/Luzifer/rconfig/v2 v2.4.0 + github.com/gofrs/uuid v4.2.0+incompatible + github.com/gorilla/mux v1.8.0 + github.com/gorilla/websocket v1.5.0 + github.com/pkg/errors v0.9.1 + github.com/sirupsen/logrus v1.8.1 +) + +require ( + github.com/spf13/pflag v1.0.5 // indirect + golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 // indirect + gopkg.in/validator.v2 v2.0.0-20210331031555-b37d688a7fb0 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..379d6fe --- /dev/null +++ b/go.sum @@ -0,0 +1,28 @@ +github.com/Luzifer/rconfig/v2 v2.4.0 h1:MAdymTlExAZ8mx5VG8xOFAtFQSpWBipKYQHPOmYTn9o= +github.com/Luzifer/rconfig/v2 v2.4.0/go.mod h1:hWF3ZVSusbYlg5bEvCwalEyUSY+0JPJWUiIu7rBmav8= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gofrs/uuid v4.2.0+incompatible h1:yyYWMnhkhrKwwr8gAOcOCYxOOscHgDS9yZgBrnJfGa0= +github.com/gofrs/uuid v4.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= +github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/validator.v2 v2.0.0-20210331031555-b37d688a7fb0 h1:EFLtLCwd8tGN+r/ePz3cvRtdsfYNhDEdt/vp6qsT+0A= +gopkg.in/validator.v2 v2.0.0-20210331031555-b37d688a7fb0/go.mod h1:o4V0GXN9/CAmCsvJ0oXYZvrZOe7syiDZSN1GWGZTGzc= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/main.go b/main.go new file mode 100644 index 0000000..7ab93fd --- /dev/null +++ b/main.go @@ -0,0 +1,99 @@ +package main + +import ( + "fmt" + "net/http" + "os" + "strings" + + "github.com/gorilla/mux" + "github.com/gorilla/websocket" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + "github.com/Luzifer/rconfig/v2" +) + +var ( + cfg = struct { + Listen string `flag:"listen" default:":3000" description:"Port/IP to listen on"` + LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"` + VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"` + }{} + + upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, + ReadBufferSize: 1024, + WriteBufferSize: 1024, + } + + version = "dev" +) + +func initApp() error { + rconfig.AutoEnv(true) + if err := rconfig.ParseAndValidate(&cfg); err != nil { + return errors.Wrap(err, "parsing cli options") + } + + if cfg.VersionAndExit { + fmt.Printf("ws-relay %s\n", version) + os.Exit(0) + } + + l, err := logrus.ParseLevel(cfg.LogLevel) + if err != nil { + return errors.Wrap(err, "parsing log-level") + } + logrus.SetLevel(l) + + return nil +} + +func main() { + var err error + if err = initApp(); err != nil { + logrus.WithError(err).Fatal("initializing app") + } + + logrus.WithField("version", version).Info("ws-relay started") + + router := mux.NewRouter() + router.HandleFunc("/{socket}", handleSocketRelay) + + if err := http.ListenAndServe(cfg.Listen, router); err != nil { + logrus.WithError(err).Fatal("http server errored") + } +} + +func handleSocketRelay(w http.ResponseWriter, r *http.Request) { + if !strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") { + // That's no socket request, don't spam the logs + http.Error(w, "this is a socket", http.StatusBadRequest) + return + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + logrus.WithError(err).Error("upgrading socket") + return + } + defer conn.Close() + + var ( + socketName = mux.Vars(r)["socket"] + connID, unregister = pool.Register(socketName, conn) + logger = logrus.WithFields(logrus.Fields{"id": connID, "socket": socketName}) + ) + defer unregister() + + for { + msgType, msg, err := conn.ReadMessage() + if err != nil { + logger.WithError(err).Error("reading from connection") + return + } + + pool.Send(socketName, msgType, msg) + } +} diff --git a/namedLocker.go b/namedLocker.go new file mode 100644 index 0000000..b78cd5f --- /dev/null +++ b/namedLocker.go @@ -0,0 +1,36 @@ +package main + +import "sync" + +type ( + namedLocker struct { + lockers map[string]*sync.Mutex + self *sync.Mutex + } +) + +func newNamedLocker() *namedLocker { + return &namedLocker{ + lockers: make(map[string]*sync.Mutex), + self: new(sync.Mutex), + } +} + +func (n *namedLocker) Lock(name string) { + n.getLocker(name).Lock() +} + +func (n *namedLocker) Unlock(name string) { + n.getLocker(name).Unlock() +} + +func (n *namedLocker) getLocker(name string) *sync.Mutex { + n.self.Lock() + defer n.self.Unlock() + + if n.lockers[name] == nil { + n.lockers[name] = new(sync.Mutex) + } + + return n.lockers[name] +} diff --git a/socketPool.go b/socketPool.go new file mode 100644 index 0000000..cbe7566 --- /dev/null +++ b/socketPool.go @@ -0,0 +1,90 @@ +package main + +import ( + "path" + "sync" + + "github.com/gofrs/uuid" + "github.com/gorilla/websocket" + "github.com/sirupsen/logrus" +) + +var pool = newSocketPool() + +type ( + socketPool struct { + lock sync.RWMutex + pool map[string]map[string]*websocket.Conn + sendQueue *namedLocker + } +) + +func newSocketPool() *socketPool { + return &socketPool{ + pool: make(map[string]map[string]*websocket.Conn), + sendQueue: newNamedLocker(), + } +} + +func (s *socketPool) Register(name string, conn *websocket.Conn) (string, func()) { + s.lock.Lock() + defer s.lock.Unlock() + + connID := uuid.Must(uuid.NewV4()).String() + + if s.pool[name] == nil { + s.pool[name] = map[string]*websocket.Conn{} + } + + s.pool[name][connID] = conn + logrus. + WithFields(logrus.Fields{"id": connID, "socket": name}). + Info("registered socket") + + return connID, func() { s.Unregister(name, connID) } +} + +func (s *socketPool) Send(name string, msgType int, msg []byte) { + s.lock.RLock() + defer s.lock.RUnlock() + + wg := new(sync.WaitGroup) + + for id := range s.pool[name] { + wg.Add(1) + go s.SendLocked(wg, name, id, msgType, msg) + } + + wg.Wait() +} + +func (s *socketPool) SendLocked(wg *sync.WaitGroup, name, id string, msgType int, msg []byte) { + defer wg.Done() + + s.sendQueue.Lock(path.Join(name, id)) + defer s.sendQueue.Unlock(path.Join(name, id)) + + if err := s.pool[name][id].WriteMessage(msgType, msg); err != nil { + logrus. + WithError(err). + WithFields(logrus.Fields{"id": id, "socket": name}). + Error("delivering to socket") + s.Unregister(name, id) + } +} + +func (s *socketPool) Unregister(name, connID string) { + s.lock.Lock() + defer s.lock.Unlock() + + if s.pool[name] == nil || s.pool[name][connID] == nil { + return + } + + s.pool[name][connID].Close() + delete(s.pool[name], connID) + + logrus. + WithFields(logrus.Fields{"id": connID, "socket": name}). + Info("unregistered socket") +}