Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/spf13/pflag"
zapraw "go.uber.org/zap"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"os"
Expand All @@ -38,9 +39,9 @@ import (
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
"sigs.k8s.io/aws-load-balancer-controller/pkg/targetgroupbinding"
"sigs.k8s.io/aws-load-balancer-controller/pkg/version"
ctrl "sigs.k8s.io/controller-runtime"
corewebhook "sigs.k8s.io/aws-load-balancer-controller/webhooks/core"
elbv2webhook "sigs.k8s.io/aws-load-balancer-controller/webhooks/elbv2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics"
// +kubebuilder:scaffold:imports
Expand Down Expand Up @@ -88,15 +89,20 @@ func main() {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}

clientSet, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
setupLog.Error(err, "unable to obtain clientSet")
os.Exit(1)
}
podInfoRepo := k8s.NewDefaultPodInfoRepo(clientSet.CoreV1().RESTClient(), rtOpts.Namespace, ctrl.Log)
finalizerManager := k8s.NewDefaultFinalizerManager(mgr.GetClient(), ctrl.Log)
podENIResolver := networking.NewDefaultPodENIInfoResolver(cloud.EC2(), cloud.VpcID(), ctrl.Log)
nodeENIResolver := networking.NewDefaultNodeENIInfoResolver(cloud.EC2(), ctrl.Log)
sgManager := networking.NewDefaultSecurityGroupManager(cloud.EC2(), ctrl.Log)
sgReconciler := networking.NewDefaultSecurityGroupReconciler(sgManager, ctrl.Log)
subnetResolver := networking.NewSubnetsResolver(cloud.EC2(), cloud.VpcID(), controllerCFG.ClusterName, ctrl.Log.WithName("subnets-resolver"))
tgbResManager := targetgroupbinding.NewDefaultResourceManager(mgr.GetClient(), cloud.ELBV2(),
podENIResolver, nodeENIResolver, sgManager, sgReconciler, cloud.VpcID(), controllerCFG.ClusterName, ctrl.Log)
podInfoRepo, podENIResolver, nodeENIResolver, sgManager, sgReconciler, cloud.VpcID(), controllerCFG.ClusterName, ctrl.Log)

ingGroupReconciler := ingress.NewGroupReconciler(cloud, mgr.GetClient(), mgr.GetEventRecorderFor("ingress"),
finalizerManager, sgManager, sgReconciler, subnetResolver,
Expand Down Expand Up @@ -125,10 +131,21 @@ func main() {
corewebhook.NewPodMutator(podReadinessGateInjector).SetupWithManager(mgr)
elbv2webhook.NewTargetGroupBindingMutator(cloud.ELBV2(), ctrl.Log).SetupWithManager(mgr)
elbv2webhook.NewTargetGroupBindingValidator(ctrl.Log).SetupWithManager(mgr)
// +kubebuilder:scaffold:builder

setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
//+kubebuilder:scaffold:builder

stopChan := ctrl.SetupSignalHandler()
go func() {
setupLog.Info("starting podInfo repo")
if err := podInfoRepo.Start(stopChan); err != nil {
setupLog.Error(err, "problem running podInfo repo")
os.Exit(1)
}
}()
if err := podInfoRepo.WaitForCacheSync(stopChan); err != nil {
setupLog.Error(err, "problem wait for podInfo repo sync")
os.Exit(1)
}
if err := mgr.Start(stopChan); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
Expand Down
66 changes: 66 additions & 0 deletions mocks/k8s/mock_repo_info_repo.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

96 changes: 51 additions & 45 deletions pkg/backend/endpoint_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,54 +5,63 @@ import (
"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// EndpointResolver resolves the endpoints for specific service & service Port.
// TODO: for pod endpoints, we currently rely on endpoints events, we might change to use pod events directly in the future.
// under current implementation with pod readinessGate enabled, an unready endpoint but not match our inclusionCriteria won't be registered,
// and it won't turn ready due to blocked by readinessGate, and no future endpoint events will trigger.
// We solve this by requeue the TGB if unready endpoints have the potential to be ready if reconcile in later time.

// EndpointResolver resolves the endpoints for specific service & service Port.
type EndpointResolver interface {
// ResolvePodEndpoints will resolve endpoints backed by pods directly.
ResolvePodEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString, opts ...EndpointResolveOption) ([]PodEndpoint, error)
// returns resolved podEndpoints and whether there are unready endpoints that can potentially turn ready in future reconciles.
ResolvePodEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString,
opts ...EndpointResolveOption) ([]PodEndpoint, bool, error)

// ResolveNodePortEndpoints will resolve endpoints backed by nodePort.
ResolveNodePortEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString, opts ...EndpointResolveOption) ([]NodePortEndpoint, error)
ResolveNodePortEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString,
opts ...EndpointResolveOption) ([]NodePortEndpoint, error)
}

