streams.go
changeset 122 50c7733ee886
parent 120 579912e9d0ef
child 128 a5a00fad7a32
--- a/streams.go	Sat Apr 15 10:34:31 2017 +0200
+++ b/streams.go	Sat Apr 15 12:35:20 2017 +0200
@@ -59,8 +59,9 @@
 }
 
 // readStream reads from the http.Response and sends events to the events channel
-// It stops when the connection is closed or when teh stopCh channel is closed.
-func (g *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, r *http.Response) {
+// It stops when the connection is closed or when the stopCh channel is closed.
+// The foroutine will close the doneCh channel when it terminates.
+func (g *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool, r *http.Response) {
 	defer r.Body.Close()
 
 	reader := bufio.NewReader(r.Body)
@@ -69,24 +70,23 @@
 	for {
 		select {
 		case <-stopCh:
-			close(events)
+			close(doneCh)
 			return
 		default:
 		}
 
 		lineBytes, partial, err := reader.ReadLine()
 		if err != nil {
+			var e error
 			if err == io.EOF {
-				e := fmt.Errorf("connection closed: %s", err.Error())
-				log.Printf("Stream Reader: %s", e.Error())
-				events <- StreamEvent{Event: "error", Error: e}
-				close(events)
-				return
+				e = fmt.Errorf("connection closed: %s", err.Error())
+			} else {
+				e = fmt.Errorf("read error: %s", err.Error())
 			}
-			e := fmt.Errorf("ReadLine read error: %s", err.Error())
 			log.Printf("Stream Reader: %s", e.Error())
 			events <- StreamEvent{Event: "error", Error: e}
-			time.Sleep(10 * time.Second)
+			close(doneCh)
+			return
 		}
 
 		if partial {
@@ -171,14 +171,14 @@
 // The stream 'name' can be "user", "public" or "hashtag".
 // For 'hashtag', the hashTag argument cannot be empty.
 // The events are sent to the events channel (the errors as well).
-// The streaming is terminated if the stop channel is closed.
+// The streaming is terminated if the 'stopCh' channel is closed.
+// The 'doneCh' channel is closed if the connection is closed by the server.
 // Please note that this method launches a goroutine to listen to the events.
-// The 'events' channel is closed if the connection is closed by the server.
-func (g *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool) error {
+func (g *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool) error {
 	resp, err := g.openStream(name, hashTag)
 	if err != nil {
 		return err
 	}
-	go g.readStream(events, stopCh, resp)
+	go g.readStream(events, stopCh, doneCh, resp)
 	return nil
 }