57 } |
57 } |
58 return resp, nil |
58 return resp, nil |
59 } |
59 } |
60 |
60 |
61 // readStream reads from the http.Response and sends events to the events channel |
61 // readStream reads from the http.Response and sends events to the events channel |
62 // It stops when the connection is closed or when teh stopCh channel is closed. |
62 // It stops when the connection is closed or when the stopCh channel is closed. |
63 func (g *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, r *http.Response) { |
63 // The goroutine will close the doneCh channel when it terminates. |
|
64 func (g *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool, r *http.Response) { |
64 defer r.Body.Close() |
65 defer r.Body.Close() |
65 |
66 |
66 reader := bufio.NewReader(r.Body) |
67 reader := bufio.NewReader(r.Body) |
67 |
68 |
68 var line, eventName string |
69 var line, eventName string |
69 for { |
70 for { |
70 select { |
71 select { |
71 case <-stopCh: |
72 case <-stopCh: |
72 close(events) |
73 close(doneCh) |
73 return |
74 return |
74 default: |
75 default: |
75 } |
76 } |
76 |
77 |
77 lineBytes, partial, err := reader.ReadLine() |
78 lineBytes, partial, err := reader.ReadLine() |
78 if err != nil { |
79 if err != nil { |
|
80 var e error |
79 if err == io.EOF { |
81 if err == io.EOF { |
80 e := fmt.Errorf("connection closed: %s", err.Error()) |
82 e = fmt.Errorf("connection closed: %s", err.Error()) |
81 log.Printf("Stream Reader: %s", e.Error()) |
83 } else { |
82 events <- StreamEvent{Event: "error", Error: e} |
84 e = fmt.Errorf("read error: %s", err.Error()) |
83 close(events) |
|
84 return |
|
85 } |
85 } |
86 e := fmt.Errorf("ReadLine read error: %s", err.Error()) |
|
87 log.Printf("Stream Reader: %s", e.Error()) |
86 log.Printf("Stream Reader: %s", e.Error()) |
88 events <- StreamEvent{Event: "error", Error: e} |
87 events <- StreamEvent{Event: "error", Error: e} |
89 time.Sleep(10 * time.Second) |
88 close(doneCh) |
|
89 return |
90 } |
90 } |
91 |
91 |
92 if partial { |
92 if partial { |
93 e := fmt.Errorf("received incomplete line; not supported yet") |
93 e := fmt.Errorf("received incomplete line; not supported yet") |
94 log.Printf("Stream Reader: %s", e.Error()) |
94 log.Printf("Stream Reader: %s", e.Error()) |
169 |
169 |
170 // StreamListener listens to a stream from the Mastodon server |
170 // StreamListener listens to a stream from the Mastodon server |
171 // The stream 'name' can be "user", "public" or "hashtag". |
171 // The stream 'name' can be "user", "public" or "hashtag". |
172 // For 'hashtag', the hashTag argument cannot be empty. |
172 // For 'hashtag', the hashTag argument cannot be empty. |
173 // The events are sent to the events channel (the errors as well). |
173 // The events are sent to the events channel (the errors as well). |
174 // The streaming is terminated if the stop channel is closed. |
174 // The streaming is terminated if the 'stopCh' channel is closed. |
|
175 // The 'doneCh' channel is closed when the listening goroutine terminates (e.g. when 'stopCh' is closed, on a read error, or when the connection is closed by the server). |
175 // Please note that this method launches a goroutine to listen to the events. |
176 // Please note that this method launches a goroutine to listen to the events. |
176 // The 'events' channel is closed if the connection is closed by the server. |
177 func (g *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool) error { |
177 func (g *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool) error { |
|
178 resp, err := g.openStream(name, hashTag) |
178 resp, err := g.openStream(name, hashTag) |
179 if err != nil { |
179 if err != nil { |
180 return err |
180 return err |
181 } |
181 } |
182 go g.readStream(events, stopCh, resp) |
182 go g.readStream(events, stopCh, doneCh, resp) |
183 return nil |
183 return nil |
184 } |
184 } |