diff --git a/metrics.go b/metrics.go index 761eb76..53c7941 100644 --- a/metrics.go +++ b/metrics.go @@ -5,10 +5,12 @@ import ( "time" influx "github.com/influxdata/influxdb1-client/v2" + "github.com/pkg/errors" ) const ( influxWriteInterval = 10 * time.Second + influxChunkSize = 10000 ) type metricsSender struct { @@ -48,7 +50,6 @@ func (m *metricsSender) resetBatch() error { b, err := influx.NewBatchPoints(influx.BatchPointsConfig{ Database: m.database, }) - if err != nil { return err } @@ -61,14 +62,40 @@ func (m *metricsSender) sendLoop() { for range time.Tick(influxWriteInterval) { m.batchLock.Lock() - if err := m.client.Write(m.batch); err != nil { - m.errs <- err - m.batchLock.Unlock() + + failedBatch, err := influx.NewBatchPoints(influx.BatchPointsConfig{ + Database: cfg.InfluxDBName, + }) + if err != nil { + m.errs <- errors.Wrap(err, "creating batchpoints") continue } - m.resetBatch() - m.batchLock.Unlock() + for i := 0; i < len(m.batch.Points()); i += influxChunkSize { + chunk, err := influx.NewBatchPoints(influx.BatchPointsConfig{ + Database: cfg.InfluxDBName, + }) + if err != nil { + m.errs <- errors.Wrap(err, "creating batchpoints") + continue + } + + end := i + influxChunkSize + if end > len(m.batch.Points()) { + end = len(m.batch.Points()) + } + + chunk.AddPoints(m.batch.Points()[i:end]) + + if err := m.client.Write(chunk); err != nil { + m.errs <- err + failedBatch.AddPoints(m.batch.Points()[i:end]) + continue + } + } + + m.batch = failedBatch + m.batchLock.Unlock() } } @@ -79,7 +106,6 @@ func (m *metricsSender) initialize(host, user, pass string) error { Password: pass, Timeout: 2 * time.Second, }) - if err != nil { return err }