Skip to content

Commit cd5f3d9

Browse files
committed
Add impl for uvip
1 parent c684de5 commit cd5f3d9

File tree

26 files changed

+1846
-47
lines changed

26 files changed

+1846
-47
lines changed

cmd/kube-apiserver/app/aggregator.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
genericapiserver "k8s.io/apiserver/pkg/server"
3838
"k8s.io/apiserver/pkg/server/healthz"
3939
utilfeature "k8s.io/apiserver/pkg/util/feature"
40+
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
4041
kubeexternalinformers "k8s.io/client-go/informers"
4142
"k8s.io/client-go/tools/cache"
4243
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
@@ -57,6 +58,7 @@ func createAggregatorConfig(
5758
externalInformers kubeexternalinformers.SharedInformerFactory,
5859
serviceResolver aggregatorapiserver.ServiceResolver,
5960
proxyTransport *http.Transport,
61+
peerProxy utilpeerproxy.Interface,
6062
pluginInitializers []admission.PluginInitializer,
6163
) (*aggregatorapiserver.Config, error) {
6264
// make a shallow copy to let us twiddle a few things
@@ -76,6 +78,16 @@ func createAggregatorConfig(
7678
genericConfig.BuildHandlerChainFunc = genericapiserver.BuildHandlerChainWithStorageVersionPrecondition
7779
}
7880

81+
if peerProxy != nil {
82+
originalHandlerChainBuilder := genericConfig.BuildHandlerChainFunc
83+
genericConfig.BuildHandlerChainFunc = func(apiHandler http.Handler, c *genericapiserver.Config) http.Handler {
84+
// Add peer proxy handler to aggregator-apiserver.
85+
// wrap the peer proxy handler first.
86+
apiHandler = peerProxy.WrapHandler(apiHandler)
87+
return originalHandlerChainBuilder(apiHandler, c)
88+
}
89+
}
90+
7991
// copy the etcd options so we don't mutate originals.
8092
// we assume that the etcd options have been completed already. avoid messing with anything outside
8193
// of changes to StorageConfig as that may lead to unexpected behavior when the options are applied.
@@ -104,6 +116,8 @@ func createAggregatorConfig(
104116
ExtraConfig: aggregatorapiserver.ExtraConfig{
105117
ProxyClientCertFile: commandOptions.ProxyClientCertFile,
106118
ProxyClientKeyFile: commandOptions.ProxyClientKeyFile,
119+
PeerCAFile: commandOptions.PeerCAFile,
120+
PeerAdvertiseAddress: commandOptions.PeerAdvertiseAddress,
107121
ServiceResolver: serviceResolver,
108122
ProxyTransport: proxyTransport,
109123
RejectForwardingRedirects: commandOptions.AggregatorRejectForwardingRedirects,

cmd/kube-apiserver/app/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
8484
}
8585
c.ApiExtensions = apiExtensions
8686

87-
aggregator, err := createAggregatorConfig(*controlPlane.GenericConfig, opts.CompletedOptions, controlPlane.ExtraConfig.VersionedInformers, serviceResolver, controlPlane.ExtraConfig.ProxyTransport, pluginInitializer)
87+
aggregator, err := createAggregatorConfig(*controlPlane.GenericConfig, opts.CompletedOptions, controlPlane.ExtraConfig.VersionedInformers, serviceResolver, controlPlane.ExtraConfig.ProxyTransport, controlPlane.ExtraConfig.PeerProxy, pluginInitializer)
8888
if err != nil {
8989
return nil, err
9090
}

cmd/kube-apiserver/app/server.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ import (
5757
"k8s.io/klog/v2"
5858
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
5959
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
60+
"k8s.io/kubernetes/pkg/features"
6061

6162
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
6263
"k8s.io/kubernetes/pkg/api/legacyscheme"
@@ -258,6 +259,21 @@ func CreateKubeAPIServerConfig(opts options.CompletedOptions) (
258259
},
259260
}
260261

262+
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
263+
config.ExtraConfig.PeerEndpointLeaseReconciler, err = controlplaneapiserver.CreatePeerEndpointLeaseReconciler(*genericConfig, storageFactory)
264+
if err != nil {
265+
return nil, nil, nil, err
266+
}
267+
// build peer proxy config only if peer ca file exists
268+
if opts.PeerCAFile != "" {
269+
config.ExtraConfig.PeerProxy, err = controlplaneapiserver.BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile,
270+
opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.ExtraConfig.PeerEndpointLeaseReconciler, config.GenericConfig.Serializer)
271+
if err != nil {
272+
return nil, nil, nil, err
273+
}
274+
}
275+
}
276+
261277
clientCAProvider, err := opts.Authentication.ClientCert.GetClientCAContentProvider()
262278
if err != nil {
263279
return nil, nil, nil, err

pkg/controlplane/apiserver/config.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,25 @@ import (
2828
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
2929
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
3030
genericfeatures "k8s.io/apiserver/pkg/features"
31+
"k8s.io/apiserver/pkg/reconcilers"
3132
genericapiserver "k8s.io/apiserver/pkg/server"
3233
"k8s.io/apiserver/pkg/server/egressselector"
3334
"k8s.io/apiserver/pkg/server/filters"
3435
serverstorage "k8s.io/apiserver/pkg/server/storage"
36+
"k8s.io/apiserver/pkg/storageversion"
3537
utilfeature "k8s.io/apiserver/pkg/util/feature"
3638
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
3739
"k8s.io/apiserver/pkg/util/openapi"
40+
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
3841
clientgoinformers "k8s.io/client-go/informers"
3942
clientgoclientset "k8s.io/client-go/kubernetes"
43+
"k8s.io/client-go/transport"
4044
"k8s.io/component-base/version"
45+
"k8s.io/klog/v2"
4146
openapicommon "k8s.io/kube-openapi/pkg/common"
4247

4348
"k8s.io/kubernetes/pkg/api/legacyscheme"
49+
api "k8s.io/kubernetes/pkg/apis/core"
4450
"k8s.io/kubernetes/pkg/controlplane"
4551
controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver/options"
4652
"k8s.io/kubernetes/pkg/kubeapiserver"
@@ -193,3 +199,50 @@ func BuildPriorityAndFairness(s controlplaneapiserver.CompletedOptions, extclien
193199
s.GenericServerRunOptions.RequestTimeout/4,
194200
), nil
195201
}
202+
203+
// CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop
204+
// The peer endpoint leases are used to find network locations of apiservers for peer proxy
205+
func CreatePeerEndpointLeaseReconciler(c genericapiserver.Config, storageFactory serverstorage.StorageFactory) (reconcilers.PeerEndpointLeaseReconciler, error) {
206+
ttl := controlplane.DefaultEndpointReconcilerTTL
207+
config, err := storageFactory.NewConfig(api.Resource("apiServerPeerIPInfo"))
208+
if err != nil {
209+
return nil, fmt.Errorf("error creating storage factory config: %w", err)
210+
}
211+
reconciler, err := reconcilers.NewPeerEndpointLeaseReconciler(config, "/peerserverleases/", ttl)
212+
return reconciler, err
213+
}
214+
215+
func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, svm storageversion.Manager,
216+
proxyClientCertFile string, proxyClientKeyFile string, peerCAFile string, peerAdvertiseAddress reconcilers.PeerAdvertiseAddress,
217+
apiServerID string, reconciler reconcilers.PeerEndpointLeaseReconciler, serializer runtime.NegotiatedSerializer) (utilpeerproxy.Interface, error) {
218+
if proxyClientCertFile == "" {
219+
return nil, fmt.Errorf("error building peer proxy handler, proxy-cert-file not specified")
220+
}
221+
if proxyClientKeyFile == "" {
222+
return nil, fmt.Errorf("error building peer proxy handler, proxy-key-file not specified")
223+
}
224+
// create proxy client config
225+
clientConfig := &transport.Config{
226+
TLS: transport.TLSConfig{
227+
Insecure: false,
228+
CertFile: proxyClientCertFile,
229+
KeyFile: proxyClientKeyFile,
230+
CAFile: peerCAFile,
231+
ServerName: "kubernetes.default.svc",
232+
}}
233+
234+
// build proxy transport
235+
proxyRoundTripper, transportBuildingError := transport.New(clientConfig)
236+
if transportBuildingError != nil {
237+
klog.Error(transportBuildingError.Error())
238+
return nil, transportBuildingError
239+
}
240+
return utilpeerproxy.NewPeerProxyHandler(
241+
versionedInformer,
242+
svm,
243+
proxyRoundTripper,
244+
apiServerID,
245+
reconciler,
246+
serializer,
247+
), nil
248+
}

