Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- A command for scanning messages in a kafka topic, and optionally writing them
to the local filesystem

### Changed

- Messages get persisted to $HOME/.kplay by default; this can be overridden via
a flag

## [v2.0.0] - Apr 23, 2025

### Added
Expand Down
21 changes: 14 additions & 7 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ func Execute() error {

func NewRootCommand() (*cobra.Command, error) {
var (
configPath string
configPathFull string
homeDir string
configPath string
configPathFull string
homeDir string
outputDir string

persistMessages bool
skipMessages bool
commitMessages bool
Expand Down Expand Up @@ -151,7 +153,7 @@ Behaviours
return fmt.Errorf("%w: %s", errCouldntPingBrokers, err.Error())
}

return tui.Render(cl, config, behaviours, homeDir)
return tui.Render(cl, config, behaviours, outputDir)
},
}

Expand Down Expand Up @@ -261,20 +263,22 @@ Behaviours
- consumer group %s
- authentication %s
- encoding %s
- brokers %v
- brokers %s
- number of messages %d
- save messages %v
- decode values %v
- output directory %s
- batch size %d
`,
config.Topic,
config.ConsumerGroup,
config.AuthenticationDisplay(),
config.EncodingDisplay(),
config.Brokers,
strings.Join(config.Brokers, "\n "),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This change improves readability for multiple brokers. However, it introduces a couple of inconsistencies:

  1. The value passed is now a string, but the format specifier for brokers on line 266 is still %v. It should be changed to %s for correctness.
  2. This improved formatting is only applied to the scan command. For a consistent user experience, please consider applying the same formatting to the debug output of the tui (line 126) and serve (line 178) commands.

scanBehaviours.NumMessages,
scanBehaviours.SaveMessages,
scanBehaviours.Decode,
outputDir,
scanBehaviours.BatchSize,
)

Expand Down Expand Up @@ -310,7 +314,7 @@ Behaviours

defer client.Close()

scanner := scan.New(client, config, scanBehaviours, homeDir)
scanner := scan.New(client, config, scanBehaviours, outputDir)

return scanner.Execute()
},
Expand All @@ -328,13 +332,15 @@ Behaviours
}

defaultConfigPath := filepath.Join(configDir, configFileName)
defaultOutputDir := filepath.Join(homeDir, ".kplay")

tuiCmd.Flags().StringVarP(&configPath, "config-path", "c", defaultConfigPath, "location of kplay's config file")
tuiCmd.Flags().BoolVarP(&persistMessages, "persist-messages", "p", false, "whether to start the TUI with the setting \"persist messages\" ON")
tuiCmd.Flags().BoolVarP(&skipMessages, "skip-messages", "s", false, "whether to start the TUI with the setting \"skip messages\" ON")
tuiCmd.Flags().BoolVarP(&commitMessages, "commit-messages", "C", true, "whether to start the TUI with the setting \"commit messages\" ON")
tuiCmd.Flags().StringVarP(&consumerGroup, consumerGroupFlag, "g", "", "consumer group to use (overrides the one in kplay's config file)")
tuiCmd.Flags().BoolVar(&debug, "debug", false, "whether to only display config picked up by kplay without running it")
tuiCmd.Flags().StringVarP(&outputDir, "output-dir", "O", defaultOutputDir, "directory to persist messages in")

serveCmd.Flags().StringVarP(&configPath, "config-path", "c", defaultConfigPath, "location of kplay's config file")
serveCmd.Flags().StringVarP(&consumerGroup, consumerGroupFlag, "g", "", "consumer group to use (overrides the one in kplay's config file)")
Expand All @@ -352,6 +358,7 @@ Behaviours
scanCmd.Flags().BoolVarP(&scanSaveMessages, "save-messages", "s", false, "whether to save kafka messages to the local filesystem")
scanCmd.Flags().BoolVarP(&scanDecode, "decode", "d", true, "whether to decode message values (false is equivalent to 'encodingFormat: raw' in kplay's config)")
scanCmd.Flags().UintVarP(&scanBatchSize, "batch-size", "b", 100, "number of messages to fetch per batch (must be greater than 0)")
scanCmd.Flags().StringVarP(&outputDir, "output-dir", "O", defaultOutputDir, "directory to save scan results in")

rootCmd.AddCommand(tuiCmd)
rootCmd.AddCommand(serveCmd)
Expand Down
8 changes: 4 additions & 4 deletions internal/scan/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Scanner struct {
client *kgo.Client
config t.Config
behaviours Behaviours
homeDir string
outputDir string
progress scanProgress
}

Expand All @@ -47,12 +47,12 @@ type scanProgress struct {
fsErrors []fsError
}

func New(client *kgo.Client, config t.Config, behaviours Behaviours, homeDir string) Scanner {
func New(client *kgo.Client, config t.Config, behaviours Behaviours, outputDir string) Scanner {
scanner := Scanner{
client: client,
config: config,
behaviours: behaviours,
homeDir: homeDir,
outputDir: outputDir,
}

return scanner
Expand Down Expand Up @@ -101,7 +101,7 @@ func (s *Scanner) scan(ctx context.Context) error {
var recordWriter *messageWriter

now := time.Now().Unix()
scanOutputDir := filepath.Join(s.homeDir, ".kplay", "messages", s.config.Topic)
scanOutputDir := filepath.Join(s.outputDir, "messages", s.config.Topic)

err := os.MkdirAll(scanOutputDir, 0o755)
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions internal/tui/cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ func FetchMessages(cl *kgo.Client, config t.Config, commit bool, numRecords int)
}
}

func saveRecordDetailsToDisk(msg t.Message, homeDir, topic string, notifyUserOnSuccess bool) tea.Cmd {
func saveRecordDetailsToDisk(msg t.Message, outputDir, topic string, notifyUserOnSuccess bool) tea.Cmd {
return func() tea.Msg {
filePath := filepath.Join(
homeDir,
".kplay",
outputDir,
"messages",
topic,
fmt.Sprintf("partition-%d", msg.Partition),
Expand Down
4 changes: 2 additions & 2 deletions internal/tui/initial.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
)

func InitialModel(kCl *kgo.Client, config t.Config, behaviours t.TUIBehaviours, homeDir string) Model {
func InitialModel(kCl *kgo.Client, config t.Config, behaviours t.TUIBehaviours, outputDir string) Model {
appDelegateKeys := newAppDelegateKeyMap()
appDelegate := newAppItemDelegate(appDelegateKeys)
jobItems := make([]list.Item, 0)
Expand All @@ -17,7 +17,7 @@ func InitialModel(kCl *kgo.Client, config t.Config, behaviours t.TUIBehaviours,
client: kCl,
msgsList: list.New(jobItems, appDelegate, listWidth, 0),
currentMsgIndex: -1,
homeDir: homeDir,
outputDir: outputDir,
behaviours: behaviours,
showHelpIndicator: true,
}
Expand Down
2 changes: 1 addition & 1 deletion internal/tui/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Model struct {
msgDetailsVP viewport.Model
msgDetailsVPReady bool
showHelpIndicator bool
homeDir string
outputDir string
behaviours t.TUIBehaviours
helpVPReady bool
terminalWidth int
Expand Down
4 changes: 2 additions & 2 deletions internal/tui/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

var errCouldntSetupDebugLogging = errors.New("couldn't set up debug logging")

func Render(kCl *kgo.Client, config t.Config, behaviours t.TUIBehaviours, homeDir string) error {
func Render(kCl *kgo.Client, config t.Config, behaviours t.TUIBehaviours, outputDir string) error {
if len(os.Getenv("DEBUG")) > 0 {
f, err := tea.LogToFile("debug.log", "debug")
if err != nil {
Expand All @@ -21,7 +21,7 @@ func Render(kCl *kgo.Client, config t.Config, behaviours t.TUIBehaviours, homeDi
defer f.Close()
}

p := tea.NewProgram(InitialModel(kCl, config, behaviours, homeDir), tea.WithAltScreen())
p := tea.NewProgram(InitialModel(kCl, config, behaviours, outputDir), tea.WithAltScreen())
_, err := p.Run()

return err
Expand Down
4 changes: 2 additions & 2 deletions internal/tui/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
break
}

cmds = append(cmds, saveRecordDetailsToDisk(message, m.homeDir, m.config.Topic, true))
cmds = append(cmds, saveRecordDetailsToDisk(message, m.outputDir, m.config.Topic, true))
}
case tea.WindowSizeMsg:
w1, h1 := messageListStyle.GetFrameSize()
Expand Down Expand Up @@ -247,7 +247,7 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
for _, message := range msg.messages {
m.msgsList.InsertItem(len(m.msgsList.Items()), message)
if m.behaviours.PersistMessages {
cmds = append(cmds, saveRecordDetailsToDisk(message, m.homeDir, m.config.Topic, false))
cmds = append(cmds, saveRecordDetailsToDisk(message, m.outputDir, m.config.Topic, false))
}
}
m.msg = fmt.Sprintf("%d message(s) fetched", len(msg.messages))
Expand Down