Skip to content

Commit f5fd74f

Browse files
ejona86larry-safran
authored andcommitted
examples: Add pre-serialized-message example (grpc#10112)
This came out of the question grpc#9707, and could be useful to others.
1 parent e27f98e commit f5fd74f

File tree

5 files changed

+319
-0
lines changed

5 files changed

+319
-0
lines changed

examples/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,8 @@ $ bazel-bin/hello-world-client
205205

206206
- [JWT-based Authentication](example-jwt-auth)
207207

208+
- [Pre-serialized messages](src/main/java/io/grpc/examples/preserialized)
209+
208210
## Unit test examples
209211

210212
Examples for unit testing gRPC clients and servers are located in [examples/src/test](src/test).

examples/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ createStartScripts('io.grpc.examples.multiplex.MultiplexingServer')
110110
createStartScripts('io.grpc.examples.multiplex.SharingClient')
111111
createStartScripts('io.grpc.examples.nameresolve.NameResolveClient')
112112
createStartScripts('io.grpc.examples.nameresolve.NameResolveServer')
113+
createStartScripts('io.grpc.examples.preserialized.PreSerializedClient')
114+
createStartScripts('io.grpc.examples.preserialized.PreSerializedServer')
113115
createStartScripts('io.grpc.examples.retrying.RetryingHelloWorldClient')
114116
createStartScripts('io.grpc.examples.retrying.RetryingHelloWorldServer')
115117
createStartScripts('io.grpc.examples.routeguide.RouteGuideClient')
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2023 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.examples.preserialized;
18+
19+
import com.google.common.io.ByteStreams;
20+
import io.grpc.MethodDescriptor;
21+
import java.io.ByteArrayInputStream;
22+
import java.io.IOException;
23+
import java.io.InputStream;
24+
25+
/**
26+
* A marshaller that produces a byte[] instead of decoding into typical POJOs. It can be used for
27+
* any message type.
28+
*/
29+
final class ByteArrayMarshaller implements MethodDescriptor.Marshaller<byte[]> {
30+
@Override
31+
public byte[] parse(InputStream stream) {
32+
try {
33+
return ByteStreams.toByteArray(stream);
34+
} catch (IOException ex) {
35+
throw new RuntimeException(ex);
36+
}
37+
}
38+
39+
@Override
40+
public InputStream stream(byte[] b) {
41+
return new ByteArrayInputStream(b);
42+
}
43+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright 2023 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.examples.preserialized;
18+
19+
import io.grpc.CallOptions;
20+
import io.grpc.Channel;
21+
import io.grpc.Grpc;
22+
import io.grpc.InsecureChannelCredentials;
23+
import io.grpc.ManagedChannel;
24+
import io.grpc.MethodDescriptor;
25+
import io.grpc.StatusRuntimeException;
26+
import io.grpc.examples.helloworld.GreeterGrpc;
27+
import io.grpc.examples.helloworld.HelloReply;
28+
import io.grpc.examples.helloworld.HelloRequest;
29+
import io.grpc.stub.ClientCalls;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.logging.Level;
32+
import java.util.logging.Logger;
33+
34+
/**
35+
* A client that requests a greeting from a hello-world server, but using a pre-serialized request.
36+
* This is a performance optimization that can be useful if you read the request from on-disk or a
37+
* database where it is already serialized, or if you need to send the same complicated message to
38+
* many servers. The same approach can avoid deserializing responses, to be stored in a database.
39+
* This adjustment is client-side only; the server is unable to detect the difference, so this
40+
* client is fully-compatible with the normal {@link HelloWorldServer}.
41+
*/
42+
public class PreSerializedClient {
43+
private static final Logger logger = Logger.getLogger(PreSerializedClient.class.getName());
44+
45+
/**
46+
* Modified sayHello() descriptor with bytes as the request, instead of HelloRequest. By adjusting
47+
* toBuilder() you can choose which of the request and response are bytes.
48+
*/
49+
private static final MethodDescriptor<byte[], HelloReply> SAY_HELLO
50+
= GreeterGrpc.getSayHelloMethod()
51+
.toBuilder(new ByteArrayMarshaller(), GreeterGrpc.getSayHelloMethod().getResponseMarshaller())
52+
.build();
53+
54+
private final Channel channel;
55+
56+
/** Construct client for accessing hello-world server using the existing channel. */
57+
public PreSerializedClient(Channel channel) {
58+
this.channel = channel;
59+
}
60+
61+
/** Say hello to server. */
62+
public void greet(String name) {
63+
logger.info("Will try to greet " + name + " ...");
64+
byte[] request = HelloRequest.newBuilder().setName(name).build().toByteArray();
65+
HelloReply response;
66+
try {
67+
// Stubs use ClientCalls to send RPCs. Since the generated stub won't have byte[] in its
68+
// method signature, this uses ClientCalls directly. It isn't as convenient, but it behaves
69+
// the same as a normal stub.
70+
response = ClientCalls.blockingUnaryCall(channel, SAY_HELLO, CallOptions.DEFAULT, request);
71+
} catch (StatusRuntimeException e) {
72+
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
73+
return;
74+
}
75+
logger.info("Greeting: " + response.getMessage());
76+
}
77+
78+
/**
79+
* Greet server. If provided, the first element of {@code args} is the name to use in the
80+
* greeting. The second argument is the target server.
81+
*/
82+
public static void main(String[] args) throws Exception {
83+
String user = "world";
84+
String target = "localhost:50051";
85+
if (args.length > 0) {
86+
if ("--help".equals(args[0])) {
87+
System.err.println("Usage: [name [target]]");
88+
System.err.println("");
89+
System.err.println(" name The name you wish to be greeted by. Defaults to " + user);
90+
System.err.println(" target The server to connect to. Defaults to " + target);
91+
System.exit(1);
92+
}
93+
user = args[0];
94+
}
95+
if (args.length > 1) {
96+
target = args[1];
97+
}
98+
99+
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
100+
.build();
101+
try {
102+
PreSerializedClient client = new PreSerializedClient(channel);
103+
client.greet(user);
104+
} finally {
105+
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
106+
}
107+
}
108+
}
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
* Copyright 2023 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.examples.preserialized;
18+
19+
import io.grpc.BindableService;
20+
import io.grpc.Grpc;
21+
import io.grpc.InsecureServerCredentials;
22+
import io.grpc.MethodDescriptor;
23+
import io.grpc.Server;
24+
import io.grpc.ServerCallHandler;
25+
import io.grpc.ServerMethodDefinition;
26+
import io.grpc.ServerServiceDefinition;
27+
import io.grpc.ServiceDescriptor;
28+
import io.grpc.examples.helloworld.GreeterGrpc;
29+
import io.grpc.examples.helloworld.HelloReply;
30+
import io.grpc.examples.helloworld.HelloRequest;
31+
import io.grpc.stub.ServerCalls;
32+
import io.grpc.stub.StreamObserver;
33+
import java.io.IOException;
34+
import java.util.concurrent.TimeUnit;
35+
import java.util.logging.Logger;
36+
37+
/**
38+
* Server that provides a {@code Greeter} service, but that uses a pre-serialized response. This is
39+
* a performance optimization that can be useful if you read the response from on-disk or a database
40+
* where it is already serialized, or if you need to send the same complicated message to many
41+
* clients. The same approach can avoid deserializing requests, to be stored in a database. This
42+
* adjustment is server-side only; the client is unable to detect the differences, so this server is
43+
* fully-compatible with the normal {@link HelloWorldClient}.
44+
*/
45+
public class PreSerializedServer {
46+
private static final Logger logger = Logger.getLogger(PreSerializedServer.class.getName());
47+
48+
private Server server;
49+
50+
private void start() throws IOException {
51+
int port = 50051;
52+
server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
53+
.addService(new GreeterImpl())
54+
.build()
55+
.start();
56+
logger.info("Server started, listening on " + port);
57+
Runtime.getRuntime().addShutdownHook(new Thread() {
58+
@Override
59+
public void run() {
60+
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
61+
System.err.println("*** shutting down gRPC server since JVM is shutting down");
62+
try {
63+
PreSerializedServer.this.stop();
64+
} catch (InterruptedException e) {
65+
e.printStackTrace(System.err);
66+
}
67+
System.err.println("*** server shut down");
68+
}
69+
});
70+
}
71+
72+
private void stop() throws InterruptedException {
73+
if (server != null) {
74+
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
75+
}
76+
}
77+
78+
/**
79+
* Await termination on the main thread since the grpc library uses daemon threads.
80+
*/
81+
private void blockUntilShutdown() throws InterruptedException {
82+
if (server != null) {
83+
server.awaitTermination();
84+
}
85+
}
86+
87+
/**
88+
* Main launches the server from the command line.
89+
*/
90+
public static void main(String[] args) throws IOException, InterruptedException {
91+
final PreSerializedServer server = new PreSerializedServer();
92+
server.start();
93+
server.blockUntilShutdown();
94+
}
95+
96+
static class GreeterImpl implements GreeterGrpc.AsyncService, BindableService {
97+
98+
public void byteSayHello(HelloRequest req, StreamObserver<byte[]> responseObserver) {
99+
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
100+
responseObserver.onNext(reply.toByteArray());
101+
responseObserver.onCompleted();
102+
}
103+
104+
@Override
105+
public ServerServiceDefinition bindService() {
106+
MethodDescriptor<HelloRequest, HelloReply> sayHello = GreeterGrpc.getSayHelloMethod();
107+
// Modifying the method descriptor to use bytes as the response, instead of HelloReply. By
108+
// adjusting toBuilder() you can choose which of the request and response are bytes.
109+
MethodDescriptor<HelloRequest, byte[]> byteSayHello = sayHello
110+
.toBuilder(sayHello.getRequestMarshaller(), new ByteArrayMarshaller())
111+
.build();
112+
// GreeterGrpc.bindService() will bind every service method, including sayHello(). (Although
113+
// Greeter only has one method, this approach would work for any service.) AsyncService
114+
// provides a default implementation of sayHello() that returns UNIMPLEMENTED, and that
115+
// implementation will be used by bindService(). replaceMethod() will rewrite that method to
116+
// use our byte-based method instead.
117+
//
118+
// The generated bindService() uses ServerCalls to make RPC handlers. Since the generated
119+
// bindService() won't expect byte[] in the AsyncService, this uses ServerCalls directly. It
120+
// isn't as convenient, but it behaves the same as a normal RPC handler.
121+
return replaceMethod(
122+
GreeterGrpc.bindService(this),
123+
byteSayHello,
124+
ServerCalls.asyncUnaryCall(this::byteSayHello));
125+
}
126+
127+
/** Rewrites the ServerServiceDefinition replacing one method's definition. */
128+
private static <ReqT, RespT> ServerServiceDefinition replaceMethod(
129+
ServerServiceDefinition def,
130+
MethodDescriptor<ReqT, RespT> newDesc,
131+
ServerCallHandler<ReqT, RespT> newHandler) {
132+
// There are two data structures involved. The first is the "descriptor" which describes the
133+
// service and methods as a schema. This is the same on client and server. The second is the
134+
// "definition" which includes the handlers to execute methods. This is specific to the server
135+
// and is generated by "bind." This adjusts both the descriptor and definition.
136+
137+
// Descriptor
138+
ServiceDescriptor desc = def.getServiceDescriptor();
139+
ServiceDescriptor.Builder descBuilder = ServiceDescriptor.newBuilder(desc.getName())
140+
.setSchemaDescriptor(desc.getSchemaDescriptor())
141+
.addMethod(newDesc); // Add the modified method
142+
// Copy methods other than the modified one
143+
for (MethodDescriptor<?,?> md : desc.getMethods()) {
144+
if (newDesc.getFullMethodName().equals(md.getFullMethodName())) {
145+
continue;
146+
}
147+
descBuilder.addMethod(md);
148+
}
149+
150+
// Definition
151+
ServerServiceDefinition.Builder defBuilder =
152+
ServerServiceDefinition.builder(descBuilder.build())
153+
.addMethod(newDesc, newHandler); // Add the modified method
154+
// Copy methods other than the modified one
155+
for (ServerMethodDefinition<?,?> smd : def.getMethods()) {
156+
if (newDesc.getFullMethodName().equals(smd.getMethodDescriptor().getFullMethodName())) {
157+
continue;
158+
}
159+
defBuilder.addMethod(smd);
160+
}
161+
return defBuilder.build();
162+
}
163+
}
164+
}

0 commit comments

Comments
 (0)