Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResourceFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -19,6 +20,7 @@ public interface ConfigurationService {
ObjectMapper OBJECT_MAPPER = new ObjectMapper();

Cloner DEFAULT_CLONER = new Cloner() {
@SuppressWarnings("unchecked")
@Override
public HasMetadata clone(HasMetadata object) {
try {
Expand Down Expand Up @@ -126,4 +128,8 @@ default boolean closeClientOnStop() {
default ObjectMapper getObjectMapper() {
return OBJECT_MAPPER;
}

default DependentResourceFactory dependentResourceFactory() {
return new DependentResourceFactory() {};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ private RecentOperationCacheFiller<R> eventSourceAsRecentOperationCacheFiller()
return (RecentOperationCacheFiller<R>) ((EventSourceProvider<P>) this).getEventSource();
}

@SuppressWarnings("unchecked")
// this cannot be done in constructor since event source might be initialized later
protected boolean isFilteringEventSource() {
if (this instanceof EventSourceProvider) {
Expand All @@ -135,6 +136,7 @@ protected boolean isFilteringEventSource() {
}
}

@SuppressWarnings("unchecked")
// this cannot be done in constructor since event source might be initialized later
protected boolean isRecentOperationCacheFiller() {
if (this instanceof EventSourceProvider) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.javaoperatorsdk.operator.api.reconciler.dependent;

import java.lang.reflect.InvocationTargetException;

import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec;

public interface DependentResourceFactory {

default <T extends DependentResource<?, ?>> T createFrom(DependentResourceSpec<T, ?> spec) {
try {
return spec.getDependentResourceClass().getConstructor().newInstance();
} catch (InstantiationException | NoSuchMethodException | IllegalAccessException
| InvocationTargetException e) {
throw new IllegalArgumentException("Cannot instantiate DependentResource "
+ spec.getDependentResourceClass().getCanonicalName(), e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.javaoperatorsdk.operator.api.reconciler.dependent;

public interface ResourceTypeAware<R> {

Class<R> resourceType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -18,21 +19,25 @@
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec;
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
import io.javaoperatorsdk.operator.api.monitoring.Metrics.ControllerExecution;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ContextInitializer;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
import io.javaoperatorsdk.operator.processing.dependent.DependentResourceManager;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResourceConfigurator;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider;
import io.javaoperatorsdk.operator.api.reconciler.dependent.KubernetesClientAware;
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;

@SuppressWarnings({"unchecked"})
@SuppressWarnings({"unchecked", "rawtypes"})
@Ignore
public class Controller<P extends HasMetadata> implements Reconciler<P>,
LifecycleAware, EventSourceInitializer<P> {
Expand All @@ -43,22 +48,51 @@ public class Controller<P extends HasMetadata> implements Reconciler<P>,
private final ControllerConfiguration<P> configuration;
private final KubernetesClient kubernetesClient;
private final EventSourceManager<P> eventSourceManager;
private final DependentResourceManager<P> dependents;
private final List<DependentResource> dependents;
private final boolean contextInitializer;

public Controller(Reconciler<P> reconciler,
ControllerConfiguration<P> configuration,
KubernetesClient kubernetesClient) {
this.reconciler = reconciler;
this.configuration = configuration;
this.kubernetesClient = kubernetesClient;
contextInitializer = reconciler instanceof ContextInitializer;

eventSourceManager = new EventSourceManager<>(this);
dependents = new DependentResourceManager<>(this);

dependents = configuration.getDependentResources().stream()
.map(drs -> createAndConfigureFrom(drs, kubernetesClient))
.collect(Collectors.toList());
}

@SuppressWarnings("rawtypes")
private DependentResource createAndConfigureFrom(DependentResourceSpec spec,
KubernetesClient client) {
final var dependentResource =
ConfigurationServiceProvider.instance().dependentResourceFactory().createFrom(spec);

if (dependentResource instanceof KubernetesClientAware) {
((KubernetesClientAware) dependentResource).setKubernetesClient(client);
}

if (dependentResource instanceof DependentResourceConfigurator) {
final var configurator = (DependentResourceConfigurator) dependentResource;
spec.getDependentResourceConfiguration().ifPresent(configurator::configureWith);
}
return dependentResource;
}

private void initContextIfNeeded(P resource, Context<P> context) {
if (contextInitializer) {
((ContextInitializer<P>) reconciler).initContext(resource, context);
}
}

@Override
public DeleteControl cleanup(P resource, Context<P> context) {
dependents.cleanup(resource, context);
initContextIfNeeded(resource, context);
dependents.forEach(dependent -> dependent.cleanup(resource, context));

try {
return metrics().timeControllerExecution(
Expand Down Expand Up @@ -90,7 +124,8 @@ public DeleteControl execute() {

@Override
public UpdateControl<P> reconcile(P resource, Context<P> context) throws Exception {
dependents.reconcile(resource, context);
initContextIfNeeded(resource, context);
dependents.forEach(dependent -> dependent.reconcile(resource, context));

return metrics().timeControllerExecution(
new ControllerExecution<>() {
Expand Down Expand Up @@ -131,8 +166,12 @@ private Metrics metrics() {

@Override
public List<EventSource> prepareEventSources(EventSourceContext<P> context) {
final var dependentSources = dependents.prepareEventSources(context);
List<EventSource> sources = new LinkedList<>(dependentSources);
List<EventSource> sources = new LinkedList<>();
dependents.stream()
.filter(dependentResource -> dependentResource instanceof EventSourceProvider)
.map(EventSourceProvider.class::cast)
.map(provider -> provider.initEventSource(context))
.forEach(sources::add);

// add manually defined event sources
if (reconciler instanceof EventSourceInitializer) {
Expand Down Expand Up @@ -267,6 +306,6 @@ public void stop() {

@SuppressWarnings("rawtypes")
public List<DependentResource> getDependents() {
return dependents.getDependents();
return dependents;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,22 @@
import java.util.Optional;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.Utils;
import io.javaoperatorsdk.operator.api.reconciler.dependent.AbstractDependentResource;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider;
import io.javaoperatorsdk.operator.api.reconciler.dependent.ResourceTypeAware;
import io.javaoperatorsdk.operator.processing.event.ExternalResourceCachingEventSource;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;

public abstract class AbstractCachingDependentResource<R, P extends HasMetadata>
extends AbstractDependentResource<R, P> implements EventSourceProvider<P> {
extends AbstractDependentResource<R, P>
implements EventSourceProvider<P>, ResourceTypeAware<R> {

protected ExternalResourceCachingEventSource<R, P> eventSource;
private final Class<R> resourceType;

protected AbstractCachingDependentResource(Class<R> resourceType) {
this.resourceType = resourceType;
}

public Optional<R> fetchResource(P primaryResource) {
return eventSource.getAssociated(primaryResource);
Expand All @@ -23,8 +29,9 @@ public EventSource getEventSource() {
return eventSource;
}

protected Class<R> resourceType() {
return (Class<R>) Utils.getFirstTypeArgumentFromExtendedClass(getClass());
@Override
public Class<R> resourceType() {
return resourceType;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.javaoperatorsdk.operator.processing.dependent.external;

import io.fabric8.kubernetes.api.model.HasMetadata;

public abstract class AbstractPollingDependentResource<R, P extends HasMetadata>
extends AbstractCachingDependentResource<R, P> {

public static final int DEFAULT_POLLING_PERIOD = 5000;
private long pollingPeriod;

protected AbstractPollingDependentResource(Class<R> resourceType) {
this(resourceType, DEFAULT_POLLING_PERIOD);
}

public AbstractPollingDependentResource(Class<R> resourceType, long pollingPeriod) {
super(resourceType);
this.pollingPeriod = pollingPeriod;
}

public void setPollingPeriod(long pollingPeriod) {
this.pollingPeriod = pollingPeriod;
}

public long getPollingPeriod() {
return pollingPeriod;
}
}
Loading