基础教程

一个关于 Android Java 中 gRPC 的基础入门教程。

基础教程

一个关于 Android Java 中 gRPC 的基础入门教程。

本教程为 Android Java 程序员提供了使用 gRPC 的基础入门指南。

通过学习本示例,你将了解如何

  • 在 .proto 文件中定义服务。
  • 使用 protocol buffer 编译器生成客户端代码。
  • 使用 Java gRPC API 为你的服务编写一个简单的移动客户端。

本教程假设你已阅读过gRPC 简介并熟悉protocol buffers。本指南不涉及服务器端的任何内容。你可以查看Java 页面获取更多信息。

为何使用 gRPC?

我们的示例是一个简单的路由映射应用程序,它允许客户端获取其路线上的特性信息,创建路线摘要,以及与服务器和其他客户端交换路线信息(如交通更新)。

使用 gRPC,我们可以在一个 .proto 文件中一次性定义服务,并生成支持 gRPC 的任何语言的客户端和服务器,这些客户端和服务器可以在从大型数据中心的服务器到你自己的平板电脑等各种环境中运行——gRPC 为你处理了不同语言和环境之间通信的所有复杂性。我们还获得了使用 protocol buffers 的所有优势,包括高效的序列化、简单的 IDL 和便捷的接口更新。

示例代码与设置

本教程的示例代码位于grpc-java 的 examples/android 目录下。要下载示例,请克隆 grpc-java 仓库,运行以下命令

git clone -b v1.72.0 https://github.com/grpc/grpc-java.git

然后将当前目录更改为 grpc-java/examples/android

cd grpc-java/examples/android

你还需要安装相关工具来生成客户端接口代码——如果尚未安装,请按照grpc-java README 中的设置说明进行操作。

定义服务

我们的第一步(如你在gRPC 简介中所知)是使用protocol buffers 定义 gRPC 服务 以及方法的 请求响应 类型。你可以在routeguide/app/src/main/proto/route_guide.proto 文件中查看完整的 .proto 文件。

由于本示例中我们生成 Java 代码,因此在 .proto 文件中指定了 java_package 文件选项

option java_package = "io.grpc.examples";

这指定了我们希望为生成的 Java 类使用的包。如果在 .proto 文件中没有给出显式的 java_package 选项,则默认使用 proto 包(使用“package”关键字指定)。然而,proto 包通常不是好的 Java 包,因为 proto 包不期望以反向域名开头。如果我们从这个 .proto 生成其他语言的代码,java_package 选项没有效果。

要定义服务,我们在 .proto 文件中指定一个命名的 service

service RouteGuide {
   ...
}

然后我们在服务定义内部定义 rpc 方法,指定它们的请求和响应类型。gRPC 允许你定义四种服务方法,所有这些都在 RouteGuide 服务中使用

  • 一个 简单 RPC:客户端使用 stub 向服务器发送请求并等待响应返回,就像正常的函数调用一样。

    // Obtains the feature at a given position.
    rpc GetFeature(Point) returns (Feature) {}
    
  • 一个 服务端流式 RPC:客户端向服务器发送请求并获取一个流来读取一系列消息。客户端从返回的流中读取直到没有更多消息。如你在我们的示例中看到,你通过在 响应 类型前放置 stream 关键字来指定服务端流式方法。

    // Obtains the Features available within the given Rectangle.  Results are
    // streamed rather than returned at once (e.g. in a response message with a
    // repeated field), as the rectangle may cover a large area and contain a
    // huge number of features.
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
    
  • 一个 客户端流式 RPC:客户端写入一系列消息并使用提供的流将它们发送到服务器。一旦客户端完成写入消息,它就会等待服务器读取所有消息并返回其响应。你通过在 请求 类型前放置 stream 关键字来指定客户端流式方法。

    // Accepts a stream of Points on a route being traversed, returning a
    // RouteSummary when traversal is completed.
    rpc RecordRoute(stream Point) returns (RouteSummary) {}
    
  • 一个 双向流式 RPC:双方使用读写流发送一系列消息。两个流独立运行,因此客户端和服务器可以按他们喜欢的任何顺序读写:例如,服务器可以等到接收所有客户端消息后再写入其响应,或者它可以交替地读取一个消息然后写入一个消息,或者采用其他一些读写组合。每个流中的消息顺序都被保留。你通过在请求和响应类型前都放置 stream 关键字来指定此类方法。

    // Accepts a stream of RouteNotes sent while a route is being traversed,
    // while receiving other RouteNotes (e.g. from other users).
    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    

