cmd/stream.go
author Mikael Berthe <mikael@lilotux.net>
Thu, 22 Sep 2022 16:32:45 +0200
changeset 261 270cc4dda0c5
parent 239 605a00e9d1ab
child 265 05c40b36d3b2
permissions -rw-r--r--
Change version to 2.4.0-dev

// Copyright © 2017-2018 Mikael Berthe <mikael@lilotux.net>
//
// Licensed under the MIT license.
// Please see the LICENSE file is this directory.

package cmd

import (
	"io"
	"os"
	"strings"

	"github.com/pkg/errors"
	"github.com/spf13/cobra"

	"github.com/McKael/madon/v2"
)

var streamOpts struct {
	command           string
	notificationsOnly bool
	notificationTypes string
}

// Maximum number of websockets (1 hashtag <=> 1 ws)
const maximumHashtagStreamWS = 4

// streamCmd represents the stream command
var streamCmd = &cobra.Command{
	Use:   "stream [user|local|public|direct|!LIST|:HASHTAG]",
	Short: "Listen to an event stream",
	Long: `Listen to an event stream

The stream command stays connected to the server and listen to a stream of
events (user, local or federated).
A list-based stream can be displayed by prefixing the list ID with a '!'.
It can also get a hashtag-based stream if the keyword is prefixed with
':' or '#'.`,
	Example: `  madonctl stream           # User timeline stream
  madonctl stream local     # Local timeline stream
  madonctl stream public    # Public timeline stream
  madonctl stream direct    # Direct messages stream
  madonctl stream '!42'     # List (ID 42)
  madonctl stream :mastodon # Hashtag
  madonctl stream #madonctl
  madonctl stream --notifications-only
  madonctl stream --notifications-only --notification-types mentions,follows

Several (up to 4) hashtags can be given.
Note: madonctl will use 1 websocket per hashtag stream.
  madonctl stream #madonctl,#mastodon,#golang
  madonctl stream :madonctl,mastodon,api
`,
	RunE:       streamRunE,
	ValidArgs:  []string{"user", "public", "direct"},
	ArgAliases: []string{"home"},
}

func init() {
	RootCmd.AddCommand(streamCmd)

	streamCmd.Flags().StringVar(&streamOpts.command, "command", "", "Execute external command")
	streamCmd.Flags().BoolVar(&streamOpts.notificationsOnly, "notifications-only", false, "Display only notifications (user stream)")
	streamCmd.Flags().StringVar(&streamOpts.notificationTypes, "notification-types", "", "Filter notifications (mentions, favourites, reblogs, follows)")
}

func streamRunE(cmd *cobra.Command, args []string) error {
	streamName := "user"
	var param string
	var hashTagList []string

	if len(args) > 0 {
		if len(args) != 1 {
			return errors.New("too many parameters")
		}
		arg := args[0]
		switch arg {
		case "", "user":
		case "public":
		case "direct":
			streamName = arg
		case "local":
			streamName = "public:local"
		default:
			if arg[0] == '!' {
				// List-based stream
				streamName = "list"
				param = arg[1:]
				if len(param) == 0 {
					return errors.New("empty list ID")
				}
				break
			}
			if arg[0] != ':' && arg[0] != '#' {
				return errors.New("invalid argument")
			}
			streamName = "hashtag"
			param = arg[1:]
			if len(param) == 0 {
				return errors.New("empty hashtag")
			}
			hashTagList = strings.Split(param, ",")
			for i, h := range hashTagList {
				if h[0] == ':' || h[0] == '#' {
					hashTagList[i] = h[1:]
				}
				if h == "" {
					return errors.New("empty hashtag")
				}
			}
			if len(hashTagList) > maximumHashtagStreamWS {
				return errors.Errorf("too many hashtags, maximum is %d", maximumHashtagStreamWS)
			}
		}
	}

	if err := madonInit(true); err != nil {
		return err
	}

	var filterMap *map[string]bool
	if streamOpts.notificationTypes != "" {
		var err error
		filterMap, err = buildFilterMap(streamOpts.notificationTypes)
		if err != nil {
			return err
		}
	}

	evChan := make(chan madon.StreamEvent, 10)
	stop := make(chan bool)
	done := make(chan bool)
	var err error

	if streamName != "hashtag" || len(hashTagList) <= 1 { // Usual case: Only 1 stream
		err = gClient.StreamListener(streamName, param, evChan, stop, done)
	} else { // Several streams
		n := len(hashTagList)
		tagEvCh := make([]chan madon.StreamEvent, n)
		tagDoneCh := make([]chan bool, n)
		for i, t := range hashTagList {
			if verbose {
				errPrint("Launching listener for tag '%s'", t)
			}
			tagEvCh[i] = make(chan madon.StreamEvent)
			tagDoneCh[i] = make(chan bool)
			e := gClient.StreamListener(streamName, t, tagEvCh[i], stop, tagDoneCh[i])
			if e != nil {
				if i > 0 { // Close previous connections
					close(stop)
				}
				err = e
				break
			}
			// Forward events to main ev channel
			go func(i int) {
				for {
					select {
					case _, ok := <-tagDoneCh[i]:
						if !ok { // end of streaming for this tag
							done <- true
							return
						}
					case ev := <-tagEvCh[i]:
						evChan <- ev
					}
				}
			}(i)
		}
	}

	if err != nil {
		errPrint("Error: %s", err.Error())
		os.Exit(1)
	}

	p, err := getPrinter()
	if err != nil {
		close(stop)
		<-done
		close(evChan)
		errPrint("Error: %s", err.Error())
		os.Exit(1)
	}

	// Set up external command
	p.setCommand(streamOpts.command)

LISTEN:
	for {
		select {
		case v, ok := <-done:
			if !ok || v == true { // done is closed, end of streaming
				break LISTEN
			}
		case ev := <-evChan:
			switch ev.Event {
			case "error":
				if ev.Error != nil {
					if ev.Error == io.ErrUnexpectedEOF {
						errPrint("The stream connection was unexpectedly closed")
						continue
					}
					errPrint("Error event: [%s] %s", ev.Event, ev.Error)
					continue
				}
				errPrint("Event: [%s]", ev.Event)
			case "update":
				if streamOpts.notificationsOnly {
					continue
				}
				s := ev.Data.(madon.Status)
				if err = p.printObj(&s); err != nil {
					break LISTEN
				}
				continue
			case "notification":
				n := ev.Data.(madon.Notification)
				if filterMap != nil && !(*filterMap)[n.Type] {
					continue
				}
				if p.printObj(&n); err != nil {
					break LISTEN
				}
				continue
			case "delete":
				if streamOpts.notificationsOnly {
					continue
				}
				// TODO PrintObj ?
				errPrint("Event: [%s] Status %s was deleted", ev.Event, ev.Data.(string))
			default:
				errPrint("Unhandled event: [%s] %T", ev.Event, ev.Data)
			}
		}
	}
	close(stop)
	close(evChan)
	if err != nil {
		errPrint("Error: %s", err.Error())
		os.Exit(1)
	}
	return nil
}