@@ -17,7 +17,13 @@ package rm
 import (
 	"context"
 	"crypto/rand"
+	"crypto/tls"
+	"crypto/x509"
+	"encoding/json"
+	"io"
 	"math/big"
+	"net"
+	"net/http"
 	"os"
 	"sort"
 	"strconv"
@@ -37,6 +43,7 @@ import (
 	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
 	podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
 	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
+	"k8s.io/utils/strings/slices"
 )

 const (
@@ -49,6 +56,13 @@ const (
 	grpcAddress    = "unix:///var/lib/kubelet/pod-resources/kubelet.sock"
 	grpcBufferSize = 4 * 1024 * 1024
 	grpcTimeout    = 5 * time.Second
+
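+	// Timeout, retry limit, and file paths used for reading pod listings from the kubelet API.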
+	kubeletAPITimeout    = 5 * time.Second
+	kubeletAPIMaxRetries = 5
+	kubeletHTTPSCertPath = "/var/lib/kubelet/pki/kubelet.crt"
+	// This path is incorrectly detected as hardcoded credentials.
+	//nolint:gosec
+	serviceAccountTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
 )

 // Errors.
@@ -102,12 +116,14 @@ type resourceManager struct {
 	prGetClientFunc  getClientFunc
 	assignments      map[string]podAssignmentDetails // pod name -> assignment details
 	nodeName         string
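+	// hostIP is this node's IP (typically injected via the downward API), used to reach the kubelet API.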
+	hostIP           string
 	skipID           string
 	fullResourceName string
 	retryTimeout     time.Duration
 	cleanupInterval  time.Duration
 	mutex            sync.RWMutex // for devTree updates during scan
 	cleanupMutex     sync.RWMutex // for assignment details during cleanup
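+	// useKubelet is true while pod listings come from the kubelet API; it flips to false on failure.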
+	useKubelet       bool
 }

 // NewDeviceInfo creates a new DeviceInfo.
@@ -137,17 +153,30 @@ func NewResourceManager(skipID, fullResourceName string) (ResourceManager, error
 
 	rm := resourceManager{
 		nodeName:         os.Getenv("NODE_NAME"),
+		hostIP:           os.Getenv("HOST_IP"),
 		clientset:        clientset,
 		skipID:           skipID,
 		fullResourceName: fullResourceName,
 		prGetClientFunc:  podresources.GetV1Client,
 		assignments:      make(map[string]podAssignmentDetails),
 		retryTimeout:     1 * time.Second,
 		cleanupInterval:  20 * time.Minute,
+		useKubelet:       true,
 	}

 	klog.Info("GPU device plugin resource manager enabled")

+	// Try listing pods once to detect whether the kubelet API works.
+	_, err = rm.listPodsFromKubelet()
+
+	if err != nil {
+		klog.V(2).Info("Not using Kubelet API")
+
+		rm.useKubelet = false
+	} else {
+		klog.V(2).Info("Using Kubelet API")
+	}
+
 	go func() {
 		getRandDuration := func() time.Duration {
 			cleanupIntervalSeconds := int(rm.cleanupInterval.Seconds())
@@ -167,10 +196,7 @@ func NewResourceManager(skipID, fullResourceName string) (ResourceManager, error
 			// Gather both running and pending pods. It might happen that
 			// cleanup is triggered between GetPreferredAllocation and Allocate
 			// and it would remove the assignment data for the soon-to-be allocated pod
-			running := rm.listPodsOnNodeWithState(string(v1.PodRunning))
-			for podName, podItem := range rm.listPodsOnNodeWithState(string(v1.PodPending)) {
-				running[podName] = podItem
-			}
+			running := rm.listPodsOnNodeWithStates([]string{string(v1.PodRunning), string(v1.PodPending)})

 			func() {
 				rm.cleanupMutex.Lock()
@@ -201,29 +227,141 @@ func getPodResourceKey(res *podresourcesv1.PodResources) string {
 	return res.Namespace + "&" + res.Name
 }

-func (rm *resourceManager) listPodsOnNodeWithState(state string) map[string]*v1.Pod {
-	pods := make(map[string]*v1.Pod)
-
-	selector, err := fields.ParseSelector("spec.nodeName=" + rm.nodeName +
-		",status.phase=" + state)
+func (rm *resourceManager) listPodsFromAPIServer() (*v1.PodList, error) {
+	selector, err := fields.ParseSelector("spec.nodeName=" + rm.nodeName)

 	if err != nil {
-		return pods
+		return &v1.PodList{}, err
 	}

+	klog.V(4).Info("Requesting pods from API server")
+
 	podList, err := rm.clientset.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{
 		FieldSelector: selector.String(),
 	})

+	if err != nil {
+		klog.Error("pod listing failed:", err)
+
+		return &v1.PodList{}, err
+	}
+
+	return podList, nil
+}
+
+// +kubebuilder:rbac:groups="",resources=nodes/proxy,verbs=list;get
+
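+// listPodsFromKubelet reads this node's pod list directly from the kubelet
+// API; the nodes/proxy RBAC rule above grants the plugin's service account
+// access to that endpoint.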
+func (rm *resourceManager) listPodsFromKubelet() (*v1.PodList, error) {
+	var podList v1.PodList
+
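+	// The service account token is sent as a bearer token to authenticate to the kubelet.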
+	token, err := os.ReadFile(serviceAccountTokenPath)
+	if err != nil {
+		klog.Warning("Failed to read token for kubelet API access: ", err)
+
+		return &podList, err
+	}
+
+	kubeletCert, err := os.ReadFile(kubeletHTTPSCertPath)
+	if err != nil {
+		klog.Warning("Failed to read kubelet cert: ", err)
+
+		return &podList, err
+	}
+
+	certPool := x509.NewCertPool()
+	certPool.AppendCertsFromPEM(kubeletCert)
+
+	// There is no official documentation for the kubelet API. There is a blog post:
+	// https://www.deepnetwork.com/blog/2020/01/13/kubelet-api.html
+	// and a tool for working with the API:
+	// https://github.com/cyberark/kubeletctl
+
+	kubeletURL := "https://" + rm.hostIP + ":10250/pods"
+	req, err := http.NewRequestWithContext(context.Background(), "GET", kubeletURL, nil)
+	if err != nil {
+		return &podList, err
+	}
+
+	req.Header.Set("Authorization", "Bearer "+string(token))
+
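+	// The request goes to the host IP, but the kubelet's serving certificate is
+	// issued for the node name, so ServerName overrides the URL host for TLS verification.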
+	tr := &http.Transport{
+		TLSClientConfig: &tls.Config{
+			MinVersion: tls.VersionTLS12,
+			RootCAs:    certPool,
+			ServerName: rm.nodeName,
+		},
+	}
+	client := &http.Client{
+		Timeout:   kubeletAPITimeout,
+		Transport: tr,
+	}
+
+	klog.V(4).Infof("Requesting pods from kubelet (%s)", kubeletURL)
+
+	resp, err := client.Do(req)
+	if err != nil {
+		klog.Warning("Failed to read pods from kubelet API: ", err)
+
+		return &podList, err
+	}
+
+	defer resp.Body.Close()
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		klog.Warning("Failed to read http response body: ", err)
+
+		return &podList, err
+	}
+
+	err = json.Unmarshal(body, &podList)
+	if err != nil {
+		klog.Warning("Failed to unmarshal PodList from response: ", err)
+
+		return &podList, err
+	}
+
+	return &podList, nil
+}
+
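+// listPods lists the pods on this node, preferring the kubelet API and
+// falling back to the API server permanently once the kubelet API fails.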
+func (rm *resourceManager) listPods() (*v1.PodList, error) {
+	// Try to use the kubelet API as long as it provides listings within retries.
+	if rm.useKubelet {
+		var neterr net.Error
+
+		for i := 0; i < kubeletAPIMaxRetries; i++ {
+			if podList, err := rm.listPodsFromKubelet(); err == nil {
+				return podList, nil
+			} else if errors.As(err, &neterr) && neterr.Timeout() {
+				continue
+			}
+
+			// If the error is a non-timeout, break to stop using the kubelet API.
+			break
+		}
+
+		klog.Warning("Stopping Kubelet API use due to error/timeout")
+
+		rm.useKubelet = false
+	}
+
+	return rm.listPodsFromAPIServer()
+}
+
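+// listPodsOnNodeWithStates returns this node's pods that are in one of the
+// given phases, keyed by pod name.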
+func (rm *resourceManager) listPodsOnNodeWithStates(states []string) map[string]*v1.Pod {
+	pods := make(map[string]*v1.Pod)
+
+	podList, err := rm.listPods()
 	if err != nil {
 		klog.Error("pod listing failed:", err)

 		return pods
 	}

 	for i := range podList.Items {
-		key := getPodKey(&podList.Items[i])
-		pods[key] = &podList.Items[i]
+		phase := string(podList.Items[i].Status.Phase)
+		if slices.Contains(states, phase) {
+			key := getPodKey(&podList.Items[i])
+			pods[key] = &podList.Items[i]
+		}
 	}

 	return pods
@@ -528,7 +666,7 @@ func (rm *resourceManager) findAllocationPodCandidate() (*podCandidate, error) {
 
 // getNodePendingGPUPods returns a map of pod names -> pods that are pending and use the gpu.
 func (rm *resourceManager) getNodePendingGPUPods() (map[string]*v1.Pod, error) {
-	pendingPods := rm.listPodsOnNodeWithState(string(v1.PodPending))
+	pendingPods := rm.listPodsOnNodeWithStates([]string{string(v1.PodPending)})

 	for podName, pod := range pendingPods {
 		if numGPUUsingContainers(pod, rm.fullResourceName) == 0 {