1
0
Fork 0
mirror of https://github.com/Luzifer/continuous-spark.git synced 2024-10-18 04:44:24 +00:00

Breaking: Code cleanup, switch to InfluxDB for graphing

Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
Knut Ahlers 2019-01-27 15:25:13 +01:00
parent da773ee66f
commit be10a02a2c
Signed by: luzifer
GPG key ID: DC2729FDD34BE99E
7 changed files with 241 additions and 658 deletions

41
Gopkg.lock generated
View file

@ -9,6 +9,36 @@
revision = "7aef1d393c1e2d0758901853b59981c7adc67c7e" revision = "7aef1d393c1e2d0758901853b59981c7adc67c7e"
version = "v1.2.0" version = "v1.2.0"
[[projects]]
digest = "1:718c57979a9197414a044fff2028e57cb9fe145069e4f507059755bfe87f1bfe"
name = "github.com/influxdata/influxdb"
packages = [
"client/v2",
"models",
"pkg/escape",
]
pruneopts = "NUT"
revision = "698dbc789aff13c2678357a6b93ff73dd7136571"
version = "v1.7.3"
[[projects]]
digest = "1:937258f1293bc9295b4789b0abea5b4ec030e3caecb65a4e1dc0b6999957a5ed"
name = "github.com/influxdata/platform"
packages = [
"models",
"pkg/escape",
]
pruneopts = "NUT"
revision = "0f79e4ea3248354c789cba274542e0a8e55971db"
[[projects]]
digest = "1:14715f705ff5dfe0ffd6571d7d201dd8e921030f8070321a79380d8ca4ec1a24"
name = "github.com/pkg/errors"
packages = ["."]
pruneopts = "NUT"
revision = "ba968bfe8b2f7e042a574c888954fccecfa385b4"
version = "v0.8.1"
[[projects]] [[projects]]
digest = "1:e977852d2828769521a7da3a2fed51416ccc9d7e8d5ca0e63c64066f0bfd5f32" digest = "1:e977852d2828769521a7da3a2fed51416ccc9d7e8d5ca0e63c64066f0bfd5f32"
name = "github.com/sirupsen/logrus" name = "github.com/sirupsen/logrus"
@ -24,14 +54,6 @@
revision = "e57e3eeb33f795204c1ca35f56c44f83227c6e66" revision = "e57e3eeb33f795204c1ca35f56c44f83227c6e66"
version = "v1.0.0" version = "v1.0.0"
[[projects]]
digest = "1:f7da4c8f89b9af6758dda1bf5083b80e0c2ba1598f85fea8da788133be1f576d"
name = "github.com/stathat/go"
packages = ["."]
pruneopts = "NUT"
revision = "74669b9f388d9d788c97399a0824adbfee78400e"
version = "v1.0.0"
[[projects]] [[projects]]
digest = "1:eb24bda4388b169c14b6dcfd910eab9bfada54502fe4d1da80889730fbc63b41" digest = "1:eb24bda4388b169c14b6dcfd910eab9bfada54502fe4d1da80889730fbc63b41"
name = "golang.org/x/sys" name = "golang.org/x/sys"
@ -58,8 +80,9 @@
analyzer-version = 1 analyzer-version = 1
input-imports = [ input-imports = [
"github.com/Luzifer/rconfig", "github.com/Luzifer/rconfig",
"github.com/influxdata/influxdb/client/v2",
"github.com/pkg/errors",
"github.com/sirupsen/logrus", "github.com/sirupsen/logrus",
"github.com/stathat/go",
] ]
solver-name = "gps-cdcl" solver-name = "gps-cdcl"
solver-version = 1 solver-version = 1

View file

@ -25,6 +25,10 @@
# unused-packages = true # unused-packages = true
[[constraint]]
name = "github.com/influxdata/influxdb"
version = "1.7.3"
[[constraint]] [[constraint]]
name = "github.com/Luzifer/rconfig" name = "github.com/Luzifer/rconfig"
version = "1.2.0" version = "1.2.0"

View file

