@@ -48,6 +48,11 @@ func (p *GRPCProviderPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Serve
48
48
return nil
49
49
}
50
50
51
+ // grpcMaxMessageSize is the maximum gRPC send and receive message sizes
52
+ // This matches the maximum set by a server implemented via terraform-plugin-go.
53
+ // See https://github.com/hashicorp/terraform-plugin-go/blob/a361c9bf/tfprotov6/tf6server/server.go#L88
54
+ const grpcMaxMessageSize = 256 << 20
55
+
51
56
// GRPCProvider handles the client, or core side of the plugin rpc connection.
52
57
// The GRPCProvider methods are mostly a translation layer between the
53
58
// terraform providers types and the grpc proto types, directly converting
@@ -75,8 +80,12 @@ type GRPCProvider struct {
75
80
76
81
// schema stores the schema for this provider. This is used to properly
77
82
// serialize the requests for schemas.
78
- mu sync.Mutex
79
83
schema providers.GetProviderSchemaResponse
84
+ // stateChunkSize stores the negotiated chunk size for any implemented
85
+ // state store (keyed by type name) that have gone through successful configuration.
86
+ stateChunkSize map [string ]int
87
+
88
+ mu sync.Mutex
80
89
}
81
90
82
91
func (p * GRPCProvider ) GetProviderSchema () providers.GetProviderSchemaResponse {
@@ -1503,25 +1512,38 @@ func (p *GRPCProvider) ConfigureStateStore(r providers.ConfigureStateStoreReques
1503
1512
return resp
1504
1513
}
1505
1514
1515
+ clientCapabilities := stateStoreClientCapabilitiesToProto (r .Capabilities )
1516
+ logger .Trace ("GRPCProvider.v6: ConfigureStateStore: proposing client capabilities" , clientCapabilities )
1517
+
1506
1518
protoReq := & proto6.ConfigureStateStore_Request {
1507
1519
TypeName : r .TypeName ,
1508
1520
Config : & proto6.DynamicValue {
1509
1521
Msgpack : mp ,
1510
1522
},
1523
+ Capabilities : clientCapabilities ,
1511
1524
}
1512
1525
1513
1526
protoResp , err := p .client .ConfigureStateStore (p .ctx , protoReq )
1514
1527
if err != nil {
1515
1528
resp .Diagnostics = resp .Diagnostics .Append (grpcErr (err ))
1516
1529
return resp
1517
1530
}
1531
+ resp .Capabilities = stateStoreServerCapabilitiesFromProto (protoResp .Capabilities )
1532
+ logger .Trace ("GRPCProvider.v6: ConfigureStateStore: received server capabilities" , resp .Capabilities )
1533
+
1518
1534
resp .Diagnostics = resp .Diagnostics .Append (convert .ProtoToDiagnostics (protoResp .Diagnostics ))
1519
1535
return resp
1520
1536
}
1521
1537
1522
1538
func (p * GRPCProvider ) ReadStateBytes (r providers.ReadStateBytesRequest ) (resp providers.ReadStateBytesResponse ) {
1523
1539
logger .Trace ("GRPCProvider.v6: ReadStateBytes" )
1524
1540
1541
+ // ReadStateBytes can be more sensitive to message sizes
1542
+ // so we ensure it aligns with (the lower) terraform-plugin-go.
1543
+ opts := grpc.MaxRecvMsgSizeCallOption {
1544
+ MaxRecvMsgSize : grpcMaxMessageSize ,
1545
+ }
1546
+
1525
1547
schema := p .GetProviderSchema ()
1526
1548
if schema .Diagnostics .HasErrors () {
1527
1549
resp .Diagnostics = schema .Diagnostics
@@ -1544,14 +1566,15 @@ func (p *GRPCProvider) ReadStateBytes(r providers.ReadStateBytesRequest) (resp p
1544
1566
ctx , cancel := context .WithCancel (p .ctx )
1545
1567
defer cancel ()
1546
1568
1547
- client , err := p .client .ReadStateBytes (ctx , protoReq )
1569
+ client , err := p .client .ReadStateBytes (ctx , protoReq , opts )
1548
1570
if err != nil {
1549
1571
resp .Diagnostics = resp .Diagnostics .Append (grpcErr (err ))
1550
1572
return resp
1551
1573
}
1552
1574
1553
1575
buf := & bytes.Buffer {}
1554
1576
var expectedTotalLength int
1577
+ // TODO: Send warning if client misbehaves and uses (lower) chunk size that we didn't agree on
1555
1578
for {
1556
1579
chunk , err := client .Recv ()
1557
1580
if err == io .EOF {
@@ -1613,6 +1636,18 @@ func (p *GRPCProvider) ReadStateBytes(r providers.ReadStateBytesRequest) (resp p
1613
1636
func (p * GRPCProvider ) WriteStateBytes (r providers.WriteStateBytesRequest ) (resp providers.WriteStateBytesResponse ) {
1614
1637
logger .Trace ("GRPCProvider.v6: WriteStateBytes" )
1615
1638
1639
+ // WriteStateBytes can be more sensitive to message sizes
1640
+ // so we ensure it aligns with (the lower) terraform-plugin-go.
1641
+ opts := grpc.MaxSendMsgSizeCallOption {
1642
+ MaxSendMsgSize : grpcMaxMessageSize ,
1643
+ }
1644
+
1645
+ chunkSize , ok := p .stateChunkSize [r .TypeName ]
1646
+ if ! ok {
1647
+ resp .Diagnostics = resp .Diagnostics .Append (fmt .Errorf ("Unable to determine chunk size for provider %s; this is a bug in Terraform - please report it" , r .TypeName ))
1648
+ return resp
1649
+ }
1650
+
1616
1651
schema := p .GetProviderSchema ()
1617
1652
if schema .Diagnostics .HasErrors () {
1618
1653
resp .Diagnostics = schema .Diagnostics
@@ -1630,10 +1665,7 @@ func (p *GRPCProvider) WriteStateBytes(r providers.WriteStateBytesRequest) (resp
1630
1665
ctx , cancel := context .WithCancel (p .ctx )
1631
1666
defer cancel ()
1632
1667
1633
- // TODO: Configurable chunk size
1634
- chunkSize := 4 * 1_000_000 // 4MB
1635
-
1636
- client , err := p .client .WriteStateBytes (ctx )
1668
+ client , err := p .client .WriteStateBytes (ctx , opts )
1637
1669
if err != nil {
1638
1670
resp .Diagnostics = resp .Diagnostics .Append (grpcErr (err ))
1639
1671
return resp
@@ -1683,6 +1715,15 @@ func (p *GRPCProvider) WriteStateBytes(r providers.WriteStateBytesRequest) (resp
1683
1715
return resp
1684
1716
}
1685
1717
1718
+ func (p * GRPCProvider ) SetStateStoreChunkSize (typeName string , size int ) {
1719
+ p .mu .Lock ()
1720
+ defer p .mu .Unlock ()
1721
+ if p .stateChunkSize == nil {
1722
+ p .stateChunkSize = make (map [string ]int , 1 )
1723
+ }
1724
+ p .stateChunkSize [typeName ] = size
1725
+ }
1726
+
1686
1727
func (p * GRPCProvider ) GetStates (r providers.GetStatesRequest ) (resp providers.GetStatesResponse ) {
1687
1728
logger .Trace ("GRPCProvider.v6: GetStates" )
1688
1729
@@ -1936,3 +1977,15 @@ func clientCapabilitiesToProto(c providers.ClientCapabilities) *proto6.ClientCap
1936
1977
WriteOnlyAttributesAllowed : c .WriteOnlyAttributesAllowed ,
1937
1978
}
1938
1979
}
1980
+
1981
+ func stateStoreClientCapabilitiesToProto (c providers.StateStoreClientCapabilities ) * proto6.StateStoreClientCapabilities {
1982
+ return & proto6.StateStoreClientCapabilities {
1983
+ ChunkSize : c .ChunkSize ,
1984
+ }
1985
+ }
1986
+
1987
+ func stateStoreServerCapabilitiesFromProto (c * proto6.StateStoreServerCapabilities ) providers.StateStoreServerCapabilities {
1988
+ return providers.StateStoreServerCapabilities {
1989
+ ChunkSize : c .ChunkSize ,
1990
+ }
1991
+ }
0 commit comments