基础教程

一份关于 Android Java 中 gRPC 的基本教程介绍。

基础教程

一份关于 Android Java 中 gRPC 的基本教程介绍。

本教程为 Android Java 程序员提供了 gRPC 工作的基本介绍。

通过此示例,您将学习如何

  • 在 .proto 文件中定义服务。
  • 使用协议缓冲区编译器生成客户端代码。
  • 使用 Java gRPC API 为您的服务编写一个简单的移动客户端。

本教程假定您已阅读 gRPC 简介 并熟悉 协议缓冲区。本指南不涉及服务器端内容。您可以查看 Java 页面以获取更多信息。

为什么使用 gRPC?

我们的示例是一个简单的路线映射应用程序,它允许客户端获取其路线上特征的信息,创建其路线摘要,并与服务器及其他客户端交换路线信息(例如交通更新)。

使用 gRPC,我们可以在一个 .proto 文件中定义一次服务,并以 gRPC 支持的任何语言生成客户端和服务器,这些客户端和服务器又可以在从大型数据中心内的服务器到您自己的平板电脑等各种环境中运行——不同语言和环境之间的所有复杂通信都由 gRPC 为您处理。我们还获得了使用协议缓冲区的所有优点,包括高效序列化、简单的 IDL 和易于接口更新。

示例代码和设置

我们教程的示例代码位于 grpc-java 的 examples/android 中。要下载示例,请运行以下命令克隆 grpc-java 仓库

git clone -b v1.73.0 https://github.com/grpc/grpc-java.git

然后将当前目录更改为 grpc-java/examples/android

cd grpc-java/examples/android

您还应该安装相关工具来生成客户端接口代码——如果您尚未安装,请按照 grpc-java README 中的设置说明进行操作。

定义服务

我们的第一步(正如您从 gRPC 简介中了解到的)是使用 协议缓冲区 定义 gRPC 服务以及方法请求响应类型。您可以在 routeguide/app/src/main/proto/route_guide.proto 中查看完整的 .proto 文件。

在此示例中,我们生成 Java 代码,因此在我们的 .proto 文件中指定了一个 java_package 文件选项:

option java_package = "io.grpc.examples";

这指定了我们希望为生成的 Java 类使用的包。如果在 .proto 文件中没有给出显式的 java_package 选项,则默认将使用 proto 包(使用“package”关键字指定)。然而,proto 包通常不是好的 Java 包,因为 proto 包不期望以反向域名开头。如果从这个 .proto 生成其他语言的代码,java_package 选项没有效果。

要定义服务,我们在 .proto 文件中指定一个命名的 service

service RouteGuide {
   ...
}

然后我们在服务定义中定义 rpc 方法,并指定它们的请求和响应类型。gRPC 允许您定义四种服务方法,所有这些方法都在 RouteGuide 服务中使用:

  • 简单 RPC:客户端使用存根向服务器发送请求,并等待响应返回,就像普通的函数调用一样。

    // Obtains the feature at a given position.
    rpc GetFeature(Point) returns (Feature) {}
    
  • 一种*服务器端流式 RPC*,其中客户端向服务器发送请求,并获得一个流以读取一系列消息。客户端从返回的流中读取,直到没有更多消息。正如您在我们的示例中看到的,您通过在*响应*类型前放置 stream 关键字来指定服务器端流式方法。

    // Obtains the Features available within the given Rectangle.  Results are
    // streamed rather than returned at once (e.g. in a response message with a
    // repeated field), as the rectangle may cover a large area and contain a
    // huge number of features.
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
    
  • 一种*客户端流式 RPC*,其中客户端写入一系列消息并将其发送到服务器,同样使用提供的流。一旦客户端完成消息写入,它会等待服务器读取所有消息并返回其响应。您通过在*请求*类型前放置 stream 关键字来指定客户端流式方法。

    // Accepts a stream of Points on a route being traversed, returning a
    // RouteSummary when traversal is completed.
    rpc RecordRoute(stream Point) returns (RouteSummary) {}
    
  • 一种*双向流式 RPC*,其中双方使用读写流发送一系列消息。这两个流独立运行,因此客户端和服务器可以按其喜欢的任何顺序读写:例如,服务器可以等待接收所有客户端消息后再写入其响应,或者它可以交替地读取一条消息然后写入一条消息,或者读写组合。每个流中的消息顺序都得到保留。您通过在请求和响应前都放置 stream 关键字来指定此类型的方法。

    // Accepts a stream of RouteNotes sent while a route is being traversed,
    // while receiving other RouteNotes (e.g. from other users).
    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    

