Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void readsSecondaryInManyToOneCases() throws InterruptedException {
.isEqualTo(1));
}

Job job() {
public static Job job() {
var job = new Job();
job.setMetadata(new ObjectMetaBuilder()
.withName("job1")
Expand All @@ -48,7 +48,7 @@ Job job() {
return job;
}

Cluster cluster() {
public static Cluster cluster() {
Cluster cluster = new Cluster();
cluster.setMetadata(new ObjectMetaBuilder()
.withName(CLUSTER_NAME)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.javaoperatorsdk.operator;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
import io.javaoperatorsdk.operator.sample.primarytosecondary.Cluster;
import io.javaoperatorsdk.operator.sample.primarytosecondary.JobReconciler;

import static io.javaoperatorsdk.operator.PrimaryToSecondaryIT.cluster;
import static io.javaoperatorsdk.operator.PrimaryToSecondaryIT.job;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

class PrimaryToSecondaryMissingIT {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add some description that this unit test is to showcase the role of PtoS mapper and that it it not meant to be used as reusable code.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense, will add, thx!!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


@RegisterExtension
LocallyRunOperatorExtension operator =
LocallyRunOperatorExtension.builder()
.withAdditionalCustomResourceDefinition(Cluster.class)
.withReconciler(new JobReconciler(false))
.build();

@Test
void missingPrimaryToSecondaryCausesIssueAccessingSecondary() throws InterruptedException {
var reconciler = operator.getReconcilerOfType(JobReconciler.class);
operator.create(cluster());
Thread.sleep(300);
operator.create(job());

await().untilAsserted(() -> {
assertThat(reconciler.isErrorOccurred()).isTrue();
assertThat(reconciler.getNumberOfExecutions()).isZero();
});
}

@Test
void accessingDirectlyTheCacheWorksWithoutPToSMapper() throws InterruptedException {
var reconciler = operator.getReconcilerOfType(JobReconciler.class);
reconciler.setGetResourceDirectlyFromCache(true);
operator.create(cluster());
Thread.sleep(300);
operator.create(job());

await().untilAsserted(() -> {
assertThat(reconciler.isErrorOccurred()).isFalse();
assertThat(reconciler.getNumberOfExecutions()).isPositive();
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,42 @@

@ControllerConfiguration()
public class JobReconciler
implements Reconciler<Job>, EventSourceInitializer<Job> {
implements Reconciler<Job>, EventSourceInitializer<Job>, ErrorStatusHandler<Job> {

private static final String JOB_CLUSTER_INDEX = "job-cluster-index";

private final AtomicInteger numberOfExecutions = new AtomicInteger(0);

private final boolean addPrimaryToSecondaryMapper;
private boolean getResourceDirectlyFromCache = false;
private volatile boolean errorOccurred;

public JobReconciler() {
this(true);
}

public JobReconciler(boolean addPrimaryToSecondaryMapper) {
this.addPrimaryToSecondaryMapper = addPrimaryToSecondaryMapper;
}

@Override
public UpdateControl<Job> reconcile(
Job resource, Context<Job> context) {

context.getSecondaryResource(Cluster.class)
.orElseThrow(() -> new IllegalStateException("Secondary resource should be present"));
if (!getResourceDirectlyFromCache) {
// this is possible always when there is primary to secondary mapper
context.getSecondaryResource(Cluster.class)
.orElseThrow(() -> new IllegalStateException("Secondary resource should be present"));
} else {
// reading the resource from cache as alternative, works without primary to secondary mapper
var informerEventSource = (InformerEventSource<Cluster, Job>) context.eventSourceRetriever()
.getResourceEventSourceFor(Cluster.class);
informerEventSource
.get(new ResourceID(resource.getSpec().getClusterName(),
resource.getMetadata().getNamespace()))
.orElseThrow(
() -> new IllegalStateException("Secondary resource cannot be read from cache"));
}
numberOfExecutions.addAndGet(1);
return UpdateControl.noUpdate();
}
Expand All @@ -36,20 +60,22 @@ public Map<String, EventSource> prepareEventSources(EventSourceContext<Job> cont
context.getPrimaryCache().addIndexer(JOB_CLUSTER_INDEX, (job -> List
.of(indexKey(job.getSpec().getClusterName(), job.getMetadata().getNamespace()))));

InformerConfiguration<Cluster> informerConfiguration =
InformerConfiguration.InformerConfigurationBuilder<Cluster> informerConfiguration =
InformerConfiguration.from(Cluster.class, context)
.withSecondaryToPrimaryMapper(cluster -> context.getPrimaryCache()
.byIndex(JOB_CLUSTER_INDEX, indexKey(cluster.getMetadata().getName(),
cluster.getMetadata().getNamespace()))
.stream().map(ResourceID::fromResource).collect(Collectors.toSet()))
.withPrimaryToSecondaryMapper(
(PrimaryToSecondaryMapper<Job>) primary -> Set.of(new ResourceID(
primary.getSpec().getClusterName(), primary.getMetadata().getNamespace())))
.withNamespacesInheritedFromController(context)
.build();
.withNamespacesInheritedFromController(context);

if (addPrimaryToSecondaryMapper) {
informerConfiguration = informerConfiguration.withPrimaryToSecondaryMapper(
(PrimaryToSecondaryMapper<Job>) primary -> Set.of(new ResourceID(
primary.getSpec().getClusterName(), primary.getMetadata().getNamespace())));
}

return EventSourceInitializer
.nameEventSources(new InformerEventSource<>(informerConfiguration, context));
.nameEventSources(new InformerEventSource<>(informerConfiguration.build(), context));
}

private String indexKey(String clusterName, String namespace) {
Expand All @@ -59,4 +85,20 @@ private String indexKey(String clusterName, String namespace) {
public int getNumberOfExecutions() {
return numberOfExecutions.get();
}

@Override
public ErrorStatusUpdateControl<Job> updateErrorStatus(Job resource, Context<Job> context,
Exception e) {
errorOccurred = true;
return ErrorStatusUpdateControl.noStatusUpdate();
}

public boolean isErrorOccurred() {
return errorOccurred;
}

public JobReconciler setGetResourceDirectlyFromCache(boolean getResourceDirectlyFromCache) {
this.getResourceDirectlyFromCache = getResourceDirectlyFromCache;
return this;
}
}