From 7c757a1d2f5c3692258e658db8db7d362784c9c2 Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 6 May 2020 12:18:38 +0200 Subject: [PATCH] fix for generation aware retries issue --- .../operator/processing/EventScheduler.java | 46 +++++++++++++------ .../operator/processing/EventStore.java | 12 ++++- .../operator/ControllerExecutionIT.java | 29 +++++++++++- 3 files changed, 71 insertions(+), 16 deletions(-) diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java index 95d7d89777..d7b7a66023 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java @@ -64,19 +64,22 @@ void scheduleEventFromApi(CustomResourceEvent event) { try { lock.lock(); log.debug("Scheduling event from Api: {}", event); - if (event.getAction() == Action.DELETED) { - // This removes data from memory for deleted resource (prevent memory leak basically). - // Its quite interesting that this is always sufficient here (no finalizer or other mechanism needs to be involved). - // Thus, if operator is running we get DELETE the event, if not the memory is already gone anyways. - eventStore.removeLastGenerationForDeletedResource(event.resourceUid()); - if (event.getResource().getMetadata().getDeletionTimestamp() != null) { - // Note that we always use finalizers, we want to process delete event just in corner case, - // when we are not able to add finalizer (lets say because of optimistic locking error, and the resource was deleted instantly). - // We want to skip in case of finalizer was there since we don't want to execute delete method always at least 2x, - // which would be the result if we don't skip here. (there is no deletion timestamp if resource deleted without finalizer.) - log.debug("Skipping delete event since deletion timestamp is present on resource, so finalizer was in place."); - return; - } + if (event.getAction() == Action.DELETED && event.getResource().getMetadata().getDeletionTimestamp() != null) { + // This removes data from memory for deleted resource (prevent memory leak). + // There is am extreme corner case when there is no finalizer, we ignore this situation now. + eventStore.cleanup(event.resourceUid()); + // Note that we always use finalizers, we want to process delete event just in corner case, + // when we are not able to add finalizer (lets say because of optimistic locking error, and the resource was deleted instantly). + // We want to skip in case of finalizer was there since we don't want to execute delete method always at least 2x, + // which would be the result if we don't skip here. (there is no deletion timestamp if resource deleted without finalizer.) + log.debug("Skipping delete event since deletion timestamp is present on resource, so finalizer was in place."); + return; + } + if (generationAware) { + // we have to store the last event for generation aware retries, since if we received new events since + // the execution, which did not have increased generation we will fail automatically on a conflict + // on a retry. + eventStore.addLastEventForGenerationAwareRetry(event); } // In case of generation aware processing, we want to replace this even if generation not increased, // to have the most recent copy of the event. @@ -145,13 +148,28 @@ void eventProcessingFailed(CustomResourceEvent event) { scheduleNotYetScheduledEventForExecution(event.resourceUid()); } else { log.debug("Event processing failed. Attempting to re-schedule the event: {}", event); - scheduleEventForExecution(event); + if (generationAware) { + CustomResourceEvent eventToRetry = selectEventToRetry(event); + scheduleEventForExecution(eventToRetry); + } else { + scheduleEventForExecution(event); + } } } finally { lock.unlock(); } } + private CustomResourceEvent selectEventToRetry(CustomResourceEvent event) { + CustomResourceEvent lastEvent = eventStore.getReceivedLastEventForGenerationAwareRetry(event.resourceUid()); + if (!event.getResource().getMetadata().getResourceVersion() + .equals(lastEvent.getResource().getMetadata().getResourceVersion())) { + return lastEvent; + } else { + return event; + } + } + private void scheduleNotYetScheduledEventForExecution(String uuid) { CustomResourceEvent notScheduledEvent = eventStore.removeEventNotScheduled(uuid); scheduleEventForExecution(notScheduledEvent); diff --git a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventStore.java b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventStore.java index 9356e1d5f7..3cb12e98d0 100644 --- a/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventStore.java +++ b/operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventStore.java @@ -8,6 +8,7 @@ public class EventStore { private final Map eventsNotScheduled = new HashMap<>(); private final Map eventsUnderProcessing = new HashMap<>(); private final Map lastGeneration = new HashMap<>(); + private final Map receivedLastEventForGenerationAwareRetry = new HashMap<>(); public boolean containsNotScheduledEvent(String uuid) { return eventsNotScheduled.containsKey(uuid); @@ -52,7 +53,16 @@ public Long getLastStoredGeneration(CustomResourceEvent event) { return lastGeneration.get(event.getResource().getMetadata().getUid()); } - public void removeLastGenerationForDeletedResource(String uuid) { + public void addLastEventForGenerationAwareRetry(CustomResourceEvent event) { + receivedLastEventForGenerationAwareRetry.put(event.resourceUid(), event); + } + + public CustomResourceEvent getReceivedLastEventForGenerationAwareRetry(String uuid) { + return receivedLastEventForGenerationAwareRetry.get(uuid); + } + + public void cleanup(String uuid) { lastGeneration.remove(uuid); + receivedLastEventForGenerationAwareRetry.remove(uuid); } } diff --git a/operator-framework/src/test/java/com/github/containersolutions/operator/ControllerExecutionIT.java b/operator-framework/src/test/java/com/github/containersolutions/operator/ControllerExecutionIT.java index ba4367c1ea..86ee7c408b 100644 --- a/operator-framework/src/test/java/com/github/containersolutions/operator/ControllerExecutionIT.java +++ b/operator-framework/src/test/java/com/github/containersolutions/operator/ControllerExecutionIT.java @@ -9,6 +9,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.concurrent.TimeUnit; import static com.github.containersolutions.operator.IntegrationTestSupport.TEST_NAMESPACE; @@ -36,6 +37,7 @@ public void configMapGetsCreatedForTestCustomResource() { awaitResourcesCreatedOrUpdated(); awaitStatusUpdated(); + assertThat(integrationTestSupport.numberOfControllerExecutions()).isEqualTo(2); }); } @@ -52,6 +54,26 @@ public void eventIsSkippedChangedOnMetadataOnlyUpdate() { }); } + // We test the scenario when we receive 2 events, while the generation is not increased by the other. + // This will cause a conflict, and on retry the new version of the resource needs to be scheduled + // to avoid repeating conflicts + @Test + public void generationAwareRetryConflict() { + initAndCleanup(true); + integrationTestSupport.teardownIfSuccess(() -> { + TestCustomResource resource = testCustomResource(); + TestCustomResource resource2 = testCustomResource(); + resource2.getMetadata().getAnnotations().put("testannotation", "val"); + + integrationTestSupport.getCrOperations().inNamespace(TEST_NAMESPACE).create(resource); + integrationTestSupport.getCrOperations().inNamespace(TEST_NAMESPACE).createOrReplace(resource2); + + awaitResourcesCreatedOrUpdated(); + awaitStatusUpdated(10); + }); + } + + void awaitResourcesCreatedOrUpdated() { await("configmap created").atMost(5, TimeUnit.SECONDS) .untilAsserted(() -> { @@ -63,7 +85,11 @@ void awaitResourcesCreatedOrUpdated() { } void awaitStatusUpdated() { - await("cr status updated").atMost(5, TimeUnit.SECONDS) + awaitStatusUpdated(5); + } + + void awaitStatusUpdated(int timeout) { + await("cr status updated").atMost(timeout, TimeUnit.SECONDS) .untilAsserted(() -> { TestCustomResource cr = integrationTestSupport.getCrOperations().inNamespace(TEST_NAMESPACE).withName("test-custom-resource").get(); assertThat(cr).isNotNull(); @@ -78,6 +104,7 @@ private TestCustomResource testCustomResource() { .withName("test-custom-resource") .withNamespace(TEST_NAMESPACE) .build()); + resource.getMetadata().setAnnotations(new HashMap<>()); resource.setKind("CustomService"); resource.setSpec(new TestCustomResourceSpec()); resource.getSpec().setConfigMapName("test-config-map");