From 58abbcc02c3620b469355085e46ab484ecc3ef8d Mon Sep 17 00:00:00 2001 From: Knut Ahlers Date: Mon, 7 Feb 2022 15:50:56 +0100 Subject: [PATCH] Add keepalive Signed-off-by: Knut Ahlers --- main.go | 5 +++-- mqtt.go | 20 +++++++++++--------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/main.go b/main.go index a8af8af..228b932 100644 --- a/main.go +++ b/main.go @@ -68,8 +68,9 @@ func main() { mqtt.NewClientOptions(). AddBroker(cfg.MQTTBroker). SetClientID(cfg.MQTTClientID). - SetUsername(cfg.MQTTUser). - SetPassword(cfg.MQTTPass), + SetKeepAlive(cfg.MQTTTimeout). + SetPassword(cfg.MQTTPass). + SetUsername(cfg.MQTTUser), ) tok := client.Connect() diff --git a/mqtt.go b/mqtt.go index ea9d36a..83f62a9 100644 --- a/mqtt.go +++ b/mqtt.go @@ -2,6 +2,7 @@ package main import ( "encoding/json" + "errors" "fmt" "os" @@ -9,17 +10,19 @@ import ( log "github.com/sirupsen/logrus" ) +func mqttTokenToError(tok mqtt.Token) error { + if !tok.WaitTimeout(cfg.MQTTTimeout) { + return errors.New("token wait timed out") + } + + return tok.Error() +} + func publish(client mqtt.Client) error { for _, t := range cfg.Topics { - tok := client.Publish(t, byte(cfg.QOS), cfg.Retain, cfg.Message) - logger := log.WithField("topic", t) - if !tok.WaitTimeout(cfg.MQTTTimeout) { - logger.Error("Unable to publish message within timeout") - } - - if err := tok.Error(); err != nil { + if err := mqttTokenToError(client.Publish(t, byte(cfg.QOS), cfg.Retain, cfg.Message)); err != nil { logger.WithError(err).Fatal("Unable to publish message") } @@ -54,8 +57,7 @@ func subscribe(client mqtt.Client) error { log.WithField("format", cfg.OutputFormat).Fatal("Invalid output format specified") } - tok := client.SubscribeMultiple(topics, callback) - if err := tok.Error(); err != nil { + if err := mqttTokenToError(client.SubscribeMultiple(topics, callback)); err != nil { log.WithError(err).Fatal("Unable to subscribe topics") }