mirror of
https://github.com/Luzifer/mqttcli.git
synced 2024-12-20 10:31:18 +00:00
Initial version
Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
commit
ddf29ffadb
5 changed files with 241 additions and 0 deletions
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
|
@ -0,0 +1,2 @@
|
|||
.env
|
||||
mqttcli
|
11
go.mod
Normal file
11
go.mod
Normal file
|
@ -0,0 +1,11 @@
|
|||
module github.com/Luzifer/mqttcli
|
||||
|
||||
go 1.12
|
||||
|
||||
require (
|
||||
github.com/Luzifer/rconfig/v2 v2.2.1
|
||||
github.com/eclipse/paho.mqtt.golang v1.2.0
|
||||
github.com/gofrs/uuid v3.2.0+incompatible
|
||||
github.com/sirupsen/logrus v1.4.2
|
||||
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect
|
||||
)
|
27
go.sum
Normal file
27
go.sum
Normal file
|
@ -0,0 +1,27 @@
|
|||
github.com/Luzifer/rconfig/v2 v2.2.1 h1:zcDdLQlnlzwcBJ8E0WFzOkQE1pCMn3EbX0dFYkeTczg=
|
||||
github.com/Luzifer/rconfig/v2 v2.2.1/go.mod h1:OKIX0/JRZrPJ/ZXXWklQEFXA6tBfWaljZbW37w+sqBw=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0=
|
||||
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
|
||||
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
|
||||
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
|
||||
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
|
||||
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
|
||||
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
|
||||
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 h1:Ao/3l156eZf2AW5wK8a7/smtodRU+gha3+BeqJ69lRk=
|
||||
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
|
||||
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19 h1:WB265cn5OpO+hK3pikC9hpP1zI/KTwmyMFKloW9eOVc=
|
||||
gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19/go.mod h1:o4V0GXN9/CAmCsvJ0oXYZvrZOe7syiDZSN1GWGZTGzc=
|
||||
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
100
main.go
Normal file
100
main.go
Normal file
|
@ -0,0 +1,100 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/gofrs/uuid"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/Luzifer/rconfig/v2"
|
||||
)
|
||||
|
||||
var (
|
||||
cfg = struct {
|
||||
LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"`
|
||||
Message string `flag:"message,m" default:"" description:""`
|
||||
MQTTBroker string `flag:"mqtt-broker,b" default:"tcp://localhost:1883" description:"Broker URI to connect to scheme://host:port (scheme is one of 'tcp', 'ssl', or 'ws')"`
|
||||
MQTTClientID string `flag:"mqtt-client-id" vardefault:"client-id" description:"Client ID to use when connecting, must be unique"`
|
||||
MQTTUser string `flag:"mqtt-user,u" default:"" description:"Username to identify against the broker" validate:"nonzero"`
|
||||
MQTTPass string `flag:"mqtt-pass,p" default:"" description:"Password to identify against the broker" validate:"nonzero"`
|
||||
MQTTTimeout time.Duration `flag:"mqtt-timeout" default:"10s" description:"How long to wait for the client to complete operations"`
|
||||
OutputFormat string `flag:"output-format,o" default:"log" description:"How to ouptut received messages (One of 'log', 'csv', 'jsonl')"`
|
||||
QOS int `flag:"qos" default:"1" description:"QOS to use (0 - Only Once, 1 - At Least Once, 2 - Only Once)"`
|
||||
Retain bool `flag:"retain" default:"false" description:"Retain message on topic"`
|
||||
Topics []string `flag:"topic,t" default:"" description:"Topic to subscribe / publish to"`
|
||||
VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"`
|
||||
}{}
|
||||
|
||||
version = "dev"
|
||||
)
|
||||
|
||||
func init() {
|
||||
rconfig.AutoEnv(true)
|
||||
|
||||
rconfig.SetVariableDefaults(map[string]string{
|
||||
"client-id": uuid.Must(uuid.NewV4()).String(),
|
||||
})
|
||||
|
||||
if err := rconfig.ParseAndValidate(&cfg); err != nil {
|
||||
log.Fatalf("Unable to parse commandline options: %s", err)
|
||||
}
|
||||
|
||||
if cfg.VersionAndExit {
|
||||
fmt.Printf("mqttcli %s\n", version)
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
if l, err := log.ParseLevel(cfg.LogLevel); err != nil {
|
||||
log.WithError(err).Fatal("Unable to parse log level")
|
||||
} else {
|
||||
log.SetLevel(l)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
var cmd string
|
||||
if len(rconfig.Args()) > 1 {
|
||||
cmd = rconfig.Args()[1]
|
||||
}
|
||||
|
||||
if len(cfg.Topics) == 0 || (len(cfg.Topics) == 1 && cfg.Topics[0] == "") {
|
||||
log.Fatal("No topic(s) specified")
|
||||
}
|
||||
|
||||
client := mqtt.NewClient(
|
||||
mqtt.NewClientOptions().
|
||||
AddBroker(cfg.MQTTBroker).
|
||||
SetUsername(cfg.MQTTUser).
|
||||
SetPassword(cfg.MQTTPass),
|
||||
)
|
||||
|
||||
tok := client.Connect()
|
||||
if !tok.WaitTimeout(cfg.MQTTTimeout) {
|
||||
log.Fatal("Unable to connect within timeout")
|
||||
}
|
||||
if err := tok.Error(); err != nil {
|
||||
log.WithError(err).Fatal("Unable to connect to broker")
|
||||
}
|
||||
|
||||
switch cmd {
|
||||
case "pub":
|
||||
if cfg.Message == "" {
|
||||
log.Fatal("Empty message on publish")
|
||||
}
|
||||
|
||||
if err := publish(client); err != nil {
|
||||
log.WithError(err).Fatal("Failed to publish message")
|
||||
}
|
||||
|
||||
case "sub":
|
||||
if err := subscribe(client); err != nil {
|
||||
log.WithError(err).Fatal("Failed to subscribe and listen")
|
||||
}
|
||||
|
||||
default:
|
||||
log.Fatal("No command specified. Usage: mqttcli [opts] <pub|sub>")
|
||||
}
|
||||
}
|
101
mqtt.go
Normal file
101
mqtt.go
Normal file
|
@ -0,0 +1,101 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func publish(client mqtt.Client) error {
|
||||
for _, t := range cfg.Topics {
|
||||
tok := client.Publish(t, byte(cfg.QOS), cfg.Retain, cfg.Message)
|
||||
|
||||
logger := log.WithField("topic", t)
|
||||
|
||||
if !tok.WaitTimeout(cfg.MQTTTimeout) {
|
||||
logger.Error("Unable to publish message within timeout")
|
||||
}
|
||||
|
||||
if err := tok.Error(); err != nil {
|
||||
logger.WithError(err).Fatal("Unable to publish message")
|
||||
}
|
||||
|
||||
logger.Info("Message published")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func subscribe(client mqtt.Client) error {
|
||||
var (
|
||||
callback mqtt.MessageHandler
|
||||
topics = map[string]byte{}
|
||||
)
|
||||
|
||||
for _, t := range cfg.Topics {
|
||||
topics[t] = byte(cfg.QOS)
|
||||
}
|
||||
|
||||
switch cfg.OutputFormat {
|
||||
case "log":
|
||||
callback = subscribeCallbackLog
|
||||
|
||||
case "csv":
|
||||
fmt.Println("Topic,QOS,Retained,Message")
|
||||
callback = subscribeCallbackCSV
|
||||
|
||||
case "jsonl":
|
||||
callback = subscribeCallbackJSONL
|
||||
|
||||
default:
|
||||
log.WithField("format", cfg.OutputFormat).Fatal("Invalid output format specified")
|
||||
}
|
||||
|
||||
tok := client.SubscribeMultiple(topics, callback)
|
||||
if err := tok.Error(); err != nil {
|
||||
log.WithError(err).Fatal("Unable to subscribe topics")
|
||||
}
|
||||
|
||||
for {
|
||||
select {}
|
||||
}
|
||||
}
|
||||
|
||||
func subscribeCallbackLog(client mqtt.Client, msg mqtt.Message) {
|
||||
log.WithFields(log.Fields{
|
||||
"topic": msg.Topic(),
|
||||
"qos": msg.Qos(),
|
||||
"retained": msg.Retained(),
|
||||
"message": string(msg.Payload()),
|
||||
}).Info("Message received")
|
||||
}
|
||||
|
||||
func subscribeCallbackCSV(client mqtt.Client, msg mqtt.Message) {
|
||||
fmt.Printf("%s,%d,%v,%q",
|
||||
msg.Topic(),
|
||||
msg.Qos(),
|
||||
msg.Retained(),
|
||||
string(msg.Payload()),
|
||||
)
|
||||
}
|
||||
|
||||
func subscribeCallbackJSONL(client mqtt.Client, msg mqtt.Message) {
|
||||
jsonMessage := struct {
|
||||
Topic string `json:"topic"`
|
||||
QOS byte `json:"qos"`
|
||||
Retained bool `json:"retained"`
|
||||
Message string `json:"message"`
|
||||
}{
|
||||
msg.Topic(),
|
||||
msg.Qos(),
|
||||
msg.Retained(),
|
||||
string(msg.Payload()),
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(os.Stdout).Encode(jsonMessage); err != nil {
|
||||
log.WithError(err).Fatal("Unable to marshal message into jsonl")
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue