Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,40 @@ Throughput, split into 9 x 1s:
warp: Cleanup done.
```

## MULTIPART PUT

Multipart put benchmark tests upload speed of parts. It creates multipart upload, uploads `--parts` parts of
`--part.size` size each and completes multipart upload when all parts are uploaded.

Multipart put test runs `--concurrent` separate multipart uploads. Each of those uploads split up to
`--part.concurrent` concurrent upload threads. So total concurrency is a `--concurrent`
multiplied by `--part.concurrent`.

```
λ warp multipart-put --parts 100 --part.size 5MiB
╭─────────────────────────────────╮
│ WARP S3 Benchmark Tool by MinIO │
╰─────────────────────────────────╯

Benchmarking: Press 'q' to abort benchmark and print partial results...

λ █████████████████████████████████████████████████████████████████████████ 100%

Reqs: 15867, Errs:0, Objs:15867, Bytes: 1983.4MiB
- PUTPART Average: 266 Obj/s, 33.2MiB/s; Current 260 Obj/s, 32.5MiB/s, 1193.7 ms/req

Report: PUTPART. Concurrency: 400. Ran: 58s
* Average: 33.36 MiB/s, 266.85 obj/s
* Reqs: Avg: 1262.5ms, 50%: 935.3ms, 90%: 2773.8ms, 99%: 4395.2ms, Fastest: 53.6ms, Slowest: 6976.4ms, StdDev: 1027.5ms

Throughput, split into 58 x 1s:
* Fastest: 37.9MiB/s, 302.87 obj/s
* 50% Median: 34.3MiB/s, 274.10 obj/s
* Slowest: 19.8MiB/s, 158.41 obj/s


Cleanup Done
```

## ZIP

Expand Down
1 change: 1 addition & 0 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func init() {
versionedCmd,
retentionCmd,
multipartCmd,
multipartPutCmd,
zipCmd,
snowballCmd,
fanoutCmd,
Expand Down
86 changes: 86 additions & 0 deletions cli/multipart_put.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package cli

import (
"github.com/minio/cli"
"github.com/minio/pkg/v3/console"
"github.com/minio/warp/pkg/bench"
)

var multipartPutFlags = []cli.Flag{
cli.IntFlag{
Name: "parts",
Value: 100,
Usage: "Number of parts to upload for each multipart upload",
},
cli.StringFlag{
Name: "part.size",
Value: "5MiB",
Usage: "Size of each part. Can be a number or MiB/GiB.",
},
cli.IntFlag{
Name: "part.concurrent",
Value: 20,
Usage: "Run this many concurrent operations per each multipart upload. Must not exceed a number of parts.",
},
}

var MultiPartPutCombinedFlags = combineFlags(globalFlags, ioFlags, multipartPutFlags, genFlags, benchFlags, analyzeFlags)

// MultipartPut command
var multipartPutCmd = cli.Command{
Name: "multipart-put",
Usage: "benchmark multipart upload",
Action: mainMutipartPut,
Before: setGlobalsFromContext,
Flags: MultiPartPutCombinedFlags,
CustomHelpTemplate: `NAME:
{{.HelpName}} - {{.Usage}}

USAGE:
{{.HelpName}} [FLAGS]
-> see https://github.com/minio/warp#multipart-put

