commit 5d9ae1fde52017def51066deedb43b2c342de1dd
Author: Knut Ahlers
Date:   Wed Jul 26 11:29:10 2017 +0200

    Initial version

    Signed-off-by: Knut Ahlers

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..0ff7f9f
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+continuous-spark
diff --git a/main.go b/main.go
new file mode 100644
index 0000000..6e51c93
--- /dev/null
+++ b/main.go
@@ -0,0 +1,108 @@
+package main
+
+import (
+	"fmt"
+	"net/http"
+	"os"
+	"time"
+
+	"github.com/Luzifer/rconfig"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	log "github.com/sirupsen/logrus"
+)
+
+var (
+	// cfg holds the command line configuration filled in by rconfig.
+	cfg struct {
+		Hostname       string        `flag:"hostname" default:"" description:"Hostname / IP of the sparkyfish server" validate:"nonzero"`
+		Interval       time.Duration `flag:"interval" default:"15m" description:"Interval to execute test in"`
+		Listen         string        `flag:"listen" default:":3000" description:"IP/Port to listen on"`
+		LogLevel       string        `flag:"log-level" default:"info" description:"Set log level (debug, info, warning, error)"`
+		Port           int           `flag:"port" default:"7121" description:"Port the sparkyfish server is running on"`
+		VersionAndExit bool          `flag:"version" default:"false" description:"Print version information and exit"`
+	}
+
+	// version is the version string printed for the version flag.
+	version = "dev"
+)
+
+// init parses and validates the command line flags, handles the version
+// flag and applies the configured log level.
+func init() {
+	if err := rconfig.ParseAndValidate(&cfg); err != nil {
+		log.Fatalf("Unable to parse CLI parameters: %s", err)
+	}
+
+	if cfg.VersionAndExit {
+		fmt.Printf("continuous-spark %s\n", version)
+		os.Exit(0)
+	}
+
+	l, err := log.ParseLevel(cfg.LogLevel)
+	if err != nil {
+		log.Fatalf("Invalid log level: %s", err)
+	}
+	log.SetLevel(l)
+}
+
+// main runs the tests periodically in the background and serves the
+// resulting metrics over HTTP on /metrics.
+func main() {
+	go func() {
+		// Run one test immediately instead of waiting a full interval
+		// for the first tick.
+		runTest()
+
+		for range time.Tick(cfg.Interval) {
+			runTest()
+		}
+	}()
+
+	http.Handle("/metrics", promhttp.Handler())
+
+	// ListenAndServe always returns a non-nil error; without this check a
+	// failed bind would let the process exit silently.
+	if err := http.ListenAndServe(cfg.Listen, nil); err != nil {
+		log.Fatalf("Unable to serve metrics on %q: %s", cfg.Listen, err)
+	}
+}
+
+// runTest executes one test and logs the error if the test failed, in
+// which case the metrics keep their previous values.
+func runTest() {
+	if err := updateStats(execTest()); err != nil {
+		log.Error(err.Error())
+	}
+}
+
+// updateStats publishes the averages of t to the Prometheus metrics. A
+// non-nil err is handed back unchanged and t is not touched.
+func updateStats(t *testResult, err error) error {
+	if err != nil {
+		return err
+	}
+
+	pingAvg.Set(t.Ping.Avg)
+	thresholdAvg.WithLabelValues("recv").Set(t.Receive.Avg)
+	thresholdAvg.WithLabelValues("send").Set(t.Send.Avg)
+
+	return nil
+}
+
+// execTest runs the ping test followed by the throughput test against the
+// configured sparkyfish server and returns the collected results.
+func execTest() (*testResult, error) {
+	t := newTestResult()
+
+	sc := newSparkClient(cfg.Hostname, cfg.Port)
+	if err := sc.ExecutePingTest(t); err != nil {
+		return nil, fmt.Errorf("Ping test fucked up: %s", err)
+	}
+
+	if err := sc.ExecuteThroughputTest(t); err != nil {
+		return nil, fmt.Errorf("Throughput test fucked up: %s", err)
+	}
+
+	log.Debugf("%s", t)
+	return t, nil
+}
diff --git a/metrics.go b/metrics.go
new file mode 100644
index 0000000..ceca6e0
--- /dev/null
+++ b/metrics.go
@@ -0,0 +1,23 @@
+package main
+
+import "github.com/prometheus/client_golang/prometheus"
+
+var (
+	// pingAvg exposes the average ping of the latest successful test run.
+	pingAvg = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name: "sparkyfish_ping_avg",
+		Help: "Average ping of the test run (ms)",
+	})
+	// thresholdAvg exposes the average send / receive rate of the latest
+	// successful test run, labelled by direction ("send" or "recv").
+	thresholdAvg = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "sparkyfish_threshold_avg",
+		Help: "Average threshold of the test run (bps)",
+	}, []string{"direction"})
+)
+
+// init registers the gauges with the default Prometheus registry.
+func init() {
+	prometheus.MustRegister(pingAvg)
+	prometheus.MustRegister(thresholdAvg)
+}
diff --git a/ping.go b/ping.go
new file mode 100644
index 0000000..e820233
--- /dev/null
+++ b/ping.go
@@ -0,0 +1,132 @@
+package main
+
+import (
+	"math"
+	"time"
+)
+
+// pingHistory is a series of round-trip times.
+type pingHistory []int64
+
+// ExecutePingTest measures numPings round trips through the echo (ECO)
+// mode of the server and stores min, max, mean and standard deviation of
+// the round-trip times in t.Ping, in milliseconds.
+func (s *sparkClient) ExecutePingTest(t *testResult) error {
+	if err := s.connect(); err != nil {
+		return err
+	}
+	// The throughput tests open connections of their own, so this one has
+	// to be closed here or it would leak on every test run.
+	defer s.conn.Close()
+
+	// Bound the whole ping test so that a stalled server cannot block the
+	// measurement loop forever.
+	deadline := time.Now().Add(time.Duration(maxPingTestLength) * time.Second)
+	if err := s.conn.SetDeadline(deadline); err != nil {
+		return err
+	}
+
+	if err := s.writeCommand("ECO"); err != nil {
+		return err
+	}
+
+	var (
+		ph  = make(pingHistory, 0, numPings)
+		buf = make([]byte, 1)
+	)
+
+	for i := 0; i < numPings; i++ {
+		start := time.Now()
+
+		// Send a single byte ('.') and wait for the reply.
+		if _, err := s.conn.Write([]byte{46}); err != nil {
+			return err
+		}
+
+		if _, err := s.conn.Read(buf); err != nil {
+			return err
+		}
+
+		// Recorded in microseconds.
+		ph = append(ph, time.Since(start).Nanoseconds()/1000)
+	}
+
+	ph = ph.toMilli()
+
+	t.Ping.Min, t.Ping.Max = ph.minMax()
+	t.Ping.Avg = ph.mean()
+	t.Ping.Dev = ph.stdDev()
+
+	return nil
+}
+
+// toMilli converts a ping history from microseconds to whole milliseconds
+// (fractions are truncated).
+func (h *pingHistory) toMilli() []int64 {
+	pingMilli := make([]int64, 0, len(*h))
+	for _, v := range *h {
+		pingMilli = append(pingMilli, v/1000)
+	}
+
+	return pingMilli
+}
+
+// mean returns the arithmetic mean of the ping times, or 0 for an empty
+// history.
+func (h *pingHistory) mean() float64 {
+	if len(*h) == 0 {
+		return 0
+	}
+
+	var sum int64
+	for _, t := range *h {
+		sum += t
+	}
+
+	// Divide as floats: an integer division would drop the fractional
+	// part of the mean.
+	return float64(sum) / float64(len(*h))
+}
+
+// variance returns the population variance of the ping times, or 0 for an
+// empty history.
+func (h *pingHistory) variance() float64 {
+	if len(*h) == 0 {
+		return 0
+	}
+
+	mean := h.mean()
+
+	var sqDevSum float64
+	for _, t := range *h {
+		dev := float64(t) - mean
+		sqDevSum += dev * dev
+	}
+
+	return sqDevSum / float64(len(*h))
+}
+
+// stdDev returns the standard deviation of the ping times.
+func (h *pingHistory) stdDev() float64 {
+	return math.Sqrt(h.variance())
+}
+
+// minMax returns the smallest and the largest ping time, or 0, 0 for an
+// empty history.
+func (h *pingHistory) minMax() (float64, float64) {
+	if len(*h) == 0 {
+		return 0, 0
+	}
+
+	lo, hi := (*h)[0], (*h)[0]
+	for _, v := range (*h)[1:] {
+		if v < lo {
+			lo = v
+		}
+		if v > hi {
+			hi = v
+		}
+	}
+
+	return float64(lo), float64(hi)
+}
diff --git a/spark.go b/spark.go
new file mode 100644
index 0000000..07b5b14
--- /dev/null
+++ b/spark.go
@@ -0,0 +1,158 @@
+package main
+
+import (
+	"bufio"
+	"fmt"
+	"math"
+	"net"
+	"strings"
+
+	log "github.com/sirupsen/logrus"
+)
+
+const (
+	protocolVersion      uint16 = 0x00 // Protocol Version
+	blockSize            int64  = 200  // size (KB) of each block of data copied to/from remote
+	throughputTestLength uint   = 10   // length of time (seconds) to conduct each throughput test
+	maxPingTestLength    uint   = 10   // maximum time (seconds) for ping test to complete
+	numPings             int    = 30   // number of pings to attempt
+
+	// MBPS is the divisor used by testResult.String to print rates as
+	// Mbps; KBPS is the factor it is built from.
+	KBPS = 1024.0
+	MBPS = 1024.0 * KBPS
+)
+
+// testResult collects the measurements of a single test run: ping times
+// in milliseconds and throughput in bits per second.
+type testResult struct {
+	Ping struct {
+		Min, Max, Avg, Dev float64
+	}
+	Send struct {
+		Min, Max, Avg float64
+	}
+	Receive struct {
+		Min, Max, Avg float64
+	}
+}
+
+// newTestResult returns a testResult whose minima start at +Inf and whose
+// maxima start at -Inf, so that the first measured value replaces them.
+func newTestResult() *testResult {
+	r := &testResult{}
+	r.Ping.Min = math.Inf(1)
+	r.Ping.Max = math.Inf(-1)
+	r.Send.Min = math.Inf(1)
+	r.Send.Max = math.Inf(-1)
+	r.Receive.Min = math.Inf(1)
+	r.Receive.Max = math.Inf(-1)
+
+	return r
+}
+
+// String formats the result as a single human readable line.
+func (t testResult) String() string {
+	return fmt.Sprintf("Ping(ms): min=%.2f max=%.2f avg=%.2f stddev=%.2f | Download(Mbps): min=%.2f max=%.2f avg=%.2f | Upload(Mbps): min=%.2f max=%.2f avg=%.2f",
+		t.Ping.Min,
+		t.Ping.Max,
+		t.Ping.Avg,
+		t.Ping.Dev,
+		t.Receive.Min/MBPS,
+		t.Receive.Max/MBPS,
+		t.Receive.Avg/MBPS,
+		t.Send.Min/MBPS,
+		t.Send.Max/MBPS,
+		t.Send.Avg/MBPS,
+	)
+}
+
+// sparkClient talks to a single sparkyfish server.
+type sparkClient struct {
+	remote string
+	conn   net.Conn
+	reader *bufio.Reader
+}
+
+// newSparkClient returns a client for the server at hostname:port. No
+// connection is opened until a test is executed.
+func newSparkClient(hostname string, port int) *sparkClient {
+	return &sparkClient{
+		remote: fmt.Sprintf("%s:%d", hostname, port),
+	}
+}
+
+// dial opens the TCP connection to the server and stores it in s.conn.
+func (s *sparkClient) dial() error {
+	c, err := net.Dial("tcp", s.remote)
+	if err != nil {
+		return err
+	}
+	s.conn = c
+
+	return nil
+}
+
+// connect opens a new connection and performs the HELO handshake. The
+// connection is closed again if the handshake fails, so callers only have
+// to close s.conn after connect returned nil.
+func (s *sparkClient) connect() error {
+	if err := s.dial(); err != nil {
+		return fmt.Errorf("Unable to connect to sparkyfish-server %q: %s", s.remote, err)
+	}
+
+	s.reader = bufio.NewReader(s.conn)
+
+	err := s.writeCommand(fmt.Sprintf("HELO%d", protocolVersion))
+	if err == nil {
+		err = s.readGreeting()
+	}
+	if err != nil {
+		// Do not leak the socket of a half-established session.
+		s.conn.Close()
+	}
+
+	return err
+}
+
+// writeCommand sends command to the server, terminated by CRLF.
+func (s *sparkClient) writeCommand(command string) error {
+	if _, err := fmt.Fprintf(s.conn, "%s\r\n", command); err != nil {
+		return fmt.Errorf("Unable to send command %q: %s", command, err)
+	}
+	return nil
+}
+
+// readGreeting reads the three line response to HELO: the literal "HELO",
+// a name line and a location line, and logs the latter two at debug level.
+func (s *sparkClient) readGreeting() error {
+	helo, err := s.reader.ReadString('\n')
+	if err != nil {
+		// Keep the cause instead of reporting every read failure as an
+		// unexpected response.
+		return fmt.Errorf("Unable to read greeting: %s", err)
+	}
+	if strings.TrimSpace(helo) != "HELO" {
+		return fmt.Errorf("Unexpected response to greeting")
+	}
+
+	cn, err := s.reader.ReadString('\n')
+	if err != nil {
+		return err
+	}
+
+	cn = strings.TrimSpace(cn)
+	if cn == "none" {
+		cn = s.remote
+	}
+
+	loc, err := s.reader.ReadString('\n')
+	if err != nil {
+		return err
+	}
+
+	loc = strings.TrimSpace(loc)
+
+	log.Debugf("Connected to %q in location %q", cn, loc)
+	return nil
+}
diff --git a/throughput.go b/throughput.go
new file mode 100644
index 0000000..e782321
--- /dev/null
+++ b/throughput.go
@@ -0,0 +1,161 @@
+package main
+
+import (
+	"bytes"
+	"crypto/rand"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"log"
+	"net"
+	"os"
+	"syscall"
+	"time"
+)
+
+// ExecuteThroughputTest runs the send test followed by the receive test
+// and stores the measured rates (bits per second) in t.
+func (s *sparkClient) ExecuteThroughputTest(t *testResult) error {
+	if err := s.runSendTest(t); err != nil {
+		return err
+	}
+	return s.runRecvTest(t)
+}
+
+// isConnClosed reports whether err means that the connection was closed,
+// which is treated as the regular end of a throughput test. Socket errors
+// may arrive wrapped in *net.OpError and *os.SyscallError, so both layers
+// are removed before comparing against the errno values.
+func isConnClosed(err error) bool {
+	if err == io.EOF || err == io.ErrClosedPipe {
+		return true
+	}
+
+	if operr, ok := err.(*net.OpError); ok {
+		err = operr.Err
+	}
+	if scerr, ok := err.(*os.SyscallError); ok {
+		err = scerr.Err
+	}
+
+	return err == syscall.EPIPE || err == syscall.ECONNRESET
+}
+
+// bitsPerSecond returns the rate achieved by transferring the given
+// number of blocks (blockSize KB each) within elapsed.
+func bitsPerSecond(blocks int64, elapsed time.Duration) float64 {
+	return float64(1024*blockSize*blocks*8) / elapsed.Seconds()
+}
+
+// runSendTest uploads blocks of random data for up to throughputTestLength
+// seconds and records min, max and average rate in t.Send.
+func (s *sparkClient) runSendTest(t *testResult) error {
+	data := make([]byte, 1024*blockSize)
+	if _, err := rand.Read(data); err != nil {
+		return fmt.Errorf("Was unable to gather random data: %s", err)
+	}
+	dataReader := bytes.NewReader(data)
+
+	if err := s.connect(); err != nil {
+		return err
+	}
+	defer s.conn.Close()
+
+	if err := s.writeCommand("RCV"); err != nil {
+		return err
+	}
+
+	var (
+		blockCount int64
+		totalStart = time.Now()
+		testLength = time.Duration(throughputTestLength) * time.Second
+	)
+
+	for {
+		start := time.Now()
+
+		if _, err := io.Copy(s.conn, dataReader); err != nil {
+			if operr, ok := err.(*net.OpError); ok {
+				log.Printf("%s", operr.Err)
+			}
+
+			// The server closing the connection just ends the test.
+			if isConnClosed(err) {
+				break
+			}
+
+			return fmt.Errorf("Error copying: %s", err)
+		}
+
+		bps := bitsPerSecond(1, time.Since(start))
+		if bps < t.Send.Min {
+			t.Send.Min = bps
+		}
+		if bps > t.Send.Max {
+			t.Send.Max = bps
+		}
+		blockCount++
+
+		// Rewind so that the next iteration sends the same block again.
+		dataReader.Seek(0, 0)
+
+		if time.Since(totalStart) > testLength {
+			break
+		}
+	}
+
+	// average bit per second
+	t.Send.Avg = bitsPerSecond(blockCount, time.Since(totalStart))
+
+	return nil
+}
+
+// runRecvTest downloads blocks for up to throughputTestLength seconds and
+// records min, max and average rate in t.Receive.
+func (s *sparkClient) runRecvTest(t *testResult) error {
+	if err := s.connect(); err != nil {
+		return err
+	}
+	defer s.conn.Close()
+
+	if err := s.writeCommand("SND"); err != nil {
+		return err
+	}
+
+	var (
+		blockCount int64
+		totalStart = time.Now()
+		testLength = time.Duration(throughputTestLength) * time.Second
+	)
+
+	for {
+		start := time.Now()
+
+		if _, err := io.CopyN(ioutil.Discard, s.conn, 1024*blockSize); err != nil {
+			// The server closing the connection just ends the test.
+			if isConnClosed(err) {
+				break
+			}
+
+			return fmt.Errorf("Error copying: %s", err)
+		}
+
+		bps := bitsPerSecond(1, time.Since(start))
+		if bps < t.Receive.Min {
+			t.Receive.Min = bps
+		}
+		if bps > t.Receive.Max {
+			t.Receive.Max = bps
+		}
+		blockCount++
+
+		if time.Since(totalStart) > testLength {
+			break
+		}
+	}
+
+	// average bit per second
+	t.Receive.Avg = bitsPerSecond(blockCount, time.Since(totalStart))
+
+	return nil
+}