Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func mainAnalyze(ctx *cli.Context) error {
err := bench.StreamOperationsFromCSV(rc, false, ctx.Int("analyze.offset"), ctx.Int("analyze.limit"), log, opCh)
fatalIf(probe.NewError(err), "Unable to parse input")
}()
final = *aggregate.Live(opCh, nil, "")
final = *aggregate.Live(opCh, nil, "", nil)
}
rep := final.Report(aggregate.ReportOptions{
Details: true,
Expand Down
9 changes: 3 additions & 6 deletions cli/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,19 +533,16 @@ func addCollector(ctx *cli.Context, b bench.Benchmark) (bench.OpsCollector, chan

if !ctx.Bool("full") {
updates := make(chan aggregate.UpdateReq, 1000)
c := aggregate.LiveCollector(context.Background(), updates, pRandASCII(4))
c.AddOutput(common.ExtraOut...)
c := aggregate.LiveCollector(context.Background(), updates, pRandASCII(4), common.ExtraOut)
common.Collector = c
return bench.EmptyOpsCollector, updates
}
if common.DiscardOutput {
common.Collector = bench.NewNullCollector()
common.Collector.AddOutput(common.ExtraOut...)
common.Collector = bench.NewNullCollector(common.ExtraOut...)
return bench.EmptyOpsCollector, nil
}
var retrieveOps bench.OpsCollector
common.Collector, retrieveOps = bench.NewOpsCollector()
common.Collector.AddOutput(common.ExtraOut...)
common.Collector, retrieveOps = bench.NewOpsCollector(common.ExtraOut...)
return retrieveOps, nil
}

Expand Down
2 changes: 1 addition & 1 deletion cli/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,5 +174,5 @@ func parseChecksum(ctx *cli.Context) (useMD5 bool, ct minio.ChecksumType) {
}
}
}
return
return useMD5, ct
}
9 changes: 2 additions & 7 deletions pkg/aggregate/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

