基础教程
Java 中 gRPC 的基本教程介绍。
基础教程
本教程为 Java 程序员提供 gRPC 入门指南。
通过学习本示例,您将了解如何:
- 在
.proto
文件中定义服务。 - 使用协议缓冲区编译器生成服务器和客户端代码。
- 使用 Java gRPC API 为您的服务编写简单的客户端和服务器。
它假设您已阅读gRPC 简介并且熟悉协议缓冲区。请注意,本教程中的示例使用协议缓冲区语言的proto3版本:您可以在proto3 语言指南和Java 生成的代码指南中找到更多信息。
为什么要使用 gRPC?
我们的示例是一个简单的路线地图应用程序,允许客户端获取有关其路线上要素的信息、创建路线摘要,并与服务器和其他客户端交换路线信息,例如交通更新。
借助 gRPC,我们可以在 .proto
文件中定义一次服务,并生成任何 gRPC 支持的语言的客户端和服务器,这些客户端和服务器可以在从大型数据中心内部的服务器到您自己的平板电脑等环境中运行 - gRPC 会为您处理不同语言和环境之间通信的所有复杂性。我们还可以获得使用协议缓冲区的所有优势,包括高效的序列化、简单的 IDL 和简单的接口更新。
示例代码和设置
我们教程的示例代码位于grpc/grpc-java/examples/src/main/java/io/grpc/examples/routeguide。要下载示例,请运行以下命令克隆 grpc-java
存储库中的最新版本
git clone -b v1.69.0 --depth 1 https://github.com/grpc/grpc-java
然后将当前目录更改为 grpc-java/examples
cd grpc-java/examples
定义服务
我们的第一步(正如您从gRPC 简介中了解到的那样)是使用协议缓冲区定义 gRPC 服务以及方法 请求和 响应类型。您可以在grpc-java/examples/src/main/proto/route_guide.proto中查看完整的 .proto
文件。
由于我们在此示例中生成 Java 代码,因此我们在 .proto
中指定了 java_package
文件选项
option java_package = "io.grpc.examples.routeguide";
这指定了我们想要用于生成的 Java 类的包。如果在 .proto
文件中没有给出显式的 java_package
选项,则默认情况下将使用 proto 包(使用“package”关键字指定)。但是,proto 包通常不是很好的 Java 包,因为 proto 包不应以反向域名开头。如果我们从这个 .proto
中生成另一种语言的代码,则 java_package
选项无效。
要定义服务,我们在 .proto
文件中指定一个命名的 service
service RouteGuide {
...
}
然后在我们的服务定义中定义 rpc
方法,指定它们的请求和响应类型。gRPC 允许您定义四种服务方法,所有这些方法都在 RouteGuide
服务中使用
简单 RPC,客户端使用存根向服务器发送请求,并等待响应返回,就像普通的函数调用一样。
// Obtains the feature at a given position. rpc GetFeature(Point) returns (Feature) {}
服务端流式 RPC,客户端向服务器发送请求,并获取一个流以读取一系列消息。客户端从返回的流中读取,直到没有更多消息为止。正如您在我们的示例中看到的,您可以通过在 响应类型之前放置
stream
关键字来指定服务端流式方法。// Obtains the Features available within the given Rectangle. Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field), as the rectangle may cover a large area and contain a // huge number of features. rpc ListFeatures(Rectangle) returns (stream Feature) {}
客户端流式 RPC,客户端写入一系列消息并将它们发送到服务器,同样使用提供的流。客户端完成写入消息后,它会等待服务器读取所有消息并返回其响应。您可以通过在 请求类型之前放置
stream
关键字来指定客户端流式方法。// Accepts a stream of Points on a route being traversed, returning a // RouteSummary when traversal is completed. rpc RecordRoute(stream Point) returns (RouteSummary) {}
双向流式 RPC,其中双方都使用读写流发送一系列消息。这两个流独立运行,因此客户端和服务器可以按照他们喜欢的任何顺序读取和写入:例如,服务器可以等待接收所有客户端消息后才写入其响应,或者它可以交替读取一条消息然后写入一条消息,或者其他一些读取和写入的组合。每个流中消息的顺序都会被保留。您可以通过在请求和响应之前都放置
stream
关键字来指定这种类型的方法。// Accepts a stream of RouteNotes sent while a route is being traversed, // while receiving other RouteNotes (e.g. from other users). rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
我们的 .proto
文件还包含服务方法中使用的所有请求和响应类型的协议缓冲区消息类型定义 - 例如,这是 Point
消息类型
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
生成客户端和服务器代码
接下来,我们需要从我们的 .proto
服务定义生成 gRPC 客户端和服务器接口。我们使用带有特殊 gRPC Java 插件的协议缓冲区编译器 protoc
来完成此操作。您需要使用 proto3 编译器(它支持 proto2 和 proto3 语法)才能生成 gRPC 服务。
当使用 Gradle 或 Maven 时,protoc 构建插件可以生成必要的代码作为构建的一部分。您可以参考 grpc-java README 了解如何从您自己的 .proto
文件生成代码。
以下类是从我们的服务定义生成的
Feature.java
、Point.java
、Rectangle.java
以及其他包含所有协议缓冲区代码的类,用于填充、序列化和检索我们的请求和响应消息类型。RouteGuideGrpc.java
其中包含(以及一些其他有用的代码)RouteGuide
服务器实现的基类,RouteGuideGrpc.RouteGuideImplBase
,其中包含RouteGuide
服务中定义的所有方法。- 客户端可用于与
RouteGuide
服务器通信的存根类。
创建服务器
首先让我们看看如何创建一个 RouteGuide
服务器。如果您只对创建 gRPC 客户端感兴趣,您可以跳过此部分并直接转到 创建客户端(不过您可能会觉得它也很有趣!)。
要使我们的 RouteGuide
服务执行其工作,需要完成两个部分
- 覆盖从我们的服务定义生成的服务基类:执行我们服务的实际“工作”。
- 运行 gRPC 服务器以监听来自客户端的请求并返回服务响应。
您可以在 grpc-java/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideServer.java 中找到我们的示例 RouteGuide
服务器。让我们仔细看看它是如何工作的。
实现 RouteGuide
如您所见,我们的服务器有一个 RouteGuideService
类,它扩展了生成的 RouteGuideGrpc.RouteGuideImplBase
抽象类
private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
...
}
简单 RPC
RouteGuideService
实现了我们所有的服务方法。让我们先看看最简单的方法 GetFeature()
,它只是从客户端获取一个 Point
,并从其数据库中返回相应的特征信息,格式为 Feature
。
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
responseObserver.onNext(checkFeature(request));
responseObserver.onCompleted();
}
...
private Feature checkFeature(Point location) {
for (Feature feature : features) {
if (feature.getLocation().getLatitude() == location.getLatitude()
&& feature.getLocation().getLongitude() == location.getLongitude()) {
return feature;
}
}
// No feature was found, return an unnamed feature.
return Feature.newBuilder().setName("").setLocation(location).build();
}
getFeature()
方法有两个参数
Point
:请求StreamObserver<Feature>
:响应观察者,这是一个特殊的接口,供服务器调用以返回其响应。
为了将我们的响应返回给客户端并完成调用
- 我们构造并填充一个
Feature
响应对象,以返回给客户端,如我们的服务定义中所指定的那样。在这个示例中,我们在一个单独的私有checkFeature()
方法中执行此操作。 - 我们使用响应观察者的
onNext()
方法来返回Feature
。 - 我们使用响应观察者的
onCompleted()
方法来指定我们已经完成处理 RPC。
服务端流式 RPC
接下来,让我们看看我们的一个流式 RPC。ListFeatures
是一个服务器端流式 RPC,因此我们需要向客户端发送多个 Feature
。
private final Collection<Feature> features;
...
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
与简单的 RPC 一样,此方法会获取一个请求对象(客户端想要查找 Feature
的 Rectangle
)和一个 StreamObserver
响应观察者。
这次,我们获取尽可能多的 Feature
对象以返回给客户端(在本例中,我们根据它们是否在我们的请求 Rectangle
内从服务的特征集合中选择它们),并使用其 onNext()
方法依次将它们写入响应观察者。最后,就像在我们的简单 RPC 中一样,我们使用响应观察者的 onCompleted()
方法告诉 gRPC 我们已经完成写入响应。
客户端流式 RPC
现在让我们看看更复杂的东西:客户端流式方法 RecordRoute()
,我们从客户端获取 Point
流,并返回一个带有他们的旅行信息的 RouteSummary
。
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point
// to the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
如您所见,与以前的方法类型一样,我们的方法会获取一个 StreamObserver
响应观察者参数,但这次它会返回一个 StreamObserver
,供客户端写入其 Point
。
在该方法主体中,我们实例化一个匿名 StreamObserver
以返回,其中我们
- 覆盖
onNext()
方法,以便每次客户端将Point
写入消息流时都获取特征和其他信息。 - 覆盖
onCompleted()
方法(当客户端完成写入消息时调用)以填充和构建我们的RouteSummary
。然后,我们使用我们的方法的响应观察者的onNext()
方法传递我们的RouteSummary
,然后调用它的onCompleted()
方法以完成服务器端的调用。
双向流式 RPC
最后,让我们看看我们的双向流式 RPC RouteChat()
。
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
// Now add the new note to the list
notes.add(note);
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
与我们的客户端流式示例一样,我们既获取又返回 StreamObserver
响应观察者,但这次我们通过我们方法的响应观察者返回值,而客户端仍在向他们的消息流写入消息。此处读取和写入的语法与我们的客户端流式方法和服务器流式方法完全相同。尽管每一方都会始终按照写入顺序获取另一方的消息,但客户端和服务器都可以按照任何顺序读取和写入 — 这些流是完全独立运行的。
启动服务器
一旦我们实现了我们所有的方法,我们还需要启动一个 gRPC 服务器,以便客户端可以实际使用我们的服务。以下代码段显示了我们如何为我们的 RouteGuide
服务执行此操作
public RouteGuideServer(int port, URL featureFile) throws IOException {
this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}
/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
this.port = port;
server = serverBuilder.addService(new RouteGuideService(features))
.build();
}
...
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
...
}
如您所见,我们使用 ServerBuilder
构建并启动我们的服务器。
为此,我们
- 使用构建器的
forPort()
方法指定我们要用于监听客户端请求的地址和端口。 - 创建我们的服务实现类
RouteGuideService
的实例,并将其传递给构建器的addService()
方法。 - 在构建器上调用
build()
和start()
,为我们的服务创建并启动 RPC 服务器。
创建客户端
在本节中,我们将了解如何为我们的 RouteGuide
服务创建客户端。你可以在 grpc-java/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideClient.java 中查看我们完整的客户端代码示例。
实例化存根
要调用服务方法,我们首先需要创建一个存根,或者更准确地说,是两个存根:
- 一个阻塞/同步存根:这意味着 RPC 调用会等待服务器响应,并返回响应或抛出异常。
- 一个非阻塞/异步存根:它对服务器进行非阻塞调用,响应以异步方式返回。您只能使用异步存根进行某些类型的流式调用。
首先,我们需要为我们的存根创建一个 gRPC 通道,指定我们要连接的服务器地址和端口。
public RouteGuideClient(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
}
/** Construct client for accessing RouteGuide server using the existing channel. */
public RouteGuideClient(ManagedChannelBuilder<?> channelBuilder) {
channel = channelBuilder.build();
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
我们使用 ManagedChannelBuilder
来创建通道。
现在,我们可以使用该通道,利用从 .proto
文件生成的 RouteGuideGrpc
类中提供的 newStub
和 newBlockingStub
方法创建我们的存根。
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
调用服务方法
现在,让我们看看如何调用我们的服务方法。
简单 RPC
在阻塞存根上调用简单的 RPC GetFeature
就如同调用本地方法一样简单。
Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
Feature feature;
try {
feature = blockingStub.getFeature(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
我们创建一个并填充请求协议缓冲区对象(在我们的例子中是 Point
),将其传递给我们阻塞存根上的 getFeature()
方法,然后得到一个 Feature
。
如果发生错误,它会编码为 Status
,我们可以从 StatusRuntimeException
中获取该状态。
服务端流式 RPC
接下来,让我们看看服务器端流式调用 ListFeatures
,它返回一个地理 Feature
的流。
Rectangle request =
Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features;
try {
features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
正如您所看到的,它与我们刚刚看到的简单 RPC 非常相似,只是该方法不是返回单个 Feature
,而是返回一个 Iterator
,客户端可以使用该迭代器来读取所有返回的 Feature
。
客户端流式 RPC
现在来看一个稍微复杂一点的例子:客户端流式方法 RecordRoute
,其中我们将 Point
流发送到服务器,并返回单个 RouteSummary
。对于此方法,我们需要使用异步存根。如果您已经阅读过创建服务器,那么其中一些内容看起来可能会非常熟悉 - 异步流式 RPC 在双方的实现方式类似。
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
info("*** RecordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RecordRoute");
finishLatch.countDown();
}
};
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
// Sending further requests won't error, but they will just be thrown away.
return;
}
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
正如您所看到的,要调用此方法,我们需要创建一个 StreamObserver
,它实现了一个特殊接口,供服务器使用其 RouteSummary
响应进行调用。在我们的 StreamObserver
中,我们:
- 重写
onNext()
方法,以便在服务器向消息流写入RouteSummary
时打印返回的信息。 - 重写
onCompleted()
方法(当服务器在其一侧完成调用时调用),以减少我们可以检查的CountDownLatch
,以查看服务器是否已完成写入。
然后,我们将 StreamObserver
传递给异步存根的 recordRoute()
方法,并返回我们自己的 StreamObserver
请求观察者,以写入我们发送到服务器的 Point
。一旦我们完成写入点,我们使用请求观察者的 onCompleted()
方法来告诉 gRPC 我们已完成客户端的写入。完成后,我们检查我们的 CountDownLatch
,以检查服务器是否已在其一侧完成。
双向流式 RPC
最后,让我们看看我们的双向流式 RPC RouteChat()
。
public void routeChat() throws Exception {
info("*** RoutChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RouteChat Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RouteChat");
finishLatch.countDown();
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 1),
newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
与我们的客户端流式示例一样,我们既获取又返回一个 StreamObserver
响应观察者,只是这次我们在服务器仍在向他们的消息流写入消息时,通过我们方法的响应观察者发送值。 此处读取和写入的语法与我们的客户端流式方法完全相同。尽管每一方将始终按写入顺序获取另一方的消息,但客户端和服务器都可以以任何顺序读取和写入 - 流是完全独立运行的。
尝试一下!
按照 示例目录 README 中的说明构建并运行客户端和服务器。