diff -r d237ffdd75c0 -r 7f8ac782cf5d streams.go --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/streams.go Fri Apr 14 22:56:50 2017 +0200 @@ -0,0 +1,182 @@ +package gondole + +import ( + "bufio" + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net/http" + "strings" + "time" + + "github.com/sendgrid/rest" +) + +// StreamEvent contains a single event from the streaming API +type StreamEvent struct { + Event string // Name of the event (error, update, notification or delete) + Data interface{} // Status, Notification or status ID + Error error // Error message from the StreamListener +} + +// openStream opens a stream URL and returns an http.Response +// Note that the caller should close the connection when it's done reading +// the stream. +// The stream name can be "user", "public" or "hashtag". +// For "hashtag", the hashTag argument cannot be empty. +func (g *Client) openStream(streamName, hashTag string) (*http.Response, error) { + req := g.prepareRequest("streaming/" + streamName) + + switch streamName { + case "user", "public": + case "hashtag": + if hashTag == "" { + return nil, ErrInvalidParameter + } + req.QueryParams["tag"] = hashTag + default: + return nil, ErrInvalidParameter + } + + reqObj, err := rest.BuildRequestObject(req) + if err != nil { + return nil, fmt.Errorf("cannot build stream request: %s", err.Error()) + } + resp, err := rest.MakeRequest(reqObj) + if err != nil { + return nil, fmt.Errorf("cannot open stream: %s", err.Error()) + } + if resp.StatusCode != 200 { + resp.Body.Close() + return nil, errors.New(resp.Status) + } + return resp, nil +} + +// readStream reads from the http.Response and sends events to the events channel +// It stops when the connection is closed or when teh stopCh channel is closed. +func (g *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, r *http.Response) { + defer r.Body.Close() + + reader := bufio.NewReader(r.Body) + + var line, eventName string + for { + select { + case <-stopCh: + close(events) + return + default: + } + + lineBytes, partial, err := reader.ReadLine() + if err != nil { + if err == io.EOF { + e := fmt.Errorf("connection closed: %s", err.Error()) + log.Printf("Stream Reader: %s", e.Error()) + events <- StreamEvent{Event: "error", Error: e} + close(events) + return + } + e := fmt.Errorf("ReadLine read error: %s", err.Error()) + log.Printf("Stream Reader: %s", e.Error()) + events <- StreamEvent{Event: "error", Error: e} + time.Sleep(10 * time.Second) + } + + if partial { + e := fmt.Errorf("received incomplete line; not supported yet") + log.Printf("Stream Reader: %s", e.Error()) + events <- StreamEvent{Event: "error", Error: e} + time.Sleep(5 * time.Second) + continue // Skip this + } + + line = string(bytes.TrimSpace(lineBytes)) + + if line == "" { + continue // Skip empty line + } + if strings.HasPrefix(line, ":") { + continue // Skip comment + } + + if strings.HasPrefix(line, "event: ") { + eventName = line[7:] + continue + } + + if !strings.HasPrefix(line, "data: ") { + // XXX Needs improvement + e := fmt.Errorf("received unhandled event line '%s'", strings.Split(line, ":")[0]) + log.Printf("Stream Reader: %s", e.Error()) + events <- StreamEvent{Event: "error", Error: e} + continue + } + + // This is a data line + data := []byte(line[6:]) + + var obj interface{} + + // Decode API object + switch eventName { + case "update": + var s Status + if err := json.Unmarshal(data, &s); err != nil { + e := fmt.Errorf("could not unmarshal data: %s", err.Error()) + log.Printf("Stream Reader: %s", e.Error()) + events <- StreamEvent{Event: "error", Error: e} + continue + } + obj = s + case "notification": + var notif Notification + if err := json.Unmarshal(data, ¬if); err != nil { + e := fmt.Errorf("could not unmarshal data: %s", err.Error()) + log.Printf("Stream Reader: %s", e.Error()) + events <- StreamEvent{Event: "error", Error: e} + continue + } + obj = notif + case "delete": + var statusID int + if err := json.Unmarshal(data, &statusID); err != nil { + e := fmt.Errorf("could not unmarshal data: %s", err.Error()) + log.Printf("Stream Reader: %s", e.Error()) + events <- StreamEvent{Event: "error", Error: e} + continue + } + obj = statusID + case "": + fallthrough + default: + e := fmt.Errorf("unhandled event '%s'", eventName) + log.Printf("Stream Reader: %s", e.Error()) + events <- StreamEvent{Event: "error", Error: e} + continue + } + + // Send event to the channel + events <- StreamEvent{Event: eventName, Data: obj} + } +} + +// StreamListener listens to a stream from the Mastodon server +// The stream 'name' can be "user", "public" or "hashtag". +// For 'hashtag', the hashTag argument cannot be empty. +// The events are sent to the events channel (the errors as well). +// The streaming is terminated if the stop channel is closed. +// Please note that this method launches a goroutine to listen to the events. +// The 'events' channel is closed if the connection is closed by the server. +func (g *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool) error { + resp, err := g.openStream(name, hashTag) + if err != nil { + return err + } + go g.readStream(events, stopCh, resp) + return nil +}