// NewDefaultEndpointResolver constructs new defaultEndpointResolver
func NewDefaultEndpointResolver(k8sClient client.Client, logger logr.Logger) *defaultEndpointResolver {
func NewDefaultEndpointResolver(k8sClient client.Client, podInfoRepo k8s.PodInfoRepo, logger logr.Logger) *defaultEndpointResolver {
return &defaultEndpointResolver{
k8sClient: k8sClient,
logger: logger,
k8sClient: k8sClient,
podInfoRepo: podInfoRepo,
logger: logger,
}
}

var _ EndpointResolver = &defaultEndpointResolver{}

// default implementation for EndpointResolver
type defaultEndpointResolver struct {
k8sClient client.Client
logger logr.Logger
k8sClient client.Client
podInfoRepo k8s.PodInfoRepo
logger logr.Logger
}

func (r *defaultEndpointResolver) ResolvePodEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString, opts ...EndpointResolveOption) ([]PodEndpoint, error) {
resolveOpts := EndpointResolveOptions{
NodeSelector: labels.Nothing(),
}
func (r *defaultEndpointResolver) ResolvePodEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString,
opts ...EndpointResolveOption) ([]PodEndpoint, bool, error) {
resolveOpts := defaultEndpointResolveOptions()
resolveOpts.ApplyOptions(opts)

svc, svcPort, err := r.findServiceAndServicePort(ctx, svcKey, port)
if err != nil {
return nil, err
return nil, false, err
}

epsKey := k8s.NamespacedName(svc) // k8s Endpoints have same name as k8s Service
eps := &corev1.Endpoints{}
if err := r.k8sClient.Get(ctx, epsKey, eps); err != nil {
return nil, err
return nil, false, err
}

containsPotentialReadyEndpoints := false
var endpoints []PodEndpoint
for _, epSubset := range eps.Subsets {
for _, epPort := range epSubset.Ports {
Expand All @@ -65,43 +74,53 @@ func (r *defaultEndpointResolver) ResolvePodEndpoints(ctx context.Context, svcKe
if epAddr.TargetRef == nil || epAddr.TargetRef.Kind != "Pod" {
continue
}
epPod, err := r.findPodByReference(ctx, svc.Namespace, *epAddr.TargetRef)
pod, exists, err := r.findPodByReference(ctx, svc.Namespace, *epAddr.TargetRef)
if err != nil {
return nil, err
return nil, false, err
}
endpoints = append(endpoints, buildPodEndpoint(epPod, epAddr, epPort))
if !exists {
return nil, false, errors.New("couldn't find podInfo for ready endpoint")
}
endpoints = append(endpoints, buildPodEndpoint(pod, epAddr, epPort))
}

if len(resolveOpts.UnreadyPodInclusionCriteria) != 0 {
if len(resolveOpts.PodReadinessGates) != 0 {
for _, epAddr := range epSubset.NotReadyAddresses {
if epAddr.TargetRef == nil || epAddr.TargetRef.Kind != "Pod" {
continue
}
epPod, err := r.findPodByReference(ctx, svc.Namespace, *epAddr.TargetRef)
pod, exists, err := r.findPodByReference(ctx, svc.Namespace, *epAddr.TargetRef)
if err != nil {
return nil, err
return nil, false, err
}
if !exists {
containsPotentialReadyEndpoints = true
continue
}
if !pod.HasAnyOfReadinessGates(resolveOpts.PodReadinessGates) {
continue
}
if isPodMeetCriteria(epPod, resolveOpts.UnreadyPodInclusionCriteria) {
endpoints = append(endpoints, buildPodEndpoint(epPod, epAddr, epPort))
if !pod.IsContainersReady() {
containsPotentialReadyEndpoints = true
continue
}
endpoints = append(endpoints, buildPodEndpoint(pod, epAddr, epPort))
}
}
}
}

return endpoints, nil
return endpoints, containsPotentialReadyEndpoints, nil
}

func (r *defaultEndpointResolver) ResolveNodePortEndpoints(ctx context.Context, svcKey types.NamespacedName, port intstr.IntOrString, opts ...EndpointResolveOption) ([]NodePortEndpoint, error) {
resolveOpts := EndpointResolveOptions{
NodeSelector: labels.Nothing(),
}
resolveOpts := defaultEndpointResolveOptions()
resolveOpts.ApplyOptions(opts)

svc, svcPort, err := r.findServiceAndServicePort(ctx, svcKey, port)
if err != nil {
return nil, err
}

if svc.Spec.Type != corev1.ServiceTypeNodePort && svc.Spec.Type != corev1.ServiceTypeLoadBalancer {
return nil, errors.Errorf("service type must be either 'NodePort' or 'LoadBalancer': %v", svcKey)
}
Expand Down Expand Up @@ -140,25 +159,12 @@ func (r *defaultEndpointResolver) findServiceAndServicePort(ctx context.Context,
return svc, svcPort, nil
}

func (r *defaultEndpointResolver) findPodByReference(ctx context.Context, namespace string, podRef corev1.ObjectReference) (*corev1.Pod, error) {
pod := &corev1.Pod{}
func (r *defaultEndpointResolver) findPodByReference(ctx context.Context, namespace string, podRef corev1.ObjectReference) (k8s.PodInfo, bool, error) {
podKey := types.NamespacedName{Namespace: namespace, Name: podRef.Name}
if err := r.k8sClient.Get(ctx, podKey, pod); err != nil {
return nil, err
}
return pod, nil
}

func isPodMeetCriteria(pod *corev1.Pod, criteria []PodPredicate) bool {
for _, criterion := range criteria {
if !criterion(pod) {
return false
}
}
return true
return r.podInfoRepo.Get(ctx, podKey)
}

func buildPodEndpoint(pod *corev1.Pod, epAddr corev1.EndpointAddress, epPort corev1.EndpointPort) PodEndpoint {
func buildPodEndpoint(pod k8s.PodInfo, epAddr corev1.EndpointAddress, epPort corev1.EndpointPort) PodEndpoint {
return PodEndpoint{
IP: epAddr.IP,
Port: int64(epPort.Port),
Expand Down
Loading