1
0
Fork 0
mirror of https://github.com/Luzifer/continuous-spark.git synced 2024-10-18 12:54:24 +00:00

Initial version

Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
Knut Ahlers 2017-07-26 11:29:10 +02:00
commit 5d9ae1fde5
Signed by: luzifer
GPG key ID: DC2729FDD34BE99E
6 changed files with 462 additions and 0 deletions

1
.gitignore vendored Normal file
View file

@ -0,0 +1 @@
continuous-spark

88
main.go Normal file
View file

@ -0,0 +1,88 @@
package main
import (
"fmt"
"net/http"
"os"
"time"
"github.com/Luzifer/rconfig"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
)
var (
cfg struct {
Hostname string `flag:"hostname" default:"" description:"Hostname / IP of the sparkyfish server" validate:"nonzero"`
Interval time.Duration `flag:"interval" default:"15m" description:"Interval to execute test in"`
Listen string `flag:"listen" default:":3000" description:"IP/Port to listen on"`
LogLevel string `flag:"log-level" default:"info" description:"Set log level (debug, info, warning, error)"`
Port int `flag:"port" default:"7121" description:"Port the sparkyfish server is running on"`
VersionAndExit bool `flag:"version" default:"false" description:"Print version information and exit"`
}
version = "dev"
)
func init() {
if err := rconfig.ParseAndValidate(&cfg); err != nil {
log.Fatalf("Unable to parse CLI parameters: %s", err)
}
if cfg.VersionAndExit {
fmt.Printf("continuous-spark %s\n", version)
os.Exit(0)
}
if l, err := log.ParseLevel(cfg.LogLevel); err == nil {
log.SetLevel(l)
} else {
log.Fatalf("Invalid log level: %s", err)
}
}
func main() {
go func() {
if err := updateStats(execTest()); err != nil {
log.Error(err.Error())
}
for range time.Tick(cfg.Interval) {
if err := updateStats(execTest()); err != nil {
log.Error(err.Error())
continue
}
}
}()
http.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(cfg.Listen, nil)
}
func updateStats(t *testResult, err error) error {
if err != nil {
return err
}
pingAvg.Set(t.Ping.Avg)
thresholdAvg.WithLabelValues("recv").Set(t.Receive.Avg)
thresholdAvg.WithLabelValues("send").Set(t.Send.Avg)
return nil
}
func execTest() (*testResult, error) {
t := newTestResult()
sc := newSparkClient(cfg.Hostname, cfg.Port)
if err := sc.ExecutePingTest(t); err != nil {
return nil, fmt.Errorf("Ping test fucked up: %s", err)
}
if err := sc.ExecuteThroughputTest(t); err != nil {
return nil, fmt.Errorf("Throughput test fucked up: %s", err)
}
log.Debugf("%s", t)
return t, nil
}

19
metrics.go Normal file
View file

@ -0,0 +1,19 @@
package main
import "github.com/prometheus/client_golang/prometheus"
var (
pingAvg = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "sparkyfish_ping_avg",
Help: "Average ping of the test run (ms)",
})
thresholdAvg = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "sparkyfish_threshold_avg",
Help: "Average threshold of the test run (bps)",
}, []string{"direction"})
)
func init() {
prometheus.MustRegister(pingAvg)
prometheus.MustRegister(thresholdAvg)
}

89
ping.go Normal file
View file

