Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions examples/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,15 @@ java_binary(
],
)

java_binary(
name = "custom-load-balance-client",
testonly = 1,
main_class = "io.grpc.examples.customloadbalance.CustomLoadBalanceClient",
runtime_deps = [
":examples",
],
)

java_binary(
name = "name-resolve-client",
testonly = 1,
Expand Down
8 changes: 8 additions & 0 deletions examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,13 @@ task loadBalanceClient(type: CreateStartScripts) {
classpath = startScripts.classpath
}

task customLoadBalanceClient(type: CreateStartScripts) {
mainClass = 'io.grpc.examples.customloadbalance.CustomLoadBalanceClient'
applicationName = 'custom-load-balance-client'
outputDir = new File(project.buildDir, 'tmp/scripts/' + name)
classpath = startScripts.classpath
}

task nameResolveServer(type: CreateStartScripts) {
mainClass = 'io.grpc.examples.nameresolve.NameResolveServer'
applicationName = 'name-resolve-server'
Expand Down Expand Up @@ -181,6 +188,7 @@ applicationDistribution.into('bin') {
from(manualFlowControlServer)
from(loadBalanceServer)
from(loadBalanceClient)
from(customLoadBalanceClient)
from(nameResolveServer)
from(nameResolveClient)
fileMode = 0755
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.examples.customloadbalance;

import com.google.gson.Gson;
import io.grpc.Channel;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolverRegistry;
import io.grpc.StatusRuntimeException;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.examples.loadbalance.ExampleNameResolverProvider;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* This client is intended for connecting with the @{link LoadBalanceServer} in the "loadbalance"
* example.
*/
public class CustomLoadBalanceClient {

private static final Logger logger = Logger.getLogger(CustomLoadBalanceClient.class.getName());

public static final String exampleScheme = "example";
public static final String exampleServiceName = "lb.example.grpc.io";

private final GreeterGrpc.GreeterBlockingStub blockingStub;

public CustomLoadBalanceClient(Channel channel) {
blockingStub = GreeterGrpc.newBlockingStub(channel);
}

public void greet(String name) {
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
HelloReply response;
try {
response = blockingStub.sayHello(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
logger.info("Greeting: " + response.getMessage());
}


public static void main(String[] args) throws Exception {
// We need to register the provider of our custom load balancer implementation
LoadBalancerRegistry.getDefaultRegistry()
.register(new ShufflingPickFirstLoadBalancerProvider());

NameResolverRegistry.getDefaultRegistry().register(new ExampleNameResolverProvider());

String target = String.format("%s:///%s", exampleScheme, exampleServiceName);

logger.info("Use default first_pick load balance policy");
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
.build();

try {
CustomLoadBalanceClient client = new CustomLoadBalanceClient(channel);
for (int i = 0; i < 5; i++) {
client.greet("request" + i);
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}

logger.info("Change to custom shuffling_pick_first policy with a configured random seed");
// The load balancer name in the config needs to match what getPolicyName() in the provider
// returns. The randomSeed field we are using also needs to be understood by the provider when
// parseLoadBalancingPolicyConfig() gets called.
Map<String, ?> serviceConfig = new Gson().fromJson(
"{ \"loadBalancingConfig\": " +
" [ { \"example.shuffling_pick_first\": { \"randomSeed\": 123 } } ]" +
"}",
Map.class);
channel = ManagedChannelBuilder.forTarget(target)
.defaultServiceConfig(serviceConfig)
.usePlaintext()
.build();
try {
CustomLoadBalanceClient client = new CustomLoadBalanceClient(channel);
for (int i = 0; i < 5; i++) {
client.greet("request" + i);
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.examples.customloadbalance;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;

import com.google.common.base.MoreObjects;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* An example {@link LoadBalancer} largely based on {@link ShufflingPickFirstLoadBalancer} that adds
* shuffling of the list of servers so that the first server provided in {@link ResolvedAddresses}
* won't necessarily be the server the channel will connect to.
*/
public class ShufflingPickFirstLoadBalancer extends LoadBalancer {

private final Helper helper;
private Subchannel subchannel;

/**
* This class defines the configuration used by this {@link LoadBalancer}. Note that no part of
* the gRPC library is aware of this interface, it will be populated by the
* {@link ShufflingPickFirstLoadBalancerProvider}.
*/
static class Config {

final Long randomSeed;

Config(Long randomSeed) {
this.randomSeed = randomSeed;
}
}

public ShufflingPickFirstLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
}

@Override
public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
List<EquivalentAddressGroup> servers = new ArrayList<>(resolvedAddresses.getAddresses());
if (servers.isEmpty()) {
handleNameResolutionError(Status.UNAVAILABLE.withDescription(
"NameResolver returned no usable address. addrs=" + resolvedAddresses.getAddresses()
+ ", attrs=" + resolvedAddresses.getAttributes()));
return false;
}

Config config
= (Config) resolvedAddresses.getLoadBalancingPolicyConfig();

Collections.shuffle(servers,
config.randomSeed != null ? new Random(config.randomSeed) : new Random());

if (subchannel == null) {
final Subchannel subchannel = helper.createSubchannel(
CreateSubchannelArgs.newBuilder()
.setAddresses(servers)
.build());
subchannel.start(new SubchannelStateListener() {
@Override
public void onSubchannelState(ConnectivityStateInfo stateInfo) {
processSubchannelState(subchannel, stateInfo);
}
});
this.subchannel = subchannel;

helper.updateBalancingState(CONNECTING,
new Picker(PickResult.withSubchannel(this.subchannel)));
subchannel.requestConnection();
} else {
subchannel.updateAddresses(servers);
}

return true;
}

@Override
public void handleNameResolutionError(Status error) {
if (subchannel != null) {
subchannel.shutdown();
subchannel = null;
}
helper.updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
}

private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
ConnectivityState currentState = stateInfo.getState();
if (currentState == SHUTDOWN) {
return;
}
if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) {
helper.refreshNameResolution();
}

SubchannelPicker picker;
switch (currentState) {
case IDLE:
picker = new RequestConnectionPicker(subchannel);
break;
case CONNECTING:
picker = new Picker(PickResult.withNoResult());
break;
case READY:
picker = new Picker(PickResult.withSubchannel(subchannel));
break;
case TRANSIENT_FAILURE:
picker = new Picker(PickResult.withError(stateInfo.getStatus()));
break;
default:
throw new IllegalArgumentException("Unsupported state:" + currentState);
}
helper.updateBalancingState(currentState, picker);
}


@Override
public void shutdown() {
if (subchannel != null) {
subchannel.shutdown();
}
}

@Override
public void requestConnection() {
if (subchannel != null) {
subchannel.requestConnection();
}
}

/**
* No-op picker which doesn't add any custom picking logic. It just passes already known result
* received in constructor.
*/
private static final class Picker extends SubchannelPicker {

private final PickResult result;

Picker(PickResult result) {
this.result = checkNotNull(result, "result");
}

@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return result;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(Picker.class).add("result", result).toString();
}
}

/**
* Picker that requests connection during the first pick, and returns noResult.
*/
private final class RequestConnectionPicker extends SubchannelPicker {

private final Subchannel subchannel;
private final AtomicBoolean connectionRequested = new AtomicBoolean(false);

RequestConnectionPicker(Subchannel subchannel) {
this.subchannel = checkNotNull(subchannel, "subchannel");
}

@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
if (connectionRequested.compareAndSet(false, true)) {
helper.getSynchronizationContext().execute(new Runnable() {
@Override
public void run() {
subchannel.requestConnection();
}
});
}
return PickResult.withNoResult();
}
}
}
Loading