// LiveCollector return a collector, and a channel that will return the
// current aggregate on the channel whenever it is requested.
func LiveCollector(ctx context.Context, updates chan UpdateReq, clientID string) bench.Collector {
func LiveCollector(ctx context.Context, updates chan UpdateReq, clientID string, extra []chan<- bench.Operation) bench.Collector {
c := collector{
rcv: make(chan bench.Operation, 1000),
}
Expand All @@ -39,7 +39,7 @@ func LiveCollector(ctx context.Context, updates chan UpdateReq, clientID string)
}
c.updates = updates
go func() {
final := Live(c.rcv, updates, clientID)
final := Live(c.rcv, updates, clientID, extra)
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -67,7 +67,6 @@ type UpdateReq struct {
type collector struct {
mu sync.Mutex
rcv chan bench.Operation
extra []chan<- bench.Operation
updates chan<- UpdateReq
doneFn []context.CancelFunc
}
Expand Down Expand Up @@ -157,10 +156,6 @@ func (c *collector) Receiver() chan<- bench.Operation {
return c.rcv
}

func (c *collector) AddOutput(operations ...chan<- bench.Operation) {
c.extra = append(c.extra, operations...)
}

func (c *collector) Close() {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
16 changes: 14 additions & 2 deletions pkg/aggregate/live.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,13 +555,18 @@ func newRealTime() Realtime {
}

// Live collects operations and update requests.
func Live(ops <-chan bench.Operation, updates chan UpdateReq, clientID string) *Realtime {
func Live(ops <-chan bench.Operation, updates chan UpdateReq, clientID string, extra []chan<- bench.Operation) *Realtime {
a := newRealTime()
var reset atomic.Bool
var update atomic.Pointer[Realtime]
if updates != nil {
done := make(chan struct{})
defer close(done)
defer func() {
close(done)
for _, c := range extra {
close(c)
}
}()
go func() {
var finalQ []UpdateReq
defer func() {
Expand Down Expand Up @@ -608,6 +613,13 @@ func Live(ops <-chan bench.Operation, updates chan UpdateReq, clientID string) *
op.ClientID = clientID
}
var wg sync.WaitGroup
wg.Add(len(extra))
for _, c := range extra {
go func() {
c <- op
wg.Done()
}()
}
wg.Add(6)
// 1
go func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/bench/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (s Segment) SpeedPerSec() (mib, ops, objs float64) {
mib = float64(s.TotalBytes) / (1024 * 1024) / scale
ops = float64(s.OpsEnded) / scale
objs = s.Objects / scale
return
return mib, ops, objs
}

// Print segments to a supplied writer.
Expand Down
25 changes: 10 additions & 15 deletions pkg/bench/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ type Collector interface {
// Receiver returns the receiver of input
Receiver() chan<- Operation

// AddOutput allows to add additional inputs.
AddOutput(...chan<- Operation)

// Close the collector
Close()
}
Expand All @@ -61,16 +58,17 @@ type collector struct {

// NewOpsCollector returns a collector that will collect all operations in memory.
// After calling Close the returned function can be used to retrieve the operations.
func NewOpsCollector() (Collector, OpsCollector) {
func NewOpsCollector(extra ...chan<- Operation) (Collector, OpsCollector) {
r := &collector{
ops: make(Operations, 0, 10000),
rcv: make(chan Operation, 1000),
ops: make(Operations, 0, 10000),
rcv: make(chan Operation, 1000),
extra: extra,
}
r.rcvWg.Add(1)
go func() {
defer r.rcvWg.Done()
for op := range r.rcv {
for _, ch := range r.extra {
for _, ch := range extra {
ch <- op
}
r.opsMu.Lock()
Expand All @@ -85,16 +83,17 @@ func NewOpsCollector() (Collector, OpsCollector) {
}

// NewNullCollector collects operations, but discards them.
func NewNullCollector() Collector {
func NewNullCollector(extra ...chan<- Operation) Collector {
r := &collector{
ops: make(Operations, 0),
rcv: make(chan Operation, 1000),
ops: make(Operations, 0),
rcv: make(chan Operation, 1000),
extra: extra,
}
r.rcvWg.Add(1)
go func() {
defer r.rcvWg.Done()
for op := range r.rcv {
for _, ch := range r.extra {
for _, ch := range extra {
ch <- op
}
}
Expand Down Expand Up @@ -193,7 +192,3 @@ func (c *collector) Close() {
c.extra = nil
}
}

func (c *collector) AddOutput(x ...chan<- Operation) {
c.extra = append(c.extra, x...)
}
22 changes: 11 additions & 11 deletions pkg/bench/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ func (o Operation) Aggregate(s *Segment) (done bool) {
}
done = false
if len(s.OpType) > 0 && o.OpType != s.OpType {
return
return done
}
if o.End.Before(s.Start) {
return
return done
}
startedInSegment := o.Start.After(s.Start) || o.Start.Equal(s.Start)
endedInSegment := o.End.Before(s.EndsBefore)
Expand All @@ -116,7 +116,7 @@ func (o Operation) Aggregate(s *Segment) (done bool) {
if startedInSegment && endedInSegment {
if len(o.Err) != 0 {
s.Errors++
return
return done
}
// We are completely within segment.
s.TotalBytes += o.Size
Expand All @@ -126,23 +126,23 @@ func (o Operation) Aggregate(s *Segment) (done bool) {
s.ObjsPerOp = o.ObjPerOp
s.Objects += float64(o.ObjPerOp)
s.ReqAvg += float64(o.End.Sub(o.Start)) / float64(time.Millisecond)
return
return done
}
// Operation partially within segment.
s.PartialOps++
if startedInSegment {
s.OpsStarted++
if len(o.Err) != 0 {
// Errors are only counted in segments they ends in.
return
return done
}

}
if endedInSegment {
s.OpsEnded++
if len(o.Err) != 0 {
s.Errors++
return
return done
}
s.ReqAvg += float64(o.End.Sub(o.Start)) / float64(time.Millisecond)
}
Expand Down Expand Up @@ -759,7 +759,7 @@ func (o Operations) Duration() time.Duration {
// TimeRange returns the full time range from start of first operation to end of the last.
func (o Operations) TimeRange() (start, end time.Time) {
if len(o) == 0 {
return
return start, end
}
start = o[0].Start
end = o[0].End
Expand All @@ -771,7 +771,7 @@ func (o Operations) TimeRange() (start, end time.Time) {
end = op.End
}
}
return
return start, end
}

// ActiveTimeRange returns the "active" time range.
Expand All @@ -780,7 +780,7 @@ func (o Operations) TimeRange() (start, end time.Time) {
// If there is no active time range both values will be the same.
func (o Operations) ActiveTimeRange(allThreads bool) (start, end time.Time) {
if len(o) == 0 {
return
return start, end
}
// Only discard one.
if !allThreads {
Expand Down Expand Up @@ -808,7 +808,7 @@ func (o Operations) ActiveTimeRange(allThreads bool) (start, end time.Time) {
return start, start
}

return
return start, end
}
threads := o.Threads()
firstEnded := make(map[uint16]time.Time, threads)
Expand Down Expand Up @@ -840,7 +840,7 @@ func (o Operations) ActiveTimeRange(allThreads bool) (start, end time.Time) {
if start.After(end) {
return start, start
}
return
return start, end
}

// Threads returns the number of threads found.
Expand Down
Loading