--- a/cmd/stream.go Sat May 06 19:38:01 2017 +0200
+++ b/cmd/stream.go Sat May 06 19:44:17 2017 +0200
@@ -8,6 +8,7 @@
import (
"io"
"os"
+ "strings"
"github.com/pkg/errors"
"github.com/spf13/cobra"
@@ -21,6 +22,9 @@
}
*/
+// Maximum number of websockets (1 hashtag <=> 1 ws)
+const maximumHashtagStreamWS = 4
+
// streamCmd represents the stream command
var streamCmd = &cobra.Command{
Use: "stream [user|local|public|:HASHTAG]",
@@ -34,7 +38,12 @@
madonctl stream local # Local timeline stream
madonctl stream public # Public timeline stream
madonctl stream :mastodon # Hashtag
- madonctl stream #madonctl`,
+ madonctl stream #madonctl
+
+Several (up to 4) hashtags can be given.
+Note: madonctl will use 1 websocket per hashtag stream.
+ madonctl stream #madonctl,#mastodon,#golang
+ madonctl stream :madonctl,mastodon,api`,
RunE: streamRunE,
ValidArgs: []string{"user", "public"},
ArgAliases: []string{"home"},
@@ -49,6 +58,7 @@
func streamRunE(cmd *cobra.Command, args []string) error {
streamName := "user"
tag := ""
+ var hashTagList []string
if len(args) > 0 {
if len(args) != 1 {
@@ -70,6 +80,18 @@
if len(tag) == 0 {
return errors.New("empty hashtag")
}
+ hashTagList = strings.Split(tag, ",")
+ for i, h := range hashTagList {
+ if h[0] == ':' || h[0] == '#' {
+ hashTagList[i] = h[1:]
+ }
+ if h == "" {
+ return errors.New("empty hashtag")
+ }
+ }
+ if len(hashTagList) > maximumHashtagStreamWS {
+ return errors.Errorf("too many hashtags, maximum is %d", maximumHashtagStreamWS)
+ }
}
}
@@ -80,9 +102,44 @@
evChan := make(chan madon.StreamEvent, 10)
stop := make(chan bool)
done := make(chan bool)
+ var err error
- // StreamListener(name string, hashTag string, events chan<- madon.StreamEvent, stopCh <-chan bool, doneCh chan<- bool) error
- err := gClient.StreamListener(streamName, tag, evChan, stop, done)
+ if streamName != "hashtag" || len(hashTagList) <= 1 { // Usual case: Only 1 stream
+ err = gClient.StreamListener(streamName, tag, evChan, stop, done)
+ } else { // Several streams
+ n := len(hashTagList)
+ tagEvCh := make([]chan madon.StreamEvent, n)
+ tagDoneCh := make([]chan bool, n)
+ for i, t := range hashTagList {
+ if verbose {
+ errPrint("Launching listener for tag '%s'", t)
+ }
+ tagEvCh[i] = make(chan madon.StreamEvent)
+ e := gClient.StreamListener(streamName, t, tagEvCh[i], stop, tagDoneCh[i])
+ if e != nil {
+ if i > 0 { // Close previous connections
+ close(stop)
+ }
+ err = e
+ break
+ }
+ // Forward events to main ev channel
+ go func(i int) {
+ for {
+ select {
+ case _, ok := <-tagDoneCh[i]:
+ if !ok { // end of streaming for this tag
+ done <- true
+ return
+ }
+ case ev := <-tagEvCh[i]:
+ evChan <- ev
+ }
+ }
+ }(i)
+ }
+ }
+
if err != nil {
errPrint("Error: %s", err.Error())
os.Exit(1)
@@ -101,7 +158,6 @@
select {
case _, ok := <-done:
if !ok { // done is closed, end of streaming
- done = nil
break LISTEN
}
case ev := <-evChan:
@@ -132,6 +188,7 @@
}
}
}
+ close(stop)
close(evChan)
return nil
}