1
0
Fork 0
mirror of https://github.com/Luzifer/go_helpers.git synced 2024-10-18 14:24:20 +00:00
go_helpers/splitter/splitter.go

103 lines
1.8 KiB
Go
Raw Permalink Normal View History

package splitter
import (
"bytes"
"errors"
"math"
"sync"
)
var ErrClosedWriter = errors.New("Writing to closed writer prohibited")
// Splitter is a thread-safe writer to split multi-line output at newlines
// and carriage-returns. For example to split program output having progress
// lines in it like ffmpeg does.
type Splitter struct {
buffer []byte
lock sync.Mutex
output chan []byte
done bool
}
// New creates a new splitter instance and starts the analyzer routeing inside
func New() *Splitter {
s := &Splitter{
buffer: []byte{},
output: make(chan []byte, 1000),
done: false, // Explicit declaration though default
}
go s.analyze()
return s
}
// Close disables the analyzer and prevents further write to the splitter
func (c *Splitter) Close() error {
c.done = true
return nil
}
// Subscribe returns a channel containing the output lines
func (c Splitter) Subscribe() <-chan []byte { return c.output }
// Write is a standard implementation of io.Writer returning
// ErrClosedWriter on a write after it got closed
func (c *Splitter) Write(p []byte) (n int, err error) {
if c.done {
return 0, ErrClosedWriter
}
c.lock.Lock()
defer c.lock.Unlock()
c.buffer = append(c.buffer, p...)
return len(p), nil
}
func (c *Splitter) chunk(i int) {
c.output <- c.buffer[0:i]
c.buffer = c.buffer[i+1:]
}
func (c *Splitter) analyze() {
for !c.done {
c.lock.Lock()
splits := []int{}
for _, chr := range []byte{'\r', '\n'} {
if i := bytes.IndexByte(c.buffer, chr); i > -1 {
splits = append(splits, i)
}
}
if i := c.minIntSlice(splits); i > -1 {
c.chunk(i)
}
c.lock.Unlock()
}
if len(c.buffer) > 0 {
c.output <- c.buffer
}
close(c.output)
}
func (c *Splitter) minIntSlice(in []int) int {
if len(in) == 0 {
return -1
}
min := math.MaxInt32
for _, i := range in {
if i < min {
min = i
}
}
return min
}