mirror of
https://github.com/Luzifer/bind-log-metrics.git
synced 2024-12-22 20:11:16 +00:00
Chunk metrics upload to avoid HTTP413
Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
parent
426bd3fb85
commit
cb7cb95995
1 changed files with 33 additions and 7 deletions
40
metrics.go
40
metrics.go
|
@ -5,10 +5,12 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
influx "github.com/influxdata/influxdb1-client/v2"
|
influx "github.com/influxdata/influxdb1-client/v2"
|
||||||
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
influxWriteInterval = 10 * time.Second
|
influxWriteInterval = 10 * time.Second
|
||||||
|
influxChunkSize = 10000
|
||||||
)
|
)
|
||||||
|
|
||||||
type metricsSender struct {
|
type metricsSender struct {
|
||||||
|
@ -48,7 +50,6 @@ func (m *metricsSender) resetBatch() error {
|
||||||
b, err := influx.NewBatchPoints(influx.BatchPointsConfig{
|
b, err := influx.NewBatchPoints(influx.BatchPointsConfig{
|
||||||
Database: m.database,
|
Database: m.database,
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -61,14 +62,40 @@ func (m *metricsSender) sendLoop() {
|
||||||
for range time.Tick(influxWriteInterval) {
|
for range time.Tick(influxWriteInterval) {
|
||||||
|
|
||||||
m.batchLock.Lock()
|
m.batchLock.Lock()
|
||||||
if err := m.client.Write(m.batch); err != nil {
|
|
||||||
m.errs <- err
|
failedBatch, err := influx.NewBatchPoints(influx.BatchPointsConfig{
|
||||||
m.batchLock.Unlock()
|
Database: cfg.InfluxDBName,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
m.errs <- errors.Wrap(err, "creating batchpoints")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
m.resetBatch()
|
|
||||||
m.batchLock.Unlock()
|
|
||||||
|
|
||||||
|
for i := 0; i < len(m.batch.Points()); i += influxChunkSize {
|
||||||
|
chunk, err := influx.NewBatchPoints(influx.BatchPointsConfig{
|
||||||
|
Database: cfg.InfluxDBName,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
m.errs <- errors.Wrap(err, "creating batchpoints")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
end := i + influxChunkSize
|
||||||
|
if end > len(m.batch.Points()) {
|
||||||
|
end = len(m.batch.Points())
|
||||||
|
}
|
||||||
|
|
||||||
|
chunk.AddPoints(m.batch.Points()[i:end])
|
||||||
|
|
||||||
|
if err := m.client.Write(chunk); err != nil {
|
||||||
|
m.errs <- err
|
||||||
|
failedBatch.AddPoints(m.batch.Points()[i:end])
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m.batch = failedBatch
|
||||||
|
m.batchLock.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,7 +106,6 @@ func (m *metricsSender) initialize(host, user, pass string) error {
|
||||||
Password: pass,
|
Password: pass,
|
||||||
Timeout: 2 * time.Second,
|
Timeout: 2 * time.Second,
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue