mirror of
https://github.com/Luzifer/go_helpers.git
synced 2024-12-23 20:41:19 +00:00
Add output splitter
Signed-off-by: Knut Ahlers <knut@ahlers.me>
This commit is contained in:
parent
15199b8e33
commit
c0ec3b8c27
1 changed files with 102 additions and 0 deletions
102
splitter/splitter.go
Normal file
102
splitter/splitter.go
Normal file
|
@ -0,0 +1,102 @@
|
|||
package splitter
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"math"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var ErrClosedWriter = errors.New("Writing to closed writer prohibited")
|
||||
|
||||
// Splitter is a thread-safe writer to split multi-line output at newlines
|
||||
// and carriage-returns. For example to split program output having progress
|
||||
// lines in it like ffmpeg does.
|
||||
type Splitter struct {
|
||||
buffer []byte
|
||||
lock sync.Mutex
|
||||
output chan []byte
|
||||
|
||||
done bool
|
||||
}
|
||||
|
||||
// New creates a new splitter instance and starts the analyzer routeing inside
|
||||
func New() *Splitter {
|
||||
s := &Splitter{
|
||||
buffer: []byte{},
|
||||
output: make(chan []byte, 1000),
|
||||
done: false, // Explicit declaration though default
|
||||
}
|
||||
|
||||
go s.analyze()
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// Close disables the analyzer and prevents further write to the splitter
|
||||
func (c *Splitter) Close() error {
|
||||
c.done = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Subscribe returns a channel containing the output lines
|
||||
func (c Splitter) Subscribe() <-chan []byte { return c.output }
|
||||
|
||||
// Write is a standard implementation of io.Writer returning
|
||||
// ErrClosedWriter on a write after it got closed
|
||||
func (c *Splitter) Write(p []byte) (n int, err error) {
|
||||
if c.done {
|
||||
return 0, ErrClosedWriter
|
||||
}
|
||||
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
c.buffer = append(c.buffer, p...)
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
func (c *Splitter) chunk(i int) {
|
||||
c.output <- c.buffer[0:i]
|
||||
c.buffer = c.buffer[i+1:]
|
||||
}
|
||||
|
||||
func (c *Splitter) analyze() {
|
||||
for !c.done {
|
||||
c.lock.Lock()
|
||||
|
||||
splits := []int{}
|
||||
for _, chr := range []byte{'\r', '\n'} {
|
||||
if i := bytes.IndexByte(c.buffer, chr); i > -1 {
|
||||
splits = append(splits, i)
|
||||
}
|
||||
}
|
||||
|
||||
if i := c.minIntSlice(splits); i > -1 {
|
||||
c.chunk(i)
|
||||
}
|
||||
|
||||
c.lock.Unlock()
|
||||
}
|
||||
|
||||
if len(c.buffer) > 0 {
|
||||
c.output <- c.buffer
|
||||
}
|
||||
|
||||
close(c.output)
|
||||
}
|
||||
|
||||
func (c *Splitter) minIntSlice(in []int) int {
|
||||
if len(in) == 0 {
|
||||
return -1
|
||||
}
|
||||
|
||||
min := math.MaxInt32
|
||||
for _, i := range in {
|
||||
if i < min {
|
||||
min = i
|
||||
}
|
||||
}
|
||||
|
||||
return min
|
||||
}
|
Loading…
Reference in a new issue