Skip to content

Commit 0868403

Browse files
committed
examples: custom load balancer example
1 parent b09473b commit 0868403

File tree

7 files changed

+624
-0
lines changed

7 files changed

+624
-0
lines changed

examples/build.gradle

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,20 @@ task loadBalanceClient(type: CreateStartScripts) {
153153
classpath = startScripts.classpath
154154
}
155155

156+
task customLoadBalanceServer(type: CreateStartScripts) {
157+
mainClass = 'io.grpc.examples.customloadbalance.CustomLoadBalanceServer'
158+
applicationName = 'custom-load-balance-server'
159+
outputDir = new File(project.buildDir, 'tmp/scripts/' + name)
160+
classpath = startScripts.classpath
161+
}
162+
163+
task customLoadBalanceClient(type: CreateStartScripts) {
164+
mainClass = 'io.grpc.examples.customloadbalance.CustomLoadBalanceClient'
165+
applicationName = 'custom-load-balance-client'
166+
outputDir = new File(project.buildDir, 'tmp/scripts/' + name)
167+
classpath = startScripts.classpath
168+
}
169+
156170
task nameResolveServer(type: CreateStartScripts) {
157171
mainClass = 'io.grpc.examples.nameresolve.NameResolveServer'
158172
applicationName = 'name-resolve-server'
@@ -181,6 +195,8 @@ applicationDistribution.into('bin') {
181195
from(manualFlowControlServer)
182196
from(loadBalanceServer)
183197
from(loadBalanceClient)
198+
from(customLoadBalanceServer)
199+
from(customLoadBalanceClient)
184200
from(nameResolveServer)
185201
from(nameResolveClient)
186202
fileMode = 0755
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2022 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.examples.customloadbalance;
18+
19+
import com.google.gson.Gson;
20+
import io.grpc.Channel;
21+
import io.grpc.LoadBalancerRegistry;
22+
import io.grpc.ManagedChannel;
23+
import io.grpc.ManagedChannelBuilder;
24+
import io.grpc.NameResolverRegistry;
25+
import io.grpc.StatusRuntimeException;
26+
import io.grpc.examples.helloworld.GreeterGrpc;
27+
import io.grpc.examples.helloworld.HelloReply;
28+
import io.grpc.examples.helloworld.HelloRequest;
29+
import io.grpc.examples.loadbalance.ExampleNameResolverProvider;
30+
import java.util.Map;
31+
import java.util.concurrent.TimeUnit;
32+
import java.util.logging.Level;
33+
import java.util.logging.Logger;
34+
35+
public class CustomLoadBalanceClient {
36+
37+
private static final Logger logger = Logger.getLogger(CustomLoadBalanceClient.class.getName());
38+
39+
public static final String exampleScheme = "example";
40+
public static final String exampleServiceName = "lb.example.grpc.io";
41+
42+
private final GreeterGrpc.GreeterBlockingStub blockingStub;
43+
44+
public CustomLoadBalanceClient(Channel channel) {
45+
blockingStub = GreeterGrpc.newBlockingStub(channel);
46+
}
47+
48+
public void greet(String name) {
49+
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
50+
HelloReply response;
51+
try {
52+
response = blockingStub.sayHello(request);
53+
} catch (StatusRuntimeException e) {
54+
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
55+
return;
56+
}
57+
logger.info("Greeting: " + response.getMessage());
58+
}
59+
60+
61+
public static void main(String[] args) throws Exception {
62+
// We need to register the provider of our custom load balancer implementation
63+
LoadBalancerRegistry.getDefaultRegistry()
64+
.register(new ShufflingPickFirstLoadBalancerProvider());
65+
66+
NameResolverRegistry.getDefaultRegistry().register(new ExampleNameResolverProvider());
67+
68+
String target = String.format("%s:///%s", exampleScheme, exampleServiceName);
69+
70+
logger.info("Use default first_pick load balance policy");
71+
ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
72+
.usePlaintext()
73+
.build();
74+
try {
75+
CustomLoadBalanceClient client = new CustomLoadBalanceClient(channel);
76+
for (int i = 0; i < 5; i++) {
77+
client.greet("request" + i);
78+
}
79+
} finally {
80+
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
81+
}
82+
83+
logger.info("Change to custom shuffling_pick_first policy with a configured random seed");
84+
// The load balancer name in the config needs to match what getPolicyName() in the provider
85+
// returns. The randomSeed field we are using also needs to be understood by the provider when
86+
// parseLoadBalancingPolicyConfig() gets called.
87+
Map<String, ?> serviceConfig = new Gson().fromJson(
88+
"{ \"loadBalancingConfig\": " +
89+
" [ { \"example.shuffling_pick_first\": { \"randomSeed\": 123 } } ]" +
90+
"}",
91+
Map.class);
92+
channel = ManagedChannelBuilder.forTarget(target)
93+
.defaultServiceConfig(serviceConfig)
94+
.usePlaintext()
95+
.build();
96+
try {
97+
CustomLoadBalanceClient client = new CustomLoadBalanceClient(channel);
98+
for (int i = 0; i < 5; i++) {
99+
client.greet("request" + i);
100+
}
101+
} finally {
102+
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
103+
}
104+
}
105+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright 2023 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.grpc.examples.customloadbalance;
17+
18+
import io.grpc.Server;
19+
import io.grpc.ServerBuilder;
20+
import io.grpc.examples.helloworld.GreeterGrpc;
21+
import io.grpc.examples.helloworld.HelloReply;
22+
import io.grpc.examples.helloworld.HelloRequest;
23+
import io.grpc.stub.StreamObserver;
24+
25+
import java.io.IOException;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.logging.Logger;
28+
29+
public class CustomLoadBalanceServer {
30+
31+
private static final Logger logger = Logger.getLogger(
32+
io.grpc.examples.customloadbalance.CustomLoadBalanceServer.class.getName());
33+
static public final int serverCount = 3;
34+
static public final int startPort = 50051;
35+
private Server[] servers;
36+
37+
private void start() throws IOException {
38+
servers = new Server[serverCount];
39+
for (int i = 0; i < serverCount; i++) {
40+
int port = startPort + i;
41+
servers[i] = ServerBuilder.forPort(port)
42+
.addService(new GreeterImpl(port))
43+
.build()
44+
.start();
45+
logger.info("Server started, listening on " + port);
46+
}
47+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
48+
System.err.println("*** shutting down gRPC server since JVM is shutting down");
49+
try {
50+
CustomLoadBalanceServer.this.stop();
51+
} catch (InterruptedException e) {
52+
e.printStackTrace(System.err);
53+
}
54+
System.err.println("*** server shut down");
55+
}));
56+
}
57+
58+
private void stop() throws InterruptedException {
59+
for (int i = 0; i < serverCount; i++) {
60+
if (servers[i] != null) {
61+
servers[i].shutdown().awaitTermination(30, TimeUnit.SECONDS);
62+
}
63+
}
64+
}
65+
66+
private void blockUntilShutdown() throws InterruptedException {
67+
for (int i = 0; i < serverCount; i++) {
68+
if (servers[i] != null) {
69+
servers[i].awaitTermination();
70+
}
71+
}
72+
}
73+
74+
public static void main(String[] args) throws IOException, InterruptedException {
75+
final CustomLoadBalanceServer server = new CustomLoadBalanceServer();
76+
server.start();
77+
server.blockUntilShutdown();
78+
}
79+
80+
static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
81+
82+
int port;
83+
84+
public GreeterImpl(int port) {
85+
this.port = port;
86+
}
87+
88+
@Override
89+
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
90+
HelloReply reply = HelloReply.newBuilder()
91+
.setMessage("Hello " + req.getName() + " from server<" + this.port + ">").build();
92+
responseObserver.onNext(reply);
93+
responseObserver.onCompleted();
94+
}
95+
}
96+
}
97+
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright 2022 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.examples.customloadbalance;
18+
19+
import com.google.common.collect.ImmutableMap;
20+
import io.grpc.EquivalentAddressGroup;
21+
import io.grpc.NameResolver;
22+
import io.grpc.Status;
23+
24+
import io.grpc.examples.loadbalance.LoadBalanceServer;
25+
import java.net.InetSocketAddress;
26+
import java.net.SocketAddress;
27+
import java.net.URI;
28+
import java.util.Arrays;
29+
import java.util.List;
30+
import java.util.Map;
31+
import java.util.stream.Collectors;
32+
import java.util.stream.Stream;
33+
34+
import static io.grpc.examples.loadbalance.LoadBalanceClient.exampleServiceName;
35+
36+
public class ExampleNameResolver extends NameResolver {
37+
38+
private Listener2 listener;
39+
40+
private final URI uri;
41+
42+
private final Map<String,List<InetSocketAddress>> addrStore;
43+
44+
public ExampleNameResolver(URI targetUri) {
45+
this.uri = targetUri;
46+
// This is a fake name resolver, so we just hard code the address here.
47+
addrStore = ImmutableMap.<String,List<InetSocketAddress>>builder()
48+
.put(exampleServiceName,
49+
Stream.iterate(LoadBalanceServer.startPort,p->p+1)
50+
.limit(LoadBalanceServer.serverCount)
51+
.map(port->new InetSocketAddress("localhost",port))
52+
.collect(Collectors.toList())
53+
)
54+
.build();
55+
}
56+
57+
@Override
58+
public String getServiceAuthority() {
59+
// Be consistent with behavior in grpc-go, authority is saved in Host field of URI.
60+
if (uri.getHost() != null) {
61+
return uri.getHost();
62+
}
63+
return "no host";
64+
}
65+
66+
@Override
67+
public void shutdown() {
68+
}
69+
70+
@Override
71+
public void start(Listener2 listener) {
72+
this.listener = listener;
73+
this.resolve();
74+
}
75+
76+
@Override
77+
public void refresh() {
78+
this.resolve();
79+
}
80+
81+
private void resolve() {
82+
List<InetSocketAddress> addresses = addrStore.get(uri.getPath().substring(1));
83+
try {
84+
List<EquivalentAddressGroup> equivalentAddressGroup = addresses.stream()
85+
// convert to socket address
86+
.map(this::toSocketAddress)
87+
// every socket address is a single EquivalentAddressGroup, so they can be accessed randomly
88+
.map(Arrays::asList)
89+
.map(this::addrToEquivalentAddressGroup)
90+
.collect(Collectors.toList());
91+
92+
ResolutionResult resolutionResult = ResolutionResult.newBuilder()
93+
.setAddresses(equivalentAddressGroup)
94+
.build();
95+
96+
this.listener.onResult(resolutionResult);
97+
98+
} catch (Exception e){
99+
// when error occurs, notify listener
100+
this.listener.onError(Status.UNAVAILABLE.withDescription("Unable to resolve host ").withCause(e));
101+
}
102+
}
103+
104+
private SocketAddress toSocketAddress(InetSocketAddress address) {
105+
return new InetSocketAddress(address.getHostName(), address.getPort());
106+
}
107+
108+
private EquivalentAddressGroup addrToEquivalentAddressGroup(List<SocketAddress> addrList) {
109+
return new EquivalentAddressGroup(addrList);
110+
}
111+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2022 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.examples.customloadbalance;
18+
19+
import io.grpc.NameResolver;
20+
import io.grpc.NameResolverProvider;
21+
22+
import io.grpc.examples.loadbalance.ExampleNameResolver;
23+
import java.net.URI;
24+
25+
import static io.grpc.examples.loadbalance.LoadBalanceClient.exampleScheme;
26+
27+
public class ExampleNameResolverProvider extends NameResolverProvider {
28+
@Override
29+
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
30+
return new ExampleNameResolver(targetUri);
31+
}
32+
33+
@Override
34+
protected boolean isAvailable() {
35+
return true;
36+
}
37+
38+
@Override
39+
protected int priority() {
40+
return 5;
41+
}
42+
43+
@Override
44+
// gRPC choose the first NameResolverProvider that supports the target URI scheme.
45+
public String getDefaultScheme() {
46+
return exampleScheme;
47+
}
48+
}

0 commit comments

Comments
 (0)