我们的 .proto 文件还包含我们服务方法中使用的所有请求和响应类型的 protocol buffer 消息类型定义——例如,这是 Point 消息类型

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

生成客户端代码

接下来我们需要从我们的 .proto 服务定义生成 gRPC 客户端接口。我们使用 protocol buffer 编译器 protoc 以及特殊的 gRPC Java 插件来完成此操作。你需要使用proto3 编译器(支持 proto2 和 proto3 语法)来生成 gRPC 服务。

本示例的构建系统也是 Java-gRPC 构建的一部分。请参考grpc-java READMEbuild.gradle 了解如何从你自己的 .proto 文件生成代码。注意,对于 Android,我们将使用针对移动用例优化的 protobuf lite。

从我们的服务定义中生成以下类

  • Feature.javaPoint.javaRectangle.java 等文件,它们包含所有 protocol buffer 代码,用于填充、序列化和检索我们的请求和响应消息类型。
  • RouteGuideGrpc.java,其中包含(以及其他一些有用的代码)
    • RouteGuide 服务器实现的基类 RouteGuideGrpc.RouteGuideImplBase,包含在 RouteGuide 服务中定义的所有方法。
    • 客户端可用于与 RouteGuide 服务器通信的 stub 类。

创建客户端

在本节中,我们将了解如何为我们的 RouteGuide 服务创建 Java 客户端。你可以在routeguide/app/src/main/java/io/grpc/routeguideexample/RouteGuideActivity.java 中查看完整的示例客户端代码。

创建 Stub

要调用服务方法,我们首先需要创建一个 stub,或者说,两个 stub

  • 一个 阻塞/同步 stub:这意味着 RPC 调用会等待服务器响应,并将返回响应或抛出异常。
  • 一个 非阻塞/异步 stub,它向服务器发起非阻塞调用,响应会异步返回。某些类型的流式调用只能使用异步 stub。

首先,我们需要为我们的 stub 创建一个 gRPC channel,指定我们要连接的服务器地址和端口:我们使用 ManagedChannelBuilder 创建 channel。

mChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();

现在我们可以使用 channel 通过从 .proto 生成的 RouteGuideGrpc 类中提供的 newStubnewBlockingStub 方法创建我们的 stub。

blockingStub = RouteGuideGrpc.newBlockingStub(mChannel);
asyncStub = RouteGuideGrpc.newStub(mChannel);

调用服务方法

现在让我们看看如何调用我们的服务方法。

简单 RPC

在阻塞 stub 上调用简单 RPC GetFeature 就像调用本地方法一样直接。

Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
Feature feature = blockingStub.getFeature(request);

我们创建并填充一个请求 protocol buffer 对象(在本例中是 Point),将其传递给我们阻塞 stub 的 getFeature() 方法,并获取一个 Feature

服务端流式 RPC

接下来,让我们看看对 ListFeatures 的服务端流式调用,它返回地理 Feature 的流

Rectangle request =
    Rectangle.newBuilder()
        .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features = blockingStub.listFeatures(request);

如你所见,它与我们刚才看到的简单 RPC 非常相似,不同之处在于该方法不是返回单个 Feature,而是返回一个 Iterator,客户端可以使用它读取所有返回的 Feature

客户端流式 RPC

现在介绍一些稍微复杂的内容:客户端流式方法 RecordRoute,我们向服务器发送 Point 流并获取单个 RouteSummary。对于此方法,我们需要使用异步 stub。如果你已经阅读了创建服务器,其中一些内容可能看起来非常熟悉——异步流式 RPC 在两端以类似的方式实现。