FLAGS:
{{range .VisibleFlags}}{{.}}
{{end}}`,
}

// mainMutipartPut is the entry point for multipart-put command
func mainMutipartPut(ctx *cli.Context) error {
checkMultipartPutSyntax(ctx)

b := &bench.MultipartPut{
Common: getCommon(ctx, newGenSource(ctx, "part.size")),
PartsNumber: ctx.Int("parts"),
PartsConcurrency: ctx.Int("part.concurrent"),
}
return runBench(ctx, b)
}

func checkMultipartPutSyntax(ctx *cli.Context) {
if ctx.NArg() > 0 {
console.Fatal("Command takes no arguments")
}
if ctx.Bool("disable-multipart") {
console.Fatal("cannot disable multipart for multipart-put test")
}

if ctx.Int("parts") > 10000 {
console.Fatal("parts can't be more than 10000")
}
if ctx.Int("parts") <= 0 {
console.Fatal("parts must be at least 1")
}

if ctx.Int("part.concurrent") > ctx.Int("parts") {
console.Fatal("part.concurrent can't be more than parts")
}

sz, err := toSize(ctx.String("part.size"))
if err != nil {
console.Fatal("error parsing part.size:", err)
}
if sz <= 0 {
console.Fatal("part.size must be at least 1")
}
}
180 changes: 180 additions & 0 deletions pkg/bench/multipart_put.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package bench

import (
"context"
"errors"
"fmt"
"net/http"
"time"

"github.com/minio/minio-go/v7"
"golang.org/x/sync/errgroup"
)

// MultipartPut benchmarks multipart upload speed.
type MultipartPut struct {
Common

PartsNumber int
PartsConcurrency int
}

// Prepare for the benchmark run
func (g *MultipartPut) Prepare(ctx context.Context) error {
return g.createEmptyBucket(ctx)
}

// Start will execute the main benchmark.
// Operations should begin executing when the start channel is closed.
func (g *MultipartPut) Start(ctx context.Context, wait chan struct{}) error {
eg, ctx := errgroup.WithContext(ctx)
c := g.Collector
if g.AutoTermDur > 0 {
ctx = c.AutoTerm(ctx, http.MethodPut, g.AutoTermScale, autoTermCheck, autoTermSamples, g.AutoTermDur)
}

for i := 0; i < g.Concurrency; i++ {
thread := uint16(i)
eg.Go(func() error {
<-wait

for ctx.Err() == nil {
objectName := g.Source().Object().Name

uploadID, err := g.createMultupartUpload(ctx, objectName)
if errors.Is(err, context.Canceled) {
return nil
}
if err != nil {
g.Error("create multipart upload error:", err)
continue
}

err = g.uploadParts(ctx, thread, objectName, uploadID)
if errors.Is(err, context.Canceled) {
return nil
}
if err != nil {
g.Error("upload parts error:", err)
continue
}

err = g.completeMultipartUpload(ctx, objectName, uploadID)
if err != nil {
g.Error("complete multipart upload")
}
}
return nil
})
}
return eg.Wait()
}

// Cleanup up after the benchmark run.
func (g *MultipartPut) Cleanup(ctx context.Context) {
g.deleteAllInBucket(ctx, "")
}

func (g *MultipartPut) createMultupartUpload(ctx context.Context, objectName string) (string, error) {
if err := g.rpsLimit(ctx); err != nil {
return "", err
}

// Non-terminating context.
nonTerm := context.Background()

client, done := g.Client()
defer done()
c := minio.Core{Client: client}
return c.NewMultipartUpload(nonTerm, g.Bucket, objectName, g.PutOpts)
}

func (g *MultipartPut) uploadParts(ctx context.Context, thread uint16, objectName, uploadID string) error {
partIdxCh := make(chan int, g.PartsNumber)
for i := 0; i < g.PartsNumber; i++ {
partIdxCh <- i + 1
}
close(partIdxCh)

eg, ctx := errgroup.WithContext(ctx)

// Non-terminating context.
nonTerm := context.Background()

for i := 0; i < g.PartsConcurrency; i++ {
eg.Go(func() error {
i := i
for ctx.Err() == nil {
var partIdx int
var ok bool
select {
case partIdx, ok = <-partIdxCh:
if !ok {
return nil
}
case <-ctx.Done():
continue
}

if err := g.rpsLimit(ctx); err != nil {
return err
}

obj := g.Source().Object()
client, done := g.Client()
defer done()
core := minio.Core{Client: client}
op := Operation{
OpType: "PUTPART",
Thread: thread*uint16(g.PartsConcurrency) + uint16(i),
Size: obj.Size,
File: obj.Name,
ObjPerOp: 1,
Endpoint: client.EndpointURL().String(),
}
if g.DiscardOutput {
op.File = ""
}

opts := minio.PutObjectPartOptions{
SSE: g.PutOpts.ServerSideEncryption,
DisableContentSha256: g.PutOpts.DisableContentSha256,
}

op.Start = time.Now()
res, err := core.PutObjectPart(nonTerm, g.Bucket, objectName, uploadID, partIdx, obj.Reader, obj.Size, opts)
op.End = time.Now()
if err != nil {
err := fmt.Errorf("upload error: %w", err)
g.Error(err)
return err
}

if res.Size != obj.Size && op.Err == "" {
err := fmt.Sprint("short upload. want:", obj.Size, ", got:", res.Size)
if op.Err == "" {
op.Err = err
}
g.Error(err)
}

g.Collector.Receiver() <- op
}

return nil
})
}

return eg.Wait()
}

func (g *MultipartPut) completeMultipartUpload(_ context.Context, objectName, uploadID string) error {
// Non-terminating context.
nonTerm := context.Background()

cl, done := g.Client()
c := minio.Core{Client: cl}
defer done()
_, err := c.CompleteMultipartUpload(nonTerm, g.Bucket, objectName, uploadID, nil, g.PutOpts)
return err
}
Loading