From a4b0712310f328824076d77b911b04fadef42e9f Mon Sep 17 00:00:00 2001 From: Dmitrii Te Date: Tue, 25 Mar 2025 18:29:08 +0100 Subject: [PATCH 01/13] Support multipart-upload benchmarking --- README.md | 36 +++++++ cli/cli.go | 1 + cli/multipart_upload.go | 71 ++++++++++++++ pkg/bench/multipart_upload.go | 178 ++++++++++++++++++++++++++++++++++ 4 files changed, 286 insertions(+) create mode 100644 cli/multipart_upload.go create mode 100644 pkg/bench/multipart_upload.go diff --git a/README.md b/README.md index 4481224b..d191b1a5 100644 --- a/README.md +++ b/README.md @@ -507,6 +507,42 @@ Throughput, split into 9 x 1s: warp: Cleanup done. ``` +## MULTIPART UPLOAD + +Multipart upload benchmark tests upload speed of parts. It creates multipart upload, uploads `--parts` parts of +`--part.size` size each and completes multipart upload when all parts are uploaded. + +Multipart upload test runs `--concurrent` separate multipart uploads. Each of those uploads split up to +`--part.concurrent` concurrent upload threads. So total concurrency is a `--concurrent` +multiplied by `--part.concurrent`. + +``` +λ warp multipart-upload --parts 100 --part.size 5MiB +╭─────────────────────────────────╮ +│ WARP S3 Benchmark Tool by MinIO │ +╰─────────────────────────────────╯ + +Benchmarking: +Benchmark data written to "warp-multipart-upload-2025-02-17[141311]-rsbn.json.zst"... + + λ ███████████████████████████████ + +Reqs: 1185, Errs:0, Objs:1185, Bytes: 5925.0MiB + - PUT Average: 3 Obj/s, 15.9MiB/s; Current 0 Obj/s, 238.8KiB/s, 97559.8 ms/req + + +Report: PUT. Concurrency: 20. Ran: 6m4s + * Average: 15.92 MiB/s, 3.18 obj/s + * Reqs: Avg: 119293.2ms, 50%: 120351.8ms, 90%: 132827.4ms, 99%: 141314.2ms, Fastest: 62788.9ms, Slowest: 202591.5ms, StdDev: 10800.6ms + +Throughput, split into 364 x 1s: + * Fastest: 17.3MiB/s, 3.46 obj/s + * 50% Median: 16.5MiB/s, 3.31 obj/s + * Slowest: 238.8KiB/s, 0.05 obj/s + + +Cleanup Done +``` ## ZIP diff --git a/cli/cli.go b/cli/cli.go index be67afa7..987e8e7c 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -98,6 +98,7 @@ func init() { versionedCmd, retentionCmd, multipartCmd, + multipartUploadCmd, zipCmd, snowballCmd, fanoutCmd, diff --git a/cli/multipart_upload.go b/cli/multipart_upload.go new file mode 100644 index 00000000..8e87956f --- /dev/null +++ b/cli/multipart_upload.go @@ -0,0 +1,71 @@ +package cli + +import ( + "github.com/minio/cli" + "github.com/minio/pkg/v3/console" + "github.com/minio/warp/pkg/bench" +) + +var multipartUploadFlags = []cli.Flag{ + cli.IntFlag{ + Name: "parts", + Value: 100, + Usage: "Number of parts to upload for each multipart upload", + }, + cli.StringFlag{ + Name: "part.size", + Value: "5MiB", + Usage: "Size of each part. Can be a number or MiB/GiB. Must be a single value >= 5MiB", + }, + cli.IntFlag{ + Name: "part.concurrent", + Value: 20, + Usage: "Run this many concurrent operations per each multipart upload. Must not exceed obj.size/part.size", + }, +} + +var MultiPartUploadCombinedFlags = combineFlags(globalFlags, ioFlags, multipartUploadFlags, genFlags, benchFlags, analyzeFlags) + +// MultipartUpload command +var multipartUploadCmd = cli.Command{ + Name: "multipart-upload", + Usage: "benchmark multipart upload", + Action: mainMutipartUpload, + Before: setGlobalsFromContext, + Flags: MultiPartUploadCombinedFlags, + CustomHelpTemplate: `NAME: + {{.HelpName}} - {{.Usage}} + +USAGE: + {{.HelpName}} [FLAGS] + -> see https://github.com/minio/warp#multipart-upload + +FLAGS: + {{range .VisibleFlags}}{{.}} + {{end}}`, +} + +// mainMutipartUpload is the entry point for multipart-upload command +func mainMutipartUpload(ctx *cli.Context) error { + checkMultipartUploadSyntax(ctx) + + b := &bench.MultipartUpload{ + Common: getCommon(ctx, newGenSource(ctx, "part.size")), + PartsNumber: ctx.Int("parts"), + PartsConcurrency: ctx.Int("part.concurrent"), + } + return runBench(ctx, b) +} + +func checkMultipartUploadSyntax(ctx *cli.Context) { + if ctx.NArg() > 0 { + console.Fatal("Command takes no arguments") + } + if ctx.Bool("disable-multipart") { + console.Fatal("cannot disable multipart for multipart-upload test") + } + + if ctx.Int("parts") > 10000 { + console.Fatal("parts can't be more than 10000") + } +} diff --git a/pkg/bench/multipart_upload.go b/pkg/bench/multipart_upload.go new file mode 100644 index 00000000..b8aa8fc7 --- /dev/null +++ b/pkg/bench/multipart_upload.go @@ -0,0 +1,178 @@ +package bench + +import ( + "context" + "errors" + "fmt" + "net/http" + "time" + + "github.com/minio/minio-go/v7" + "golang.org/x/sync/errgroup" +) + +// MultipartUpload benchmarks multipart upload speed. +type MultipartUpload struct { + Common + + PartsNumber int + PartsConcurrency int +} + +// Prepare for the benchmark run +func (g *MultipartUpload) Prepare(ctx context.Context) error { + if err := g.createEmptyBucket(ctx); err != nil { + return err + } + return nil +} + +// Start will execute the main benchmark. +// Operations should begin executing when the start channel is closed. +func (g *MultipartUpload) Start(ctx context.Context, wait chan struct{}) error { + eg, ctx := errgroup.WithContext(ctx) + c := g.Collector + if g.AutoTermDur > 0 { + ctx = c.AutoTerm(ctx, http.MethodPut, g.AutoTermScale, autoTermCheck, autoTermSamples, g.AutoTermDur) + } + + for i := 0; i < g.Concurrency; i++ { + thread := uint16(i) + eg.Go(func() error { + <-wait + + for { + select { + case <-ctx.Done(): + return nil + default: + } + + objectName := g.Source().Object().Name + + uploadID, err := g.createMultupartUpload(ctx, objectName) + if errors.Is(err, context.Canceled) { + return nil + } + if err != nil { + g.Error("create multipart upload error:", err) + continue + } + + err = g.uploadParts(ctx, thread, objectName, uploadID) + if errors.Is(err, context.Canceled) { + return nil + } + if err != nil { + g.Error("upload parts error:", err) + continue + } + + err = g.completeMultipartUpload(ctx, objectName, uploadID) + if err != nil { + g.Error("complete multipart upload") + } + } + }) + } + return eg.Wait() +} + +// Cleanup up after the benchmark run. +func (g *MultipartUpload) Cleanup(ctx context.Context) { + g.deleteAllInBucket(ctx, "") +} + +func (g *MultipartUpload) createMultupartUpload(ctx context.Context, objectName string) (string, error) { + if err := g.rpsLimit(ctx); err != nil { + return "", err + } + + // Non-terminating context. + nonTerm := context.Background() + + client, done := g.Client() + defer done() + c := minio.Core{Client: client} + return c.NewMultipartUpload(nonTerm, g.Bucket, objectName, g.PutOpts) +} + +func (g *MultipartUpload) uploadParts(ctx context.Context, thread uint16, objectName string, uploadID string) error { + partIdxCh := make(chan int, g.PartsNumber) + for i := range g.PartsNumber { + partIdxCh <- i + 1 + } + close(partIdxCh) + + eg, ctx := errgroup.WithContext(ctx) + + // Non-terminating context. + nonTerm := context.Background() + + for i := 0; i < g.PartsConcurrency; i++ { + eg.Go(func() error { + for ctx.Err() == nil { + var partIdx int + var ok bool + select { + case partIdx, ok = <-partIdxCh: + if !ok { + return nil + } + case <-ctx.Done(): + continue + } + + if err := g.rpsLimit(ctx); err != nil { + return err + } + + obj := g.Source().Object() + client, done := g.Client() + defer done() + core := minio.Core{Client: client} + op := Operation{ + OpType: http.MethodPut, + Thread: thread, + Size: obj.Size, + File: obj.Name, + ObjPerOp: 1, + Endpoint: client.EndpointURL().String(), + } + if g.DiscardOutput { + op.File = "" + } + + opts := minio.PutObjectPartOptions{ + SSE: g.Common.PutOpts.ServerSideEncryption, + DisableContentSha256: g.PutOpts.DisableContentSha256, + } + + op.Start = time.Now() + _, err := core.PutObjectPart(nonTerm, g.Bucket, objectName, uploadID, partIdx, obj.Reader, obj.Size, opts) + op.End = time.Now() + if err != nil { + err := fmt.Errorf("upload error: %w", err) + g.Error(err) + return err + } + g.Collector.Receiver() <- op + } + + return nil + }) + } + + return eg.Wait() +} + +func (g *MultipartUpload) completeMultipartUpload(_ context.Context, objectName string, uploadID string) error { + // Non-terminating context. + nonTerm := context.Background() + + cl, done := g.Client() + c := minio.Core{Client: cl} + defer done() + _, err := c.CompleteMultipartUpload(nonTerm, g.Bucket, objectName, uploadID, nil, g.PutOpts) + return err +} From e8a434466005456f412ff7ad871da2e977f941a4 Mon Sep 17 00:00:00 2001 From: d1tyo <149515374+d1tyo@users.noreply.github.com> Date: Wed, 26 Mar 2025 19:31:28 +0100 Subject: [PATCH 02/13] rename multipart-upload to multipart-put Co-authored-by: Klaus Post --- cli/multipart_upload.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/multipart_upload.go b/cli/multipart_upload.go index 8e87956f..090d4664 100644 --- a/cli/multipart_upload.go +++ b/cli/multipart_upload.go @@ -28,7 +28,7 @@ var MultiPartUploadCombinedFlags = combineFlags(globalFlags, ioFlags, multipartU // MultipartUpload command var multipartUploadCmd = cli.Command{ - Name: "multipart-upload", + Name: "multipart-put", Usage: "benchmark multipart upload", Action: mainMutipartUpload, Before: setGlobalsFromContext, From fe6c56948427244ccf84cb4aaa7715e72f438f97 Mon Sep 17 00:00:00 2001 From: d1tyo <149515374+d1tyo@users.noreply.github.com> Date: Wed, 26 Mar 2025 19:32:35 +0100 Subject: [PATCH 03/13] simplify error handling Co-authored-by: Klaus Post --- pkg/bench/multipart_upload.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/bench/multipart_upload.go b/pkg/bench/multipart_upload.go index b8aa8fc7..b55bed94 100644 --- a/pkg/bench/multipart_upload.go +++ b/pkg/bench/multipart_upload.go @@ -21,10 +21,7 @@ type MultipartUpload struct { // Prepare for the benchmark run func (g *MultipartUpload) Prepare(ctx context.Context) error { - if err := g.createEmptyBucket(ctx); err != nil { - return err - } - return nil + return g.createEmptyBucket(ctx) } // Start will execute the main benchmark. From 6367cb9f2e2de6b338f66a32a767cabb1a3fcf2e Mon Sep 17 00:00:00 2001 From: d1tyo <149515374+d1tyo@users.noreply.github.com> Date: Wed, 26 Mar 2025 19:38:52 +0100 Subject: [PATCH 04/13] better "parts" validation Co-authored-by: Klaus Post --- cli/multipart_upload.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cli/multipart_upload.go b/cli/multipart_upload.go index 090d4664..e435f4b9 100644 --- a/cli/multipart_upload.go +++ b/cli/multipart_upload.go @@ -68,4 +68,7 @@ func checkMultipartUploadSyntax(ctx *cli.Context) { if ctx.Int("parts") > 10000 { console.Fatal("parts can't be more than 10000") } + if ctx.Int("parts") <= 0 { + console.Fatal("parts must be at least 1") + } } From d1b600bd5019ec78dce633cc9fa95772a1c0533e Mon Sep 17 00:00:00 2001 From: Dmitrii Te Date: Wed, 26 Mar 2025 19:39:55 +0100 Subject: [PATCH 05/13] simplify for-loop condition --- pkg/bench/multipart_upload.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/pkg/bench/multipart_upload.go b/pkg/bench/multipart_upload.go index b55bed94..bcc1cdb5 100644 --- a/pkg/bench/multipart_upload.go +++ b/pkg/bench/multipart_upload.go @@ -38,13 +38,7 @@ func (g *MultipartUpload) Start(ctx context.Context, wait chan struct{}) error { eg.Go(func() error { <-wait - for { - select { - case <-ctx.Done(): - return nil - default: - } - + for ctx.Err() == nil { objectName := g.Source().Object().Name uploadID, err := g.createMultupartUpload(ctx, objectName) @@ -70,6 +64,7 @@ func (g *MultipartUpload) Start(ctx context.Context, wait chan struct{}) error { g.Error("complete multipart upload") } } + return nil }) } return eg.Wait() From 95b1c025f079e06400eb0dc983b9db488d0741e3 Mon Sep 17 00:00:00 2001 From: Dmitrii Te Date: Wed, 26 Mar 2025 19:53:39 +0100 Subject: [PATCH 06/13] check all arguments --- cli/multipart_upload.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/cli/multipart_upload.go b/cli/multipart_upload.go index e435f4b9..1564b04c 100644 --- a/cli/multipart_upload.go +++ b/cli/multipart_upload.go @@ -15,12 +15,12 @@ var multipartUploadFlags = []cli.Flag{ cli.StringFlag{ Name: "part.size", Value: "5MiB", - Usage: "Size of each part. Can be a number or MiB/GiB. Must be a single value >= 5MiB", + Usage: "Size of each part. Can be a number or MiB/GiB.", }, cli.IntFlag{ Name: "part.concurrent", Value: 20, - Usage: "Run this many concurrent operations per each multipart upload. Must not exceed obj.size/part.size", + Usage: "Run this many concurrent operations per each multipart upload. Must not exceed a number of parts.", }, } @@ -71,4 +71,10 @@ func checkMultipartUploadSyntax(ctx *cli.Context) { if ctx.Int("parts") <= 0 { console.Fatal("parts must be at least 1") } + if ctx.Int("part.concurrent") > ctx.Int("parts") { + console.Fatal("part.concurrent can't be more than parts") + } + if _, err := toSize(ctx.String("part.size")); err != nil { + console.Fatal("error parsing part.size:", err) + } } From 2949ba2d844d41c0911181966d8d65774f58b45c Mon Sep 17 00:00:00 2001 From: Dmitrii Te Date: Wed, 26 Mar 2025 19:59:39 +0100 Subject: [PATCH 07/13] check that part.size > 0 --- cli/multipart_upload.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cli/multipart_upload.go b/cli/multipart_upload.go index 1564b04c..aa4fc82b 100644 --- a/cli/multipart_upload.go +++ b/cli/multipart_upload.go @@ -71,10 +71,16 @@ func checkMultipartUploadSyntax(ctx *cli.Context) { if ctx.Int("parts") <= 0 { console.Fatal("parts must be at least 1") } + if ctx.Int("part.concurrent") > ctx.Int("parts") { console.Fatal("part.concurrent can't be more than parts") } - if _, err := toSize(ctx.String("part.size")); err != nil { + + sz, err := toSize(ctx.String("part.size")) + if err != nil { console.Fatal("error parsing part.size:", err) } + if sz <= 0 { + console.Fatal("part.size must be at least 1") + } } From 88223d0a383938281e0ab7bc3a801f95329753d9 Mon Sep 17 00:00:00 2001 From: Dmitrii Te Date: Wed, 26 Mar 2025 20:15:36 +0100 Subject: [PATCH 08/13] separate Thread value for separate part.concurrent thread --- README.md | 10 +++++----- pkg/bench/multipart_upload.go | 3 ++- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index d191b1a5..51ffe21c 100644 --- a/README.md +++ b/README.md @@ -507,23 +507,23 @@ Throughput, split into 9 x 1s: warp: Cleanup done. ``` -## MULTIPART UPLOAD +## MULTIPART PUT -Multipart upload benchmark tests upload speed of parts. It creates multipart upload, uploads `--parts` parts of +Multipart put benchmark tests upload speed of parts. It creates multipart upload, uploads `--parts` parts of `--part.size` size each and completes multipart upload when all parts are uploaded. -Multipart upload test runs `--concurrent` separate multipart uploads. Each of those uploads split up to +Multipart put test runs `--concurrent` separate multipart uploads. Each of those uploads split up to `--part.concurrent` concurrent upload threads. So total concurrency is a `--concurrent` multiplied by `--part.concurrent`. ``` -λ warp multipart-upload --parts 100 --part.size 5MiB +λ warp multipart-put --parts 100 --part.size 5MiB ╭─────────────────────────────────╮ │ WARP S3 Benchmark Tool by MinIO │ ╰─────────────────────────────────╯ Benchmarking: -Benchmark data written to "warp-multipart-upload-2025-02-17[141311]-rsbn.json.zst"... +Benchmark data written to "warp-multipart-put-2025-02-17[141311]-rsbn.json.zst"... λ ███████████████████████████████ diff --git a/pkg/bench/multipart_upload.go b/pkg/bench/multipart_upload.go index bcc1cdb5..e9d05e38 100644 --- a/pkg/bench/multipart_upload.go +++ b/pkg/bench/multipart_upload.go @@ -103,6 +103,7 @@ func (g *MultipartUpload) uploadParts(ctx context.Context, thread uint16, object for i := 0; i < g.PartsConcurrency; i++ { eg.Go(func() error { + i := i for ctx.Err() == nil { var partIdx int var ok bool @@ -125,7 +126,7 @@ func (g *MultipartUpload) uploadParts(ctx context.Context, thread uint16, object core := minio.Core{Client: client} op := Operation{ OpType: http.MethodPut, - Thread: thread, + Thread: thread*uint16(g.PartsConcurrency) + uint16(i), Size: obj.Size, File: obj.Name, ObjPerOp: 1, From 37f35ecca99aca317ad9dae044a2562e53e76455 Mon Sep 17 00:00:00 2001 From: Dmitrii Te Date: Wed, 26 Mar 2025 20:29:37 +0100 Subject: [PATCH 09/13] rename multipart-upload -> multipart-put --- cli/cli.go | 2 +- cli/{multipart_upload.go => multipart_put.go} | 26 +++++++++---------- .../{multipart_upload.go => multipart_put.go} | 16 ++++++------ 3 files changed, 22 insertions(+), 22 deletions(-) rename cli/{multipart_upload.go => multipart_put.go} (69%) rename pkg/bench/{multipart_upload.go => multipart_put.go} (84%) diff --git a/cli/cli.go b/cli/cli.go index 987e8e7c..46ccc642 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -98,7 +98,7 @@ func init() { versionedCmd, retentionCmd, multipartCmd, - multipartUploadCmd, + multipartPutCmd, zipCmd, snowballCmd, fanoutCmd, diff --git a/cli/multipart_upload.go b/cli/multipart_put.go similarity index 69% rename from cli/multipart_upload.go rename to cli/multipart_put.go index aa4fc82b..3bea966b 100644 --- a/cli/multipart_upload.go +++ b/cli/multipart_put.go @@ -6,7 +6,7 @@ import ( "github.com/minio/warp/pkg/bench" ) -var multipartUploadFlags = []cli.Flag{ +var multipartPutFlags = []cli.Flag{ cli.IntFlag{ Name: "parts", Value: 100, @@ -24,32 +24,32 @@ var multipartUploadFlags = []cli.Flag{ }, } -var MultiPartUploadCombinedFlags = combineFlags(globalFlags, ioFlags, multipartUploadFlags, genFlags, benchFlags, analyzeFlags) +var MultiPartPutCombinedFlags = combineFlags(globalFlags, ioFlags, multipartPutFlags, genFlags, benchFlags, analyzeFlags) -// MultipartUpload command -var multipartUploadCmd = cli.Command{ +// MultipartPut command +var multipartPutCmd = cli.Command{ Name: "multipart-put", Usage: "benchmark multipart upload", - Action: mainMutipartUpload, + Action: mainMutipartPut, Before: setGlobalsFromContext, - Flags: MultiPartUploadCombinedFlags, + Flags: MultiPartPutCombinedFlags, CustomHelpTemplate: `NAME: {{.HelpName}} - {{.Usage}} USAGE: {{.HelpName}} [FLAGS] - -> see https://github.com/minio/warp#multipart-upload + -> see https://github.com/minio/warp#multipart-put FLAGS: {{range .VisibleFlags}}{{.}} {{end}}`, } -// mainMutipartUpload is the entry point for multipart-upload command -func mainMutipartUpload(ctx *cli.Context) error { - checkMultipartUploadSyntax(ctx) +// mainMutipartPut is the entry point for multipart-put command +func mainMutipartPut(ctx *cli.Context) error { + checkMultipartPutSyntax(ctx) - b := &bench.MultipartUpload{ + b := &bench.MultipartPut{ Common: getCommon(ctx, newGenSource(ctx, "part.size")), PartsNumber: ctx.Int("parts"), PartsConcurrency: ctx.Int("part.concurrent"), @@ -57,12 +57,12 @@ func mainMutipartUpload(ctx *cli.Context) error { return runBench(ctx, b) } -func checkMultipartUploadSyntax(ctx *cli.Context) { +func checkMultipartPutSyntax(ctx *cli.Context) { if ctx.NArg() > 0 { console.Fatal("Command takes no arguments") } if ctx.Bool("disable-multipart") { - console.Fatal("cannot disable multipart for multipart-upload test") + console.Fatal("cannot disable multipart for multipart-put test") } if ctx.Int("parts") > 10000 { diff --git a/pkg/bench/multipart_upload.go b/pkg/bench/multipart_put.go similarity index 84% rename from pkg/bench/multipart_upload.go rename to pkg/bench/multipart_put.go index e9d05e38..49205023 100644 --- a/pkg/bench/multipart_upload.go +++ b/pkg/bench/multipart_put.go @@ -11,8 +11,8 @@ import ( "golang.org/x/sync/errgroup" ) -// MultipartUpload benchmarks multipart upload speed. -type MultipartUpload struct { +// MultipartPut benchmarks multipart upload speed. +type MultipartPut struct { Common PartsNumber int @@ -20,13 +20,13 @@ type MultipartUpload struct { } // Prepare for the benchmark run -func (g *MultipartUpload) Prepare(ctx context.Context) error { +func (g *MultipartPut) Prepare(ctx context.Context) error { return g.createEmptyBucket(ctx) } // Start will execute the main benchmark. // Operations should begin executing when the start channel is closed. -func (g *MultipartUpload) Start(ctx context.Context, wait chan struct{}) error { +func (g *MultipartPut) Start(ctx context.Context, wait chan struct{}) error { eg, ctx := errgroup.WithContext(ctx) c := g.Collector if g.AutoTermDur > 0 { @@ -71,11 +71,11 @@ func (g *MultipartUpload) Start(ctx context.Context, wait chan struct{}) error { } // Cleanup up after the benchmark run. -func (g *MultipartUpload) Cleanup(ctx context.Context) { +func (g *MultipartPut) Cleanup(ctx context.Context) { g.deleteAllInBucket(ctx, "") } -func (g *MultipartUpload) createMultupartUpload(ctx context.Context, objectName string) (string, error) { +func (g *MultipartPut) createMultupartUpload(ctx context.Context, objectName string) (string, error) { if err := g.rpsLimit(ctx); err != nil { return "", err } @@ -89,7 +89,7 @@ func (g *MultipartUpload) createMultupartUpload(ctx context.Context, objectName return c.NewMultipartUpload(nonTerm, g.Bucket, objectName, g.PutOpts) } -func (g *MultipartUpload) uploadParts(ctx context.Context, thread uint16, objectName string, uploadID string) error { +func (g *MultipartPut) uploadParts(ctx context.Context, thread uint16, objectName string, uploadID string) error { partIdxCh := make(chan int, g.PartsNumber) for i := range g.PartsNumber { partIdxCh <- i + 1 @@ -159,7 +159,7 @@ func (g *MultipartUpload) uploadParts(ctx context.Context, thread uint16, object return eg.Wait() } -func (g *MultipartUpload) completeMultipartUpload(_ context.Context, objectName string, uploadID string) error { +func (g *MultipartPut) completeMultipartUpload(_ context.Context, objectName string, uploadID string) error { // Non-terminating context. nonTerm := context.Background() From 248964b8151b8937151987a072364647e48e39fd Mon Sep 17 00:00:00 2001 From: Dmitrii Te Date: Wed, 26 Mar 2025 20:39:26 +0100 Subject: [PATCH 10/13] readme --- README.md | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 51ffe21c..2dcd7af5 100644 --- a/README.md +++ b/README.md @@ -522,23 +522,21 @@ multiplied by `--part.concurrent`. │ WARP S3 Benchmark Tool by MinIO │ ╰─────────────────────────────────╯ -Benchmarking: -Benchmark data written to "warp-multipart-put-2025-02-17[141311]-rsbn.json.zst"... +Benchmarking: Press 'q' to abort benchmark and print partial results... - λ ███████████████████████████████ + λ █████████████████████████████████████████████████████████████████████████ 100% -Reqs: 1185, Errs:0, Objs:1185, Bytes: 5925.0MiB - - PUT Average: 3 Obj/s, 15.9MiB/s; Current 0 Obj/s, 238.8KiB/s, 97559.8 ms/req +Reqs: 2034, Errs:0, Objs:2034, Bytes: 10170.0MiB + - PUT Average: 7 Obj/s, 33.2MiB/s; Current 2 Obj/s, 12.0MiB/s, 40233.9 ms/req +Report: PUT. Concurrency: 400. Ran: 5m5s + * Average: 33.24 MiB/s, 6.65 obj/s + * Reqs: Avg: 55325.0ms, 50%: 55521.4ms, 90%: 69196.4ms, 99%: 78513.4ms, Fastest: 4896.5ms, Slowest: 97884.8ms, StdDev: 10521.0ms -Report: PUT. Concurrency: 20. Ran: 6m4s - * Average: 15.92 MiB/s, 3.18 obj/s - * Reqs: Avg: 119293.2ms, 50%: 120351.8ms, 90%: 132827.4ms, 99%: 141314.2ms, Fastest: 62788.9ms, Slowest: 202591.5ms, StdDev: 10800.6ms - -Throughput, split into 364 x 1s: - * Fastest: 17.3MiB/s, 3.46 obj/s - * 50% Median: 16.5MiB/s, 3.31 obj/s - * Slowest: 238.8KiB/s, 0.05 obj/s +Throughput, split into 305 x 1s: + * Fastest: 39.1MiB/s, 7.81 obj/s + * 50% Median: 33.3MiB/s, 6.66 obj/s + * Slowest: 10.2MiB/s, 2.04 obj/s Cleanup Done From 7abd3df50788a51e0a8b452fecb5829672afabf5 Mon Sep 17 00:00:00 2001 From: d1tyo Date: Thu, 27 Mar 2025 19:04:43 +0100 Subject: [PATCH 11/13] Rename OpType to PUTPART and check that actual uploaded size is equal to expected one --- README.md | 28 ++++++++++++++-------------- pkg/bench/multipart_put.go | 15 ++++++++++++--- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 2dcd7af5..bf94b830 100644 --- a/README.md +++ b/README.md @@ -521,25 +521,25 @@ multiplied by `--part.concurrent`. ╭─────────────────────────────────╮ │ WARP S3 Benchmark Tool by MinIO │ ╰─────────────────────────────────╯ - + Benchmarking: Press 'q' to abort benchmark and print partial results... - + λ █████████████████████████████████████████████████████████████████████████ 100% + +Reqs: 15867, Errs:0, Objs:15867, Bytes: 1983.4MiB + - PUTPART Average: 266 Obj/s, 33.2MiB/s; Current 260 Obj/s, 32.5MiB/s, 1193.7 ms/req + +Report: PUTPART. Concurrency: 400. Ran: 58s + * Average: 33.36 MiB/s, 266.85 obj/s + * Reqs: Avg: 1262.5ms, 50%: 935.3ms, 90%: 2773.8ms, 99%: 4395.2ms, Fastest: 53.6ms, Slowest: 6976.4ms, StdDev: 1027.5ms -Reqs: 2034, Errs:0, Objs:2034, Bytes: 10170.0MiB - - PUT Average: 7 Obj/s, 33.2MiB/s; Current 2 Obj/s, 12.0MiB/s, 40233.9 ms/req - -Report: PUT. Concurrency: 400. Ran: 5m5s - * Average: 33.24 MiB/s, 6.65 obj/s - * Reqs: Avg: 55325.0ms, 50%: 55521.4ms, 90%: 69196.4ms, 99%: 78513.4ms, Fastest: 4896.5ms, Slowest: 97884.8ms, StdDev: 10521.0ms - -Throughput, split into 305 x 1s: - * Fastest: 39.1MiB/s, 7.81 obj/s - * 50% Median: 33.3MiB/s, 6.66 obj/s - * Slowest: 10.2MiB/s, 2.04 obj/s +Throughput, split into 58 x 1s: + * Fastest: 37.9MiB/s, 302.87 obj/s + * 50% Median: 34.3MiB/s, 274.10 obj/s + * Slowest: 19.8MiB/s, 158.41 obj/s -Cleanup Done +Cleanup Done ``` ## ZIP diff --git a/pkg/bench/multipart_put.go b/pkg/bench/multipart_put.go index 49205023..cc443761 100644 --- a/pkg/bench/multipart_put.go +++ b/pkg/bench/multipart_put.go @@ -91,7 +91,7 @@ func (g *MultipartPut) createMultupartUpload(ctx context.Context, objectName str func (g *MultipartPut) uploadParts(ctx context.Context, thread uint16, objectName string, uploadID string) error { partIdxCh := make(chan int, g.PartsNumber) - for i := range g.PartsNumber { + for i := 0; i < g.PartsNumber; i++ { partIdxCh <- i + 1 } close(partIdxCh) @@ -125,7 +125,7 @@ func (g *MultipartPut) uploadParts(ctx context.Context, thread uint16, objectNam defer done() core := minio.Core{Client: client} op := Operation{ - OpType: http.MethodPut, + OpType: "PUTPART", Thread: thread*uint16(g.PartsConcurrency) + uint16(i), Size: obj.Size, File: obj.Name, @@ -142,13 +142,22 @@ func (g *MultipartPut) uploadParts(ctx context.Context, thread uint16, objectNam } op.Start = time.Now() - _, err := core.PutObjectPart(nonTerm, g.Bucket, objectName, uploadID, partIdx, obj.Reader, obj.Size, opts) + res, err := core.PutObjectPart(nonTerm, g.Bucket, objectName, uploadID, partIdx, obj.Reader, obj.Size, opts) op.End = time.Now() if err != nil { err := fmt.Errorf("upload error: %w", err) g.Error(err) return err } + + if res.Size != obj.Size && op.Err == "" { + err := fmt.Sprint("short upload. want:", obj.Size, ", got:", res.Size) + if op.Err == "" { + op.Err = err + } + g.Error(err) + } + g.Collector.Receiver() <- op } From e9a9d191df6df9c217e4b909ab2d361628bbc79f Mon Sep 17 00:00:00 2001 From: d1tyo Date: Thu, 27 Mar 2025 19:06:53 +0100 Subject: [PATCH 12/13] minor cleanup in readme.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index bf94b830..45ceb9d7 100644 --- a/README.md +++ b/README.md @@ -539,7 +539,7 @@ Throughput, split into 58 x 1s: * Slowest: 19.8MiB/s, 158.41 obj/s -Cleanup Done +Cleanup Done ``` ## ZIP From b43a8a9ed7e9d3950167e3fb7ecf4585ad97347a Mon Sep 17 00:00:00 2001 From: d1tyo Date: Thu, 17 Apr 2025 18:48:44 +0200 Subject: [PATCH 13/13] fix linter issues --- pkg/bench/multipart_put.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/bench/multipart_put.go b/pkg/bench/multipart_put.go index cc443761..928cdfa5 100644 --- a/pkg/bench/multipart_put.go +++ b/pkg/bench/multipart_put.go @@ -89,7 +89,7 @@ func (g *MultipartPut) createMultupartUpload(ctx context.Context, objectName str return c.NewMultipartUpload(nonTerm, g.Bucket, objectName, g.PutOpts) } -func (g *MultipartPut) uploadParts(ctx context.Context, thread uint16, objectName string, uploadID string) error { +func (g *MultipartPut) uploadParts(ctx context.Context, thread uint16, objectName, uploadID string) error { partIdxCh := make(chan int, g.PartsNumber) for i := 0; i < g.PartsNumber; i++ { partIdxCh <- i + 1 @@ -137,7 +137,7 @@ func (g *MultipartPut) uploadParts(ctx context.Context, thread uint16, objectNam } opts := minio.PutObjectPartOptions{ - SSE: g.Common.PutOpts.ServerSideEncryption, + SSE: g.PutOpts.ServerSideEncryption, DisableContentSha256: g.PutOpts.DisableContentSha256, } @@ -168,7 +168,7 @@ func (g *MultipartPut) uploadParts(ctx context.Context, thread uint16, objectNam return eg.Wait() } -func (g *MultipartPut) completeMultipartUpload(_ context.Context, objectName string, uploadID string) error { +func (g *MultipartPut) completeMultipartUpload(_ context.Context, objectName, uploadID string) error { // Non-terminating context. nonTerm := context.Background()