diff --git a/go.mod b/go.mod
index 2992360..4455bb3 100644
--- a/go.mod
+++ b/go.mod
@@ -7,7 +7,6 @@ toolchain go1.23.4
 require (
 	github.com/Luzifer/rconfig v1.2.0
 	github.com/influxdata/influxdb v1.11.8
-	github.com/pkg/errors v0.9.1
 	github.com/sirupsen/logrus v1.9.3
 )
 
diff --git a/go.sum b/go.sum
index c7df2b4..62a792c 100644
--- a/go.sum
+++ b/go.sum
@@ -24,8 +24,6 @@ github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo=
 github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
 github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
-github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
diff --git a/influx.go b/influx.go
index 33edcab..f9bdac0 100644
--- a/influx.go
+++ b/influx.go
@@ -1,14 +1,15 @@
 package main
 
 import (
+	"fmt"
 	"sync"
 	"time"
 
 	influx "github.com/influxdata/influxdb/client/v2"
-	"github.com/pkg/errors"
 )
 
 const (
+	influxTimeout       = 2 * time.Second
 	influxWriteInterval = 10 * time.Second
 )
 
@@ -23,7 +24,7 @@ type metricsSender struct {
 
 func newMetricsSender(influxHost, influxUser, influxPass, influxDatabase string) (*metricsSender, error) {
 	out := &metricsSender{
-		errs:     make(chan error, 10),
+		errs:     make(chan error, 1),
 		influxDB: influxDatabase,
 	}
 	return out, out.initialize(influxHost, influxUser, influxPass)
@@ -33,14 +34,17 @@ func (m *metricsSender) Errors() <-chan error {
 	return m.errs
 }
 
-func (m *metricsSender) ForceTransmit() error {
-	return errors.Wrap(m.transmit(), "Unable to transmit recorded points")
+func (m *metricsSender) ForceTransmit() (err error) {
+	if err = m.transmit(); err != nil {
+		return fmt.Errorf("transmitting recorded points: %w", err)
+	}
+	return nil
 }
 
 func (m *metricsSender) RecordPoint(name string, tags map[string]string, fields map[string]interface{}) error {
 	pt, err := influx.NewPoint(name, tags, fields, time.Now())
 	if err != nil {
-		return errors.Wrap(err, "Unable to create point")
+		return fmt.Errorf("creating point: %w", err)
 	}
 
 	m.batchLock.Lock()
@@ -54,9 +58,8 @@ func (m *metricsSender) resetBatch() error {
 	b, err := influx.NewBatchPoints(influx.BatchPointsConfig{
 		Database: m.influxDB,
 	})
-
 	if err != nil {
-		return errors.Wrap(err, "Unable to create new points batch")
+		return fmt.Errorf("creating points batch: %w", err)
 	}
 
 	m.batch = b
@@ -65,22 +68,25 @@ func (m *metricsSender) sendLoop() {
 	for range time.Tick(influxWriteInterval) {
-
 		if err := m.transmit(); err != nil {
 			m.errs <- err
 		}
-
 	}
 }
 
-func (m *metricsSender) transmit() error {
+func (m *metricsSender) transmit() (err error) {
 	m.batchLock.Lock()
 	defer m.batchLock.Unlock()
 
-	if err := m.client.Write(m.batch); err != nil {
-		return errors.Wrap(err, "Unable to write recorded points")
+	if err = m.client.Write(m.batch); err != nil {
+		return fmt.Errorf("writing recorded points: %w", err)
 	}
-	return errors.Wrap(m.resetBatch(), "Unable to reset batch")
+
+	if err = m.resetBatch(); err != nil {
+		return fmt.Errorf("resetting batch: %w", err)
+	}
+
+	return nil
 }
 
 func (m *metricsSender) initialize(influxHost, influxUser, influxPass string) error {
@@ -88,16 +94,15 @@ func (m *metricsSender) initialize(influxHost, influxUser, influxPass string) er
 		Addr:     influxHost,
 		Username: influxUser,
 		Password: influxPass,
-		Timeout:  2 * time.Second,
+		Timeout:  influxTimeout,
 	})
-
 	if err != nil {
-		return errors.Wrap(err, "Unable to create InfluxDB HTTP client")
+		return fmt.Errorf("creating InfluxDB client: %w", err)
 	}
 
 	m.client = influxClient
 
-	if err := m.resetBatch(); err != nil {
-		return errors.Wrap(err, "Unable to reset batch")
+	if err = m.resetBatch(); err != nil {
+		return fmt.Errorf("resetting batch: %w", err)
 	}
 
 	go m.sendLoop()
diff --git a/main.go b/main.go
index cca0edd..6fabaa4 100644
--- a/main.go
+++ b/main.go
@@ -2,16 +2,16 @@ package main
 
 import (
 	"fmt"
-	"io/ioutil"
 	"os"
 	"time"
 
-	"github.com/pkg/errors"
-	log "github.com/sirupsen/logrus"
+	"github.com/sirupsen/logrus"
 
 	"github.com/Luzifer/rconfig"
 )
 
+const tsvPermission = 0o600
+
 var (
 	cfg struct {
 		InfluxDB string `flag:"influx-db" default:"" description:"Name of the database to write to (if unset, InfluxDB feature is disabled)"`
@@ -33,47 +33,52 @@ var (
 	version = "dev"
 )
 
-func init() {
-	if err := rconfig.ParseAndValidate(&cfg); err != nil {
-		log.Fatalf("Unable to parse CLI parameters: %s", err)
+func initApp() (err error) {
+	if err = rconfig.ParseAndValidate(&cfg); err != nil {
+		return fmt.Errorf("parsing CLI params: %w", err)
 	}
 
-	if cfg.VersionAndExit {
-		fmt.Printf("continuous-spark %s\n", version)
-		os.Exit(0)
+	l, err := logrus.ParseLevel(cfg.LogLevel)
+	if err != nil {
+		return fmt.Errorf("parsing log-level: %w", err)
 	}
+	logrus.SetLevel(l)
 
-	if l, err := log.ParseLevel(cfg.LogLevel); err == nil {
-		log.SetLevel(l)
-	} else {
-		log.Fatalf("Invalid log level: %s", err)
-	}
+	return nil
 }
 
 func main() {
 	var err error
+	if err = initApp(); err != nil {
+		logrus.WithError(err).Fatal("initializing app")
+	}
+
+	if cfg.VersionAndExit {
+		fmt.Printf("continuous-spark %s\n", version) //nolint:forbidigo
+		os.Exit(0)
+	}
 
 	if cfg.InfluxDB != "" {
 		if metrics, err = newMetricsSender(cfg.InfluxHost, cfg.InfluxUser, cfg.InfluxPass, cfg.InfluxDB); err != nil {
-			log.WithError(err).Fatalf("Unable to initialize InfluxDB sender")
+			logrus.WithError(err).Fatalf("initializing InfluxDB sender")
 		}
 
 		go func() {
			for err := range metrics.Errors() {
-				log.WithError(err).Error("Unable to transmit metrics")
+				logrus.WithError(err).Error("transmitting metrics")
 			}
 		}()
 	}
 
 	if err := updateStats(execTest()); err != nil {
-		log.WithError(err).Error("Unable to update stats")
+		logrus.WithError(err).Error("updating stats")
 	}
 
 	if cfg.OneShot {
 		// Return before loop for oneshot execution
 		if metrics != nil {
 			if err := metrics.ForceTransmit(); err != nil {
-				log.WithError(err).Error("Unable to store metrics")
+				logrus.WithError(err).Error("storing metrics")
 			}
 		}
 		return
@@ -81,19 +86,19 @@
 
 	for range time.Tick(cfg.Interval) {
 		if err := updateStats(execTest()); err != nil {
-			log.WithError(err).Error("Unable to update stats")
+			logrus.WithError(err).Error("updating stats")
 		}
 	}
 }
 
 func updateStats(t *testResult, err error) error {
 	if err != nil {
-		return errors.Wrap(err, "Got error from test function")
+		return err
 	}
 
 	hostname, err := os.Hostname()
 	if err != nil {
-		return errors.Wrap(err, "Unable to get local hostname")
+		return fmt.Errorf("getting hostname: %w", err)
 	}
 
 	if metrics != nil {
@@ -110,7 +115,7 @@ func updateStats(t *testResult, err error) error {
 				"dev": t.Ping.Dev,
 			},
 		); err != nil {
-			return errors.Wrap(err, "Unable to record 'ping' metric")
+			return fmt.Errorf("recording ping-metric: %w", err)
 		}
 
 		if err := metrics.RecordPoint(
fmt.Errorf("recording ping-metric: %w", err) } if err := metrics.RecordPoint( @@ -126,7 +131,7 @@ func updateStats(t *testResult, err error) error { "max": t.Receive.Max, }, ); err != nil { - return errors.Wrap(err, "Unable to record 'down' metric") + return fmt.Errorf("recording down-metric: %w", err) } if err := metrics.RecordPoint( @@ -142,33 +147,37 @@ func updateStats(t *testResult, err error) error { "max": t.Send.Max, }, ); err != nil { - return errors.Wrap(err, "Unable to record 'up' metric") + return fmt.Errorf("recording up-metric: %w", err) } } if cfg.TSVFile != "" { if err := writeTSV(t); err != nil { - return errors.Wrap(err, "Unable to write TSV file") + return fmt.Errorf("writing TSV file: %w", err) } } return nil } -func writeTSV(t *testResult) error { - if _, err := os.Stat(cfg.TSVFile); err != nil && os.IsNotExist(err) { - if err := ioutil.WriteFile(cfg.TSVFile, []byte("Date\tPing Min (ms)\tPing Avg (ms)\tPing Max (ms)\tPing StdDev (ms)\tRX Avg (bps)\tTX Avg (bps)\n"), 0o644); err != nil { - return errors.Wrap(err, "Unable to write initial TSV headers") +func writeTSV(t *testResult) (err error) { + if _, err = os.Stat(cfg.TSVFile); err != nil && os.IsNotExist(err) { + if err = os.WriteFile(cfg.TSVFile, []byte("Date\tPing Min (ms)\tPing Avg (ms)\tPing Max (ms)\tPing StdDev (ms)\tRX Avg (bps)\tTX Avg (bps)\n"), tsvPermission); err != nil { + return fmt.Errorf("writing TSV headers: %w", err) } } f, err := os.OpenFile(cfg.TSVFile, os.O_APPEND|os.O_WRONLY, os.ModeAppend) if err != nil { - return errors.Wrap(err, "Unable to open TSV file") + return fmt.Errorf("opening TSV file: %w", err) } - defer f.Close() + defer func() { + if err := f.Close(); err != nil { + logrus.WithError(err).Error("closing TSV file (leaked fd)") + } + }() - _, err = fmt.Fprintf(f, "%s\t%.2f\t%.2f\t%.2f\t%.2f\t%.0f\t%.0f\n", + if _, err = fmt.Fprintf(f, "%s\t%.2f\t%.2f\t%.2f\t%.2f\t%.0f\t%.0f\n", time.Now().Format(time.RFC3339), t.Ping.Min, t.Ping.Avg, @@ -176,9 +185,11 @@ func writeTSV(t *testResult) error { t.Ping.Dev, t.Receive.Avg, t.Send.Avg, - ) + ); err != nil { + return fmt.Errorf("writing measurement: %w", err) + } - return errors.Wrap(err, "Unable to write measurement to TSV file") + return nil } func execTest() (*testResult, error) { @@ -186,13 +197,13 @@ func execTest() (*testResult, error) { sc := newSparkClient(cfg.Server, cfg.Port, cfg.Interface) if err := sc.ExecutePingTest(t); err != nil { - return nil, errors.Wrap(err, "Ping-test failed") + return nil, fmt.Errorf("executing ping-test: %w", err) } if err := sc.ExecuteThroughputTest(t); err != nil { - return nil, errors.Wrap(err, "Throughput test failed") + return nil, fmt.Errorf("executing throughput-test: %w", err) } - log.Debugf("%s", t) + logrus.Debugf("%s", t) return t, nil } diff --git a/ping.go b/ping.go index 159cebe..0b9e610 100644 --- a/ping.go +++ b/ping.go @@ -1,24 +1,23 @@ package main import ( + "fmt" "math" "sort" "time" - - "github.com/pkg/errors" ) type pingHistory []int64 -func (s *sparkClient) ExecutePingTest(t *testResult) error { +func (s *sparkClient) ExecutePingTest(t *testResult) (err error) { ph := pingHistory{} - if err := s.connect(); err != nil { - return errors.Wrap(err, "Unable to connect") + if err = s.connect(); err != nil { + return fmt.Errorf("connecting: %w", err) } - if err := s.writeCommand("ECO"); err != nil { - return errors.Wrap(err, "Unable to send ECO command") + if err = s.writeCommand("ECO"); err != nil { + return fmt.Errorf("writing ECO command: %w", err) } buf := make([]byte, 1) @@ -26,14 
@@ -26,14 +25,14 @@ func (s *sparkClient) ExecutePingTest(t *testResult) error {
 	for i := 0; i < numPings; i++ {
 		start := time.Now()
 		if _, err := s.conn.Write([]byte{46}); err != nil {
-			return err
+			return fmt.Errorf("writing ping byte: %w", err)
 		}
 
 		if _, err := s.conn.Read(buf); err != nil {
-			return err
+			return fmt.Errorf("reading ping response: %w", err)
 		}
 
-		ph = append(ph, time.Since(start).Nanoseconds()/1000)
+		ph = append(ph, time.Since(start).Microseconds())
 	}
 
 	ph = ph.toMilli()
@@ -50,7 +49,7 @@ func (h *pingHistory) toMilli() []int64 {
 	var pingMilli []int64
 
 	for _, v := range *h {
-		pingMilli = append(pingMilli, v/1000)
+		pingMilli = append(pingMilli, (time.Duration(v) * time.Microsecond).Milliseconds())
 	}
 
 	return pingMilli
@@ -58,12 +57,12 @@ func (h *pingHistory) toMilli() []int64 {
 
 // mean generates a statistical mean of our historical ping times
 func (h *pingHistory) mean() float64 {
-	var sum uint64
+	var sum int64
 	for _, t := range *h {
-		sum = sum + uint64(t)
+		sum += t
 	}
-	return float64(sum / uint64(len(*h)))
+	return float64(sum / int64(len(*h)))
 }
 
 // variance calculates the variance of our historical ping times
@@ -73,7 +72,7 @@ func (h *pingHistory) variance() float64 {
 	mean := h.mean()
 
 	for _, t := range *h {
-		sqDevSum = sqDevSum + math.Pow((float64(t)-mean), 2)
+		sqDevSum += math.Pow((float64(t) - mean), 2) //nolint:mnd
 	}
 	return sqDevSum / float64(len(*h))
 }
@@ -83,7 +82,7 @@ func (h *pingHistory) stdDev() float64 {
 	return math.Sqrt(h.variance())
 }
 
-func (h *pingHistory) minMax() (float64, float64) {
+func (h *pingHistory) minMax() (minPing float64, maxPing float64) {
 	var hist []int
 	for _, v := range *h {
 		hist = append(hist, int(v))
diff --git a/spark.go b/spark.go
index e63b7bd..15ed8d0 100644
--- a/spark.go
+++ b/spark.go
@@ -7,8 +7,7 @@ import (
 	"net"
 	"strings"
 
-	"github.com/pkg/errors"
-	log "github.com/sirupsen/logrus"
+	"github.com/sirupsen/logrus"
 )
 
 const (
@@ -80,60 +79,60 @@ func (s *sparkClient) dial() error {
 	if s.bindInterfaceName != "" {
 		iface, err := net.InterfaceByName(s.bindInterfaceName)
 		if err != nil {
-			return errors.Wrap(err, "select interface")
+			return fmt.Errorf("selecting interface: %w", err)
 		}
 
 		addrs, err := iface.Addrs()
 		if err != nil {
-			return errors.Wrap(err, "get interface IPs")
+			return fmt.Errorf("getting interface IPs: %w", err)
 		}
 
 		if len(addrs) == 0 {
-			return errors.New("no addresses found on interface")
+			return fmt.Errorf("no addresses found on interface")
 		}
 
 		d.LocalAddr = &net.TCPAddr{IP: addrs[0].(*net.IPNet).IP}
-		log.WithField("ip", d.LocalAddr).Warn("Set local address")
+		logrus.WithField("ip", d.LocalAddr).Warn("Set local address")
 	}
 
 	c, err := d.Dial("tcp", s.remote)
 	if err != nil {
-		return errors.Wrap(err, "Unable to dial")
+		return fmt.Errorf("dialing remote server: %w", err)
 	}
 	s.conn = c
 
 	return nil
 }
 
-func (s *sparkClient) connect() error {
-	if err := s.dial(); err != nil {
-		return errors.Wrapf(err, "Unable to connect to sparkyfish-server %q", s.remote)
+func (s *sparkClient) connect() (err error) {
+	if err = s.dial(); err != nil {
+		return fmt.Errorf("connecting to remote %q: %w", s.remote, err)
 	}
 	s.reader = bufio.NewReader(s.conn)
 
-	if err := s.writeCommand(fmt.Sprintf("HELO%d", protocolVersion)); err != nil {
-		return errors.Wrap(err, "Unable to send HELO command")
+	if err = s.writeCommand(fmt.Sprintf("HELO%d", protocolVersion)); err != nil {
+		return fmt.Errorf("writing HELO command: %w", err)
 	}
 
 	return s.readGreeting()
 }
 
-func (s *sparkClient) writeCommand(command string) error {
-	if _, err := fmt.Fprintf(s.conn, "%s\r\n", command); err != nil {
-		return errors.Wrapf(err, "Unable to send command %q", command)
+func (s *sparkClient) writeCommand(command string) (err error) {
+	if _, err = fmt.Fprintf(s.conn, "%s\r\n", command); err != nil {
+		return fmt.Errorf("sending command %q: %w", command, err)
 	}
 
 	return nil
 }
 
 func (s *sparkClient) readGreeting() error {
 	if helo, err := s.reader.ReadString('\n'); err != nil || strings.TrimSpace(helo) != "HELO" {
-		return errors.New("Unexpected response to greeting")
+		return fmt.Errorf("unexpected response to greeting")
 	}
 
 	cn, err := s.reader.ReadString('\n')
 	if err != nil {
-		return errors.Wrap(err, "Unable to read string")
+		return fmt.Errorf("reading cn string: %w", err)
 	}
 	cn = strings.TrimSpace(cn)
@@ -143,14 +142,15 @@ func (s *sparkClient) readGreeting() error {
 
 	loc, err := s.reader.ReadString('\n')
 	if err != nil {
-		return errors.Wrap(err, "Unable to read string")
+		return fmt.Errorf("reading loc string: %w", err)
 	}
 	loc = strings.TrimSpace(loc)
 
-	log.WithFields(log.Fields{
+	logrus.WithFields(logrus.Fields{
 		"cn":       cn,
 		"location": loc,
 	}).Debug("Connected to server")
+
 	return nil
 }
diff --git a/throughput.go b/throughput.go
index 69bd23f..091a644 100644
--- a/throughput.go
+++ b/throughput.go
@@ -3,37 +3,51 @@ package main
 import (
 	"bytes"
 	"crypto/rand"
+	"fmt"
 	"io"
-	"io/ioutil"
-	"log"
 	"net"
 	"syscall"
 	"time"
 
-	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
 )
 
-func (s *sparkClient) ExecuteThroughputTest(t *testResult) error {
-	if err := s.runSendTest(t); err != nil {
-		return errors.Wrap(err, "Send-test failed")
+const (
+	throughputBufferSize     = 1024 * blockSize
+	throughputBufferSizeBits = throughputBufferSize * 8
+)
+
+func (s *sparkClient) ExecuteThroughputTest(t *testResult) (err error) {
+	if err = s.runSendTest(t); err != nil {
+		return fmt.Errorf("running send-test: %w", err)
 	}
-	return errors.Wrap(s.runRecvTest(t), "Recv-test failed")
+
+	if err = s.runRecvTest(t); err != nil {
+		return fmt.Errorf("running recv-test: %w", err)
+	}
+
+	return nil
 }
 
-func (s *sparkClient) runSendTest(t *testResult) error {
-	data := make([]byte, 1024*blockSize)
-	if _, err := rand.Read(data); err != nil {
-		return errors.Wrap(err, "Unable to gather random data")
+//nolint:gocyclo
+func (s *sparkClient) runSendTest(t *testResult) (err error) {
+	data := make([]byte, throughputBufferSize)
+	if _, err = rand.Read(data); err != nil {
+		return fmt.Errorf("gathering random data: %w", err)
 	}
 	dataReader := bytes.NewReader(data)
 
-	if err := s.connect(); err != nil {
-		return errors.Wrap(err, "Unable to connect")
+	if err = s.connect(); err != nil {
+		return fmt.Errorf("establishing connection: %w", err)
 	}
-	defer s.conn.Close()
+	defer func() {
+		if err := s.conn.Close(); err != nil {
+			logrus.WithError(err).Error("closing connection (leaked fd)")
+		}
+	}()
 
-	if err := s.writeCommand("RCV"); err != nil {
-		return errors.Wrap(err, "Unable to send RCV command")
+	if err = s.writeCommand("RCV"); err != nil {
+		return fmt.Errorf("sending RCV command: %w", err)
 	}
 
 	var (
@@ -44,24 +58,24 @@ func (s *sparkClient) runSendTest(t *testResult) error {
 	for {
 		start := time.Now()
 
-		_, err := io.Copy(s.conn, dataReader)
-		if err != nil {
+		if _, err = io.Copy(s.conn, dataReader); err != nil {
 			// If we get any of these errors, it probably just means that the server closed the connection
 			if err == io.EOF || err == io.ErrClosedPipe || err == syscall.EPIPE {
 				break
 			}
+
 			if operr, ok := err.(*net.OpError); ok {
-				log.Printf("%s", operr.Err)
+				logrus.Printf("%s", operr.Err)
 			}
 
 			if operr, ok := err.(*net.OpError); ok && operr.Err.Error() == syscall.ECONNRESET.Error() {
 				break
 			}
 
-			return errors.Wrap(err, "Unable to copy data")
+			return fmt.Errorf("copying data: %w", err)
 		}
 
-		bps := float64(1024*blockSize*8) / (float64(time.Since(start).Nanoseconds()) / float64(time.Second.Nanoseconds()))
+		bps := float64(throughputBufferSizeBits) / (float64(time.Since(start).Nanoseconds()) / float64(time.Second.Nanoseconds()))
 		if bps < t.Send.Min {
 			t.Send.Min = bps
 		}
@@ -71,7 +85,7 @@
 		blockCount++
 
 		if _, err := dataReader.Seek(0, 0); err != nil {
-			return errors.Wrap(err, "Unable to seek")
+			return fmt.Errorf("seeking data reader: %w", err)
 		}
 
 		if time.Since(totalStart) > time.Duration(throughputTestLength)*time.Second {
@@ -80,19 +94,23 @@
 	}
 
 	// average bit per second over the whole test: total bits sent (bits per block times blockCount) divided by elapsed time
-	t.Send.Avg = float64(1024*blockSize*blockCount*8) / (float64(time.Since(totalStart).Nanoseconds()) / float64(time.Second.Nanoseconds()))
+	t.Send.Avg = float64(throughputBufferSizeBits*blockCount) / (float64(time.Since(totalStart).Nanoseconds()) / float64(time.Second.Nanoseconds()))
 
 	return nil
 }
 
-func (s *sparkClient) runRecvTest(t *testResult) error {
-	if err := s.connect(); err != nil {
-		return errors.Wrap(err, "Unable to connect")
+func (s *sparkClient) runRecvTest(t *testResult) (err error) {
+	if err = s.connect(); err != nil {
+		return fmt.Errorf("establishing connection: %w", err)
 	}
-	defer s.conn.Close()
+	defer func() {
+		if err := s.conn.Close(); err != nil {
+			logrus.WithError(err).Error("closing connection (leaked fd)")
+		}
+	}()
 
-	if err := s.writeCommand("SND"); err != nil {
-		return errors.Wrap(err, "Unable to send SND command")
+	if err = s.writeCommand("SND"); err != nil {
+		return fmt.Errorf("writing SND command: %w", err)
 	}
 
 	var (
@@ -103,8 +121,7 @@
 	for {
 		start := time.Now()
 
-		_, err := io.CopyN(ioutil.Discard, s.conn, 1024*blockSize)
-		if err != nil {
+		if _, err = io.CopyN(io.Discard, s.conn, throughputBufferSize); err != nil {
 			// If we get any of these errors, it probably just means that the server closed the connection
 			if err == io.EOF || err == io.ErrClosedPipe || err == syscall.EPIPE {
 				break
@@ -114,10 +131,10 @@
 				break
 			}
 
-			return errors.Wrap(err, "Unable to copy data")
+			return fmt.Errorf("copying data: %w", err)
 		}
 
-		bps := float64(1024*blockSize*8) / (float64(time.Since(start).Nanoseconds()) / float64(time.Second.Nanoseconds()))
+		bps := float64(throughputBufferSizeBits) / (float64(time.Since(start).Nanoseconds()) / float64(time.Second.Nanoseconds()))
 		if bps < t.Receive.Min {
 			t.Receive.Min = bps
 		}
@@ -132,7 +149,7 @@
 	}
 
 	// average bit per second over the whole test: total bits received (bits per block times blockCount) divided by elapsed time
-	t.Receive.Avg = float64(1024*blockSize*blockCount*8) / (float64(time.Since(totalStart).Nanoseconds()) / float64(time.Second.Nanoseconds()))
+	t.Receive.Avg = float64(throughputBufferSizeBits*blockCount) / (float64(time.Since(totalStart).Nanoseconds()) / float64(time.Second.Nanoseconds()))
 
 	return nil
 }
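Note (not part of the patch above): the migration from github.com/pkg/errors to fmt.Errorf with the %w verb keeps the error chain inspectable through the standard library, which is why the wrapping messages could be re-worded freely. Below is a minimal, self-contained sketch of the wrapping style the patch adopts; the names dial, sendReport and errConnRefused are illustrative only and do not exist in this repository.

package main

import (
	"errors"
	"fmt"
)

// errConnRefused stands in for a sentinel error a lower layer might return.
var errConnRefused = errors.New("connection refused")

// dial is a stand-in for the lowest layer that can fail.
func dial() error { return errConnRefused }

// sendReport mimics the pattern used throughout the patch:
// annotate with context and chain the cause via %w.
func sendReport() error {
	if err := dial(); err != nil {
		return fmt.Errorf("dialing remote server: %w", err)
	}
	return nil
}

func main() {
	err := sendReport()

	// errors.Is walks the %w chain, so the sentinel stays matchable
	// even though the wrapper re-worded the message.
	fmt.Println(errors.Is(err, errConnRefused)) // true
	fmt.Println(err)                            // dialing remote server: connection refused
}

errors.As works the same way for typed causes, so code that previously reached for pkg/errors.Cause can switch to the standard helpers without losing information.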