grpc: Fix cardinality violations in non-client streaming RPCs. #8385
base: master
Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files
@@ Coverage Diff @@
## master #8385 +/- ##
==========================================
- Coverage 82.44% 82.35% -0.09%
==========================================
Files 413 413
Lines 40424 40527 +103
==========================================
+ Hits 33328 33378 +50
- Misses 5742 5778 +36
- Partials 1354 1371 +17
Is it possible to implement all of this in server.go to avoid adding state to the serverStream?
As discussed with @arjan-bal offline, cardinality violations can only be detected when messages are being read from the stream. That reading happens inside the server.RecvMsg() function. Since RecvMsg() is invoked from the user-implemented handler, it's not possible to detect cardinality violations during the initial stream setup phase.
test/end2end_test.go
Outdated
}
}

func (s) TestServerStreaming_ClientBehaveAsBidiStreaming(t *testing.T) {
ClientSendsMultipleMessages is not the same as ClientCallSendMsgTwice. The former implies that the client is actually sending two messages, whereas the latter indicates that the client is calling the SendMsg method twice.
test/end2end_test.go
Outdated
go s.Serve(lis)
defer s.Stop()

// s := grpc.NewServer(ss)
Please remove this commented code.
Done.
test/end2end_test.go
Outdated
defer lis.Close()

s := grpc.NewServer()
serviceDesc := grpc.ServiceDesc{
I believe we don't need to change the stream descriptors on the server side for this test. If so, please use a stubserver and override one of the handlers.
sure.
StreamingOutputCall has predefined calls to SendMsg() and RecvMsg().
This test checks the case where the client sends no message.
The server behaviour shouldn't matter for this test, right? When the client sends 0 request messages, the server should send an internal error immediately. Are you seeing that this is not happening with stubserver?
test/end2end_test.go
Outdated
defer lis.Close()

s := grpc.NewServer()
serviceDesc := grpc.ServiceDesc{
Do we need to fake the stream descriptors in this test? If not, we should use the test service client and server.
Done.
test/end2end_test.go
Outdated
ss := stubserver.StubServer{
StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
// This is second call to RecvMsg(), the initial call having been performed by the server handler.
if err = stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal {
@arjan-bal, as mentioned in the comment, I'm using stubserver here.
But the StreamingOutputCall handler inherently calls RecvMsg(), and the second call to RecvMsg() in StreamingOutputCallF causes a race.
https://github.com/grpc/grpc-go/actions/runs/16136673126/job/45534355675?pr=8385
Should I revert to the previous implementation and register a new service on the server to test?
The race is in the writes performed on the err variable declared at line 3746. We should declare a new err variable inside the goroutine.
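The suggested fix can be sketched with the standard library alone. This is a minimal illustration, not code from the PR: doWork and runBoth are hypothetical names, and doWork merely stands in for a call such as stream.RecvMsg. The point is that each goroutine declares its own err with := instead of assigning to a variable captured from the enclosing function.

```go
package main

import (
	"errors"
	"fmt"
)

// doWork is a hypothetical stand-in for a call such as stream.RecvMsg.
func doWork(fail bool) error {
	if fail {
		return errors.New("work failed")
	}
	return nil
}

// runBoth performs one call in a goroutine and one in the caller.
// Each side declares its own err with :=, so no variable is written
// from two goroutines; the goroutine reports its result over a channel.
func runBoth() (goroutineErr, callerErr error) {
	errCh := make(chan error, 1)
	go func() {
		// New err scoped to the goroutine. Assigning to an outer err
		// here (err = doWork(...)) is the kind of shared write the
		// race detector reports.
		err := doWork(true)
		errCh <- err
	}()
	err := doWork(false)
	return <-errCh, err
}

func main() {
	g, c := runBoth()
	fmt.Println("goroutine:", g, "caller:", c)
}
```

Running the test suite with the -race flag is the usual way to confirm that a change like this removes the report.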
} else if err != nil {
return err
}
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non client-streaming RPCs, but received another message")
How about something more like the first message:
- return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non client-streaming RPCs, but received another message")
+ return status.Error(codes.Internal, "cardinality violation: received multiple request messages for non-client-streaming RPC")
I'm trying to keep it consistent with client.RecvMsg().
Lines 1180 to 1181 in a5e7cd6
return status.Errorf(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
}
Let me know if "multiple request messages" looks better, and I'll change it.
These error messages go out on the wire, to clients written in any language. So they should explain the problem generically and not use Go-specific terminology or internal implementation details like "expected EOF".
That's probably not appropriate on the client side, either, but slightly less bad. But since you're changing things, that one should probably look like:
return status.Errorf(codes.Internal, "cardinality violation: received multiple response messages for non-server-streaming RPC")
stream.go
Outdated
@@ -1774,6 +1775,9 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
binlog.Log(ss.ctx, chc)
}
}
if !ss.desc.ClientStreams {
return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-stream RPC")
Nit:
- return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-stream RPC")
+ return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-streaming RPC")
Done.
stream.go
Outdated
@@ -1774,6 +1775,9 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
binlog.Log(ss.ctx, chc)
}
}
if !ss.desc.ClientStreams {
Is this a behavior change? Users that call RecvMsg would previously get io.EOF and now they'll get a cardinality violation?
Yes, there is a behaviour change.
For non-client-streaming RPCs, it will return an Internal error in the following two cases:
- When the client sends 0 request messages.
- When the server calls RecvMsg() twice.
I guess the second behavior change is only possible if you are using the generic API, so wouldn't affect 99% of our users. But I still don't think this is a change we want to make. I would probably be in favor of it if we were not 1.0, but we need to keep backward compatibility unless there's enough justification, which doesn't seem to be the case here.
As discussed offline, I have added additional state to serverStream to track the first call to server.recvMsg().
Based on that, the server will only return an Internal error when the client sends 0 request messages.
if err := stream.SendMsg(&testpb.Empty{}); status.Code(err) != codes.Internal {
t.Errorf("stream.SendMsg() = %v, want error %v", status.Code(err), codes.Internal)
}
Can we confirm that this cancels the RPC, too, as observed from the server? If the stream ends on the client side it must perform a RST_STREAM with the server.
No, it did not do a RST_STREAM.
The following code causes this.
Lines 911 to 917 in a5e7cd6
if cs.sentLast {
return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
}
if !cs.desc.ClientStreams {
cs.sentLast = true
}
Did you confirm a RST_STREAM doesn't happen?
Because tracing through the code... that error results in a call to cs.finish, which will call csAttempt.finish, which calls transportStream.Close. That should hopefully result in a RST_STREAM.
You're right. I traced it again and confirmed that the RST_STREAM does happen. The flow is:
cs.finish → csAttempt.finish → transportStream.Close → http2Client.closeStream → controlBuf.executeAndPut → cleanupStream.isTransportResponseFrame().
Great. Can we update the test to confirm the stream is getting cancelled on the server side? I think you could just make it do send operations and eventually see it error?
test/end2end_test.go
Outdated

ss := stubserver.StubServer{
StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
// This is second call to RecvMsg(), the initial call having been performed by the server handler.
"the initial call having been performed by the server handler."
nit: I think a clearer way to state the reason is that the initial call to receive the request is made by the generated code.
Test is no longer required.
test/end2end_test.go
Outdated
desc := &grpc.StreamDesc{
StreamName: "UnaryCall",
ServerStreams: false,
ClientStreams: false,
}
Since the test is for a misbehaving server, can we use a real test service client instead of manipulating StreamDescs?
Test is no longer required.
test/end2end_test.go
Outdated
}
defer lis.Close()

s := grpc.NewServer()
Similar to previous comments, can we use a stubserver if we don't need the server to misbehave? Also applicable to TestUnaryRPC_ClientCallSendMsgTwice.
Done.
@@ -1588,6 +1589,8 @@ type serverStream struct {

sendCompressorName string

recvFirstMsg bool // recv frist msg from client
This comment doesn't really say anything. How about:
- recvFirstMsg bool // recv frist msg from client
+ recvFirstMsg bool // set after the first message is received
Partially addresses: #7286
In non-client-streaming RPCs, the client's SendMsg() method automatically closes the send side after its first call. If someone calls Client.SendMsg() twice for a non-client-streaming RPC, it returns the error "Internal desc = SendMsg called after CloseSend". To mirror this behavior, the server-side logic has been updated so that calling RecvMsg() more than once for non-client-streaming RPCs will now similarly return an Internal error.
RELEASE NOTES: