streams.go
changeset 122 50c7733ee886
parent 120 579912e9d0ef
child 128 a5a00fad7a32
equal deleted inserted replaced
121:d192d9d0adfd 122:50c7733ee886
    57 	}
    57 	}
    58 	return resp, nil
    58 	return resp, nil
    59 }
    59 }
    60 
    60 
    61 // readStream reads from the http.Response and sends events to the events channel
    61 // readStream reads from the http.Response and sends events to the events channel
    62 // It stops when the connection is closed or when teh stopCh channel is closed.
    62 // It stops when the connection is closed or when the stopCh channel is closed.
    63 func (g *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, r *http.Response) {
    63 // The goroutine will close the doneCh channel when it terminates.
       
    64 func (g *Client) readStream(events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool, r *http.Response) {
    64 	defer r.Body.Close()
    65 	defer r.Body.Close()
    65 
    66 
    66 	reader := bufio.NewReader(r.Body)
    67 	reader := bufio.NewReader(r.Body)
    67 
    68 
    68 	var line, eventName string
    69 	var line, eventName string
    69 	for {
    70 	for {
    70 		select {
    71 		select {
    71 		case <-stopCh:
    72 		case <-stopCh:
    72 			close(events)
    73 			close(doneCh)
    73 			return
    74 			return
    74 		default:
    75 		default:
    75 		}
    76 		}
    76 
    77 
    77 		lineBytes, partial, err := reader.ReadLine()
    78 		lineBytes, partial, err := reader.ReadLine()
    78 		if err != nil {
    79 		if err != nil {
       
    80 			var e error
    79 			if err == io.EOF {
    81 			if err == io.EOF {
    80 				e := fmt.Errorf("connection closed: %s", err.Error())
    82 				e = fmt.Errorf("connection closed: %s", err.Error())
    81 				log.Printf("Stream Reader: %s", e.Error())
    83 			} else {
    82 				events <- StreamEvent{Event: "error", Error: e}
    84 				e = fmt.Errorf("read error: %s", err.Error())
    83 				close(events)
       
    84 				return
       
    85 			}
    85 			}
    86 			e := fmt.Errorf("ReadLine read error: %s", err.Error())
       
    87 			log.Printf("Stream Reader: %s", e.Error())
    86 			log.Printf("Stream Reader: %s", e.Error())
    88 			events <- StreamEvent{Event: "error", Error: e}
    87 			events <- StreamEvent{Event: "error", Error: e}
    89 			time.Sleep(10 * time.Second)
    88 			close(doneCh)
       
    89 			return
    90 		}
    90 		}
    91 
    91 
    92 		if partial {
    92 		if partial {
    93 			e := fmt.Errorf("received incomplete line; not supported yet")
    93 			e := fmt.Errorf("received incomplete line; not supported yet")
    94 			log.Printf("Stream Reader: %s", e.Error())
    94 			log.Printf("Stream Reader: %s", e.Error())
   169 
   169 
   170 // StreamListener listens to a stream from the Mastodon server
   170 // StreamListener listens to a stream from the Mastodon server
   171 // The stream 'name' can be "user", "public" or "hashtag".
   171 // The stream 'name' can be "user", "public" or "hashtag".
   172 // For 'hashtag', the hashTag argument cannot be empty.
   172 // For 'hashtag', the hashTag argument cannot be empty.
   173 // The events are sent to the events channel (the errors as well).
   173 // The events are sent to the events channel (the errors as well).
   174 // The streaming is terminated if the stop channel is closed.
   174 // The streaming is terminated if the 'stopCh' channel is closed.
       
   175 // The 'doneCh' channel is closed when the listener goroutine stops: on a read error, when the connection is closed by the server, or when 'stopCh' is closed.
   175 // Please note that this method launches a goroutine to listen to the events.
   176 // Please note that this method launches a goroutine to listen to the events.
   176 // The 'events' channel is closed if the connection is closed by the server.
   177 func (g *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool, doneCh chan<- bool) error {
   177 func (g *Client) StreamListener(name, hashTag string, events chan<- StreamEvent, stopCh <-chan bool) error {
       
   178 	resp, err := g.openStream(name, hashTag)
   178 	resp, err := g.openStream(name, hashTag)
   179 	if err != nil {
   179 	if err != nil {
   180 		return err
   180 		return err
   181 	}
   181 	}
   182 	go g.readStream(events, stopCh, resp)
   182 	go g.readStream(events, stopCh, doneCh, resp)
   183 	return nil
   183 	return nil
   184 }
   184 }