Skip to content

Commit 735be02

Browse files
committed
Make CRDs built and aggregated lazily for oasv2
1 parent b2a9c06 commit 735be02

File tree

3 files changed

+639
-75
lines changed

3 files changed

+639
-75
lines changed

staging/src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/controller.go

Lines changed: 102 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,18 @@ package openapi
1818

1919
import (
2020
"fmt"
21-
"reflect"
2221
"sync"
2322
"time"
2423

24+
"github.com/google/uuid"
2525
"k8s.io/apimachinery/pkg/api/errors"
2626
"k8s.io/apimachinery/pkg/labels"
2727
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2828
"k8s.io/apimachinery/pkg/util/wait"
2929
"k8s.io/client-go/tools/cache"
3030
"k8s.io/client-go/util/workqueue"
3131
"k8s.io/klog/v2"
32+
"k8s.io/kube-openapi/pkg/cached"
3233
"k8s.io/kube-openapi/pkg/handler"
3334
"k8s.io/kube-openapi/pkg/validation/spec"
3435

@@ -49,21 +50,69 @@ type Controller struct {
4950

5051
queue workqueue.RateLimitingInterface
5152

52-
staticSpec *spec.Swagger
53+
staticSpec *spec.Swagger
54+
5355
openAPIService *handler.OpenAPIService
5456

55-
// specs per version and per CRD name
56-
lock sync.Mutex
57-
crdSpecs map[string]map[string]*spec.Swagger
57+
// specs by name. The specs are lazily constructed on request.
58+
// The lock is for the map only.
59+
lock sync.Mutex
60+
specsByName map[string]*specCache
61+
}
62+
63+
// specCache holds the merged version spec for a CRD as well as the CRD object.
64+
// The spec is created lazily from the CRD object on request.
65+
// The mergedVersionSpec is only created on instantiation and is never
66+
// changed. crdCache is a cached.Replaceable and updates are thread
67+
// safe. Thus, no lock is needed to protect this struct.
68+
type specCache struct {
69+
crdCache cached.Replaceable[*apiextensionsv1.CustomResourceDefinition]
70+
mergedVersionSpec cached.Data[*spec.Swagger]
71+
}
72+
73+
func (s *specCache) update(crd *apiextensionsv1.CustomResourceDefinition) {
74+
s.crdCache.Replace(cached.NewResultOK(crd, generateCRDHash(crd)))
75+
}
76+
77+
func createSpecCache(crd *apiextensionsv1.CustomResourceDefinition) *specCache {
78+
s := specCache{}
79+
s.update(crd)
80+
81+
s.mergedVersionSpec = cached.NewTransformer[*apiextensionsv1.CustomResourceDefinition](func(result cached.Result[*apiextensionsv1.CustomResourceDefinition]) cached.Result[*spec.Swagger] {
82+
if result.Err != nil {
83+
// This should never happen, but return the err if it does.
84+
return cached.NewResultErr[*spec.Swagger](result.Err)
85+
}
86+
crd := result.Data
87+
mergeSpec := &spec.Swagger{}
88+
for _, v := range crd.Spec.Versions {
89+
if !v.Served {
90+
continue
91+
}
92+
s, err := builder.BuildOpenAPIV2(crd, v.Name, builder.Options{V2: true})
93+
// Defaults must be pruned here for CRDs to cleanly merge with the static
94+
// spec that already has defaults pruned
95+
if err != nil {
96+
return cached.NewResultErr[*spec.Swagger](err)
97+
}
98+
s.Definitions = handler.PruneDefaults(s.Definitions)
99+
mergeSpec, err = builder.MergeSpecs(mergeSpec, s)
100+
if err != nil {
101+
return cached.NewResultErr[*spec.Swagger](err)
102+
}
103+
}
104+
return cached.NewResultOK(mergeSpec, generateCRDHash(crd))
105+
}, &s.crdCache)
106+
return &s
58107
}
59108

60109
// NewController creates a new Controller with input CustomResourceDefinition informer
61110
func NewController(crdInformer informers.CustomResourceDefinitionInformer) *Controller {
62111
c := &Controller{
63-
crdLister: crdInformer.Lister(),
64-
crdsSynced: crdInformer.Informer().HasSynced,
65-
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_openapi_controller"),
66-
crdSpecs: map[string]map[string]*spec.Swagger{},
112+
crdLister: crdInformer.Lister(),
113+
crdsSynced: crdInformer.Informer().HasSynced,
114+
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_openapi_controller"),
115+
specsByName: map[string]*specCache{},
67116
}
68117

69118
crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -102,18 +151,9 @@ func (c *Controller) Run(staticSpec *spec.Swagger, openAPIService *handler.OpenA
102151
if !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
103152
continue
104153
}
105-
newSpecs, changed, err := buildVersionSpecs(crd, nil)
106-
if err != nil {
107-
utilruntime.HandleError(fmt.Errorf("failed to build OpenAPI spec of CRD %s: %v", crd.Name, err))
108-
} else if !changed {
109-
continue
110-
}
111-
c.crdSpecs[crd.Name] = newSpecs
112-
}
113-
if err := c.updateSpecLocked(); err != nil {
114-
utilruntime.HandleError(fmt.Errorf("failed to initially create OpenAPI spec for CRDs: %v", err))
115-
return
154+
c.specsByName[crd.Name] = createSpecCache(crd)
116155
}
156+
c.updateSpecLocked()
117157

118158
// only start one worker thread since its a slow moving API
119159
go wait.Until(c.runWorker, time.Second, stopCh)
@@ -164,76 +204,59 @@ func (c *Controller) sync(name string) error {
164204

165205
// do we have to remove all specs of this CRD?
166206
if errors.IsNotFound(err) || !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
167-
if _, found := c.crdSpecs[name]; !found {
207+
if _, found := c.specsByName[name]; !found {
168208
return nil
169209
}
170-
delete(c.crdSpecs, name)
210+
delete(c.specsByName, name)
171211
klog.V(2).Infof("Updating CRD OpenAPI spec because %s was removed", name)
172212
regenerationCounter.With(map[string]string{"crd": name, "reason": "remove"})
173-
return c.updateSpecLocked()
213+
c.updateSpecLocked()
214+
return nil
174215
}
175216

176-
// compute CRD spec and see whether it changed
177-
oldSpecs, updated := c.crdSpecs[crd.Name]
178-
newSpecs, changed, err := buildVersionSpecs(crd, oldSpecs)
179-
if err != nil {
180-
return err
181-
}
182-
if !changed {
217+
// If CRD spec already exists, update the CRD.
218+
// specCache.update() includes the ETag so an update on a spec
219+
// resulting in the same ETag will be a noop.
220+
s, exists := c.specsByName[crd.Name]
221+
if exists {
222+
s.update(crd)
223+
klog.V(2).Infof("Updating CRD OpenAPI spec because %s changed", name)
224+
regenerationCounter.With(map[string]string{"crd": name, "reason": "update"})
183225
return nil
184226
}
185227

186-
// update specs of this CRD
187-
c.crdSpecs[crd.Name] = newSpecs
228+
c.specsByName[crd.Name] = createSpecCache(crd)
188229
klog.V(2).Infof("Updating CRD OpenAPI spec because %s changed", name)
189-
reason := "add"
190-
if updated {
191-
reason = "update"
192-
}
193-
regenerationCounter.With(map[string]string{"crd": name, "reason": reason})
194-
return c.updateSpecLocked()
230+
regenerationCounter.With(map[string]string{"crd": name, "reason": "add"})
231+
c.updateSpecLocked()
232+
return nil
195233
}
196234

197-
func buildVersionSpecs(crd *apiextensionsv1.CustomResourceDefinition, oldSpecs map[string]*spec.Swagger) (map[string]*spec.Swagger, bool, error) {
198-
newSpecs := map[string]*spec.Swagger{}
199-
anyChanged := false
200-
for _, v := range crd.Spec.Versions {
201-
if !v.Served {
202-
continue
203-
}
204-
spec, err := builder.BuildOpenAPIV2(crd, v.Name, builder.Options{V2: true})
205-
// Defaults must be pruned here for CRDs to cleanly merge with the static
206-
// spec that already has defaults pruned
207-
spec.Definitions = handler.PruneDefaults(spec.Definitions)
208-
if err != nil {
209-
return nil, false, err
210-
}
211-
newSpecs[v.Name] = spec
212-
if oldSpecs[v.Name] == nil || !reflect.DeepEqual(oldSpecs[v.Name], spec) {
213-
anyChanged = true
214-
}
215-
}
216-
if !anyChanged && len(oldSpecs) == len(newSpecs) {
217-
return newSpecs, false, nil
235+
// updateSpecLocked updates the cached spec graph.
236+
func (c *Controller) updateSpecLocked() {
237+
specList := make([]cached.Data[*spec.Swagger], 0, len(c.specsByName))
238+
for crd := range c.specsByName {
239+
specList = append(specList, c.specsByName[crd].mergedVersionSpec)
218240
}
219241

220-
return newSpecs, true, nil
221-
}
222-
223-
// updateSpecLocked aggregates all OpenAPI specs and updates openAPIService.
224-
// It is not thread-safe. The caller is responsible to hold proper lock (Controller.lock).
225-
func (c *Controller) updateSpecLocked() error {
226-
crdSpecs := []*spec.Swagger{}
227-
for _, versionSpecs := range c.crdSpecs {
228-
for _, s := range versionSpecs {
229-
crdSpecs = append(crdSpecs, s)
242+
cache := cached.NewListMerger(func(results []cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] {
243+
localCRDSpec := make([]*spec.Swagger, 0, len(results))
244+
for k := range results {
245+
if results[k].Err == nil {
246+
localCRDSpec = append(localCRDSpec, results[k].Data)
247+
}
230248
}
231-
}
232-
mergedSpec, err := builder.MergeSpecs(c.staticSpec, crdSpecs...)
233-
if err != nil {
234-
return fmt.Errorf("failed to merge specs: %v", err)
235-
}
236-
return c.openAPIService.UpdateSpec(mergedSpec)
249+
mergedSpec, err := builder.MergeSpecs(c.staticSpec, localCRDSpec...)
250+
if err != nil {
251+
return cached.NewResultErr[*spec.Swagger](fmt.Errorf("failed to merge specs: %v", err))
252+
}
253+
// A UUID is returned for the etag because we will only
254+
// create a new merger when a CRD has changed. A hash based
255+
// etag is more expensive because the CRDs are not
256+
// premarshalled.
257+
return cached.NewResultOK(mergedSpec, uuid.New().String())
258+
}, specList)
259+
c.openAPIService.UpdateSpecLazy(cache)
237260
}
238261

239262
func (c *Controller) addCustomResourceDefinition(obj interface{}) {
@@ -269,3 +292,7 @@ func (c *Controller) deleteCustomResourceDefinition(obj interface{}) {
269292
func (c *Controller) enqueue(obj *apiextensionsv1.CustomResourceDefinition) {
270293
c.queue.Add(obj.Name)
271294
}
295+
296+
func generateCRDHash(crd *apiextensionsv1.CustomResourceDefinition) string {
297+
return fmt.Sprintf("%s,%d", crd.UID, crd.Generation)
298+
}

0 commit comments

Comments
 (0)