@@ -16,14 +16,22 @@ package rm
 
 import (
 	"context"
-	"math/rand"
+	"crypto/tls"
+	"crypto/x509"
+	"encoding/json"
+	"io"
+	"math/big"
+	"net"
+	"net/http"
 	"os"
 	"sort"
 	"strconv"
 	"strings"
 	"sync"
 	"time"
 
+	cryptorand "crypto/rand"
+
 	dpapi "github.com/intel/intel-device-plugins-for-kubernetes/pkg/deviceplugin"
 	"github.com/pkg/errors"
 	"google.golang.org/grpc"
@@ -36,6 +44,7 @@ import (
 	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
 	podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
 	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
+	"k8s.io/utils/strings/slices"
 )
 
 const (
@@ -48,6 +57,13 @@ const (
 	grpcAddress    = "unix:///var/lib/kubelet/pod-resources/kubelet.sock"
 	grpcBufferSize = 4 * 1024 * 1024
 	grpcTimeout    = 5 * time.Second
+
+	kubeletAPITimeout    = 5 * time.Second
+	kubeletAPIMaxRetries = 5
+	kubeletHTTPSCertPath = "/var/lib/kubelet/pki/kubelet.crt"
+	// This is detected incorrectly as credentials
+	//nolint:gosec
+	serviceAccountTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
 )
 
 // Errors.
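The two new file paths are what make direct kubelet access work: the projected service-account token authenticates the plugin, and the kubelet's self-signed serving certificate anchors TLS verification. A minimal standalone sketch of that client setup, assuming the same paths, the standard kubelet read port 10250, and `HOST_IP`/`NODE_NAME` env vars injected via the downward API (the full version is `listPodsFromKubelet` further down in this diff):

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"net/http"
	"os"
	"time"
)

func main() {
	// Projected service-account token; same path as serviceAccountTokenPath above.
	token, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
	if err != nil {
		panic(err)
	}

	// Kubelet's self-signed serving cert; same path as kubeletHTTPSCertPath above.
	kubeletCert, err := os.ReadFile("/var/lib/kubelet/pki/kubelet.crt")
	if err != nil {
		panic(err)
	}

	certPool := x509.NewCertPool()
	certPool.AppendCertsFromPEM(kubeletCert)

	client := &http.Client{
		Timeout: 5 * time.Second,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{
				MinVersion: tls.VersionTLS12,
				// Trust only the kubelet cert read above, and verify it
				// against the node name rather than the host IP.
				RootCAs:    certPool,
				ServerName: os.Getenv("NODE_NAME"),
			},
		},
	}

	// HOST_IP is assumed to be injected via the downward API, as in the patch.
	req, err := http.NewRequest(http.MethodGet, "https://"+os.Getenv("HOST_IP")+":10250/pods", nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Bearer "+string(token))

	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	fmt.Println("kubelet /pods:", resp.Status)
}
```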
@@ -101,12 +117,14 @@ type resourceManager struct {
 	prGetClientFunc  getClientFunc
 	assignments      map[string]podAssignmentDetails // pod name -> assignment details
 	nodeName         string
+	hostIP           string
 	skipID           string
 	fullResourceName string
 	retryTimeout     time.Duration
 	cleanupInterval  time.Duration
 	mutex            sync.RWMutex // for devTree updates during scan
 	cleanupMutex     sync.RWMutex // for assignment details during cleanup
+	useKubelet       bool
 }
 
 // NewDeviceInfo creates a new DeviceInfo.
@@ -136,37 +154,50 @@ func NewResourceManager(skipID, fullResourceName string) (ResourceManager, error) {
 
 	rm := resourceManager{
 		nodeName:         os.Getenv("NODE_NAME"),
+		hostIP:           os.Getenv("HOST_IP"),
 		clientset:        clientset,
 		skipID:           skipID,
 		fullResourceName: fullResourceName,
 		prGetClientFunc:  podresources.GetV1Client,
 		assignments:      make(map[string]podAssignmentDetails),
 		retryTimeout:     1 * time.Second,
 		cleanupInterval:  20 * time.Minute,
+		useKubelet:       true,
 	}
 
 	klog.Info("GPU device plugin resource manager enabled")
 
+	// Try listing Pods once to detect if Kubelet API works
+	_, err = rm.listPodsFromKubelet()
+
+	if err != nil {
+		klog.V(2).Info("Not using Kubelet API")
+
+		rm.useKubelet = false
+	} else {
+		klog.V(2).Info("Using Kubelet API")
+	}
+
 	go func() {
-		rand.Seed(time.Now().UnixNano())
-		cleanupIntervalSeconds := rm.cleanupInterval.Seconds()
-		n := rand.Intn(int(cleanupIntervalSeconds))
+		getRandDuration := func() time.Duration {
+			cleanupIntervalSeconds := int(rm.cleanupInterval.Seconds())
 
-		ticker := time.NewTicker(rm.cleanupInterval/2 + time.Duration(n)*time.Second)
+			n, _ := cryptorand.Int(cryptorand.Reader, big.NewInt(int64(cleanupIntervalSeconds)))
+
+			return rm.cleanupInterval/2 + time.Duration(n.Int64())*time.Second
+		}
+
+		ticker := time.NewTicker(getRandDuration())
 
 		for range ticker.C {
			klog.V(4).Info("Running cleanup")
 
-			n = rand.Intn(int(cleanupIntervalSeconds))
-			ticker.Reset(rm.cleanupInterval/2 + time.Duration(n)*time.Second)
+			ticker.Reset(getRandDuration())
 
 			// Gather both running and pending pods. It might happen that
 			// cleanup is triggered between GetPreferredAllocation and Allocate
 			// and it would remove the assignment data for the soon-to-be allocated pod
-			running := rm.listPodsOnNodeWithState(string(v1.PodRunning))
-			for podName, podItem := range rm.listPodsOnNodeWithState(string(v1.PodPending)) {
-				running[podName] = podItem
-			}
+			running := rm.listPodsOnNodeWithStates([]string{string(v1.PodRunning), string(v1.PodPending)})
 
 			func() {
 				rm.cleanupMutex.Lock()
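The hunk above replaces the deprecated `rand.Seed` pattern with `crypto/rand`, which needs no seeding and returns a uniformly random `*big.Int`. A runnable sketch of the same jittered-ticker pattern in isolation, with the interval shortened to seconds so the behavior is visible (the patch itself uses the 20-minute `cleanupInterval`):

```go
package main

import (
	cryptorand "crypto/rand"
	"fmt"
	"math/big"
	"time"
)

func main() {
	// Shortened stand-in for the plugin's 20-minute cleanupInterval.
	cleanupInterval := 4 * time.Second

	// Period is interval/2 plus 0..interval seconds of jitter, mirroring
	// the getRandDuration closure in the patch. cryptorand.Int returns a
	// random value in [0, max); max must be positive.
	getRandDuration := func() time.Duration {
		maxSeconds := big.NewInt(int64(cleanupInterval.Seconds()))
		n, _ := cryptorand.Int(cryptorand.Reader, maxSeconds)
		return cleanupInterval/2 + time.Duration(n.Int64())*time.Second
	}

	ticker := time.NewTicker(getRandDuration())
	defer ticker.Stop()

	for i := 0; i < 3; i++ {
		<-ticker.C
		// Re-randomize after every tick so long-running instances
		// don't settle into a fixed phase.
		next := getRandDuration()
		fmt.Println("cleanup tick; next in", next)
		ticker.Reset(next)
	}
}
```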
@@ -197,29 +228,134 @@ func getPodResourceKey(res *podresourcesv1.PodResources) string {
 	return res.Namespace + "&" + res.Name
 }
 
-func (rm *resourceManager) listPodsOnNodeWithState(state string) map[string]*v1.Pod {
-	pods := make(map[string]*v1.Pod)
-
-	selector, err := fields.ParseSelector("spec.nodeName=" + rm.nodeName +
-		",status.phase=" + state)
+func (rm *resourceManager) listPodsFromAPIServer() (*v1.PodList, error) {
+	selector, err := fields.ParseSelector("spec.nodeName=" + rm.nodeName)
 
 	if err != nil {
-		return pods
+		return &v1.PodList{}, err
 	}
 
+	klog.V(4).Info("Requesting pods from API server")
+
 	podList, err := rm.clientset.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{
 		FieldSelector: selector.String(),
 	})
 
+	if err != nil {
+		klog.Error("pod listing failed:", err)
+
+		return &v1.PodList{}, err
+	}
+
+	return podList, nil
+}
+
+// +kubebuilder:rbac:groups="",resources=nodes/proxy,verbs=list;get
+
+func (rm *resourceManager) listPodsFromKubelet() (*v1.PodList, error) {
+	var podList v1.PodList
+
+	token, err := os.ReadFile(serviceAccountTokenPath)
+	if err != nil {
+		klog.Warning("Failed to read token for kubelet API access: ", err)
+
+		return &podList, err
+	}
+
+	kubeletCert, err := os.ReadFile(kubeletHTTPSCertPath)
+	if err != nil {
+		klog.Warning("Failed to read kubelet cert: ", err)
+
+		return &podList, err
+	}
+
+	certPool := x509.NewCertPool()
+	certPool.AppendCertsFromPEM(kubeletCert)
+
+	kubeletURL := "https://" + rm.hostIP + ":10250/pods"
+	req, _ := http.NewRequestWithContext(context.Background(), "GET", kubeletURL, nil)
+	req.Header.Set("Authorization", "Bearer "+string(token))
+
+	tr := &http.Transport{
+		TLSClientConfig: &tls.Config{
+			MinVersion: tls.VersionTLS12,
+			RootCAs:    certPool,
+			ServerName: rm.nodeName,
+		},
+	}
+	client := &http.Client{
+		Timeout:   kubeletAPITimeout,
+		Transport: tr,
+	}
+
+	klog.V(4).Infof("Requesting pods from kubelet (%s)", kubeletURL)
+
+	resp, err := client.Do(req)
+	if err != nil {
+		klog.Warning("Failed to read pods from kubelet API: ", err)
+
+		return &podList, err
+	}
+
+	defer resp.Body.Close()
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		klog.Warning("Failed to read http response body: ", err)
+
+		return &podList, err
+	}
+
+	err = json.Unmarshal(body, &podList)
+	if err != nil {
+		klog.Warning("Failed to unmarshal PodList from response: ", err)
+
+		return &podList, err
+	}
+
+	return &podList, nil
+}
+
+func (rm *resourceManager) listPods() (*v1.PodList, error) {
+	// Try to use kubelet API as long as it provides listings within retries
+	if rm.useKubelet {
+		for i := 0; i < kubeletAPIMaxRetries; i++ {
+			if podList, err := rm.listPodsFromKubelet(); err == nil {
+				return podList, nil
+			} else if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
+				continue
+			}
+
+			// If error is non-timeout, break to stop using kubelet API
+			break
+		}
+
+		klog.Warning("Stopping Kubelet API use due to error/timeout")
+
+		rm.useKubelet = false
+	}
+
+	return rm.listPodsFromAPIServer()
+}
+
+func (rm *resourceManager) listPodsOnNodeWithStates(states []string) map[string]*v1.Pod {
+	pods := make(map[string]*v1.Pod)
+
+	podList, err := rm.listPods()
 	if err != nil {
 		klog.Error("pod listing failed:", err)
 
 		return pods
 	}
 
 	for i := range podList.Items {
-		key := getPodKey(&podList.Items[i])
-		pods[key] = &podList.Items[i]
+		phase := string(podList.Items[i].Status.Phase)
+		if slices.Contains(states, phase) {
+			key := getPodKey(&podList.Items[i])
+			pods[key] = &podList.Items[i]
+		}
 	}
 
 	return pods
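`listPods` above retries only on `net.Error` timeouts before permanently falling back to the API server. The `err.(net.Error)` type assertion works here because `http.Client` returns a `*url.Error`, which implements that interface; an `errors.As` check is a wrap-aware alternative that also sees through further wrapping. A small sketch of that alternative (the 192.0.2.1 TEST-NET-1 address is chosen so the request reliably times out):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"time"
)

// isTimeout reports whether err is a network timeout. errors.As unwraps
// wrapped errors, whereas a plain err.(net.Error) assertion only matches
// the outermost error type.
func isTimeout(err error) bool {
	var neterr net.Error
	return errors.As(err, &neterr) && neterr.Timeout()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	// 192.0.2.1 is reserved for documentation, so this request times out.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://192.0.2.1/pods", nil)
	if err != nil {
		panic(err)
	}

	_, err = http.DefaultClient.Do(req)
	fmt.Println("timed out:", isTimeout(err)) // expected: true
}
```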
@@ -524,7 +660,7 @@ func (rm *resourceManager) findAllocationPodCandidate() (*podCandidate, error) {
 
 // getNodePendingGPUPods returns a map of pod names -> pods that are pending and use the gpu.
 func (rm *resourceManager) getNodePendingGPUPods() (map[string]*v1.Pod, error) {
-	pendingPods := rm.listPodsOnNodeWithState(string(v1.PodPending))
+	pendingPods := rm.listPodsOnNodeWithStates([]string{string(v1.PodPending)})
 
 	for podName, pod := range pendingPods {
 		if numGPUUsingContainers(pod, rm.fullResourceName) == 0 {