Commit 12b5397

M00nF1sh authored and Timothy-Dougherty committed

optimize the memory usage and fix bugs (kubernetes-sigs#1528)

* add pod repo
* change requeue errors to use reason instead
1 parent fd6d32d commit 12b5397

25 files changed (+2241 −699 lines)
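
The centerpiece of the memory optimization is the new PodInfoRepo in pkg/k8s. Its definition is not rendered in this excerpt, so the Go sketch below is inferred purely from the call sites visible in the main.go and pkg/backend/endpoint_resolver.go diffs that follow; the method names and signatures match the usage, while everything else is an assumption.

// Sketch inferred from call sites in this commit; not the verbatim source.
package k8s

import (
	"context"

	"k8s.io/apimachinery/pkg/types"
)

// PodInfo is the reduced pod record the repo stores; it is sketched in
// more detail after the main.go diff below.
type PodInfo struct{ /* reduced pod fields */ }

// PodInfoRepo serves cached PodInfo records instead of whole corev1.Pod objects.
type PodInfoRepo interface {
	// Get returns the cached PodInfo for key, plus whether the pod exists.
	Get(ctx context.Context, key types.NamespacedName) (PodInfo, bool, error)
	// Start runs the underlying pod watch until stopChan is closed.
	Start(stopChan <-chan struct{}) error
	// WaitForCacheSync blocks until the initial pod list has been cached.
	WaitForCacheSync(stopChan <-chan struct{}) error
}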

main.go

Lines changed: 24 additions & 7 deletions
@@ -22,6 +22,7 @@ import (
 	"github.com/spf13/pflag"
 	zapraw "go.uber.org/zap"
 	k8sruntime "k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/kubernetes"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
 	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
 	"os"
@@ -38,9 +39,9 @@ import (
 	"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
 	"sigs.k8s.io/aws-load-balancer-controller/pkg/targetgroupbinding"
 	"sigs.k8s.io/aws-load-balancer-controller/pkg/version"
+	ctrl "sigs.k8s.io/controller-runtime"
 	corewebhook "sigs.k8s.io/aws-load-balancer-controller/webhooks/core"
 	elbv2webhook "sigs.k8s.io/aws-load-balancer-controller/webhooks/elbv2"
-	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/log/zap"
 	"sigs.k8s.io/controller-runtime/pkg/metrics"
 	// +kubebuilder:scaffold:imports
@@ -88,15 +89,20 @@ func main() {
 		setupLog.Error(err, "unable to start manager")
 		os.Exit(1)
 	}
-
+	clientSet, err := kubernetes.NewForConfig(mgr.GetConfig())
+	if err != nil {
+		setupLog.Error(err, "unable to obtain clientSet")
+		os.Exit(1)
+	}
+	podInfoRepo := k8s.NewDefaultPodInfoRepo(clientSet.CoreV1().RESTClient(), rtOpts.Namespace, ctrl.Log)
 	finalizerManager := k8s.NewDefaultFinalizerManager(mgr.GetClient(), ctrl.Log)
 	podENIResolver := networking.NewDefaultPodENIInfoResolver(cloud.EC2(), cloud.VpcID(), ctrl.Log)
 	nodeENIResolver := networking.NewDefaultNodeENIInfoResolver(cloud.EC2(), ctrl.Log)
 	sgManager := networking.NewDefaultSecurityGroupManager(cloud.EC2(), ctrl.Log)
 	sgReconciler := networking.NewDefaultSecurityGroupReconciler(sgManager, ctrl.Log)
 	subnetResolver := networking.NewSubnetsResolver(cloud.EC2(), cloud.VpcID(), controllerCFG.ClusterName, ctrl.Log.WithName("subnets-resolver"))
 	tgbResManager := targetgroupbinding.NewDefaultResourceManager(mgr.GetClient(), cloud.ELBV2(),
-		podENIResolver, nodeENIResolver, sgManager, sgReconciler, cloud.VpcID(), controllerCFG.ClusterName, ctrl.Log)
+		podInfoRepo, podENIResolver, nodeENIResolver, sgManager, sgReconciler, cloud.VpcID(), controllerCFG.ClusterName, ctrl.Log)

 	ingGroupReconciler := ingress.NewGroupReconciler(cloud, mgr.GetClient(), mgr.GetEventRecorderFor("ingress"),
 		finalizerManager, sgManager, sgReconciler, subnetResolver,
@@ -125,10 +131,21 @@ func main() {
 	corewebhook.NewPodMutator(podReadinessGateInjector).SetupWithManager(mgr)
 	elbv2webhook.NewTargetGroupBindingMutator(cloud.ELBV2(), ctrl.Log).SetupWithManager(mgr)
 	elbv2webhook.NewTargetGroupBindingValidator(ctrl.Log).SetupWithManager(mgr)
-	// +kubebuilder:scaffold:builder
-
-	setupLog.Info("starting manager")
-	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
+	//+kubebuilder:scaffold:builder
+
+	stopChan := ctrl.SetupSignalHandler()
+	go func() {
+		setupLog.Info("starting podInfo repo")
+		if err := podInfoRepo.Start(stopChan); err != nil {
+			setupLog.Error(err, "problem running podInfo repo")
+			os.Exit(1)
+		}
+	}()
+	if err := podInfoRepo.WaitForCacheSync(stopChan); err != nil {
+		setupLog.Error(err, "problem wait for podInfo repo sync")
+		os.Exit(1)
+	}
+	if err := mgr.Start(stopChan); err != nil {
 		setupLog.Error(err, "problem running manager")
 		os.Exit(1)
 	}
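
Note the ordering above: the repo is started in a goroutine and main blocks on WaitForCacheSync before mgr.Start, so reconcilers never read from an empty cache. The memory saving itself comes from what the repo stores. Below is a hedged sketch of a reduced pod record exposing the two predicates the endpoint resolver calls; HasAnyOfReadinessGates and IsContainersReady appear in the diff further down, while the exact field set, and PodReadinessGates being a slice of corev1.PodConditionType, are assumptions.

// Hedged sketch of a reduced pod record; field names and set are assumptions.
package k8s

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
)

// PodInfo keeps only what the controller reads, instead of a whole
// corev1.Pod with its spec, labels and managed fields.
type PodInfo struct {
	Key            types.NamespacedName
	ReadinessGates []corev1.PodReadinessGate
	Conditions     []corev1.PodCondition
}

// HasAnyOfReadinessGates reports whether the pod declares at least one of
// the given readinessGate condition types (signature assumed from the
// resolver's pod.HasAnyOfReadinessGates(resolveOpts.PodReadinessGates) call).
func (i PodInfo) HasAnyOfReadinessGates(conditionTypes []corev1.PodConditionType) bool {
	for _, gate := range i.ReadinessGates {
		for _, conditionType := range conditionTypes {
			if gate.ConditionType == conditionType {
				return true
			}
		}
	}
	return false
}

// IsContainersReady reports whether the pod's ContainersReady condition is True.
func (i PodInfo) IsContainersReady() bool {
	for _, cond := range i.Conditions {
		if cond.Type == corev1.ContainersReady {
			return cond.Status == corev1.ConditionTrue
		}
	}
	return false
}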

mocks/k8s/mock_pod_info_repo.go

Lines changed: 66 additions & 0 deletions
Some generated files are not rendered by default.

pkg/backend/endpoint_resolver.go

Lines changed: 51 additions & 45 deletions
@@ -5,54 +5,63 @@ import (
 	"github.com/go-logr/logr"
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )

-// EndpointResolver resolves the endpoints for specific service & service Port.
+// TODO: for pod endpoints, we currently rely on endpoints events, we might change to use pod events directly in the future.
+// under current implementation with pod readinessGate enabled, an unready endpoint but not match our inclusionCriteria won't be registered,
+// and it won't turn ready due to blocked by readinessGate, and no future endpoint events will trigger.
+// We solve this by requeue the TGB if unready endpoints have the potential to be ready if reconcile in later time.
+
+// EndpointResolver resolves the endpoints for specific service & service Port.
 type EndpointResolver interface {
 	// ResolvePodEndpoints will resolve endpoints backed by pods directly.
-	ResolvePodEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString, opts ...EndpointResolveOption) ([]PodEndpoint, error)
+	// returns resolved podEndpoints and whether there are unready endpoints that can potentially turn ready in future reconciles.
+	ResolvePodEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString,
+		opts ...EndpointResolveOption) ([]PodEndpoint, bool, error)

 	// ResolveNodePortEndpoints will resolve endpoints backed by nodePort.
-	ResolveNodePortEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString, opts ...EndpointResolveOption) ([]NodePortEndpoint, error)
+	ResolveNodePortEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString,
+		opts ...EndpointResolveOption) ([]NodePortEndpoint, error)
 }

 // NewDefaultEndpointResolver constructs new defaultEndpointResolver
-func NewDefaultEndpointResolver(k8sClient client.Client, logger logr.Logger) *defaultEndpointResolver {
+func NewDefaultEndpointResolver(k8sClient client.Client, podInfoRepo k8s.PodInfoRepo, logger logr.Logger) *defaultEndpointResolver {
 	return &defaultEndpointResolver{
-		k8sClient: k8sClient,
-		logger:    logger,
+		k8sClient:   k8sClient,
+		podInfoRepo: podInfoRepo,
+		logger:      logger,
 	}
 }

 var _ EndpointResolver = &defaultEndpointResolver{}

 // default implementation for EndpointResolver
 type defaultEndpointResolver struct {
-	k8sClient client.Client
-	logger    logr.Logger
+	k8sClient   client.Client
+	podInfoRepo k8s.PodInfoRepo
+	logger      logr.Logger
 }

-func (r *defaultEndpointResolver) ResolvePodEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString, opts ...EndpointResolveOption) ([]PodEndpoint, error) {
-	resolveOpts := EndpointResolveOptions{
-		NodeSelector: labels.Nothing(),
-	}
+func (r *defaultEndpointResolver) ResolvePodEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString,
+	opts ...EndpointResolveOption) ([]PodEndpoint, bool, error) {
+	resolveOpts := defaultEndpointResolveOptions()
 	resolveOpts.ApplyOptions(opts)
+
 	svc, svcPort, err := r.findServiceAndServicePort(ctx, svcKey, port)
 	if err != nil {
-		return nil, err
+		return nil, false, err
 	}
-
 	epsKey := k8s.NamespacedName(svc) // k8s Endpoints have same name as k8s Service
 	eps := &corev1.Endpoints{}
 	if err := r.k8sClient.Get(ctx, epsKey, eps); err != nil {
-		return nil, err
+		return nil, false, err
 	}

+	containsPotentialReadyEndpoints := false
 	var endpoints []PodEndpoint
 	for _, epSubset := range eps.Subsets {
 		for _, epPort := range epSubset.Ports {
@@ -65,43 +74,53 @@ func (r *defaultEndpointResolver) ResolvePodEndpoints(ctx context.Context, svcKe
 				if epAddr.TargetRef == nil || epAddr.TargetRef.Kind != "Pod" {
 					continue
 				}
-				epPod, err := r.findPodByReference(ctx, svc.Namespace, *epAddr.TargetRef)
+				pod, exists, err := r.findPodByReference(ctx, svc.Namespace, *epAddr.TargetRef)
 				if err != nil {
-					return nil, err
+					return nil, false, err
 				}
-				endpoints = append(endpoints, buildPodEndpoint(epPod, epAddr, epPort))
+				if !exists {
+					return nil, false, errors.New("couldn't find podInfo for ready endpoint")
+				}
+				endpoints = append(endpoints, buildPodEndpoint(pod, epAddr, epPort))
 			}

-			if len(resolveOpts.UnreadyPodInclusionCriteria) != 0 {
+			if len(resolveOpts.PodReadinessGates) != 0 {
 				for _, epAddr := range epSubset.NotReadyAddresses {
 					if epAddr.TargetRef == nil || epAddr.TargetRef.Kind != "Pod" {
 						continue
 					}
-					epPod, err := r.findPodByReference(ctx, svc.Namespace, *epAddr.TargetRef)
+					pod, exists, err := r.findPodByReference(ctx, svc.Namespace, *epAddr.TargetRef)
 					if err != nil {
-						return nil, err
+						return nil, false, err
+					}
+					if !exists {
+						containsPotentialReadyEndpoints = true
+						continue
+					}
+					if !pod.HasAnyOfReadinessGates(resolveOpts.PodReadinessGates) {
+						continue
 					}
-					if isPodMeetCriteria(epPod, resolveOpts.UnreadyPodInclusionCriteria) {
-						endpoints = append(endpoints, buildPodEndpoint(epPod, epAddr, epPort))
+					if !pod.IsContainersReady() {
+						containsPotentialReadyEndpoints = true
+						continue
 					}
+					endpoints = append(endpoints, buildPodEndpoint(pod, epAddr, epPort))
 				}
 			}
 		}
 	}

-	return endpoints, nil
+	return endpoints, containsPotentialReadyEndpoints, nil
 }

 func (r *defaultEndpointResolver) ResolveNodePortEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString, opts ...EndpointResolveOption) ([]NodePortEndpoint, error) {
-	resolveOpts := EndpointResolveOptions{
-		NodeSelector: labels.Nothing(),
-	}
+	resolveOpts := defaultEndpointResolveOptions()
 	resolveOpts.ApplyOptions(opts)
+
 	svc, svcPort, err := r.findServiceAndServicePort(ctx, svcKey, port)
 	if err != nil {
 		return nil, err
 	}
-
 	if svc.Spec.Type != corev1.ServiceTypeNodePort && svc.Spec.Type != corev1.ServiceTypeLoadBalancer {
 		return nil, errors.Errorf("service type must be either 'NodePort' or 'LoadBalancer': %v", svcKey)
 	}
@@ -140,25 +159,12 @@ func (r *defaultEndpointResolver) findServiceAndServicePort(ctx context.Context,
 	return svc, svcPort, nil
 }

-func (r *defaultEndpointResolver) findPodByReference(ctx context.Context, namespace string, podRef corev1.ObjectReference) (*corev1.Pod, error) {
-	pod := &corev1.Pod{}
+func (r *defaultEndpointResolver) findPodByReference(ctx context.Context, namespace string, podRef corev1.ObjectReference) (k8s.PodInfo, bool, error) {
 	podKey := types.NamespacedName{Namespace: namespace, Name: podRef.Name}
-	if err := r.k8sClient.Get(ctx, podKey, pod); err != nil {
-		return nil, err
-	}
-	return pod, nil
-}
-
-func isPodMeetCriteria(pod *corev1.Pod, criteria []PodPredicate) bool {
-	for _, criterion := range criteria {
-		if !criterion(pod) {
-			return false
-		}
-	}
-	return true
+	return r.podInfoRepo.Get(ctx, podKey)
 }

-func buildPodEndpoint(pod *corev1.Pod, epAddr corev1.EndpointAddress, epPort corev1.EndpointPort) PodEndpoint {
+func buildPodEndpoint(pod k8s.PodInfo, epAddr corev1.EndpointAddress, epPort corev1.EndpointPort) PodEndpoint {
 	return PodEndpoint{
 		IP:   epAddr.IP,
 		Port: int64(epPort.Port),
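
ResolvePodEndpoints now reports, as its second return value, whether any unready endpoint could still turn ready, and per the commit message callers requeue with a reason rather than a synthetic error. Below is a minimal sketch of that calling pattern; runtime.NewRequeueNeeded is assumed from the pkg/runtime import in main.go, registerEndpoints is a hypothetical helper, and the real caller is the TargetGroupBinding resource manager changed in other files of this commit.

// Hedged sketch of a caller acting on the new bool; not code from this commit.
package sketch

import (
	"context"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	"sigs.k8s.io/aws-load-balancer-controller/pkg/backend"
	"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
)

func reconcileTargets(ctx context.Context, resolver backend.EndpointResolver,
	svcKey types.NamespacedName, port intstr.IntOrString, opts ...backend.EndpointResolveOption) error {
	endpoints, containsPotentialReadyEndpoints, err := resolver.ResolvePodEndpoints(ctx, svcKey, port, opts...)
	if err != nil {
		return err
	}
	if err := registerEndpoints(ctx, endpoints); err != nil {
		return err
	}
	if containsPotentialReadyEndpoints {
		// Requeue with a human-readable reason instead of a fabricated error,
		// so the TGB is reconciled again once readiness gates can flip.
		return runtime.NewRequeueNeeded("monitor potential ready endpoints")
	}
	return nil
}

// registerEndpoints is a hypothetical stand-in for ELBv2 target registration.
func registerEndpoints(ctx context.Context, endpoints []backend.PodEndpoint) error {
	return nil
}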