@ -5,7 +5,7 @@
# Luzifer / continuous-spark # Luzifer / continuous-spark
This tool is a daemon intended to do continuous tests against a [sparkyfish server](https://github.com/chrissnell/sparkyfish). The measurements are sent to [StatHat](https://www.stathat.com/) for graphing and are stored locally in a TSV file for reference. This tool is a daemon intended to do continuous tests against a [sparkyfish server](https://github.com/chrissnell/sparkyfish). The measurements are sent to an InfluxDB for graphing and are stored locally in a TSV file for reference.
_Why?_ Quite easy: My internet connection is quite often way slower (170Kbit vs. 200Mbit) as promised in the contract I'm paying for so I set up this daemon on my NAS connected using 2x1Gbit cable to my providers FritzBox. On the other side an unmodified sparkyfish server is running on a 1Gbit datacenter connection. The test is executed every 15m and looking at the graph causes me to weep... _Why?_ Quite easy: My internet connection is quite often way slower (170Kbit vs. 200Mbit) as promised in the contract I'm paying for so I set up this daemon on my NAS connected using 2x1Gbit cable to my providers FritzBox. On the other side an unmodified sparkyfish server is running on a 1Gbit datacenter connection. The test is executed every 15m and looking at the graph causes me to weep...

107
influx.go Normal file
View file

@ -0,0 +1,107 @@
package main
import (
"sync"
"time"
influx "github.com/influxdata/influxdb/client/v2"
"github.com/pkg/errors"
)
const (
influxWriteInterval = 10 * time.Second
)
type metricsSender struct {
batch influx.BatchPoints
batchLock sync.Mutex
client influx.Client
errs chan error
influxDB string
}
func NewMetricsSender(influxHost, influxUser, influxPass, influxDatabase string) (*metricsSender, error) {
out := &metricsSender{
errs: make(chan error, 10),
influxDB: influxDatabase,
}
return out, out.initialize(influxHost, influxUser, influxPass)
}
func (m *metricsSender) Errors() <-chan error {
return m.errs
}
func (m *metricsSender) ForceTransmit() error {
return errors.Wrap(m.transmit(), "Unable to transmit recorded points")
}
func (m *metricsSender) RecordPoint(name string, tags map[string]string, fields map[string]interface{}) error {
pt, err := influx.NewPoint(name, tags, fields, time.Now())
if err != nil {
return err
}
m.batchLock.Lock()
defer m.batchLock.Unlock()
m.batch.AddPoint(pt)
return nil
}
func (m *metricsSender) resetBatch() error {
b, err := influx.NewBatchPoints(influx.BatchPointsConfig{
Database: m.influxDB,
})
if err != nil {
return err
}
m.batch = b
return nil
}
func (m *metricsSender) sendLoop() {
for range time.Tick(influxWriteInterval) {
if err := m.transmit(); err != nil {
m.errs <- err
}
}
}
func (m *metricsSender) transmit() error {
m.batchLock.Lock()
defer m.batchLock.Unlock()
if err := m.client.Write(m.batch); err != nil {
return errors.Wrap(err, "Unable to write recorded points")
}
m.resetBatch()
return nil
}
func (m *metricsSender) initialize(influxHost, influxUser, influxPass string) error {
influxClient, err := influx.NewHTTPClient(influx.HTTPConfig{
Addr: influxHost,
Username: influxUser,
Password: influxPass,
Timeout: 2 * time.Second,
})
if err != nil {
return err
}
m.client = influxClient
if err := m.resetBatch(); err != nil {
return err
}
go m.sendLoop()
return nil
}

121
main.go
View file

@ -6,28 +6,29 @@ import (
"os" "os"
"time" "time"
"github.com/Luzifer/rconfig" "github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
stathat "github.com/stathat/go"
)
const ( "github.com/Luzifer/rconfig"
metricPing = "[CS] Ping"
metricThresholdRX = "[CS] Threshold RX"
metricThresholdTX = "[CS] Threshold TX"
) )
var ( var (
cfg struct { cfg struct {
Hostname string `flag:"hostname" default:"" description:"Hostname / IP of the sparkyfish server" validate:"nonzero"` InfluxDB string `flag:"influx-db" default:"" description:"Name of the database to write to (if unset, InfluxDB feature is disabled)"`
InfluxHost string `flag:"influx-host" default:"http://localhost:8086" description:"Host with protocol of the InfluxDB"`
InfluxPass string `flag:"influx-pass" default:"" description:"Password for the InfluxDB user"`
InfluxUser string `flag:"influx-user" default:"" description:"Username for the InfluxDB connection"`
Interval time.Duration `flag:"interval" default:"15m" description:"Interval to execute test in"` Interval time.Duration `flag:"interval" default:"15m" description:"Interval to execute test in"`
LogLevel string `flag:"log-level" default:"info" description:"Set log level (debug, info, warning, error)"` LogLevel string `flag:"log-level" default:"info" description:"Set log level (debug, info, warning, error)"`
StatHatEZKey string `flag:"stathat-ezkey" default:"" description:"Key to post metrics to" validate:"nonzero"` OneShot bool `flag:"oneshot,1" default:"false" description:"Execute one measurement and exit (for cron execution)"`
Port int `flag:"port" default:"7121" description:"Port the sparkyfish server is running on"` Port int `flag:"port" default:"7121" description:"Port the sparkyfish server is running on"`
TSVFile string `flag:"tsv-file" default:"measures.tsv" description:"File to write the results to"` Server string `flag:"server" default:"" description:"Hostname / IP of the sparkyfish server" validate:"nonzero"`
TSVFile string `flag:"tsv-file" default:"measures.tsv" description:"File to write the results to (set to empty string to disable)"`
VersionAndExit bool `flag:"version" default:"false" description:"Print version information and exit"` VersionAndExit bool `flag:"version" default:"false" description:"Print version information and exit"`
} }
metrics *metricsSender
version = "dev" version = "dev"
) )
@ -49,40 +50,112 @@ func init() {
} }
func main() { func main() {
var err error
if cfg.InfluxDB != "" {
if metrics, err = NewMetricsSender(cfg.InfluxHost, cfg.InfluxUser, cfg.InfluxPass, cfg.InfluxDB); err != nil {
log.WithError(err).Fatalf("Unable to initialize InfluxDB sender")
}
}
if err := updateStats(execTest()); err != nil { if err := updateStats(execTest()); err != nil {
log.Error(err.Error()) log.WithError(err).Error("Unable to update stats")
}
if cfg.OneShot {
// Return before loop for oneshot execution
if err := metrics.ForceTransmit(); err != nil {
log.WithError(err).Error("Unable to store metrics")
}
return
} }
for range time.Tick(cfg.Interval) { for range time.Tick(cfg.Interval) {
if err := updateStats(execTest()); err != nil { if err := updateStats(execTest()); err != nil {
log.Error(err.Error()) log.WithError(err).Error("Unable to update stats")
continue
} }
} }
} }
func updateStats(t *testResult, err error) error { func updateStats(t *testResult, err error) error {
if err != nil { if err != nil {
return err return errors.Wrap(err, "Got error from test function")
} }
stathat.PostEZValue(metricPing, cfg.StatHatEZKey, t.Ping.Avg) hostname, err := os.Hostname()
stathat.PostEZValue(metricThresholdRX, cfg.StatHatEZKey, t.Receive.Avg) if err != nil {
stathat.PostEZValue(metricThresholdTX, cfg.StatHatEZKey, t.Send.Avg) return errors.Wrap(err, "Unable to get local hostname")
}
return writeTSV(t) if metrics != nil {
if err := metrics.RecordPoint(
"sparkyfish_ping",
map[string]string{
"hostname": hostname,
"server": cfg.Server,
},
map[string]interface{}{
"avg": t.Ping.Avg,
"min": t.Ping.Min,
"max": t.Ping.Max,
"dev": t.Ping.Dev,
},
); err != nil {
return errors.Wrap(err, "Unable to record 'ping' metric")
}
if err := metrics.RecordPoint(
"sparkyfish_transfer",
map[string]string{
"direction": "down",
"hostname": hostname,
"server": cfg.Server,
},
map[string]interface{}{
"avg": t.Receive.Avg,
"min": t.Receive.Min,
"max": t.Receive.Max,
},
); err != nil {
return errors.Wrap(err, "Unable to record 'down' metric")
}
if err := metrics.RecordPoint(
"sparkyfish_transfer",
map[string]string{
"direction": "up",
"hostname": hostname,
"server": cfg.Server,
},
map[string]interface{}{
"avg": t.Send.Avg,
"min": t.Send.Min,
"max": t.Send.Max,
},
); err != nil {
return errors.Wrap(err, "Unable to record 'up' metric")
}
}
if cfg.TSVFile != "" {
if err := writeTSV(t); err != nil {
return errors.Wrap(err, "Unable to write TSV file")
}
}
return nil
} }
func writeTSV(t *testResult) error { func writeTSV(t *testResult) error {
if _, err := os.Stat(cfg.TSVFile); err != nil && os.IsNotExist(err) { if _, err := os.Stat(cfg.TSVFile); err != nil && os.IsNotExist(err) {
if err := ioutil.WriteFile(cfg.TSVFile, []byte("Date\tPing Min (ms)\tPing Avg (ms)\tPing Max (ms)\tPing StdDev (ms)\tRX Avg (bps)\tTX Avg (bps)\n"), 0644); err != nil { if err := ioutil.WriteFile(cfg.TSVFile, []byte("Date\tPing Min (ms)\tPing Avg (ms)\tPing Max (ms)\tPing StdDev (ms)\tRX Avg (bps)\tTX Avg (bps)\n"), 0644); err != nil {
return err return errors.Wrap(err, "Unable to write initial TSV headers")
} }
} }
f, err := os.OpenFile(cfg.TSVFile, os.O_APPEND|os.O_WRONLY, os.ModeAppend) f, err := os.OpenFile(cfg.TSVFile, os.O_APPEND|os.O_WRONLY, os.ModeAppend)
if err != nil { if err != nil {
return err return errors.Wrap(err, "Unable to open TSV file")
} }
defer f.Close() defer f.Close()
@ -96,19 +169,19 @@ func writeTSV(t *testResult) error {
t.Send.Avg, t.Send.Avg,
) )
return err return errors.Wrap(err, "Unable to write measurement to TSV file")
} }
func execTest() (*testResult, error) { func execTest() (*testResult, error) {
t := newTestResult() t := newTestResult()
sc := newSparkClient(cfg.Hostname, cfg.Port) sc := newSparkClient(cfg.Server, cfg.Port)
if err := sc.ExecutePingTest(t); err != nil { if err := sc.ExecutePingTest(t); err != nil {
return nil, fmt.Errorf("Ping test fucked up: %s", err) return nil, errors.Wrap(err, "Ping-test failed")
} }
if err := sc.ExecuteThroughputTest(t); err != nil { if err := sc.ExecuteThroughputTest(t); err != nil {
return nil, fmt.Errorf("Throughput test fucked up: %s", err) return nil, errors.Wrap(err, "Throughput test failed")
} }
log.Debugf("%s", t) log.Debugf("%s", t)

19
vendor/github.com/stathat/go/LICENSE generated vendored
View file

@ -1,19 +0,0 @@
Copyright (C) 2012 Numerotron Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View file

@ -1,605 +0,0 @@
// Copyright (C) 2012 Numerotron Inc.
// Use of this source code is governed by an MIT-style license
// that can be found in the LICENSE file.
// Copyright 2012 Numerotron Inc.
// Use of this source code is governed by an MIT-style license
// that can be found in the LICENSE file.
//
// Developed at www.stathat.com by Patrick Crosby
// Contact us on twitter with any questions: twitter.com/stat_hat
// The stathat package makes it easy to post any values to your StatHat
// account.
package stathat
import (
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"strconv"
"sync"
"time"
)
const hostname = "api.stathat.com"
type statKind int
const (
_ = iota
kcounter statKind = iota
kvalue
)
func (sk statKind) classicPath() string {
switch sk {
case kcounter:
return "/c"
case kvalue:
return "/v"
}
return ""
}
type apiKind int
const (
_ = iota
classic apiKind = iota
ez
)
func (ak apiKind) path(sk statKind) string {
switch ak {
case ez:
return "/ez"
case classic:
return sk.classicPath()
}
return ""
}
type statReport struct {
StatKey string
UserKey string
Value float64
Timestamp int64
statType statKind
apiType apiKind
}
// Reporter describes an interface for communicating with the StatHat API
type Reporter interface {
PostCount(statKey, userKey string, count int) error
PostCountTime(statKey, userKey string, count int, timestamp int64) error
PostCountOne(statKey, userKey string) error
PostValue(statKey, userKey string, value float64) error
PostValueTime(statKey, userKey string, value float64, timestamp int64) error
PostEZCountOne(statName, ezkey string) error
PostEZCount(statName, ezkey string, count int) error
PostEZCountTime(statName, ezkey string, count int, timestamp int64) error
PostEZValue(statName, ezkey string, value float64) error
PostEZValueTime(statName, ezkey string, value float64, timestamp int64) error
WaitUntilFinished(timeout time.Duration) bool
}
// BasicReporter is a StatHat client that can report stat values/counts to the servers.
type BasicReporter struct {
reports chan *statReport
done chan bool
client *http.Client
wg *sync.WaitGroup
}
// NewReporter returns a new Reporter. You must specify the channel bufferSize and the
// goroutine poolSize. You can pass in nil for the transport and it will create an
// http transport with MaxIdleConnsPerHost set to the goroutine poolSize. Note if you
// pass in your own transport, it's a good idea to have its MaxIdleConnsPerHost be set
// to at least the poolSize to allow for effective connection reuse.
func NewReporter(bufferSize, poolSize int, transport http.RoundTripper) Reporter {
r := new(BasicReporter)
if transport == nil {
transport = &http.Transport{
// Allow for an idle connection per goroutine.
MaxIdleConnsPerHost: poolSize,
}
}
r.client = &http.Client{Transport: transport}
r.reports = make(chan *statReport, bufferSize)
r.done = make(chan bool)
r.wg = new(sync.WaitGroup)
for i := 0; i < poolSize; i++ {
r.wg.Add(1)
go r.processReports()
}
return r
}
type statCache struct {
counterStats map[string]int
valueStats map[string][]float64
}
func (sc *statCache) AverageValue(statName string) float64 {
total := 0.0
values := sc.valueStats[statName]
if len(values) == 0 {
return total
}
for _, value := range values {
total += value
}
return total / float64(len(values))
}
// BatchReporter wraps an existing Reporter in order to implement sending stats
// to the StatHat server in batch. The flow is only available for the EZ API.
// The following describes how stats are sent:
// 1.) PostEZCountOne is called and adds the stat request to a queue.
// 2.) PostEZCountOne is called again on the same stat, the value in the queue is incremented.
// 3.) After batchInterval amount of time, all stat requests from the queue are
// sent to the server.
type BatchReporter struct {
sync.Mutex
r Reporter
batchInterval time.Duration
caches map[string]*statCache
shutdownBatchCh chan struct{}
}
// DefaultReporter is the default instance of *Reporter.
var DefaultReporter = NewReporter(100000, 10, nil)
var testingEnv = false
type testPost struct {
url string
values url.Values
}
var testPostChannel chan *testPost
// The Verbose flag determines if the package should write verbose output to stdout.
var Verbose = false
func setTesting() {
testingEnv = true
testPostChannel = make(chan *testPost)
}
func newEZStatCount(statName, ezkey string, count int) *statReport {
return &statReport{StatKey: statName,
UserKey: ezkey,
Value: float64(count),
statType: kcounter,
apiType: ez}
}
func newEZStatValue(statName, ezkey string, value float64) *statReport {
return &statReport{StatKey: statName,
UserKey: ezkey,
Value: value,
statType: kvalue,
apiType: ez}
}
func newClassicStatCount(statKey, userKey string, count int) *statReport {
return &statReport{StatKey: statKey,
UserKey: userKey,
Value: float64(count),
statType: kcounter,
apiType: classic}
}
func newClassicStatValue(statKey, userKey string, value float64) *statReport {
return &statReport{StatKey: statKey,
UserKey: userKey,
Value: value,
statType: kvalue,
apiType: classic}
}
func (sr *statReport) values() url.Values {
switch sr.apiType {
case ez:
return sr.ezValues()
case classic:
return sr.classicValues()
}
return nil
}
func (sr *statReport) ezValues() url.Values {
switch sr.statType {
case kcounter:
return sr.ezCounterValues()
case kvalue:
return sr.ezValueValues()
}
return nil
}
func (sr *statReport) classicValues() url.Values {
switch sr.statType {
case kcounter:
return sr.classicCounterValues()
case kvalue:
return sr.classicValueValues()
}
return nil
}
func (sr *statReport) ezCommonValues() url.Values {
result := make(url.Values)
result.Set("stat", sr.StatKey)
result.Set("ezkey", sr.UserKey)
if sr.Timestamp > 0 {
result.Set("t", sr.timeString())
}
return result
}
func (sr *statReport) classicCommonValues() url.Values {
result := make(url.Values)
result.Set("key", sr.StatKey)
result.Set("ukey", sr.UserKey)
if sr.Timestamp > 0 {
result.Set("t", sr.timeString())
}
return result
}
func (sr *statReport) ezCounterValues() url.Values {
result := sr.ezCommonValues()
result.Set("count", sr.valueString())
return result
}
func (sr *statReport) ezValueValues() url.Values {
result := sr.ezCommonValues()
result.Set("value", sr.valueString())
return result
}
func (sr *statReport) classicCounterValues() url.Values {
result := sr.classicCommonValues()
result.Set("count", sr.valueString())
return result
}
func (sr *statReport) classicValueValues() url.Values {
result := sr.classicCommonValues()
result.Set("value", sr.valueString())
return result
}
func (sr *statReport) valueString() string {
return strconv.FormatFloat(sr.Value, 'g', -1, 64)
}
func (sr *statReport) timeString() string {
return strconv.FormatInt(sr.Timestamp, 10)
}
func (sr *statReport) path() string {
return sr.apiType.path(sr.statType)
}
func (sr *statReport) url() string {
return fmt.Sprintf("https://%s%s", hostname, sr.path())
}
// Using the classic API, posts a count to a stat using DefaultReporter.
func PostCount(statKey, userKey string, count int) error {
return DefaultReporter.PostCount(statKey, userKey, count)
}
// Using the classic API, posts a count to a stat using DefaultReporter at a specific
// time.
func PostCountTime(statKey, userKey string, count int, timestamp int64) error {
return DefaultReporter.PostCountTime(statKey, userKey, count, timestamp)
}
// Using the classic API, posts a count of 1 to a stat using DefaultReporter.
func PostCountOne(statKey, userKey string) error {
return DefaultReporter.PostCountOne(statKey, userKey)
}
// Using the classic API, posts a value to a stat using DefaultReporter.
func PostValue(statKey, userKey string, value float64) error {
return DefaultReporter.PostValue(statKey, userKey, value)
}
// Using the classic API, posts a value to a stat at a specific time using DefaultReporter.
func PostValueTime(statKey, userKey string, value float64, timestamp int64) error {
return DefaultReporter.PostValueTime(statKey, userKey, value, timestamp)
}
// Using the EZ API, posts a count of 1 to a stat using DefaultReporter.
func PostEZCountOne(statName, ezkey string) error {
return DefaultReporter.PostEZCountOne(statName, ezkey)
}
// Using the EZ API, posts a count to a stat using DefaultReporter.
func PostEZCount(statName, ezkey string, count int) error {
return DefaultReporter.PostEZCount(statName, ezkey, count)
}
// Using the EZ API, posts a count to a stat at a specific time using DefaultReporter.
func PostEZCountTime(statName, ezkey string, count int, timestamp int64) error {
return DefaultReporter.PostEZCountTime(statName, ezkey, count, timestamp)
}
// Using the EZ API, posts a value to a stat using DefaultReporter.
func PostEZValue(statName, ezkey string, value float64) error {
return DefaultReporter.PostEZValue(statName, ezkey, value)
}
// Using the EZ API, posts a value to a stat at a specific time using DefaultReporter.
func PostEZValueTime(statName, ezkey string, value float64, timestamp int64) error {
return DefaultReporter.PostEZValueTime(statName, ezkey, value, timestamp)
}
// Wait for all stats to be sent, or until timeout. Useful for simple command-
// line apps to defer a call to this in main()
func WaitUntilFinished(timeout time.Duration) bool {
return DefaultReporter.WaitUntilFinished(timeout)
}
// Using the classic API, posts a count to a stat.
func (r *BasicReporter) PostCount(statKey, userKey string, count int) error {
r.add(newClassicStatCount(statKey, userKey, count))
return nil
}
// Using the classic API, posts a count to a stat at a specific time.
func (r *BasicReporter) PostCountTime(statKey, userKey string, count int, timestamp int64) error {
x := newClassicStatCount(statKey, userKey, count)
x.Timestamp = timestamp
r.add(x)
return nil
}
// Using the classic API, posts a count of 1 to a stat.
func (r *BasicReporter) PostCountOne(statKey, userKey string) error {
return r.PostCount(statKey, userKey, 1)
}
// Using the classic API, posts a value to a stat.
func (r *BasicReporter) PostValue(statKey, userKey string, value float64) error {
r.add(newClassicStatValue(statKey, userKey, value))
return nil
}
// Using the classic API, posts a value to a stat at a specific time.
func (r *BasicReporter) PostValueTime(statKey, userKey string, value float64, timestamp int64) error {
x := newClassicStatValue(statKey, userKey, value)
x.Timestamp = timestamp
r.add(x)
return nil
}
// Using the EZ API, posts a count of 1 to a stat.
func (r *BasicReporter) PostEZCountOne(statName, ezkey string) error {
return r.PostEZCount(statName, ezkey, 1)
}
// Using the EZ API, posts a count to a stat.
func (r *BasicReporter) PostEZCount(statName, ezkey string, count int) error {
r.add(newEZStatCount(statName, ezkey, count))
return nil
}
// Using the EZ API, posts a count to a stat at a specific time.
func (r *BasicReporter) PostEZCountTime(statName, ezkey string, count int, timestamp int64) error {
x := newEZStatCount(statName, ezkey, count)
x.Timestamp = timestamp
r.add(x)
return nil
}
// Using the EZ API, posts a value to a stat.
func (r *BasicReporter) PostEZValue(statName, ezkey string, value float64) error {
r.add(newEZStatValue(statName, ezkey, value))
return nil
}
// Using the EZ API, posts a value to a stat at a specific time.
func (r *BasicReporter) PostEZValueTime(statName, ezkey string, value float64, timestamp int64) error {
x := newEZStatValue(statName, ezkey, value)
x.Timestamp = timestamp
r.add(x)
return nil
}
func (r *BasicReporter) processReports() {
for sr := range r.reports {
if Verbose {
log.Printf("posting stat to stathat: %s, %v", sr.url(), sr.values())
}
if testingEnv {
if Verbose {
log.Printf("in test mode, putting stat on testPostChannel")
}
testPostChannel <- &testPost{sr.url(), sr.values()}
continue
}
resp, err := r.client.PostForm(sr.url(), sr.values())
if err != nil {
log.Printf("error posting stat to stathat: %s", err)
continue
}
if Verbose {
body, _ := ioutil.ReadAll(resp.Body)
log.Printf("stathat post result: %s", body)
} else {
// Read the body even if we don't intend to use it. Otherwise golang won't pool the connection.
// See also: http://stackoverflow.com/questions/17948827/reusing-http-connections-in-golang/17953506#17953506
io.Copy(ioutil.Discard, resp.Body)
}
resp.Body.Close()
}
r.wg.Done()
}
func (r *BasicReporter) add(rep *statReport) {
select {
case r.reports <- rep:
default:
}
}
func (r *BasicReporter) finish() {
close(r.reports)
r.wg.Wait()
r.done <- true
}
// Wait for all stats to be sent, or until timeout. Useful for simple command-
// line apps to defer a call to this in main()
func (r *BasicReporter) WaitUntilFinished(timeout time.Duration) bool {
go r.finish()
select {
case <-r.done:
return true
case <-time.After(timeout):
return false
}
}
// NewBatchReporter creates a batching stat reporter. The interval parameter
// specifies how often stats should be posted to the StatHat server.
func NewBatchReporter(reporter Reporter, interval time.Duration) Reporter {
br := &BatchReporter{
r: reporter,
batchInterval: interval,
caches: make(map[string]*statCache),
shutdownBatchCh: make(chan struct{}),
}
go br.batchLoop()
return br
}
func (br *BatchReporter) getEZCache(ezkey string) *statCache {
var cache *statCache
var ok bool
// Fetch ezkey cache
if cache, ok = br.caches[ezkey]; !ok {
cache = &statCache{
counterStats: make(map[string]int),
valueStats: make(map[string][]float64),
}
br.caches[ezkey] = cache
}
return cache
}
func (br *BatchReporter) PostEZCount(statName, ezkey string, count int) error {
br.Lock()
defer br.Unlock()
// Increment stat by count
br.getEZCache(ezkey).counterStats[statName] += count
return nil
}
func (br *BatchReporter) PostEZCountOne(statName, ezkey string) error {
return br.PostEZCount(statName, ezkey, 1)
}
func (br *BatchReporter) PostEZValue(statName, ezkey string, value float64) error {
br.Lock()
defer br.Unlock()
// Update value cache
cache := br.getEZCache(ezkey)
cache.valueStats[statName] = append(cache.valueStats[statName], value)
return nil
}
func (br *BatchReporter) batchPost() {
// Copy and clear cache
br.Lock()
caches := br.caches
br.caches = make(map[string]*statCache)
br.Unlock()
// Post stats
for ezkey, cache := range caches {
// Post counters
for statName, count := range cache.counterStats {
br.r.PostEZCount(statName, ezkey, count)
}
// Post values
for statName := range cache.valueStats {
br.r.PostEZValue(statName, ezkey, cache.AverageValue(statName))
}
}
}
func (br *BatchReporter) batchLoop() {
for {
select {
case <-br.shutdownBatchCh:
return
case <-time.After(br.batchInterval):
br.batchPost()
}
}
}
func (br *BatchReporter) PostCount(statKey, userKey string, count int) error {
return br.r.PostCount(statKey, userKey, count)
}
func (br *BatchReporter) PostCountTime(statKey, userKey string, count int, timestamp int64) error {
return br.r.PostCountTime(statKey, userKey, count, timestamp)
}
func (br *BatchReporter) PostCountOne(statKey, userKey string) error {
return br.r.PostCountOne(statKey, userKey)
}
func (br *BatchReporter) PostValue(statKey, userKey string, value float64) error {
return br.r.PostValue(statKey, userKey, value)
}
func (br *BatchReporter) PostValueTime(statKey, userKey string, value float64, timestamp int64) error {
return br.r.PostValueTime(statKey, userKey, value, timestamp)
}
func (br *BatchReporter) PostEZCountTime(statName, ezkey string, count int, timestamp int64) error {
return br.r.PostEZCountTime(statName, ezkey, count, timestamp)
}
func (br *BatchReporter) PostEZValueTime(statName, ezkey string, value float64, timestamp int64) error {
return br.r.PostEZValueTime(statName, ezkey, value, timestamp)
}
func (br *BatchReporter) WaitUntilFinished(timeout time.Duration) bool {
// Shut down batch loop
close(br.shutdownBatchCh)
// One last post
br.batchPost()
return br.r.WaitUntilFinished(timeout)
}