Skip to content

Commit 4d858c5

Browse files
committed
pkg/deviceplugin: move to grpc.NewClient()
grpc.NewClient(), added in grpc-go v1.63, is the preferred way to create a new ClientConn. In most of our usages, moving away from grpc.Dial*() to it is straightforward. However, we've also relied on grpc.Dial*()'s behavior to automatically make a new connection to "test" a connection is successful isn't available anymore. Combined with grpc.WithBlock dialoption this usage is considered "especially bad" way to handle a client connection. The recommended approach to test a server connection is to separately make a connection and watch the connection state to become Ready. This change follows that recommendation. Signed-off-by: Mikko Ylinen <[email protected]>
1 parent 8c885da commit 4d858c5

File tree

3 files changed

+35
-33
lines changed

3 files changed

+35
-33
lines changed

cmd/gpu_plugin/rm/gpu_plugin_resource_manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (w *mockPodResources) Get(ctx context.Context,
9292
}
9393

9494
func newMockResourceManager(pods []v1.Pod) ResourceManager {
95-
client, err := grpc.Dial("fake", grpc.WithTransportCredentials(insecure.NewCredentials()))
95+
client, err := grpc.NewClient("fake", grpc.WithTransportCredentials(insecure.NewCredentials()))
9696
if err != nil {
9797
fmt.Fprintf(os.Stderr, "failed to create client: %v\n", err)
9898

pkg/deviceplugin/server.go

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/fsnotify/fsnotify"
2727
"github.com/pkg/errors"
2828
"google.golang.org/grpc"
29+
"google.golang.org/grpc/connectivity"
2930
"google.golang.org/grpc/credentials/insecure"
3031

3132
"k8s.io/klog/v2"
@@ -326,15 +327,9 @@ func watchFile(file string) error {
326327
}
327328

328329
func (srv *server) registerWithKubelet(kubeletSocket, pluginEndPoint, resourceName string) error {
329-
ctx := context.Background()
330-
331-
conn, err := grpc.DialContext(ctx, kubeletSocket,
332-
grpc.WithTransportCredentials(insecure.NewCredentials()),
333-
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
334-
return (&net.Dialer{}).DialContext(ctx, "unix", addr)
335-
}))
330+
conn, err := grpc.NewClient(filepath.Join("unix://", kubeletSocket), grpc.WithTransportCredentials(insecure.NewCredentials()))
336331
if err != nil {
337-
return errors.Wrap(err, "Cannot connect to kubelet service")
332+
return errors.Wrap(err, "Cannot create a gRPC client")
338333
}
339334

340335
defer conn.Close()
@@ -347,7 +342,7 @@ func (srv *server) registerWithKubelet(kubeletSocket, pluginEndPoint, resourceNa
347342
Options: srv.getDevicePluginOptions(),
348343
}
349344

350-
_, err = client.Register(ctx, reqt)
345+
_, err = client.Register(context.Background(), reqt)
351346
if err != nil {
352347
return errors.Wrap(err, "Cannot register to kubelet service")
353348
}
@@ -358,20 +353,33 @@ func (srv *server) registerWithKubelet(kubeletSocket, pluginEndPoint, resourceNa
358353
// waitForServer checks if grpc server is alive
359354
// by making grpc blocking connection to the server socket.
360355
func waitForServer(socket string, timeout time.Duration) error {
356+
conn, err := grpc.NewClient(filepath.Join("unix://", socket), grpc.WithTransportCredentials(insecure.NewCredentials()))
357+
if err != nil {
358+
return errors.Wrap(err, "Cannot create a gRPC client")
359+
}
360+
361+
defer conn.Close()
362+
361363
ctx, cancel := context.WithTimeout(context.Background(), timeout)
362364

363365
defer cancel()
364366

365-
conn, err := grpc.DialContext(ctx, socket,
366-
grpc.WithTransportCredentials(insecure.NewCredentials()),
367-
grpc.WithBlock(),
368-
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
369-
return (&net.Dialer{}).DialContext(ctx, "unix", addr)
370-
}),
371-
)
372-
if conn != nil {
373-
_ = conn.Close()
374-
}
367+
// A blocking dial blocks until the clientConn is ready. Based
368+
// on grpc-go's DialContext() that moved to use NewClient() but
369+
// marked DialContext() deprecated.
370+
for {
371+
state := conn.GetState()
372+
if state == connectivity.Idle {
373+
conn.Connect()
374+
}
375+
376+
if state == connectivity.Ready {
377+
return nil
378+
}
375379

376-
return errors.Wrapf(err, "Failed dial context at %s", socket)
380+
if !conn.WaitForStateChange(ctx, state) {
381+
// ctx got timeout or canceled.
382+
return errors.Wrapf(ctx.Err(), "Failed dial context at %s", socket)
383+
}
384+
}
377385
}

pkg/deviceplugin/server_test.go

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"net"
2222
"os"
2323
"path"
24+
"path/filepath"
2425
"reflect"
2526
"sync"
2627
"testing"
@@ -111,7 +112,7 @@ func (k *kubeletStub) start() error {
111112
return waitForServer(k.socket, 10*time.Second)
112113
}
113114

114-
func TestRegisterWithKublet(t *testing.T) {
115+
func TestRegisterWithKubelet(t *testing.T) {
115116
pluginSocket := path.Join(devicePluginPath, pluginEndpoint)
116117

117118
srv := newTestServer()
@@ -180,11 +181,8 @@ func TestSetupAndServe(t *testing.T) {
180181

181182
ctx := context.Background()
182183

183-
conn, err := grpc.DialContext(ctx, pluginSocket,
184-
grpc.WithTransportCredentials(insecure.NewCredentials()),
185-
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
186-
return (&net.Dialer{}).DialContext(ctx, "unix", addr)
187-
}))
184+
conn, err := grpc.NewClient(filepath.Join("unix://", pluginSocket),
185+
grpc.WithTransportCredentials(insecure.NewCredentials()))
188186
if err != nil {
189187
t.Fatalf("Failed to get connection: %+v", err)
190188
}
@@ -231,12 +229,8 @@ func TestSetupAndServe(t *testing.T) {
231229
time.Sleep(1 * time.Second)
232230
}
233231

234-
conn, err = grpc.DialContext(ctx, pluginSocket,
235-
grpc.WithTransportCredentials(insecure.NewCredentials()),
236-
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
237-
return (&net.Dialer{}).DialContext(ctx, "unix", addr)
238-
}))
239-
232+
conn, err = grpc.NewClient(filepath.Join("unix://", pluginSocket),
233+
grpc.WithTransportCredentials(insecure.NewCredentials()))
240234
if err != nil {
241235
t.Fatalf("Failed to get connection: %+v", err)
242236
}

0 commit comments

Comments
 (0)