--- a/streams.go Sat Apr 15 10:34:31 2017 +0200
+++ b/streams.go Sat Apr 15 12:35:20 2017 +0200
@@ -59,8 +59,9 @@
}
// readStream reads from the http.Response and sends events to the events channel
-// It stops when the connection is closed or when teh stopCh channel is closed.
-func (g *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, r *http.Response) {
+// It stops when the connection is closed or when the stopCh channel is closed.
+// The foroutine will close the doneCh channel when it terminates.
+func (g *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool, r *http.Response) {
defer r.Body.Close()
reader := bufio.NewReader(r.Body)
@@ -69,24 +70,23 @@
for {
select {
case <-stopCh:
- close(events)
+ close(doneCh)
return
default:
}
lineBytes, partial, err := reader.ReadLine()
if err != nil {
+ var e error
if err == io.EOF {
- e := fmt.Errorf("connection closed: %s", err.Error())
- log.Printf("Stream Reader: %s", e.Error())
- events <- StreamEvent{Event: "error", Error: e}
- close(events)
- return
+ e = fmt.Errorf("connection closed: %s", err.Error())
+ } else {
+ e = fmt.Errorf("read error: %s", err.Error())
}
- e := fmt.Errorf("ReadLine read error: %s", err.Error())
log.Printf("Stream Reader: %s", e.Error())
events <- StreamEvent{Event: "error", Error: e}
- time.Sleep(10 * time.Second)
+ close(doneCh)
+ return
}
if partial {
@@ -171,14 +171,14 @@
// The stream 'name' can be "user", "public" or "hashtag".
// For 'hashtag', the hashTag argument cannot be empty.
// The events are sent to the events channel (the errors as well).
-// The streaming is terminated if the stop channel is closed.
+// The streaming is terminated if the 'stopCh' channel is closed.
+// The 'doneCh' channel is closed if the connection is closed by the server.
// Please note that this method launches a goroutine to listen to the events.
-// The 'events' channel is closed if the connection is closed by the server.
-func (g *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool) error {
+func (g *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool) error {
resp, err := g.openStream(name, hashTag)
if err != nil {
return err
}
- go g.readStream(events, stopCh, resp)
+ go g.readStream(events, stopCh, doneCh, resp)
return nil
}