mirror of
https://github.com/Luzifer/continuous-spark.git
synced 2024-12-20 09:41:19 +00:00
Compare commits
2 commits
ba2aadca60
...
f9bf10b30c
Author | SHA1 | Date | |
---|---|---|---|
f9bf10b30c | |||
13831bd953 |
6 changed files with 47 additions and 119 deletions
|
@ -1,3 +1,7 @@
|
||||||
|
# 1.1.1 / 2024-12-13
|
||||||
|
|
||||||
|
* Fix: Refactor throughput test and fix calculation error from v1.1.0
|
||||||
|
|
||||||
# 1.1.0 / 2024-12-12
|
# 1.1.0 / 2024-12-12
|
||||||
|
|
||||||
* Add source interface selection
|
* Add source interface selection
|
||||||
|
|
7
go.mod
7
go.mod
|
@ -5,19 +5,16 @@ go 1.22
|
||||||
toolchain go1.23.4
|
toolchain go1.23.4
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/Luzifer/rconfig v1.2.0
|
github.com/Luzifer/rconfig/v2 v2.5.2
|
||||||
github.com/influxdata/influxdb v1.11.8
|
github.com/influxdata/influxdb v1.11.8
|
||||||
github.com/sirupsen/logrus v1.9.3
|
github.com/sirupsen/logrus v1.9.3
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/kr/pretty v0.3.0 // indirect
|
github.com/kr/pretty v0.3.0 // indirect
|
||||||
github.com/onsi/ginkgo v1.10.1 // indirect
|
|
||||||
github.com/onsi/gomega v1.7.0 // indirect
|
|
||||||
github.com/rogpeppe/go-internal v1.9.0 // indirect
|
github.com/rogpeppe/go-internal v1.9.0 // indirect
|
||||||
github.com/spf13/pflag v1.0.5 // indirect
|
github.com/spf13/pflag v1.0.5 // indirect
|
||||||
github.com/stretchr/testify v1.8.3 // indirect
|
|
||||||
golang.org/x/sys v0.28.0 // indirect
|
golang.org/x/sys v0.28.0 // indirect
|
||||||
gopkg.in/validator.v2 v2.0.1 // indirect
|
gopkg.in/validator.v2 v2.0.1 // indirect
|
||||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
)
|
)
|
||||||
|
|
32
go.sum
32
go.sum
|
@ -1,15 +1,11 @@
|
||||||
github.com/Luzifer/rconfig v1.2.0 h1:waD1sqasGVSQSrExpLrQ9Q1JmMaltrS391VdOjWXP/I=
|
github.com/Luzifer/rconfig/v2 v2.5.2 h1:4Bfp8mTrCCK/xghUmUbh/qtKiLZA6RC0tHTgqkNw1m4=
|
||||||
github.com/Luzifer/rconfig v1.2.0/go.mod h1:9pet6z2+mm/UAB0jF/rf0s62USfHNolzgR6Q4KpsJI0=
|
github.com/Luzifer/rconfig/v2 v2.5.2/go.mod h1:HnqUWg+NQh60/neUqfMDDDo5d1v8UPuhwKR1HqM4VWQ=
|
||||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
|
||||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
|
||||||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||||
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||||
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
|
|
||||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
|
||||||
github.com/influxdata/influxdb v1.11.8 h1:lX8MJDfk91O7nqzzonQkjk87gOeQy9V/Xp3gpELhG1s=
|
github.com/influxdata/influxdb v1.11.8 h1:lX8MJDfk91O7nqzzonQkjk87gOeQy9V/Xp3gpELhG1s=
|
||||||
github.com/influxdata/influxdb v1.11.8/go.mod h1:zRTAuk/Ie/V1LGxJUv8jfDmfv+ypz22lxfhc1MxC3rI=
|
github.com/influxdata/influxdb v1.11.8/go.mod h1:zRTAuk/Ie/V1LGxJUv8jfDmfv+ypz22lxfhc1MxC3rI=
|
||||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||||
|
@ -19,11 +15,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||||
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
|
||||||
github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo=
|
|
||||||
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
|
||||||
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
|
|
||||||
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
|
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
|
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
|
||||||
|
@ -35,33 +26,18 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
|
||||||
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY=
|
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
|
||||||
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
||||||
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
|
||||||
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
|
|
||||||
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
|
|
||||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
|
||||||
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
|
||||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
|
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
|
||||||
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
|
||||||
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
|
|
||||||
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||||
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
|
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
|
||||||
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
|
|
||||||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
|
|
||||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
|
|
||||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
|
|
||||||
gopkg.in/validator.v2 v2.0.1 h1:xF0KWyGWXm/LM2G1TrEjqOu4pa6coO9AlWSf3msVfDY=
|
gopkg.in/validator.v2 v2.0.1 h1:xF0KWyGWXm/LM2G1TrEjqOu4pa6coO9AlWSf3msVfDY=
|
||||||
gopkg.in/validator.v2 v2.0.1/go.mod h1:lIUZBlB3Im4s/eYp39Ry/wkR02yOPhZ9IwIRBjuPuG8=
|
gopkg.in/validator.v2 v2.0.1/go.mod h1:lIUZBlB3Im4s/eYp39Ry/wkR02yOPhZ9IwIRBjuPuG8=
|
||||||
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
|
||||||
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
|
||||||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
|
3
main.go
3
main.go
|
@ -7,7 +7,7 @@ import (
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/Luzifer/rconfig"
|
"github.com/Luzifer/rconfig/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
const tsvPermission = 0o600
|
const tsvPermission = 0o600
|
||||||
|
@ -34,6 +34,7 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
func initApp() (err error) {
|
func initApp() (err error) {
|
||||||
|
rconfig.AutoEnv(true)
|
||||||
if err = rconfig.ParseAndValidate(&cfg); err != nil {
|
if err = rconfig.ParseAndValidate(&cfg); err != nil {
|
||||||
return fmt.Errorf("parsing CLI params: %w", err)
|
return fmt.Errorf("parsing CLI params: %w", err)
|
||||||
}
|
}
|
||||||
|
|
9
spark.go
9
spark.go
|
@ -6,15 +6,16 @@ import (
|
||||||
"math"
|
"math"
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
protocolVersion uint16 = 0x00 // Protocol Version
|
protocolVersion = 0x00 // Protocol Version
|
||||||
blockSize int64 = 200 // size (KB) of each block of data copied to/from remote
|
blockSize = 200 // size (KB) of each block of data copied to/from remote
|
||||||
throughputTestLength uint = 10 // length of time to conduct each throughput test
|
throughputTestLength = 10 * time.Second // length of time to conduct each throughput test
|
||||||
numPings int = 30 // number of pings to attempt
|
numPings = 30 // number of pings to attempt
|
||||||
|
|
||||||
kbps = 1024.0
|
kbps = 1024.0
|
||||||
mbps = 1024.0 * kbps
|
mbps = 1024.0 * kbps
|
||||||
|
|
109
throughput.go
109
throughput.go
|
@ -1,20 +1,17 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
|
||||||
"syscall"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
throughputBufferSize = 1024 * blockSize
|
bitsPerByte = 8
|
||||||
throughputBufferSizeBits = throughputBufferSize * 8
|
throughputChunkSize = 1024 * blockSize
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *sparkClient) ExecuteThroughputTest(t *testResult) (err error) {
|
func (s *sparkClient) ExecuteThroughputTest(t *testResult) (err error) {
|
||||||
|
@ -29,14 +26,7 @@ func (s *sparkClient) ExecuteThroughputTest(t *testResult) (err error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
//nolint:gocyclo
|
|
||||||
func (s *sparkClient) runSendTest(t *testResult) (err error) {
|
func (s *sparkClient) runSendTest(t *testResult) (err error) {
|
||||||
data := make([]byte, throughputBufferSize)
|
|
||||||
if _, err = rand.Read(data); err != nil {
|
|
||||||
return fmt.Errorf("gathering random data: %w", err)
|
|
||||||
}
|
|
||||||
dataReader := bytes.NewReader(data)
|
|
||||||
|
|
||||||
if err = s.connect(); err != nil {
|
if err = s.connect(); err != nil {
|
||||||
return fmt.Errorf("establishing connection: %w", err)
|
return fmt.Errorf("establishing connection: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -50,52 +40,10 @@ func (s *sparkClient) runSendTest(t *testResult) (err error) {
|
||||||
return fmt.Errorf("sending RCV command: %w", err)
|
return fmt.Errorf("sending RCV command: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
if t.Send.Min, t.Send.Max, t.Send.Avg, err = s.runThroughputTest(rand.Reader, s.conn); err != nil {
|
||||||
blockCount int64
|
return fmt.Errorf("testing throughput: %w", err)
|
||||||
totalStart = time.Now()
|
|
||||||
)
|
|
||||||
|
|
||||||
for {
|
|
||||||
start := time.Now()
|
|
||||||
|
|
||||||
if _, err = io.Copy(s.conn, dataReader); err != nil {
|
|
||||||
// If we get any of these errors, it probably just means that the server closed the connection
|
|
||||||
if err == io.EOF || err == io.ErrClosedPipe || err == syscall.EPIPE {
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if operr, ok := err.(*net.OpError); ok {
|
|
||||||
logrus.Printf("%s", operr.Err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if operr, ok := err.(*net.OpError); ok && operr.Err.Error() == syscall.ECONNRESET.Error() {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
return fmt.Errorf("copying data: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
bps := float64(throughputBufferSizeBits) / (float64(time.Since(start).Nanoseconds()) / float64(time.Second.Nanoseconds()))
|
|
||||||
if bps < t.Send.Min {
|
|
||||||
t.Send.Min = bps
|
|
||||||
}
|
|
||||||
if bps > t.Send.Max {
|
|
||||||
t.Send.Max = bps
|
|
||||||
}
|
|
||||||
blockCount++
|
|
||||||
|
|
||||||
if _, err := dataReader.Seek(0, 0); err != nil {
|
|
||||||
return fmt.Errorf("seeking data reader: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if time.Since(totalStart) > time.Duration(throughputTestLength)*time.Second {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// average bit per second
|
|
||||||
t.Send.Avg = float64(throughputBufferSizeBits) / (float64(time.Since(totalStart).Nanoseconds()) / float64(time.Second.Nanoseconds()))
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,43 +61,44 @@ func (s *sparkClient) runRecvTest(t *testResult) (err error) {
|
||||||
return fmt.Errorf("writing SND command: %w", err)
|
return fmt.Errorf("writing SND command: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if t.Receive.Min, t.Receive.Max, t.Receive.Avg, err = s.runThroughputTest(s.conn, io.Discard); err != nil {
|
||||||
|
return fmt.Errorf("testing throughput: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*sparkClient) runThroughputTest(src io.Reader, dst io.Writer) (minT, maxT, avgT float64, err error) {
|
||||||
var (
|
var (
|
||||||
blockCount int64
|
dataTxBytes int64
|
||||||
totalStart = time.Now()
|
testStart = time.Now()
|
||||||
)
|
)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
start := time.Now()
|
segmentStart := time.Now()
|
||||||
|
|
||||||
if _, err = io.CopyN(io.Discard, s.conn, throughputBufferSize); err != nil {
|
n, err := io.CopyN(dst, src, throughputChunkSize)
|
||||||
// If we get any of these errors, it probably just means that the server closed the connection
|
if err != nil {
|
||||||
if err == io.EOF || err == io.ErrClosedPipe || err == syscall.EPIPE {
|
return 0, 0, 0, fmt.Errorf("copying data: %w", err)
|
||||||
break
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if operr, ok := err.(*net.OpError); ok && operr.Err.Error() == syscall.ECONNRESET.Error() {
|
dataTxBytes += n
|
||||||
break
|
|
||||||
|
bps := float64(n*bitsPerByte) / float64(time.Since(segmentStart).Seconds())
|
||||||
|
if bps < minT {
|
||||||
|
minT = bps
|
||||||
|
}
|
||||||
|
if bps > maxT {
|
||||||
|
maxT = bps
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt.Errorf("copying data: %w", err)
|
if time.Since(testStart) > throughputTestLength {
|
||||||
}
|
|
||||||
|
|
||||||
bps := float64(throughputBufferSizeBits) / (float64(time.Since(start).Nanoseconds()) / float64(time.Second.Nanoseconds()))
|
|
||||||
if bps < t.Receive.Min {
|
|
||||||
t.Receive.Min = bps
|
|
||||||
}
|
|
||||||
if bps > t.Receive.Max {
|
|
||||||
t.Receive.Max = bps
|
|
||||||
}
|
|
||||||
blockCount++
|
|
||||||
|
|
||||||
if time.Since(totalStart) > time.Duration(throughputTestLength)*time.Second {
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// average bit per second
|
// average bit per second
|
||||||
t.Receive.Avg = float64(throughputBufferSizeBits) / (float64(time.Since(totalStart).Nanoseconds()) / float64(time.Second.Nanoseconds()))
|
avgT = float64(dataTxBytes*bitsPerByte) / float64(time.Since(testStart).Seconds())
|
||||||
|
|
||||||
return nil
|
return minT, maxT, avgT, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue