diff -r 8f478162d991 -r 05c40b36d3b2 vendor/github.com/McKael/madon/v2/streams.go --- a/vendor/github.com/McKael/madon/v2/streams.go Thu Sep 22 16:37:07 2022 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,177 +0,0 @@ -/* -Copyright 2017-2018 Mikael Berthe - -Licensed under the MIT license. Please see the LICENSE file is this directory. -*/ - -package madon - -import ( - "encoding/json" - "net/url" - "strings" - - "github.com/gorilla/websocket" - "github.com/pkg/errors" -) - -// StreamEvent contains a single event from the streaming API -type StreamEvent struct { - Event string // Name of the event (error, update, notification or delete) - Data interface{} // Status, Notification or status ID - Error error // Error message from the StreamListener -} - -// openStream opens a stream URL and returns an http.Response -// Note that the caller should close the connection when it's done reading -// the stream. -// The stream name can be "user", "local", "public", "direct", "list" or -// "hashtag". -// When it is "hashtag", the param argument contains the hashtag. -// When it is "list", the param argument contains the list ID. -func (mc *Client) openStream(streamName, param string) (*websocket.Conn, error) { - var tag, list string - - switch streamName { - case "user", "public", "public:local", "direct": - case "hashtag": - if param == "" { - return nil, ErrInvalidParameter - } - tag = param - case "list": - if param == "" { - return nil, ErrInvalidParameter - } - list = param - default: - return nil, ErrInvalidParameter - } - - if !strings.HasPrefix(mc.APIBase, "http") { - return nil, errors.New("cannot create Websocket URL: unexpected API base URL") - } - - // Build streaming websocket URL - u, err := url.Parse("ws" + mc.APIBase[4:] + "/v1/streaming/") - if err != nil { - return nil, errors.Wrap(err, "cannot create Websocket URL") - } - - urlParams := url.Values{} - urlParams.Add("stream", streamName) - urlParams.Add("access_token", mc.UserToken.AccessToken) - if tag != "" { - urlParams.Add("tag", tag) - } else if list != "" { - urlParams.Add("list", list) - } - u.RawQuery = urlParams.Encode() - - c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) - return c, err -} - -// readStream reads from the http.Response and sends events to the events channel -// It stops when the connection is closed or when the stopCh channel is closed. -// The foroutine will close the doneCh channel when it terminates. -func (mc *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, doneCh chan bool, c *websocket.Conn) { - defer c.Close() - defer close(doneCh) - - go func() { - select { - case <-stopCh: - // Close connection - c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) - case <-doneCh: - // Leave - } - }() - - for { - var msg struct { - Event string - Payload interface{} - } - - err := c.ReadJSON(&msg) - if err != nil { - if strings.Contains(err.Error(), "close 1000 (normal)") { - break // Connection properly closed - } - e := errors.Wrap(err, "read error") - events <- StreamEvent{Event: "error", Error: e} - break - } - - var obj interface{} - - // Decode API object - switch msg.Event { - case "update": - strPayload, ok := msg.Payload.(string) - if !ok { - e := errors.New("could not decode status: payload isn't a string") - events <- StreamEvent{Event: "error", Error: e} - continue - } - var s Status - if err := json.Unmarshal([]byte(strPayload), &s); err != nil { - e := errors.Wrap(err, "could not decode status") - events <- StreamEvent{Event: "error", Error: e} - continue - } - obj = s - case "notification": - strPayload, ok := msg.Payload.(string) - if !ok { - e := errors.New("could not decode notification: payload isn't a string") - events <- StreamEvent{Event: "error", Error: e} - continue - } - var notif Notification - if err := json.Unmarshal([]byte(strPayload), ¬if); err != nil { - e := errors.Wrap(err, "could not decode notification") - events <- StreamEvent{Event: "error", Error: e} - continue - } - obj = notif - case "delete": - strPayload, ok := msg.Payload.(string) - if !ok { - e := errors.New("could not decode deletion: payload isn't a string") - events <- StreamEvent{Event: "error", Error: e} - continue - } - obj = strPayload // statusID - default: - e := errors.Errorf("unhandled event '%s'", msg.Event) - events <- StreamEvent{Event: "error", Error: e} - continue - } - - // Send event to the channel - events <- StreamEvent{Event: msg.Event, Data: obj} - } -} - -// StreamListener listens to a stream from the Mastodon server -// The stream 'name' can be "user", "local", "public" or "hashtag". -// For 'hashtag', the hashTag argument cannot be empty. -// The events are sent to the events channel (the errors as well). -// The streaming is terminated if the 'stopCh' channel is closed. -// The 'doneCh' channel is closed if the connection is closed by the server. -// Please note that this method launches a goroutine to listen to the events. -func (mc *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool, doneCh chan bool) error { - if mc == nil { - return ErrUninitializedClient - } - - conn, err := mc.openStream(name, hashTag) - if err != nil { - return err - } - go mc.readStream(events, stopCh, doneCh, conn) - return nil -}