diff --git a/metrics.go b/metrics.go index 2cdcd4e..955b085 100644 --- a/metrics.go +++ b/metrics.go @@ -9,8 +9,9 @@ import ( ) const ( - influxWriteInterval = 10 * time.Second influxChunkSize = 1000 + influxPointExpiry = 10 * time.Minute // If the point isn't submitted in this time, drop it + influxWriteInterval = 10 * time.Second ) type metricsSender struct { @@ -46,6 +47,18 @@ func (m *metricsSender) RecordPoint(name string, tags map[string]string, fields return nil } +func (m *metricsSender) filterExpiredPoints(pts []*influx.Point) []*influx.Point { + var out []*influx.Point + + for _, pt := range pts { + if time.Since(pt.Time()) < influxPointExpiry { + out = append(out, pt) + } + } + + return out +} + func (m *metricsSender) resetBatch() error { b, err := influx.NewBatchPoints(influx.BatchPointsConfig{ Database: m.database, @@ -89,7 +102,7 @@ func (m *metricsSender) sendLoop() { if err := m.client.Write(chunk); err != nil { m.errs <- err - failedBatch.AddPoints(m.batch.Points()[i:end]) + failedBatch.AddPoints(m.filterExpiredPoints(m.batch.Points()[i:end])) continue } }