Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add tests for eventmachine consume
Signed-off-by: Pavel Kositsyn <[email protected]>
  • Loading branch information
pkositsyn authored and jpkrohling committed May 6, 2021
commit 6175cdac0378050b9912ca628a1b59e66511c8d4
30 changes: 20 additions & 10 deletions processor/groupbytraceprocessor/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ type tracesWithID struct {
td pdata.Traces
}

// eventMachine is a simple machine that accepts events in a typically non-blocking manner,
// processing the events serially, to ensure that data at the consumer is consistent.
// eventMachine is a machine that accepts events in a typically non-blocking manner,
// processing the events serially per worker scope, to ensure that data at the consumer is consistent.
// Just like the machine itself is non-blocking, consumers are expected to also not block
// on the callbacks, otherwise, events might pile up. When enough events are piled up, firing an
// event will block until enough capacity is available to accept the events.
Expand Down Expand Up @@ -112,7 +112,7 @@ func newEventMachine(logger *zap.Logger, bufferSize int, numWorkers int, numTrac
}

func (em *eventMachine) startInBackground() {
em.start()
em.startWorkers()
go em.periodicMetrics()
}

Expand Down Expand Up @@ -141,7 +141,7 @@ func (em *eventMachine) periodicMetrics() {
})
}

func (em *eventMachine) start() {
func (em *eventMachine) startWorkers() {
for _, worker := range em.workers {
go worker.start()
}
Expand Down Expand Up @@ -220,6 +220,7 @@ func (em *eventMachine) handleEvent(e event, w *eventMachineWorker) {
}
}

// consume takes a single trace and routes it to one of the workers.
func (em *eventMachine) consume(td pdata.Traces) error {
traceID, err := getTraceID(td)
if err != nil {
Expand All @@ -228,21 +229,30 @@ func (em *eventMachine) consume(td pdata.Traces) error {

var bucket uint64
if len(em.workers) != 1 {
hash := hashPool.Get().(*maphash.Hash)
defer hashPool.Put(hash)

bytes := traceID.Bytes()
hash.Write(bytes[:])
bucket = hash.Sum64() % uint64(len(em.workers))
bucket = workerIndexForTraceID(traceID, len(em.workers))
}

em.logger.Debug("scheduled trace to worker", zap.Uint64("id", bucket))

em.workers[bucket].fire(event{
typ: traceReceived,
payload: tracesWithID{id: traceID, td: td},
})
return nil
}

// workerIndexForTraceID maps a trace ID onto one of numWorkers workers by hashing
// the ID's bytes with a *maphash.Hash borrowed from hashPool.
//
// The returned index is in [0, numWorkers); numWorkers must be positive. A single
// worker always gets index 0 without touching the pool. Routing is only stable
// across calls if every hash handed out by hashPool shares the same seed.
// NOTE(review): hashPool is defined outside this view — confirm its New func seeds
// each hash via SetSeed with one shared seed.
func workerIndexForTraceID(traceID pdata.TraceID, numWorkers int) uint64 {
	if numWorkers == 1 {
		// every hash modulo 1 is 0; skip the pool and the hashing entirely
		return 0
	}

	hash := hashPool.Get().(*maphash.Hash)
	defer func() {
		// clear the written bytes (Reset keeps the seed) so the next borrower
		// starts from an empty hash state
		hash.Reset()
		hashPool.Put(hash)
	}()

	id := traceID.Bytes()
	// maphash.Hash.Write always writes all of its input and never returns an error.
	_, _ = hash.Write(id[:])
	return hash.Sum64() % uint64(numWorkers)
}

func (em *eventMachine) shutdown() {
em.logger.Info("shutting down the event manager", zap.Int("pending-events", em.numEvents()))
em.shutdownLock.Lock()
Expand Down
112 changes: 107 additions & 5 deletions processor/groupbytraceprocessor/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package groupbytraceprocessor

import (
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -245,11 +246,114 @@ func TestEventUnknownType(t *testing.T) {
wg.Wait()
}

// TestEventTracePerWorker checks that consume routes a trace to a worker, that an
// event fired from within that worker's callback is handled by the same worker and
// carries the received trace ID, and that consume rejects a batch from which no
// trace ID can be extracted.
func TestEventTracePerWorker(t *testing.T) {
	for _, tt := range []struct {
		casename  string
		traceID   [16]byte
		errString string
	}{
		{
			casename:  "invalid traceID",
			errString: "eventmachine consume failed:",
		},

		{
			casename: "traceID 1",
			traceID:  [16]byte{1},
		},

		{
			casename: "traceID 2",
			traceID:  [16]byte{2},
		},

		{
			casename: "traceID 3",
			traceID:  [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
		},
	} {
		t.Run(tt.casename, func(t *testing.T) {
			em := newEventMachine(logger, 200, 100, 1_000)

			var wg sync.WaitGroup
			var workerForTrace *eventMachineWorker
			em.onTraceReceived = func(td tracesWithID, w *eventMachineWorker) error {
				// remember which worker received the trace, then route a follow-up
				// event for the same trace through that worker
				workerForTrace = w
				w.fire(event{
					typ:     traceExpired,
					payload: td.id,
				})
				return nil
			}
			em.onTraceExpired = func(id pdata.TraceID, w *eventMachineWorker) error {
				assert.Equal(t, workerForTrace, w)
				assert.Equal(t, pdata.NewTraceID(tt.traceID), id)
				wg.Done()
				return nil
			}
			em.startInBackground()
			defer em.shutdown()

			td := pdata.NewTraces()
			td.ResourceSpans().Resize(1)
			td.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1)
			// the all-zero ID marks the invalid case: leave the batch without spans
			if tt.traceID != [16]byte{} {
				td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1)
				span := pdata.NewSpan()
				span.SetTraceID(pdata.NewTraceID(tt.traceID))
				span.CopyTo(td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0))
			}

			// test
			wg.Add(1)
			err := em.consume(td)

			// verify
			if tt.errString == "" {
				require.NoError(t, err)
			} else {
				// a rejected batch never reaches a worker, so release the waiter here
				wg.Done()
				require.Error(t, err)
				require.Truef(t, strings.HasPrefix(err.Error(), tt.errString), "error %q should have prefix %q", err, tt.errString)
			}

			wg.Wait()
		})
	}
}

func TestEventConsumeConsistency(t *testing.T) {
for _, tt := range []struct {
casename string
traceID [16]byte
}{
{
casename: "trace 1",
traceID: [16]byte{1, 2, 3, 4},
},

{
casename: "trace 2",
traceID: [16]byte{2, 3, 4, 5},
},
} {
t.Run(tt.casename, func(t *testing.T) {
realTraceID := workerIndexForTraceID(pdata.NewTraceID(tt.traceID), 100)
var wg sync.WaitGroup
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 30; j++ {
assert.Equal(t, realTraceID, workerIndexForTraceID(pdata.NewTraceID(tt.traceID), 100))
}
}()
}
wg.Wait()
})
}
}

func TestEventShutdown(t *testing.T) {
// prepare
logger, err := zap.NewDevelopment()
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)

Expand Down Expand Up @@ -328,8 +432,6 @@ func TestPeriodicMetrics(t *testing.T) {
// try to be nice with the next consumer (test)
defer view.Unregister(views...)

logger, err := zap.NewDevelopment()
require.NoError(t, err)
em := newEventMachine(logger, 50, 1, 1_000)
em.metricsCollectionInterval = time.Millisecond

Expand Down
19 changes: 13 additions & 6 deletions processor/groupbytraceprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ import (
// groupByTraceProcessor is a processor that keeps traces in memory for a given duration, with the expectation
// that the trace will be complete once this duration expires. After the duration, the trace is sent to the next consumer.
// This processor uses a buffered event machine, which converts operations into events for non-blocking processing, but
// keeping all operations serialized. This ensures that we don't need locks but that the state is consistent across go routines.
// Each in-flight trace is registered with a go routine, which will be called after the given duration and dispatched to the event
// keeping all operations serialized per worker scope. This ensures that we don't need locks but that the state is consistent across go routines.
// Initially, all incoming batches are split into different traces and distributed among workers by a hash of traceID in eventMachine.consume method.
// Afterwards, the trace is registered with a go routine, which will be called after the given duration and dispatched to the event
// machine for further processing.
// The typical data flow looks like this:
// ConsumeTraces -> event(traceReceived) -> onTraceReceived -> AfterFunc(duration, event(traceExpired)) -> onTraceExpired
// ConsumeTraces -> eventMachine.consume(trace) -> event(traceReceived) -> onTraceReceived -> AfterFunc(duration, event(traceExpired)) -> onTraceExpired
// async markAsReleased -> event(traceReleased) -> onTraceReleased -> nextConsumer
// This processor uses also a ring buffer to hold the in-flight trace IDs, so that we don't hold more than the given maximum number
// Each worker in the eventMachine also uses a ring buffer to hold the in-flight trace IDs, so that we don't hold more than the given maximum number
// of traces in memory/storage. Items that are evicted from the buffer are discarded without warning.
type groupByTraceProcessor struct {
nextConsumer consumer.Traces
Expand All @@ -54,6 +55,8 @@ type groupByTraceProcessor struct {

var _ component.TracesProcessor = (*groupByTraceProcessor)(nil)

// bufferSize is a fixed event-buffer capacity of 10,000.
// NOTE(review): presumably passed as the bufferSize argument of newEventMachine
// (the point at which firing an event starts to block) — confirm at the call site
// in newGroupByTraceProcessor.
const bufferSize = 10_000

// newGroupByTraceProcessor returns a new processor.
func newGroupByTraceProcessor(logger *zap.Logger, st storage, nextConsumer consumer.Traces, config Config) *groupByTraceProcessor {
// the event machine will buffer up to N concurrent events before blocking
Expand Down Expand Up @@ -110,6 +113,8 @@ func (sp *groupByTraceProcessor) Shutdown(_ context.Context) error {
func (sp *groupByTraceProcessor) onTraceReceived(trace tracesWithID, worker *eventMachineWorker) error {
traceID := trace.id
if worker.buffer.contains(traceID) {
sp.logger.Debug("trace is already in memory storage")

// it exists in memory already, just append the spans to the trace in the storage
if err := sp.addSpans(traceID, trace.td); err != nil {
return fmt.Errorf("couldn't add spans to existing trace: %w", err)
Expand Down Expand Up @@ -210,8 +215,10 @@ func (sp *groupByTraceProcessor) onTraceReleased(rss []pdata.ResourceSpans) erro
for _, rs := range rss {
trace.ResourceSpans().Append(rs)
}
stats.Record(context.Background(), mReleasedSpans.M(int64(trace.SpanCount())))
stats.Record(context.Background(), mReleasedTraces.M(1))
stats.Record(context.Background(),
mReleasedSpans.M(int64(trace.SpanCount())),
mReleasedTraces.M(1),
)

// Do async consuming not to block event worker
go func() {
Expand Down
Loading