Skip to content

Commit

Permalink
Updated mmal and http
Browse files Browse the repository at this point in the history
  • Loading branch information
djthorpe committed Jan 13, 2021
1 parent 4a38aa0 commit aa22557
Show file tree
Hide file tree
Showing 11 changed files with 214 additions and 236 deletions.
126 changes: 2 additions & 124 deletions cmd/mmalplayer/main.go
Original file line number Diff line number Diff line change
@@ -1,133 +1,11 @@
package main

import (
"fmt"
"io"
"os"
"unsafe"

"github.com/djthorpe/gopi/v3/pkg/sys/mmal"
"github.com/djthorpe/gopi/v3/pkg/tool"
)

func main() {
args := os.Args
if len(args) != 2 {
fmt.Fprintln(os.Stderr, "Expected file argument")
os.Exit(-1)
}
fh, err := os.Open(args[1])
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(-1)
}
defer fh.Close()
fmt.Println("Decoding", fh.Name())
if err := Run(fh); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(-1)
}
}

func Run(r io.ReadSeeker) error {
decoder, err := mmal.MMALComponentCreate(mmal.MMAL_COMPONENT_DEFAULT_IMAGE_DECODER)
if err != nil {
return err
}
defer decoder.Free()

// Input
in := decoder.InputPorts()[0]
in.Format().Video().SetSize(1280, 720)

// Commit format
if err := in.FormatCommit(); err != nil {
return err
}

pool_in := in.CreatePool(in.BufferPreferred())
if pool_in == nil {
return fmt.Errorf("Failed to create pool_in")
}
defer in.FreePool(pool_in)

// Output
out := decoder.OutputPorts()[0]
if err := out.FormatFullCopy(in.Format()); err != nil {
return err
}

// Commit format
if err := in.FormatCommit(); err != nil {
return err
}

pool_out := in.CreatePool(out.BufferMin())
if pool_out == nil {
return fmt.Errorf("Failed to create pool_out")
}
defer out.FreePool(pool_out)
defer out.Flush()

// Enable all the input port and the output port.
if err := in.EnableWithCallback(input_callback); err != nil {
return err
}
defer in.Disable()

if err := out.EnableWithCallback(output_callback); err != nil {
return err
}
defer out.Disable()

// Enable the decoder and renderer
if err := decoder.Enable(); err != nil {
return err
}

// Data processing loop, eof = end of input file, eoe = end of encoding
eof := false
i := 0
for eof == false && i < 500 {
fmt.Println("LOOP eof=", eof, " i=", i)
i++

// Send data to decode to the input port of the video decoder
if eof == false {
fmt.Println(" SEND DATA INTO DECODER")
if buffer := pool_in.Get(); buffer != nil {
if err := buffer.Fill(r); err == io.EOF {
buffer.SetFlags(mmal.MMAL_BUFFER_HEADER_FLAG_EOS)
eof = true
} else if err != nil {
return err
} else {
if err := in.SendBuffer(buffer); err != nil {
return err
} else {
fmt.Println(" Sent to decoder -> ", buffer)
}
}
}
}
}

fmt.Println("FINISHED LOOP")

// Return success
return nil
}

// input_callback is called when a buffer should be discarded on an input port
func input_callback(port *mmal.MMALPort, buffer *mmal.MMALBuffer) {
// The decoder is done with the data, just recycle the buffer header into its pool
fmt.Println("input_callback done with buffer", buffer)
buffer.Release()
}

// output_callback is called when a buffer is available on an output port
func output_callback(port *mmal.MMALPort, buffer *mmal.MMALBuffer) {
queue := (*mmal.MMALQueue)(unsafe.Pointer(port.Userdata()))
// Queue the decoded video frame
queue.Put(buffer)
fmt.Println("output_callback, decoded video=", buffer, " => ", queue)
os.Exit(tool.CommandLine("mmalplayer", os.Args[1:], new(app)))
}
2 changes: 1 addition & 1 deletion etc/http/index.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{{ template "metadata" . }}
</head>
<body>
{{ ssi "include" "header.inc" }}
{{ ssi "include" "/inc/test" "wait" }}
<ul>
{{ range $file := .Content }}
<li><pre><a href="{{ $file.Path }}">{{ $file.Name }}</a></pre></li>
Expand Down
6 changes: 2 additions & 4 deletions etc/http/page.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@
{{ template "metadata" . }}
</head>
<body>
{{ ssi "include" "header.inc" }}
<pre>
{{ .Content }}
</pre>
{{ ssi "include" "/inc/test" "wait" }}
<pre>{{ .Content }}</pre>
</body>
</html>

Expand Down
46 changes: 27 additions & 19 deletions pkg/http/fcgi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@ type Server struct {
Handler http.Handler

// Private variables to flag shutdown
ctx context.Context
cancel context.CancelFunc
listener net.Listener
ctx context.Context
cancel context.CancelFunc
}

