mirror of
https://github.com/Luzifer/continuous-spark.git
synced 2024-12-20 17:51:22 +00:00
Lint: Fix linter errors, improve error handling
Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
parent
9eec01877f
commit
f89a049907
5 changed files with 40 additions and 31 deletions
14
influx.go
14
influx.go
|
@ -21,7 +21,7 @@ type metricsSender struct {
|
||||||
influxDB string
|
influxDB string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMetricsSender(influxHost, influxUser, influxPass, influxDatabase string) (*metricsSender, error) {
|
func newMetricsSender(influxHost, influxUser, influxPass, influxDatabase string) (*metricsSender, error) {
|
||||||
out := &metricsSender{
|
out := &metricsSender{
|
||||||
errs: make(chan error, 10),
|
errs: make(chan error, 10),
|
||||||
influxDB: influxDatabase,
|
influxDB: influxDatabase,
|
||||||
|
@ -40,7 +40,7 @@ func (m *metricsSender) ForceTransmit() error {
|
||||||
func (m *metricsSender) RecordPoint(name string, tags map[string]string, fields map[string]interface{}) error {
|
func (m *metricsSender) RecordPoint(name string, tags map[string]string, fields map[string]interface{}) error {
|
||||||
pt, err := influx.NewPoint(name, tags, fields, time.Now())
|
pt, err := influx.NewPoint(name, tags, fields, time.Now())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.Wrap(err, "Unable to create point")
|
||||||
}
|
}
|
||||||
|
|
||||||
m.batchLock.Lock()
|
m.batchLock.Lock()
|
||||||
|
@ -56,7 +56,7 @@ func (m *metricsSender) resetBatch() error {
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.Wrap(err, "Unable to create new points batch")
|
||||||
}
|
}
|
||||||
|
|
||||||
m.batch = b
|
m.batch = b
|
||||||
|
@ -80,9 +80,7 @@ func (m *metricsSender) transmit() error {
|
||||||
if err := m.client.Write(m.batch); err != nil {
|
if err := m.client.Write(m.batch); err != nil {
|
||||||
return errors.Wrap(err, "Unable to write recorded points")
|
return errors.Wrap(err, "Unable to write recorded points")
|
||||||
}
|
}
|
||||||
m.resetBatch()
|
return errors.Wrap(m.resetBatch(), "Unable to reset batch")
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *metricsSender) initialize(influxHost, influxUser, influxPass string) error {
|
func (m *metricsSender) initialize(influxHost, influxUser, influxPass string) error {
|
||||||
|
@ -94,12 +92,12 @@ func (m *metricsSender) initialize(influxHost, influxUser, influxPass string) er
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.Wrap(err, "Unable to create InfluxDB HTTP client")
|
||||||
}
|
}
|
||||||
|
|
||||||
m.client = influxClient
|
m.client = influxClient
|
||||||
if err := m.resetBatch(); err != nil {
|
if err := m.resetBatch(); err != nil {
|
||||||
return err
|
return errors.Wrap(err, "Unable to reset batch")
|
||||||
}
|
}
|
||||||
go m.sendLoop()
|
go m.sendLoop()
|
||||||
|
|
||||||
|
|
2
main.go
2
main.go
|
@ -53,7 +53,7 @@ func main() {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if cfg.InfluxDB != "" {
|
if cfg.InfluxDB != "" {
|
||||||
if metrics, err = NewMetricsSender(cfg.InfluxHost, cfg.InfluxUser, cfg.InfluxPass, cfg.InfluxDB); err != nil {
|
if metrics, err = newMetricsSender(cfg.InfluxHost, cfg.InfluxUser, cfg.InfluxPass, cfg.InfluxDB); err != nil {
|
||||||
log.WithError(err).Fatalf("Unable to initialize InfluxDB sender")
|
log.WithError(err).Fatalf("Unable to initialize InfluxDB sender")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
10
ping.go
10
ping.go
|
@ -4,6 +4,8 @@ import (
|
||||||
"math"
|
"math"
|
||||||
"sort"
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
type pingHistory []int64
|
type pingHistory []int64
|
||||||
|
@ -12,18 +14,20 @@ func (s *sparkClient) ExecutePingTest(t *testResult) error {
|
||||||
ph := pingHistory{}
|
ph := pingHistory{}
|
||||||
|
|
||||||
if err := s.connect(); err != nil {
|
if err := s.connect(); err != nil {
|
||||||
return err
|
return errors.Wrap(err, "Unable to connect")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.writeCommand("ECO"); err != nil {
|
if err := s.writeCommand("ECO"); err != nil {
|
||||||
return err
|
return errors.Wrap(err, "Unable to send ECO command")
|
||||||
}
|
}
|
||||||
|
|
||||||
buf := make([]byte, 1)
|
buf := make([]byte, 1)
|
||||||
|
|
||||||
for i := 0; i < numPings; i++ {
|
for i := 0; i < numPings; i++ {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
s.conn.Write([]byte{46})
|
if _, err := s.conn.Write([]byte{46}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if _, err := s.conn.Read(buf); err != nil {
|
if _, err := s.conn.Read(buf); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
20
spark.go
20
spark.go
|
@ -7,6 +7,7 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -74,7 +75,7 @@ func newSparkClient(hostname string, port int) *sparkClient {
|
||||||
func (s *sparkClient) dial() error {
|
func (s *sparkClient) dial() error {
|
||||||
c, err := net.Dial("tcp", s.remote)
|
c, err := net.Dial("tcp", s.remote)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.Wrap(err, "Unable to dial")
|
||||||
}
|
}
|
||||||
s.conn = c
|
s.conn = c
|
||||||
|
|
||||||
|
@ -83,13 +84,13 @@ func (s *sparkClient) dial() error {
|
||||||
|
|
||||||
func (s *sparkClient) connect() error {
|
func (s *sparkClient) connect() error {
|
||||||
if err := s.dial(); err != nil {
|
if err := s.dial(); err != nil {
|
||||||
return fmt.Errorf("Unable to connect to sparkyfish-server %q: %s", s.remote, err)
|
return errors.Wrapf(err, "Unable to connect to sparkyfish-server %q", s.remote)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.reader = bufio.NewReader(s.conn)
|
s.reader = bufio.NewReader(s.conn)
|
||||||
|
|
||||||
if err := s.writeCommand(fmt.Sprintf("HELO%d", protocolVersion)); err != nil {
|
if err := s.writeCommand(fmt.Sprintf("HELO%d", protocolVersion)); err != nil {
|
||||||
return err
|
return errors.Wrap(err, "Unable to send HELO command")
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.readGreeting()
|
return s.readGreeting()
|
||||||
|
@ -97,19 +98,19 @@ func (s *sparkClient) connect() error {
|
||||||
|
|
||||||
func (s *sparkClient) writeCommand(command string) error {
|
func (s *sparkClient) writeCommand(command string) error {
|
||||||
if _, err := fmt.Fprintf(s.conn, "%s\r\n", command); err != nil {
|
if _, err := fmt.Fprintf(s.conn, "%s\r\n", command); err != nil {
|
||||||
return fmt.Errorf("Unable to send command %q: %s", command, err)
|
return errors.Wrapf(err, "Unable to send command %q", command)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sparkClient) readGreeting() error {
|
func (s *sparkClient) readGreeting() error {
|
||||||
if helo, err := s.reader.ReadString('\n'); err != nil || strings.TrimSpace(helo) != "HELO" {
|
if helo, err := s.reader.ReadString('\n'); err != nil || strings.TrimSpace(helo) != "HELO" {
|
||||||
return fmt.Errorf("Unexpected response to greeting")
|
return errors.New("Unexpected response to greeting")
|
||||||
}
|
}
|
||||||
|
|
||||||
cn, err := s.reader.ReadString('\n')
|
cn, err := s.reader.ReadString('\n')
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.Wrap(err, "Unable to read string")
|
||||||
}
|
}
|
||||||
|
|
||||||
cn = strings.TrimSpace(cn)
|
cn = strings.TrimSpace(cn)
|
||||||
|
@ -119,11 +120,14 @@ func (s *sparkClient) readGreeting() error {
|
||||||
|
|
||||||
loc, err := s.reader.ReadString('\n')
|
loc, err := s.reader.ReadString('\n')
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.Wrap(err, "Unable to read string")
|
||||||
}
|
}
|
||||||
|
|
||||||
loc = strings.TrimSpace(loc)
|
loc = strings.TrimSpace(loc)
|
||||||
|
|
||||||
log.Debugf("Connected to %q in location %q", cn, loc)
|
log.WithFields(log.Fields{
|
||||||
|
"cn": cn,
|
||||||
|
"location": loc,
|
||||||
|
}).Debug("Connected to server")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,36 +3,37 @@ package main
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *sparkClient) ExecuteThroughputTest(t *testResult) error {
|
func (s *sparkClient) ExecuteThroughputTest(t *testResult) error {
|
||||||
if err := s.runSendTest(t); err != nil {
|
if err := s.runSendTest(t); err != nil {
|
||||||
return err
|
return errors.Wrap(err, "Send-test failed")
|
||||||
}
|
}
|
||||||
return s.runRecvTest(t)
|
return errors.Wrap(s.runRecvTest(t), "Recv-test failed")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sparkClient) runSendTest(t *testResult) error {
|
func (s *sparkClient) runSendTest(t *testResult) error {
|
||||||
data := make([]byte, 1024*blockSize)
|
data := make([]byte, 1024*blockSize)
|
||||||
if _, err := rand.Read(data); err != nil {
|
if _, err := rand.Read(data); err != nil {
|
||||||
return fmt.Errorf("Was unable to gather random data: %s", err)
|
return errors.Wrap(err, "Unable to gather random data")
|
||||||
}
|
}
|
||||||
dataReader := bytes.NewReader(data)
|
dataReader := bytes.NewReader(data)
|
||||||
|
|
||||||
if err := s.connect(); err != nil {
|
if err := s.connect(); err != nil {
|
||||||
return err
|
return errors.Wrap(err, "Unable to connect")
|
||||||
}
|
}
|
||||||
defer s.conn.Close()
|
defer s.conn.Close()
|
||||||
|
|
||||||
if err := s.writeCommand("RCV"); err != nil {
|
if err := s.writeCommand("RCV"); err != nil {
|
||||||
return err
|
return errors.Wrap(err, "Unable to send RCV command")
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -57,7 +58,7 @@ func (s *sparkClient) runSendTest(t *testResult) error {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt.Errorf("Error copying: %s", err)
|
return errors.Wrap(err, "Unable to copy data")
|
||||||
}
|
}
|
||||||
|
|
||||||
bps := float64(1024*blockSize*8) / (float64(time.Since(start).Nanoseconds()) / float64(time.Second.Nanoseconds()))
|
bps := float64(1024*blockSize*8) / (float64(time.Since(start).Nanoseconds()) / float64(time.Second.Nanoseconds()))
|
||||||
|
@ -69,7 +70,9 @@ func (s *sparkClient) runSendTest(t *testResult) error {
|
||||||
}
|
}
|
||||||
blockCount++
|
blockCount++
|
||||||
|
|
||||||
dataReader.Seek(0, 0)
|
if _, err := dataReader.Seek(0, 0); err != nil {
|
||||||
|
return errors.Wrap(err, "Unable to seek")
|
||||||
|
}
|
||||||
|
|
||||||
if time.Since(totalStart) > time.Duration(throughputTestLength)*time.Second {
|
if time.Since(totalStart) > time.Duration(throughputTestLength)*time.Second {
|
||||||
break
|
break
|
||||||
|
@ -84,12 +87,12 @@ func (s *sparkClient) runSendTest(t *testResult) error {
|
||||||
|
|
||||||
func (s *sparkClient) runRecvTest(t *testResult) error {
|
func (s *sparkClient) runRecvTest(t *testResult) error {
|
||||||
if err := s.connect(); err != nil {
|
if err := s.connect(); err != nil {
|
||||||
return err
|
return errors.Wrap(err, "Unable to connect")
|
||||||
}
|
}
|
||||||
defer s.conn.Close()
|
defer s.conn.Close()
|
||||||
|
|
||||||
if err := s.writeCommand("SND"); err != nil {
|
if err := s.writeCommand("SND"); err != nil {
|
||||||
return err
|
return errors.Wrap(err, "Unable to send SND command")
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -111,7 +114,7 @@ func (s *sparkClient) runRecvTest(t *testResult) error {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt.Errorf("Error copying: %s", err)
|
return errors.Wrap(err, "Unable to copy data")
|
||||||
}
|
}
|
||||||
|
|
||||||
bps := float64(1024*blockSize*8) / (float64(time.Since(start).Nanoseconds()) / float64(time.Second.Nanoseconds()))
|
bps := float64(1024*blockSize*8) / (float64(time.Since(start).Nanoseconds()) / float64(time.Second.Nanoseconds()))
|
||||||
|
|
Loading…
Reference in a new issue