|
1 /* |
|
2 Copyright 2017-2018 Mikael Berthe |
|
3 |
|
4 Licensed under the MIT license. Please see the LICENSE file is this directory. |
|
5 */ |
|
6 |
|
7 package madon |
|
8 |
|
9 import ( |
|
10 "encoding/json" |
|
11 "net/url" |
|
12 "strings" |
|
13 |
|
14 "github.com/gorilla/websocket" |
|
15 "github.com/pkg/errors" |
|
16 ) |
|
17 |
|
18 // StreamEvent contains a single event from the streaming API |
|
19 type StreamEvent struct { |
|
20 Event string // Name of the event (error, update, notification or delete) |
|
21 Data interface{} // Status, Notification or status ID |
|
22 Error error // Error message from the StreamListener |
|
23 } |
|
24 |
|
25 // openStream opens a stream URL and returns an http.Response |
|
26 // Note that the caller should close the connection when it's done reading |
|
27 // the stream. |
|
28 // The stream name can be "user", "local", "public", "direct", "list" or |
|
29 // "hashtag". |
|
30 // When it is "hashtag", the param argument contains the hashtag. |
|
31 // When it is "list", the param argument contains the list ID. |
|
32 func (mc *Client) openStream(streamName, param string) (*websocket.Conn, error) { |
|
33 var tag, list string |
|
34 |
|
35 switch streamName { |
|
36 case "user", "public", "public:local", "direct": |
|
37 case "hashtag": |
|
38 if param == "" { |
|
39 return nil, ErrInvalidParameter |
|
40 } |
|
41 tag = param |
|
42 case "list": |
|
43 if param == "" { |
|
44 return nil, ErrInvalidParameter |
|
45 } |
|
46 list = param |
|
47 default: |
|
48 return nil, ErrInvalidParameter |
|
49 } |
|
50 |
|
51 if !strings.HasPrefix(mc.APIBase, "http") { |
|
52 return nil, errors.New("cannot create Websocket URL: unexpected API base URL") |
|
53 } |
|
54 |
|
55 // Build streaming websocket URL |
|
56 u, err := url.Parse("ws" + mc.APIBase[4:] + "/v1/streaming/") |
|
57 if err != nil { |
|
58 return nil, errors.Wrap(err, "cannot create Websocket URL") |
|
59 } |
|
60 |
|
61 urlParams := url.Values{} |
|
62 urlParams.Add("stream", streamName) |
|
63 urlParams.Add("access_token", mc.UserToken.AccessToken) |
|
64 if tag != "" { |
|
65 urlParams.Add("tag", tag) |
|
66 } else if list != "" { |
|
67 urlParams.Add("list", list) |
|
68 } |
|
69 u.RawQuery = urlParams.Encode() |
|
70 |
|
71 c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) |
|
72 return c, err |
|
73 } |
|
74 |
|
75 // readStream reads from the http.Response and sends events to the events channel |
|
76 // It stops when the connection is closed or when the stopCh channel is closed. |
|
77 // The foroutine will close the doneCh channel when it terminates. |
|
78 func (mc *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, doneCh chan bool, c *websocket.Conn) { |
|
79 defer c.Close() |
|
80 defer close(doneCh) |
|
81 |
|
82 go func() { |
|
83 select { |
|
84 case <-stopCh: |
|
85 // Close connection |
|
86 c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) |
|
87 case <-doneCh: |
|
88 // Leave |
|
89 } |
|
90 }() |
|
91 |
|
92 for { |
|
93 var msg struct { |
|
94 Event string |
|
95 Payload interface{} |
|
96 } |
|
97 |
|
98 err := c.ReadJSON(&msg) |
|
99 if err != nil { |
|
100 if strings.Contains(err.Error(), "close 1000 (normal)") { |
|
101 break // Connection properly closed |
|
102 } |
|
103 e := errors.Wrap(err, "read error") |
|
104 events <- StreamEvent{Event: "error", Error: e} |
|
105 break |
|
106 } |
|
107 |
|
108 var obj interface{} |
|
109 |
|
110 // Decode API object |
|
111 switch msg.Event { |
|
112 case "update": |
|
113 strPayload, ok := msg.Payload.(string) |
|
114 if !ok { |
|
115 e := errors.New("could not decode status: payload isn't a string") |
|
116 events <- StreamEvent{Event: "error", Error: e} |
|
117 continue |
|
118 } |
|
119 var s Status |
|
120 if err := json.Unmarshal([]byte(strPayload), &s); err != nil { |
|
121 e := errors.Wrap(err, "could not decode status") |
|
122 events <- StreamEvent{Event: "error", Error: e} |
|
123 continue |
|
124 } |
|
125 obj = s |
|
126 case "notification": |
|
127 strPayload, ok := msg.Payload.(string) |
|
128 if !ok { |
|
129 e := errors.New("could not decode notification: payload isn't a string") |
|
130 events <- StreamEvent{Event: "error", Error: e} |
|
131 continue |
|
132 } |
|
133 var notif Notification |
|
134 if err := json.Unmarshal([]byte(strPayload), ¬if); err != nil { |
|
135 e := errors.Wrap(err, "could not decode notification") |
|
136 events <- StreamEvent{Event: "error", Error: e} |
|
137 continue |
|
138 } |
|
139 obj = notif |
|
140 case "delete": |
|
141 strPayload, ok := msg.Payload.(string) |
|
142 if !ok { |
|
143 e := errors.New("could not decode deletion: payload isn't a string") |
|
144 events <- StreamEvent{Event: "error", Error: e} |
|
145 continue |
|
146 } |
|
147 obj = strPayload // statusID |
|
148 default: |
|
149 e := errors.Errorf("unhandled event '%s'", msg.Event) |
|
150 events <- StreamEvent{Event: "error", Error: e} |
|
151 continue |
|
152 } |
|
153 |
|
154 // Send event to the channel |
|
155 events <- StreamEvent{Event: msg.Event, Data: obj} |
|
156 } |
|
157 } |
|
158 |
|
159 // StreamListener listens to a stream from the Mastodon server |
|
160 // The stream 'name' can be "user", "local", "public" or "hashtag". |
|
161 // For 'hashtag', the hashTag argument cannot be empty. |
|
162 // The events are sent to the events channel (the errors as well). |
|
163 // The streaming is terminated if the 'stopCh' channel is closed. |
|
164 // The 'doneCh' channel is closed if the connection is closed by the server. |
|
165 // Please note that this method launches a goroutine to listen to the events. |
|
166 func (mc *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool, doneCh chan bool) error { |
|
167 if mc == nil { |
|
168 return ErrUninitializedClient |
|
169 } |
|
170 |
|
171 conn, err := mc.openStream(name, hashTag) |
|
172 if err != nil { |
|
173 return err |
|
174 } |
|
175 go mc.readStream(events, stopCh, doneCh, conn) |
|
176 return nil |
|
177 } |