# HG changeset patch # User Mikael Berthe # Date 1492252520 -7200 # Node ID 50c7733ee886298a86574a4ece6ac632ae62a7ec # Parent d192d9d0adfd50df3cd1b3b12c8bc0ccb3b014bf Change StreamListener prototype Change StreamListener prototype in order to be able to use several StreamListener's with the same event channel. Stop a listener when the Readline call fails, since we probably cannot know if we can resume safely. diff -r d192d9d0adfd -r 50c7733ee886 streams.go --- a/streams.go Sat Apr 15 10:34:31 2017 +0200 +++ b/streams.go Sat Apr 15 12:35:20 2017 +0200 @@ -59,8 +59,9 @@ } // readStream reads from the http.Response and sends events to the events channel -// It stops when the connection is closed or when teh stopCh channel is closed. -func (g *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, r *http.Response) { +// It stops when the connection is closed or when the stopCh channel is closed. +// The foroutine will close the doneCh channel when it terminates. +func (g *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool, r *http.Response) { defer r.Body.Close() reader := bufio.NewReader(r.Body) @@ -69,24 +70,23 @@ for { select { case <-stopCh: - close(events) + close(doneCh) return default: } lineBytes, partial, err := reader.ReadLine() if err != nil { + var e error if err == io.EOF { - e := fmt.Errorf("connection closed: %s", err.Error()) - log.Printf("Stream Reader: %s", e.Error()) - events <- StreamEvent{Event: "error", Error: e} - close(events) - return + e = fmt.Errorf("connection closed: %s", err.Error()) + } else { + e = fmt.Errorf("read error: %s", err.Error()) } - e := fmt.Errorf("ReadLine read error: %s", err.Error()) log.Printf("Stream Reader: %s", e.Error()) events <- StreamEvent{Event: "error", Error: e} - time.Sleep(10 * time.Second) + close(doneCh) + return } if partial { @@ -171,14 +171,14 @@ // The stream 'name' can be "user", "public" or "hashtag". // For 'hashtag', the hashTag argument cannot be empty. // The events are sent to the events channel (the errors as well). -// The streaming is terminated if the stop channel is closed. +// The streaming is terminated if the 'stopCh' channel is closed. +// The 'doneCh' channel is closed if the connection is closed by the server. // Please note that this method launches a goroutine to listen to the events. -// The 'events' channel is closed if the connection is closed by the server. -func (g *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool) error { +func (g *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool) error { resp, err := g.openStream(name, hashTag) if err != nil { return err } - go g.readStream(events, stopCh, resp) + go g.readStream(events, stopCh, doneCh, resp) return nil }