Add support for several hashtag streams
authorMikael Berthe <mikael@lilotux.net>
Sat, 06 May 2017 19:44:17 +0200
changeset 74 78e1c63e4bbe
parent 73 82b0189f3a65
child 75 447ac88b67c6
Add support for several hashtag streams
cmd/stream.go
--- a/cmd/stream.go	Sat May 06 19:38:01 2017 +0200
+++ b/cmd/stream.go	Sat May 06 19:44:17 2017 +0200
@@ -8,6 +8,7 @@
 import (
 	"io"
 	"os"
+	"strings"
 
 	"github.com/pkg/errors"
 	"github.com/spf13/cobra"
@@ -21,6 +22,9 @@
 }
 */
 
+// Maximum number of websockets (1 hashtag <=> 1 ws)
+const maximumHashtagStreamWS = 4
+
 // streamCmd represents the stream command
 var streamCmd = &cobra.Command{
 	Use:   "stream [user|local|public|:HASHTAG]",
@@ -34,7 +38,12 @@
   madonctl stream local     # Local timeline stream
   madonctl stream public    # Public timeline stream
   madonctl stream :mastodon # Hashtag
-  madonctl stream #madonctl`,
+  madonctl stream #madonctl
+
+Several (up to 4) hashtags can be given.
+Note: madonctl will use 1 websocket per hashtag stream.
+  madonctl stream #madonctl,#mastodon,#golang
+  madonctl stream :madonctl,mastodon,api`,
 	RunE:       streamRunE,
 	ValidArgs:  []string{"user", "public"},
 	ArgAliases: []string{"home"},
@@ -49,6 +58,7 @@
 func streamRunE(cmd *cobra.Command, args []string) error {
 	streamName := "user"
 	tag := ""
+	var hashTagList []string
 
 	if len(args) > 0 {
 		if len(args) != 1 {
@@ -70,6 +80,18 @@
 			if len(tag) == 0 {
 				return errors.New("empty hashtag")
 			}
+			hashTagList = strings.Split(tag, ",")
+			for i, h := range hashTagList {
+				if h[0] == ':' || h[0] == '#' {
+					hashTagList[i] = h[1:]
+				}
+				if h == "" {
+					return errors.New("empty hashtag")
+				}
+			}
+			if len(hashTagList) > maximumHashtagStreamWS {
+				return errors.Errorf("too many hashtags, maximum is %d", maximumHashtagStreamWS)
+			}
 		}
 	}
 
@@ -80,9 +102,44 @@
 	evChan := make(chan madon.StreamEvent, 10)
 	stop := make(chan bool)
 	done := make(chan bool)
+	var err error
 
-	// StreamListener(name string, hashTag string, events chan<- madon.StreamEvent, stopCh <-chan bool, doneCh chan<- bool) error
-	err := gClient.StreamListener(streamName, tag, evChan, stop, done)
+	if streamName != "hashtag" || len(hashTagList) <= 1 { // Usual case: Only 1 stream
+		err = gClient.StreamListener(streamName, tag, evChan, stop, done)
+	} else { // Several streams
+		n := len(hashTagList)
+		tagEvCh := make([]chan madon.StreamEvent, n)
+		tagDoneCh := make([]chan bool, n)
+		for i, t := range hashTagList {
+			if verbose {
+				errPrint("Launching listener for tag '%s'", t)
+			}
+			tagEvCh[i] = make(chan madon.StreamEvent)
+			e := gClient.StreamListener(streamName, t, tagEvCh[i], stop, tagDoneCh[i])
+			if e != nil {
+				if i > 0 { // Close previous connections
+					close(stop)
+				}
+				err = e
+				break
+			}
+			// Forward events to main ev channel
+			go func(i int) {
+				for {
+					select {
+					case _, ok := <-tagDoneCh[i]:
+						if !ok { // end of streaming for this tag
+							done <- true
+							return
+						}
+					case ev := <-tagEvCh[i]:
+						evChan <- ev
+					}
+				}
+			}(i)
+		}
+	}
+
 	if err != nil {
 		errPrint("Error: %s", err.Error())
 		os.Exit(1)
@@ -101,7 +158,6 @@
 		select {
 		case _, ok := <-done:
 			if !ok { // done is closed, end of streaming
-				done = nil
 				break LISTEN
 			}
 		case ev := <-evChan:
@@ -132,6 +188,7 @@
 			}
 		}
 	}
+	close(stop)
 	close(evChan)
 	return nil
 }