我们的 .proto 文件还包含我们服务方法中使用的所有请求和响应类型的协议缓冲区消息类型定义——例如,这是 Point 消息类型

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

生成客户端代码

接下来,我们需要从我们的 .proto 服务定义生成 gRPC 客户端接口。我们使用协议缓冲区编译器 protoc 和一个特殊的 gRPC Java 插件来完成此操作。您需要使用 proto3 编译器(支持 proto2 和 proto3 语法)来生成 gRPC 服务。

此示例的构建系统也是 Java-gRPC 构建的一部分。请参考 grpc-java READMEbuild.gradle 以了解如何从您自己的 .proto 文件生成代码。请注意,对于 Android,我们将使用针对移动用例优化的 protobuf lite。

以下类是根据我们的服务定义生成的:

  • Feature.javaPoint.javaRectangle.java 以及其他包含所有协议缓冲区代码的类,用于填充、序列化和检索我们的请求和响应消息类型。
  • RouteGuideGrpc.java,其中包含(以及一些其他有用的代码):
    • 一个供 RouteGuide 服务器实现的基类 RouteGuideGrpc.RouteGuideImplBase,其中包含 RouteGuide 服务中定义的所有方法。
    • 客户端可以用来与 RouteGuide 服务器通信的 *stub* 类。

创建客户端

在本节中,我们将探讨为我们的 RouteGuide 服务创建 Java 客户端。您可以在 routeguide/app/src/main/java/io/grpc/routeguideexample/RouteGuideActivity.java 中查看完整的示例客户端代码。

创建存根

要调用服务方法,我们首先需要创建一个 *stub*,或者更确切地说,两个 *stub*:

  • 一个*阻塞/同步* stub:这意味着 RPC 调用会等待服务器响应,并会返回响应或抛出异常。
  • 一个*非阻塞/异步* stub,它对服务器进行非阻塞调用,响应是异步返回的。某些类型的流式调用只能使用异步 stub。

首先,我们需要为我们的 stub 创建一个 gRPC *通道*,指定我们要连接的服务器地址和端口:我们使用 ManagedChannelBuilder 来创建通道。

mChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();

现在我们可以使用该通道,通过我们在 .proto 文件中生成的 RouteGuideGrpc 类中提供的 newStubnewBlockingStub 方法来创建我们的 stubs。

blockingStub = RouteGuideGrpc.newBlockingStub(mChannel);
asyncStub = RouteGuideGrpc.newStub(mChannel);

调用服务方法

现在让我们看看如何调用我们的服务方法。

简单 RPC

在阻塞 stub 上调用简单的 RPC GetFeature 就像调用本地方法一样简单。

Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
Feature feature = blockingStub.getFeature(request);

我们创建并填充一个请求协议缓冲区对象(在本例中为 Point),将其传递给我们的阻塞 stub 上的 getFeature() 方法,并获得一个 Feature

服务器端流式 RPC

接下来,让我们看看对 ListFeatures 的服务器端流式调用,它返回一个地理 Feature 序列流:

Rectangle request =
    Rectangle.newBuilder()
        .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features = blockingStub.listFeatures(request);

正如您所看到的,它与我们刚才看到的简单 RPC 非常相似,只是它不是返回单个 Feature,而是返回一个 Iterator,客户端可以使用它来读取所有返回的 Feature

客户端流式 RPC

现在来看一些更复杂的内容:客户端流式方法 RecordRoute,我们在此向服务器发送 Point 对象流并获得单个 RouteSummary。对于此方法,我们需要使用异步 stub。如果您已经阅读了 创建服务器,其中一些内容可能看起来非常熟悉——异步流式 RPC 在两端以类似的方式实现。

