diff --git a/Gopkg.lock b/Gopkg.lock index 19cbc1d..d6cbc9f 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -9,6 +9,36 @@ revision = "7aef1d393c1e2d0758901853b59981c7adc67c7e" version = "v1.2.0" +[[projects]] + digest = "1:718c57979a9197414a044fff2028e57cb9fe145069e4f507059755bfe87f1bfe" + name = "github.com/influxdata/influxdb" + packages = [ + "client/v2", + "models", + "pkg/escape", + ] + pruneopts = "NUT" + revision = "698dbc789aff13c2678357a6b93ff73dd7136571" + version = "v1.7.3" + +[[projects]] + digest = "1:937258f1293bc9295b4789b0abea5b4ec030e3caecb65a4e1dc0b6999957a5ed" + name = "github.com/influxdata/platform" + packages = [ + "models", + "pkg/escape", + ] + pruneopts = "NUT" + revision = "0f79e4ea3248354c789cba274542e0a8e55971db" + +[[projects]] + digest = "1:14715f705ff5dfe0ffd6571d7d201dd8e921030f8070321a79380d8ca4ec1a24" + name = "github.com/pkg/errors" + packages = ["."] + pruneopts = "NUT" + revision = "ba968bfe8b2f7e042a574c888954fccecfa385b4" + version = "v0.8.1" + [[projects]] digest = "1:e977852d2828769521a7da3a2fed51416ccc9d7e8d5ca0e63c64066f0bfd5f32" name = "github.com/sirupsen/logrus" @@ -24,14 +54,6 @@ revision = "e57e3eeb33f795204c1ca35f56c44f83227c6e66" version = "v1.0.0" -[[projects]] - digest = "1:f7da4c8f89b9af6758dda1bf5083b80e0c2ba1598f85fea8da788133be1f576d" - name = "github.com/stathat/go" - packages = ["."] - pruneopts = "NUT" - revision = "74669b9f388d9d788c97399a0824adbfee78400e" - version = "v1.0.0" - [[projects]] digest = "1:eb24bda4388b169c14b6dcfd910eab9bfada54502fe4d1da80889730fbc63b41" name = "golang.org/x/sys" @@ -58,8 +80,9 @@ analyzer-version = 1 input-imports = [ "github.com/Luzifer/rconfig", + "github.com/influxdata/influxdb/client/v2", + "github.com/pkg/errors", "github.com/sirupsen/logrus", - "github.com/stathat/go", ] solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index c478a51..fb2544c 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -25,6 +25,10 @@ # unused-packages = true +[[constraint]] + name = "github.com/influxdata/influxdb" + version = "1.7.3" + [[constraint]] name = "github.com/Luzifer/rconfig" version = "1.2.0" diff --git a/README.md b/README.md index 4d72612..53f2220 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ # Luzifer / continuous-spark -This tool is a daemon intended to do continuous tests against a [sparkyfish server](https://github.com/chrissnell/sparkyfish). The measurements are sent to [StatHat](https://www.stathat.com/) for graphing and are stored locally in a TSV file for reference. +This tool is a daemon intended to do continuous tests against a [sparkyfish server](https://github.com/chrissnell/sparkyfish). The measurements are sent to an InfluxDB for graphing and are stored locally in a TSV file for reference. _Why?_ Quite easy: My internet connection is quite often way slower (170Kbit vs. 200Mbit) as promised in the contract I'm paying for so I set up this daemon on my NAS connected using 2x1Gbit cable to my providers FritzBox. On the other side an unmodified sparkyfish server is running on a 1Gbit datacenter connection. The test is executed every 15m and looking at the graph causes me to weep... diff --git a/influx.go b/influx.go new file mode 100644 index 0000000..f5c1ad9 --- /dev/null +++ b/influx.go @@ -0,0 +1,107 @@ +package main + +import ( + "sync" + "time" + + influx "github.com/influxdata/influxdb/client/v2" + "github.com/pkg/errors" +) + +const ( + influxWriteInterval = 10 * time.Second +) + +type metricsSender struct { + batch influx.BatchPoints + batchLock sync.Mutex + client influx.Client + errs chan error + + influxDB string +} + +func NewMetricsSender(influxHost, influxUser, influxPass, influxDatabase string) (*metricsSender, error) { + out := &metricsSender{ + errs: make(chan error, 10), + influxDB: influxDatabase, + } + return out, out.initialize(influxHost, influxUser, influxPass) +} + +func (m *metricsSender) Errors() <-chan error { + return m.errs +} + +func (m *metricsSender) ForceTransmit() error { + return errors.Wrap(m.transmit(), "Unable to transmit recorded points") +} + +func (m *metricsSender) RecordPoint(name string, tags map[string]string, fields map[string]interface{}) error { + pt, err := influx.NewPoint(name, tags, fields, time.Now()) + if err != nil { + return err + } + + m.batchLock.Lock() + defer m.batchLock.Unlock() + m.batch.AddPoint(pt) + + return nil +} + +func (m *metricsSender) resetBatch() error { + b, err := influx.NewBatchPoints(influx.BatchPointsConfig{ + Database: m.influxDB, + }) + + if err != nil { + return err + } + + m.batch = b + return nil +} + +func (m *metricsSender) sendLoop() { + for range time.Tick(influxWriteInterval) { + + if err := m.transmit(); err != nil { + m.errs <- err + } + + } +} + +func (m *metricsSender) transmit() error { + m.batchLock.Lock() + defer m.batchLock.Unlock() + + if err := m.client.Write(m.batch); err != nil { + return errors.Wrap(err, "Unable to write recorded points") + } + m.resetBatch() + + return nil +} + +func (m *metricsSender) initialize(influxHost, influxUser, influxPass string) error { + influxClient, err := influx.NewHTTPClient(influx.HTTPConfig{ + Addr: influxHost, + Username: influxUser, + Password: influxPass, + Timeout: 2 * time.Second, + }) + + if err != nil { + return err + } + + m.client = influxClient + if err := m.resetBatch(); err != nil { + return err + } + go m.sendLoop() + + return nil +} diff --git a/main.go b/main.go index 5011258..de23eba 100644 --- a/main.go +++ b/main.go @@ -6,28 +6,29 @@ import ( "os" "time" - "github.com/Luzifer/rconfig" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" - stathat "github.com/stathat/go" -) -const ( - metricPing = "[CS] Ping" - metricThresholdRX = "[CS] Threshold RX" - metricThresholdTX = "[CS] Threshold TX" + "github.com/Luzifer/rconfig" ) var ( cfg struct { - Hostname string `flag:"hostname" default:"" description:"Hostname / IP of the sparkyfish server" validate:"nonzero"` + InfluxDB string `flag:"influx-db" default:"" description:"Name of the database to write to (if unset, InfluxDB feature is disabled)"` + InfluxHost string `flag:"influx-host" default:"http://localhost:8086" description:"Host with protocol of the InfluxDB"` + InfluxPass string `flag:"influx-pass" default:"" description:"Password for the InfluxDB user"` + InfluxUser string `flag:"influx-user" default:"" description:"Username for the InfluxDB connection"` Interval time.Duration `flag:"interval" default:"15m" description:"Interval to execute test in"` LogLevel string `flag:"log-level" default:"info" description:"Set log level (debug, info, warning, error)"` - StatHatEZKey string `flag:"stathat-ezkey" default:"" description:"Key to post metrics to" validate:"nonzero"` + OneShot bool `flag:"oneshot,1" default:"false" description:"Execute one measurement and exit (for cron execution)"` Port int `flag:"port" default:"7121" description:"Port the sparkyfish server is running on"` - TSVFile string `flag:"tsv-file" default:"measures.tsv" description:"File to write the results to"` + Server string `flag:"server" default:"" description:"Hostname / IP of the sparkyfish server" validate:"nonzero"` + TSVFile string `flag:"tsv-file" default:"measures.tsv" description:"File to write the results to (set to empty string to disable)"` VersionAndExit bool `flag:"version" default:"false" description:"Print version information and exit"` } + metrics *metricsSender + version = "dev" ) @@ -49,40 +50,112 @@ func init() { } func main() { + var err error + + if cfg.InfluxDB != "" { + if metrics, err = NewMetricsSender(cfg.InfluxHost, cfg.InfluxUser, cfg.InfluxPass, cfg.InfluxDB); err != nil { + log.WithError(err).Fatalf("Unable to initialize InfluxDB sender") + } + } + if err := updateStats(execTest()); err != nil { - log.Error(err.Error()) + log.WithError(err).Error("Unable to update stats") + } + + if cfg.OneShot { + // Return before loop for oneshot execution + if err := metrics.ForceTransmit(); err != nil { + log.WithError(err).Error("Unable to store metrics") + } + return } for range time.Tick(cfg.Interval) { if err := updateStats(execTest()); err != nil { - log.Error(err.Error()) - continue + log.WithError(err).Error("Unable to update stats") } } } func updateStats(t *testResult, err error) error { if err != nil { - return err + return errors.Wrap(err, "Got error from test function") } - stathat.PostEZValue(metricPing, cfg.StatHatEZKey, t.Ping.Avg) - stathat.PostEZValue(metricThresholdRX, cfg.StatHatEZKey, t.Receive.Avg) - stathat.PostEZValue(metricThresholdTX, cfg.StatHatEZKey, t.Send.Avg) + hostname, err := os.Hostname() + if err != nil { + return errors.Wrap(err, "Unable to get local hostname") + } - return writeTSV(t) + if metrics != nil { + if err := metrics.RecordPoint( + "sparkyfish_ping", + map[string]string{ + "hostname": hostname, + "server": cfg.Server, + }, + map[string]interface{}{ + "avg": t.Ping.Avg, + "min": t.Ping.Min, + "max": t.Ping.Max, + "dev": t.Ping.Dev, + }, + ); err != nil { + return errors.Wrap(err, "Unable to record 'ping' metric") + } + + if err := metrics.RecordPoint( + "sparkyfish_transfer", + map[string]string{ + "direction": "down", + "hostname": hostname, + "server": cfg.Server, + }, + map[string]interface{}{ + "avg": t.Receive.Avg, + "min": t.Receive.Min, + "max": t.Receive.Max, + }, + ); err != nil { + return errors.Wrap(err, "Unable to record 'down' metric") + } + + if err := metrics.RecordPoint( + "sparkyfish_transfer", + map[string]string{ + "direction": "up", + "hostname": hostname, + "server": cfg.Server, + }, + map[string]interface{}{ + "avg": t.Send.Avg, + "min": t.Send.Min, + "max": t.Send.Max, + }, + ); err != nil { + return errors.Wrap(err, "Unable to record 'up' metric") + } + } + + if cfg.TSVFile != "" { + if err := writeTSV(t); err != nil { + return errors.Wrap(err, "Unable to write TSV file") + } + } + + return nil } func writeTSV(t *testResult) error { if _, err := os.Stat(cfg.TSVFile); err != nil && os.IsNotExist(err) { if err := ioutil.WriteFile(cfg.TSVFile, []byte("Date\tPing Min (ms)\tPing Avg (ms)\tPing Max (ms)\tPing StdDev (ms)\tRX Avg (bps)\tTX Avg (bps)\n"), 0644); err != nil { - return err + return errors.Wrap(err, "Unable to write initial TSV headers") } } f, err := os.OpenFile(cfg.TSVFile, os.O_APPEND|os.O_WRONLY, os.ModeAppend) if err != nil { - return err + return errors.Wrap(err, "Unable to open TSV file") } defer f.Close() @@ -96,19 +169,19 @@ func writeTSV(t *testResult) error { t.Send.Avg, ) - return err + return errors.Wrap(err, "Unable to write measurement to TSV file") } func execTest() (*testResult, error) { t := newTestResult() - sc := newSparkClient(cfg.Hostname, cfg.Port) + sc := newSparkClient(cfg.Server, cfg.Port) if err := sc.ExecutePingTest(t); err != nil { - return nil, fmt.Errorf("Ping test fucked up: %s", err) + return nil, errors.Wrap(err, "Ping-test failed") } if err := sc.ExecuteThroughputTest(t); err != nil { - return nil, fmt.Errorf("Throughput test fucked up: %s", err) + return nil, errors.Wrap(err, "Throughput test failed") } log.Debugf("%s", t) diff --git a/vendor/github.com/stathat/go/LICENSE b/vendor/github.com/stathat/go/LICENSE deleted file mode 100644 index 814c09c..0000000 --- a/vendor/github.com/stathat/go/LICENSE +++ /dev/null @@ -1,19 +0,0 @@ -Copyright (C) 2012 Numerotron Inc. - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. diff --git a/vendor/github.com/stathat/go/stathat.go b/vendor/github.com/stathat/go/stathat.go deleted file mode 100644 index fa001b2..0000000 --- a/vendor/github.com/stathat/go/stathat.go +++ /dev/null @@ -1,605 +0,0 @@ -// Copyright (C) 2012 Numerotron Inc. -// Use of this source code is governed by an MIT-style license -// that can be found in the LICENSE file. - -// Copyright 2012 Numerotron Inc. -// Use of this source code is governed by an MIT-style license -// that can be found in the LICENSE file. -// -// Developed at www.stathat.com by Patrick Crosby -// Contact us on twitter with any questions: twitter.com/stat_hat - -// The stathat package makes it easy to post any values to your StatHat -// account. -package stathat - -import ( - "fmt" - "io" - "io/ioutil" - "log" - "net/http" - "net/url" - "strconv" - "sync" - "time" -) - -const hostname = "api.stathat.com" - -type statKind int - -const ( - _ = iota - kcounter statKind = iota - kvalue -) - -func (sk statKind) classicPath() string { - switch sk { - case kcounter: - return "/c" - case kvalue: - return "/v" - } - return "" -} - -type apiKind int - -const ( - _ = iota - classic apiKind = iota - ez -) - -func (ak apiKind) path(sk statKind) string { - switch ak { - case ez: - return "/ez" - case classic: - return sk.classicPath() - } - return "" -} - -type statReport struct { - StatKey string - UserKey string - Value float64 - Timestamp int64 - statType statKind - apiType apiKind -} - -// Reporter describes an interface for communicating with the StatHat API -type Reporter interface { - PostCount(statKey, userKey string, count int) error - PostCountTime(statKey, userKey string, count int, timestamp int64) error - PostCountOne(statKey, userKey string) error - PostValue(statKey, userKey string, value float64) error - PostValueTime(statKey, userKey string, value float64, timestamp int64) error - PostEZCountOne(statName, ezkey string) error - PostEZCount(statName, ezkey string, count int) error - PostEZCountTime(statName, ezkey string, count int, timestamp int64) error - PostEZValue(statName, ezkey string, value float64) error - PostEZValueTime(statName, ezkey string, value float64, timestamp int64) error - WaitUntilFinished(timeout time.Duration) bool -} - -// BasicReporter is a StatHat client that can report stat values/counts to the servers. -type BasicReporter struct { - reports chan *statReport - done chan bool - client *http.Client - wg *sync.WaitGroup -} - -// NewReporter returns a new Reporter. You must specify the channel bufferSize and the -// goroutine poolSize. You can pass in nil for the transport and it will create an -// http transport with MaxIdleConnsPerHost set to the goroutine poolSize. Note if you -// pass in your own transport, it's a good idea to have its MaxIdleConnsPerHost be set -// to at least the poolSize to allow for effective connection reuse. -func NewReporter(bufferSize, poolSize int, transport http.RoundTripper) Reporter { - r := new(BasicReporter) - if transport == nil { - transport = &http.Transport{ - // Allow for an idle connection per goroutine. - MaxIdleConnsPerHost: poolSize, - } - } - r.client = &http.Client{Transport: transport} - r.reports = make(chan *statReport, bufferSize) - r.done = make(chan bool) - r.wg = new(sync.WaitGroup) - for i := 0; i < poolSize; i++ { - r.wg.Add(1) - go r.processReports() - } - return r -} - -type statCache struct { - counterStats map[string]int - valueStats map[string][]float64 -} - -func (sc *statCache) AverageValue(statName string) float64 { - total := 0.0 - values := sc.valueStats[statName] - if len(values) == 0 { - return total - } - for _, value := range values { - total += value - } - return total / float64(len(values)) -} - -// BatchReporter wraps an existing Reporter in order to implement sending stats -// to the StatHat server in batch. The flow is only available for the EZ API. -// The following describes how stats are sent: -// 1.) PostEZCountOne is called and adds the stat request to a queue. -// 2.) PostEZCountOne is called again on the same stat, the value in the queue is incremented. -// 3.) After batchInterval amount of time, all stat requests from the queue are -// sent to the server. -type BatchReporter struct { - sync.Mutex - r Reporter - batchInterval time.Duration - caches map[string]*statCache - shutdownBatchCh chan struct{} -} - -// DefaultReporter is the default instance of *Reporter. -var DefaultReporter = NewReporter(100000, 10, nil) - -var testingEnv = false - -type testPost struct { - url string - values url.Values -} - -var testPostChannel chan *testPost - -// The Verbose flag determines if the package should write verbose output to stdout. -var Verbose = false - -func setTesting() { - testingEnv = true - testPostChannel = make(chan *testPost) -} - -func newEZStatCount(statName, ezkey string, count int) *statReport { - return &statReport{StatKey: statName, - UserKey: ezkey, - Value: float64(count), - statType: kcounter, - apiType: ez} -} - -func newEZStatValue(statName, ezkey string, value float64) *statReport { - return &statReport{StatKey: statName, - UserKey: ezkey, - Value: value, - statType: kvalue, - apiType: ez} -} - -func newClassicStatCount(statKey, userKey string, count int) *statReport { - return &statReport{StatKey: statKey, - UserKey: userKey, - Value: float64(count), - statType: kcounter, - apiType: classic} -} - -func newClassicStatValue(statKey, userKey string, value float64) *statReport { - return &statReport{StatKey: statKey, - UserKey: userKey, - Value: value, - statType: kvalue, - apiType: classic} -} - -func (sr *statReport) values() url.Values { - switch sr.apiType { - case ez: - return sr.ezValues() - case classic: - return sr.classicValues() - } - - return nil -} - -func (sr *statReport) ezValues() url.Values { - switch sr.statType { - case kcounter: - return sr.ezCounterValues() - case kvalue: - return sr.ezValueValues() - } - return nil -} - -func (sr *statReport) classicValues() url.Values { - switch sr.statType { - case kcounter: - return sr.classicCounterValues() - case kvalue: - return sr.classicValueValues() - } - return nil -} - -func (sr *statReport) ezCommonValues() url.Values { - result := make(url.Values) - result.Set("stat", sr.StatKey) - result.Set("ezkey", sr.UserKey) - if sr.Timestamp > 0 { - result.Set("t", sr.timeString()) - } - return result -} - -func (sr *statReport) classicCommonValues() url.Values { - result := make(url.Values) - result.Set("key", sr.StatKey) - result.Set("ukey", sr.UserKey) - if sr.Timestamp > 0 { - result.Set("t", sr.timeString()) - } - return result -} - -func (sr *statReport) ezCounterValues() url.Values { - result := sr.ezCommonValues() - result.Set("count", sr.valueString()) - return result -} - -func (sr *statReport) ezValueValues() url.Values { - result := sr.ezCommonValues() - result.Set("value", sr.valueString()) - return result -} - -func (sr *statReport) classicCounterValues() url.Values { - result := sr.classicCommonValues() - result.Set("count", sr.valueString()) - return result -} - -func (sr *statReport) classicValueValues() url.Values { - result := sr.classicCommonValues() - result.Set("value", sr.valueString()) - return result -} - -func (sr *statReport) valueString() string { - return strconv.FormatFloat(sr.Value, 'g', -1, 64) -} - -func (sr *statReport) timeString() string { - return strconv.FormatInt(sr.Timestamp, 10) -} - -func (sr *statReport) path() string { - return sr.apiType.path(sr.statType) -} - -func (sr *statReport) url() string { - return fmt.Sprintf("https://%s%s", hostname, sr.path()) -} - -// Using the classic API, posts a count to a stat using DefaultReporter. -func PostCount(statKey, userKey string, count int) error { - return DefaultReporter.PostCount(statKey, userKey, count) -} - -// Using the classic API, posts a count to a stat using DefaultReporter at a specific -// time. -func PostCountTime(statKey, userKey string, count int, timestamp int64) error { - return DefaultReporter.PostCountTime(statKey, userKey, count, timestamp) -} - -// Using the classic API, posts a count of 1 to a stat using DefaultReporter. -func PostCountOne(statKey, userKey string) error { - return DefaultReporter.PostCountOne(statKey, userKey) -} - -// Using the classic API, posts a value to a stat using DefaultReporter. -func PostValue(statKey, userKey string, value float64) error { - return DefaultReporter.PostValue(statKey, userKey, value) -} - -// Using the classic API, posts a value to a stat at a specific time using DefaultReporter. -func PostValueTime(statKey, userKey string, value float64, timestamp int64) error { - return DefaultReporter.PostValueTime(statKey, userKey, value, timestamp) -} - -// Using the EZ API, posts a count of 1 to a stat using DefaultReporter. -func PostEZCountOne(statName, ezkey string) error { - return DefaultReporter.PostEZCountOne(statName, ezkey) -} - -// Using the EZ API, posts a count to a stat using DefaultReporter. -func PostEZCount(statName, ezkey string, count int) error { - return DefaultReporter.PostEZCount(statName, ezkey, count) -} - -// Using the EZ API, posts a count to a stat at a specific time using DefaultReporter. -func PostEZCountTime(statName, ezkey string, count int, timestamp int64) error { - return DefaultReporter.PostEZCountTime(statName, ezkey, count, timestamp) -} - -// Using the EZ API, posts a value to a stat using DefaultReporter. -func PostEZValue(statName, ezkey string, value float64) error { - return DefaultReporter.PostEZValue(statName, ezkey, value) -} - -// Using the EZ API, posts a value to a stat at a specific time using DefaultReporter. -func PostEZValueTime(statName, ezkey string, value float64, timestamp int64) error { - return DefaultReporter.PostEZValueTime(statName, ezkey, value, timestamp) -} - -// Wait for all stats to be sent, or until timeout. Useful for simple command- -// line apps to defer a call to this in main() -func WaitUntilFinished(timeout time.Duration) bool { - return DefaultReporter.WaitUntilFinished(timeout) -} - -// Using the classic API, posts a count to a stat. -func (r *BasicReporter) PostCount(statKey, userKey string, count int) error { - r.add(newClassicStatCount(statKey, userKey, count)) - return nil -} - -// Using the classic API, posts a count to a stat at a specific time. -func (r *BasicReporter) PostCountTime(statKey, userKey string, count int, timestamp int64) error { - x := newClassicStatCount(statKey, userKey, count) - x.Timestamp = timestamp - r.add(x) - return nil -} - -// Using the classic API, posts a count of 1 to a stat. -func (r *BasicReporter) PostCountOne(statKey, userKey string) error { - return r.PostCount(statKey, userKey, 1) -} - -// Using the classic API, posts a value to a stat. -func (r *BasicReporter) PostValue(statKey, userKey string, value float64) error { - r.add(newClassicStatValue(statKey, userKey, value)) - return nil -} - -// Using the classic API, posts a value to a stat at a specific time. -func (r *BasicReporter) PostValueTime(statKey, userKey string, value float64, timestamp int64) error { - x := newClassicStatValue(statKey, userKey, value) - x.Timestamp = timestamp - r.add(x) - return nil -} - -// Using the EZ API, posts a count of 1 to a stat. -func (r *BasicReporter) PostEZCountOne(statName, ezkey string) error { - return r.PostEZCount(statName, ezkey, 1) -} - -// Using the EZ API, posts a count to a stat. -func (r *BasicReporter) PostEZCount(statName, ezkey string, count int) error { - r.add(newEZStatCount(statName, ezkey, count)) - return nil -} - -// Using the EZ API, posts a count to a stat at a specific time. -func (r *BasicReporter) PostEZCountTime(statName, ezkey string, count int, timestamp int64) error { - x := newEZStatCount(statName, ezkey, count) - x.Timestamp = timestamp - r.add(x) - return nil -} - -// Using the EZ API, posts a value to a stat. -func (r *BasicReporter) PostEZValue(statName, ezkey string, value float64) error { - r.add(newEZStatValue(statName, ezkey, value)) - return nil -} - -// Using the EZ API, posts a value to a stat at a specific time. -func (r *BasicReporter) PostEZValueTime(statName, ezkey string, value float64, timestamp int64) error { - x := newEZStatValue(statName, ezkey, value) - x.Timestamp = timestamp - r.add(x) - return nil -} - -func (r *BasicReporter) processReports() { - for sr := range r.reports { - if Verbose { - log.Printf("posting stat to stathat: %s, %v", sr.url(), sr.values()) - } - - if testingEnv { - if Verbose { - log.Printf("in test mode, putting stat on testPostChannel") - } - testPostChannel <- &testPost{sr.url(), sr.values()} - continue - } - - resp, err := r.client.PostForm(sr.url(), sr.values()) - if err != nil { - log.Printf("error posting stat to stathat: %s", err) - continue - } - - if Verbose { - body, _ := ioutil.ReadAll(resp.Body) - log.Printf("stathat post result: %s", body) - } else { - // Read the body even if we don't intend to use it. Otherwise golang won't pool the connection. - // See also: http://stackoverflow.com/questions/17948827/reusing-http-connections-in-golang/17953506#17953506 - io.Copy(ioutil.Discard, resp.Body) - } - - resp.Body.Close() - } - r.wg.Done() -} - -func (r *BasicReporter) add(rep *statReport) { - select { - case r.reports <- rep: - default: - } -} - -func (r *BasicReporter) finish() { - close(r.reports) - r.wg.Wait() - r.done <- true -} - -// Wait for all stats to be sent, or until timeout. Useful for simple command- -// line apps to defer a call to this in main() -func (r *BasicReporter) WaitUntilFinished(timeout time.Duration) bool { - go r.finish() - select { - case <-r.done: - return true - case <-time.After(timeout): - return false - } -} - -// NewBatchReporter creates a batching stat reporter. The interval parameter -// specifies how often stats should be posted to the StatHat server. -func NewBatchReporter(reporter Reporter, interval time.Duration) Reporter { - - br := &BatchReporter{ - r: reporter, - batchInterval: interval, - caches: make(map[string]*statCache), - shutdownBatchCh: make(chan struct{}), - } - - go br.batchLoop() - - return br -} - -func (br *BatchReporter) getEZCache(ezkey string) *statCache { - var cache *statCache - var ok bool - - // Fetch ezkey cache - if cache, ok = br.caches[ezkey]; !ok { - cache = &statCache{ - counterStats: make(map[string]int), - valueStats: make(map[string][]float64), - } - br.caches[ezkey] = cache - } - - return cache -} - -func (br *BatchReporter) PostEZCount(statName, ezkey string, count int) error { - br.Lock() - defer br.Unlock() - - // Increment stat by count - br.getEZCache(ezkey).counterStats[statName] += count - - return nil -} - -func (br *BatchReporter) PostEZCountOne(statName, ezkey string) error { - return br.PostEZCount(statName, ezkey, 1) -} - -func (br *BatchReporter) PostEZValue(statName, ezkey string, value float64) error { - br.Lock() - defer br.Unlock() - - // Update value cache - cache := br.getEZCache(ezkey) - cache.valueStats[statName] = append(cache.valueStats[statName], value) - - return nil -} - -func (br *BatchReporter) batchPost() { - - // Copy and clear cache - br.Lock() - caches := br.caches - br.caches = make(map[string]*statCache) - br.Unlock() - - // Post stats - for ezkey, cache := range caches { - // Post counters - for statName, count := range cache.counterStats { - br.r.PostEZCount(statName, ezkey, count) - } - - // Post values - for statName := range cache.valueStats { - br.r.PostEZValue(statName, ezkey, cache.AverageValue(statName)) - } - } -} - -func (br *BatchReporter) batchLoop() { - for { - select { - case <-br.shutdownBatchCh: - return - case <-time.After(br.batchInterval): - br.batchPost() - } - } -} - -func (br *BatchReporter) PostCount(statKey, userKey string, count int) error { - return br.r.PostCount(statKey, userKey, count) -} - -func (br *BatchReporter) PostCountTime(statKey, userKey string, count int, timestamp int64) error { - return br.r.PostCountTime(statKey, userKey, count, timestamp) -} - -func (br *BatchReporter) PostCountOne(statKey, userKey string) error { - return br.r.PostCountOne(statKey, userKey) -} - -func (br *BatchReporter) PostValue(statKey, userKey string, value float64) error { - return br.r.PostValue(statKey, userKey, value) -} - -func (br *BatchReporter) PostValueTime(statKey, userKey string, value float64, timestamp int64) error { - return br.r.PostValueTime(statKey, userKey, value, timestamp) -} - -func (br *BatchReporter) PostEZCountTime(statName, ezkey string, count int, timestamp int64) error { - return br.r.PostEZCountTime(statName, ezkey, count, timestamp) -} - -func (br *BatchReporter) PostEZValueTime(statName, ezkey string, value float64, timestamp int64) error { - return br.r.PostEZValueTime(statName, ezkey, value, timestamp) -} - -func (br *BatchReporter) WaitUntilFinished(timeout time.Duration) bool { - // Shut down batch loop - close(br.shutdownBatchCh) - - // One last post - br.batchPost() - - return br.r.WaitUntilFinished(timeout) -}