|
1 package gondole |
|
2 |
|
3 import ( |
|
4 "bufio" |
|
5 "bytes" |
|
6 "encoding/json" |
|
7 "errors" |
|
8 "fmt" |
|
9 "io" |
|
10 "log" |
|
11 "net/http" |
|
12 "strings" |
|
13 "time" |
|
14 |
|
15 "github.com/sendgrid/rest" |
|
16 ) |
|
17 |
|
18 // StreamEvent contains a single event from the streaming API |
|
19 type StreamEvent struct { |
|
20 Event string // Name of the event (error, update, notification or delete) |
|
21 Data interface{} // Status, Notification or status ID |
|
22 Error error // Error message from the StreamListener |
|
23 } |
|
24 |
|
25 // openStream opens a stream URL and returns an http.Response |
|
26 // Note that the caller should close the connection when it's done reading |
|
27 // the stream. |
|
28 // The stream name can be "user", "public" or "hashtag". |
|
29 // For "hashtag", the hashTag argument cannot be empty. |
|
30 func (g *Client) openStream(streamName, hashTag string) (*http.Response, error) { |
|
31 req := g.prepareRequest("streaming/" + streamName) |
|
32 |
|
33 switch streamName { |
|
34 case "user", "public": |
|
35 case "hashtag": |
|
36 if hashTag == "" { |
|
37 return nil, ErrInvalidParameter |
|
38 } |
|
39 req.QueryParams["tag"] = hashTag |
|
40 default: |
|
41 return nil, ErrInvalidParameter |
|
42 } |
|
43 |
|
44 reqObj, err := rest.BuildRequestObject(req) |
|
45 if err != nil { |
|
46 return nil, fmt.Errorf("cannot build stream request: %s", err.Error()) |
|
47 } |
|
48 resp, err := rest.MakeRequest(reqObj) |
|
49 if err != nil { |
|
50 return nil, fmt.Errorf("cannot open stream: %s", err.Error()) |
|
51 } |
|
52 if resp.StatusCode != 200 { |
|
53 resp.Body.Close() |
|
54 return nil, errors.New(resp.Status) |
|
55 } |
|
56 return resp, nil |
|
57 } |
|
58 |
|
59 // readStream reads from the http.Response and sends events to the events channel |
|
60 // It stops when the connection is closed or when teh stopCh channel is closed. |
|
61 func (g *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, r *http.Response) { |
|
62 defer r.Body.Close() |
|
63 |
|
64 reader := bufio.NewReader(r.Body) |
|
65 |
|
66 var line, eventName string |
|
67 for { |
|
68 select { |
|
69 case <-stopCh: |
|
70 close(events) |
|
71 return |
|
72 default: |
|
73 } |
|
74 |
|
75 lineBytes, partial, err := reader.ReadLine() |
|
76 if err != nil { |
|
77 if err == io.EOF { |
|
78 e := fmt.Errorf("connection closed: %s", err.Error()) |
|
79 log.Printf("Stream Reader: %s", e.Error()) |
|
80 events <- StreamEvent{Event: "error", Error: e} |
|
81 close(events) |
|
82 return |
|
83 } |
|
84 e := fmt.Errorf("ReadLine read error: %s", err.Error()) |
|
85 log.Printf("Stream Reader: %s", e.Error()) |
|
86 events <- StreamEvent{Event: "error", Error: e} |
|
87 time.Sleep(10 * time.Second) |
|
88 } |
|
89 |
|
90 if partial { |
|
91 e := fmt.Errorf("received incomplete line; not supported yet") |
|
92 log.Printf("Stream Reader: %s", e.Error()) |
|
93 events <- StreamEvent{Event: "error", Error: e} |
|
94 time.Sleep(5 * time.Second) |
|
95 continue // Skip this |
|
96 } |
|
97 |
|
98 line = string(bytes.TrimSpace(lineBytes)) |
|
99 |
|
100 if line == "" { |
|
101 continue // Skip empty line |
|
102 } |
|
103 if strings.HasPrefix(line, ":") { |
|
104 continue // Skip comment |
|
105 } |
|
106 |
|
107 if strings.HasPrefix(line, "event: ") { |
|
108 eventName = line[7:] |
|
109 continue |
|
110 } |
|
111 |
|
112 if !strings.HasPrefix(line, "data: ") { |
|
113 // XXX Needs improvement |
|
114 e := fmt.Errorf("received unhandled event line '%s'", strings.Split(line, ":")[0]) |
|
115 log.Printf("Stream Reader: %s", e.Error()) |
|
116 events <- StreamEvent{Event: "error", Error: e} |
|
117 continue |
|
118 } |
|
119 |
|
120 // This is a data line |
|
121 data := []byte(line[6:]) |
|
122 |
|
123 var obj interface{} |
|
124 |
|
125 // Decode API object |
|
126 switch eventName { |
|
127 case "update": |
|
128 var s Status |
|
129 if err := json.Unmarshal(data, &s); err != nil { |
|
130 e := fmt.Errorf("could not unmarshal data: %s", err.Error()) |
|
131 log.Printf("Stream Reader: %s", e.Error()) |
|
132 events <- StreamEvent{Event: "error", Error: e} |
|
133 continue |
|
134 } |
|
135 obj = s |
|
136 case "notification": |
|
137 var notif Notification |
|
138 if err := json.Unmarshal(data, ¬if); err != nil { |
|
139 e := fmt.Errorf("could not unmarshal data: %s", err.Error()) |
|
140 log.Printf("Stream Reader: %s", e.Error()) |
|
141 events <- StreamEvent{Event: "error", Error: e} |
|
142 continue |
|
143 } |
|
144 obj = notif |
|
145 case "delete": |
|
146 var statusID int |
|
147 if err := json.Unmarshal(data, &statusID); err != nil { |
|
148 e := fmt.Errorf("could not unmarshal data: %s", err.Error()) |
|
149 log.Printf("Stream Reader: %s", e.Error()) |
|
150 events <- StreamEvent{Event: "error", Error: e} |
|
151 continue |
|
152 } |
|
153 obj = statusID |
|
154 case "": |
|
155 fallthrough |
|
156 default: |
|
157 e := fmt.Errorf("unhandled event '%s'", eventName) |
|
158 log.Printf("Stream Reader: %s", e.Error()) |
|
159 events <- StreamEvent{Event: "error", Error: e} |
|
160 continue |
|
161 } |
|
162 |
|
163 // Send event to the channel |
|
164 events <- StreamEvent{Event: eventName, Data: obj} |
|
165 } |
|
166 } |
|
167 |
|
168 // StreamListener listens to a stream from the Mastodon server |
|
169 // The stream 'name' can be "user", "public" or "hashtag". |
|
170 // For 'hashtag', the hashTag argument cannot be empty. |
|
171 // The events are sent to the events channel (the errors as well). |
|
172 // The streaming is terminated if the stop channel is closed. |
|
173 // Please note that this method launches a goroutine to listen to the events. |
|
174 // The 'events' channel is closed if the connection is closed by the server. |
|
175 func (g *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool) error { |
|
176 resp, err := g.openStream(name, hashTag) |
|
177 if err != nil { |
|
178 return err |
|
179 } |
|
180 go g.readStream(events, stopCh, resp) |
|
181 return nil |
|
182 } |