pkg/controlplane/apiserver/options/options.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"strings"
2525
"time"
2626

27+
peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
2728
genericoptions "k8s.io/apiserver/pkg/server/options"
2829
"k8s.io/apiserver/pkg/storage/storagebackend"
2930
"k8s.io/client-go/util/keyutil"
@@ -63,6 +64,16 @@ type Options struct {
6364
ProxyClientCertFile string
6465
ProxyClientKeyFile string
6566

67+
// PeerCAFile is the ca bundle used by this kube-apiserver to verify peer apiservers'
68+
// serving certs when routing a request to the peer in the case the request can not be served
69+
// locally due to version skew.
70+
PeerCAFile string
71+
72+
// PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request
73+
// to this apiserver. This happens in cases where the peer is not able to serve the request due to
74+
// version skew.
75+
PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress
76+
6677
EnableAggregatorRouting bool
6778
AggregatorRejectForwardingRedirects bool
6879

@@ -154,6 +165,20 @@ func (s *Options) AddFlags(fss *cliflag.NamedFlagSets) {
154165
"when it must call out during a request. This includes proxying requests to a user "+
155166
"api-server and calling out to webhook admission plugins.")
156167

168+
fs.StringVar(&s.PeerCAFile, "peer-ca-file", s.PeerCAFile,
169+
"If set and the UnknownVersionInteroperabilityProxy feature gate is enabled, this file will be used to verify serving certificates of peer kube-apiservers. "+
170+
"This flag is only used in clusters configured with multiple kube-apiservers for high availability.")
171+
172+
fs.StringVar(&s.PeerAdvertiseAddress.PeerAdvertiseIP, "peer-advertise-ip", s.PeerAdvertiseAddress.PeerAdvertiseIP,
173+
"If set and the UnknownVersionInteroperabilityProxy feature gate is enabled, this IP will be used by peer kube-apiservers to proxy requests to this kube-apiserver "+
174+
"when the request cannot be handled by the peer due to version skew between the kube-apiservers. "+
175+
"This flag is only used in clusters configured with multiple kube-apiservers for high availability. ")
176+
177+
fs.StringVar(&s.PeerAdvertiseAddress.PeerAdvertisePort, "peer-advertise-port", s.PeerAdvertiseAddress.PeerAdvertisePort,
178+
"If set and the UnknownVersionInteroperabilityProxy feature gate is enabled, this port will be used by peer kube-apiservers to proxy requests to this kube-apiserver "+
179+
"when the request cannot be handled by the peer due to version skew between the kube-apiservers. "+
180+
"This flag is only used in clusters configured with multiple kube-apiservers for high availability. ")
181+
157182
fs.BoolVar(&s.EnableAggregatorRouting, "enable-aggregator-routing", s.EnableAggregatorRouting,
158183
"Turns on aggregator routing requests to endpoints IP rather than cluster IP.")
159184

pkg/controlplane/apiserver/options/validation.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
genericfeatures "k8s.io/apiserver/pkg/features"
2626
utilfeature "k8s.io/apiserver/pkg/util/feature"
2727
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
28+
"k8s.io/kubernetes/pkg/features"
2829

2930
"k8s.io/kubernetes/pkg/api/legacyscheme"
3031
)
@@ -69,6 +70,32 @@ func validateAPIPriorityAndFairness(options *Options) []error {
6970
return nil
7071
}
7172

