1
0
Fork 0
mirror of https://github.com/Luzifer/mqttcli.git synced 2024-11-14 01:22:47 +00:00
mqttcli/mqtt.go

102 lines
2.1 KiB
Go
Raw Permalink Normal View History

package main
import (
"encoding/json"
"fmt"
"os"
mqtt "github.com/eclipse/paho.mqtt.golang"
log "github.com/sirupsen/logrus"
)
func publish(client mqtt.Client) error {
for _, t := range cfg.Topics {
tok := client.Publish(t, byte(cfg.QOS), cfg.Retain, cfg.Message)
logger := log.WithField("topic", t)
if !tok.WaitTimeout(cfg.MQTTTimeout) {
logger.Error("Unable to publish message within timeout")
}
if err := tok.Error(); err != nil {
logger.WithError(err).Fatal("Unable to publish message")
}
logger.Info("Message published")
}
return nil
}
func subscribe(client mqtt.Client) error {
var (
callback mqtt.MessageHandler
topics = map[string]byte{}
)
for _, t := range cfg.Topics {
topics[t] = byte(cfg.QOS)
}
switch cfg.OutputFormat {
case "log":
callback = subscribeCallbackLog
case "csv":
fmt.Println("Topic,QOS,Retained,Message")
callback = subscribeCallbackCSV
case "jsonl":
callback = subscribeCallbackJSONL
default:
log.WithField("format", cfg.OutputFormat).Fatal("Invalid output format specified")
}
tok := client.SubscribeMultiple(topics, callback)
if err := tok.Error(); err != nil {
log.WithError(err).Fatal("Unable to subscribe topics")
}
for {
select {}
}
}
func subscribeCallbackLog(client mqtt.Client, msg mqtt.Message) {
log.WithFields(log.Fields{
"topic": msg.Topic(),
"qos": msg.Qos(),
"retained": msg.Retained(),
"message": string(msg.Payload()),
}).Info("Message received")
}
func subscribeCallbackCSV(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("%s,%d,%v,%q\n",
msg.Topic(),
msg.Qos(),
msg.Retained(),
string(msg.Payload()),
)
}
func subscribeCallbackJSONL(client mqtt.Client, msg mqtt.Message) {
jsonMessage := struct {
Topic string `json:"topic"`
QOS byte `json:"qos"`
Retained bool `json:"retained"`
Message string `json:"message"`
}{
msg.Topic(),
msg.Qos(),
msg.Retained(),
string(msg.Payload()),
}
if err := json.NewEncoder(os.Stdout).Encode(jsonMessage); err != nil {
log.WithError(err).Fatal("Unable to marshal message into jsonl")
}
}