mirror of
https://github.com/Luzifer/mqttcli.git
synced 2024-11-08 06:40:08 +00:00
103 lines
2.2 KiB
Go
103 lines
2.2 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
func mqttTokenToError(tok mqtt.Token) error {
|
|
if !tok.WaitTimeout(cfg.MQTTTimeout) {
|
|
return errors.New("token wait timed out")
|
|
}
|
|
|
|
return tok.Error() //nolint:wrapcheck // fine in this case, only used in logging
|
|
}
|
|
|
|
func publish(client mqtt.Client) error {
|
|
for _, t := range cfg.Topics {
|
|
logger := log.WithField("topic", t)
|
|
|
|
if err := mqttTokenToError(client.Publish(t, byte(cfg.QOS), cfg.Retain, cfg.Message)); err != nil {
|
|
logger.WithError(err).Fatal("Unable to publish message")
|
|
}
|
|
|
|
logger.Info("Message published")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func subscribe(client mqtt.Client) error {
|
|
var (
|
|
callback mqtt.MessageHandler
|
|
topics = map[string]byte{}
|
|
)
|
|
|
|
for _, t := range cfg.Topics {
|
|
topics[t] = byte(cfg.QOS)
|
|
}
|
|
|
|
switch cfg.OutputFormat {
|
|
case "log":
|
|
callback = subscribeCallbackLog
|
|
|
|
case "csv":
|
|
fmt.Println("Topic,QOS,Retained,Message") //nolint:forbidigo
|
|
callback = subscribeCallbackCSV
|
|
|
|
case "jsonl":
|
|
callback = subscribeCallbackJSONL
|
|
|
|
default:
|
|
log.WithField("format", cfg.OutputFormat).Fatal("Invalid output format specified")
|
|
}
|
|
|
|
if err := mqttTokenToError(client.SubscribeMultiple(topics, callback)); err != nil {
|
|
log.WithError(err).Fatal("Unable to subscribe topics")
|
|
}
|
|
|
|
for {
|
|
select {}
|
|
}
|
|
}
|
|
|
|
func subscribeCallbackLog(_ mqtt.Client, msg mqtt.Message) {
|
|
log.WithFields(log.Fields{
|
|
"topic": msg.Topic(),
|
|
"qos": msg.Qos(),
|
|
"retained": msg.Retained(),
|
|
"message": string(msg.Payload()),
|
|
}).Info("Message received")
|
|
}
|
|
|
|
func subscribeCallbackCSV(_ mqtt.Client, msg mqtt.Message) {
|
|
fmt.Printf("%s,%d,%v,%q\n", //nolint:forbidigo
|
|
msg.Topic(),
|
|
msg.Qos(),
|
|
msg.Retained(),
|
|
string(msg.Payload()),
|
|
)
|
|
}
|
|
|
|
func subscribeCallbackJSONL(_ mqtt.Client, msg mqtt.Message) {
|
|
jsonMessage := struct {
|
|
Topic string `json:"topic"`
|
|
QOS byte `json:"qos"`
|
|
Retained bool `json:"retained"`
|
|
Message string `json:"message"`
|
|
}{
|
|
msg.Topic(),
|
|
msg.Qos(),
|
|
msg.Retained(),
|
|
string(msg.Payload()),
|
|
}
|
|
|
|
if err := json.NewEncoder(os.Stdout).Encode(jsonMessage); err != nil {
|
|
log.WithError(err).Fatal("Unable to marshal message into jsonl")
|
|
}
|
|
}
|