1
0
Fork 0
mirror of https://github.com/Luzifer/go_helpers.git synced 2024-10-18 14:24:20 +00:00
go_helpers/io/throttledReader.go
Knut Ahlers 7179a1859b
Add ThrottledReader as io-helper
Signed-off-by: Knut Ahlers <knut@ahlers.me>
2024-03-06 17:16:34 +01:00

64 lines
1.5 KiB
Go

// Package io contains helpers for I/O tasks
package io
import (
"errors"
"fmt"
"io"
"time"
)
// ThrottledReader implements a reader imposing a rate limit to the
// reading side to i.e. limit downloads, limit I/O on a filesystem, …
// The reads will burst and then wait until the rate "calmed" to the
// desired rate.
type ThrottledReader struct {
startRead time.Time
totalReadBytes uint64
readRateBpns float64
next io.Reader
}
// NewThrottledReader creates a reader with next as its underlying reader and
// rate as its throttle rate in Bytes / Second
func NewThrottledReader(next io.Reader, rate float64) *ThrottledReader {
return &ThrottledReader{next: next, readRateBpns: rate / float64(time.Second)}
}
// Read implements the io.Reader interface
func (t *ThrottledReader) Read(p []byte) (n int, err error) {
if t.startRead.IsZero() {
t.startRead = time.Now()
}
// First read is for free
n, err = t.next.Read(p)
if err != nil {
if errors.Is(err, io.EOF) {
return n, io.EOF
}
return n, fmt.Errorf("reading from next: %w", err)
}
// Count the data
t.totalReadBytes += uint64(n)
// Now lets see how long we need to wait
var (
currentRate float64
timePassedNS = int64(time.Since(t.startRead))
)
if timePassedNS > 0 {
currentRate = float64(t.totalReadBytes) / float64(timePassedNS)
}
if currentRate > t.readRateBpns {
timeToWait := int64(float64(t.totalReadBytes)/t.readRateBpns - float64(timePassedNS))
time.Sleep(time.Duration(timeToWait))
}
// Waited long enough, rate is fine again, return
return n, nil
}