-
Notifications
You must be signed in to change notification settings - Fork 64
Description
Describe the bug
If a progressive call InvocationHandler (callee) function returns an error before it receives the final progressive message from the caller:
- The handler function is incorrectly called again after returning an error
- The callee
Clientcannot be closed because of a blocking write to a full chanresChan <- handler(ctx, msg)
To Reproduce
I wrote a test function that can be dropped into client_test.go. I'll include it at the end.
Expected behavior
According to the second example in Continuations on Completed Calls
Due to network delays, the Dealer may be unaware that the call is completed by time it sends another progressive INVOCATION.When a Callee receives an INVOCATION under the following conditions:
- the Callee supports Progressive Call Invocations,
- the INVOCATION request ID does not correspond to a new call request, and,
- the INVOCATION request ID does not match any RPC invocation in progress,
then it MUST ignore and discard that INVOCATION message without any further correlated response to the Dealer.
Emphasis my own, but because Nexus is handling the protocol, it should not call the handler function after the handler returns an error.
More importantly, it should not deadlock when the handler returns an error before the caller is done sending progressive invocations. The diagram that follows the above quoted text clearly shows this is legal.
Environment (please complete the following information):
- OS: Linux, Ubuntu 22.04
- Nexus Router version: branch
v3HEAD5cfa511313807ad6802ff4b5a2b738d18507ef23 - Name and version of WAMP Client library (if applicable): ^
Additional context
Test:
// Tests that a callee can return error while caller IsInProgress
func TestProgressiveCallInvocationCalleeError(t *testing.T) {
// Connect two clients to the same server
callee, caller, rooter := connectedTestClients(t)
const forcedError = wamp.URI("error.forced")
moreArgsSent := make(chan struct{}, 1)
errorRaised := false
invocationHandler := func(ctx context.Context, inv *wamp.Invocation) InvokeResult {
switch inv.Arguments[0].(int) {
case 1:
// Eat the first arg
t.Log("n=1 Returning OmitResult")
return InvokeResult{Err: wamp.InternalProgressiveOmitResult}
case 2:
t.Log("n=2 Waiting for moreArgsSent")
// Wait till the 4th arg is sent which means 3 should already
// be waiting
<-moreArgsSent
time.Sleep(100 * time.Millisecond)
errorRaised = true
t.Log("n=2 Returning error (as expected)")
// Error
return InvokeResult{Err: forcedError}
default:
// BUG: The handler function should never be called again
t.Error("Handler should not have been called after error returned")
return InvokeResult{Err: forcedError}
}
}
const procName = "nexus.test.progprocerr"
// Register procedure
err := callee.Register(procName, invocationHandler, nil)
require.NoError(t, err)
// Test calling the procedure.
callArgs := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
ctx := context.Background()
callSends := 0
sendProgDataCb := func(ctx context.Context) (options wamp.Dict, args wamp.List, kwargs wamp.Dict, err error) {
options = wamp.Dict{}
if callSends == (len(callArgs) - 1) {
options[wamp.OptProgress] = false
} else {
options[wamp.OptProgress] = true
}
args = wamp.List{callArgs[callSends]}
callSends++
// signal the handler should return its error
if 4 == callSends {
moreArgsSent <- struct{}{}
}
t.Logf("Sending n=%v", callSends)
return options, args, nil, nil
}
result, err := caller.CallProgressive(ctx, procName, sendProgDataCb, nil)
require.Error(t, err, "Expected call to return an error")
require.Nil(t, result, "Expected call to return no result")
var rErr RPCError
if errors.As(err, &rErr) {
require.Equal(t, forcedError, rErr.Err.Error, "Unexpected error URI")
} else {
t.Error("Unexpected error type")
}
require.GreaterOrEqual(t, callSends, 4)
require.True(t, errorRaised, "Error was never raised in handler")
// Test unregister.
err = callee.Unregister(procName)
// require.NoError(t, err)
// BUG: Show deadlock
t.Log("Closing rooter")
rooter.Close()
t.Log("Closing caller")
caller.Close()
goleak.VerifyNone(t)
t.Log("Closing callee")
// BUG: We never get past here
callee.Close()
t.Log("All closed")
goleak.VerifyNone(t)
t.Log("Done")
}Terminal output:
=== RUN TestProgressiveCallInvocationCalleeError
2025/01/25 01:56:38 Starting router
2025/01/25 01:56:38 Added realm: nexus.test
client_test.go:766: Sending n=1
client_test.go:766: Sending n=2
client_test.go:766: Sending n=3
client_test.go:766: Sending n=4
client_test.go:766: Sending n=5
client_test.go:766: Sending n=6
client_test.go:766: Sending n=7
client_test.go:766: Sending n=8
client_test.go:766: Sending n=9
client_test.go:720: n=1 Returning OmitResult
client_test.go:723: n=2 Waiting for moreArgsSent
client_test.go:766: Sending n=10
client_test.go:729: n=2 Returning error (as expected)
client_test.go:734: Handler should not have been called after error returned
client_test.go:734: Handler should not have been called after error returned
client_test.go:788: Closing rooter
2025/01/25 01:56:38 Realm nexus.test completed shutdown
2025/01/25 01:56:38 Router stopped
client_test.go:790: Closing caller
client_test.go:792: found unexpected goroutines:
[Goroutine 28 in state chan send, with github.com/gammazero/nexus/v3/client.(*Client).runHandleInvocation on top of the stack:
goroutine 28 [chan send]:
github.com/gammazero/nexus/v3/client.(*Client).runHandleInvocation(0xc0001484e0, 0xc000166300)
/home/user/git/nexus/client/client.go:1644 +0xd6d
github.com/gammazero/nexus/v3/client.(*Client).runReceiveFromRouter(0xc0001484e0, {0x8e11c0?, 0xc000166300?})
/home/user/git/nexus/client/client.go:1453 +0x13e
github.com/gammazero/nexus/v3/client.(*Client).run(0xc0001484e0)
/home/user/git/nexus/client/client.go:1428 +0x1c5
created by github.com/gammazero/nexus/v3/client.NewClient in goroutine 19
/home/user/git/nexus/client/client.go:262 +0x509
Goroutine 35 in state chan send, with github.com/gammazero/nexus/v3/client.(*Client).runHandleInvocation.func1.1 on top of the stack:
goroutine 35 [chan send]:
github.com/gammazero/nexus/v3/client.(*Client).runHandleInvocation.func1.1()
/home/user/git/nexus/client/client.go:1669 +0xfd
created by github.com/gammazero/nexus/v3/client.(*Client).runHandleInvocation.func1 in goroutine 34
/home/user/git/nexus/client/client.go:1655 +0x18a
]
client_test.go:793: Closing callee
// process doesn't exit
I'll likely put up a PR in the coming days, but if a maintainer wants to tackle this, just please let me know.