1
0
Fork 0
mirror of https://github.com/Luzifer/mqttcli.git synced 2024-11-08 14:50:11 +00:00
mqttcli/mqtt.go

104 lines
2.2 KiB
Go
Raw Permalink Normal View History

package main
import (
"encoding/json"
"errors"
"fmt"
"os"
mqtt "github.com/eclipse/paho.mqtt.golang"
log "github.com/sirupsen/logrus"
)
func mqttTokenToError(tok mqtt.Token) error {
if !tok.WaitTimeout(cfg.MQTTTimeout) {
return errors.New("token wait timed out")
}
return tok.Error() //nolint:wrapcheck // fine in this case, only used in logging
}
func publish(client mqtt.Client) error {
for _, t := range cfg.Topics {
logger := log.WithField("topic", t)
if err := mqttTokenToError(client.Publish(t, byte(cfg.QOS), cfg.Retain, cfg.Message)); err != nil {
logger.WithError(err).Fatal("Unable to publish message")
}
logger.Info("Message published")
}
return nil
}
func subscribe(client mqtt.Client) error {
var (
callback mqtt.MessageHandler
topics = map[string]byte{}
)
for _, t := range cfg.Topics {
topics[t] = byte(cfg.QOS)
}
switch cfg.OutputFormat {
case "log":
callback = subscribeCallbackLog
case "csv":
fmt.Println("Topic,QOS,Retained,Message") //nolint:forbidigo
callback = subscribeCallbackCSV
case "jsonl":
callback = subscribeCallbackJSONL
default:
log.WithField("format", cfg.OutputFormat).Fatal("Invalid output format specified")
}
if err := mqttTokenToError(client.SubscribeMultiple(topics, callback)); err != nil {
log.WithError(err).Fatal("Unable to subscribe topics")
}
for {
select {}
}
}
func subscribeCallbackLog(_ mqtt.Client, msg mqtt.Message) {
log.WithFields(log.Fields{
"topic": msg.Topic(),
"qos": msg.Qos(),
"retained": msg.Retained(),
"message": string(msg.Payload()),
}).Info("Message received")
}
func subscribeCallbackCSV(_ mqtt.Client, msg mqtt.Message) {
fmt.Printf("%s,%d,%v,%q\n", //nolint:forbidigo
msg.Topic(),
msg.Qos(),
msg.Retained(),
string(msg.Payload()),
)
}
func subscribeCallbackJSONL(_ mqtt.Client, msg mqtt.Message) {
jsonMessage := struct {
Topic string `json:"topic"`
QOS byte `json:"qos"`
Retained bool `json:"retained"`
Message string `json:"message"`
}{
msg.Topic(),
msg.Qos(),
msg.Retained(),
string(msg.Payload()),
}
if err := json.NewEncoder(os.Stdout).Encode(jsonMessage); err != nil {
log.WithError(err).Fatal("Unable to marshal message into jsonl")
}
}