GRPC Tutorial
GRPC Tutorial
GRPC Tutorial
OVERVIEW ................................................................................................................................ 3
AUDIENCE................................................................................................................................. 3
PREREQUISITE............................................................................................................................ 3
.PROTO FILE............................................................................................................................. 13
. SETTING UP GRPC SERVER........................................................................................................ 15
. SETTING UP GRPC CLIENT ........................................................................................................ 18
. CLIENT SERVER CALL ................................................................................................................ 20
.PROTO FILE............................................................................................................................. 22
. SETTING UP GRPC SERVER........................................................................................................ 24
. SETTING UP GRPC CLIENT ........................................................................................................ 25
. CLIENT SERVER CALL ................................................................................................................ 26
.PROTO FILE............................................................................................................................. 28
. SETTING UP GRPC SERVER........................................................................................................ 30
. SETTING UP GRPC CLIENT ........................................................................................................ 34
. CLIENT SERVER CALL ................................................................................................................ 36
.PROTO FILE............................................................................................................................. 38
. SETTING UP GRPC SERVER........................................................................................................ 40
1
. SETTING UP GRPC CLIENT ........................................................................................................ 44
. CLIENT SERVER CALL ................................................................................................................ 47
.PROTO FILE............................................................................................................................. 49
. SETTING UP GRPC SERVER........................................................................................................ 51
. SETTING UP GRPC CLIENT ........................................................................................................ 56
. CLIENT SERVER CALL ................................................................................................................ 60
.PROTO FILE............................................................................................................................. 63
. SETTING UP GRPC SERVER........................................................................................................ 65
. SETTING UP GRPC CLIENT ........................................................................................................ 70
. CLIENT SERVER CALL ................................................................................................................ 74
2
Overview
gRPC is a framework from Google. It provides an efficient and
language-independent way to make Remote Procedure Calls. It supports
remote procedure calls from languages like Java, Python, Go, Dart, etc. It is
heavily used for creating efficient remote procedure calls across industries by
various companies.
The major use-case for gRPC is the making of remote procedure calls
performant while ensuring language independence. Remote procedure calls
play an important role in microservices/distributed environment where a lot of
data is transferred across services. That is why, it becomes a very useful
framework in developing applications that require high scalability and
performance.
Audience
This tutorial deep dives into various components that make gRPC a very useful
library. It is directed towards software professionals who want to develop highly
scalable and performant applications. Post this tutorial, you would have an
intermediate knowledge of gRPC and its usage.
Prerequisite
To learn from this tutorial, you need to have a good hold over Java or Python
and a basic knowledge of data structure is preferable.
3
gRPC – Introduction
Before we jump into gRPC, let us have a brief background about remote
procedure calls which is what gRPC does.
Why gRPC?
Google Remote Procedure Calls (gRPC) provides a framework to perform the
remote procedure calls. But there are some other libraries and mechanisms to
execute code on remote machine. So, what makes gRPC special? Let's find
out.
4
Efficient Data Compaction: In microservice environment, given that
multiple communications take place over a network, it is critical that the
data that we are sending is as succinct as possible. We need to avoid
any superfluous data to ensure that the data is quickly transferred.
Given that gRPC uses Google Protocol Buffer internally, it has this
feature to its advantage.
Language
Yes Yes
independent
HTTP version HTTP/2 HTTP 1.1
5
gRPC – Setup
Protoc Setup
Note that the setup is required only for Python. For Java, all of this is handled
by Maven file. Let us install the "proto" binary which we will use to
auto-generate the code of our ".proto" files. The binaries can be found at
https://github.com/protocolbuffers/protobuf/releases/. Choose the correct
binary based on the OS. We will install the proto binary on Windows, but the
steps are not very different for Linux.
Once installed, ensure that you are able to access it via command line:
protoc --version
libprotoc 3.15.6
It means that Protobuf is correctly installed. Now, let us move to the Project
Structure.
We also need to setup the plugin required for gRPC code generation.
It will install all the required binaries and add them to the path.
Project Structure
Here is the overall project structure that we would have:
6
The code related to individual languages go into their respective directories.
We will have a separate directory to store our proto files. And, here is the
project structure that we would be having for Java:
7
Project Dependency
Now that we have installed protoc, we can auto-generate the code from the
proto files using protoc. Let us first create a Java project.
Following is the Maven configuration that we will use for our Java project. Note
that it contains the required library for Protobuf as well.
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.38.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
8
<artifactId>grpc-protobuf</artifactId>
<version>1.38.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.38.0</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<version>6.0.53</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-
simple -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
9
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.1</version>
<executions>
<execution>
<id>test</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${basedir}/target/generated-
sources</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
10
<protocArtifact>com.google.protobuf:protoc:3.12.0:exe:${os.d
etected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-
java:1.38.0:exe:${os.detected.classifier}</pluginArtifact>
<protoSourceRoot>../common_proto_files</protoSourceRoot>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<configuration>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
11
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
12
gRPC – Hello World App with Java
Let us now create a basic "Hello World" like app that will use gRPC along with
Java.
.proto file
First let us define the "greeting.proto" file in common_proto_files:
syntax = "proto3";
service Greeter {
rpc greet (ClientInput) returns (ServerOutput) {}
}
message ClientInput {
string greeting = 1;
string name = 2;
}
message ServerOutput {
string message = 1;
}
Let us now take a closer look at this code block and see the role of each line:
syntax = "proto3";
The "syntax" here represents the version of Protobuf that we are using. So, we
are using the latest version 3 and the schema thus can use all the syntax which
is valid for version 3.
13
package tutorial;
The package here is used for conflict resolution if, say, we have multiple
classes/members with the same name.
This argument is specific to Java, i.e., the package where to auto-generate the
code from the ".proto" file.
service Greeter {
rpc greet(ClientInput) returns (ServerOutput) {}
}
This block represents the name of the service "Greeter" and the function name
"greet" which can be called. The "greet" function takes in the input of type
"ClientInput" and returns the output of type "ServerOutput". Now let us look
at these types.
message ClientInput {
string greeting = 1;
string name = 2;
}
In the above block, we have defined the ClientInput which contains two
attributes, "greeting" and the "name", both of them being strings. The client
is supposed to send the object of type "ClientInput" to the server.
message ServerOutput {
string message = 1;
}
In the above block, we have defined that, given a "ClientInput", the server
would return the "ServerOutput" which will contain a single attribute "message".
The server is supposed to send the object of type "ServerOutput" to the client.
Note that we already had the Maven setup done to auto-generating our class
files as well as our RPC code. So, now we can simply compile our project:
14
This should auto-generate the source code required for us to use gRPC. The
source code would be placed under:
Let us write our server code to serve the above function and save it in
com.tp.grpc.GreetServer.java:
package com.tp.grpc;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ClientInput;
import com.tp.greeting.Greeting.ServerOutput;
15
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new GreeterImpl()).build().start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30,
TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
@Override
public void greet(ClientInput req,
StreamObserver<ServerOutput> responseObserver) {
logger.info("Got request from client: " + req);
ServerOutput reply =
ServerOutput.newBuilder().setMessage("Server says " +
"\"" + req.getGreeting() + " " + req.getName()
+ "\"").build();
responseObserver.onNext(reply);
16
responseObserver.onCompleted();
}
}
The above code starts a gRPC server at a specified port and serves the
functions and services which we had written in our proto file. Let us walk
through the above code:
2. But before starting the server, we assign the server the service which
we want to run, i.e., in our case, the Greeter service.
3. For this purpose, we need to pass the service instance to the server,
so we go ahead and create a service instance, i.e., in our case, the
GreeterImpl.
6. The method works on the above input, does the computation, and then
is supposed to return the mentioned output in the ".proto" file, i.e., in
our case, the ServerOutput.
17
. Setting up gRPC client
Now that we have written the code for the server, let us setup a client which
can call these functions.
Let us write our client code to call the above function and save it in
com.tp.grpc.GreetClient.java:
package com.tp.grpc;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
18
ClientInput request =
ClientInput.newBuilder().setName(username).setGreeting(greeti
ng).build();
ServerOutput response;
try {
response = blockingStub.greet(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
logger.info("Got following from the server: " +
response.getMessage());
}
ManagedChannel channel =
ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
GreetClient client = new GreetClient(channel);
client.makeGreeting(greeting, username);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
19
}
}
}
The above code starts a gRPC server at a specified port and serves the
functions and services which we had written in our proto file. Let us walk
through the above code:
1. Starting from main method, we accept two arguments, i.e., name and
the greeting.
4. Then, we simply create the expected input defined in the ".proto" file,
i.e., in our case, the ClientInput.
5. We ultimately make the call and await result from the server.
For running the code, fire up two shells. Start the server on the first shell by
executing the following command:
20
Now, let us start the client.
And now, if we open the server logs, we will get to see the following:
So, the client was able to call the server as expected and the server responded
with greeting the client back.
21
gRPC – Hello World App with Python
Let us now create a basic "Hello World" like app that will use gRPC along with
Python.
.proto file
First let us define the greeting.proto file in common_proto_files:
syntax = "proto3";
service Greeter {
rpc greet (ClientInput) returns (ServerOutput) {}
}
message ClientInput {
string greeting = 1;
string name = 2;
}
message ServerOutput {
string message = 1;
}
Let us now take a closer look at each of the lines in the above block:
syntax = "proto3";
The "syntax" here represents the version of Protobuf we are using. So, we are
using the latest version 3 and the schema thus can use all the syntax which is
valid for version 3.
package tutorial;
22
The package here is used for conflict resolution if, say, we have multiple
classes/members with the same name.
service Greeter {
rpc greet(ClientInput) returns (ServerOutput) {}
}
This block represents the name of the service "Greeter" and the function name
"greet" which can be called. The "greet" function takes in the input of type
"ClientInput" and returns the output of type "ServerOutput". Now let us look
at these types.
message ClientInput {
string greeting = 1;
string name = 2;
}
In the above block, we have defined the ClientInput which contains two
attributes, "greeting" and the "name" both of them being strings. The client is
supposed to send the object of type of "ClientInput" to the server.
message ServerOutput {
string message = 1;
}
Here, we have also defined that, given a "ClientInput", the server would
return the "ServerOutput" with a single attribute "message". The server is
supposed to send the object of type "ServerOutput" to the client.
Now, let us generate the underlying code for the Protobuf classes and the
gRPC classes. For doing that, we need to execute the following command:
However, note that to execute the command, we need to install the correct
dependency as mentioned in the setup section of the tutorial.
23
This should auto-generate the source code required for us to use gRPC. The
source code would be placed under:
Let us write our server code to serve the above function and save it in
server.py:
import grpc
import greeting_pb2
import greeting_pb2_grpc
class Greeter(greeting_pb2_grpc.GreeterServicer):
def server():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
greeting_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port('[::]:50051')
print("gRPC starting")
server.start()
24
server.wait_for_termination()
server()
The above code starts a gRPC server at a specified port and serves the
functions and services which we had written in our proto file. Let us walk
through the above code:
2. But before starting the server, we assign the server the service which
we want to run, i.e., in our case, the Greeter service.
3. For this purpose, we need to pass the service instance to the server,
so we go ahead and create a service instance, i.e., in our case, the
Greeter.
5. The method expects the an object of type as defined in the .proto file,
i.e., for us, the request.
6. The method works on the above input, does the computation, and then
is supposed to return the mentioned output in the .proto file, i.e., in our
case, the ServerOutput.
Let us write our client code to call the above function and save it in client.py:
import grpc
import greeting_pb2
import greeting_pb2_grpc
25
def run():
with grpc.insecure_channel('localhost:50051') as channel:
stub = greeting_pb2_grpc.GreeterStub(channel)
response = stub.greet(greeting_pb2.ClientInput(name='John',
greeting = "Yo"))
print("Greeter client received following from server: " +
response.message)
run()
The above code starts a gRPC server at a specified port and serves the
functions and services which we had written in our proto file. Let us walk
through the above code:
1. Starting from the main method, we have setup a Channel for gRPC
communication with our server.
2. And then, we create a stub using the channel. This is where we use
the service "Greeter" whose functions we plan to call. A stub is nothing
but a wrapper which hides the complexity of the remote call from the
caller.
3. Then, we simply create the expected input defined in the proto file, i.e.,
in our case, the ClientInput. We have hard-coded two arguments,
i.e., name and the greeting.
4. We ultimately make the call and await the result from the server.
For running the code, fire up two shells. Start the server on the first shell by
executing the following command:
python .\server.py
26
We would get the following output:
gRPC starting
python .\client.py
And now, if we open the server logs, we will get to see the following data:
gRPC starting
Got request greeting: "Yo"
name: "John"
So, as we see, the client was able to call the server as expected and the server
responded with greeting the client back.
27
gRPC – Unary gRPC
We will now look at various types of communication that the gRPC framework
supports. We will use an example of Bookstore where the client can search
and place an order for book delivery.
Let us see unary gRPC communication where we let the client search for a
title and return randomly one of the book matching the title queried for.
.proto file
First let us define the bookstore.proto file in common_proto_files:
syntax = "proto3";
service BookStore {
rpc first (BookSearch) returns (Book) {}
}
message BookSearch {
string name = 1;
string author = 2;
string genre = 3;
}
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
28
Let us now take a closer look at each of the lines in the above block.
syntax = "proto3";
The "syntax" here represents the version of Protobuf we are using. We are
using the latest version 3 and the schema thus can use all the syntax which is
valid for version 3.
package tutorial;
The package here is used for conflict resolution if, say, we have multiple
classes/members with the same name.
This argument is specific to Java, i.e., the package where to auto-generate the
code from the .proto file.
service BookStore {
rpc first (BookSearch) returns (Book) {}
}
This represents the name of the service "BookStore" and the function name
"first" which can be called. The "first" function takes in the input of type
"BookSearch" and returns the output of type "Book". So, effectively, we let the
client search for a title and return one of the book matching the title queried for.
message BookSearch {
string name = 1;
string author = 2;
string genre = 3;
}
In the above block, we have defined the BookSearch which contains the
attributes like name, author, and genre. The client is supposed to send the
object of type of "BookSearch" to the server.
message Book {
string name = 1;
29
string author = 2;
int32 price = 3;
}
Here, we have also defined that, given a "BookSearch", the server would return
the "Book" which contains book attributes along with the price of the book.
The server is supposed to send the object of type of "Book" to the client.
Note that we already had the Maven setup done for auto-generating our class
files as well as our RPC code. So, now we can simply compile our project:
This should auto-generate the source code required for us to use gRPC. The
source code would be placed under:
Let us write our server code to serve the above function and save it in
com.tp.bookstore.BookeStoreServerUnary.java:
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
30
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
31
private Server server;
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30,
TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
32
@Override
public void first(BookSearch searchQuery,
StreamObserver<Book> responseObserver) {
logger.info("Searching for book with title: " +
searchQuery.getName());
List<String> matchingBookTitles =
bookMap.keySet().stream()
.filter(title ->
title.startsWith(searchQuery.getName().trim()))
.collect(Collectors.toList());
Book foundBook = null;
if(matchingBookTitles.size() > 0) {
foundBook =
bookMap.get(matchingBookTitles.get(0));
}
responseObserver.onNext(foundBook);
responseObserver.onCompleted();
}
}
}
The above code starts a gRPC server at a specified port and serves the
functions and services which we had written in our proto file. Let us walk
through the above code:
2. But before starting the server, we assign the server the service which
we want to run, i.e., in our case, the BookStore service.
3. For this purpose, we need to pass the service instance to the server,
so we go ahead and create a service instance, i.e., in our case, the
BookStoreImpl
33
4. The service instance need to provide an implementation of the
method/function which is present in the .proto file, i.e., in our case, the
first method.
5. The method expects an object of type as defined in the .proto file, i.e.,
for us the BookSearch
6. The method searches for the book in the available bookMap and then
returns the Book by calling the onNext() method. Once done, the server
announces that it is done with the output by calling onCompleted()
Let us write our client code to call the above function and save it in
com.tp.bookstore.BookStoreClientUnaryBlocking.java:
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
34
private static final Logger logger =
Logger.getLogger(BookStoreClientUnaryBlocking.class.getName());
Book response;
try {
response = blockingStub.first(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}",
e.getStatus());
return;
}
logger.info("Got following book from server: " +
response);
}
ManagedChannel channel =
ManagedChannelBuilder.forTarget(serverAddress)
35
.usePlaintext()
.build();
try {
BookStoreClientUnaryBlocking client = new
BookStoreClientUnaryBlocking(channel);
client.getBook(bookName);
} finally {
channel.shutdownNow().awaitTermination(5,
TimeUnit.SECONDS);
}
}
}
The above code starts a gRPC server at a specified port and serves the
functions and services which we had written in our proto file. Let us walk
through the above code:
1. Starting from the main method, we accept one argument, i.e., the title
of the book we want to search for.
3. And then, we create a blocking stub using the channel. This is where
we choose the service "BookStore" whose functions we plan to call. A
"stub" is nothing but a wrapper which hides the complexity of the remote
call from the caller.
4. Then, we simply create the expected input defined in the .proto file,
i.e., in our case BookSearch and we add the title name we want the
server to search for.
5. We ultimately make the call and await the result from the server.
36
Start the gRPC server.
The Client queries the Server for a book with a given name/title.
The Server searches the book in its store.
The Server then responds with the book and its other attributes.
Now, that we have defined our proto file, written our server and the client code,
let us proceed to execute this code and see things in action.
For running the code, fire up two shells. Start the server on the first shell by
executing the following command:
So, as we see, the client was able to get the book details by querying the server
with the name of the book.
37
gRPC – Server Streaming RPC
Let us now discuss how server streaming works while using gRPC
communication. In this case, the client will search for books with a given author.
Assume the server requires some time to go through all the books. Instead of
waiting to give all the books after going through all the books, the server instead
would provide books in a streaming fashion, i.e., as soon as it finds one.
.proto file
First let us define the bookstore.proto file in common_proto_files:
syntax = "proto3";
service BookStore {
rpc searchByAuthor (BookSearch) returns (stream Book) {}
}
message BookSearch {
string name = 1;
string author = 2;
string genre = 3;
}
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
The following block represents the name of the service "BookStore" and the
function name "searchByAuthor" which can be called. The "searchByAuthor"
38
function takes in the input of type "BookSearch" and returns the stream of type
"Book". So, effectively, we let the client search for a title and return one of the
book matching the author queried for.
service BookStore {
rpc searchByAuthor (BookSearch) returns (stream Book) {}
}
message BookSearch {
string name = 1;
string author = 2;
string genre = 3;
}
Here, we have defined BookSearch which contains a few attributes like name,
author and genre. The client is supposed to send the object of type
"BookSearch" to the server.
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
We have also defined that, given a "BookSearch", the server would return a
stream of "Book" which contains the book attributes along with the price of the
book. The server is supposed to send a stream of "Book".
Note that we already had the Maven setup done for auto-generating our class
files as well as our RPC code. So, now we can simply compile our project:
This should auto-generate the source code required for us to use gRPC. The
source code would be placed under:
39
Protobuf gRPC code: target/generated-sources/protobuf/grpc-
java/com.tp.bookstore
Let us write our server code to serve the above function and save it in
com.tp.bookstore.BookeStoreServerStreaming.java:
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
40
bookMap.put("Great Gatsby",
Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
bookMap.put("To Kill MockingBird",
Book.newBuilder().setName("To Kill MockingBird")
.setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India",
Book.newBuilder().setName("Passage to India")
.setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise",
Book.newBuilder().setName("The Side of Paradise")
.setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman",
Book.newBuilder().setName("Go Set a Watchman")
.setAuthor("Harper Lee")
.setPrice(700).build());
}
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
41
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30,
TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
@Override
public void searchByAuthor(BookSearch searchQuery,
StreamObserver<Book> responseObserver) {
logger.info("Searching for book with author: " +
searchQuery.getAuthor());
for (Entry<String, Book> bookEntry :
bookMap.entrySet()) {
try {
logger.info("Going through more
books....");
Thread.sleep(5000);
42
} catch (InterruptedException e) {
e.printStackTrace();
}
if(bookEntry.getValue().getAuthor().startsWith(searchQuery.ge
tAuthor())){
logger.info("Found book with required
author: " + bookEntry.getValue().getName()
+ ". Sending....");
responseObserver.onNext(bookEntry.getValue());
}
}
responseObserver.onCompleted();
}
}
}
The above code starts a gRPC server at a specified port and serves the
functions and services which we had written in our proto file. Let us walk
through the above code:
2. But before starting the server, we assign the server the service which
we want to run, i.e., in our case, the BookStore service.
3. For this purpose, we need to pass the service instance to the server,
so we go ahead and create a service instance, i.e., in our case, the
BookStoreImpl
5. The method expects an object of type as defined in the .proto file, i.e.,
the BookSearch.
43
6. The method searches for the book in the available bookMap and then
returns a stream of Books.
8. When the server is done with the request, it shuts down the channel by
calling onCompleted().
Let us write our client code to call the above function and save it in
com.tp.bookstore.BookStoreClientServerStreamingBlocking.java:
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
44
public class BookStoreClientServerStreamingBlocking {
private static final Logger logger =
Logger.getLogger(BookStoreClientServerStreamingBlocking.class
.getName());
public BookStoreClientServerStreamingBlocking(Channel
channel) {
blockingStub = BookStoreGrpc.newBlockingStub(channel);
}
Iterator<Book> response;
try {
response = blockingStub.searchByAuthor(request);
while(response.hasNext()) {
logger.info("Found book: " + response.next());
}
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}",
e.getStatus());
return;
}
}
45
public static void main(String[] args) throws Exception {
String authorName = args[0];
String serverAddress = "localhost:50051";
ManagedChannel channel =
ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientServerStreamingBlocking client = new
BookStoreClientServerStreamingBlocking(channel);
client.getBook(authorName);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
The above code starts a gRPC server at a specified port and serves the
functions and services which we had written in our proto file. Let us walk
through the above code:
1. Starting from the main method, we accept one argument, i.e., the title
of the book we want to search for.
3. And then, we create a blocking stub using the channel. This is where
we choose the service "BookStore" whose functions we plan to call.
4. Then, we simply create the expected input defined in the .proto file,
i.e., in our case, BookSearch, and we add the title that we want the
server to search for.
5. We ultimately make the call and get an iterator on valid Books. When
we iterate, we get the corresponding Books made available by the
Server.
46
6. Finally, we close the channel to avoid any resource leak.
The Client queries the Server for a book with a given author.
The Server responds whenever it finds a book with the given criteria.
The Server does not wait for all the valid books to be available. It sends
the output as soon as it finds one. And then repeats the process.
Now, that we have defined our proto file, written our server and the client code,
let us proceed and execute this code and see things in action.
For running the code, fire up two shells. Start the server on the first shell by
executing the following command:
47
Jul 03, 2021 10:40:31 PM
com.tp.bookstore.BookStoreClientServerStreamingBlocking
getBook
INFO: Querying for book with author: Har
So, as we see, the client was able to get the book details by querying the server
with the name of the book. But more importantly, the client got the 1st book
and the 2nd book at different timestamps, i.e., a gap of almost 5 seconds.
48
gRPC – Client Streaming RPC
Let us see now see how client streaming works while using gRPC
communication. In this case, the client will search and add books to the cart.
Once the client is done adding all the books, the server would provide the
checkout cart value to the client.
.proto file
First let us define the bookstore.proto file in common_proto_files:
syntax = "proto3";
service BookStore {
rpc totalCartValue (stream Book) returns (Cart) {}
}
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
message Cart {
int32 books = 1;
int32 price = 2;
}
Here, the following block represents the name of the service "BookStore" and
the function name "totalCartValue" which can be called. The
"totalCartValue" function takes in the input of type "Book" which is a stream.
And the function returns an object of type "Cart". So, effectively, we let the
49
client add books in a streaming fashion and once the client is done, the server
provides the total cart value to the client.
service BookStore {
rpc totalCartValue (stream Book) returns (Cart) {}
}
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
The client would send in the "Book" it wants to buy. It may not be the complete
book info; it can simply be the title of the book.
message Cart {
int32 books = 1;
int32 price = 2;
}
The server, on getting the list of books, would return the "Cart" object which is
nothing but the total number of books the client has purchased and the total
price.
Note that we already had the Maven setup done for auto-generating our class
files as well as our RPC code. So, now we can simply compile our project:
This should auto-generate the source code required for us to use gRPC. The
source code would be placed under:
50
. Setting up gRPC server
Now that we have defined the proto file which contains the function definition,
let us setup a server which can serve call these functions.
Let us write our server code to serve the above function and save it in
com.tp.bookstore.BookeStoreServerClientStreaming.java:
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
51
bookMap.put("Great Gatsby",
Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
bookMap.put("To Kill MockingBird",
Book.newBuilder().setName("To Kill MockingBird")
.setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India",
Book.newBuilder().setName("Passage to India")
.setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise",
Book.newBuilder().setName("The Side of Paradise")
.setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman",
Book.newBuilder().setName("Go Set a Watchman")
.setAuthor("Harper Lee")
.setPrice(700).build());
}
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
52
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30,
TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
@Override
public StreamObserver<Book>
totalCartValue(StreamObserver<Cart> responseObserver) {
@Override
public void onNext(Book book) {
53
logger.info("Searching for book with title
starting with: " + book.getName());
if(bookEntry.getValue().getName().startsWith(book.getName()))
{
logger.info("Found book, adding to
cart:....");
bookCart.add(bookEntry.getValue());
}
}
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading book stream:
" + t);
}
@Override
public void onCompleted() {
int cartValue = 0;
responseObserver.onNext(Cart.newBuilder()
.setPrice(cartValue)
.setBooks(bookCart.size()).build());
54
responseObserver.onCompleted();
}
};
}
}
}
The above code starts a gRPC server at a specified port and serves the
functions and services which we had written in our proto file. Let us walk
through the above code:
2. But before starting the server, we assign the server the service which
we want to run, i.e., in our case, the BookStore service.
3. For this purpose, we need to pass in the service instance to the server,
so we go ahead and create a service instance, i.e., in our case, the
BookStoreImpl
5. Now, given that this is the case of client streaming, the server will get a
list of Book (defined in the proto file) as the client adds them. The
server thus returns a custom stream observer. This stream observer
implements what happens when a new Book is found and what
happens when the stream is closed.
7. When the client is done with the addition of Books, the stream
observer's onCompleted() method is called. This method implements
what the server wants to send when the client is done adding Book, i.e.,
it returns the Cart object to the client.
55
8. Finally, we also have a shutdown hook to ensure clean shutting down
of the server when we are done executing our code.
Let us write our client code to call the above function and save it in
com.tp.bookstore.BookStoreClientStreamingClient.java:
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreGrpc.BookStoreFutureStub;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
56
private static final Logger logger =
Logger.getLogger(BookStoreClientStreamingClient.class.getName
());
StreamObserver<Book> streamClientSender;
@Override
public void onNext(Cart cart) {
logger.info("Order summary:" +
"\nTotal number of Books:" + cart.getBooks()+
"\nTotal Order Value:" + cart.getPrice());
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading response from
Server: " + t);
}
@Override
public void onCompleted() {
//logger.info("Server: Done reading orderreading cart");
57
serverResponseCompleted = true;
}
};
return observer;
}
if(streamClientSender == null) {
streamClientSender =
stub.totalCartValue(getServerResponseObserver());
}
try {
streamClientSender.onNext(request);
}
catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
}
}
58
String serverAddress = "localhost:50051";
ManagedChannel channel =
ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientStreamingClient client = new
BookStoreClientStreamingClient(channel);
String bookName = "";
while(true) {
System.out.println("Type book name to be added to
the cart....");
bookName = System.console().readLine();
if(bookName.equals("EXIT")) {
client.completeOrder();
break;
}
client.addBook(bookName);
}
while(client.serverResponseCompleted == false) {
Thread.sleep(2000);
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
59
The above code starts a gRPC client and connects to a server at a specified
port and call the service and functions which we had written in our proto file.
Let us walk through the above code:
1. Starting from the main method, we accept the name of the books to be
added to the cart. Once all the books are to added, the user is expected
to print "EXIT".
4. Then, we simply create the expected input defined in the proto file, i.e.,
in our case, Book, and we add the title name we want the server to add.
5. But given this is the case of client streaming, we first create a stream
observer for the server. This server stream observer lists the behavior
on what needs to be done when the server responds, i.e., onNext()
and onCompleted()
6. And using the stub, we also get the client stream observer. We use this
stream observer for sending the data, i.e., Book, to be added to the cart.
We ultimately, make the call and get an iterator on valid Books. When
we iterate, we get the corresponding Books made available by the
Server.
7. And once our order is complete, we ensure that the client stream
observer is closed. It tells the server to calculate the Cart Value and
provide that as an output
The Server searches the book in its store and adds them to the cart.
60
When the client is done ordering, the Server responds the total cart
value of the client.
Now that we have defined our proto file, written our server and the client code,
let us proceed to execute this code and see things in action.
For running the code, fire up two shells. Start the server on the first shell by
executing the following command:
61
Once we have added the books and we input "EXIT", the server then calculates
the cart value and here is the output we get:
EXIT
Jul 24, 2021 5:53:33 PM
com.tp.bookstore.BookStoreClientStreamingClient completeOrder
INFO: Done, waiting for server to create order summary...
Jul 24, 2021 5:53:33 PM
com.tp.bookstore.BookStoreClientStreamingClient$1 onNext
INFO: Order summary:
Total number of Books: 2
Total Order Value: 800
So, as we can see, the client was able to add books. And once all the books
were added, the server responds with the total number of books and the total
price.
62
gRPC – Bidirectional RPC
Let us see now see how the client-server streaming works while using gRPC
communication. In this case, the client will search and add books to the cart.
The server would respond with live cart value every time a book is added.
.proto file
First let us define the bookstore.proto file in common_proto_files:
syntax = "proto3";
service BookStore {
rpc liveCartValue (stream Book) returns (stream Cart) {}
}
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
message Cart {
int32 books = 1;
int32 price = 2;
}
The following block represents the name of the service "BookStore" and the
function name "liveCartValue" which can be called. The "liveCartValue"
function takes in the input of type "Book" which is a stream. And the function
returns a stream of object of type "Cart". So, effectively, we let the client add
63
books in a streaming fashion and whenever a new book is added, the server
responds the current cart value to the client.
service BookStore {
rpc liveCartValue (stream Book) returns (stream Cart) {}
}
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
The client would send in the "Book" it wants to buy. It does not have to be the
complete book info; it can simply be title of the book.
message Cart {
int32 books = 1;
int32 price = 2;
}
The server, on getting the list of books, would return the "Cart" object which is
nothing but the total number of books the client has purchased and the total
price.
Note that we already had the Maven setup done for auto-generating our class
files as well as our RPC code. So, now we can simply compile our project:
This should auto-generate the source code required for us to use gRPC. The
source code would be placed under:
64
. Setting up gRPC server
Now that we have defined the proto file which contains the function definition,
let us setup a server which can call these functions.
Let us write our server code to serve the above function and save it in
com.tp.bookstore.BookeStoreServerBothStreaming.java:
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
65
bookMap.put("Great Gatsby",
Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
bookMap.put("To Kill MockingBird",
Book.newBuilder().setName("To Kill MockingBird")
.setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India",
Book.newBuilder().setName("Passage to India")
.setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise",
Book.newBuilder().setName("The Side of Paradise")
.setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman",
Book.newBuilder().setName("Go Set a Watchman")
.setAuthor("Harper Lee")
.setPrice(700).build());
}
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
66
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
@Override
public StreamObserver<Book>
liveCartValue(StreamObserver<Cart> responseObserver) {
@Override
public void onNext(Book book) {
67
logger.info("Searching for book with title
starting with: " + book.getName());
if(bookEntry.getValue().getName().startsWith(book.getName()))
{
logger.info("Found book, adding to
cart:....");
bookCart.add(bookEntry.getValue());
cartValue +=
bookEntry.getValue().getPrice();
}
}
responseObserver.onNext(Cart.newBuilder()
.setPrice(cartValue)
.setBooks(bookCart.size()).build());
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading book stream: " + t);
}
@Override
public void onCompleted() {
logger.info("Order completed");
responseObserver.onCompleted();
68
}
};
}
}
}
The above code starts a gRPC server at a specified port and serves the
functions and services which we had written in our proto file. Let us walk
through the above code:
2. But before starting the server, we assign the server the service which
we want to run, i.e., in our case, the BookStore service.
3. For this purpose, we need to pass in the service instance to the server,
so we go ahead and create service instance i.e. in our case the
BookStoreImpl
5. Now, given that this is the case of server and client streaming, the
server will get the list of Books (defined in the proto file) as the client
adds them. The server thus returns a custom stream observer. This
stream observer implements what happens when a new Book is found
and what happens when the stream is closed.
7. When the client is done with the addition of Books, the stream
observer's onCompleted() method is called. This method implements
what the server wants to do when the client is done adding the Books,
i.e., claim it is done with taking the client order.
69
. Setting up gRPC client
Now that we have written the code for the server, let us setup a client which
can call these functions.
Let us write our client code to call the above function and save it in
com.tp.bookstore.BookStoreClientBothStreaming.java:
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreGrpc.BookStoreFutureStub;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
70
private final BookStoreStub stub;
StreamObserver<Book> streamClientSender;
serverIntermediateResponseCompleted = true;
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading response from
Server: " + t);
}
@Override
public void onCompleted() {
//logger.info("Server: Done reading orderreading
cart");
71
serverResponseCompleted = true;
}
};
return observer;
}
if(streamClientSender == null) {
streamClientSender =
stub.liveCartValue(getServerResponseObserver());
}
try {
streamClientSender.onNext(request);
}
catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
}
}
72
public static void main(String[] args) throws Exception {
String serverAddress = "localhost:50051";
ManagedChannel channel =
ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientBothStreaming client = new
BookStoreClientBothStreaming(channel);
String bookName = "";
while(true) {
if(client.serverIntermediateResponseCompleted ==
true) {
System.out.println("Type book name to be
added to the cart....");
bookName = System.console().readLine();
if(bookName.equals("EXIT")) {
client.completeOrder();
break;
}
client.serverIntermediateResponseCompleted = false;
client.addBook(bookName);
Thread.sleep(500);
}
}
while(client.serverResponseCompleted == false) {
Thread.sleep(2000);
}
73
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
The above code starts a gRPC client and connects to a server at a specified
port and call the functions and services which we had written in our proto file.
Let us walk through the above code:
1. Starting from the main method, we accept the name of the books to be
added to the cart. Once all the books are to be added, the user is
expected to print "EXIT".
4. Then, we simply create the expected input defined in the proto file, i.e.,
in our case, Book, and we add the title we want the server to add.
5. But given that this is the case of both server and client streaming, we
first create a stream observer for the server. This server stream
observer lists the behavior on what needs to be done when the server
responds, i.e., onNext() and onCompleted().
6. And using the stub, we also get the client stream observer. We use this
stream observer for sending the data, i.e., the Book to be added to the cart.
7. And once our order is complete, we ensure that the client stream
observer is closed. This tells the server to close the stream and perform
the cleanup.
74
Start the gRPC server.
The Server searches the book in its store and adds them to the cart.
With each book addition, the server tells the client about the cart value.
When the client is done ordering, both the server and the client close
the stream.
Now that we have defined our proto file, written our server and the client code,
let us now execute this code and see things in action.
For running the code, fire up two shells. Start the server on the first shell by
executing the following command:
75
Jul 24, 2021 7:21:48 PM
com.tp.bookstore.BookStoreClientBothStreaming$1 onNext
INFO: Order summary:
Total number of Books: 1
Total Order Value: 300
So, as we can see, we get the current cart value of the order. Let us now add
one more book to our client.
Once we have added the books and we input "EXIT", the client shuts down.
So, as we can see the client was able to add books. And as the books are
being added, the server responds with the current cart value.
76
gRPC – Client Calls
gRPC client supports two types of client calls, i.e., how the client calls the
server. Following are the two ways:
Note that a blocking client call is possible for unary calls and server streaming
calls.
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
77
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
Book response;
try {
response = blockingStub.first(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
logger.info("Got following book from server: " + response);
}
78
String serverAddress = "localhost:50051";
ManagedChannel channel =
ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientUnaryBlocking client = new
BookStoreClientUnaryBlocking(channel);
client.getBook(bookName);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
BookSearch request =
BookSearch.newBuilder().setName(bookName).build();
Book response;
response = blockingStub.first(request);
This is where we use the blockingStub to call the RPC method first() to get
the book details.
79
Similarly, for server streaming, we can use the blocking stub:
Iterator<Book> response;
try {
response = blockingStub.searchByAuthor(request);
while(response.hasNext()) {
logger.info("Found book: " + response.next());
}
Where we call the RPC method searchByAuthor method and iterate over the
response till the server stream has not ended.
Note that a non-blocking client call is possible for unary calls as well as
streaming calls. However, we would specifically look at the case of server
streaming call to compare it against a blocking call.
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
80
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public BookStoreClientServerStreamingNonBlocking(Channel
channel) {
nonBlockingStub = BookStoreGrpc.newStub(channel);
}
@Override
public void onNext(Book book) {
logger.info("Server returned following book: " +
book);
}
81
@Override
public void onError(Throwable t) {
logger.info("Error while reading response from
Server: " + t);
}
@Override
public void onCompleted() {
}
};
return observer;
}
try {
nonBlockingStub.searchByAuthor(request,
getServerResponseObserver());
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}",
e.getStatus());
return;
}
}
82
ManagedChannel channel =
ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientServerStreamingNonBlocking client = new
BookStoreClientServerStreamingNonBlocking(channel);
client.getBook(authorName);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
It defines that the stub is a non-blocking one. Similarly, the following code is
used to handle the response that we get from the server. Once the server
sends the response, we log the output.
83
The following gRPC call is a non-blocking call.
try {
nonBlockingStub.searchByAuthor(request,
getServerResponseObserver());
}
This is how we ensure that our client does not need to wait till we have the
server complete the execution of searchByAuthor. That would be handled
directly by the stream observer object as and when the server returns the Book
objects.
84
gRPC – Timeouts and Cancellation
Request Timeout
gRPC supports specifying a timeout for both client as well as the server.
The Client can specify during runtime the amount of time it wants to
wait before cancelling the request.
The Server can also check on its end whether the requests need to be
catered to or has the Client already given up on the request.
Let us take an example where the client expects a response in 2 seconds, but
the server takes a longer time. So, here is our server code:
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
85
public class BookeStoreServerUnaryTimeout {
private static final Logger logger =
Logger.getLogger(BookeStoreServerUnaryTimeout.class.getName()
);
86
.addService(new BookStoreImpl()).build().start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30,
TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
@Override
public void first(BookSearch searchQuery,
StreamObserver<Book> responseObserver) {
87
logger.info("Searching for book with title: " +
searchQuery.getName());
logger.info("This may take more time...");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<String> matchingBookTitles =
bookMap.keySet().stream()
.filter(title ->
title.startsWith(searchQuery.getName().trim()))
.collect(Collectors.toList());
Book foundBook = null;
if(matchingBookTitles.size() > 0) {
foundBook = bookMap.get(matchingBookTitles.get(0));
}
responseObserver.onNext(foundBook);
responseObserver.onCompleted();
}
}
}
In the above code, the server searches for the book the title of which the client
has provided. We added a dummy sleep so that we can see the request getting
timed out.
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
88
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
Book response;
try {
response = blockingStub.withDeadlineAfter(2,
TimeUnit.SECONDS).first(request);
89
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}",
e.getStatus());
return;
}
logger.info("Got following book from server: " +
response);
}
ManagedChannel channel =
ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientUnaryBlockingTimeout client = new
BookStoreClientUnaryBlockingTimeout(channel);
client.getBook(bookName);
} finally {
channel.shutdownNow().awaitTermination(5,
TimeUnit.SECONDS);
}
}
}
The above code calls the server with a title to search for. But more
importantly, it provides a timeout of 2 seconds to the gRPC call.
Let us now see this in action. For running the code, fire up two shells. Start the
server on the first shell by executing the following command:
90
java -cp .\target\grpc-point-1.0.jar
com.tp.bookstore.BookeStoreServerUnaryTimeout
So, as we can see, the client did not get the response in 2 seconds, hence it
cancelled the request and termed it as a timeout, i.e., DEADLINE_EXCEEDED.
91
Request Cancellation
gRPC supports canceling of requests both from the client as well as the server
side. The Client can specify during runtime the amount of time it wants to wait
before cancelling the request. The Server can also check on its end whether
the requests need to be catered to or has the client already given up on the
request.
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
92
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby",
Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
bookMap.put("To Kill MockingBird",
Book.newBuilder().setName("To Kill MockingBird")
.setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India",
Book.newBuilder().setName("Passage to India")
.setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise",
Book.newBuilder().setName("The Side of Paradise")
.setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman",
Book.newBuilder().setName("Go Set a Watchman")
.setAuthor("Harper Lee")
.setPrice(700).build());
}
93
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30,
TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
@Override
public StreamObserver<Book>
totalCartValue(StreamObserver<Cart> responseObserver) {
94
@Override
public void onNext(Book book) {
logger.info("Searching for book with title
starting with: " + book.getName());
if(bookEntry.getValue().getName().startsWith(book.getName()))
{
logger.info("Found book, adding to
cart:....");
bookCart.add(bookEntry.getValue());
}
}
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading book stream:
" + t);
}
@Override
public void onCompleted() {
int cartValue = 0;
responseObserver.onNext(Cart.newBuilder()
95
.setPrice(cartValue)
.setBooks(bookCart.size()).build());
responseObserver.onCompleted();
}
};
}
}
}
This server code is a simple example of client side streaming. The server
simply tracks the books which the client wants and at the end, it provides the
total Cart Value of the order.
But there is nothing special here with respect to cancelation of request as that
is something the client would invoke. So, let us look at the client code.
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
96
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;
@Override
public void onNext(Cart cart) {
logger.info("Order summary:" +
"\nTotal number of Books: " + cart.getBooks() +
"\nTotal Order Value: " + cart.getPrice());
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading response from
Server: " + t);
}
@Override
97
public void onCompleted() {
}
};
return observer;
}
if(streamClientSender == null) {
withCancellation = Context.current().withCancellation();
streamClientSender =
stub.totalCartValue(getServerResponseObserver());
}
try {
streamClientSender.onNext(request);
}
catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
}
}
98
public void cancelOrder() {
withCancellation.cancel(null);
}
try {
BookStoreClientStreamingClientCancelation client =
new BookStoreClientStreamingClientCancelation(channel);
String bookName = "";
while(true) {
System.out.println("Type book name to be added to
the cart....");
bookName = System.console().readLine();
if(bookName.equals("EXIT")) {
client.completeOrder();
break;
}
if(bookName.equals("CANCEL")) {
client.cancelOrder();
break;
}
client.addBook(bookName);
}
99
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
So, if we see the above code, the following line defines a context which has
cancellation enabled.
withCancellation = Context.current().withCancellation();
And here is the method which would be called when the user types CANCEL.
This would cancel the order and also let the server know about it.
Let us now see this in action. For running the code, fire up two shells. Start the
server on the first shell by executing the following command:
100
Type book name to be added to the cart....
Great
Jul 31, 2021 3:30:55 PM
com.tp.bookstore.BookStoreClientStreamingClientCancelation
addBook
INFO: Adding book with title starting with: Great
Type book name to be added to the cart....
CANCEL
Jul 31, 2021 3:30:58 PM
com.tp.bookstore.BookStoreClientStreamingClientCancelation$1
onError
INFO: Error while reading response from Server:
io.grpc.StatusRuntimeException: UNAVAILABLE: Channel
shutdownNow invoked
So, as we can see, the client initiated a cancellation of the request it made to
the server. The server was also notified about the cancellation.
101
gRPC – Send/Receive Metadata
Let us take an example to understand it better. Here is our client code which
sends the hostname as metadata:
package com.tp.bookstore;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
102
import java.util.logging.Logger;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
return new
ForwardingClientCall.SimpleForwardingClientCall<ReqT,
RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT>
responseListener, Metadata headers) {
logger.info("Added metadata");
headers.put(Metadata.Key.of("HOSTNAME",
ASCII_STRING_MARSHALLER), "MY_HOST");
103
super.start(responseListener, headers);
}
};
}
}
ManagedChannel channel =
ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.intercept(new BookClientInterceptor())
.build();
104
try {
BookStoreClientUnaryBlockingMetadata client = new
BookStoreClientUnaryBlockingMetadata(channel);
client.getBook(bookName);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT>
interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
return new
ForwardingClientCall.SimpleForwardingClientCall<ReqT,
RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT>
responseListener, Metadata headers) {
logger.info("Added metadata");
headers.put(Metadata.Key.of("HOSTNAME",
ASCII_STRING_MARSHALLER), "MY_HOST");
super.start(responseListener, headers);
}
};
}
}
We intercept any call which is being made by the client and then add hostname
metadata to it before it is called further.
105
Server Reads Metadata
Now, let us look at the server code which reads this metadata:
package com.tp.bookstore;
import io.grpc.CallOptions;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
106
bookMap.put("Great Gatsby",
Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
}
@Override
public <ReqT, RespT> Listener<ReqT>
interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
107
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
@Override
public void first(BookSearch searchQuery,
StreamObserver<Book> responseObserver) {
logger.info("Searching for book with title: " +
searchQuery.getName());
List<String> matchingBookTitles =
bookMap.keySet().stream()
.filter(title ->
title.startsWith(searchQuery.getName().trim()))
.collect(Collectors.toList());
Book foundBook = null;
if(matchingBookTitles.size() > 0) {
108
foundBook = bookMap.get(matchingBookTitles.get(0));
}
responseObserver.onNext(foundBook);
responseObserver.onCompleted();
}
}
}
We intercept any call which is incoming to the server and then log the metadata
before the call can be handled by the actual method.
Client-Server Call
Let us now see this in action. For running the code, fire up two shells. Start the
server on the first shell by executing the following command:
109
Now, let us start the client.
110