-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathstreamreader_js.go
97 lines (85 loc) · 2.32 KB
/
streamreader_js.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package wasmws
import (
"errors"
"io"
"sync"
"syscall/js"
)
//streamReader is an io.ReadCloser implementation for Javascript's ReadableStream
// See: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream
type streamReader struct {
remaining []byte
jsPromise js.Value
err error
}
var streamReaderPool = sync.Pool{
New: func() interface{} {
return new(streamReader)
},
}
//newStreamReaderPromise returns a streamReader from a JavaScript promise for
// a stream reader: See https://developer.mozilla.org/en-US/docs/Web/API/Blob/stream
func newStreamReaderPromise(streamPromise js.Value) *streamReader {
sr := streamReaderPool.Get().(*streamReader)
sr.jsPromise = streamPromise
return sr
}
//Close closes the streamReader and returns it to a pool. DO NOT USE FURTHER!
func (sr *streamReader) Close() error {
sr.Reset()
streamReaderPool.Put(sr)
return nil
}
//Reset makes this streamReader ready for reuse
func (sr *streamReader) Reset() {
const bufMax = socketStreamThresholdBytes
sr.jsPromise, sr.err = js.Value{}, nil
if cap(sr.remaining) < bufMax {
sr.remaining = sr.remaining[:0]
} else {
sr.remaining = nil
}
}
//Read implements the standard io.Reader interface
func (sr *streamReader) Read(p []byte) (n int, err error) {
if sr.err != nil {
return 0, sr.err
}
if len(sr.remaining) == 0 {
readCh, errCh := make(chan []byte, 1), make(chan error, 1)
successCallback := js.FuncOf(func(this js.Value, args []js.Value) interface{} {
if args[0].Get("done").Bool() {
errCh <- io.EOF
return nil
}
jsBuf := args[0].Get("value")
count := jsBuf.Get("byteLength").Int()
var goBuf []byte
if count <= cap(sr.remaining) {
goBuf = sr.remaining[:count]
} else {
goBuf = make([]byte, count)
}
js.CopyBytesToGo(goBuf, jsBuf)
readCh <- goBuf
return nil
})
defer successCallback.Release()
failureCallback := js.FuncOf(func(this js.Value, args []js.Value) interface{} {
errCh <- errors.New(args[0].Get("message").String()) //Send TypeError
return nil
})
defer failureCallback.Release()
//Wait for callback
sr.jsPromise.Call("read").Call("then", successCallback, failureCallback)
select {
case sr.remaining = <-readCh:
case err := <-errCh:
sr.err = err
return 0, err
}
}
n = copy(p, sr.remaining)
sr.remaining = sr.remaining[n:]
return n, nil
}