From 67fc4c80605890c5aff77f2b72c85a98a6a9c02d Mon Sep 17 00:00:00 2001 From: Sarah French Date: Fri, 19 Sep 2025 13:11:01 +0100 Subject: [PATCH 1/5] Update code to not expect a chunk when EOF encountered --- internal/plugin6/grpc_provider.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/internal/plugin6/grpc_provider.go b/internal/plugin6/grpc_provider.go index 64fc8ddc68b8..71e2c62310b5 100644 --- a/internal/plugin6/grpc_provider.go +++ b/internal/plugin6/grpc_provider.go @@ -1515,11 +1515,8 @@ func (p *GRPCProvider) ReadStateBytes(r providers.ReadStateBytesRequest) (resp p for { chunk, err := client.Recv() if err == io.EOF { - // End of stream, we're done - if chunk != nil { - // TODO: The EOF error could be just returned alongside the last chunk? - resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(chunk.Diagnostics)) - } + // No chunk is returned alongside an EOF. + // And as EOF is a sentinel error it isn't wrapped as a diagnostic. break } if err != nil { From a6fb6f4f7b87c68d7a8a1ab3d00daf2644ba37f6 Mon Sep 17 00:00:00 2001 From: Sarah French Date: Fri, 19 Sep 2025 13:12:55 +0100 Subject: [PATCH 2/5] Return early if any grpc errors are encountered during ReadStateBytes --- internal/plugin6/grpc_provider.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/plugin6/grpc_provider.go b/internal/plugin6/grpc_provider.go index 71e2c62310b5..b701d990e55f 100644 --- a/internal/plugin6/grpc_provider.go +++ b/internal/plugin6/grpc_provider.go @@ -1542,7 +1542,11 @@ func (p *GRPCProvider) ReadStateBytes(r providers.ReadStateBytesRequest) (resp p logger.Trace("GRPCProvider.v6: ReadStateBytes: read bytes of a chunk", n) } - logger.Trace("GRPCProvider.v6: ReadStateBytes: received all chunks", buf.Len()) + if resp.Diagnostics.HasErrors() { + logger.Trace("GRPCProvider.v6: ReadStateBytes: experienced an error when receiving state data from the provider", resp.Diagnostics.Err()) + return resp + } + if buf.Len() != expectedTotalLength { err = fmt.Errorf("expected state file of total %d bytes, received %d bytes", expectedTotalLength, buf.Len()) From bcc6d1414787cc2034cbf46780fdffc28ca83e4c Mon Sep 17 00:00:00 2001 From: Sarah French Date: Fri, 19 Sep 2025 13:15:04 +0100 Subject: [PATCH 3/5] Close the stream with CloseSend once everything's read without error. Add test case about handling grpc errors from CloseSend. --- internal/plugin6/grpc_provider.go | 10 ++++++ internal/plugin6/grpc_provider_test.go | 48 ++++++++++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/internal/plugin6/grpc_provider.go b/internal/plugin6/grpc_provider.go index b701d990e55f..4793c854750c 100644 --- a/internal/plugin6/grpc_provider.go +++ b/internal/plugin6/grpc_provider.go @@ -1553,6 +1553,16 @@ func (p *GRPCProvider) ReadStateBytes(r providers.ReadStateBytesRequest) (resp p resp.Diagnostics = resp.Diagnostics.Append(err) return resp } + + // We're done, so close the stream + logger.Trace("GRPCProvider.v6: ReadStateBytes: received all chunks, total bytes: ", buf.Len()) + err = client.CloseSend() + if err != nil { + resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err)) + return resp + } + + // Add the state data in the response once we know there are no errors resp.Bytes = buf.Bytes() return resp diff --git a/internal/plugin6/grpc_provider_test.go b/internal/plugin6/grpc_provider_test.go index ab029d4d3773..16fe495e648b 100644 --- a/internal/plugin6/grpc_provider_test.go +++ b/internal/plugin6/grpc_provider_test.go @@ -3556,6 +3556,9 @@ func TestGRPCProvider_ReadStateBytes(t *testing.T) { return ret.resp, ret.err }).Times(3) + // There will be a call to CloseSend to close the stream + mockReadBytesClient.EXPECT().CloseSend().Return(nil).Times(1) + // Act request := providers.ReadStateBytesRequest{ TypeName: expectedReq.TypeName, @@ -3820,6 +3823,51 @@ func TestGRPCProvider_ReadStateBytes(t *testing.T) { t.Fatalf("expected data to be omitted in error condition, but got: %q", string(resp.Bytes)) } }) + + t.Run("when closing the stream, grpc errors are surfaced via diagnostics", func(t *testing.T) { + client := mockProviderClient(t) + p := &GRPCProvider{ + client: client, + ctx: context.Background(), + } + + // Call to ReadStateBytes + // > Assert the arguments received + // > Define the returned mock client + mockClient := mockReadStateBytesClient(t) + expectedReq := &proto.ReadStateBytes_Request{ + TypeName: "mock_store", + StateId: backend.DefaultStateName, + } + client.EXPECT().ReadStateBytes( + gomock.Any(), + gomock.Eq(expectedReq), + ).Return(mockClient, nil) + + // Sufficient mocking of Recv to get to the call to CloseSend + mockClient.EXPECT().Recv().Return(&proto.ReadStateBytes_Response{}, io.EOF) + + // Force a gRPC error from CloseSend + mockError := errors.New("grpc error forced in test") + mockClient.EXPECT().CloseSend().Return(mockError).Times(1) + + // Act + request := providers.ReadStateBytesRequest{ + TypeName: expectedReq.TypeName, + StateId: expectedReq.StateId, + } + resp := p.ReadStateBytes(request) + + // Assert returned values + checkDiagsHasError(t, resp.Diagnostics) + wantErr := fmt.Sprintf("Plugin error: The plugin returned an unexpected error from plugin6.(*GRPCProvider).ReadStateBytes: %s", mockError) + if resp.Diagnostics.Err().Error() != wantErr { + t.Fatalf("expected error diagnostic %q, but got: %q", wantErr, resp.Diagnostics.Err()) + } + if len(resp.Bytes) != 0 { + t.Fatalf("expected data to be omitted in error condition, but got: %q", string(resp.Bytes)) + } + }) } func TestGRPCProvider_WriteStateBytes(t *testing.T) { From cf2c9db5f5e755a245e0f15aa16d716f3016a8cc Mon Sep 17 00:00:00 2001 From: Sarah French Date: Fri, 19 Sep 2025 13:16:45 +0100 Subject: [PATCH 4/5] Fix test case about warnings: We would expect to receive a chunk with data alongside the warning and have a normal closing of the stream after EOF --- internal/plugin6/grpc_provider_test.go | 47 +++++++++++++++++++++----- 1 file changed, 38 insertions(+), 9 deletions(-) diff --git a/internal/plugin6/grpc_provider_test.go b/internal/plugin6/grpc_provider_test.go index 16fe495e648b..918793b1de1b 100644 --- a/internal/plugin6/grpc_provider_test.go +++ b/internal/plugin6/grpc_provider_test.go @@ -3755,15 +3755,44 @@ func TestGRPCProvider_ReadStateBytes(t *testing.T) { ).Return(mockReadBytesClient, nil) // Define what will be returned by each call to Recv - mockReadBytesClient.EXPECT().Recv().Return(&proto.ReadStateBytes_Response{ - Diagnostics: []*proto.Diagnostic{ - &proto.Diagnostic{ - Severity: proto.Diagnostic_WARNING, - Summary: "Warning from test", - Detail: "This warning is forced by the test case", + chunk := "hello world" + totalLength := len(chunk) + mockResp := map[int]struct { + resp *proto.ReadStateBytes_Response + err error + }{ + 0: { + resp: &proto.ReadStateBytes_Response{ + Bytes: []byte(chunk), + TotalLength: int64(totalLength), + Range: &proto.StateRange{ + Start: 0, + End: int64(len(chunk)), + }, + Diagnostics: []*proto.Diagnostic{ + { + Severity: proto.Diagnostic_WARNING, + Summary: "Warning from test", + Detail: "This warning is forced by the test case", + }, + }, }, + err: nil, }, - }, io.EOF) + 1: { + resp: &proto.ReadStateBytes_Response{}, + err: io.EOF, + }, + } + var count int + mockReadBytesClient.EXPECT().Recv().DoAndReturn(func() (*proto.ReadStateBytes_Response, error) { + ret := mockResp[count] + count++ + return ret.resp, ret.err + }).Times(2) + + // There will be a call to CloseSend to close the stream + mockReadBytesClient.EXPECT().CloseSend().Return(nil).Times(1) // Act request := providers.ReadStateBytesRequest{ @@ -3778,8 +3807,8 @@ func TestGRPCProvider_ReadStateBytes(t *testing.T) { if resp.Diagnostics.ErrWithWarnings().Error() != expectedWarn { t.Fatalf("expected warning diagnostic %q, but got: %q", expectedWarn, resp.Diagnostics.ErrWithWarnings().Error()) } - if len(resp.Bytes) != 0 { - t.Fatalf("expected data to be omitted in error condition, but got: %q", string(resp.Bytes)) + if len(resp.Bytes) == 0 { + t.Fatal("expected data to included despite warnings, but got no bytes") } }) From ed0bae5b322c2744e991addc465442cc466abec5 Mon Sep 17 00:00:00 2001 From: Sarah French Date: Fri, 19 Sep 2025 13:17:02 +0100 Subject: [PATCH 5/5] Add log line, remove unneeded type info --- internal/plugin6/grpc_provider.go | 2 ++ internal/plugin6/grpc_provider_test.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/plugin6/grpc_provider.go b/internal/plugin6/grpc_provider.go index 4793c854750c..61ca88d8863c 100644 --- a/internal/plugin6/grpc_provider.go +++ b/internal/plugin6/grpc_provider.go @@ -1548,6 +1548,8 @@ func (p *GRPCProvider) ReadStateBytes(r providers.ReadStateBytesRequest) (resp p } if buf.Len() != expectedTotalLength { + logger.Trace("GRPCProvider.v6: ReadStateBytes: received %d bytes but expected the total bytes to be %d.", buf.Len(), expectedTotalLength) + err = fmt.Errorf("expected state file of total %d bytes, received %d bytes", expectedTotalLength, buf.Len()) resp.Diagnostics = resp.Diagnostics.Append(err) diff --git a/internal/plugin6/grpc_provider_test.go b/internal/plugin6/grpc_provider_test.go index 918793b1de1b..ef438f4c45f4 100644 --- a/internal/plugin6/grpc_provider_test.go +++ b/internal/plugin6/grpc_provider_test.go @@ -3707,7 +3707,7 @@ func TestGRPCProvider_ReadStateBytes(t *testing.T) { // Define what will be returned by each call to Recv mockReadBytesClient.EXPECT().Recv().Return(&proto.ReadStateBytes_Response{ Diagnostics: []*proto.Diagnostic{ - &proto.Diagnostic{ + { Severity: proto.Diagnostic_ERROR, Summary: "Error from test", Detail: "This error is forced by the test case",