Skip to content

Commit 93bc6ba

Browse files
authored
Change kube-adaptor Leader Election Loss Error Handling (#2296)
* Change kube-adaptor Leader Election Loss Error Handling: updates the kube-adaptor so that when the skupper-site-leader Lease is lost, the kube-adaptor will retry instead of exiting. Signed-off-by: Christian Kruse <[email protected]>
* Remove harmful lease owner assignment. Signed-off-by: Christian Kruse <[email protected]>
* Fix kube flow controller goroutine leak. Signed-off-by: Christian Kruse <[email protected]>
--------- Signed-off-by: Christian Kruse <[email protected]>
1 parent 3842cc0 commit 93bc6ba

File tree

2 files changed

+67
-65
lines changed

2 files changed

+67
-65
lines changed

internal/kube/adaptor/collector.go

Lines changed: 51 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import (
66
"log"
77
"log/slog"
88
"os"
9+
"sync"
910
"time"
1011

12+
"github.com/cenkalti/backoff/v4"
1113
"github.com/skupperproject/skupper/api/types"
1214
"github.com/skupperproject/skupper/internal/config"
1315
"github.com/skupperproject/skupper/internal/flow"
@@ -47,20 +49,6 @@ func (s *StatusSyncClient) Update(ctx context.Context, latest *corev1.ConfigMap)
4749
return err
4850
}
4951

50-
func updateLockOwner(lockname, namespace string, owner *metav1.OwnerReference, cli *internalclient.KubeClient) error {
51-
current, err := cli.Kube.CoordinationV1().Leases(namespace).Get(context.TODO(), lockname, metav1.GetOptions{})
52-
if err != nil {
53-
return err
54-
}
55-
if owner != nil {
56-
current.ObjectMeta.OwnerReferences = []metav1.OwnerReference{
57-
*owner,
58-
}
59-
}
60-
_, err = cli.Kube.CoordinationV1().Leases(namespace).Update(context.TODO(), current, metav1.UpdateOptions{})
61-
return err
62-
}
63-
6452
func siteCollector(ctx context.Context, cli *internalclient.KubeClient) {
6553
siteData := map[string]string{}
6654
platform := config.GetPlatform()
@@ -84,11 +72,6 @@ func siteCollector(ctx context.Context, cli *internalclient.KubeClient) {
8472
log.Fatal("Failed to create site status config map ", err.Error())
8573
}
8674

87-
err = updateLockOwner(types.SiteLeaderLockName, cli.Namespace, &owner, cli)
88-
if err != nil {
89-
log.Println("Update lock error", err.Error())
90-
}
91-
9275
factory := session.NewContainerFactory("amqp://localhost:5672", session.ContainerConfig{ContainerID: "kube-flow-collector"})
9376
statusSyncClient := &StatusSyncClient{
9477
client: cli.Kube.CoreV1().ConfigMaps(cli.Namespace),
@@ -136,37 +119,56 @@ func startFlowController(ctx context.Context, cli *internalclient.KubeClient) er
136119
}
137120

138121
func runLeaderElection(lock *resourcelock.LeaseLock, id string, cli *internalclient.KubeClient) {
139-
ctx := context.Background()
140-
begin := time.Now()
141-
podname, _ := os.Hostname()
142-
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
143-
Lock: lock,
144-
ReleaseOnCancel: true,
145-
LeaseDuration: 15 * time.Second,
146-
RenewDeadline: 10 * time.Second,
147-
RetryPeriod: 2 * time.Second,
148-
Callbacks: leaderelection.LeaderCallbacks{
149-
OnStartedLeading: func(c context.Context) {
150-
log.Printf("COLLECTOR: Leader %s starting site collection after %s\n", podname, time.Since(begin))
151-
siteCollector(ctx, cli)
152-
if err := startFlowController(ctx, cli); err != nil {
153-
log.Printf("COLLECTOR: Failed to start controller for emitting site events: %s", err)
154-
}
155-
},
156-
OnStoppedLeading: func() {
157-
// we held the lock but lost it. This indicates that something
158-
// went wrong. Exit and restart.
159-
log.Fatalf("COLLECTOR: Lost leader lock after %s", time.Since(begin))
160-
},
161-
OnNewLeader: func(current_id string) {
162-
if current_id == id {
163-
// Remain as the leader
164-
return
165-
}
166-
log.Printf("COLLECTOR: New leader for site collection is %s\n", current_id)
122+
var (
123+
mu sync.Mutex
124+
leaderCtx context.Context
125+
leaderCtxCancel func()
126+
)
127+
// attempt to run leader election forever
128+
strategy := backoff.NewExponentialBackOff(backoff.WithMaxElapsedTime(0))
129+
backoff.RetryNotify(func() error {
130+
leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
131+
Lock: lock,
132+
ReleaseOnCancel: true,
133+
LeaseDuration: 15 * time.Second,
134+
RenewDeadline: 10 * time.Second,
135+
RetryPeriod: 2 * time.Second,
136+
Callbacks: leaderelection.LeaderCallbacks{
137+
OnStartedLeading: func(ctx context.Context) {
138+
mu.Lock()
139+
defer mu.Unlock()
140+
leaderCtx, leaderCtxCancel = context.WithCancel(ctx)
141+
log.Printf("COLLECTOR: Became leader. Starting status sync and site controller after %s.", strategy.GetElapsedTime())
142+
siteCollector(leaderCtx, cli)
143+
if err := startFlowController(leaderCtx, cli); err != nil {
144+
log.Printf("COLLECTOR: Failed to start controller for emitting site events: %s", err)
145+
}
146+
},
147+
OnStoppedLeading: func() {
148+
log.Printf("COLLECTOR: Lost leader lock after %s. Stopping status sync and site controller.", strategy.GetElapsedTime())
149+
mu.Lock()
150+
defer mu.Unlock()
151+
if leaderCtxCancel == nil {
152+
return
153+
}
154+
leaderCtxCancel()
155+
leaderCtx, leaderCtxCancel = nil, nil
156+
},
157+
OnNewLeader: func(current_id string) {
158+
if current_id == id {
159+
// Remain as the leader
160+
return
161+
}
162+
log.Printf("COLLECTOR: New leader for site collection is %s\n", current_id)
163+
},
167164
},
168-
},
169-
})
165+
})
166+
return fmt.Errorf("leader election died")
167+
},
168+
strategy,
169+
func(_ error, d time.Duration) {
170+
log.Printf("COLLECTOR: leader election failed. retrying after %s", d)
171+
})
170172
}
171173

