Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 13 additions & 14 deletions internal/app/connectconformance/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,26 +170,25 @@ func (r *testResults) assert(
errs = append(errs, checkError(expected.Error, actual.Error)...)
errs = append(errs, checkPayloads(expected.Payloads, actual.Payloads)...)

// TODO - This check is for trailers-only and should really only apply to gRPC and gRPC-Web protocols.
// Previously, it checked for error != nil, which is compliant with gRPC. But gRPC-Web does trailers-only
// responses with no errors also.
if len(expected.Payloads) == 0 {
// When there are no messages in the body, the server may send a
// trailers-only response. In that case, it is acceptable for the expected
// headers and trailers to be merged into one set, and it is acceptable for the
// client to interpret them as either headers or trailers.
if len(expected.Payloads) == 0 &&
expected.Error != nil &&
(definition.Request.StreamType == conformancev1.StreamType_STREAM_TYPE_UNARY ||
definition.Request.StreamType == conformancev1.StreamType_STREAM_TYPE_CLIENT_STREAM) {
// For unary and client-stream operations, a server API may not provide a way to
// set headers and trailers separately on error but instead a way to return an
// error with embedded metadata. In that case, we may not be able to distinguish
// headers from trailers in the response -- they get unified into a single bag of
// "error metadata". The conformance client should record those as trailers when
// sending back a ClientResponseResult message.

// So first we see if the normal attribution (headers as headers, trailers as trailers) succeeds
metadataErrs := checkHeaders("response headers", expected.ResponseHeaders, actual.ResponseHeaders)
metadataErrs = append(metadataErrs, checkHeaders("response trailers", expected.ResponseTrailers, actual.ResponseTrailers)...)
if len(metadataErrs) > 0 {
// That did not work. So we test to see if client attributed them all as headers
// or all as trailers.
// That did not work. So we test to see if client attributed them all as trailers.
merged := mergeHeaders(expected.ResponseHeaders, expected.ResponseTrailers)
allHeadersErrs := checkHeaders("response metadata", merged, actual.ResponseHeaders)
allTrailersErrs := checkHeaders("response metadata", merged, actual.ResponseTrailers)
if len(allHeadersErrs) != 0 && len(allTrailersErrs) != 0 {
// These checks failed also. So the received headers/trailers are incorrect.
if allTrailersErrs := checkHeaders("response metadata", merged, actual.ResponseTrailers); len(allTrailersErrs) != 0 {
// That check failed also. So the received headers/trailers are incorrect.
// Report the original errors computed above.
errs = append(errs, metadataErrs...)
}
Expand Down
38 changes: 13 additions & 25 deletions internal/app/connectconformance/results_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,20 @@ func TestResults_Assert(t *testing.T) {
{Data: []byte{0, 1, 2, 3, 4}},
},
}
testCase1 := &conformancev1.TestCase{ExpectedResponse: payload1}
testCase1 := &conformancev1.TestCase{
Request: &conformancev1.ClientCompatRequest{TestName: "abc1"},
ExpectedResponse: payload1,
}
payload2 := &conformancev1.ClientResponseResult{
Error: &conformancev1.Error{
Code: conformancev1.Code_CODE_ABORTED,
Message: proto.String("oops"),
},
}
testCase2 := &conformancev1.TestCase{ExpectedResponse: payload2}
testCase2 := &conformancev1.TestCase{
Request: &conformancev1.ClientCompatRequest{TestName: "abc2"},
ExpectedResponse: payload2,
}
results.assert("foo/bar/1", testCase1, payload2)
results.assert("foo/bar/2", testCase2, payload1)
results.assert("foo/bar/3", testCase1, payload1)
Expand Down Expand Up @@ -288,28 +294,7 @@ func TestResults_Assert_ReportsAllErrors(t *testing.T) {
},
},
{
name: "response meta misattributed allowed for trailers-only response (all in headers)",
expected: `{
"error": {"code": 5},
"response_headers": [
{"name": "abc", "value": ["xyz", "123"]},
{"name": "xyz", "value": ["value1"]}
],
"response_trailers": [
{"name": "Case-Does-Not-Matter-For-Name", "value": ["value2"]}
]
}`,
actual: `{
"error": {"code": 5},
"response_headers": [
{"name": "abc", "value": ["xyz", "123"]},
{"name": "xyz", "value": ["value1"]},
{"name": "Case-Does-Not-Matter-For-Name", "value": ["value2"]}
]
}`,
},
{
name: "response meta misattributed allowed for trailers-only response (all in trailers)",
name: "response meta all in trailers allowed for error with trailers-only response",
expected: `{
"error": {"code": 5},
"response_headers": [
Expand Down Expand Up @@ -694,7 +679,10 @@ func TestResults_Assert_ReportsAllErrors(t *testing.T) {
t.Parallel()
results := newResults(&testTrie{}, &testTrie{}, nil)

expected := &conformancev1.TestCase{ExpectedResponse: &conformancev1.ClientResponseResult{}}
expected := &conformancev1.TestCase{
Request: &conformancev1.ClientCompatRequest{StreamType: conformancev1.StreamType_STREAM_TYPE_UNARY},
ExpectedResponse: &conformancev1.ClientResponseResult{},
}
err := protojson.Unmarshal(([]byte)(testCase.expected), expected.ExpectedResponse)
require.NoError(t, err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ testCases:
- request:
testName: client-stream/first-request-exceeds-server-limit
streamType: STREAM_TYPE_CLIENT_STREAM
requestDelayMs: 50 # give server enough time to reject message and client to notice
requestMessages:
- "@type": type.googleapis.com/connectrpc.conformance.v1.ClientStreamRequest
responseDefinition:
Expand All @@ -56,9 +57,11 @@ testCases:
expectedResponse:
error:
code: CODE_RESOURCE_EXHAUSTED
numUnsentRequests: 2
- request:
testName: client-stream/subsequent-request-exceeds-server-limit
streamType: STREAM_TYPE_CLIENT_STREAM
requestDelayMs: 50
requestMessages:
- "@type": type.googleapis.com/connectrpc.conformance.v1.ClientStreamRequest
responseDefinition:
Expand All @@ -72,6 +75,7 @@ testCases:
expectedResponse:
error:
code: CODE_RESOURCE_EXHAUSTED
numUnsentRequests: 1
# Server Stream Tests ---------------------------------------------------------
- request:
testName: server-stream/request-equal-to-server-limit
Expand Down Expand Up @@ -99,6 +103,7 @@ testCases:
expectedResponse:
error:
code: CODE_RESOURCE_EXHAUSTED
numUnsentRequests: 1
# Bidi Stream Tests -----------------------------------------------------------
- request:
testName: bidi-stream/half-duplex/all-requests-equal-to-server-limit
Expand All @@ -117,6 +122,7 @@ testCases:
- request:
testName: bidi-stream/half-duplex/first-request-exceeds-server-limit
streamType: STREAM_TYPE_HALF_DUPLEX_BIDI_STREAM
requestDelayMs: 50
requestMessages:
- "@type": type.googleapis.com/connectrpc.conformance.v1.BidiStreamRequest
responseDefinition:
Expand All @@ -134,9 +140,11 @@ testCases:
expectedResponse:
error:
code: CODE_RESOURCE_EXHAUSTED
numUnsentRequests: 2
- request:
testName: bidi-stream/half-duplex/subsequent-request-exceeds-server-limit
streamType: STREAM_TYPE_HALF_DUPLEX_BIDI_STREAM
requestDelayMs: 50
requestMessages:
- "@type": type.googleapis.com/connectrpc.conformance.v1.BidiStreamRequest
responseDefinition:
Expand All @@ -154,6 +162,7 @@ testCases:
expectedResponse:
error:
code: CODE_RESOURCE_EXHAUSTED
numUnsentRequests: 1
- request:
testName: bidi-stream/full-duplex/all-requests-equal-to-server-limit
streamType: STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM
Expand Down Expand Up @@ -181,6 +190,7 @@ testCases:
fullDuplex: true
- "@type": type.googleapis.com/connectrpc.conformance.v1.BidiStreamRequest
requestData: "dGVzdCByZXNwb25zZQ=="
requestDelayMs: 50
expandRequests:
- sizeRelativeToLimit: 10
- sizeRelativeToLimit: 0
Expand All @@ -189,11 +199,13 @@ testCases:
expectedResponse:
error:
code: CODE_RESOURCE_EXHAUSTED
numUnsentRequests: 2
# TODO - Need a way to populate the expected response payload because the test
# library padded it with size and we don't know what it looks like here.
# - request:
# testName: bidi-stream/full-duplex/subsequent-request-exceeds-server-limit
# streamType: STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM
# requestDelayMs: 50
# requestMessages:
# - "@type": type.googleapis.com/connectrpc.conformance.v1.BidiStreamRequest
# responseDefinition:
Expand Down Expand Up @@ -222,3 +234,4 @@ testCases:
# requestData: "dGVzdCByZXNwb25zZQ=="
# error:
# code: CODE_RESOURCE_EXHAUSTED
# numUnsentRequests: 1
33 changes: 20 additions & 13 deletions internal/app/referenceclient/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,10 @@ func (i *invoker) unary(

if err != nil {
// If an error was returned, first convert it to a Connect error
// so that we can get the headers from the Meta property. Then,
// so that we can get the trailers from the Meta property. Then,
// convert _that_ to a proto Error so we can set it in the response.
connectErr := internal.ConvertErrorToConnectError(err)
headers = internal.ConvertToProtoHeader(connectErr.Meta())
trailers = internal.ConvertToProtoHeader(connectErr.Meta())
protoErr = internal.ConvertConnectToProtoError(connectErr)
} else {
// If the call was successful, get the headers and trailers
Expand Down Expand Up @@ -318,11 +318,11 @@ func (i *invoker) clientStream(

ctx = i.withWireCapture(ctx)
stream := i.client.ClientStream(ctx)

var numUnsent int
// Add the specified request headers to the request
internal.AddHeaders(req.RequestHeaders, stream.RequestHeader())

for _, msg := range req.RequestMessages {
for i, msg := range req.RequestMessages {
csr := &conformancev1.ClientStreamRequest{}
if err := msg.UnmarshalTo(csr); err != nil {
return nil, err
Expand All @@ -332,6 +332,7 @@ func (i *invoker) clientStream(
time.Sleep(time.Duration(req.RequestDelayMs) * time.Millisecond)

if err := stream.Send(csr); err != nil && errors.Is(err, io.EOF) {
numUnsent = len(req.RequestMessages) - i
break
}
}
Expand All @@ -357,10 +358,10 @@ func (i *invoker) clientStream(
resp, err := stream.CloseAndReceive()
if err != nil {
// If an error was returned, first convert it to a Connect error
// so that we can get the headers from the Meta property. Then,
// so that we can get the trailers from the Meta property. Then,
// convert _that_ to a proto Error so we can set it in the response.
connectErr := internal.ConvertErrorToConnectError(err)
headers = internal.ConvertToProtoHeader(connectErr.Meta())
trailers = internal.ConvertToProtoHeader(connectErr.Meta())
protoErr = internal.ConvertConnectToProtoError(connectErr)
} else {
// If the call was successful, get the returned payloads
Expand All @@ -373,12 +374,13 @@ func (i *invoker) clientStream(
statusCode, feedback := i.examineWireDetails(ctx)

return &conformancev1.ClientResponseResult{
ResponseHeaders: headers,
ResponseTrailers: trailers,
Payloads: payloads,
Error: protoErr,
HttpStatusCode: statusCode,
Feedback: feedback,
ResponseHeaders: headers,
ResponseTrailers: trailers,
Payloads: payloads,
NumUnsentRequests: int32(numUnsent),
Error: protoErr,
HttpStatusCode: statusCode,
Feedback: feedback,
}, nil
}

Expand Down Expand Up @@ -424,13 +426,17 @@ func (i *invoker) bidiStream(

var protoErr *conformancev1.Error
totalRcvd := 0
for _, msg := range req.RequestMessages {
for i, msg := range req.RequestMessages {
bsr := &conformancev1.BidiStreamRequest{}
if err := msg.UnmarshalTo(bsr); err != nil {
// Return the error and nil result because this is an
// unmarshalling error unrelated to the RPC
return nil, err
}

// Sleep for any specified delay
time.Sleep(time.Duration(req.RequestDelayMs) * time.Millisecond)

if err := stream.Send(bsr); err != nil && errors.Is(err, io.EOF) {
// Call receive to get the error and convert it to a proto error
if _, recvErr := stream.Receive(); recvErr != nil {
Expand All @@ -442,6 +448,7 @@ func (i *invoker) bidiStream(
protoErr = internal.ConvertErrorToProtoError(err)
}
// Break the send loop
result.NumUnsentRequests = int32(len(req.RequestMessages) - i)
break
}
if fullDuplex {
Expand Down
15 changes: 9 additions & 6 deletions internal/app/referenceserver/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,12 @@ func (s *conformanceServer) ServerStream(
if responseDefinition != nil { //nolint:nestif
internal.AddHeaders(responseDefinition.ResponseHeaders, stream.ResponseHeader())
internal.AddHeaders(responseDefinition.ResponseTrailers, stream.ResponseTrailer())
// Immediately send the headers/trailers on the stream so that they can be read by the client
if err := stream.Send(nil); err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("error sending on stream: %w", err))

if len(responseDefinition.ResponseData) > 0 {
// Immediately send the headers on the stream so that they can be read by the client
if err := stream.Send(nil); err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("error sending on stream: %w", err))
}
}

// Calculate the response delay if specified
Expand Down Expand Up @@ -278,7 +281,7 @@ func (s *conformanceServer) BidiStream(
internal.AddHeaders(responseDefinition.ResponseHeaders, stream.ResponseHeader())
internal.AddHeaders(responseDefinition.ResponseTrailers, stream.ResponseTrailer())

if fullDuplex {
if fullDuplex && len(responseDefinition.ResponseData) > 0 {
// Immediately send the headers on the stream so that they can be read by the client.
// We can only do this for full-duplex. For half-duplex operation, we must let client
// complete its upload before trying to send anything.
Expand All @@ -294,7 +297,7 @@ func (s *conformanceServer) BidiStream(

// If fullDuplex, then send one of the desired responses each time we get a message on the stream
if fullDuplex {
if responseDefinition == nil || respNum >= len(responseDefinition.ResponseData) {
if respNum >= len(responseDefinition.GetResponseData()) {
// If there are no responses to send, then break the receive loop
// and throw the error specified
break
Expand Down Expand Up @@ -331,7 +334,7 @@ func (s *conformanceServer) BidiStream(
}
}

if !fullDuplex {
if !fullDuplex && len(responseDefinition.GetResponseData()) > 0 {
// Now that upload is complete, we can immediately send headers for half-duplex calls.
if err := stream.Send(nil); err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("error sending on stream: %w", err))
Expand Down