private String recordRoute(List<Point> points, int numPoints, RouteGuideStub asyncStub)
        throws InterruptedException, RuntimeException {
    final StringBuffer logs = new StringBuffer();
    appendLogs(logs, "*** RecordRoute");

    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
        @Override
        public void onNext(RouteSummary summary) {
            appendLogs(logs, "Finished trip with {0} points. Passed {1} features. "
                    + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
                    summary.getFeatureCount(), summary.getDistance(),
                    summary.getElapsedTime());
        }

        @Override
        public void onError(Throwable t) {
            failed = t;
            finishLatch.countDown();
        }

        @Override
        public void onCompleted() {
            appendLogs(logs, "Finished RecordRoute");
            finishLatch.countDown();
        }
    };

    StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
    try {
        // Send numPoints points randomly selected from the points list.
        Random rand = new Random();
        for (int i = 0; i < numPoints; ++i) {
            int index = rand.nextInt(points.size());
            Point point = points.get(index);
            appendLogs(logs, "Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
                    RouteGuideUtil.getLongitude(point));
            requestObserver.onNext(point);
            // Sleep for a bit before sending the next one.
            Thread.sleep(rand.nextInt(1000) + 500);
            if (finishLatch.getCount() == 0) {
                // RPC completed or errored before we finished sending.
                // Sending further requests won't error, but they will just be thrown away.
                break;
            }
        }
    } catch (RuntimeException e) {
        // Cancel RPC
        requestObserver.onError(e);
        throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // Receiving happens asynchronously
    if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        throw new RuntimeException(
               "Could not finish rpc within 1 minute, the server is likely down");
    }

    if (failed != null) {
        throw new RuntimeException(failed);
    }
    return logs.toString();
}

如您所见,要调用此方法,我们需要创建一个 StreamObserver,它实现了服务器用来调用其 RouteSummary 响应的特殊接口。在我们的 StreamObserver 中,我们:

  • 重写 onNext() 方法,以便当服务器将 RouteSummary 写入消息流时打印出返回的信息。
  • 重写 onCompleted() 方法(当*服务器*在其端完成调用时调用),以设置一个 SettableFuture,我们可以检查它来查看服务器是否已完成写入。

然后我们将 StreamObserver 传递给异步 stub 的 recordRoute() 方法,并获得我们自己的 StreamObserver 请求观察器,以写入要发送到服务器的 Point 对象。一旦我们完成写入点,我们使用请求观察器的 onCompleted() 方法告知 gRPC 我们已在客户端完成写入。完成后,我们检查 SettableFuture 以确认服务器已在其端完成。

双向流式 RPC

最后,让我们看看我们的双向流式 RPC RouteChat()

private String routeChat(RouteGuideStub asyncStub) throws InterruptedException,
        RuntimeException {
    final StringBuffer logs = new StringBuffer();
    appendLogs(logs, "*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
            asyncStub.routeChat(new StreamObserver<RouteNote>() {
                @Override
                public void onNext(RouteNote note) {
                    appendLogs(logs, "Got message \"{0}\" at {1}, {2}", note.getMessage(),
                            note.getLocation().getLatitude(),
                            note.getLocation().getLongitude());
                }

                @Override
                public void onError(Throwable t) {
                    failed = t;
                    finishLatch.countDown();
                }

                @Override
                public void onCompleted() {
                    appendLogs(logs,"Finished RouteChat");
                    finishLatch.countDown();
                }
            });

    try {
        RouteNote[] requests =
                {newNote("First message", 0, 0), newNote("Second message", 0, 1),
                        newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};

        for (RouteNote request : requests) {
            appendLogs(logs, "Sending message \"{0}\" at {1}, {2}", request.getMessage(),
                    request.getLocation().getLatitude(),
                    request.getLocation().getLongitude());
            requestObserver.onNext(request);
        }
    } catch (RuntimeException e) {
        // Cancel RPC
        requestObserver.onError(e);
        throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // Receiving happens asynchronously
    if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        throw new RuntimeException(
                "Could not finish rpc within 1 minute, the server is likely down");
    }

    if (failed != null) {
        throw new RuntimeException(failed);
    }

    return logs.toString();
}

与我们的客户端流式示例一样,我们都获取并返回一个 StreamObserver 响应观察器,但这次我们通过方法响应观察器发送值,而服务器仍在向*它们*的消息流写入消息。这里的读写语法与我们的客户端流式方法完全相同。尽管每一方总是会按照消息写入的顺序接收对方的消息,但客户端和服务器都可以按任何顺序读写——这些流完全独立运行。

尝试一下!

请按照 示例目录 README 中的说明构建并运行客户端和服务器。