31 // openStream opens a stream URL and returns an http.Response. |
31 // openStream opens a stream URL and returns an http.Response. |
32 // Note that the caller should close the connection when it's done reading |
32 // Note that the caller should close the connection when it's done reading |
33 // the stream. |
33 // the stream. |
34 // The stream name can be "user", "public" or "hashtag". |
34 // The stream name can be "user", "public" or "hashtag". |
35 // For "hashtag", the hashTag argument cannot be empty. |
35 // For "hashtag", the hashTag argument cannot be empty. |
36 func (g *Client) openStream(streamName, hashTag string) (*http.Response, error) { |
36 func (mc *Client) openStream(streamName, hashTag string) (*http.Response, error) { |
37 params := make(apiCallParams) |
37 params := make(apiCallParams) |
38 |
38 |
39 switch streamName { |
39 switch streamName { |
40 case "user", "public": |
40 case "user", "public": |
41 case "hashtag": |
41 case "hashtag": |
45 params["tag"] = hashTag |
45 params["tag"] = hashTag |
46 default: |
46 default: |
47 return nil, ErrInvalidParameter |
47 return nil, ErrInvalidParameter |
48 } |
48 } |
49 |
49 |
50 req, err := g.prepareRequest("streaming/"+streamName, rest.Get, params) |
50 req, err := mc.prepareRequest("streaming/"+streamName, rest.Get, params) |
51 if err != nil { |
51 if err != nil { |
52 return nil, fmt.Errorf("cannot build stream request: %s", err.Error()) |
52 return nil, fmt.Errorf("cannot build stream request: %s", err.Error()) |
53 } |
53 } |
54 |
54 |
55 reqObj, err := rest.BuildRequestObject(req) |
55 reqObj, err := rest.BuildRequestObject(req) |
69 } |
69 } |
70 |
70 |
71 // readStream reads from the http.Response and sends events to the events channel. |
71 // readStream reads from the http.Response and sends events to the events channel. |
72 // It stops when the connection is closed or when the stopCh channel is closed. |
72 // It stops when the connection is closed or when the stopCh channel is closed. |
73 // The goroutine will close the doneCh channel when it terminates. |
73 // The goroutine will close the doneCh channel when it terminates. |
74 func (g *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool, r *http.Response) { |
74 func (mc *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool, r *http.Response) { |
75 defer r.Body.Close() |
75 defer r.Body.Close() |
76 |
76 |
77 reader := bufio.NewReader(r.Body) |
77 reader := bufio.NewReader(r.Body) |
78 |
78 |
79 var line, eventName string |
79 var line, eventName string |
182 // For 'hashtag', the hashTag argument cannot be empty. |
182 // For 'hashtag', the hashTag argument cannot be empty. |
183 // The events are sent to the events channel (the errors as well). |
183 // The events are sent to the events channel (the errors as well). |
184 // The streaming is terminated if the 'stopCh' channel is closed. |
184 // The streaming is terminated if the 'stopCh' channel is closed. |
185 // The 'doneCh' channel is closed if the connection is closed by the server. |
185 // The 'doneCh' channel is closed if the connection is closed by the server. |
186 // Please note that this method launches a goroutine to listen to the events. |
186 // Please note that this method launches a goroutine to listen to the events. |
187 func (g *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool) error { |
187 func (mc *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool) error { |
188 if g == nil { |
188 if mc == nil { |
189 return fmt.Errorf("use of uninitialized gondole client") |
189 return ErrUninitializedClient |
190 } |
190 } |
191 |
191 |
192 resp, err := g.openStream(name, hashTag) |
192 resp, err := mc.openStream(name, hashTag) |
193 if err != nil { |
193 if err != nil { |
194 return err |
194 return err |
195 } |
195 } |
196 go g.readStream(events, stopCh, doneCh, resp) |
196 go mc.readStream(events, stopCh, doneCh, resp) |
197 return nil |
197 return nil |
198 } |
198 } |