cmd/stream.go
author rjp <zimpenfish@gmail.com>
Mon, 23 Jan 2023 16:39:02 +0000
changeset 267 5b91a65ba95a
parent 239 605a00e9d1ab
child 265 05c40b36d3b2
permissions -rw-r--r--
Update to handle non-int64 IDs. Pleroma/Akkoma and GotoSocial use opaque IDs rather than `int64`s like Mastodon which means that `madon` can't talk to either of those. This commit updates everything that can be an ID to `madon.ActivityID` which is an alias for `string` - can't create a specific type for it since there's more than a few places where they're concatenated directly to strings for URLs, etc. Which means it could just as easily be a direct `string` type itself but I find that having distinct types can often make the code more readable and understandable. One extra bit is that `statusOpts` has grown a `_hasReplyTo` boolean to indicate whether the `--in-reply-to` flag was given or not because we can't distinguish between "empty because default" or "empty because given and empty". Another way around this would be to set the default to some theoretically impossible or unlikely string but you never know when someone might spin up an instance where, e.g., admin posts have negative integer IDs.

// Copyright © 2017-2018 Mikael Berthe <mikael@lilotux.net>
//
// Licensed under the MIT license.
// Please see the LICENSE file in this directory.

package cmd

import (
	"io"
	"os"
	"strings"

	"github.com/pkg/errors"
	"github.com/spf13/cobra"

	"github.com/McKael/madon/v2"
)

// streamOpts holds the values of the stream command's flags; each field is
// bound to its flag in init.
var streamOpts struct {
	command           string // --command: external command handed to the printer
	notificationsOnly bool   // --notifications-only: skip "update" and "delete" events
	notificationTypes string // --notification-types: list used to build the notification filter
}

// maximumHashtagStreamWS is the maximum number of hashtags accepted by a
// single stream invocation.  Each hashtag is streamed over its own websocket
// (1 hashtag <=> 1 ws), so this also bounds the number of websockets.
const maximumHashtagStreamWS = 4

// streamCmd represents the stream command.
//
// The single optional argument selects the stream to listen to; it is parsed
// by streamRunE.  Hashtag examples are quoted in the help text because an
// unquoted '#' starts a comment in most shells (and '!' may trigger history
// expansion), which would silently drop the argument.
var streamCmd = &cobra.Command{
	Use:   "stream [user|local|public|direct|!LIST|:HASHTAG]",
	Short: "Listen to an event stream",
	Long: `Listen to an event stream

The stream command stays connected to the server and listens to a stream of
events (user, local or federated).
A list-based stream can be displayed by prefixing the list ID with a '!'.
It can also get a hashtag-based stream if the keyword is prefixed with
':' or '#'.`,
	Example: `  madonctl stream           # User timeline stream
  madonctl stream local     # Local timeline stream
  madonctl stream public    # Public timeline stream
  madonctl stream direct    # Direct messages stream
  madonctl stream '!42'     # List (ID 42)
  madonctl stream :mastodon # Hashtag
  madonctl stream '#madonctl'
  madonctl stream --notifications-only
  madonctl stream --notifications-only --notification-types mentions,follows

Several (up to 4) hashtags can be given.
Note: madonctl will use 1 websocket per hashtag stream.
  madonctl stream '#madonctl,#mastodon,#golang'
  madonctl stream :madonctl,mastodon,api
`,
	RunE: streamRunE,
	// "local" is listed alongside the other keywords accepted by streamRunE.
	ValidArgs:  []string{"user", "local", "public", "direct"},
	ArgAliases: []string{"home"},
}

// init attaches the stream command to the root command and declares its
// flags, binding each of them to the matching streamOpts field.
func init() {
	RootCmd.AddCommand(streamCmd)

	flagSet := streamCmd.Flags()
	flagSet.StringVar(
		&streamOpts.command,
		"command", "",
		"Execute external command",
	)
	flagSet.BoolVar(
		&streamOpts.notificationsOnly,
		"notifications-only", false,
		"Display only notifications (user stream)",
	)
	flagSet.StringVar(
		&streamOpts.notificationTypes,
		"notification-types", "",
		"Filter notifications (mentions, favourites, reblogs, follows)",
	)
}

