streams.go
changeset 138 23d3a518d0ad
parent 130 c450bb73f59a
child 140 7665451f74cc
equal deleted inserted replaced
137:acaea3179f4d 138:23d3a518d0ad
     2 Copyright 2017 Mikael Berthe
     2 Copyright 2017 Mikael Berthe
     3 
     3 
     4 Licensed under the MIT license.  Please see the LICENSE file in this directory.
     4 Licensed under the MIT license.  Please see the LICENSE file in this directory.
     5 */
     5 */
     6 
     6 
     7 package gondole
     7 package madon
     8 
     8 
     9 import (
     9 import (
    10 	"bufio"
    10 	"bufio"
    11 	"bytes"
    11 	"bytes"
    12 	"encoding/json"
    12 	"encoding/json"
    31 // openStream opens a stream URL and returns an http.Response
    31 // openStream opens a stream URL and returns an http.Response
    32 // Note that the caller should close the connection when it's done reading
    32 // Note that the caller should close the connection when it's done reading
    33 // the stream.
    33 // the stream.
    34 // The stream name can be "user", "public" or "hashtag".
    34 // The stream name can be "user", "public" or "hashtag".
    35 // For "hashtag", the hashTag argument cannot be empty.
    35 // For "hashtag", the hashTag argument cannot be empty.
    36 func (g *Client) openStream(streamName, hashTag string) (*http.Response, error) {
    36 func (mc *Client) openStream(streamName, hashTag string) (*http.Response, error) {
    37 	params := make(apiCallParams)
    37 	params := make(apiCallParams)
    38 
    38 
    39 	switch streamName {
    39 	switch streamName {
    40 	case "user", "public":
    40 	case "user", "public":
    41 	case "hashtag":
    41 	case "hashtag":
    45 		params["tag"] = hashTag
    45 		params["tag"] = hashTag
    46 	default:
    46 	default:
    47 		return nil, ErrInvalidParameter
    47 		return nil, ErrInvalidParameter
    48 	}
    48 	}
    49 
    49 
    50 	req, err := g.prepareRequest("streaming/"+streamName, rest.Get, params)
    50 	req, err := mc.prepareRequest("streaming/"+streamName, rest.Get, params)
    51 	if err != nil {
    51 	if err != nil {
    52 		return nil, fmt.Errorf("cannot build stream request: %s", err.Error())
    52 		return nil, fmt.Errorf("cannot build stream request: %s", err.Error())
    53 	}
    53 	}
    54 
    54 
    55 	reqObj, err := rest.BuildRequestObject(req)
    55 	reqObj, err := rest.BuildRequestObject(req)
    69 }
    69 }
    70 
    70 
    71 // readStream reads from the http.Response and sends events to the events channel
    71 // readStream reads from the http.Response and sends events to the events channel
    72 // It stops when the connection is closed or when the stopCh channel is closed.
    72 // It stops when the connection is closed or when the stopCh channel is closed.
    73 // The goroutine will close the doneCh channel when it terminates.
    73 // The goroutine will close the doneCh channel when it terminates.
    74 func (g *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool, r *http.Response) {
    74 func (mc *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool, r *http.Response) {
    75 	defer r.Body.Close()
    75 	defer r.Body.Close()
    76 
    76 
    77 	reader := bufio.NewReader(r.Body)
    77 	reader := bufio.NewReader(r.Body)
    78 
    78 
    79 	var line, eventName string
    79 	var line, eventName string
   182 // For 'hashtag', the hashTag argument cannot be empty.
   182 // For 'hashtag', the hashTag argument cannot be empty.
   183 // The events are sent to the events channel (the errors as well).
   183 // The events are sent to the events channel (the errors as well).
   184 // The streaming is terminated if the 'stopCh' channel is closed.
   184 // The streaming is terminated if the 'stopCh' channel is closed.
   185 // The 'doneCh' channel is closed if the connection is closed by the server.
   185 // The 'doneCh' channel is closed if the connection is closed by the server.
   186 // Please note that this method launches a goroutine to listen to the events.
   186 // Please note that this method launches a goroutine to listen to the events.
   187 func (g *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool) error {
   187 func (mc *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool) error {
   188 	if g == nil {
   188 	if mc == nil {
   189 		return fmt.Errorf("use of uninitialized gondole client")
   189 		return ErrUninitializedClient
   190 	}
   190 	}
   191 
   191 
   192 	resp, err := g.openStream(name, hashTag)
   192 	resp, err := mc.openStream(name, hashTag)
   193 	if err != nil {
   193 	if err != nil {
   194 		return err
   194 		return err
   195 	}
   195 	}
   196 	go g.readStream(events, stopCh, doneCh, resp)
   196 	go mc.readStream(events, stopCh, doneCh, resp)
   197 	return nil
   197 	return nil
   198 }
   198 }