vendor/github.com/McKael/madon/v3/streams.go
changeset 265 05c40b36d3b2
parent 242 2a9ec03fe5a1
child 268 4dd196a4ee7c
equal deleted inserted replaced
264:8f478162d991 265:05c40b36d3b2
       
     1 /*
       
     2 Copyright 2017-2018 Mikael Berthe
       
     3 
       
     4 Licensed under the MIT license.  Please see the LICENSE file is this directory.
       
     5 */
       
     6 
       
     7 package madon
       
     8 
       
     9 import (
       
    10 	"encoding/json"
       
    11 	"net/url"
       
    12 	"strings"
       
    13 
       
    14 	"github.com/gorilla/websocket"
       
    15 	"github.com/pkg/errors"
       
    16 )
       
    17 
       
    18 // StreamEvent contains a single event from the streaming API
       
    19 type StreamEvent struct {
       
    20 	Event string      // Name of the event (error, update, notification or delete)
       
    21 	Data  interface{} // Status, Notification or status ID
       
    22 	Error error       // Error message from the StreamListener
       
    23 }
       
    24 
       
    25 // openStream opens a stream URL and returns an http.Response
       
    26 // Note that the caller should close the connection when it's done reading
       
    27 // the stream.
       
    28 // The stream name can be "user", "local", "public", "direct", "list" or
       
    29 // "hashtag".
       
    30 // When it is "hashtag", the param argument contains the hashtag.
       
    31 // When it is "list", the param argument contains the list ID.
       
    32 func (mc *Client) openStream(streamName, param string) (*websocket.Conn, error) {
       
    33 	var tag, list string
       
    34 
       
    35 	switch streamName {
       
    36 	case "user", "public", "public:local", "direct":
       
    37 	case "hashtag":
       
    38 		if param == "" {
       
    39 			return nil, ErrInvalidParameter
       
    40 		}
       
    41 		tag = param
       
    42 	case "list":
       
    43 		if param == "" {
       
    44 			return nil, ErrInvalidParameter
       
    45 		}
       
    46 		list = param
       
    47 	default:
       
    48 		return nil, ErrInvalidParameter
       
    49 	}
       
    50 
       
    51 	if !strings.HasPrefix(mc.APIBase, "http") {
       
    52 		return nil, errors.New("cannot create Websocket URL: unexpected API base URL")
       
    53 	}
       
    54 
       
    55 	// Build streaming websocket URL
       
    56 	u, err := url.Parse("ws" + mc.APIBase[4:] + "/v1/streaming/")
       
    57 	if err != nil {
       
    58 		return nil, errors.Wrap(err, "cannot create Websocket URL")
       
    59 	}
       
    60 
       
    61 	urlParams := url.Values{}
       
    62 	urlParams.Add("stream", streamName)
       
    63 	urlParams.Add("access_token", mc.UserToken.AccessToken)
       
    64 	if tag != "" {
       
    65 		urlParams.Add("tag", tag)
       
    66 	} else if list != "" {
       
    67 		urlParams.Add("list", list)
       
    68 	}
       
    69 	u.RawQuery = urlParams.Encode()
       
    70 
       
    71 	c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
       
    72 	return c, err
       
    73 }
       
    74 
       
    75 // readStream reads from the http.Response and sends events to the events channel
       
    76 // It stops when the connection is closed or when the stopCh channel is closed.
       
    77 // The foroutine will close the doneCh channel when it terminates.
       
    78 func (mc *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, doneCh chan bool, c *websocket.Conn) {
       
    79 	defer c.Close()
       
    80 	defer close(doneCh)
       
    81 
       
    82 	go func() {
       
    83 		select {
       
    84 		case <-stopCh:
       
    85 			// Close connection
       
    86 			c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
       
    87 		case <-doneCh:
       
    88 			// Leave
       
    89 		}
       
    90 	}()
       
    91 
       
    92 	for {
       
    93 		var msg struct {
       
    94 			Event   string
       
    95 			Payload interface{}
       
    96 		}
       
    97 
       
    98 		err := c.ReadJSON(&msg)
       
    99 		if err != nil {
       
   100 			if strings.Contains(err.Error(), "close 1000 (normal)") {
       
   101 				break // Connection properly closed
       
   102 			}
       
   103 			e := errors.Wrap(err, "read error")
       
   104 			events <- StreamEvent{Event: "error", Error: e}
       
   105 			break
       
   106 		}
       
   107 
       
   108 		var obj interface{}
       
   109 
       
   110 		// Decode API object
       
   111 		switch msg.Event {
       
   112 		case "update":
       
   113 			strPayload, ok := msg.Payload.(string)
       
   114 			if !ok {
       
   115 				e := errors.New("could not decode status: payload isn't a string")
       
   116 				events <- StreamEvent{Event: "error", Error: e}
       
   117 				continue
       
   118 			}
       
   119 			var s Status
       
   120 			if err := json.Unmarshal([]byte(strPayload), &s); err != nil {
       
   121 				e := errors.Wrap(err, "could not decode status")
       
   122 				events <- StreamEvent{Event: "error", Error: e}
       
   123 				continue
       
   124 			}
       
   125 			obj = s
       
   126 		case "notification":
       
   127 			strPayload, ok := msg.Payload.(string)
       
   128 			if !ok {
       
   129 				e := errors.New("could not decode notification: payload isn't a string")
       
   130 				events <- StreamEvent{Event: "error", Error: e}
       
   131 				continue
       
   132 			}
       
   133 			var notif Notification
       
   134 			if err := json.Unmarshal([]byte(strPayload), &notif); err != nil {
       
   135 				e := errors.Wrap(err, "could not decode notification")
       
   136 				events <- StreamEvent{Event: "error", Error: e}
       
   137 				continue
       
   138 			}
       
   139 			obj = notif
       
   140 		case "delete":
       
   141 			strPayload, ok := msg.Payload.(string)
       
   142 			if !ok {
       
   143 				e := errors.New("could not decode deletion: payload isn't a string")
       
   144 				events <- StreamEvent{Event: "error", Error: e}
       
   145 				continue
       
   146 			}
       
   147 			obj = strPayload // statusID
       
   148 		default:
       
   149 			e := errors.Errorf("unhandled event '%s'", msg.Event)
       
   150 			events <- StreamEvent{Event: "error", Error: e}
       
   151 			continue
       
   152 		}
       
   153 
       
   154 		// Send event to the channel
       
   155 		events <- StreamEvent{Event: msg.Event, Data: obj}
       
   156 	}
       
   157 }
       
   158 
       
   159 // StreamListener listens to a stream from the Mastodon server
       
   160 // The stream 'name' can be "user", "local", "public" or "hashtag".
       
   161 // For 'hashtag', the hashTag argument cannot be empty.
       
   162 // The events are sent to the events channel (the errors as well).
       
   163 // The streaming is terminated if the 'stopCh' channel is closed.
       
   164 // The 'doneCh' channel is closed if the connection is closed by the server.
       
   165 // Please note that this method launches a goroutine to listen to the events.
       
   166 func (mc *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool, doneCh chan bool) error {
       
   167 	if mc == nil {
       
   168 		return ErrUninitializedClient
       
   169 	}
       
   170 
       
   171 	conn, err := mc.openStream(name, hashTag)
       
   172 	if err != nil {
       
   173 		return err
       
   174 	}
       
   175 	go mc.readStream(events, stopCh, doneCh, conn)
       
   176 	return nil
       
   177 }