基础教程

面向 Android Java 的 gRPC 基础入门教程。

基础教程

面向 Android Java 的 gRPC 基础入门教程。

本教程为 Android Java 开发人员提供了使用 gRPC 的基础入门知识。

通过此示例,您将学习如何

  • 在 .proto 文件中定义服务。
  • 使用协议缓冲区编译器生成客户端代码。
  • 使用 Java gRPC API 为您的服务编写一个简单的移动端客户端。

本教程假设您已经阅读了 gRPC 简介 并熟悉 Protocol Buffers。本指南不涉及服务端的相关内容。您可以查看 Java 页面 以获取更多信息。

为什么使用 gRPC?

我们的示例是一个简单的路线映射应用程序,它允许客户端获取其路线上特征的信息,创建其路线摘要,并与服务器及其他客户端交换路线信息(例如交通更新)。

使用 gRPC,我们可以一次在 .proto 文件中定义服务,并生成 gRPC 支持的任何语言的客户端和服务器。这些客户端和服务器可以在各种环境中运行,从大型数据中心内的服务器到您的平板电脑,应有尽有——不同语言和环境之间通信的所有复杂性都由 gRPC 为您处理。我们还可以获得使用 Protocol Buffers 的所有优势,包括高效的序列化、简单的 IDL 和易于更新的接口。

示例代码和设置

本教程的示例代码位于 grpc-java 的 examples/android 中。要下载该示例,请通过运行以下命令克隆 grpc-java 仓库:

git clone -b v1.80.0 https://github.com/grpc/grpc-java.git

然后将当前目录切换到 grpc-java/examples/android

cd grpc-java/examples/android

您还应该安装生成客户端接口代码所需的相关工具——如果尚未安装,请按照 grpc-java 自述文件 (README) 中的设置说明进行操作。

定义服务

第一步(正如您从 gRPC 简介 中所知)是使用 Protocol Buffers 定义 gRPC 服务以及方法请求响应类型。您可以在 routeguide/app/src/main/proto/route_guide.proto 中查看完整的 .proto 文件。

由于本示例中正在生成 Java 代码,因此我们在 .proto 文件中指定了 java_package 文件选项:

option java_package = "io.grpc.examples";

这指定了我们希望用于生成的 Java 类的包名。如果在 .proto 文件中没有显式给出 java_package 选项,则默认使用 proto 包(通过“package”关键字指定)。但是,由于不期望 proto 包以反向域名开头,因此 proto 包通常不适合作为 Java 包。如果我们从这个 .proto 文件生成其他语言的代码,java_package 选项将不起作用。

要定义服务,我们需要在 .proto 文件中指定一个命名的 service

service RouteGuide {
   ...
}

然后,我们在服务定义中定义 rpc 方法,并指定它们的请求和响应类型。gRPC 允许您定义四种类型的服务方法,所有这些方法都在 RouteGuide 服务中使用:

  • 简单 RPC:客户端使用存根向服务器发送请求,并等待响应返回,就像普通的函数调用一样。

    // Obtains the feature at a given position.
    rpc GetFeature(Point) returns (Feature) {}
    
  • 服务端流式 RPC:客户端向服务器发送请求,并获得一个用于读取一系列消息的流。客户端会读取返回的流,直到没有更多消息为止。正如您在我们的示例中所见,通过在响应类型前放置 stream 关键字,可以指定服务端流式方法。

    // Obtains the Features available within the given Rectangle.  Results are
    // streamed rather than returned at once (e.g. in a response message with a
    // repeated field), as the rectangle may cover a large area and contain a
    // huge number of features.
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
    
  • 客户端流式 RPC:客户端写入一系列消息并通过提供的流发送给服务器。一旦客户端完成了消息写入,它将等待服务器读取所有消息并返回其响应。通过在请求类型前放置 stream 关键字,可以指定客户端流式方法。

    // Accepts a stream of Points on a route being traversed, returning a
    // RouteSummary when traversal is completed.
    rpc RecordRoute(stream Point) returns (RouteSummary) {}
    
  • 双向流式 RPC:双方都使用读写流发送一系列消息。这两个流独立操作,因此客户端和服务器可以按任何他们喜欢的顺序读取和写入:例如,服务器可以等待接收所有客户端消息后再写入其响应,或者交替读取一条消息然后写入一条消息,亦或是其他读写组合。每条流中消息的顺序都会得到保留。通过在请求和响应之前都放置 stream 关键字,可以指定这种类型的方法。

    // Accepts a stream of RouteNotes sent while a route is being traversed,
    // while receiving other RouteNotes (e.g. from other users).
    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    

