From b50bc8761f8ef1f56209af4ca7d45aa385131b62 Mon Sep 17 00:00:00 2001 From: Sebastian Zagrodzki Date: Sun, 23 Apr 2017 10:45:04 +0200 Subject: [PATCH] First version of streaming transfers. --- rawread/main.go | 16 ++++- usb/device.go | 4 +- usb/endpoint.go | 18 +++-- usb/endpoint_stream.go | 44 ++++++++++++ usb/endpoint_test.go | 2 +- usb/transfer.go | 7 +- usb/transfer_stream.go | 134 ++++++++++++++++++++++++++++++++++++ usb/transfer_stream_test.go | 132 +++++++++++++++++++++++++++++++++++ 8 files changed, 345 insertions(+), 12 deletions(-) create mode 100644 usb/endpoint_stream.go create mode 100644 usb/transfer_stream.go create mode 100644 usb/transfer_stream_test.go diff --git a/rawread/main.go b/rawread/main.go index 04c70f5..40cefaf 100644 --- a/rawread/main.go +++ b/rawread/main.go @@ -19,6 +19,7 @@ package main import ( "flag" "fmt" + "io" "log" "os" "strconv" @@ -36,6 +37,7 @@ var ( endpoint = flag.Uint("endpoint", 1, "Endpoint number to which to connect (without the leading 0x8).") debug = flag.Int("debug", 3, "Debug level for libusb.") size = flag.Uint("read_size", 1024, "Number of bytes of data to read in a single transaction.") + bufSize = flag.Uint("buffer_size", 0, "Number of buffer transfers, for data prefetching.") num = flag.Uint("read_num", 0, "Number of read transactions to perform. 0 means infinite.") ) @@ -141,14 +143,24 @@ func main() { log.Printf("Connecting to endpoint %d...", *endpoint) ep, err := dev.InEndpoint(uint8(*config), uint8(*iface), uint8(*alternate), uint8(*endpoint)) if err != nil { - log.Fatalf("open: %s", err) + log.Fatalf("dev.InEndpoint(): %s", err) } log.Printf("Found endpoint: %s", ep) + var rdr io.Reader = ep + if *bufSize > 1 { + log.Print("Creating buffer...") + s, err := ep.NewStream(*size, *bufSize) + if err != nil { + log.Fatalf("ep.NewStream(): %v", err) + } + defer s.Close() + rdr = s + } log.Print("Reading...") buf := make([]byte, *size) for i := uint(0); *num == 0 || i < *num; i++ { - num, err := ep.Read(buf) + num, err := rdr.Read(buf) if err != nil { log.Fatalf("Reading from device failed: %v", err) } diff --git a/usb/device.go b/usb/device.go index 2e88e9f..dba7f77 100644 --- a/usb/device.go +++ b/usb/device.go @@ -190,9 +190,9 @@ func (d *Device) InEndpoint(cfgNum, ifNum, setNum, epNum uint8) (*InEndpoint, er if err != nil { return nil, err } + ep.SetTimeout(d.ReadTimeout) return &InEndpoint{ endpoint: ep, - timeout: d.ReadTimeout, }, nil } @@ -202,9 +202,9 @@ func (d *Device) OutEndpoint(cfgNum, ifNum, setNum, epNum uint8) (*OutEndpoint, if err != nil { return nil, err } + ep.SetTimeout(d.WriteTimeout) return &OutEndpoint{ endpoint: ep, - timeout: d.WriteTimeout, }, nil } diff --git a/usb/endpoint.go b/usb/endpoint.go index 7563b4e..f13c729 100644 --- a/usb/endpoint.go +++ b/usb/endpoint.go @@ -73,6 +73,8 @@ type endpoint struct { InterfaceSetting Info EndpointInfo + + timeout time.Duration } // String returns a human-readable description of the endpoint. @@ -80,12 +82,18 @@ func (e *endpoint) String() string { return e.Info.String() } -func (e *endpoint) transfer(buf []byte, timeout time.Duration) (int, error) { +// SetTimeout sets a timeout duration for all new USB involving +// this endpoint. +func (e *endpoint) SetTimeout(t time.Duration) { + e.timeout = t +} + +func (e *endpoint) transfer(buf []byte) (int, error) { if len(buf) == 0 { return 0, nil } - t, err := newUSBTransfer(e.h, &e.Info, buf, timeout) + t, err := newUSBTransfer(e.h, &e.Info, buf, e.timeout) if err != nil { return 0, err } @@ -113,21 +121,19 @@ func newEndpoint(h *libusbDevHandle, s InterfaceSetting, e EndpointInfo) *endpoi // InEndpoint represents an IN endpoint open for transfer. type InEndpoint struct { *endpoint - timeout time.Duration } // Read reads data from an IN endpoint. func (e *InEndpoint) Read(buf []byte) (int, error) { - return e.transfer(buf, e.timeout) + return e.transfer(buf) } // OutEndpoint represents an OUT endpoint open for transfer. type OutEndpoint struct { *endpoint - timeout time.Duration } // Write writes data to an OUT endpoint. func (e *OutEndpoint) Write(buf []byte) (int, error) { - return e.transfer(buf, e.timeout) + return e.transfer(buf) } diff --git a/usb/endpoint_stream.go b/usb/endpoint_stream.go new file mode 100644 index 0000000..85b4404 --- /dev/null +++ b/usb/endpoint_stream.go @@ -0,0 +1,44 @@ +// Copyright 2017 the gousb Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package usb + +func (e *endpoint) newStream(size, count uint, submit bool) (*stream, error) { + var ts []transferIntf + for i := uint(0); i < count; i++ { + t, err := newUSBTransfer(e.h, &e.Info, make([]byte, size), e.timeout) + if err != nil { + for _, t := range ts { + t.free() + } + return nil, err + } + ts = append(ts, t) + } + return newStream(ts, submit), nil +} + +// NewStream prepares a new read stream that will keep reading data from the +// endpoint until closed. +// Size defines a buffer size for a single read transaction and count +// defines how many transactions should be active at any time. +// By keeping multiple transfers active at the same time, a Stream reduces +// the latency between subsequent transfers and increases reading throughput. +func (e *InEndpoint) NewStream(size, count uint) (ReadStream, error) { + s, err := e.newStream(size, count, true) + if err != nil { + return ReadStream{}, err + } + return ReadStream{s}, nil +} diff --git a/usb/endpoint_test.go b/usb/endpoint_test.go index 9928b70..9cb08bf 100644 --- a/usb/endpoint_test.go +++ b/usb/endpoint_test.go @@ -93,7 +93,7 @@ func TestEndpoint(t *testing.T) { fakeT.status = tc.status close(fakeT.done) }() - got, err := ep.transfer(tc.buf, time.Second) + got, err := ep.transfer(tc.buf) if (err != nil) != tc.wantErr { t.Errorf("%s, %s: ep.transfer(...): got err: %v, err != nil is %v, want %v", epData.ei, tc.desc, err, err != nil, tc.wantErr) continue diff --git a/usb/transfer.go b/usb/transfer.go index 6307ebb..996d82f 100644 --- a/usb/transfer.go +++ b/usb/transfer.go @@ -33,7 +33,7 @@ type usbTransfer struct { // done is blocking until the transfer is complete and data and transfer // status are available. done chan struct{} - // submitted is true if this transfer was passed to libusb through submit() + // submitted is true if submit() was called on this transfer. submitted bool } @@ -110,6 +110,11 @@ func (t *usbTransfer) free() error { return nil } +// data returns the slice containing transfer buffer. +func (t *usbTransfer) data() []byte { + return t.buf +} + // newUSBTransfer allocates a new transfer structure for communication with a // given device/endpoint, with buf as the underlying transfer buffer. func newUSBTransfer(dev *libusbDevHandle, ei *EndpointInfo, buf []byte, timeout time.Duration) (*usbTransfer, error) { diff --git a/usb/transfer_stream.go b/usb/transfer_stream.go new file mode 100644 index 0000000..4d5d882 --- /dev/null +++ b/usb/transfer_stream.go @@ -0,0 +1,134 @@ +// Copyright 2017 the gousb Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package usb + +import "io" + +type transferIntf interface { + submit() error + cancel() error + wait() (int, error) + free() error + data() []byte +} + +type stream struct { + // a fifo of USB transfers. + transfers chan transferIntf + // current holds the last transfer to return. + current transferIntf + // total/used are the number of all/used bytes in the current transfer. + total, used int + // err is the first error encountered, returned to the user as soon + // as all remaining data was read. + err error +} + +func (s *stream) cleanup() { + close(s.transfers) + for t := range s.transfers { + t.cancel() + t.wait() + t.free() + } +} + +type ReadStream struct { + s *stream +} + +// Read reads data from the transfer stream. +// The data will come from at most a single transfer, so the returned number +// might be smaller than the length of p. +// After a non-nil error is returned, all subsequent attempts to read will +// return io.ErrClosedPipe. +func (r ReadStream) Read(p []byte) (int, error) { + s := r.s + if s.current == nil { + t, ok := <-s.transfers + if !ok { + // no more transfers in flight + retErr := io.ErrClosedPipe + if s.err != nil { + retErr = s.err + s.err = nil + } + return 0, retErr + } + n, err := t.wait() + if err != nil { + s.err = err + } + s.current = t + s.total = n + s.used = 0 + } + use := s.total - s.used + if use > len(p) { + use = len(p) + } + copy(p, s.current.data()[s.used:s.used+use]) + s.used += use + if s.used == s.total { + if s.err == nil { + if err := s.current.submit(); err == nil { + // guaranteed to not block, len(transfers) == number of allocated transfers + s.transfers <- s.current + } else { + s.err = err + } + } + if s.err != nil { + s.current.free() + } + s.current = nil + } + var retErr error + if s.current == nil && s.err != nil { + s.cleanup() + retErr = s.err + s.err = nil + } + return use, retErr +} + +// Close signals that the transfer should stop. After Close is called, +// subsequent Read()s will return data from all transfers that were already +// in progress before returning an io.EOF error, unless another error +// was encountered earlier. +func (r ReadStream) Close() { + s := r.s + if s.err != nil { + s.err = io.EOF + } +} + +func newStream(tt []transferIntf, submit bool) *stream { + s := &stream{ + transfers: make(chan transferIntf, len(tt)), + } + for _, t := range tt { + s.transfers <- t + } + if submit { + for _, t := range tt { + if err := t.submit(); err != nil { + s.err = err + break + } + } + } + return s +} diff --git a/usb/transfer_stream_test.go b/usb/transfer_stream_test.go new file mode 100644 index 0000000..570f055 --- /dev/null +++ b/usb/transfer_stream_test.go @@ -0,0 +1,132 @@ +// Copyright 2017 the gousb Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package usb + +import ( + "errors" + "io" + "testing" +) + +var fakeTransferBuf = make([]byte, 1500) + +type fakeStreamResult struct { + n int + waitErr error + submitErr error +} + +type fakeStreamTransfer struct { + res []fakeStreamResult + inFlight bool + released bool +} + +func (f *fakeStreamTransfer) submit() error { + if f.released { + return errors.New("submit() called on a free()d transfer") + } + if f.inFlight { + return errors.New("submit() called twice") + } + if len(f.res) == 0 { + return io.ErrUnexpectedEOF + } + f.inFlight = true + res := f.res[0] + if res.submitErr != nil { + f.res = nil + return res.submitErr + } + return nil +} + +func (f *fakeStreamTransfer) wait() (int, error) { + if f.released { + return 0, errors.New("wait() called on a free()d transfer") + } + if !f.inFlight { + return 0, errors.New("wait() called without submit()") + } + if len(f.res) == 0 { + return 0, io.ErrUnexpectedEOF + } + f.inFlight = false + res := f.res[0] + if res.waitErr != nil { + f.res = nil + } + return res.n, res.waitErr +} + +func (f *fakeStreamTransfer) free() error { + if f.released { + return errors.New("free() called twice") + } + f.released = true + return nil +} + +func (f *fakeStreamTransfer) cancel() error { return nil } +func (f *fakeStreamTransfer) data() []byte { return fakeTransferBuf } + +var sentinelError = errors.New("sentinel error") + +func TestReadStream(t *testing.T) { + transfers := []*fakeStreamTransfer{ + {res: []fakeStreamResult{ + {n: 500}, + }}, + {res: []fakeStreamResult{ + {n: 500}, + }}, + {res: []fakeStreamResult{ + {n: 123, waitErr: sentinelError}, + }}, + {res: []fakeStreamResult{ + {n: 500}, + }}, + } + intfs := make([]transferIntf, len(transfers)) + for i := range transfers { + intfs[i] = transfers[i] + } + s := ReadStream{newStream(intfs, true)} + buf := make([]byte, 400) + for _, rs := range []struct { + want int + err error + }{ + {400, nil}, + {100, nil}, + {400, nil}, + {100, nil}, + {123, sentinelError}, + {0, io.ErrClosedPipe}, + } { + n, err := s.Read(buf) + if n != rs.want { + t.Errorf("Read(): got %d bytes, want %d", n, rs.want) + } + if err != rs.err { + t.Errorf("Read(): got error %v, want %v", err, rs.err) + } + } + for i := range transfers { + if !transfers[i].released { + t.Errorf("Transfer #%d was not freed after stream completed", i) + } + } +}