// streamRunE implements the stream command.
//
// It accepts at most one positional argument selecting the stream:
//   - nothing, "user" or "home": the user stream (default);
//   - "public" or "direct": the stream of the same name;
//   - "local": sent to the server as "public:local";
//   - "!ID": the list stream for list ID;
//   - ":tag" or "#tag": a hashtag stream; several comma-separated hashtags
//     (at most maximumHashtagStreamWS) may be given, each optionally carrying
//     its own ':' or '#' prefix.
//
// A single stream uses one listener.  With several hashtags, one listener is
// started per hashtag and its events are forwarded to a common channel.
// Events are then printed until a listener signals the end of streaming or
// printing fails.
//
// Argument, initialisation and filter errors are returned to the caller.
// Listener, printer and printing errors are reported with errPrint and
// terminate the process with exit status 1.
func streamRunE(cmd *cobra.Command, args []string) error {
	streamName := "user"
	var param string
	var hashTagList []string

	if len(args) > 0 {
		if len(args) != 1 {
			return errors.New("too many parameters")
		}
		arg := args[0]
		switch arg {
		case "", "user", "home":
			// Default user stream.  "home" is the alias declared in
			// streamCmd.ArgAliases.
		case "public", "direct":
			// Both names must share one case: Go switch cases do not fall
			// through, so an empty `case "public":` would leave the default
			// user stream selected.
			streamName = arg
		case "local":
			streamName = "public:local"
		default:
			// arg is non-empty here (the empty string is handled above), so
			// indexing its first byte is safe.
			if arg[0] == '!' {
				// List-based stream
				streamName = "list"
				param = arg[1:]
				if len(param) == 0 {
					return errors.New("empty list ID")
				}
				break
			}
			if arg[0] != ':' && arg[0] != '#' {
				return errors.New("invalid argument")
			}
			streamName = "hashtag"
			param = arg[1:]
			if len(param) == 0 {
				return errors.New("empty hashtag")
			}
			hashTagList = strings.Split(param, ",")
			for i, h := range hashTagList {
				// Strip one optional leading marker; guard against empty
				// items (e.g. ":a,,b") before looking at the first byte.
				if h != "" && (h[0] == ':' || h[0] == '#') {
					h = h[1:]
				}
				// Checked after stripping so that a bare marker (":a,#")
				// is rejected as well.
				if h == "" {
					return errors.New("empty hashtag")
				}
				hashTagList[i] = h
			}
			if len(hashTagList) > maximumHashtagStreamWS {
				return errors.Errorf("too many hashtags, maximum is %d", maximumHashtagStreamWS)
			}
			// Use the normalised tag for the single-stream case too.
			// strings.Split always returns at least one element.
			param = hashTagList[0]
		}
	}

	if err := madonInit(true); err != nil {
		return err
	}

	// filterMap stays nil when no notification type filter was requested.
	var filterMap *map[string]bool
	if streamOpts.notificationTypes != "" {
		var err error
		filterMap, err = buildFilterMap(streamOpts.notificationTypes)
		if err != nil {
			return err
		}
	}

	evChan := make(chan madon.StreamEvent, 10)
	stop := make(chan bool)
	done := make(chan bool)
	var err error

	if streamName != "hashtag" || len(hashTagList) <= 1 { // Usual case: Only 1 stream
		err = gClient.StreamListener(streamName, param, evChan, stop, done)
	} else { // Several streams
		n := len(hashTagList)
		tagEvCh := make([]chan madon.StreamEvent, n)
		tagDoneCh := make([]chan bool, n)
		for i, t := range hashTagList {
			if verbose {
				errPrint("Launching listener for tag '%s'", t)
			}
			tagEvCh[i] = make(chan madon.StreamEvent)
			tagDoneCh[i] = make(chan bool)
			e := gClient.StreamListener(streamName, t, tagEvCh[i], stop, tagDoneCh[i])
			if e != nil {
				if i > 0 { // Close previous connections
					close(stop)
				}
				err = e
				break
			}
			// Forward events to main ev channel
			go func(i int) {
				for {
					select {
					case _, ok := <-tagDoneCh[i]:
						if !ok { // end of streaming for this tag
							done <- true
							return
						}
					case ev := <-tagEvCh[i]:
						evChan <- ev
					}
				}
			}(i)
		}
	}

	if err != nil {
		errPrint("Error: %s", err.Error())
		os.Exit(1)
	}

	p, err := getPrinter()
	if err != nil {
		close(stop)
		<-done
		close(evChan)
		errPrint("Error: %s", err.Error())
		os.Exit(1)
	}

	// Set up external command
	p.setCommand(streamOpts.command)

LISTEN:
	for {
		select {
		case v, ok := <-done:
			// Either done was closed, or a forwarder reported that one of the
			// hashtag streams ended.
			if !ok || v {
				break LISTEN
			}
		case ev := <-evChan:
			switch ev.Event {
			case "error":
				if ev.Error != nil {
					if ev.Error == io.ErrUnexpectedEOF {
						errPrint("The stream connection was unexpectedly closed")
						continue
					}
					errPrint("Error event: [%s] %s", ev.Event, ev.Error)
					continue
				}
				errPrint("Event: [%s]", ev.Event)
			case "update":
				if streamOpts.notificationsOnly {
					continue
				}
				s := ev.Data.(madon.Status)
				if err = p.printObj(&s); err != nil {
					break LISTEN
				}
			case "notification":
				n := ev.Data.(madon.Notification)
				if filterMap != nil && !(*filterMap)[n.Type] {
					continue
				}
				// The result must be assigned to err: otherwise the check
				// tests a stale value and print failures go unnoticed.
				if err = p.printObj(&n); err != nil {
					break LISTEN
				}
			case "delete":
				if streamOpts.notificationsOnly {
					continue
				}
				// TODO PrintObj ?
				errPrint("Event: [%s] Status %s was deleted", ev.Event, ev.Data.(string))
			default:
				errPrint("Unhandled event: [%s] %T", ev.Event, ev.Data)
			}
		}
	}
	close(stop)
	close(evChan)
	// err is only non-nil here when printing an event failed.
	if err != nil {
		errPrint("Error: %s", err.Error())
		os.Exit(1)
	}
	return nil
}