基础教程
gRPC Java 基础教程简介。
基础教程
本教程为 Java 程序员提供了 gRPC 基础使用介绍。
通过此示例,您将学习如何
- 在
.proto
文件中定义服务。 - 使用协议缓冲区编译器生成服务器和客户端代码。
- 使用 Java gRPC API 为您的服务编写一个简单的客户端和服务器。
本教程假定您已阅读gRPC 简介并熟悉Protocol Buffers。请注意,本教程中的示例使用 proto3 版本的 Protocol Buffers 语言:您可以在proto3 语言指南和Java 生成代码指南中找到更多信息。
为什么使用 gRPC?
我们的示例是一个简单的路线映射应用程序,它允许客户端获取其路线上特征的信息,创建其路线摘要,并与服务器及其他客户端交换路线信息(例如交通更新)。
使用 gRPC,我们可以在一个 .proto
文件中定义一次服务,然后生成 gRPC 支持的任何语言的客户端和服务器,这些客户端和服务器可以在从大型数据中心内的服务器到您自己的平板电脑等各种环境中运行——不同语言和环境之间的所有复杂通信都由 gRPC 为您处理。我们还获得了使用 Protocol Buffers 的所有优势,包括高效序列化、简单的 IDL 和易于接口更新。
示例代码和设置
我们教程的示例代码位于 grpc/grpc-java/examples/src/main/java/io/grpc/examples/routeguide。要下载示例,请通过运行以下命令克隆 grpc-java
仓库的最新版本
git clone -b v1.73.0 --depth 1 https://github.com/grpc/grpc-java
然后将当前目录更改为 grpc-java/examples
cd grpc-java/examples
定义服务
我们的第一步(正如您从gRPC 简介中所了解的)是使用Protocol Buffers定义 gRPC 服务以及方法请求和响应类型。您可以在 grpc-java/examples/src/main/proto/route_guide.proto 中查看完整的 .proto
文件。
由于在此示例中我们生成 Java 代码,因此我们在 .proto
文件中指定了 java_package
文件选项
option java_package = "io.grpc.examples.routeguide";
这指定了我们希望用于生成的 Java 类的包。如果 .proto
文件中没有给出显式的 java_package
选项,那么默认将使用 proto 包(使用“package”关键字指定)。然而,proto 包通常不适合作为 Java 包,因为 proto 包不要求以反向域名开头。如果从此 .proto
文件生成其他语言的代码,java_package
选项将无效。
要定义服务,我们在 .proto
文件中指定一个命名 service
:
service RouteGuide {
...
}
然后我们在服务定义中定义 rpc
方法,并指定其请求和响应类型。gRPC 允许您定义四种服务方法,所有这些方法都在 RouteGuide
服务中使用
简单 RPC:客户端使用存根向服务器发送请求,并等待响应返回,就像普通的函数调用一样。
// Obtains the feature at a given position. rpc GetFeature(Point) returns (Feature) {}
服务器端流式 RPC:客户端向服务器发送请求,并获得一个流以读取一系列消息。客户端从返回的流中读取,直到没有更多消息为止。如您在示例中所见,通过在响应类型前放置
stream
关键字来指定服务器端流式方法。// Obtains the Features available within the given Rectangle. Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field), as the rectangle may cover a large area and contain a // huge number of features. rpc ListFeatures(Rectangle) returns (stream Feature) {}
客户端流式 RPC:客户端写入一系列消息并将其发送到服务器,同样使用提供的流。一旦客户端完成消息写入,它会等待服务器读取所有消息并返回其响应。通过在请求类型前放置
stream
关键字来指定客户端流式方法。// Accepts a stream of Points on a route being traversed, returning a // RouteSummary when traversal is completed. rpc RecordRoute(stream Point) returns (RouteSummary) {}
双向流式 RPC:双方使用读写流发送一系列消息。两个流独立运行,因此客户端和服务器可以按其喜欢的任何顺序读写:例如,服务器可以等待接收所有客户端消息后再写入其响应,或者它可以交替地读取一条消息然后写入一条消息,或者读写操作的其他组合。每个流中消息的顺序都得到保留。通过在请求和响应前都放置
stream
关键字来指定此类方法。// Accepts a stream of RouteNotes sent while a route is being traversed, // while receiving other RouteNotes (e.g. from other users). rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
我们的 .proto
文件还包含我们服务方法中使用的所有请求和响应类型的协议缓冲区消息类型定义——例如,这是 Point
消息类型
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
生成客户端和服务器代码
接下来,我们需要从 .proto
服务定义中生成 gRPC 客户端和服务器接口。我们使用 Protocol Buffers 编译器 protoc
和一个特殊的 gRPC Java 插件来完成此操作。您需要使用 proto3 编译器(支持 proto2 和 proto3 语法)来生成 gRPC 服务。
当使用 Gradle 或 Maven 时,protoc 构建插件可以在构建过程中生成必要的代码。您可以参考 grpc-java README 以了解如何从您自己的 .proto
文件生成代码。
以下类是从我们的服务定义中生成的
Feature.java
、Point.java
、Rectangle.java
以及其他包含所有 Protocol Buffers 代码的类,用于填充、序列化和检索我们的请求和响应消息类型。RouteGuideGrpc.java
包含(以及一些其他有用的代码)- 一个用于
RouteGuide
服务器实现的基类RouteGuideGrpc.RouteGuideImplBase
,其中包含在RouteGuide
服务中定义的所有方法。 - 客户端可以用来与
RouteGuide
服务器通信的存根类。
- 一个用于
创建服务器
首先让我们看看如何创建一个 RouteGuide
服务器。如果您只对创建 gRPC 客户端感兴趣,您可以跳过此部分并直接跳到创建客户端(尽管您可能仍然会觉得它很有趣!)。
使我们的 RouteGuide
服务正常工作需要两部分
- 覆盖从我们的服务定义生成的服务基类:执行我们服务的实际“工作”。
- 运行 gRPC 服务器以侦听来自客户端的请求并返回服务响应。
您可以在 grpc-java/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideServer.java 中找到我们的示例 RouteGuide
服务器。让我们仔细看看它是如何工作的。
实现 RouteGuide
如您所见,我们的服务器有一个 RouteGuideService
类,它扩展了生成的 RouteGuideGrpc.RouteGuideImplBase
抽象类
private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
...
}
简单 RPC
RouteGuideService
实现了我们所有的服务方法。我们首先来看看最简单的方法,GetFeature()
,它只是从客户端获取一个 Point
,并从其数据库中以 Feature
形式返回相应的特征信息。
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
responseObserver.onNext(checkFeature(request));
responseObserver.onCompleted();
}
...
private Feature checkFeature(Point location) {
for (Feature feature : features) {
if (feature.getLocation().getLatitude() == location.getLatitude()
&& feature.getLocation().getLongitude() == location.getLongitude()) {
return feature;
}
}
// No feature was found, return an unnamed feature.
return Feature.newBuilder().setName("").setLocation(location).build();
}
getFeature()
方法接受两个参数
Point
:请求StreamObserver<Feature>
:一个响应观察者,这是服务器用来返回响应的特殊接口。
要将响应返回给客户端并完成调用
- 我们构建并填充一个
Feature
响应对象返回给客户端,如我们的服务定义中所指定。在此示例中,我们在一个单独的私有checkFeature()
方法中完成此操作。 - 我们使用响应观察者的
onNext()
方法来返回Feature
。 - 我们使用响应观察者的
onCompleted()
方法来指定我们已完成对 RPC 的处理。
服务器端流式 RPC
接下来我们看看其中一个流式 RPC。ListFeatures
是一个服务器端流式 RPC,因此我们需要向客户端返回多个 Feature
。
private final Collection<Feature> features;
...
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
与简单的 RPC 类似,此方法获取一个请求对象(客户端希望在其中找到 Feature
的 Rectangle
)和一个 StreamObserver
响应观察者。
这一次,我们获取所有需要返回给客户端的 Feature
对象(在本例中,我们根据它们是否在我们的请求 Rectangle
中从服务的特征集合中选择它们),并使用其 onNext()
方法将它们依次写入响应观察者。最后,与我们的简单 RPC 一样,我们使用响应观察者的 onCompleted()
方法告知 gRPC 我们已完成响应的写入。
客户端流式 RPC
现在让我们看一个稍微复杂一点的:客户端流式方法 RecordRoute()
,我们从客户端获取一个 Point
流并返回一个包含其行程信息的 RouteSummary
。
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point
// to the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
如您所见,与之前的方法类型类似,我们的方法获取一个 StreamObserver
响应观察者参数,但这次它返回一个 StreamObserver
供客户端写入其 Point
。
在方法体中,我们实例化一个匿名 StreamObserver
并返回,在该观察者中我们
- 覆盖
onNext()
方法,以便在客户端每次向消息流写入Point
时获取特征和其他信息。 - 覆盖
onCompleted()
方法(在客户端完成消息写入时调用),以填充和构建我们的RouteSummary
。然后我们使用RouteSummary
调用我们方法自身的响应观察者的onNext()
方法,再调用其onCompleted()
方法以完成服务器端的调用。
双向流式 RPC
最后,让我们看看我们的双向流式 RPC RouteChat()
。
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
// Now add the new note to the list
notes.add(note);
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
与我们的客户端流式示例一样,我们都获取并返回一个 StreamObserver
响应观察者,不同之处在于,这次我们在客户端仍在向其消息流写入消息时,通过我们方法的响应观察者返回值。这里的读写语法与我们的客户端流式和服务器流式方法完全相同。尽管每一方总是会按照消息写入的顺序接收对方的消息,但客户端和服务器都可以以任何顺序读写——流是完全独立操作的。
启动服务器
一旦我们实现了所有方法,我们还需要启动一个 gRPC 服务器,以便客户端能够实际使用我们的服务。以下代码片段展示了我们如何为 RouteGuide
服务执行此操作
public RouteGuideServer(int port, URL featureFile) throws IOException {
this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}
/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
this.port = port;
server = serverBuilder.addService(new RouteGuideService(features))
.build();
}
...
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
...
}
如您所见,我们使用 ServerBuilder
构建并启动服务器。
为此,我们
- 使用构建器的
forPort()
方法指定我们希望用于侦听客户端请求的地址和端口。 - 创建我们的服务实现类
RouteGuideService
的实例,并将其传递给构建器的addService()
方法。 - 在构建器上调用
build()
和start()
,以创建并启动我们服务的 RPC 服务器。
创建客户端
在本节中,我们将介绍如何为我们的 RouteGuide
服务创建客户端。您可以在 grpc-java/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideClient.java 中查看完整的示例客户端代码。
实例化存根
要调用服务方法,我们首先需要创建一个存根,或者更确切地说,是两个存根
- 一个阻塞/同步存根:这意味着 RPC 调用会等待服务器响应,并会返回响应或抛出异常。
- 一个非阻塞/异步存根,它向服务器发起非阻塞调用,其中响应是异步返回的。您只能使用异步存根进行某些类型的流式调用。
首先,我们需要为我们的存根创建一个 gRPC 通道,指定我们想要连接的服务器地址和端口
public RouteGuideClient(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
}
/** Construct client for accessing RouteGuide server using the existing channel. */
public RouteGuideClient(ManagedChannelBuilder<?> channelBuilder) {
channel = channelBuilder.build();
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
我们使用 ManagedChannelBuilder
来创建通道。
现在我们可以使用该通道,通过我们在 .proto
文件中生成的 RouteGuideGrpc
类中提供的 newStub
和 newBlockingStub
方法来创建存根。
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
调用服务方法
现在让我们看看如何调用我们的服务方法。
简单 RPC
在阻塞存根上调用简单的 RPC GetFeature
就像调用本地方法一样简单。
Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
Feature feature;
try {
feature = blockingStub.getFeature(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
我们创建并填充一个请求 Protocol Buffers 对象(在本例中是 Point
),将其传递给我们阻塞存根上的 getFeature()
方法,并获得一个 Feature
。
如果发生错误,它会编码为一个 Status
,我们可以从 StatusRuntimeException
中获取它。
服务器端流式 RPC
接下来,我们看看对 ListFeatures
的服务器端流式调用,它返回一个地理 Feature
流
Rectangle request =
Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features;
try {
features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
如您所见,它与我们刚才看到的简单 RPC 非常相似,不同之处在于,该方法不是返回单个 Feature
,而是返回一个 Iterator
,客户端可以使用它读取所有返回的 Feature
。
客户端流式 RPC
现在来看一个稍微复杂一点的:客户端流式方法 RecordRoute
,我们向服务器发送一个 Point
流,并获得一个 RouteSummary
。对于此方法,我们需要使用异步存根。如果您已经阅读了创建服务器,其中的一些内容可能看起来非常熟悉——异步流式 RPC 在两端的实现方式相似。
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
info("*** RecordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RecordRoute");
finishLatch.countDown();
}
};
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
// Sending further requests won't error, but they will just be thrown away.
return;
}
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
如您所见,要调用此方法,我们需要创建一个 StreamObserver
,它实现了一个特殊接口,供服务器用于其 RouteSummary
响应的调用。在我们的 StreamObserver
中,我们
- 覆盖
onNext()
方法,以便在服务器将RouteSummary
写入消息流时打印返回的信息。 - 覆盖
onCompleted()
方法(在服务器完成其端的调用时调用),以减少一个CountDownLatch
,我们可以检查它来查看服务器是否已完成写入。
然后,我们将 StreamObserver
传递给异步存根的 recordRoute()
方法,并获取我们自己的 StreamObserver
请求观察者,用于写入要发送到服务器的 Point
。一旦我们完成点的写入,我们使用请求观察者的 onCompleted()
方法来告知 gRPC 我们已在客户端完成写入。完成操作后,我们检查我们的 CountDownLatch
以确认服务器已在其端完成操作。
双向流式 RPC
最后,让我们看看我们的双向流式 RPC RouteChat()
。
public void routeChat() throws Exception {
info("*** RoutChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RouteChat Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RouteChat");
finishLatch.countDown();
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 1),
newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
与我们的客户端流式示例一样,我们都获取并返回一个 StreamObserver
响应观察者,不同之处在于,这次我们在服务器仍在向其消息流写入消息时,通过我们方法的响应观察者发送值。这里的读写语法与我们的客户端流式方法完全相同。尽管每一方总是会按照消息写入的顺序接收对方的消息,但客户端和服务器都可以以任何顺序读写——流是完全独立操作的。
尝试一下!
按照 示例目录 README 中的说明构建并运行客户端和服务器。