diff --git a/fs20.go b/fs20.go index ff6d795..644f7f2 100644 --- a/fs20.go +++ b/fs20.go @@ -5,6 +5,7 @@ import ( "strings" mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) @@ -34,13 +35,13 @@ func publishFS20ToCUL(client mqtt.Client, msg mqtt.Message) { } func publishFS20ToMQTT(housecode, device, command string) error { - tok := brokerClient.Publish( - strings.Join([]string{"culmqtt", fmt.Sprintf("%s%s", housecode, device), "state"}, "/"), - 0x01, // QOS Level 1: At least once - true, - command, + return errors.Wrap( + mqttTokToErr(brokerClient.Publish( + strings.Join([]string{"culmqtt", fmt.Sprintf("%s%s", housecode, device), "state"}, "/"), + 0x01, // QOS Level 1: At least once + true, + command, + )), + "publishing message", ) - - tok.Wait() - return tok.Error() } diff --git a/go.mod b/go.mod index fa47d23..89d633d 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/Luzifer/rconfig/v2 v2.2.1 github.com/eclipse/paho.mqtt.golang v1.3.2 github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4 + github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.7.0 github.com/spf13/pflag v1.0.5 // indirect golang.org/x/net v0.0.0-20210119194325-5f4716e94777 // indirect diff --git a/go.sum b/go.sum index da06dd9..7c171fb 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4 h1:G2ztCwXov8mRvP0ZfjE6nAlaCX2XbykaeHdbT6KwDz0= github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4/go.mod h1:2RvX5ZjVtsznNZPEt4xwJXNJrM3VTZoQf7V6gk0ysvs= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM= github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= diff --git a/main.go b/main.go index fc0d6ab..ffc9b22 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "time" "github.com/Luzifer/rconfig/v2" "github.com/jacobsa/go-serial/serial" @@ -13,12 +14,13 @@ import ( var ( cfg = struct { - CULDevice string `flag:"cul-device" default:"/dev/ttyACM0" description:"TTY of the CUL to connect to"` - LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"` - MQTTHost string `flag:"mqtt-host" default:"tcp://127.0.0.1:1883" description:"Connection URI for the broker"` - MQTTUser string `flag:"mqtt-user" default:"" description:"Username for broker connection"` - MQTTPass string `flag:"mqtt-pass" default:"" description:"Password for broker connection"` - VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"` + CULDevice string `flag:"cul-device" default:"/dev/ttyACM0" description:"TTY of the CUL to connect to"` + LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"` + MQTTHost string `flag:"mqtt-host" default:"tcp://127.0.0.1:1883" description:"Connection URI for the broker"` + MQTTUser string `flag:"mqtt-user" default:"" description:"Username for broker connection"` + MQTTPass string `flag:"mqtt-pass" default:"" description:"Password for broker connection"` + MQTTTimeout time.Duration `flag:"mqtt-timeout" default:"2s" description:"Timeout for MQTT actions"` + VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"` }{} port io.ReadWriteCloser diff --git a/mqtt.go b/mqtt.go index 95109d8..1170b8a 100644 --- a/mqtt.go +++ b/mqtt.go @@ -1,6 +1,11 @@ package main -import mqtt "github.com/eclipse/paho.mqtt.golang" +import ( + "errors" + + mqtt "github.com/eclipse/paho.mqtt.golang" + log "github.com/sirupsen/logrus" +) var brokerClient mqtt.Client @@ -12,6 +17,19 @@ func init() { brokerClient = mqtt.NewClient(opts) - brokerClient.Connect().Wait() - brokerClient.Subscribe("culmqtt/+/send", 0x01, publishFS20ToCUL) + if err := mqttTokToErr(brokerClient.Connect()); err != nil { + log.WithError(err).Fatal("Connect to MQTT broker") + } + + if err := mqttTokToErr(brokerClient.Subscribe("culmqtt/+/send", 0x01, publishFS20ToCUL)); err != nil { + log.WithError(err).Fatal("Subscribe to topic") + } +} + +func mqttTokToErr(tok mqtt.Token) error { + if !tok.WaitTimeout(cfg.MQTTTimeout) { + return errors.New("command timed out") + } + + return tok.Error() }