Skip to content

grpc: Fix cardinality violations in non-client streaming RPCs. #8385

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: master
Choose a base branch
from

Conversation

Pranjali-2501
Copy link
Contributor

@Pranjali-2501 Pranjali-2501 commented Jun 6, 2025

Partially addresses: #7286

In non-client streaming RPCs, the client's SendMsg() method is designed to automatically close the send operation after its initial call. If someone attempts to call Client.SendMsg() twice for non-client streaming RPCs, if will return with error Internal desc = SendMsg called after CloseSend.
To mirror this behavior, the server-side logic has been updated so that calling RecvMsg() more than once for non-client streaming RPCs will now similarly return an Internal error.

RELEASE NOTES:

  • grpc: return status code INTERNAL when client send more than one request in unary and server streaming RPC.

@Pranjali-2501 Pranjali-2501 added this to the 1.74 Release milestone Jun 6, 2025
Copy link

codecov bot commented Jun 6, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 82.35%. Comparing base (20bd1e7) to head (0f5248a).
Report is 34 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #8385      +/-   ##
==========================================
- Coverage   82.44%   82.35%   -0.09%     
==========================================
  Files         413      413              
  Lines       40424    40527     +103     
==========================================
+ Hits        33328    33378      +50     
- Misses       5742     5778      +36     
- Partials     1354     1371      +17     
Files with missing lines Coverage Δ
server.go 81.54% <100.00%> (-0.53%) ⬇️
stream.go 81.93% <100.00%> (+0.27%) ⬆️

... and 42 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@dfawley dfawley removed their assignment Jun 6, 2025
@dfawley dfawley removed their request for review June 6, 2025 21:16
Copy link
Member

@dfawley dfawley left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to implement all of this in server.go to avoid adding state to the serverStream?

@Pranjali-2501 Pranjali-2501 changed the title grpc: Fix cardinality violations in server streaming RPC. grpc: Fix cardinality violations in non-client streaming RPCs. Jun 11, 2025
@Pranjali-2501
Copy link
Contributor Author

Is it possible to implement all of this in server.go to avoid adding state to the serverStream?

As discussed this with @arjan-bal offline, Cardinality violations can only be detected when messages are being read from the stream. This reading process occurs specifically within the server.RecvMsg() function. Since RecvMsg() is invoked from the user-implemented handler, it's not possible to detect cardinality violations during the initial stream setup phase.

}
}

