1
0
Fork 0
mirror of https://github.com/Luzifer/continuous-spark.git synced 2024-12-20 17:51:22 +00:00
continuous-spark/throughput.go

156 lines
3.6 KiB
Go
Raw Normal View History

package main
import (
	"bytes"
	"crypto/rand"
	"errors"
	"fmt"
	"io"
	"net"
	"syscall"
	"time"

	"github.com/sirupsen/logrus"
)
2024-12-12 12:12:29 +00:00
const (
	// throughputBufferSize is the number of bytes transferred per measured
	// block: 1024 times blockSize.
	throughputBufferSize = blockSize * 1024

	// throughputBufferSizeBits is throughputBufferSize expressed in bits; it
	// is the numerator used when computing bits-per-second rates.
	throughputBufferSizeBits = 8 * throughputBufferSize
)
func (s *sparkClient) ExecuteThroughputTest(t *testResult) (err error) {
if err = s.runSendTest(t); err != nil {
return fmt.Errorf("running send-test: %w", err)
}
2024-12-12 12:12:29 +00:00
if err = s.runRecvTest(t); err != nil {
return fmt.Errorf("running recv-test: %w", err)
}
return nil
}
2024-12-12 12:12:29 +00:00
//nolint:gocyclo
func (s *sparkClient) runSendTest(t *testResult) (err error) {
data := make([]byte, throughputBufferSize)
if _, err = rand.Read(data); err != nil {
return fmt.Errorf("gathering random data: %w", err)
}
dataReader := bytes.NewReader(data)
2024-12-12 12:12:29 +00:00
if err = s.connect(); err != nil {
return fmt.Errorf("establishing connection: %w", err)
}
2024-12-12 12:12:29 +00:00
defer func() {
if err := s.conn.Close(); err != nil {
logrus.WithError(err).Error("closing connection (leaked fd)")
}
}()
2024-12-12 12:12:29 +00:00
if err = s.writeCommand("RCV"); err != nil {
return fmt.Errorf("sending RCV command: %w", err)
}
var (
blockCount int64
totalStart = time.Now()
)
for {
start := time.Now()
2024-12-12 12:12:29 +00:00
if _, err = io.Copy(s.conn, dataReader); err != nil {
// If we get any of these errors, it probably just means that the server closed the connection
if err == io.EOF || err == io.ErrClosedPipe || err == syscall.EPIPE {
break
}
2024-12-12 12:12:29 +00:00
if operr, ok := err.(*net.OpError); ok {
2024-12-12 12:12:29 +00:00
logrus.Printf("%s", operr.Err)
}
if operr, ok := err.(*net.OpError); ok && operr.Err.Error() == syscall.ECONNRESET.Error() {
break
}
2024-12-12 12:12:29 +00:00
return fmt.Errorf("copying data: %w", err)
}
2024-12-12 12:12:29 +00:00
bps := float64(throughputBufferSizeBits) / (float64(time.Since(start).Nanoseconds()) / float64(time.Second.Nanoseconds()))
if bps < t.Send.Min {
t.Send.Min = bps
}
if bps > t.Send.Max {
t.Send.Max = bps
}
blockCount++
if _, err := dataReader.Seek(0, 0); err != nil {
2024-12-12 12:12:29 +00:00
return fmt.Errorf("seeking data reader: %w", err)
}
if time.Since(totalStart) > time.Duration(throughputTestLength)*time.Second {
break
}
}
// average bit per second
2024-12-12 12:12:29 +00:00
t.Send.Avg = float64(throughputBufferSizeBits) / (float64(time.Since(totalStart).Nanoseconds()) / float64(time.Second.Nanoseconds()))
return nil
}
2024-12-12 12:12:29 +00:00
func (s *sparkClient) runRecvTest(t *testResult) (err error) {
if err = s.connect(); err != nil {
return fmt.Errorf("establishing connection: %w", err)
}
2024-12-12 12:12:29 +00:00
defer func() {
if err := s.conn.Close(); err != nil {
logrus.WithError(err).Error("closing connection (leaked fd)")
}
}()
2024-12-12 12:12:29 +00:00
if err = s.writeCommand("SND"); err != nil {
return fmt.Errorf("writing SND command: %w", err)
}
var (
blockCount int64
totalStart = time.Now()
)
for {
start := time.Now()
2024-12-12 12:12:29 +00:00
if _, err = io.CopyN(io.Discard, s.conn, throughputBufferSize); err != nil {
// If we get any of these errors, it probably just means that the server closed the connection
if err == io.EOF || err == io.ErrClosedPipe || err == syscall.EPIPE {
break
}
if operr, ok := err.(*net.OpError); ok && operr.Err.Error() == syscall.ECONNRESET.Error() {
break
}
2024-12-12 12:12:29 +00:00
return fmt.Errorf("copying data: %w", err)
}
2024-12-12 12:12:29 +00:00
bps := float64(throughputBufferSizeBits) / (float64(time.Since(start).Nanoseconds()) / float64(time.Second.Nanoseconds()))
if bps < t.Receive.Min {
t.Receive.Min = bps
}
if bps > t.Receive.Max {
t.Receive.Max = bps
}
blockCount++
if time.Since(totalStart) > time.Duration(throughputTestLength)*time.Second {
break
}
}
// average bit per second
2024-12-12 12:12:29 +00:00
t.Receive.Avg = float64(throughputBufferSizeBits) / (float64(time.Since(totalStart).Nanoseconds()) / float64(time.Second.Nanoseconds()))
return nil
}