Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,20 @@ task cancellationServer(type: CreateStartScripts) {
classpath = startScripts.classpath
}

task multiplexingServer(type: CreateStartScripts) {
mainClass = 'io.grpc.examples.multiplex.MultiplexingServer'
applicationName = 'multiplexing-server'
outputDir = new File(project.buildDir, 'tmp/scripts/' + name)
classpath = startScripts.classpath
}

task sharingClient(type: CreateStartScripts) {
mainClass = 'io.grpc.examples.multiplex.SharingClient'
applicationName = 'sharing-client'
outputDir = new File(project.buildDir, 'tmp/scripts/' + name)
classpath = startScripts.classpath
}

applicationDistribution.into('bin') {
from(routeGuideServer)
from(routeGuideClient)
Expand All @@ -239,5 +253,7 @@ applicationDistribution.into('bin') {
from(keepAliveClient)
from(cancellationClient)
from(cancellationServer)
from(multiplexingServer)
from(sharingClient)
fileMode = 0755
}
111 changes: 111 additions & 0 deletions examples/src/main/java/io/grpc/examples/multiplex/EchoService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.examples.multiplex;

import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.examples.echo.EchoGrpc;
import io.grpc.examples.echo.EchoRequest;
import io.grpc.examples.echo.EchoResponse;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
* Service that echoes back whatever is sent to it.
*/
public class EchoService extends EchoGrpc.EchoImplBase {
private static final Logger logger = Logger.getLogger(EchoService.class.getName());

@Override
public void unaryEcho(EchoRequest request,
StreamObserver<EchoResponse> responseObserver) {
logger.info("Received echo request: " + request.getMessage());
EchoResponse response = EchoResponse.newBuilder().setMessage(request.getMessage()).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}

@Override
public void serverStreamingEcho(EchoRequest request,
StreamObserver<EchoResponse> responseObserver) {
logger.info("Received server streaming echo request: " + request.getMessage());
EchoResponse response = EchoResponse.newBuilder().setMessage(request.getMessage()).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}

@Override
public StreamObserver<EchoRequest> clientStreamingEcho(
final StreamObserver<EchoResponse> responseObserver) {
return new StreamObserver<EchoRequest>() {
List<String> requestList = new ArrayList<>();

@Override
public void onNext(EchoRequest request) {
logger.info("Received client streaming echo request: " + request.getMessage());
requestList.add(request.getMessage());
}

@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "echo stream cancelled or had a problem and is no longer usable " + t.getMessage());
responseObserver.onError(t);
}

@Override
public void onCompleted() {
logger.info("Client streaming complete");
String reply = requestList.stream().collect(Collectors.joining(", "));
EchoResponse response = EchoResponse.newBuilder().setMessage(reply).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
};
}

@Override
public StreamObserver<EchoRequest> bidirectionalStreamingEcho(
final StreamObserver<EchoResponse> responseObserver) {
return new StreamObserver<EchoRequest>() {
@Override
public void onNext(EchoRequest request) {
logger.info("Received bidirection streaming echo request: " + request.getMessage());
EchoResponse response = EchoResponse.newBuilder().setMessage(request.getMessage()).build();
responseObserver.onNext(response);
}

@Override
public void onError(Throwable t) {
logger.log(Level.WARNING,
"echo stream cancelled or had a problem and is no longer usable " + t.getMessage());
responseObserver.onError(t);
}

@Override
public void onCompleted() {
logger.info("Bidirectional stream completed from client side");
responseObserver.onCompleted();
}
};
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.examples.multiplex;

import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.examples.echo.EchoGrpc;
import io.grpc.examples.echo.EchoRequest;
import io.grpc.examples.echo.EchoResponse;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
* A sample gRPC server that serves both the Greeting and Echo services.
*/
public class MultiplexingServer {

private static final Logger logger = Logger.getLogger(MultiplexingServer.class.getName());

private final int port;
private Server server;

public MultiplexingServer(int port) throws IOException {
this.port = port;
}

private void start() throws IOException {
/* The port on which the server should run */
server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
.addService(new MultiplexingServer.GreeterImpl())
.addService(new EchoService())
.build()
.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
MultiplexingServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
}

private void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}

/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}

/**
* Main launches the server from the command line.
*/
public static void main(String[] args) throws IOException, InterruptedException {
System.setProperty("java.util.logging.SimpleFormatter.format",
"%1$tH:%1$tM:%1$tS %4$s %2$s: %5$s%6$s%n");

final MultiplexingServer server = new MultiplexingServer(50051);
server.start();
server.blockUntilShutdown();
}

static class GreeterImpl extends GreeterGrpc.GreeterImplBase {

@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
logger.info("Received sayHello request: " + req.getName());
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
}
Loading