Skip to content

Commit 16081a6

Browse files
committed
issue-12678 - refactored client_manager to reuse KubernetesCoreClient for blob storage initialization; added GetClientSet method to KubernetesCoreInterface
Signed-off-by: Helber Belmiro <[email protected]>
1 parent ff3dbaa commit 16081a6

File tree

3 files changed

+23
-13
lines changed

3 files changed

+23
-13
lines changed

backend/src/apiserver/client/kubernetes_core.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,27 +6,37 @@ import (
66
"github.com/cenkalti/backoff"
77
"github.com/golang/glog"
88
"github.com/kubeflow/pipelines/backend/src/common/util"
9+
"k8s.io/client-go/kubernetes"
910
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
1011
)
1112

1213
type KubernetesCoreInterface interface {
1314
PodClient(namespace string) v1.PodInterface
15+
GetClientSet() kubernetes.Interface
1416
}
1517

1618
type KubernetesCore struct {
1719
coreV1Client v1.CoreV1Interface
20+
clientSet kubernetes.Interface
1821
}
1922

2023
func (c *KubernetesCore) PodClient(namespace string) v1.PodInterface {
2124
return c.coreV1Client.Pods(namespace)
2225
}
2326

27+
func (c *KubernetesCore) GetClientSet() kubernetes.Interface {
28+
return c.clientSet
29+
}
30+
2431
func createKubernetesCore(clientParams util.ClientParameters) (KubernetesCoreInterface, error) {
2532
clientSet, err := getKubernetesClientset(clientParams)
2633
if err != nil {
2734
return nil, err
2835
}
29-
return &KubernetesCore{clientSet.CoreV1()}, nil
36+
return &KubernetesCore{
37+
coreV1Client: clientSet.CoreV1(),
38+
clientSet: clientSet,
39+
}, nil
3040
}
3141

3242
// CreateKubernetesCoreOrFatal creates a new client for the Kubernetes pod.

backend/src/apiserver/client/kubernetes_core_fake.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/kubeflow/pipelines/backend/src/common/util"
2121
policyv1 "k8s.io/api/policy/v1"
2222
policyv1beta1 "k8s.io/api/policy/v1beta1"
23+
"k8s.io/client-go/kubernetes"
2324
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
2425
)
2526

@@ -34,6 +35,11 @@ func (c *FakeKuberneteCoreClient) PodClient(namespace string) v1.PodInterface {
3435
return c.podClientFake
3536
}
3637

38+
func (c *FakeKuberneteCoreClient) GetClientSet() kubernetes.Interface {
39+
// Return nil for fake implementation - tests that need this should use a mock
40+
return nil
41+
}
42+
3743
func NewFakeKuberneteCoresClient() *FakeKuberneteCoreClient {
3844
return &FakeKuberneteCoreClient{&FakePodClient{}}
3945
}
@@ -50,6 +56,11 @@ func (c *FakeKubernetesCoreClientWithBadPodClient) PodClient(namespace string) v
5056
return c.podClientFake
5157
}
5258

59+
func (c *FakeKubernetesCoreClientWithBadPodClient) GetClientSet() kubernetes.Interface {
60+
// Return nil for fake implementation
61+
return nil
62+
}
63+
5364
func (c *FakePodClient) EvictV1(context.Context, *policyv1.Eviction) error {
5465
return nil
5566
}

backend/src/apiserver/client_manager/client_manager.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -301,19 +301,8 @@ func (c *ClientManager) init(options *Options) error {
301301

302302
c.k8sCoreClient = client.CreateKubernetesCoreOrFatal(common.GetDurationConfig(initConnectionTimeout), clientParams)
303303

304-
restConfig, err := util.GetKubernetesConfig()
305-
if err != nil {
306-
return err
307-
}
308-
restConfig.QPS = float32(clientParams.QPS)
309-
restConfig.Burst = clientParams.Burst
310-
k8sClient, err := kubernetes.NewForConfig(restConfig)
311-
if err != nil {
312-
return util.Wrap(err, "Failed to create Kubernetes client for blob storage")
313-
}
314-
315304
glog.Info("Initializing Object store client...")
316-
c.objectStore = initBlobObjectStore(options.Context, common.GetDurationConfig(initConnectionTimeout), k8sClient)
305+
c.objectStore = initBlobObjectStore(options.Context, common.GetDurationConfig(initConnectionTimeout), c.k8sCoreClient.GetClientSet())
317306
glog.Info("Object store client initialized successfully")
318307

319308
runStore := storage.NewRunStore(db, c.time)

0 commit comments

Comments
 (0)