1
0
Fork 0
mirror of https://github.com/Luzifer/mqttcli.git synced 2024-11-08 14:50:11 +00:00

Add keepalive

Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
Knut Ahlers 2022-02-07 15:50:56 +01:00
parent e27424aa27
commit 58abbcc02c
Signed by: luzifer
GPG key ID: 0066F03ED215AD7D
2 changed files with 14 additions and 11 deletions

View file

@ -68,8 +68,9 @@ func main() {
mqtt.NewClientOptions().
AddBroker(cfg.MQTTBroker).
SetClientID(cfg.MQTTClientID).
SetUsername(cfg.MQTTUser).
SetPassword(cfg.MQTTPass),
SetKeepAlive(cfg.MQTTTimeout).
SetPassword(cfg.MQTTPass).
SetUsername(cfg.MQTTUser),
)
tok := client.Connect()

20
mqtt.go
View file

@ -2,6 +2,7 @@ package main
import (
"encoding/json"
"errors"
"fmt"
"os"
@ -9,17 +10,19 @@ import (
log "github.com/sirupsen/logrus"
)
func mqttTokenToError(tok mqtt.Token) error {
if !tok.WaitTimeout(cfg.MQTTTimeout) {
return errors.New("token wait timed out")
}
return tok.Error()
}
func publish(client mqtt.Client) error {
for _, t := range cfg.Topics {
tok := client.Publish(t, byte(cfg.QOS), cfg.Retain, cfg.Message)
logger := log.WithField("topic", t)
if !tok.WaitTimeout(cfg.MQTTTimeout) {
logger.Error("Unable to publish message within timeout")
}
if err := tok.Error(); err != nil {
if err := mqttTokenToError(client.Publish(t, byte(cfg.QOS), cfg.Retain, cfg.Message)); err != nil {
logger.WithError(err).Fatal("Unable to publish message")
}
@ -54,8 +57,7 @@ func subscribe(client mqtt.Client) error {
log.WithField("format", cfg.OutputFormat).Fatal("Invalid output format specified")
}
tok := client.SubscribeMultiple(topics, callback)
if err := tok.Error(); err != nil {
if err := mqttTokenToError(client.SubscribeMultiple(topics, callback)); err != nil {
log.WithError(err).Fatal("Unable to subscribe topics")
}