From 86092517ead1ea76416bf262caffb00037d66241 Mon Sep 17 00:00:00 2001 From: Knut Ahlers Date: Sun, 3 Jul 2022 15:45:32 +0200 Subject: [PATCH] Fix: Drop points if they haven't been added for 10m which happens for example when the database has the max number of series reached and a new series is to be added Signed-off-by: Knut Ahlers --- metrics.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/metrics.go b/metrics.go index 2cdcd4e..955b085 100644 --- a/metrics.go +++ b/metrics.go @@ -9,8 +9,9 @@ import ( ) const ( - influxWriteInterval = 10 * time.Second influxChunkSize = 1000 + influxPointExpiry = 10 * time.Minute // If the point isn't submitted in this time, drop it + influxWriteInterval = 10 * time.Second ) type metricsSender struct { @@ -46,6 +47,18 @@ func (m *metricsSender) RecordPoint(name string, tags map[string]string, fields return nil } +func (m *metricsSender) filterExpiredPoints(pts []*influx.Point) []*influx.Point { + var out []*influx.Point + + for _, pt := range pts { + if time.Since(pt.Time()) < influxPointExpiry { + out = append(out, pt) + } + } + + return out +} + func (m *metricsSender) resetBatch() error { b, err := influx.NewBatchPoints(influx.BatchPointsConfig{ Database: m.database, @@ -89,7 +102,7 @@ func (m *metricsSender) sendLoop() { if err := m.client.Write(chunk); err != nil { m.errs <- err - failedBatch.AddPoints(m.batch.Points()[i:end]) + failedBatch.AddPoints(m.filterExpiredPoints(m.batch.Points()[i:end])) continue } }