Change StreamListener prototype
authorMikael Berthe <mikael@lilotux.net>
Sat, 15 Apr 2017 12:35:20 +0200
changeset 122 50c7733ee886
parent 121 d192d9d0adfd
child 123 9b566c020a17
Change StreamListener prototype Change StreamListener prototype in order to be able to use several StreamListener's with the same event channel. Stop a listener when the Readline call fails, since we probably cannot know if we can resume safely.
streams.go
--- a/streams.go	Sat Apr 15 10:34:31 2017 +0200
+++ b/streams.go	Sat Apr 15 12:35:20 2017 +0200
@@ -59,8 +59,9 @@
 }
 
 // readStream reads from the http.Response and sends events to the events channel
-// It stops when the connection is closed or when teh stopCh channel is closed.
-func (g *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, r *http.Response) {
+// It stops when the connection is closed or when the stopCh channel is closed.
+// The foroutine will close the doneCh channel when it terminates.
+func (g *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool, r *http.Response) {
 	defer r.Body.Close()
 
 	reader := bufio.NewReader(r.Body)
@@ -69,24 +70,23 @@
 	for {
 		select {
 		case <-stopCh:
-			close(events)
+			close(doneCh)
 			return
 		default:
 		}
 
 		lineBytes, partial, err := reader.ReadLine()
 		if err != nil {
+			var e error
 			if err == io.EOF {
-				e := fmt.Errorf("connection closed: %s", err.Error())
-				log.Printf("Stream Reader: %s", e.Error())
-				events <- StreamEvent{Event: "error", Error: e}
-				close(events)
-				return
+				e = fmt.Errorf("connection closed: %s", err.Error())
+			} else {
+				e = fmt.Errorf("read error: %s", err.Error())
 			}
-			e := fmt.Errorf("ReadLine read error: %s", err.Error())
 			log.Printf("Stream Reader: %s", e.Error())
 			events <- StreamEvent{Event: "error", Error: e}
-			time.Sleep(10 * time.Second)
+			close(doneCh)
+			return
 		}
 
 		if partial {
@@ -171,14 +171,14 @@
 // The stream 'name' can be "user", "public" or "hashtag".
 // For 'hashtag', the hashTag argument cannot be empty.
 // The events are sent to the events channel (the errors as well).
-// The streaming is terminated if the stop channel is closed.
+// The streaming is terminated if the 'stopCh' channel is closed.
+// The 'doneCh' channel is closed if the connection is closed by the server.
 // Please note that this method launches a goroutine to listen to the events.
-// The 'events' channel is closed if the connection is closed by the server.
-func (g *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool) error {
+func (g *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool) error {
 	resp, err := g.openStream(name, hashTag)
 	if err != nil {
 		return err
 	}
-	go g.readStream(events, stopCh, resp)
+	go g.readStream(events, stopCh, doneCh, resp)
 	return nil
 }