我们的 .proto 文件还包含我们服务方法中使用的所有请求和响应类型的协议缓冲区消息类型定义——例如,这是 Point 消息类型

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

生成客户端代码

接下来,我们需要从 .proto 服务定义中生成 gRPC 客户端接口。我们使用带有特殊 gRPC Java 插件的 Protocol Buffer 编译器 protoc 来完成此操作。您需要使用 proto3 编译器(同时支持 proto2 和 proto3 语法)来生成 gRPC 服务。

本示例的构建系统也是 Java-gRPC 构建的一部分。有关如何从您自己的 .proto 文件生成代码的信息,请参考 grpc-java 自述文件 (README)build.gradle。请注意,对于 Android,我们将使用针对移动场景优化的 protobuf lite。

以下类是根据我们的服务定义生成的:

  • Feature.javaPoint.javaRectangle.java 以及其他包含所有用于填充、序列化和检索请求与响应消息类型的 Protocol Buffer 代码的类。
  • RouteGuideGrpc.java,其中包含(以及其他一些有用的代码):
    • 一个供 RouteGuide 服务端实现的基类 RouteGuideGrpc.RouteGuideImplBase,其中包含了 RouteGuide 服务中定义的所有方法。
    • 存根 (stub) 类,客户端可以使用它们与 RouteGuide 服务端通信。

创建客户端

在本节中,我们将介绍如何为 RouteGuide 服务创建一个 Java 客户端。您可以在 routeguide/app/src/main/java/io/grpc/routeguideexample/RouteGuideActivity.java 中查看我们完整的示例客户端代码。

创建存根

要调用服务方法,我们首先需要创建一个存根 (stub),更准确地说是两个存根:

  • 阻塞/同步存根:这意味着 RPC 调用会等待服务端响应,并返回响应或抛出异常。
  • 非阻塞/异步存根:它向服务端发起非阻塞调用,响应是异步返回的。某些类型的流式调用只能使用异步存根。

首先,我们需要为存根创建一个 gRPC 通道 (channel),并指定我们要连接的服务端地址和端口:我们使用 ManagedChannelBuilder 来创建通道。

mChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();

现在,我们可以使用从 .proto 文件生成的 RouteGuideGrpc 类中提供的 newStubnewBlockingStub 方法,利用该通道来创建存根。

blockingStub = RouteGuideGrpc.newBlockingStub(mChannel);
asyncStub = RouteGuideGrpc.newStub(mChannel);

调用服务方法

现在让我们看看如何调用我们的服务方法。

简单 RPC

在阻塞存根上调用简单 RPC GetFeature 就像调用本地方法一样简单。

Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
Feature feature = blockingStub.getFeature(request);

我们创建并填充一个请求 Protocol Buffer 对象(本例中为 Point),将其传递给阻塞存根上的 getFeature() 方法,然后取回一个 Feature

服务器端流式 RPC

接下来,让我们看一个对 ListFeatures 的服务端流式调用,它返回一系列地理 Feature 流:

Rectangle request =
    Rectangle.newBuilder()
        .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features = blockingStub.listFeatures(request);

如您所见,它与我们刚才看到的简单 RPC 非常相似,只是该方法没有返回单个 Feature,而是返回了一个 Iterator,客户端可以使用它来读取所有返回的 Feature

客户端流式 RPC

现在来看一个更复杂的情况:客户端流式方法 RecordRoute,我们将 Point 流发送给服务端,并取回一个 RouteSummary。对于此方法,我们需要使用异步存根。如果您已经阅读过 创建服务端,那么其中一些内容可能会让您感到很熟悉——异步流式 RPC 在双方的实现方式都很相似。