func (s *Server) ListenAndServe() error {
var l net.Listener
var err error
var wg sync.WaitGroup

// Create listender
if s.Path == "" {
l, err = net.FileListener(os.Stdin)
if err != nil {
if l, err := net.FileListener(os.Stdin); err != nil {
return err
} else {
s.listener = l
}
} else {
// Check for existing file and remove it. Cannot use a directory
Expand All @@ -51,12 +51,13 @@ func (s *Server) ListenAndServe() error {
return err
}

l, err = net.Listen("unix", s.Path)
if err != nil {
if l, err := net.Listen("unix", s.Path); err != nil {
return err
} else {
s.listener = l
}
}
defer l.Close()
defer s.listener.Close()

// Set default handler
if s.Handler == nil {
Expand All @@ -67,17 +68,23 @@ func (s *Server) ListenAndServe() error {
s.ctx, s.cancel = context.WithCancel(context.Background())

// Continue accepting requests until shutdown
for range s.ctx.Done() {
rw, err := l.Accept()
if err != nil {
return err
FOR_LOOP:
for {
select {
case <-s.ctx.Done():
break FOR_LOOP
default:
rw, err := s.listener.Accept()
if err != nil {
return err
}
c := newChild(rw, s.Handler)
wg.Add(1)
go func() {
c.serve()
wg.Done()
}()
}
c := newChild(rw, s.Handler)
wg.Add(1)
go func() {
defer wg.Done()
c.serve()
}()
}

// Wait until all connections served
Expand All @@ -89,6 +96,7 @@ func (s *Server) ListenAndServe() error {

func (s *Server) Close() error {
if s.cancel != nil {
s.listener.Close()
s.cancel()
}
return nil
Expand Down
25 changes: 18 additions & 7 deletions pkg/http/handler/template.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package handler

import (
"encoding/json"
"bytes"
"fmt"
"html/template"
"net/http"
Expand Down Expand Up @@ -130,28 +130,39 @@ func (this *Templates) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if tmpl == nil {
if data, ok := ctx.Content.([]byte); ok {
w.Header().Set("Content-Length", fmt.Sprint(len(data)))
w.WriteHeader(http.StatusOK)
if req.Method != http.MethodHead {
w.Write(data)
}
return
} else {
this.ServeError(w, Error(req, http.StatusInternalServerError))
return
}
return
}

// Debugging
if this.Logger.IsDebug() {
if json, err := json.MarshalIndent(ctx.Content, " ", " "); err == nil {
this.Debugf(string(json))
/*
if this.Logger.IsDebug() {
if json, err := json.MarshalIndent(ctx.Content, " ", " "); err == nil {
this.Debugf(string(json))
}
}
}
*/

// Execute through a template
if err := tmpl.Execute(w, ctx.Content); err != nil {
data := new(bytes.Buffer)
if err := tmpl.Execute(data, ctx.Content); err != nil {
this.ServeError(w, Error(req, http.StatusInternalServerError, err.Error()))
return
}

// Set content length and write data
w.Header().Set("Content-Length", fmt.Sprint(data.Len()))
w.WriteHeader(http.StatusOK)
if req.Method != http.MethodHead {
w.Write(data.Bytes())
}
}

/////////////////////////////////////////////////////////////////////
Expand Down
8 changes: 4 additions & 4 deletions pkg/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (this *Server) StartInBackground(network, addr string) error {
case errs <- result:
break
default:
this.Debug("StartInBackground:", result)
this.Debug("StartInBackground: ", result)
}
}
this.WaitGroup.Done()
Expand Down Expand Up @@ -284,10 +284,10 @@ func (this *Server) RegisterService(path interface{}, service gopi.Service) erro
if this.mux == nil {
return gopi.ErrOutOfOrder.WithPrefix("RegisterService")
} else if handler_, ok := service.(http.Handler); ok == false {
return gopi.ErrBadParameter.WithPrefix("RegisterService", "service")
return gopi.ErrBadParameter.WithPrefix("RegisterService: ", "service")
} else if path == nil {
if handler_, ok := service.(Transport); ok == false {
return gopi.ErrBadParameter.WithPrefix("RegisterService", "Does not implement SetHandler")
return gopi.ErrBadParameter.WithPrefix("RegisterService: ", "Does not implement SetHandler")
} else {
if this.handler == nil {
handler_.SetHandler(this.mux)
Expand All @@ -298,7 +298,7 @@ func (this *Server) RegisterService(path interface{}, service gopi.Service) erro
}
} else {
if path_, ok := path.(string); ok == false {
return gopi.ErrBadParameter.WithPrefix("RegisterService", "path")
return gopi.ErrBadParameter.WithPrefix("RegisterService: ", "path")
} else {
this.mux.Handle(path_, handler_)
}
Expand Down
Loading

0 comments on commit aa22557

Please sign in to comment.