Write stream implementation (#19)

Defines a WriteStream structure for buffering writes, similar to the existing ReadStream. WriteStreams can be created on OutEndpoints by using ep.NewStream().
This commit is contained in:
zagrodzki
2017-09-08 13:41:25 +02:00
committed by GitHub
parent 01840c1d23
commit 5c10dc8f4e
7 changed files with 368 additions and 84 deletions

View File

@@ -27,18 +27,54 @@ type transferIntf interface {
type stream struct {
// a fifo of USB transfers.
transfers chan transferIntf
// current holds the last transfer to return.
current transferIntf
// total/used are the number of all/used bytes in the current transfer.
total, used int
// delayedErr is the delayed error, returned to the user after all
// remaining data was read.
delayedErr error
// err is the first encountered error, returned to the user.
err error
// finished is true if transfers has been already closed.
finished bool
}
func (s *stream) setDelayedErr(err error) {
if s.delayedErr == nil {
s.delayedErr = err
func (s *stream) gotError(err error) {
if s.err == nil {
s.err = err
}
}
func (s *stream) noMore() {
if !s.finished {
close(s.transfers)
s.finished = true
}
}
func (s *stream) submitAll() {
count := len(s.transfers)
var all []transferIntf
for i := 0; i < count; i++ {
all = append(all, <-s.transfers)
}
for _, t := range all {
if err := t.submit(); err != nil {
t.free()
s.gotError(err)
s.noMore()
return
}
s.transfers <- t
}
return
}
func (s *stream) flushRemaining() {
s.noMore()
for t := range s.transfers {
t.cancel()
t.wait()
t.free()
}
}
func (s *stream) done() {
if s.err == nil {
close(s.transfers)
}
}
@@ -52,6 +88,10 @@ func (s *stream) setDelayedErr(err error) {
// data is left, io.EOF is returned.
type ReadStream struct {
s *stream
// current holds the last transfer to return.
current transferIntf
// total/used are the number of all/used bytes in the current transfer.
total, used int
}
// Read reads data from the transfer stream.
@@ -60,56 +100,49 @@ type ReadStream struct {
// After a non-nil error is returned, all subsequent attempts to read will
// return io.ErrClosedPipe.
// Read cannot be called concurrently with other Read or Close.
func (r ReadStream) Read(p []byte) (int, error) {
s := r.s
if s.transfers == nil {
func (r *ReadStream) Read(p []byte) (int, error) {
if r.s.transfers == nil {
return 0, io.ErrClosedPipe
}
if s.current == nil {
t, ok := <-s.transfers
if r.current == nil {
t, ok := <-r.s.transfers
if !ok {
// no more transfers in flight
s.transfers = nil
return 0, s.delayedErr
r.s.transfers = nil
return 0, r.s.err
}
n, err := t.wait()
if err != nil {
// wait error aborts immediately, all remaining data is invalid.
t.free()
if s.delayedErr == nil {
close(s.transfers)
}
for t := range s.transfers {
t.cancel()
t.wait()
t.free()
}
s.transfers = nil
r.s.flushRemaining()
r.s.transfers = nil
return n, err
}
s.current = t
s.total = n
s.used = 0
r.current = t
r.total = n
r.used = 0
}
use := s.total - s.used
use := r.total - r.used
if use > len(p) {
use = len(p)
}
copy(p, s.current.data()[s.used:s.used+use])
s.used += use
if s.used == s.total {
if s.delayedErr == nil {
if err := s.current.submit(); err == nil {
copy(p, r.current.data()[r.used:r.used+use])
r.used += use
if r.used == r.total {
if r.s.err == nil {
if err := r.current.submit(); err == nil {
// guaranteed to not block, len(transfers) == number of allocated transfers
s.transfers <- s.current
r.s.transfers <- r.current
} else {
s.setDelayedErr(err)
r.s.gotError(err)
r.s.noMore()
}
}
if s.delayedErr != nil {
s.current.free()
if r.s.err != nil {
r.current.free()
}
s.current = nil
r.current = nil
}
return use, nil
}
@@ -119,64 +152,112 @@ func (r ReadStream) Read(p []byte) (int, error) {
// in progress before returning an io.EOF error, unless another error
// was encountered earlier.
// Close cannot be called concurrently with Read.
func (r ReadStream) Close() error {
func (r *ReadStream) Close() error {
if r.s.transfers == nil {
return nil
}
r.s.setDelayedErr(io.EOF)
r.s.gotError(io.EOF)
r.s.noMore()
return nil
}
// WriteStream is a buffer that will send data asynchronously, reducing
// the latency between subsequent Write()s.
/*
type WriteStream struct {
s *stream
s *stream
total int
}
*/
// Write sends the data to the endpoint. Write returning a nil error doesn't
// mean that data was written to the device, only that it was written to the
// buffer. Only a call to Flush() that returns nil error guarantees that
// buffer. Only a call to Close() that returns nil error guarantees that
// all transfers have succeeded.
// TODO(sebek): not implemented and tested yet
/*
func (w WriteStream) Write(p []byte) (int, error) {
s := w.s
// If the slice passed to Write does not align exactly with the transfer
// buffer size (as declared in a call to NewStream), the last USB transfer
// of this Write will be sent with less data than the full buffer.
// After a non-nil error is returned, all subsequent attempts to write will
// return io.ErrClosedPipe.
// If Write encounters an error when preparing the transfer, the stream
// will still try to complete any pending transfers. The total number
// of bytes successfully written can be retrieved through a Written()
// call after Close() has returned.
// Write cannot be called concurrently with another Write, Written or Close.
func (w *WriteStream) Write(p []byte) (int, error) {
if w.s.transfers == nil || w.s.err != nil {
return 0, io.ErrClosedPipe
}
written := 0
all := len(p)
for written < all {
if s.current == nil {
s.current = <-s.transfers
s.total = len(s.current.data())
s.used = 0
t := <-w.s.transfers
n, err := t.wait() // unsubmitted transfers will return 0 bytes and no error
w.total += n
if err != nil {
t.free()
w.s.gotError(err)
// This branch is used only after all the transfers were set in flight.
// That means all transfers left in the queue are in flight.
// They must be ignored, since this wait() failed.
w.s.flushRemaining()
return written, err
}
use := all - written
if use > s.total {
use = s.total
if max := len(t.data()); use > max {
use = max
}
copy(s.current.data()[s.used:], p[written:written+use])
copy(t.data(), p[written:written+use])
if err := t.submit(); err != nil {
t.free()
w.s.gotError(err)
// Even though this submit failed, all the transfers in flight are still valid.
// Don't flush remaining transfers.
// We won't submit any more transfers.
w.s.noMore()
return written, err
}
written += use
w.s.transfers <- t // guaranteed non blocking
}
return 0, nil
return written, nil
}
func (w WriteStream) Flush() error {
return nil
// Close signals end of data to write. Close blocks until all transfers
// that were sent are finished. The error returned by Close is the first
// error encountered during writing the entire stream (if any).
// Close returning nil indicates all transfers completed successfuly.
// After Close, the total number of bytes successfuly written can be
// retrieved using Written().
// Close may not be called concurrently with Write, Close or Written.
func (w *WriteStream) Close() error {
if w.s.transfers == nil {
return io.ErrClosedPipe
}
w.s.noMore()
for t := range w.s.transfers {
n, err := t.wait()
w.total += n
t.free()
if err != nil {
w.s.gotError(err)
w.s.flushRemaining()
}
t.free()
}
w.s.transfers = nil
return w.s.err
}
*/
func newStream(tt []transferIntf, submit bool) *stream {
// Written returns the number of bytes successfuly written by the stream.
// Written may be called only after Close() has been called and returned.
func (w *WriteStream) Written() int {
return w.total
}
func newStream(tt []transferIntf) *stream {
s := &stream{
transfers: make(chan transferIntf, len(tt)),
}
for _, t := range tt {
if submit {
if err := t.submit(); err != nil {
t.free()
s.setDelayedErr(err)
break
}
}
s.transfers <- t
}
return s