commit 9eff140ae9a8d02c4959811fbe3f896cd0195cef Author: Knut Ahlers Date: Sun Feb 10 01:08:21 2019 +0100 Initial version diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..41a523d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +cam2mjpeg diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..8919855 --- /dev/null +++ b/go.mod @@ -0,0 +1,7 @@ +module github.com/Luzifer/cam2mjpeg + +require ( + github.com/Luzifer/rconfig/v2 v2.2.1 + github.com/gofrs/uuid v3.2.0+incompatible + github.com/sirupsen/logrus v1.3.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..69241ad --- /dev/null +++ b/go.sum @@ -0,0 +1,22 @@ +github.com/Luzifer/rconfig/v2 v2.2.1 h1:zcDdLQlnlzwcBJ8E0WFzOkQE1pCMn3EbX0dFYkeTczg= +github.com/Luzifer/rconfig/v2 v2.2.1/go.mod h1:OKIX0/JRZrPJ/ZXXWklQEFXA6tBfWaljZbW37w+sqBw= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE= +github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME= +github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19 h1:WB265cn5OpO+hK3pikC9hpP1zI/KTwmyMFKloW9eOVc= +gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19/go.mod h1:o4V0GXN9/CAmCsvJ0oXYZvrZOe7syiDZSN1GWGZTGzc= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go new file mode 100644 index 0000000..dce9622 --- /dev/null +++ b/main.go @@ -0,0 +1,163 @@ +package main + +import ( + "bytes" + "fmt" + "io" + "net/http" + "os" + "os/exec" + "strconv" + "sync" + + rconfig "github.com/Luzifer/rconfig/v2" + "github.com/gofrs/uuid" + log "github.com/sirupsen/logrus" +) + +var ( + cfg = struct { + Device string `flag:"input,i" default:"/dev/video0" description:"Video device to read from"` + FrameRate int `flag:"rate,r" default:"10" description:"Frame rate to show in MJPEG"` + Height int `flag:"height,h" default:"720" description:"Height of video frames"` + Listen string `flag:"listen" default:":3000" description:"Port/IP to listen on"` + LogLevel string `flag:"log-level" default:"info" description:"Log level (debug, info, warn, error, fatal)"` + Quality int `flag:"quality,q" default:"5" description:"Image quality (2..31)"` + VersionAndExit bool `flag:"version" default:"false" description:"Prints current version and exits"` + Width int `flag:"width,w" default:"1280" description:"Width of video frames"` + }{} + + requester = map[string]chan []byte{} + requesterLock = new(sync.RWMutex) + + version = "dev" +) + +var ( + beginOfJPEG = []byte{0xff, 0xd8} + endOfJPEG = []byte{0xff, 0xd9} +) + +const maxBacklog = 5 + +func init() { + if err := rconfig.ParseAndValidate(&cfg); err != nil { + log.Fatalf("Unable to parse commandline options: %s", err) + } + + if cfg.VersionAndExit { + fmt.Printf("imgdecode %s\n", version) + os.Exit(0) + } + + if l, err := log.ParseLevel(cfg.LogLevel); err != nil { + log.WithError(err).Fatal("Unable to parse log level") + } else { + log.SetLevel(l) + } +} + +func main() { + http.HandleFunc("/mjpeg", handle) + go func() { + log.WithError(http.ListenAndServe(cfg.Listen, nil)).Fatal("HTTP server has gone") + }() + + log.Debug("HTTP server spawned") + + cmd := exec.Command("ffmpeg", + "-f", "video4linux2", + "-input_format", "yuyv422", + "-s", fmt.Sprintf("%dx%d", cfg.Width, cfg.Height), + "-r", strconv.Itoa(cfg.FrameRate), + "-i", cfg.Device, + "-c:v", "mjpeg", + "-q:v", strconv.Itoa(cfg.Quality), + "-boundary_tag", "ffmpeg", + "-f", "image2pipe", + "-") + + cmd.Stderr = os.Stderr + + out, err := cmd.StdoutPipe() + if err != nil { + log.WithError(err).Fatal("Unable to create stdout pipe") + } + + if err := cmd.Start(); err != nil { + log.WithError(err).Fatal("Unable to spawn ffmpeg") + } + defer cmd.Process.Kill() + + log.Debug("ffmpeg spawned") + + buf := new(bytes.Buffer) + + for { + if _, err := io.CopyN(buf, out, 1024); err != nil { + log.WithError(err).Error("Failed to read ffmpeg output") + break + } + + eoj := bytes.Index(buf.Bytes(), endOfJPEG) + if eoj == -1 { + continue + } + + img := buf.Next(eoj + len(endOfJPEG)) + + if !bytes.HasPrefix(img, beginOfJPEG) || !bytes.HasSuffix(img, endOfJPEG) { + log.Warn("Found invalid JPEG, skipping") + continue + } + + go sendImage(img) + } +} + +func sendImage(jpg []byte) { + requesterLock.RLock() + defer requesterLock.RUnlock() + + if len(requester) == 0 { + return + } + + for _, c := range requester { + if len(c) < maxBacklog { + c <- jpg + } + } + + log.WithField("requesters", len(requester)).Debug("sent frame") +} + +func handle(res http.ResponseWriter, r *http.Request) { + imgChan := make(chan []byte, 10) + uid := uuid.Must(uuid.NewV4()).String() + + defer func() { + deregisterImgChan(uid) + close(imgChan) + }() + + registerImgChan(uid, imgChan) + + handleMJPEG(res, r, imgChan, uid) +} + +func registerImgChan(id string, ic chan []byte) { + requesterLock.Lock() + defer requesterLock.Unlock() + + requester[id] = ic + log.WithField("id", id).Debug("registered new requester") +} + +func deregisterImgChan(id string) { + requesterLock.Lock() + defer requesterLock.Unlock() + + delete(requester, id) + log.WithField("id", id).Debug("removed requester") +} diff --git a/mjpeg.go b/mjpeg.go new file mode 100644 index 0000000..8f9da58 --- /dev/null +++ b/mjpeg.go @@ -0,0 +1,61 @@ +package main + +import ( + "fmt" + "mime/multipart" + "net/http" + "net/textproto" + + log "github.com/sirupsen/logrus" +) + +func handleMJPEG(res http.ResponseWriter, r *http.Request, imgs chan []byte, uid string) { + if r.Method != "GET" { + http.Error(res, "405 Method Not Allowed", http.StatusMethodNotAllowed) + return + } + + logger := log.WithField("id", uid) + + mimeWriter := multipart.NewWriter(res) + mimeWriter.SetBoundary("--boundary") + defer mimeWriter.Close() + + res.Header().Add("Connection", "close") + res.Header().Add("Cache-Control", "no-store, no-cache") + res.Header().Add("Content-Type", fmt.Sprintf("multipart/x-mixed-replace;boundary=%s", mimeWriter.Boundary())) + + cn := res.(http.CloseNotifier).CloseNotify() + errC := 0 + + for { + select { + case <-cn: + return + + case img := <-imgs: + partHeader := make(textproto.MIMEHeader) + partHeader.Add("Content-Type", "image/jpeg") + + partWriter, err := mimeWriter.CreatePart(partHeader) + if err != nil { + logger.WithError(err).Error("Unable to create mime part") + continue + } + + _, err = partWriter.Write(img) + if err != nil { + logger.WithError(err).Error("Unable to write image") + errC++ + + if errC > 5 { + logger.Error("Too many errors, killing connection") + return + } + continue + } + + errC = 0 + } + } +}