private String recordRoute(List<Point> points, int numPoints, RouteGuideStub asyncStub)
        throws InterruptedException, RuntimeException {
    final StringBuffer logs = new StringBuffer();
    appendLogs(logs, "*** RecordRoute");

    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
        @Override
        public void onNext(RouteSummary summary) {
            appendLogs(logs, "Finished trip with {0} points. Passed {1} features. "
                    + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
                    summary.getFeatureCount(), summary.getDistance(),
                    summary.getElapsedTime());
        }

        @Override
        public void onError(Throwable t) {
            failed = t;
            finishLatch.countDown();
        }

        @Override
        public void onCompleted() {
            appendLogs(logs, "Finished RecordRoute");
            finishLatch.countDown();
        }
    };

    StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
    try {
        // Send numPoints points randomly selected from the points list.
        Random rand = new Random();
        for (int i = 0; i < numPoints; ++i) {
            int index = rand.nextInt(points.size());
            Point point = points.get(index);
            appendLogs(logs, "Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
                    RouteGuideUtil.getLongitude(point));
            requestObserver.onNext(point);
            // Sleep for a bit before sending the next one.
            Thread.sleep(rand.nextInt(1000) + 500);
            if (finishLatch.getCount() == 0) {
                // RPC completed or errored before we finished sending.
                // Sending further requests won't error, but they will just be thrown away.
                break;
            }
        }
    } catch (RuntimeException e) {
        // Cancel RPC
        requestObserver.onError(e);
        throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // Receiving happens asynchronously
    if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        throw new RuntimeException(
               "Could not finish rpc within 1 minute, the server is likely down");
    }

    if (failed != null) {
        throw new RuntimeException(failed);
    }
    return logs.toString();
}

如你所见,要调用此方法,我们需要创建一个 StreamObserver,它实现了一个特殊接口,供服务器使用其 RouteSummary 响应进行调用。在我们的 StreamObserver 中,我们

  • 覆盖 onNext() 方法,以便在服务器将 RouteSummary 写入消息流时打印返回的信息。
  • 覆盖 onCompleted() 方法(当 服务器 在其端完成调用时调用),以设置一个 SettableFuture,我们可以检查它来查看服务器是否已完成写入。

然后我们将 StreamObserver 传递给异步 stub 的 recordRoute() 方法,并取回我们自己的 StreamObserver 请求观察者,用于写入要发送到服务器的 Point。写入完成后,我们使用请求观察者的 onCompleted() 方法通知 gRPC 我们已在客户端完成写入。完成后,我们检查 SettableFuture 以确认服务器已在其端完成。

双向流式 RPC

最后,让我们看看我们的双向流式 RPC RouteChat()

private String routeChat(RouteGuideStub asyncStub) throws InterruptedException,
        RuntimeException {
    final StringBuffer logs = new StringBuffer();
    appendLogs(logs, "*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
            asyncStub.routeChat(new StreamObserver<RouteNote>() {
                @Override
                public void onNext(RouteNote note) {
                    appendLogs(logs, "Got message \"{0}\" at {1}, {2}", note.getMessage(),
                            note.getLocation().getLatitude(),
                            note.getLocation().getLongitude());
                }

                @Override
                public void onError(Throwable t) {
                    failed = t;
                    finishLatch.countDown();
                }

                @Override
                public void onCompleted() {
                    appendLogs(logs,"Finished RouteChat");
                    finishLatch.countDown();
                }
            });

    try {
        RouteNote[] requests =
                {newNote("First message", 0, 0), newNote("Second message", 0, 1),
                        newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};

        for (RouteNote request : requests) {
            appendLogs(logs, "Sending message \"{0}\" at {1}, {2}", request.getMessage(),
                    request.getLocation().getLatitude(),
                    request.getLocation().getLongitude());
            requestObserver.onNext(request);
        }
    } catch (RuntimeException e) {
        // Cancel RPC
        requestObserver.onError(e);
        throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // Receiving happens asynchronously
    if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        throw new RuntimeException(
                "Could not finish rpc within 1 minute, the server is likely down");
    }

    if (failed != null) {
        throw new RuntimeException(failed);
    }

    return logs.toString();
}

与我们的客户端流式示例一样,我们都会获取并返回一个 StreamObserver 响应观察者,不同之处在于这次我们在服务器仍在向它们自己的消息流写入消息时,通过我们方法的响应观察者发送值。这里的读写语法与我们的客户端流式方法完全相同。尽管每一端总是会按消息写入的顺序接收对方的消息,但客户端和服务器都可以按任意顺序读写——这些流是完全独立运行的。

试一试!

按照示例目录 README 中的说明来构建和运行客户端和服务器。