73+
func validateUnknownVersionInteroperabilityProxyFeature() []error {
74+
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
75+
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
76+
return nil
77+
}
78+
return []error{fmt.Errorf("UnknownVersionInteroperabilityProxy feature requires StorageVersionAPI feature flag to be enabled")}
79+
}
80+
return nil
81+
}
82+
83+
func validateUnknownVersionInteroperabilityProxyFlags(options *Options) []error {
84+
err := []error{}
85+
if !utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
86+
if options.PeerCAFile != "" {
87+
err = append(err, fmt.Errorf("--peer-ca-file requires UnknownVersionInteroperabilityProxy feature to be turned on"))
88+
}
89+
if options.PeerAdvertiseAddress.PeerAdvertiseIP != "" {
90+
err = append(err, fmt.Errorf("--peer-advertise-ip requires UnknownVersionInteroperabilityProxy feature to be turned on"))
91+
}
92+
if options.PeerAdvertiseAddress.PeerAdvertisePort != "" {
93+
err = append(err, fmt.Errorf("--peer-advertise-port requires UnknownVersionInteroperabilityProxy feature to be turned on"))
94+
}
95+
}
96+
return err
97+
}
98+
7299
// Validate checks Options and return a slice of found errs.
73100
func (s *Options) Validate() []error {
74101
var errs []error
@@ -83,6 +110,8 @@ func (s *Options) Validate() []error {
83110
errs = append(errs, s.APIEnablement.Validate(legacyscheme.Scheme, apiextensionsapiserver.Scheme, aggregatorscheme.Scheme)...)
84111
errs = append(errs, validateTokenRequest(s)...)
85112
errs = append(errs, s.Metrics.Validate()...)
113+
errs = append(errs, validateUnknownVersionInteroperabilityProxyFeature()...)
114+
errs = append(errs, validateUnknownVersionInteroperabilityProxyFlags(s)...)
86115

87116
return errs
88117
}

pkg/controlplane/apiserver/options/validation_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,13 @@ import (
2222

2323
kubeapiserveradmission "k8s.io/apiserver/pkg/admission"
2424
genericoptions "k8s.io/apiserver/pkg/server/options"
25+
utilfeature "k8s.io/apiserver/pkg/util/feature"
26+
"k8s.io/component-base/featuregate"
2527
basemetrics "k8s.io/component-base/metrics"
28+
"k8s.io/kubernetes/pkg/features"
2629

30+
peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
31+
featuregatetesting "k8s.io/component-base/featuregate/testing"
2732
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
2833
)
2934

@@ -80,6 +85,83 @@ func TestValidateAPIPriorityAndFairness(t *testing.T) {
8085
}
8186
}
8287