172174
func StartCollector(cli *internalclient.KubeClient) {

internal/kube/flow/controller.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -130,26 +130,26 @@ func (c *Controller) Run(ctx context.Context) {
130130
c.logger.Error("amqp session error", slog.Any("error", err), slog.Bool("retryable", retryable))
131131
})
132132
go c.manager.Run(mgmtCtx)
133+
// handle shutdown though work queue
134+
go func() {
135+
<-mgmtCtx.Done()
136+
c.queue.ShutDown()
137+
}()
133138
for {
134-
select {
135-
case <-ctx.Done():
139+
item, done := c.queue.Get()
140+
if done {
141+
c.logger.Info("workqueue shutdown")
136142
return
137-
default:
138-
item, done := c.queue.Get()
139-
if done {
140-
c.logger.Info("workqueue shutdown")
141-
return
142-
}
143-
evnt := item.(workEvent)
144-
switch evnt.Type {
145-
case eventTypeProcess:
146-
if err := c.handlePodEvent(evnt); err != nil {
147-
c.queue.AddRateLimited(evnt)
148-
}
143+
}
144+
evnt := item.(workEvent)
145+
switch evnt.Type {
146+
case eventTypeProcess:
147+
if err := c.handlePodEvent(evnt); err != nil {
148+
c.queue.AddRateLimited(evnt)
149149
}
150-
c.queue.Forget(item)
151-
c.queue.Done(item)
152150
}
151+
c.queue.Forget(item)
152+
c.queue.Done(item)
153153
}
154154
}
155155

0 commit comments

Comments
 (0)