Add streaming API support
authorMikael Berthe <mikael@lilotux.net>
Fri, 14 Apr 2017 22:56:50 +0200
changeset 117 7f8ac782cf5d
parent 116 d237ffdd75c0
child 118 d9c798e09f0a
Add streaming API support
streams.go
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/streams.go	Fri Apr 14 22:56:50 2017 +0200
@@ -0,0 +1,182 @@
+package gondole
+
+import (
+	"bufio"
+	"bytes"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"log"
+	"net/http"
+	"strings"
+	"time"
+
+	"github.com/sendgrid/rest"
+)
+
+// StreamEvent contains a single event from the streaming API
+type StreamEvent struct {
+	Event string      // Name of the event (error, update, notification or delete)
+	Data  interface{} // Status, Notification or status ID
+	Error error       // Error message from the StreamListener
+}
+
+// openStream opens a stream URL and returns an http.Response
+// Note that the caller should close the connection when it's done reading
+// the stream.
+// The stream name can be "user", "public" or "hashtag".
+// For "hashtag", the hashTag argument cannot be empty.
+func (g *Client) openStream(streamName, hashTag string) (*http.Response, error) {
+	req := g.prepareRequest("streaming/" + streamName)
+
+	switch streamName {
+	case "user", "public":
+	case "hashtag":
+		if hashTag == "" {
+			return nil, ErrInvalidParameter
+		}
+		req.QueryParams["tag"] = hashTag
+	default:
+		return nil, ErrInvalidParameter
+	}
+
+	reqObj, err := rest.BuildRequestObject(req)
+	if err != nil {
+		return nil, fmt.Errorf("cannot build stream request: %s", err.Error())
+	}
+	resp, err := rest.MakeRequest(reqObj)
+	if err != nil {
+		return nil, fmt.Errorf("cannot open stream: %s", err.Error())
+	}
+	if resp.StatusCode != 200 {
+		resp.Body.Close()
+		return nil, errors.New(resp.Status)
+	}
+	return resp, nil
+}
+
+// readStream reads from the http.Response and sends events to the events channel
+// It stops when the connection is closed or when teh stopCh channel is closed.
+func (g *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, r *http.Response) {
+	defer r.Body.Close()
+
+	reader := bufio.NewReader(r.Body)
+
+	var line, eventName string
+	for {
+		select {
+		case <-stopCh:
+			close(events)
+			return
+		default:
+		}
+
+		lineBytes, partial, err := reader.ReadLine()
+		if err != nil {
+			if err == io.EOF {
+				e := fmt.Errorf("connection closed: %s", err.Error())
+				log.Printf("Stream Reader: %s", e.Error())
+				events <- StreamEvent{Event: "error", Error: e}
+				close(events)
+				return
+			}
+			e := fmt.Errorf("ReadLine read error: %s", err.Error())
+			log.Printf("Stream Reader: %s", e.Error())
+			events <- StreamEvent{Event: "error", Error: e}
+			time.Sleep(10 * time.Second)
+		}
+
+		if partial {
+			e := fmt.Errorf("received incomplete line; not supported yet")
+			log.Printf("Stream Reader: %s", e.Error())
+			events <- StreamEvent{Event: "error", Error: e}
+			time.Sleep(5 * time.Second)
+			continue // Skip this
+		}
+
+		line = string(bytes.TrimSpace(lineBytes))
+
+		if line == "" {
+			continue // Skip empty line
+		}
+		if strings.HasPrefix(line, ":") {
+			continue // Skip comment
+		}
+
+		if strings.HasPrefix(line, "event: ") {
+			eventName = line[7:]
+			continue
+		}
+
+		if !strings.HasPrefix(line, "data: ") {
+			// XXX Needs improvement
+			e := fmt.Errorf("received unhandled event line '%s'", strings.Split(line, ":")[0])
+			log.Printf("Stream Reader: %s", e.Error())
+			events <- StreamEvent{Event: "error", Error: e}
+			continue
+		}
+
+		// This is a data line
+		data := []byte(line[6:])
+
+		var obj interface{}
+
+		// Decode API object
+		switch eventName {
+		case "update":
+			var s Status
+			if err := json.Unmarshal(data, &s); err != nil {
+				e := fmt.Errorf("could not unmarshal data: %s", err.Error())
+				log.Printf("Stream Reader: %s", e.Error())
+				events <- StreamEvent{Event: "error", Error: e}
+				continue
+			}
+			obj = s
+		case "notification":
+			var notif Notification
+			if err := json.Unmarshal(data, &notif); err != nil {
+				e := fmt.Errorf("could not unmarshal data: %s", err.Error())
+				log.Printf("Stream Reader: %s", e.Error())
+				events <- StreamEvent{Event: "error", Error: e}
+				continue
+			}
+			obj = notif
+		case "delete":
+			var statusID int
+			if err := json.Unmarshal(data, &statusID); err != nil {
+				e := fmt.Errorf("could not unmarshal data: %s", err.Error())
+				log.Printf("Stream Reader: %s", e.Error())
+				events <- StreamEvent{Event: "error", Error: e}
+				continue
+			}
+			obj = statusID
+		case "":
+			fallthrough
+		default:
+			e := fmt.Errorf("unhandled event '%s'", eventName)
+			log.Printf("Stream Reader: %s", e.Error())
+			events <- StreamEvent{Event: "error", Error: e}
+			continue
+		}
+
+		// Send event to the channel
+		events <- StreamEvent{Event: eventName, Data: obj}
+	}
+}
+
+// StreamListener listens to a stream from the Mastodon server
+// The stream 'name' can be "user", "public" or "hashtag".
+// For 'hashtag', the hashTag argument cannot be empty.
+// The events are sent to the events channel (the errors as well).
+// The streaming is terminated if the stop channel is closed.
+// Please note that this method launches a goroutine to listen to the events.
+// The 'events' channel is closed if the connection is closed by the server.
+func (g *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool) error {
+	resp, err := g.openStream(name, hashTag)
+	if err != nil {
+		return err
+	}
+	go g.readStream(events, stopCh, resp)
+	return nil
+}