diff --git a/pkg/resourcecache/rebuilder.go b/pkg/resourcecache/rebuilder.go index 863d36e1c..c460760ab 100644 --- a/pkg/resourcecache/rebuilder.go +++ b/pkg/resourcecache/rebuilder.go @@ -2,7 +2,6 @@ package resourcecache import ( "fmt" - "net/http" "sync" "time" @@ -100,21 +99,31 @@ func (rc *ResourceCache) getAllResources(lctx *lmctx.LMContext) (*Store, error) return tmpStore, nil } -func (rc *ResourceCache) getDevices(lctx *lmctx.LMContext, grpID int32) (*lm.GetImmediateDeviceListByDeviceGroupIDOK, error) { +func (rc *ResourceCache) getDevices(lctx *lmctx.LMContext, grpID int32) ([]*models.Device, error) { conf, err := config.GetConfig(lctx) if err != nil { return nil, err } clctx := lctx.LMContextWith(map[string]interface{}{constants.PartitionKey: conf.ClusterName}) - params := lm.NewGetImmediateDeviceListByDeviceGroupIDParams() - params.SetID(grpID) - command := rc.LMRequester.GetImmediateResourceListByResourceGroupIDCommand(clctx, params) - resp, err := rc.LMRequester.SendReceive(clctx, command) - if err != nil { - return nil, err + var result []*models.Device + totalReceived := int32(0) + for { + params := lm.NewGetImmediateDeviceListByDeviceGroupIDParams() + params.SetID(grpID) + params.SetOffset(&totalReceived) + command := rc.LMRequester.GetImmediateResourceListByResourceGroupIDCommand(clctx, params) + resp, err := rc.LMRequester.SendReceive(clctx, command) + if err != nil { + return nil, err + } + result = append(result, resp.(*lm.GetImmediateDeviceListByDeviceGroupIDOK).Payload.Items...) + totalReceived += int32(len(resp.(*lm.GetImmediateDeviceListByDeviceGroupIDOK).Payload.Items)) + if totalReceived >= resp.(*lm.GetImmediateDeviceListByDeviceGroupIDOK).Payload.Total { + break + } } - return resp.(*lm.GetImmediateDeviceListByDeviceGroupIDOK), nil + return result, nil } func (rc *ResourceCache) fetchGroupDevices(lctx *lmctx.LMContext, inChan <-chan int32, outChan chan<- *models.Device) { @@ -146,10 +155,8 @@ func (rc *ResourceCache) ResourceGroupProcessor(lctx *lmctx.LMContext, inChan <- if err != nil { log.Warnf("fetch resources for %v failed with %v", grpID, err) } - if resp != nil && util.GetHTTPStatusCodeFromLMSDKError(resp) == http.StatusOK { - for _, resource := range resp.Payload.Items { - outChan <- resource - } + for _, resource := range resp { + outChan <- resource } } } @@ -173,6 +180,7 @@ func (rc *ResourceCache) accumulateDeviceCache(lctx *lmctx.LMContext, inChan <-c log.Debugf("New cache map : %v", store) } +// nolint: cyclop func (rc *ResourceCache) storeDevice(lctx *lmctx.LMContext, resourceObj *models.Device, clusterName string, store *Store) bool { log := lmlog.Logger(lctx) if resourceObj == nil || @@ -206,6 +214,15 @@ func (rc *ResourceCache) storeDevice(lctx *lmctx.LMContext, resourceObj *models. return false } + if emeta, ok := store.Exists(lctx, key, meta.Container); ok && emeta.LMID != meta.LMID { + if emeta.CreatedOn > meta.CreatedOn { + emeta.Container += "-dupl" + store.Set(lctx, key, emeta) + } else { + meta.Container += "-dupl" + } + } + store.Set(lctx, key, meta) return true } diff --git a/pkg/resourcecache/resource_cache.go b/pkg/resourcecache/resource_cache.go index 2372f35c0..a9d3845bc 100644 --- a/pkg/resourcecache/resource_cache.go +++ b/pkg/resourcecache/resource_cache.go @@ -303,7 +303,7 @@ func (rc *ResourceCache) SoftRefresh(lctx *lmctx.LMContext, container string) { for _, meta := range list { resp, err := rc.getDevices(lctx, meta.LMID) if err != nil && resp != nil { - for _, resource := range resp.Payload.Items { + for _, resource := range resp { rc.storeDevice(lctx, resource, conf.ClusterName, rc.store) } } diff --git a/pkg/sync/initsyncer.go b/pkg/sync/initsyncer.go index 9bfad79e3..fc344c98d 100644 --- a/pkg/sync/initsyncer.go +++ b/pkg/sync/initsyncer.go @@ -73,6 +73,28 @@ func (i *InitSyncer) Sync(lctx *lmctx.LMContext) { list := i.ResourceManager.GetResourceCache().List() log.Tracef("Current cache: %v", list) + log.Infof("Deleting duplicate resources if any") + for _, entry := range list { + log.Tracef("Iterate resource cache entry : %v ", entry) + cacheResourceName := entry.K + cacheResourceMeta := entry.V + + if ignoreSync[cacheResourceName.Resource] || cacheResourceName.Resource == enums.Namespaces { + continue + } + + if strings.HasSuffix(cacheResourceMeta.Container, "-dupl") { + childLctx := lmlog.LMContextWithFields(lctx, logrus.Fields{ + "name": cacheResourceName.Resource.FQName(cacheResourceName.Name), + "type": cacheResourceName.Resource.Singular(), + "ns": cacheResourceMeta.Container, + "event": "sync", + }) + childLctx = childLctx.LMContextWith(map[string]interface{}{constants.PartitionKey: fmt.Sprintf("%s-%s", cacheResourceName.Resource.String(), cacheResourceName.Name)}) + i.deleteResource(childLctx, cacheResourceName, cacheResourceMeta) + } + } + for _, entry := range list { log.Tracef("Iterate resource cache entry : %v ", entry) cacheResourceName := entry.K @@ -102,10 +124,9 @@ func (i *InitSyncer) Sync(lctx *lmctx.LMContext) { if !ok || clusterPresentMeta.UID != cacheResourceMeta.UID || (conf.RegisterGenericFilter && !util.EvaluateExclusion(clusterPresentMeta.Labels)) { - - i.deleteResource(childLctx, log, cacheResourceName, cacheResourceMeta) + i.deleteResource(childLctx, cacheResourceName, cacheResourceMeta) } else if resolveConflicts { - i.resolveConflicts(childLctx, cacheResourceMeta, clusterPresentMeta, cacheResourceName, log) + i.resolveConflicts(childLctx, cacheResourceMeta, clusterPresentMeta, cacheResourceName) } } @@ -140,7 +161,8 @@ func (i *InitSyncer) deleteNamespace(allK8SResourcesStore *resourcecache.Store, } // nolint: gocognit -func (i *InitSyncer) resolveConflicts(lctx *lmctx.LMContext, cacheMeta types.ResourceMeta, clusterResourceMeta types.ResourceMeta, cacheResourceName types.ResourceName, log *logrus.Entry) { +func (i *InitSyncer) resolveConflicts(lctx *lmctx.LMContext, cacheMeta types.ResourceMeta, clusterResourceMeta types.ResourceMeta, cacheResourceName types.ResourceName) { + log := lmlog.Logger(lctx) rt := cacheResourceName.Resource if clusterResourceMeta.DisplayName != cacheMeta.DisplayName || cacheMeta.HasSysCategory(rt.GetConflictsCategory()) { conf, err := config.GetConfig(lctx) @@ -171,7 +193,8 @@ func (i *InitSyncer) resolveConflicts(lctx *lmctx.LMContext, cacheMeta types.Res } } -func (i *InitSyncer) deleteResource(lctx *lmctx.LMContext, log *logrus.Entry, resourceName types.ResourceName, resourceMeta types.ResourceMeta) { +func (i *InitSyncer) deleteResource(lctx *lmctx.LMContext, resourceName types.ResourceName, resourceMeta types.ResourceMeta) { + log := lmlog.Logger(lctx) conf, err := config.GetConfig(lctx) if err != nil { log.Errorf("Failed to get config") diff --git a/pkg/types/common.go b/pkg/types/common.go index 1271b0d66..f7a343092 100644 --- a/pkg/types/common.go +++ b/pkg/types/common.go @@ -44,6 +44,7 @@ type ResourceMeta struct { Labels map[string]string `json:"labels"` SysCategories []string `json:"sys_categories"` UID k8stypes.UID `json:"uid"` + CreatedOn int64 `json:"created_on"` } func (resourceMeta ResourceMeta) HasSysCategory(category string) bool { diff --git a/pkg/utilities/utilities.go b/pkg/utilities/utilities.go index a8c02bbd4..fc20434cf 100644 --- a/pkg/utilities/utilities.go +++ b/pkg/utilities/utilities.go @@ -173,6 +173,7 @@ func GetResourceMetaFromResource(resource *models.Device) (types.ResourceMeta, e Labels: labels, SysCategories: categories, UID: k8stypes.UID(GetResourcePropertyValue(resource, constants.K8sResourceUIDPropertyKey)), + CreatedOn: resource.CreatedOn, }, nil }