From ddf29ffadb82602ce5c5edd49132a1f0bbabb946 Mon Sep 17 00:00:00 2001 From: Knut Ahlers Date: Mon, 29 Jul 2019 23:24:38 +0200 Subject: [PATCH] Initial version Signed-off-by: Knut Ahlers --- .gitignore | 2 ++ go.mod | 11 ++++++ go.sum | 27 ++++++++++++++ main.go | 100 ++++++++++++++++++++++++++++++++++++++++++++++++++++ mqtt.go | 101 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 241 insertions(+) create mode 100644 .gitignore create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go create mode 100644 mqtt.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e54a25a --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.env +mqttcli diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..c80368f --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module github.com/Luzifer/mqttcli + +go 1.12 + +require ( + github.com/Luzifer/rconfig/v2 v2.2.1 + github.com/eclipse/paho.mqtt.golang v1.2.0 + github.com/gofrs/uuid v3.2.0+incompatible + github.com/sirupsen/logrus v1.4.2 + golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..284300c --- /dev/null +++ b/go.sum @@ -0,0 +1,27 @@ +github.com/Luzifer/rconfig/v2 v2.2.1 h1:zcDdLQlnlzwcBJ8E0WFzOkQE1pCMn3EbX0dFYkeTczg= +github.com/Luzifer/rconfig/v2 v2.2.1/go.mod h1:OKIX0/JRZrPJ/ZXXWklQEFXA6tBfWaljZbW37w+sqBw= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0= +github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= +github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE= +github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 h1:Ao/3l156eZf2AW5wK8a7/smtodRU+gha3+BeqJ69lRk= +golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19 h1:WB265cn5OpO+hK3pikC9hpP1zI/KTwmyMFKloW9eOVc= +gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19/go.mod h1:o4V0GXN9/CAmCsvJ0oXYZvrZOe7syiDZSN1GWGZTGzc= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go new file mode 100644 index 0000000..1faa9e2 --- /dev/null +++ b/main.go @@ -0,0 +1,100 @@ +package main + +import ( + "fmt" + "os" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/gofrs/uuid" + log "github.com/sirupsen/logrus" + + "github.com/Luzifer/rconfig/v2" +) + +var ( + cfg = struct { + LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"` + Message string `flag:"message,m" default:"" description:""` + MQTTBroker string `flag:"mqtt-broker,b" default:"tcp://localhost:1883" description:"Broker URI to connect to scheme://host:port (scheme is one of 'tcp', 'ssl', or 'ws')"` + MQTTClientID string `flag:"mqtt-client-id" vardefault:"client-id" description:"Client ID to use when connecting, must be unique"` + MQTTUser string `flag:"mqtt-user,u" default:"" description:"Username to identify against the broker" validate:"nonzero"` + MQTTPass string `flag:"mqtt-pass,p" default:"" description:"Password to identify against the broker" validate:"nonzero"` + MQTTTimeout time.Duration `flag:"mqtt-timeout" default:"10s" description:"How long to wait for the client to complete operations"` + OutputFormat string `flag:"output-format,o" default:"log" description:"How to ouptut received messages (One of 'log', 'csv', 'jsonl')"` + QOS int `flag:"qos" default:"1" description:"QOS to use (0 - Only Once, 1 - At Least Once, 2 - Only Once)"` + Retain bool `flag:"retain" default:"false" description:"Retain message on topic"` + Topics []string `flag:"topic,t" default:"" description:"Topic to subscribe / publish to"` + VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"` + }{} + + version = "dev" +) + +func init() { + rconfig.AutoEnv(true) + + rconfig.SetVariableDefaults(map[string]string{ + "client-id": uuid.Must(uuid.NewV4()).String(), + }) + + if err := rconfig.ParseAndValidate(&cfg); err != nil { + log.Fatalf("Unable to parse commandline options: %s", err) + } + + if cfg.VersionAndExit { + fmt.Printf("mqttcli %s\n", version) + os.Exit(0) + } + + if l, err := log.ParseLevel(cfg.LogLevel); err != nil { + log.WithError(err).Fatal("Unable to parse log level") + } else { + log.SetLevel(l) + } +} + +func main() { + var cmd string + if len(rconfig.Args()) > 1 { + cmd = rconfig.Args()[1] + } + + if len(cfg.Topics) == 0 || (len(cfg.Topics) == 1 && cfg.Topics[0] == "") { + log.Fatal("No topic(s) specified") + } + + client := mqtt.NewClient( + mqtt.NewClientOptions(). + AddBroker(cfg.MQTTBroker). + SetUsername(cfg.MQTTUser). + SetPassword(cfg.MQTTPass), + ) + + tok := client.Connect() + if !tok.WaitTimeout(cfg.MQTTTimeout) { + log.Fatal("Unable to connect within timeout") + } + if err := tok.Error(); err != nil { + log.WithError(err).Fatal("Unable to connect to broker") + } + + switch cmd { + case "pub": + if cfg.Message == "" { + log.Fatal("Empty message on publish") + } + + if err := publish(client); err != nil { + log.WithError(err).Fatal("Failed to publish message") + } + + case "sub": + if err := subscribe(client); err != nil { + log.WithError(err).Fatal("Failed to subscribe and listen") + } + + default: + log.Fatal("No command specified. Usage: mqttcli [opts] ") + } +} diff --git a/mqtt.go b/mqtt.go new file mode 100644 index 0000000..6fe665a --- /dev/null +++ b/mqtt.go @@ -0,0 +1,101 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" + + mqtt "github.com/eclipse/paho.mqtt.golang" + log "github.com/sirupsen/logrus" +) + +func publish(client mqtt.Client) error { + for _, t := range cfg.Topics { + tok := client.Publish(t, byte(cfg.QOS), cfg.Retain, cfg.Message) + + logger := log.WithField("topic", t) + + if !tok.WaitTimeout(cfg.MQTTTimeout) { + logger.Error("Unable to publish message within timeout") + } + + if err := tok.Error(); err != nil { + logger.WithError(err).Fatal("Unable to publish message") + } + + logger.Info("Message published") + } + + return nil +} + +func subscribe(client mqtt.Client) error { + var ( + callback mqtt.MessageHandler + topics = map[string]byte{} + ) + + for _, t := range cfg.Topics { + topics[t] = byte(cfg.QOS) + } + + switch cfg.OutputFormat { + case "log": + callback = subscribeCallbackLog + + case "csv": + fmt.Println("Topic,QOS,Retained,Message") + callback = subscribeCallbackCSV + + case "jsonl": + callback = subscribeCallbackJSONL + + default: + log.WithField("format", cfg.OutputFormat).Fatal("Invalid output format specified") + } + + tok := client.SubscribeMultiple(topics, callback) + if err := tok.Error(); err != nil { + log.WithError(err).Fatal("Unable to subscribe topics") + } + + for { + select {} + } +} + +func subscribeCallbackLog(client mqtt.Client, msg mqtt.Message) { + log.WithFields(log.Fields{ + "topic": msg.Topic(), + "qos": msg.Qos(), + "retained": msg.Retained(), + "message": string(msg.Payload()), + }).Info("Message received") +} + +func subscribeCallbackCSV(client mqtt.Client, msg mqtt.Message) { + fmt.Printf("%s,%d,%v,%q", + msg.Topic(), + msg.Qos(), + msg.Retained(), + string(msg.Payload()), + ) +} + +func subscribeCallbackJSONL(client mqtt.Client, msg mqtt.Message) { + jsonMessage := struct { + Topic string `json:"topic"` + QOS byte `json:"qos"` + Retained bool `json:"retained"` + Message string `json:"message"` + }{ + msg.Topic(), + msg.Qos(), + msg.Retained(), + string(msg.Payload()), + } + + if err := json.NewEncoder(os.Stdout).Encode(jsonMessage); err != nil { + log.WithError(err).Fatal("Unable to marshal message into jsonl") + } +}