streams.go
changeset 117 7f8ac782cf5d
child 120 579912e9d0ef
equal deleted inserted replaced
116:d237ffdd75c0 117:7f8ac782cf5d
       
     1 package gondole
       
     2 
       
     3 import (
       
     4 	"bufio"
       
     5 	"bytes"
       
     6 	"encoding/json"
       
     7 	"errors"
       
     8 	"fmt"
       
     9 	"io"
       
    10 	"log"
       
    11 	"net/http"
       
    12 	"strings"
       
    13 	"time"
       
    14 
       
    15 	"github.com/sendgrid/rest"
       
    16 )
       
    17 
       
    18 // StreamEvent contains a single event from the streaming API
       
    19 type StreamEvent struct {
       
    20 	Event string      // Name of the event (error, update, notification or delete)
       
    21 	Data  interface{} // Status, Notification or status ID
       
    22 	Error error       // Error message from the StreamListener
       
    23 }
       
    24 
       
    25 // openStream opens a stream URL and returns an http.Response
       
    26 // Note that the caller should close the connection when it's done reading
       
    27 // the stream.
       
    28 // The stream name can be "user", "public" or "hashtag".
       
    29 // For "hashtag", the hashTag argument cannot be empty.
       
    30 func (g *Client) openStream(streamName, hashTag string) (*http.Response, error) {
       
    31 	req := g.prepareRequest("streaming/" + streamName)
       
    32 
       
    33 	switch streamName {
       
    34 	case "user", "public":
       
    35 	case "hashtag":
       
    36 		if hashTag == "" {
       
    37 			return nil, ErrInvalidParameter
       
    38 		}
       
    39 		req.QueryParams["tag"] = hashTag
       
    40 	default:
       
    41 		return nil, ErrInvalidParameter
       
    42 	}
       
    43 
       
    44 	reqObj, err := rest.BuildRequestObject(req)
       
    45 	if err != nil {
       
    46 		return nil, fmt.Errorf("cannot build stream request: %s", err.Error())
       
    47 	}
       
    48 	resp, err := rest.MakeRequest(reqObj)
       
    49 	if err != nil {
       
    50 		return nil, fmt.Errorf("cannot open stream: %s", err.Error())
       
    51 	}
       
    52 	if resp.StatusCode != 200 {
       
    53 		resp.Body.Close()
       
    54 		return nil, errors.New(resp.Status)
       
    55 	}
       
    56 	return resp, nil
       
    57 }
       
    58 
       
    59 // readStream reads from the http.Response and sends events to the events channel
       
    60 // It stops when the connection is closed or when teh stopCh channel is closed.
       
    61 func (g *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, r *http.Response) {
       
    62 	defer r.Body.Close()
       
    63 
       
    64 	reader := bufio.NewReader(r.Body)
       
    65 
       
    66 	var line, eventName string
       
    67 	for {
       
    68 		select {
       
    69 		case <-stopCh:
       
    70 			close(events)
       
    71 			return
       
    72 		default:
       
    73 		}
       
    74 
       
    75 		lineBytes, partial, err := reader.ReadLine()
       
    76 		if err != nil {
       
    77 			if err == io.EOF {
       
    78 				e := fmt.Errorf("connection closed: %s", err.Error())
       
    79 				log.Printf("Stream Reader: %s", e.Error())
       
    80 				events <- StreamEvent{Event: "error", Error: e}
       
    81 				close(events)
       
    82 				return
       
    83 			}
       
    84 			e := fmt.Errorf("ReadLine read error: %s", err.Error())
       
    85 			log.Printf("Stream Reader: %s", e.Error())
       
    86 			events <- StreamEvent{Event: "error", Error: e}
       
    87 			time.Sleep(10 * time.Second)
       
    88 		}
       
    89 
       
    90 		if partial {
       
    91 			e := fmt.Errorf("received incomplete line; not supported yet")
       
    92 			log.Printf("Stream Reader: %s", e.Error())
       
    93 			events <- StreamEvent{Event: "error", Error: e}
       
    94 			time.Sleep(5 * time.Second)
       
    95 			continue // Skip this
       
    96 		}
       
    97 
       
    98 		line = string(bytes.TrimSpace(lineBytes))
       
    99 
       
   100 		if line == "" {
       
   101 			continue // Skip empty line
       
   102 		}
       
   103 		if strings.HasPrefix(line, ":") {
       
   104 			continue // Skip comment
       
   105 		}
       
   106 
       
   107 		if strings.HasPrefix(line, "event: ") {
       
   108 			eventName = line[7:]
       
   109 			continue
       
   110 		}
       
   111 
       
   112 		if !strings.HasPrefix(line, "data: ") {
       
   113 			// XXX Needs improvement
       
   114 			e := fmt.Errorf("received unhandled event line '%s'", strings.Split(line, ":")[0])
       
   115 			log.Printf("Stream Reader: %s", e.Error())
       
   116 			events <- StreamEvent{Event: "error", Error: e}
       
   117 			continue
       
   118 		}
       
   119 
       
   120 		// This is a data line
       
   121 		data := []byte(line[6:])
       
   122 
       
   123 		var obj interface{}
       
   124 
       
   125 		// Decode API object
       
   126 		switch eventName {
       
   127 		case "update":
       
   128 			var s Status
       
   129 			if err := json.Unmarshal(data, &s); err != nil {
       
   130 				e := fmt.Errorf("could not unmarshal data: %s", err.Error())
       
   131 				log.Printf("Stream Reader: %s", e.Error())
       
   132 				events <- StreamEvent{Event: "error", Error: e}
       
   133 				continue
       
   134 			}
       
   135 			obj = s
       
   136 		case "notification":
       
   137 			var notif Notification
       
   138 			if err := json.Unmarshal(data, &notif); err != nil {
       
   139 				e := fmt.Errorf("could not unmarshal data: %s", err.Error())
       
   140 				log.Printf("Stream Reader: %s", e.Error())
       
   141 				events <- StreamEvent{Event: "error", Error: e}
       
   142 				continue
       
   143 			}
       
   144 			obj = notif
       
   145 		case "delete":
       
   146 			var statusID int
       
   147 			if err := json.Unmarshal(data, &statusID); err != nil {
       
   148 				e := fmt.Errorf("could not unmarshal data: %s", err.Error())
       
   149 				log.Printf("Stream Reader: %s", e.Error())
       
   150 				events <- StreamEvent{Event: "error", Error: e}
       
   151 				continue
       
   152 			}
       
   153 			obj = statusID
       
   154 		case "":
       
   155 			fallthrough
       
   156 		default:
       
   157 			e := fmt.Errorf("unhandled event '%s'", eventName)
       
   158 			log.Printf("Stream Reader: %s", e.Error())
       
   159 			events <- StreamEvent{Event: "error", Error: e}
       
   160 			continue
       
   161 		}
       
   162 
       
   163 		// Send event to the channel
       
   164 		events <- StreamEvent{Event: eventName, Data: obj}
       
   165 	}
       
   166 }
       
   167 
       
   168 // StreamListener listens to a stream from the Mastodon server
       
   169 // The stream 'name' can be "user", "public" or "hashtag".
       
   170 // For 'hashtag', the hashTag argument cannot be empty.
       
   171 // The events are sent to the events channel (the errors as well).
       
   172 // The streaming is terminated if the stop channel is closed.
       
   173 // Please note that this method launches a goroutine to listen to the events.
       
   174 // The 'events' channel is closed if the connection is closed by the server.
       
   175 func (g *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool) error {
       
   176 	resp, err := g.openStream(name, hashTag)
       
   177 	if err != nil {
       
   178 		return err
       
   179 	}
       
   180 	go g.readStream(events, stopCh, resp)
       
   181 	return nil
       
   182 }