private String recordRoute(List<Point> points, int numPoints, RouteGuideStub asyncStub)
        throws InterruptedException, RuntimeException {
    final StringBuffer logs = new StringBuffer();
    appendLogs(logs, "*** RecordRoute");

    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
        @Override
        public void onNext(RouteSummary summary) {
            appendLogs(logs, "Finished trip with {0} points. Passed {1} features. "
                    + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
                    summary.getFeatureCount(), summary.getDistance(),
                    summary.getElapsedTime());
        }

        @Override
        public void onError(Throwable t) {
            failed = t;
            finishLatch.countDown();
        }

        @Override
        public void onCompleted() {
            appendLogs(logs, "Finished RecordRoute");
            finishLatch.countDown();
        }
    };

    StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
    try {
        // Send numPoints points randomly selected from the points list.
        Random rand = new Random();
        for (int i = 0; i < numPoints; ++i) {
            int index = rand.nextInt(points.size());
            Point point = points.get(index);
            appendLogs(logs, "Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
                    RouteGuideUtil.getLongitude(point));
            requestObserver.onNext(point);
            // Sleep for a bit before sending the next one.
            Thread.sleep(rand.nextInt(1000) + 500);
            if (finishLatch.getCount() == 0) {
                // RPC completed or errored before we finished sending.
                // Sending further requests won't error, but they will just be thrown away.
                break;
            }
        }
    } catch (RuntimeException e) {
        // Cancel RPC
        requestObserver.onError(e);
        throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // Receiving happens asynchronously
    if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        throw new RuntimeException(
               "Could not finish rpc within 1 minute, the server is likely down");
    }

    if (failed != null) {
        throw new RuntimeException(failed);
    }
    return logs.toString();
}

如您所见,要调用此方法,我们需要创建一个 StreamObserver,它实现了一个特殊的接口,供服务端调用其 RouteSummary 响应。在我们的 StreamObserver 中,我们:

  • 重写 onNext() 方法,以便在服务端向消息流写入 RouteSummary 时打印返回的信息。
  • 重写 onCompleted() 方法(在服务端结束调用时调用),以设置一个我们可以检查的 SettableFuture,从而确认服务端是否已完成写入。

然后,我们将 StreamObserver 传递给异步存根的 recordRoute() 方法,并取回我们自己的 StreamObserver 请求观察者,用于写入我们要发送给服务端的数据点 Point。写入数据点后,我们使用请求观察者的 onCompleted() 方法来通知 gRPC 我们已在客户端侧完成了写入。完成后,我们检查 SettableFuture 以确认服务端已在对应侧完成了调用。

双向流式 RPC

最后,让我们看看我们的双向流式 RPC RouteChat()

private String routeChat(RouteGuideStub asyncStub) throws InterruptedException,
        RuntimeException {
    final StringBuffer logs = new StringBuffer();
    appendLogs(logs, "*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
            asyncStub.routeChat(new StreamObserver<RouteNote>() {
                @Override
                public void onNext(RouteNote note) {
                    appendLogs(logs, "Got message \"{0}\" at {1}, {2}", note.getMessage(),
                            note.getLocation().getLatitude(),
                            note.getLocation().getLongitude());
                }

                @Override
                public void onError(Throwable t) {
                    failed = t;
                    finishLatch.countDown();
                }

                @Override
                public void onCompleted() {
                    appendLogs(logs,"Finished RouteChat");
                    finishLatch.countDown();
                }
            });

    try {
        RouteNote[] requests =
                {newNote("First message", 0, 0), newNote("Second message", 0, 1),
                        newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};

        for (RouteNote request : requests) {
            appendLogs(logs, "Sending message \"{0}\" at {1}, {2}", request.getMessage(),
                    request.getLocation().getLatitude(),
                    request.getLocation().getLongitude());
            requestObserver.onNext(request);
        }
    } catch (RuntimeException e) {
        // Cancel RPC
        requestObserver.onError(e);
        throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // Receiving happens asynchronously
    if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        throw new RuntimeException(
                "Could not finish rpc within 1 minute, the server is likely down");
    }

    if (failed != null) {
        throw new RuntimeException(failed);
    }

    return logs.toString();
}

与我们的客户端流式示例一样,我们既获得又返回一个 StreamObserver 响应观察者,但这次我们通过方法的响应观察者发送值,同时服务端仍在向消息流写入消息。此处的读写语法与客户端流式方法完全相同。尽管每一方总是会按发送顺序收到对方的消息,但客户端和服务端都可以按任何顺序读写——流的操作是完全独立的。

尝试一下!

请按照 示例目录 README 中的说明来构建并运行客户端和服务端。