func (s) TestServerStreaming_ClientBehaveAsBidiStreaming(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ClientSendsMultipleMessages is not the same as ClientCallSendMsgTwice. The former implies that the client is actually sending two messages, whereas the latter indicates that the client is calling the SendMsg method twice.

go s.Serve(lis)
defer s.Stop()

// s := grpc.NewServer(ss)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this commented code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

defer lis.Close()

s := grpc.NewServer()
serviceDesc := grpc.ServiceDesc{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we don't need to change the stream descriptors on the server side for this test. If so, please use a stubserver and override one of the handlers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamingOutputCall have predefined calls to sendmsg() and recvmsg().
This test is to check the case when client sends no msg.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The server behaviour shouldn't matter for this test, right? When the client sends 0 request messages, the server should send an internal error immediately. Are you seeing that this is not happening with stubserver?

defer lis.Close()

s := grpc.NewServer()
serviceDesc := grpc.ServiceDesc{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to fake the stream descriptors in this test? If not, we should use the test service client and server.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@arjan-bal arjan-bal assigned Pranjali-2501 and unassigned arjan-bal Jul 4, 2025
ss := stubserver.StubServer{
StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
// This is second call to RecvMsg(), the initial call having been performed by the server handler.
if err = stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal {
Copy link
Contributor Author

@Pranjali-2501 Pranjali-2501 Jul 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arjan-bal , as mentioned in the comment I'm using stubserver here.
But StreamingOutputCall handler inherently calls RecvMsg() and second call to RecvMsg() in StreamingOutputCallF cause a race to happen.
https://github.com/grpc/grpc-go/actions/runs/16136673126/job/45534355675?pr=8385

Should I revert it back to previous implementation and register a new service on the server to test?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The race is in the writes performed on the err variable declared at line 3746. We should declare a new err variable inside the goroutine.

} else if err != nil {
return err
}
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non client-streaming RPCs, but received another message")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about something more like the first message:

Suggested change
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non client-streaming RPCs, but received another message")
return status.Error(codes.Internal, "cardinality violation: received multiple request messages for non-client-streaming RPC")

Copy link
Contributor Author

@Pranjali-2501 Pranjali-2501 Jul 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to keep it consistent with client.RecvMsg().

grpc-go/stream.go

Lines 1180 to 1181 in a5e7cd6

return status.Errorf(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
}

Let me know if multiple request messages looks better, I'll change it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These error messages go out on the wire. To clients written in any language. So they should generically explain the problem and not use Go-specific terminology, or internal implementation details like "expected EOF".

That's probably not appropriate on the client side, either, but slightly less bad. But since you're changing things, that one should probably look like:

	return status.Errorf(codes.Internal, "cardinality violation: received multiple response messages for non-server-streaming RPC")

stream.go Outdated
@@ -1774,6 +1775,9 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
binlog.Log(ss.ctx, chc)
}
}
if !ss.desc.ClientStreams {
return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-stream RPC")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-stream RPC")
return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-streaming RPC")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

stream.go Outdated
@@ -1774,6 +1775,9 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
binlog.Log(ss.ctx, chc)
}
}
if !ss.desc.ClientStreams {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a behavior change? Users that call RecvMsg would previously get io.EOF and now they'll get a cardinality violation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there is a behaviour change.

For non-client-streaming RPCs, it will return internal error for the following 2 cases.

  • When there is 0 request message from Client.
  • When Server call RecvMsg() twice.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the second behavior change is only possible if you are using the generic API, so wouldn't affect 99% of our users. But I still don't think this is a change we want to make. I would probably be in favor of it if we were not 1.0, but we need to keep backward compatibility unless there's enough justification, which doesn't seem to be the case here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, I have added an additional state in serverstream to track the first call to server.recvMsg().

Based on that server will only return Internal error when there is 0 request message from Client.

Comment on lines +3831 to +3833
if err := stream.SendMsg(&testpb.Empty{}); status.Code(err) != codes.Internal {
t.Errorf("stream.SendMsg() = %v, want error %v", status.Code(err), codes.Internal)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we confirm that this cancels the RPC, too, as observed from the server? If the stream ends on the client side it must perform a RST_STREAM with the server.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it did not do a RST_STREAM.
Following code causes this.

grpc-go/stream.go

Lines 911 to 917 in a5e7cd6

if cs.sentLast {
return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
}
if !cs.desc.ClientStreams {
cs.sentLast = true
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you confirm a RST_STREAM doesn't happen?

Because tracing through the code...that error results in a call to cs.finish which will call csAttempt.finish which calls transportStream.Close. That should hopefully result in a RST_STREAM.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You’re right — I traced it again and confirmed that the RST_STREAM does happen. The flow is:
cs.finish → csAttempt.finish → transportStream.Close → http2Client.closeStream → controlBuf.executeAndPut → cleanupStream.isTransportResponseFrame().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great. Can we update the test to confirm the stream is getting cancelled on the server side? I think you could just make it do send operations and eventually see it error?

defer lis.Close()

s := grpc.NewServer()
serviceDesc := grpc.ServiceDesc{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The server behaviour shouldn't matter for this test, right? When the client sends 0 request messages, the server should send an internal error immediately. Are you seeing that this is not happening with stubserver?


ss := stubserver.StubServer{
StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
// This is second call to RecvMsg(), the initial call having been performed by the server handler.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the initial call having been performed by the server handler.

nit: I think a clearer way to mention the reason is that the the initial call to receive the request is made by the generated code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test is no longer required.

Comment on lines 3881 to 3885
desc := &grpc.StreamDesc{
StreamName: "UnaryCall",
ServerStreams: false,
ClientStreams: false,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the test is for a misbehaving server, can we use a real test service client instead of manipulating StreamDescs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test is no longer required.

}
defer lis.Close()

s := grpc.NewServer()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to previous comments, can we use a stubserver if we don't need the server to misbehave? Also applicable to TestUnaryRPC_ClientCallSendMsgTwice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@arjan-bal arjan-bal assigned Pranjali-2501 and unassigned arjan-bal Jul 22, 2025
@Pranjali-2501 Pranjali-2501 removed their assignment Jul 25, 2025
@@ -1588,6 +1589,8 @@ type serverStream struct {

sendCompressorName string

recvFirstMsg bool // recv frist msg from client
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment doesn't really say anything. How about:

Suggested change
recvFirstMsg bool // recv frist msg from client
recvFirstMsg bool // set after the first message is received

} else if err != nil {
return err
}
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non client-streaming RPCs, but received another message")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These error messages go out on the wire. To clients written in any language. So they should generically explain the problem and not use Go-specific terminology, or internal implementation details like "expected EOF".

That's probably not appropriate on the client side, either, but slightly less bad. But since you're changing things, that one should probably look like:

	return status.Errorf(codes.Internal, "cardinality violation: received multiple response messages for non-server-streaming RPC")

Comment on lines +3831 to +3833
if err := stream.SendMsg(&testpb.Empty{}); status.Code(err) != codes.Internal {
t.Errorf("stream.SendMsg() = %v, want error %v", status.Code(err), codes.Internal)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great. Can we update the test to confirm the stream is getting cancelled on the server side? I think you could just make it do send operations and eventually see it error?

@dfawley dfawley assigned Pranjali-2501 and unassigned dfawley Jul 25, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants