Skip to content
This repository was archived by the owner on Jan 30, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/barrier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestBarrier(t *testing.T) {

barrier := NewBarrier()
barrier.AddFrameNavigation(frame)
frame.emit(EventFrameNavigation, "some data")
frame.emit(log, EventFrameNavigation, "some data")

err := barrier.Wait(ctx)
require.Nil(t, err)
Expand Down
4 changes: 2 additions & 2 deletions common/browser.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (b *Browser) getPages() []*Page {
func (b *Browser) initEvents() error {
var cancelCtx context.Context
cancelCtx, b.evCancelFn = context.WithCancel(b.ctx)
chHandler := make(chan Event)
chHandler := make(chan Event, EventListenerDefaultChanBufferSize)

b.conn.on(cancelCtx, []string{
cdproto.EventTargetAttachedToTarget,
Expand Down Expand Up @@ -313,7 +313,7 @@ func (b *Browser) onAttachedToTarget(ev *target.EventAttachedToTarget) {
b.sessionIDtoTargetID[ev.SessionID] = evti.TargetID
b.sessionIDtoTargetIDMu.Unlock()

browserCtx.emit(EventBrowserContextPage, p)
browserCtx.emit(b.logger, EventBrowserContextPage, p)
default:
b.logger.Warnf(
"Browser:onAttachedToTarget", "sid:%v tid:%v bctxid:%v bctx nil:%t, unknown target type: %q",
Expand Down
2 changes: 1 addition & 1 deletion common/browser_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func (b *BrowserContext) WaitForEvent(event string, optsOrPredicate goja.Value)
}

evCancelCtx, evCancelFn := context.WithCancel(b.ctx)
chEvHandler := make(chan Event)
chEvHandler := make(chan Event, EventListenerDefaultChanBufferSize)
ch := make(chan interface{})

go func() {
Expand Down
4 changes: 3 additions & 1 deletion common/browser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func TestBrowserNewPageInContext(t *testing.T) {
// newPageInContext will return this page by searching it by its targetID in the wait event handler.
tc.b.pages[targetID] = &Page{targetID: targetID}

log := log.NewNullLogger()

tc.b.conn = fakeConn{
execute: func(
ctx context.Context, method string, params easyjson.Marshaler, res easyjson.Unmarshaler,
Expand All @@ -68,7 +70,7 @@ func TestBrowserNewPageInContext(t *testing.T) {
// EventBrowserContextPage to be fired. this normally happens when the browser's
// onAttachedToTarget event is fired. here, we imitate as if the browser created a target for
// the page.
tc.bc.emit(EventBrowserContextPage, &Page{targetID: targetID})
tc.bc.emit(log, EventBrowserContextPage, &Page{targetID: targetID})

return nil
},
Expand Down
8 changes: 4 additions & 4 deletions common/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (c *Connection) close(code int) error {
}
c.sessionsMu.Unlock()

c.emit(EventConnectionClose, nil)
c.emit(c.logger, EventConnectionClose, nil)
})

return err
Expand Down Expand Up @@ -368,11 +368,11 @@ func (c *Connection) recvLoop() {
c.logger.Errorf("cdp", "%s", err)
continue
}
c.emit(string(msg.Method), ev)
c.emit(c.logger, string(msg.Method), ev)

case msg.ID != 0:
c.logger.Debugf("Connection:recvLoop:msg.ID:emit", "sid:%v method:%q", msg.SessionID, msg.Method)
c.emit("", &msg)
c.emit(c.logger, "", &msg)

default:
c.logger.Errorf("cdp", "ignoring malformed incoming message (missing id or method): %#v (message: %s)", msg, msg.Error.Message)
Expand Down Expand Up @@ -505,7 +505,7 @@ func (c *Connection) Execute(ctx context.Context, method string, params easyjson
// Setup event handler used to block for response to message being sent.
ch := make(chan *cdproto.Message, 1)
evCancelCtx, evCancelFn := context.WithCancel(ctx)
chEvHandler := make(chan Event)
chEvHandler := make(chan Event, EventListenerDefaultChanBufferSize)
go func() {
for {
select {
Expand Down
31 changes: 24 additions & 7 deletions common/event_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ package common

import (
"context"
"time"

"github.com/grafana/xk6-browser/log"
)

// Ensure BaseEventEmitter implements the EventEmitter interface.
Expand Down Expand Up @@ -76,6 +79,10 @@ const (
// Worker

EventWorkerClose string = "close"

// Event listener.

EventListenerDefaultChanBufferSize int64 = 10
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer not have to work with buffered channels, but the issue is with handlers which not only listen and handle incoming events, they can also emit events too. Without the buffered channel this would deadlock 😭

Copy link
Collaborator

@inancgumus inancgumus Sep 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it 10?

I'm asking this because, technically, a buffered channel is the same as an unbuffered channel—Only the prior will deadlock when it's full.

Copy link
Collaborator Author

@ankur22 ankur22 Sep 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 10 was a just a arbitrary number.

I was thinking about this, and it might be better to look at trying to prevent the handler(s) from emitting and receiving event(s), so that we don't need the buffered channels (and leave it as unbuffered). IMO, it doesn't feel right for a handler to be doing two jobs, and should concentrate on handling the events and not emitting new ones, but I need to look into this a bit more (which could take some time to make sense of).

If it's too much work for the handler to not emit, then maybe we need to allow for async emitting (i.e. another buffered channel on the receiving end of the emit).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with Inanc. If we're relying on a buffered channel to avoid the deadlock issue, then this change merely alleviates the problem during low activity. On complex sites with many concurrent events we would probably run into it again once the buffer is full.

Have you tested with a complex script like the Vistaprint one from #381? It will need adjusting to the recent breaking changes.

)

// Event as emitted by an EventEmiter.
Expand Down Expand Up @@ -103,7 +110,7 @@ type eventHandler struct {

// EventEmitter that all event emitters need to implement.
type EventEmitter interface {
emit(event string, data interface{})
emit(logger *log.Logger, event string, data interface{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not change the emit() signature for this. The logger is irrelevant to the event and data being emitted.

Instead, I think it would be cleaner if we add a logger argument to NewBaseEventEmitter and use it in emit() instead. It's not the same thing, as it would be using the event emitter's logger instead of the emit() caller's, but do we need that distinction?

on(ctx context.Context, events []string, ch chan Event)
onAll(ctx context.Context, ch chan Event)
}
Expand Down Expand Up @@ -164,12 +171,22 @@ func (e *BaseEventEmitter) sync(fn func()) {
<-done
}

func (e *BaseEventEmitter) emit(event string, data interface{}) {
func (e *BaseEventEmitter) emit(logger *log.Logger, event string, data interface{}) {
emitEvent := func(eh eventHandler) {
select {
case eh.ch <- Event{event, data}:
case <-eh.ctx.Done():
// TODO: handle the error
for {
select {
case eh.ch <- Event{event, data}:
return
case <-eh.ctx.Done():
// TODO: handle the error
return
case <-time.After(time.Second):
// If this is printed then a handler is deadlocked on something
// and the channel (which should be buffered) is full. It might
// be that by increasing the buffer size on the channel will help
// alleviate the problem.
logger.Error("emit timed out waiting for handler to read message")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we then cut this short here (return without for)? Now it retries, is it expected?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, i'll add a return if we're all happy with this change -- it's currently useful as a tool to debug an issue with pprof 🙂

}
}
}
emitTo := func(handlers []eventHandler) (updated []eventHandler) {
Expand All @@ -180,7 +197,7 @@ func (e *BaseEventEmitter) emit(event string, data interface{}) {
handlers = append(handlers[:i], handlers[i+1:]...)
continue
default:
go emitEvent(handler)
emitEvent(handler)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bye bye goroutine -- this helps keep the order of incoming events.

i++
}
}
Expand Down
14 changes: 10 additions & 4 deletions common/event_emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

"github.com/chromedp/cdproto"
"github.com/stretchr/testify/require"

"github.com/grafana/xk6-browser/log"
)

func TestEventEmitterSpecificEvent(t *testing.T) {
Expand All @@ -51,10 +53,11 @@ func TestEventEmitterSpecificEvent(t *testing.T) {
cancelCtx, cancelFn := context.WithCancel(ctx)
emitter := NewBaseEventEmitter(cancelCtx)
ch := make(chan Event)
log := log.NewNullLogger()

emitter.on(cancelCtx, []string{cdproto.EventTargetTargetCreated}, ch)
cancelFn()
emitter.emit(cdproto.EventTargetTargetCreated, nil) // Event handlers are removed as part of event emission
emitter.emit(log, cdproto.EventTargetTargetCreated, nil) // Event handlers are removed as part of event emission

emitter.sync(func() {
require.Contains(t, emitter.handlers, cdproto.EventTargetTargetCreated)
Expand All @@ -66,9 +69,10 @@ func TestEventEmitterSpecificEvent(t *testing.T) {
ctx := context.Background()
emitter := NewBaseEventEmitter(ctx)
ch := make(chan Event, 1)
log := log.NewNullLogger()

emitter.on(ctx, []string{cdproto.EventTargetTargetCreated}, ch)
emitter.emit(cdproto.EventTargetTargetCreated, "hello world")
emitter.emit(log, cdproto.EventTargetTargetCreated, "hello world")
msg := <-ch

emitter.sync(func() {
Expand Down Expand Up @@ -100,10 +104,11 @@ func TestEventEmitterAllEvents(t *testing.T) {
emitter := NewBaseEventEmitter(ctx)
cancelCtx, cancelFn := context.WithCancel(ctx)
ch := make(chan Event)
log := log.NewNullLogger()

emitter.onAll(cancelCtx, ch)
cancelFn()
emitter.emit(cdproto.EventTargetTargetCreated, nil) // Event handlers are removed as part of event emission
emitter.emit(log, cdproto.EventTargetTargetCreated, nil) // Event handlers are removed as part of event emission

emitter.sync(func() {
require.Len(t, emitter.handlersAll, 0)
Expand All @@ -114,9 +119,10 @@ func TestEventEmitterAllEvents(t *testing.T) {
ctx := context.Background()
emitter := NewBaseEventEmitter(ctx)
ch := make(chan Event, 1)
log := log.NewNullLogger()

emitter.onAll(ctx, ch)
emitter.emit(cdproto.EventTargetTargetCreated, "hello world")
emitter.emit(log, cdproto.EventTargetTargetCreated, "hello world")
msg := <-ch

emitter.sync(func() {
Expand Down
10 changes: 5 additions & 5 deletions common/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,16 +224,16 @@ func (f *Frame) recalculateLifecycle() {
if f.hasSubtreeLifecycleEventFired(k) {
continue
}
f.emit(EventFrameAddLifecycle, k)
f.emit(f.log, EventFrameAddLifecycle, k)

if f != mainFrame {
continue
}
switch k {
case LifecycleEventLoad:
f.page.emit(EventPageLoad, nil)
f.page.emit(f.log, EventPageLoad, nil)
case LifecycleEventDOMContentLoad:
f.page.emit(EventPageDOMContentLoaded, nil)
f.page.emit(f.log, EventPageDOMContentLoaded, nil)
}
}

Expand All @@ -242,7 +242,7 @@ func (f *Frame) recalculateLifecycle() {
{
for k := range f.subtreeLifecycleEvents {
if ok := events[k]; !ok {
f.emit(EventFrameRemoveLifecycle, k)
f.emit(f.log, EventFrameRemoveLifecycle, k)
}
}
}
Expand Down Expand Up @@ -424,7 +424,7 @@ func (f *Frame) navigated(name string, url string, loaderID string) {
f.name = name
f.url = url
f.loaderID = loaderID
f.page.emit(EventPageFrameNavigated, f)
f.page.emit(f.log, EventPageFrameNavigated, f)
}

func (f *Frame) nullContext(execCtxID runtime.ExecutionContextID) {
Expand Down
18 changes: 9 additions & 9 deletions common/frame_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (m *FrameManager) frameAbortedNavigation(frameID cdp.FrameID, errorText, do
err: errors.New(errorText),
}
frame.pendingDocument = nil
frame.emit(EventFrameNavigation, ne)
frame.emit(m.logger, EventFrameNavigation, ne)
}

func (m *FrameManager) frameAttached(frameID cdp.FrameID, parentFrameID cdp.FrameID) {
Expand All @@ -183,7 +183,7 @@ func (m *FrameManager) frameAttached(frameID cdp.FrameID, parentFrameID cdp.Fram
m.logger.Debugf("FrameManager:frameAttached:emit:EventPageFrameAttached",
"fmid:%d fid:%v pfid:%v", m.ID(), frameID, parentFrameID)

m.page.emit(EventPageFrameAttached, frame)
m.page.emit(m.logger, EventPageFrameAttached, frame)
}
}

Expand Down Expand Up @@ -325,7 +325,7 @@ func (m *FrameManager) frameNavigated(frameID cdp.FrameID, parentFrameID cdp.Fra
m.ID(), frameID, parentFrameID, documentID, name, url, initial, documentID)

frame.clearLifecycle()
frame.emit(EventFrameNavigation, &NavigationEvent{url: url, name: name, newDocument: frame.currentDocument})
frame.emit(m.logger, EventFrameNavigation, &NavigationEvent{url: url, name: name, newDocument: frame.currentDocument})

// TODO: when we add API support for storage we need to track origins
// if !initial {
Expand Down Expand Up @@ -357,7 +357,7 @@ func (m *FrameManager) frameNavigatedWithinDocument(frameID cdp.FrameID, url str
"fmid:%d fid:%v furl:%s url:%s", m.ID(), frameID, frame.URL(), url)

frame.setURL(url)
frame.emit(EventFrameNavigation, &NavigationEvent{url: url, name: frame.Name()})
frame.emit(m.logger, EventFrameNavigation, &NavigationEvent{url: url, name: frame.Name()})
}

func (m *FrameManager) frameRequestedNavigation(frameID cdp.FrameID, url string, documentID string) error {
Expand Down Expand Up @@ -438,15 +438,15 @@ func (m *FrameManager) removeFramesRecursively(frame *Frame) {
"fmid:%d fid:%v fname:%s furl:%s",
m.ID(), frame.ID(), frame.Name(), frame.URL())

m.page.emit(EventPageFrameDetached, frame)
m.page.emit(m.logger, EventPageFrameDetached, frame)
}
}

func (m *FrameManager) requestFailed(req *Request, canceled bool) {
m.logger.Debugf("FrameManager:requestFailed", "fmid:%d rurl:%s", m.ID(), req.URL())

delete(m.inflightRequests, req.getID())
defer m.page.emit(EventPageRequestFailed, req)
defer m.page.emit(m.logger, EventPageRequestFailed, req)

frame := req.getFrame()
if frame == nil {
Expand Down Expand Up @@ -485,7 +485,7 @@ func (m *FrameManager) requestFinished(req *Request) {
m.ID(), req.URL())

delete(m.inflightRequests, req.getID())
defer m.page.emit(EventPageRequestFinished, req)
defer m.page.emit(m.logger, EventPageRequestFinished, req)

frame := req.getFrame()
if frame == nil {
Expand All @@ -509,15 +509,15 @@ func (m *FrameManager) requestFinished(req *Request) {
func (m *FrameManager) requestReceivedResponse(res *Response) {
m.logger.Debugf("FrameManager:requestReceivedResponse", "fmid:%d rurl:%s", m.ID(), res.URL())

m.page.emit(EventPageResponse, res)
m.page.emit(m.logger, EventPageResponse, res)
}

func (m *FrameManager) requestStarted(req *Request) {
m.logger.Debugf("FrameManager:requestStarted", "fmid:%d rurl:%s", m.ID(), req.URL())

m.framesMu.Lock()
defer m.framesMu.Unlock()
defer m.page.emit(EventPageRequest, req)
defer m.page.emit(m.logger, EventPageRequest, req)

m.inflightRequests[req.getID()] = true
frame := req.getFrame()
Expand Down
4 changes: 2 additions & 2 deletions common/frame_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func NewFrameSession(
contextIDToContextMu: sync.Mutex{},
contextIDToContext: make(map[cdpruntime.ExecutionContextID]*ExecutionContext),
isolatedWorlds: make(map[string]bool),
eventCh: make(chan Event),
eventCh: make(chan Event, EventListenerDefaultChanBufferSize),
childSessions: make(map[cdp.FrameID]*FrameSession),
vu: k6ext.GetVU(ctx),
k6Metrics: k6ext.GetCustomMetrics(ctx),
Expand Down Expand Up @@ -543,7 +543,7 @@ func (fs *FrameSession) onConsoleAPICalled(event *cdpruntime.EventConsoleAPICall
}

func (fs *FrameSession) onExceptionThrown(event *cdpruntime.EventExceptionThrown) {
fs.page.emit(EventPageError, event.ExceptionDetails)
fs.page.emit(fs.logger, EventPageError, event.ExceptionDetails)
}

func (fs *FrameSession) onExecutionContextCreated(event *cdpruntime.EventExecutionContextCreated) {
Expand Down
2 changes: 1 addition & 1 deletion common/frame_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestFrameManagerFrameAbortedNavigationShouldEmitANonNilPendingDocument(t *t
fm.frames[frame.id] = frame

// listen for frame navigation events
recv := make(chan Event)
recv := make(chan Event, EventListenerDefaultChanBufferSize)
frame.on(ctx, []string{EventFrameNavigation}, recv)

// emit the navigation event
Expand Down
4 changes: 2 additions & 2 deletions common/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func createWaitForEventHandler(
chan interface{}, context.CancelFunc,
) {
evCancelCtx, evCancelFn := context.WithCancel(ctx)
chEvHandler := make(chan Event)
chEvHandler := make(chan Event, EventListenerDefaultChanBufferSize)
ch := make(chan interface{})

go func() {
Expand Down Expand Up @@ -193,7 +193,7 @@ func createWaitForEventPredicateHandler(
chan interface{}, context.CancelFunc,
) {
evCancelCtx, evCancelFn := context.WithCancel(ctx)
chEvHandler := make(chan Event)
chEvHandler := make(chan Event, EventListenerDefaultChanBufferSize)
ch := make(chan interface{})

go func() {
Expand Down
6 changes: 3 additions & 3 deletions common/network_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,8 @@ func (m *NetworkManager) handleRequestRedirect(req *Request, redirectResponse *n
delete(m.attemptedAuth, req.interceptionID);
*/

m.emit(cdproto.EventNetworkResponseReceived, resp)
m.emit(cdproto.EventNetworkLoadingFinished, req)
m.emit(m.logger, cdproto.EventNetworkResponseReceived, resp)
m.emit(m.logger, cdproto.EventNetworkLoadingFinished, req)
}

func (m *NetworkManager) initDomains() error {
Expand All @@ -340,7 +340,7 @@ func (m *NetworkManager) initDomains() error {
}

func (m *NetworkManager) initEvents() {
chHandler := make(chan Event)
chHandler := make(chan Event, EventListenerDefaultChanBufferSize)
m.session.on(m.ctx, []string{
cdproto.EventNetworkLoadingFailed,
cdproto.EventNetworkLoadingFinished,
Expand Down
Loading