diff --git a/examples/BUILD.bazel b/examples/BUILD.bazel index 2a5ed52b35c..cd810eb4892 100644 --- a/examples/BUILD.bazel +++ b/examples/BUILD.bazel @@ -154,6 +154,15 @@ java_binary( ], ) +java_binary( + name = "custom-load-balance-client", + testonly = 1, + main_class = "io.grpc.examples.customloadbalance.CustomLoadBalanceClient", + runtime_deps = [ + ":examples", + ], +) + java_binary( name = "name-resolve-client", testonly = 1, diff --git a/examples/build.gradle b/examples/build.gradle index c45e8e9da54..2208f3c0de4 100644 --- a/examples/build.gradle +++ b/examples/build.gradle @@ -153,6 +153,13 @@ task loadBalanceClient(type: CreateStartScripts) { classpath = startScripts.classpath } +task customLoadBalanceClient(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.customloadbalance.CustomLoadBalanceClient' + applicationName = 'custom-load-balance-client' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + task nameResolveServer(type: CreateStartScripts) { mainClass = 'io.grpc.examples.nameresolve.NameResolveServer' applicationName = 'name-resolve-server' @@ -181,6 +188,7 @@ applicationDistribution.into('bin') { from(manualFlowControlServer) from(loadBalanceServer) from(loadBalanceClient) + from(customLoadBalanceClient) from(nameResolveServer) from(nameResolveClient) fileMode = 0755 diff --git a/examples/src/main/java/io/grpc/examples/customloadbalance/CustomLoadBalanceClient.java b/examples/src/main/java/io/grpc/examples/customloadbalance/CustomLoadBalanceClient.java new file mode 100644 index 00000000000..2cf50a80b0a --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/customloadbalance/CustomLoadBalanceClient.java @@ -0,0 +1,108 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.customloadbalance; + +import com.google.gson.Gson; +import io.grpc.Channel; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.LoadBalancerRegistry; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.NameResolverRegistry; +import io.grpc.StatusRuntimeException; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.examples.loadbalance.ExampleNameResolverProvider; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * This client is intended for connecting with the {@code LoadBalanceServer} in the "loadbalance" + * example. + */ +public class CustomLoadBalanceClient { + + private static final Logger logger = Logger.getLogger(CustomLoadBalanceClient.class.getName()); + + private final GreeterGrpc.GreeterBlockingStub blockingStub; + + public CustomLoadBalanceClient(Channel channel) { + blockingStub = GreeterGrpc.newBlockingStub(channel); + } + + public void greet(String name) { + HelloRequest request = HelloRequest.newBuilder().setName(name).build(); + HelloReply response; + try { + response = blockingStub.sayHello(request); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); + return; + } + logger.info("Greeting: " + response.getMessage()); + } + + + public static void main(String[] args) throws Exception { + // We need to register the provider of our custom load balancer implementation + LoadBalancerRegistry.getDefaultRegistry() + .register(new ShufflingPickFirstLoadBalancerProvider()); + + NameResolverRegistry.getDefaultRegistry().register(new ExampleNameResolverProvider()); + + String target = "example:///lb.example.grpc.io"; + + logger.info("Use default first_pick load balance policy"); + ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()) + .build(); + + try { + CustomLoadBalanceClient client = new CustomLoadBalanceClient(channel); + for (int i = 0; i < 5; i++) { + client.greet("request" + i); + } + } finally { + channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } + + logger.info("Change to custom shuffling_pick_first policy with a configured random seed"); + // The load balancer name in the config needs to match what getPolicyName() in the provider + // returns. The randomSeed field we are using also needs to be understood by the provider when + // parseLoadBalancingPolicyConfig() gets called. + Map serviceConfig = new Gson().fromJson( + "{ \"loadBalancingConfig\": " + + " [ { \"grpc.examples.customloadbalance.ShufflingPickFirst\": { \"randomSeed\": 123 } } ]" + + "}", + Map.class); + channel = ManagedChannelBuilder.forTarget(target) + .defaultServiceConfig(serviceConfig) + .usePlaintext() + .build(); + try { + CustomLoadBalanceClient client = new CustomLoadBalanceClient(channel); + for (int i = 0; i < 5; i++) { + client.greet("request" + i); + } + } finally { + channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } + } +} diff --git a/examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancer.java b/examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancer.java new file mode 100644 index 00000000000..e76cde3dc36 --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancer.java @@ -0,0 +1,204 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.customloadbalance; + +import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.IDLE; +import static io.grpc.ConnectivityState.SHUTDOWN; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; + +import com.google.common.base.MoreObjects; +import io.grpc.ConnectivityState; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.Status; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * An example {@link LoadBalancer} largely based on {@link ShufflingPickFirstLoadBalancer} that adds + * shuffling of the list of servers so that the first server provided in {@link ResolvedAddresses} + * won't necessarily be the server the channel will connect to. + */ +class ShufflingPickFirstLoadBalancer extends LoadBalancer { + + private final Helper helper; + private Subchannel subchannel; + + /** + * This class defines the configuration used by this {@link LoadBalancer}. Note that no part of + * the gRPC library is aware of this class, it will be populated by the + * {@link ShufflingPickFirstLoadBalancerProvider}. + */ + static class Config { + + final Long randomSeed; + + Config(Long randomSeed) { + this.randomSeed = randomSeed; + } + } + + public ShufflingPickFirstLoadBalancer(Helper helper) { + this.helper = checkNotNull(helper, "helper"); + } + + @Override + public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { + List servers = new ArrayList<>(resolvedAddresses.getAddresses()); + if (servers.isEmpty()) { + handleNameResolutionError(Status.UNAVAILABLE.withDescription( + "NameResolver returned no usable address. addrs=" + resolvedAddresses.getAddresses() + + ", attrs=" + resolvedAddresses.getAttributes())); + return false; + } + + Config config + = (Config) resolvedAddresses.getLoadBalancingPolicyConfig(); + + Collections.shuffle(servers, + config.randomSeed != null ? new Random(config.randomSeed) : new Random()); + + if (subchannel == null) { + final Subchannel subchannel = helper.createSubchannel( + CreateSubchannelArgs.newBuilder() + .setAddresses(servers) + .build()); + subchannel.start(new SubchannelStateListener() { + @Override + public void onSubchannelState(ConnectivityStateInfo stateInfo) { + processSubchannelState(subchannel, stateInfo); + } + }); + this.subchannel = subchannel; + + helper.updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult())); + subchannel.requestConnection(); + } else { + subchannel.updateAddresses(servers); + } + + return true; + } + + @Override + public void handleNameResolutionError(Status error) { + if (subchannel != null) { + subchannel.shutdown(); + subchannel = null; + } + helper.updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error))); + } + + private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { + ConnectivityState currentState = stateInfo.getState(); + if (currentState == SHUTDOWN) { + return; + } + if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) { + helper.refreshNameResolution(); + } + + SubchannelPicker picker; + switch (currentState) { + case IDLE: + picker = new RequestConnectionPicker(subchannel); + break; + case CONNECTING: + picker = new Picker(PickResult.withNoResult()); + break; + case READY: + picker = new Picker(PickResult.withSubchannel(subchannel)); + break; + case TRANSIENT_FAILURE: + picker = new Picker(PickResult.withError(stateInfo.getStatus())); + break; + default: + throw new IllegalArgumentException("Unsupported state:" + currentState); + } + helper.updateBalancingState(currentState, picker); + } + + + @Override + public void shutdown() { + if (subchannel != null) { + subchannel.shutdown(); + } + } + + @Override + public void requestConnection() { + if (subchannel != null) { + subchannel.requestConnection(); + } + } + + /** + * No-op picker which doesn't add any custom picking logic. It just passes already known result + * received in constructor. + */ + private static final class Picker extends SubchannelPicker { + + private final PickResult result; + + Picker(PickResult result) { + this.result = checkNotNull(result, "result"); + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return result; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(Picker.class).add("result", result).toString(); + } + } + + /** + * Picker that requests connection during the first pick, and returns noResult. + */ + private final class RequestConnectionPicker extends SubchannelPicker { + + private final Subchannel subchannel; + private final AtomicBoolean connectionRequested = new AtomicBoolean(false); + + RequestConnectionPicker(Subchannel subchannel) { + this.subchannel = checkNotNull(subchannel, "subchannel"); + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + if (connectionRequested.compareAndSet(false, true)) { + helper.getSynchronizationContext().execute(new Runnable() { + @Override + public void run() { + subchannel.requestConnection(); + } + }); + } + return PickResult.withNoResult(); + } + } +} \ No newline at end of file diff --git a/examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancerProvider.java b/examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancerProvider.java new file mode 100644 index 00000000000..d72b8ca1c8d --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancerProvider.java @@ -0,0 +1,65 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.customloadbalance; + +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancerProvider; +import io.grpc.NameResolver.ConfigOrError; +import io.grpc.Status; +import java.util.Map; + +public class ShufflingPickFirstLoadBalancerProvider extends LoadBalancerProvider { + + @Override + public ConfigOrError parseLoadBalancingPolicyConfig(Map rawLoadBalancingPolicyConfig) { + Long randomSeed = null; + + // The load balancing configuration generally comes from a remote source over the wire, be + // defensive when parsing it. + try { + Object randomSeedObj = rawLoadBalancingPolicyConfig.get("randomSeed"); + if (randomSeedObj instanceof Double) { + randomSeed = ((Double) randomSeedObj).longValue(); + } + return ConfigOrError.fromConfig(new ShufflingPickFirstLoadBalancer.Config(randomSeed)); + } catch (RuntimeException e) { + return ConfigOrError.fromError( + Status.UNAVAILABLE.withDescription("unable to parse LB config").withCause(e)); + } + } + + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + return new ShufflingPickFirstLoadBalancer(helper); + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 5; + } + + @Override + public String getPolicyName() { + return "grpc.examples.customloadbalance.ShufflingPickFirst"; + } +}