
Commit bc967e9

rphillips authored and atiratree committed
UPSTREAM: <carry>: add management support to kubelet
UPSTREAM: <carry>: management workloads enhancement 741

UPSTREAM: <carry>: lower verbosity of managed workloads logging

Support for managed workloads was introduced by PR#627. However, the CPU manager reconcile loop now seems to flood the kubelet log with "reconcileState: skipping pod; pod is managed" warnings. Lower the verbosity of these log messages.

UPSTREAM: <carry>: set static pod CPUs correctly when workload partitioning is disabled

UPSTREAM: <carry>: Remove reserved CPUs from default set

Remove reserved CPUs from the default set when workload partitioning is enabled.

Co-Authored-By: Brent Rowsell <[email protected]>
Signed-off-by: Artyom Lukianov <[email protected]>
Signed-off-by: Don Penney <[email protected]>
OpenShift-Rebase-Source: b762ced
OpenShift-Rebase-Source: 63cf793
OpenShift-Rebase-Source: 32af64c

UPSTREAM: <carry>: add management support to kubelet

UPSTREAM: <carry>: OCPBUGS-29520: fix cpu manager default cpuset check in workload partitioned env
(this can be squashed into 04070bb, UPSTREAM: <carry>: add management support to kubelet)

Workload partitioning makes the separation between reserved and workload CPUs stricter. It is therefore expected that the reserved CPUs are NOT part of the default cpuset, and the existing check was overzealous. The first execution of kubelet after a reboot never gets here, as the cpuset is computed on line 209. However, a kubelet restart without a reboot skips this code, recovers from the state file, and runs the check on line 220. This was uncovered by decoupling the cpu manager state file cleanup from kubelet restart, doing it only once at reboot as part of OCPBUGS-24366.

UPSTREAM: <carry>: add management workload check for guaranteed qos

When static pods have workload partitioning enabled, we should not alter their resources if they are Guaranteed QoS. This change adds a check for Guaranteed QoS.

Signed-off-by: ehila <[email protected]>

test: add unit tests for error states

Signed-off-by: ehila <[email protected]>
1 parent 2567b6a commit bc967e9
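
With workload partitioning ("managed" mode) enabled, the carry deliberately excludes the reserved CPUs from the default cpuset rather than requiring them to be a subset of it. A minimal standalone sketch of that set arithmetic, not part of the commit, using k8s.io/utils/cpuset (which the cpu manager already imports); the 12-CPU topology and reserved CPUs 0 and 6 mirror the new test cases and are illustrative only:

package main

import (
	"fmt"

	"k8s.io/utils/cpuset"
)

func main() {
	// A 12-CPU node (0-11) where CPUs 0 and 6 are reserved for system/management use.
	allCPUs := cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
	reserved := cpuset.New(0, 6)

	// With workload partitioning enabled, the default (workload) cpuset
	// intentionally excludes the reserved CPUs.
	defaultSet := allCPUs.Difference(reserved)
	fmt.Println(defaultSet.String()) // prints: 1-5,7-11
}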

File tree

10 files changed, +1017 -14 lines changed


pkg/kubelet/cm/cpumanager/cpu_manager.go

Lines changed: 6 additions & 0 deletions

@@ -35,6 +35,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 	"k8s.io/kubernetes/pkg/kubelet/config"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	"k8s.io/kubernetes/pkg/kubelet/status"
 	"k8s.io/utils/cpuset"
 )
@@ -407,13 +408,18 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
 	failure = []reconciledContainer{}

 	m.removeStaleState()
+	workloadEnabled := managed.IsEnabled()
 	for _, pod := range m.activePods() {
 		pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
 		if !ok {
 			klog.V(4).InfoS("ReconcileState: skipping pod; status not found", "pod", klog.KObj(pod))
 			failure = append(failure, reconciledContainer{pod.Name, "", ""})
 			continue
 		}
+		if enabled, _, _ := managed.IsPodManaged(pod); workloadEnabled && enabled {
+			klog.V(4).InfoS("[cpumanager] reconcileState: skipping pod; pod is managed (pod: %s)", pod.Name)
+			continue
+		}

 		allContainers := pod.Spec.InitContainers
 		allContainers = append(allContainers, pod.Spec.Containers...)

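The skip message added above is logged through klog.V(4), which is the substance of the "lower verbosity" part of the carry: at the default verbosity the reconcile loop no longer floods the kubelet log. A small standalone sketch of that gating, assuming nothing beyond stock klog; the -v value and the message text are illustrative:

package main

import (
	"flag"

	"k8s.io/klog/v2"
)

func main() {
	klog.InitFlags(nil)
	// Without -v=4 (or higher) the V(4) message below is silently dropped,
	// which is why demoting the "pod is managed" log line quiets kubelet by default.
	_ = flag.Set("v", "4")
	flag.Parse()
	defer klog.Flush()

	klog.V(4).InfoS("reconcileState: skipping pod; pod is managed", "pod", "example-pod")
}
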
pkg/kubelet/cm/cpumanager/policy_static.go

Lines changed: 18 additions & 3 deletions

@@ -29,6 +29,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/kubelet/types"
 	"k8s.io/utils/cpuset"
@@ -203,14 +204,20 @@ func (p *staticPolicy) validateState(s state.State) error {
 		// state is empty initialize
 		allCPUs := p.topology.CPUDetails.CPUs()
 		s.SetDefaultCPUSet(allCPUs)
+		if managed.IsEnabled() {
+			defaultCpus := s.GetDefaultCPUSet().Difference(p.reservedCPUs)
+			s.SetDefaultCPUSet(defaultCpus)
+		}
 		return nil
 	}

 	// State has already been initialized from file (is not empty)
 	// 1. Check if the reserved cpuset is not part of default cpuset because:
 	// - kube/system reserved have changed (increased) - may lead to some containers not being able to start
 	// - user tampered with file
-	if !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) {
+	// 2. This only applies when managed mode is disabled. Active workload partitioning feature
+	// removes the reserved cpus from the default cpu mask on purpose.
+	if !managed.IsEnabled() && !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) {
 		return fmt.Errorf("not all reserved cpus: \"%s\" are present in defaultCpuSet: \"%s\"",
 			p.reservedCPUs.String(), tmpDefaultCPUset.String())
 	}
@@ -241,9 +248,17 @@ func (p *staticPolicy) validateState(s state.State) error {
 		}
 	}
 	totalKnownCPUs = totalKnownCPUs.Union(tmpCPUSets...)
-	if !totalKnownCPUs.Equals(p.topology.CPUDetails.CPUs()) {
+	availableCPUs := p.topology.CPUDetails.CPUs()
+
+	// CPU (workload) partitioning removes reserved cpus
+	// from the default mask intentionally
+	if managed.IsEnabled() {
+		availableCPUs = availableCPUs.Difference(p.reservedCPUs)
+	}
+
+	if !totalKnownCPUs.Equals(availableCPUs) {
 		return fmt.Errorf("current set of available CPUs \"%s\" doesn't match with CPUs in state \"%s\"",
-			p.topology.CPUDetails.CPUs().String(), totalKnownCPUs.String())
+			availableCPUs.String(), totalKnownCPUs.String())
 	}

 	return nil

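The OCPBUGS-29520 note in the commit message concerns the recovery path above: after a kubelet restart without a reboot, validateState reads the existing checkpoint rather than recomputing the default set, and on a partitioned node that checkpoint legitimately omits the reserved CPUs. A hedged sketch for inspecting such a checkpoint; the /var/lib/kubelet/cpu_manager_state path and the JSON keys are the ones commonly seen on Linux nodes and should be treated as illustrative rather than authoritative:

package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// cpuManagerCheckpoint is a minimal view of the cpu manager state file.
type cpuManagerCheckpoint struct {
	PolicyName    string `json:"policyName"`
	DefaultCPUSet string `json:"defaultCpuSet"`
}

func main() {
	data, err := os.ReadFile("/var/lib/kubelet/cpu_manager_state")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	var cp cpuManagerCheckpoint
	if err := json.Unmarshal(data, &cp); err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	// On a partitioned node with reserved CPUs 0 and 6 this prints something like
	// "static 1-5,7-11": the reserved CPUs are absent on purpose, which is exactly
	// the state the relaxed check now accepts after a restart.
	fmt.Println(cp.PolicyName, cp.DefaultCPUSet)
}
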
pkg/kubelet/cm/cpumanager/policy_static_test.go

Lines changed: 37 additions & 11 deletions

@@ -21,6 +21,8 @@ import (
 	"reflect"
 	"testing"

+	"k8s.io/kubernetes/pkg/kubelet/managed"
+
 	v1 "k8s.io/api/core/v1"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
@@ -939,17 +941,18 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
 // above test cases are without kubelet --reserved-cpus cmd option
 // the following tests are with --reserved-cpus configured
 type staticPolicyTestWithResvList struct {
-	description     string
-	topo            *topology.CPUTopology
-	numReservedCPUs int
-	reserved        cpuset.CPUSet
-	stAssignments   state.ContainerCPUAssignments
-	stDefaultCPUSet cpuset.CPUSet
-	pod             *v1.Pod
-	expErr          error
-	expNewErr       error
-	expCPUAlloc     bool
-	expCSet         cpuset.CPUSet
+	description         string
+	topo                *topology.CPUTopology
+	numReservedCPUs     int
+	reserved            cpuset.CPUSet
+	stAssignments       state.ContainerCPUAssignments
+	stDefaultCPUSet     cpuset.CPUSet
+	pod                 *v1.Pod
+	expErr              error
+	expNewErr           error
+	expCPUAlloc         bool
+	expCSet             cpuset.CPUSet
+	managementPartition bool
 }

 func TestStaticPolicyStartWithResvList(t *testing.T) {
@@ -981,9 +984,32 @@ func TestStaticPolicyStartWithResvList(t *testing.T) {
 			stDefaultCPUSet: cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
 			expNewErr:       fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of 0-1 did not equal 1)"),
 		},
+		{
+			description:         "reserved cores 0 & 6 are not present in available cpuset when management partitioning is enabled",
+			topo:                topoDualSocketHT,
+			numReservedCPUs:     2,
+			stAssignments:       state.ContainerCPUAssignments{},
+			managementPartition: true,
+			expCSet:             cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
+		},
+		{
+			description:         "reserved cores 0 & 6 are not present in available cpuset when management partitioning is enabled during recovery",
+			topo:                topoDualSocketHT,
+			numReservedCPUs:     2,
+			stAssignments:       state.ContainerCPUAssignments{},
+			stDefaultCPUSet:     cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
+			managementPartition: true,
+			expCSet:             cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
+		},
 	}
 	for _, testCase := range testCases {
 		t.Run(testCase.description, func(t *testing.T) {
+			wasManaged := managed.IsEnabled()
+			managed.TestOnlySetEnabled(testCase.managementPartition)
+			defer func() {
+				managed.TestOnlySetEnabled(wasManaged)
+			}()
+
 			p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil)
 			if !reflect.DeepEqual(err, testCase.expNewErr) {
 				t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v",

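The tests above save the global managed flag, flip it with TestOnlySetEnabled, and restore it in a defer. An equivalent pattern, shown only as a sketch that assumes the managed.IsEnabled/TestOnlySetEnabled helpers introduced by this carry, is to wrap the toggle in a t.Cleanup helper so every caller gets the restore for free:

package cpumanager

import (
	"testing"

	"k8s.io/kubernetes/pkg/kubelet/managed"
)

// setManagedForTest flips the package-global workload-partitioning switch for
// the duration of a single test and restores the previous value afterwards.
func setManagedForTest(t *testing.T, enabled bool) {
	t.Helper()
	was := managed.IsEnabled()
	managed.TestOnlySetEnabled(enabled)
	t.Cleanup(func() { managed.TestOnlySetEnabled(was) })
}
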
pkg/kubelet/cm/qos_container_manager_linux.go

Lines changed: 4 additions & 0 deletions

@@ -35,6 +35,7 @@ import (
 	"k8s.io/kubernetes/pkg/api/v1/resource"
 	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
 	kubefeatures "k8s.io/kubernetes/pkg/features"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 )

 const (
@@ -174,6 +175,9 @@ func (m *qosContainerManagerImpl) setCPUCgroupConfig(configs map[v1.PodQOSClass]*CgroupConfig) error {
 	reuseReqs := make(v1.ResourceList, 4)
 	for i := range pods {
 		pod := pods[i]
+		if enabled, _, _ := managed.IsPodManaged(pod); enabled {
+			continue
+		}
 		qosClass := v1qos.GetPodQOS(pod)
 		if qosClass != v1.PodQOSBurstable {
 			// we only care about the burstable qos tier

pkg/kubelet/config/file.go

Lines changed: 11 additions & 0 deletions

@@ -30,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/tools/cache"
 	api "k8s.io/kubernetes/pkg/apis/core"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 	utilio "k8s.io/utils/io"
 )
@@ -230,6 +231,16 @@ func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
 	if podErr != nil {
 		return pod, podErr
 	}
+	if managed.IsEnabled() {
+		if newPod, _, err := managed.ModifyStaticPodForPinnedManagement(pod); err != nil {
+			klog.V(2).Error(err, "Static Pod is managed but errored", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace)
+		} else if newPod != nil {
+			klog.V(2).InfoS("Static Pod is managed. Using modified pod", "name", newPod.ObjectMeta.Name, "namespace", newPod.ObjectMeta.Namespace, "annotations", newPod.Annotations)
+			pod = newPod
+		} else {
+			klog.V(2).InfoS("Static Pod is not managed", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace)
+		}
+	}
 	return pod, nil
 }

pkg/kubelet/kubelet.go

Lines changed: 5 additions & 0 deletions

@@ -89,6 +89,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
 	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
 	"k8s.io/kubernetes/pkg/kubelet/logs"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
 	"k8s.io/kubernetes/pkg/kubelet/network/dns"
@@ -630,6 +631,10 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,

 	klet.runtimeService = kubeDeps.RemoteRuntimeService

+	if managed.IsEnabled() {
+		klog.InfoS("Pinned Workload Management Enabled")
+	}
+
 	if kubeDeps.KubeClient != nil {
 		klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
 	}

pkg/kubelet/kubelet_node_status.go

Lines changed: 27 additions & 0 deletions

@@ -38,7 +38,9 @@ import (
 	"k8s.io/klog/v2"
 	kubeletapis "k8s.io/kubelet/pkg/apis"
 	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
+	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
 	"k8s.io/kubernetes/pkg/kubelet/events"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	"k8s.io/kubernetes/pkg/kubelet/nodestatus"
 	taintutil "k8s.io/kubernetes/pkg/util/taints"
 	volutil "k8s.io/kubernetes/pkg/volume/util"
@@ -117,6 +119,9 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
 		requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
 		requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate
 		requiresUpdate = kl.reconcileHugePageResource(node, existingNode) || requiresUpdate
+		if managed.IsEnabled() {
+			requiresUpdate = kl.addManagementNodeCapacity(node, existingNode) || requiresUpdate
+		}
 		if requiresUpdate {
 			if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
 				klog.ErrorS(err, "Unable to reconcile node with API server,error updating node", "node", klog.KObj(node))
@@ -127,6 +132,25 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
 	return true
 }

+// addManagementNodeCapacity adds the managednode capacity to the node
+func (kl *Kubelet) addManagementNodeCapacity(initialNode, existingNode *v1.Node) bool {
+	updateDefaultResources(initialNode, existingNode)
+	machineInfo, err := kl.cadvisor.MachineInfo()
+	if err != nil {
+		klog.Errorf("Unable to calculate managed node capacity for %q: %v", kl.nodeName, err)
+		return false
+	}
+	cpuRequest := cadvisor.CapacityFromMachineInfo(machineInfo)[v1.ResourceCPU]
+	cpuRequestInMilli := cpuRequest.MilliValue()
+	newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format)
+	managedResourceName := managed.GenerateResourceName("management")
+	if existingCapacity, ok := existingNode.Status.Capacity[managedResourceName]; ok && existingCapacity.Equal(*newCPURequest) {
+		return false
+	}
+	existingNode.Status.Capacity[managedResourceName] = *newCPURequest
+	return true
+}
+
 // reconcileHugePageResource will update huge page capacity for each page size and remove huge page sizes no longer supported
 func (kl *Kubelet) reconcileHugePageResource(initialNode, existingNode *v1.Node) bool {
 	requiresUpdate := updateDefaultResources(initialNode, existingNode)
@@ -418,6 +442,9 @@ func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) {
 			}
 		}
 	}
+	if managed.IsEnabled() {
+		kl.addManagementNodeCapacity(node, node)
+	}

 	kl.setNodeStatus(ctx, node)

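addManagementNodeCapacity derives the advertised capacity for the managed-workload resource (whatever name managed.GenerateResourceName returns) from the machine's CPU count: MilliValue() multiplied by 1000 and passed to NewMilliQuantity works out to one resource unit per millicore of CPU. A standalone sketch of just that arithmetic; the 8-CPU figure is illustrative:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Assume cadvisor reports 8 CPUs, so the node CPU capacity is "8".
	cpuCapacity := resource.MustParse("8")
	cpuInMilli := cpuCapacity.MilliValue() // 8000

	// Mirroring the carry: multiply by 1000 and build a milli-quantity,
	// yielding 8000 units, i.e. one unit per millicore of CPU.
	managedCapacity := resource.NewMilliQuantity(cpuInMilli*1000, cpuCapacity.Format)
	fmt.Println(managedCapacity.Value()) // prints: 8000
}
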
pkg/kubelet/managed/cpu_shares.go

Lines changed: 30 additions & 0 deletions

@@ -0,0 +1,30 @@
+package managed
+
+const (
+	// These limits are defined in the kernel:
+	// https://github.com/torvalds/linux/blob/0bddd227f3dc55975e2b8dfa7fc6f959b062a2c7/kernel/sched/sched.h#L427-L428
+	MinShares = 2
+	MaxShares = 262144
+
+	SharesPerCPU  = 1024
+	MilliCPUToCPU = 1000
+)
+
+// MilliCPUToShares converts the milliCPU to CFS shares.
+func MilliCPUToShares(milliCPU int64) uint64 {
+	if milliCPU == 0 {
+		// Docker converts zero milliCPU to unset, which maps to kernel default
+		// for unset: 1024. Return 2 here to really match kernel default for
+		// zero milliCPU.
+		return MinShares
+	}
+	// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
+	shares := (milliCPU * SharesPerCPU) / MilliCPUToCPU
+	if shares < MinShares {
+		return MinShares
+	}
+	if shares > MaxShares {
+		return MaxShares
+	}
+	return uint64(shares)
+}

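To make the clamping in MilliCPUToShares concrete, here is a standalone re-implementation of the same arithmetic with a few sample inputs; the helper simply mirrors the file above and the sample values are illustrative:

package main

import "fmt"

// milliCPUToShares mirrors pkg/kubelet/managed/cpu_shares.go.
func milliCPUToShares(milliCPU int64) uint64 {
	const (
		minShares     = 2
		maxShares     = 262144
		sharesPerCPU  = 1024
		milliCPUToCPU = 1000
	)
	if milliCPU == 0 {
		return minShares
	}
	shares := (milliCPU * sharesPerCPU) / milliCPUToCPU
	if shares < minShares {
		return minShares
	}
	if shares > maxShares {
		return maxShares
	}
	return uint64(shares)
}

func main() {
	fmt.Println(milliCPUToShares(0))       // 2 (kernel minimum, not the 1024 default)
	fmt.Println(milliCPUToShares(250))     // 256
	fmt.Println(milliCPUToShares(500))     // 512
	fmt.Println(milliCPUToShares(4000))    // 4096
	fmt.Println(milliCPUToShares(1 << 30)) // 262144 (clamped to the kernel maximum)
}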