@ -0,0 +1,89 @@
package main
import (
"math"
"sort"
"time"
)
type pingHistory []int64
func (s *sparkClient) ExecutePingTest(t *testResult) error {
ph := pingHistory{}
if err := s.connect(); err != nil {
return err
}
if err := s.writeCommand("ECO"); err != nil {
return err
}
buf := make([]byte, 1)
for i := 0; i < numPings; i++ {
start := time.Now()
s.conn.Write([]byte{46})
if _, err := s.conn.Read(buf); err != nil {
return err
}
ph = append(ph, time.Since(start).Nanoseconds()/1000)
}
ph = ph.toMilli()
t.Ping.Min, t.Ping.Max = ph.minMax()
t.Ping.Avg = ph.mean()
t.Ping.Dev = ph.stdDev()
return nil
}
// toMilli Converts our ping history to milliseconds for display purposes
func (h *pingHistory) toMilli() []int64 {
var pingMilli []int64
for _, v := range *h {
pingMilli = append(pingMilli, v/1000)
}
return pingMilli
}
// mean generates a statistical mean of our historical ping times
func (h *pingHistory) mean() float64 {
var sum uint64
for _, t := range *h {
sum = sum + uint64(t)
}
return float64(sum / uint64(len(*h)))
}
// variance calculates the variance of our historical ping times
func (h *pingHistory) variance() float64 {
var sqDevSum float64
mean := h.mean()
for _, t := range *h {
sqDevSum = sqDevSum + math.Pow((float64(t)-mean), 2)
}
return sqDevSum / float64(len(*h))
}
// stdDev calculates the standard deviation of our historical ping times
func (h *pingHistory) stdDev() float64 {
return math.Sqrt(h.variance())
}
func (h *pingHistory) minMax() (float64, float64) {
var hist []int
for _, v := range *h {
hist = append(hist, int(v))
}
sort.Ints(hist)
return float64(hist[0]), float64(hist[len(hist)-1])
}

130
spark.go Normal file
View file

@ -0,0 +1,130 @@
package main
import (
"bufio"
"fmt"
"math"
"net"
"strings"
log "github.com/sirupsen/logrus"
)
const (
protocolVersion uint16 = 0x00 // Protocol Version
blockSize int64 = 200 // size (KB) of each block of data copied to/from remote
throughputTestLength uint = 10 // length of time to conduct each throughput test
maxPingTestLength uint = 10 // maximum time for ping test to complete
numPings int = 30 // number of pings to attempt
KBPS = 1024.0
MBPS = 1024.0 * KBPS
)
type testResult struct {
Ping struct {
Min, Max, Avg, Dev float64
}
Send struct {
Min, Max, Avg float64
}
Receive struct {
Min, Max, Avg float64
}
}
func newTestResult() *testResult {
r := &testResult{}
r.Ping.Min = math.Inf(1)
r.Ping.Max = math.Inf(-1)
r.Send.Min = math.Inf(1)
r.Send.Max = math.Inf(-1)
r.Receive.Min = math.Inf(1)
r.Receive.Max = math.Inf(-1)
return r
}
func (t testResult) String() string {
return fmt.Sprintf("Ping(ms): min=%.2f max=%.2f avg=%.2f stddev=%.2f | Download(Mbps): min=%.2f max=%.2f avg=%.2f | Upload(Mbps): min=%.2f max=%.2f avg=%.2f",
t.Ping.Min,
t.Ping.Max,
t.Ping.Avg,
t.Ping.Dev,
t.Receive.Min/MBPS,
t.Receive.Max/MBPS,
t.Receive.Avg/MBPS,
t.Send.Min/MBPS,
t.Send.Max/MBPS,
t.Send.Avg/MBPS,
)
}
type sparkClient struct {
remote string
conn net.Conn
reader *bufio.Reader
}
func newSparkClient(hostname string, port int) *sparkClient {
return &sparkClient{
remote: fmt.Sprintf("%s:%d", hostname, port),
}
}
func (s *sparkClient) dial() error {
c, err := net.Dial("tcp", s.remote)
if err != nil {
return err
}
s.conn = c
return nil
}
func (s *sparkClient) connect() error {
if err := s.dial(); err != nil {
return fmt.Errorf("Unable to connect to sparkyfish-server %q: %s", s.remote, err)
}
s.reader = bufio.NewReader(s.conn)
if err := s.writeCommand(fmt.Sprintf("HELO%d", protocolVersion)); err != nil {
return err
}
return s.readGreeting()
}
func (s *sparkClient) writeCommand(command string) error {
if _, err := fmt.Fprintf(s.conn, "%s\r\n", command); err != nil {
return fmt.Errorf("Unable to send command %q: %s", command, err)
}
return nil
}
func (s *sparkClient) readGreeting() error {
if helo, err := s.reader.ReadString('\n'); err != nil || strings.TrimSpace(helo) != "HELO" {
return fmt.Errorf("Unexpected response to greeting")
}
cn, err := s.reader.ReadString('\n')
if err != nil {
return err
}
cn = strings.TrimSpace(cn)
if cn == "none" {
cn = s.remote
}
loc, err := s.reader.ReadString('\n')
if err != nil {
return err
}
loc = strings.TrimSpace(loc)
log.Debugf("Connected to %q in location %q", cn, loc)
return nil
}