88+
func TestValidateUnknownVersionInteroperabilityProxy(t *testing.T) {
89+
tests := []struct {
90+
name string
91+
featureEnabled bool
92+
errShouldContain string
93+
peerCAFile string
94+
peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress
95+
}{
96+
{
97+
name: "feature disabled but peerCAFile set",
98+
featureEnabled: false,
99+
peerCAFile: "foo",
100+
errShouldContain: "--peer-ca-file requires UnknownVersionInteroperabilityProxy feature to be turned on",
101+
},
102+
{
103+
name: "feature disabled but peerAdvertiseIP set",
104+
featureEnabled: false,
105+
peerAdvertiseAddress: peerreconcilers.PeerAdvertiseAddress{PeerAdvertiseIP: "1.2.3.4"},
106+
errShouldContain: "--peer-advertise-ip requires UnknownVersionInteroperabilityProxy feature to be turned on",
107+
},
108+
{
109+
name: "feature disabled but peerAdvertisePort set",
110+
featureEnabled: false,
111+
peerAdvertiseAddress: peerreconcilers.PeerAdvertiseAddress{PeerAdvertisePort: "1"},
112+
errShouldContain: "--peer-advertise-port requires UnknownVersionInteroperabilityProxy feature to be turned on",
113+
},
114+
}
115+
116+
for _, test := range tests {
117+
t.Run(test.name, func(t *testing.T) {
118+
options := &Options{
119+
PeerCAFile: test.peerCAFile,
120+
PeerAdvertiseAddress: test.peerAdvertiseAddress,
121+
}
122+
if test.featureEnabled {
123+
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.UnknownVersionInteroperabilityProxy, true)()
124+
}
125+
var errMessageGot string
126+
if errs := validateUnknownVersionInteroperabilityProxyFlags(options); len(errs) > 0 {
127+
errMessageGot = errs[0].Error()
128+
}
129+
if !strings.Contains(errMessageGot, test.errShouldContain) {
130+
t.Errorf("Expected error message to contain: %q, but got: %q", test.errShouldContain, errMessageGot)
131+
}
132+
133+
})
134+
}
135+
}
136+
137+
func TestValidateUnknownVersionInteroperabilityProxyFeature(t *testing.T) {
138+
const conflict = "UnknownVersionInteroperabilityProxy feature requires StorageVersionAPI feature flag to be enabled"
139+
tests := []struct {
140+
name string
141+
featuresEnabled []featuregate.Feature
142+
}{
143+
{
144+
name: "enabled: UnknownVersionInteroperabilityProxy, disabled: StorageVersionAPI",
145+
featuresEnabled: []featuregate.Feature{features.UnknownVersionInteroperabilityProxy},
146+
},
147+
}
148+
149+
for _, test := range tests {
150+
t.Run(test.name, func(t *testing.T) {
151+
for _, feature := range test.featuresEnabled {
152+
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, true)()
153+
}
154+
var errMessageGot string
155+
if errs := validateUnknownVersionInteroperabilityProxyFeature(); len(errs) > 0 {
156+
errMessageGot = errs[0].Error()
157+
}
158+
if !strings.Contains(errMessageGot, conflict) {
159+
t.Errorf("Expected error message to contain: %q, but got: %q", conflict, errMessageGot)
160+
}
161+
})
162+
}
163+
}
164+
83165
func TestValidateOptions(t *testing.T) {
84166
testCases := []struct {
85167
name string

0 commit comments

Comments
 (0)