135
throughput.go Normal file
View file

@ -0,0 +1,135 @@
package main
import (
"bytes"
"crypto/rand"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"syscall"
"time"
)
func (s *sparkClient) ExecuteThroughputTest(t *testResult) error {
if err := s.runSendTest(t); err != nil {
return err
}
return s.runRecvTest(t)
}
func (s *sparkClient) runSendTest(t *testResult) error {
data := make([]byte, 1024*blockSize)
if _, err := rand.Read(data); err != nil {
return fmt.Errorf("Was unable to gather random data: %s", err)
}
dataReader := bytes.NewReader(data)
if err := s.connect(); err != nil {
return err
}
defer s.conn.Close()
if err := s.writeCommand("RCV"); err != nil {
return err
}
var (
blockCount int64
totalStart = time.Now()
)
for {
start := time.Now()
_, err := io.Copy(s.conn, dataReader)
if err != nil {
// If we get any of these errors, it probably just means that the server closed the connection
if err == io.EOF || err == io.ErrClosedPipe || err == syscall.EPIPE {
break
}
if operr, ok := err.(*net.OpError); ok {
log.Printf("%s", operr.Err)
}
if operr, ok := err.(*net.OpError); ok && operr.Err.Error() == syscall.ECONNRESET.Error() {
break
}
return fmt.Errorf("Error copying: %s", err)
}
bps := float64(1024*blockSize*8) / (float64(time.Since(start).Nanoseconds()) / float64(time.Second.Nanoseconds()))
if bps < t.Send.Min {
t.Send.Min = bps
}
if bps > t.Send.Max {
t.Send.Max = bps
}
blockCount++
dataReader.Seek(0, 0)
if time.Since(totalStart) > time.Duration(throughputTestLength)*time.Second {
break
}
}
// average bit per second
t.Send.Avg = float64(1024*blockSize*blockCount*8) / (float64(time.Since(totalStart).Nanoseconds()) / float64(time.Second.Nanoseconds()))
return nil
}
func (s *sparkClient) runRecvTest(t *testResult) error {
if err := s.connect(); err != nil {
return err
}
defer s.conn.Close()
if err := s.writeCommand("SND"); err != nil {
return err
}
var (
blockCount int64
totalStart = time.Now()
)
for {
start := time.Now()
_, err := io.CopyN(ioutil.Discard, s.conn, 1024*blockSize)
if err != nil {
// If we get any of these errors, it probably just means that the server closed the connection
if err == io.EOF || err == io.ErrClosedPipe || err == syscall.EPIPE {
break
}
if operr, ok := err.(*net.OpError); ok && operr.Err.Error() == syscall.ECONNRESET.Error() {
break
}
return fmt.Errorf("Error copying: %s", err)
}
bps := float64(1024*blockSize*8) / (float64(time.Since(start).Nanoseconds()) / float64(time.Second.Nanoseconds()))
if bps < t.Receive.Min {
t.Receive.Min = bps
}
if bps > t.Receive.Max {
t.Receive.Max = bps
}
blockCount++
if time.Since(totalStart) > time.Duration(throughputTestLength)*time.Second {
break
}
}
// average bit per second
t.Receive.Avg = float64(1024*blockSize*blockCount*8) / (float64(time.Since(totalStart).Nanoseconds()) / float64(time.Second.Nanoseconds()))
return nil
}