基础教程

Android Java 中 gRPC 的基本教程介绍。

基础教程

Android Java 中 gRPC 的基本教程介绍。

本教程为 Android Java 程序员提供了使用 gRPC 的基本介绍。

通过学习本示例,您将了解如何

  • 在 .proto 文件中定义服务。
  • 使用协议缓冲区编译器生成客户端代码。
  • 使用 Java gRPC API 为您的服务编写一个简单的移动客户端。

它假设您已阅读 gRPC 简介 并熟悉 协议缓冲区。本指南也不涵盖任何关于服务器端的内容。您可以查看 Java 页面 以获取更多信息。

为什么使用 gRPC?

我们的示例是一个简单的路线地图应用程序,它允许客户端获取关于他们路线上的要素信息,创建他们路线的摘要,并与服务器和其他客户端交换路线信息(例如交通更新)。

使用 gRPC,我们可以在 .proto 文件中定义一次服务,并在 gRPC 支持的任何语言中生成客户端和服务器,这些客户端和服务器反过来可以在从大型数据中心内的服务器到您自己的平板电脑等各种环境中运行——不同语言和环境之间通信的所有复杂性都由 gRPC 为您处理。我们还可以获得使用协议缓冲区的所有优势,包括高效的序列化、简单的 IDL 和易于更新的接口。

示例代码和设置

我们教程的示例代码位于 grpc-java 的 examples/android 中。要下载示例,请运行以下命令克隆 grpc-java 存储库

git clone -b v1.69.0 https://github.com/grpc/grpc-java.git

然后将当前目录更改为 grpc-java/examples/android

cd grpc-java/examples/android

您还应该安装相关工具以生成客户端接口代码 - 如果您还没有,请按照 grpc-java README 中的设置说明进行操作。

定义服务

我们的第一步(正如您从 gRPC 简介 中了解到的那样)是使用 协议缓冲区 定义 gRPC 服务 以及方法请求响应类型。您可以在 routeguide/app/src/main/proto/route_guide.proto 中查看完整的 .proto 文件。

由于我们在本示例中生成 Java 代码,因此我们在 .proto 中指定了一个 java_package 文件选项

option java_package = "io.grpc.examples";

这指定了我们想要用于生成的 Java 类的包。如果在 .proto 文件中没有给出明确的 java_package 选项,则默认情况下将使用 proto 包(使用 “package” 关键字指定)。但是,proto 包通常不会成为好的 Java 包,因为 proto 包不应该以反向域名开头。如果我们从这个 .proto 中生成另一种语言的代码,则 java_package 选项无效。

要定义服务,我们在 .proto 文件中指定一个名为 service 的服务

service RouteGuide {
   ...
}

然后在我们的服务定义中定义 rpc 方法,指定它们的请求和响应类型。gRPC 允许您定义四种服务方法,所有这些方法都在 RouteGuide 服务中使用

  • 简单 RPC,其中客户端使用存根向服务器发送请求,并等待返回响应,就像正常的函数调用一样。

    // Obtains the feature at a given position.
    rpc GetFeature(Point) returns (Feature) {}
    
  • 服务器端流式 RPC,其中客户端向服务器发送请求,并获得一个流来读取一系列消息。客户端从返回的流中读取,直到没有更多消息。正如您在我们的示例中看到的那样,通过在响应类型之前放置 stream 关键字来指定服务器端流式方法。

    // Obtains the Features available within the given Rectangle.  Results are
    // streamed rather than returned at once (e.g. in a response message with a
    // repeated field), as the rectangle may cover a large area and contain a
    // huge number of features.
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
    
  • 客户端流式 RPC,其中客户端写入一系列消息并将它们发送到服务器,同样使用提供的流。一旦客户端完成写入消息,它就会等待服务器读取所有消息并返回其响应。通过在请求类型之前放置 stream 关键字来指定客户端流式方法。

    // Accepts a stream of Points on a route being traversed, returning a
    // RouteSummary when traversal is completed.
    rpc RecordRoute(stream Point) returns (RouteSummary) {}
    
  • 双向流式 RPC 允许双方使用读写流发送一系列消息。这两个流独立运行,因此客户端和服务器可以按照他们喜欢的任何顺序进行读取和写入:例如,服务器可以等待接收所有客户端消息后再写入其响应,或者它可以交替读取消息然后写入消息,或者其他读取和写入的组合。每个流中消息的顺序都会被保留。您可以通过在请求和响应之前都放置 stream 关键字来指定这种类型的方法。

    // Accepts a stream of RouteNotes sent while a route is being traversed,
    // while receiving other RouteNotes (e.g. from other users).
    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    

我们的 .proto 文件还包含了服务方法中使用的所有请求和响应类型的协议缓冲区消息类型定义 - 例如,这是 Point 消息类型

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

生成客户端代码

接下来,我们需要从我们的 .proto 服务定义生成 gRPC 客户端接口。我们使用协议缓冲区编译器 protoc 和一个特殊的 gRPC Java 插件来完成此操作。您需要使用 proto3 编译器(它支持 proto2 和 proto3 语法)来生成 gRPC 服务。

此示例的构建系统也是 Java-gRPC 构建的一部分。请参考 grpc-java READMEbuild.gradle 以了解如何从您自己的 .proto 文件生成代码。请注意,对于 Android,我们将使用针对移动用例优化的 protobuf lite。

以下类是从我们的服务定义生成的

  • Feature.java, Point.java, Rectangle.java 以及其他包含所有用于填充、序列化和检索我们的请求和响应消息类型的协议缓冲区代码的类。
  • RouteGuideGrpc.java 包含(以及其他一些有用的代码)
    • 一个供 RouteGuide 服务器实现的基类,RouteGuideGrpc.RouteGuideImplBase,其中包含 RouteGuide 服务中定义的所有方法。
    • 客户端可以用来与 RouteGuide 服务器通信的存根类。

创建客户端

在本节中,我们将研究如何为我们的 RouteGuide 服务创建一个 Java 客户端。您可以在 routeguide/app/src/main/java/io/grpc/routeguideexample/RouteGuideActivity.java 中查看我们完整的客户端示例代码。

创建存根

要调用服务方法,我们首先需要创建一个存根,或者更确切地说,是两个存根

  • 一个阻塞/同步存根:这意味着 RPC 调用会等待服务器响应,并且会返回响应或引发异常。
  • 一个非阻塞/异步存根,它对服务器进行非阻塞调用,其中响应是异步返回的。您只能使用异步存根进行某些类型的流式调用。

首先,我们需要为我们的存根创建一个 gRPC 通道,指定我们要连接的服务器地址和端口:我们使用 ManagedChannelBuilder 来创建通道。

mChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();

现在我们可以使用通道,利用从我们的 .proto 生成的 RouteGuideGrpc 类中提供的 newStubnewBlockingStub 方法来创建我们的存根。

blockingStub = RouteGuideGrpc.newBlockingStub(mChannel);
asyncStub = RouteGuideGrpc.newStub(mChannel);

调用服务方法

现在让我们看看如何调用我们的服务方法。

简单 RPC

在阻塞存根上调用简单的 RPC GetFeature 就像调用本地方法一样简单。

Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
Feature feature = blockingStub.getFeature(request);

我们创建一个并填充一个请求协议缓冲区对象(在我们的例子中是 Point),将其传递给我们的阻塞存根上的 getFeature() 方法,并得到一个 Feature

服务器端流式 RPC

接下来,让我们看看服务器端流式调用 ListFeatures,它返回一个地理 Feature 的流

Rectangle request =
    Rectangle.newBuilder()
        .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features = blockingStub.listFeatures(request);

正如你所看到的,它与我们刚才看到的简单 RPC 非常相似,只不过该方法不是返回单个 Feature,而是返回一个 Iterator,客户端可以使用它来读取所有返回的 Feature

客户端流式 RPC

现在来看一些更复杂的内容:客户端流式方法 RecordRoute,我们将 Point 的流发送到服务器并返回单个 RouteSummary。对于此方法,我们需要使用异步存根。如果你已经阅读了创建服务器,那么其中一些内容可能看起来非常熟悉 - 异步流式 RPC 在双方以类似的方式实现。

private String recordRoute(List<Point> points, int numPoints, RouteGuideStub asyncStub)
        throws InterruptedException, RuntimeException {
    final StringBuffer logs = new StringBuffer();
    appendLogs(logs, "*** RecordRoute");

    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
        @Override
        public void onNext(RouteSummary summary) {
            appendLogs(logs, "Finished trip with {0} points. Passed {1} features. "
                    + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
                    summary.getFeatureCount(), summary.getDistance(),
                    summary.getElapsedTime());
        }

        @Override
        public void onError(Throwable t) {
            failed = t;
            finishLatch.countDown();
        }

        @Override
        public void onCompleted() {
            appendLogs(logs, "Finished RecordRoute");
            finishLatch.countDown();
        }
    };

    StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
    try {
        // Send numPoints points randomly selected from the points list.
        Random rand = new Random();
        for (int i = 0; i < numPoints; ++i) {
            int index = rand.nextInt(points.size());
            Point point = points.get(index);
            appendLogs(logs, "Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
                    RouteGuideUtil.getLongitude(point));
            requestObserver.onNext(point);
            // Sleep for a bit before sending the next one.
            Thread.sleep(rand.nextInt(1000) + 500);
            if (finishLatch.getCount() == 0) {
                // RPC completed or errored before we finished sending.
                // Sending further requests won't error, but they will just be thrown away.
                break;
            }
        }
    } catch (RuntimeException e) {
        // Cancel RPC
        requestObserver.onError(e);
        throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // Receiving happens asynchronously
    if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        throw new RuntimeException(
               "Could not finish rpc within 1 minute, the server is likely down");
    }

    if (failed != null) {
        throw new RuntimeException(failed);
    }
    return logs.toString();
}

正如您所见,要调用此方法,我们需要创建一个 StreamObserver,它实现了一个特殊的接口,供服务器使用其 RouteSummary 响应进行调用。在我们的 StreamObserver 中,我们

  • 重写 onNext() 方法,以便在服务器将 RouteSummary 写入消息流时打印出返回的信息。
  • 重写 onCompleted() 方法(当服务器在其端完成调用时调用),以设置一个 SettableFuture,我们可以检查该 SettableFuture 以查看服务器是否已完成写入。

然后我们将 StreamObserver 传递给异步存根的 recordRoute() 方法,并取回我们自己的 StreamObserver 请求观察器,以便将我们的 Point 写入发送给服务器。一旦我们完成写入点,我们就使用请求观察器的 onCompleted() 方法来告诉 gRPC 我们已经完成在客户端的写入。一旦我们完成,我们就检查我们的 SettableFuture 以检查服务器是否已在其端完成。

双向流式 RPC

最后,让我们看一下我们的双向流式 RPC RouteChat()

private String routeChat(RouteGuideStub asyncStub) throws InterruptedException,
        RuntimeException {
    final StringBuffer logs = new StringBuffer();
    appendLogs(logs, "*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
            asyncStub.routeChat(new StreamObserver<RouteNote>() {
                @Override
                public void onNext(RouteNote note) {
                    appendLogs(logs, "Got message \"{0}\" at {1}, {2}", note.getMessage(),
                            note.getLocation().getLatitude(),
                            note.getLocation().getLongitude());
                }

                @Override
                public void onError(Throwable t) {
                    failed = t;
                    finishLatch.countDown();
                }

                @Override
                public void onCompleted() {
                    appendLogs(logs,"Finished RouteChat");
                    finishLatch.countDown();
                }
            });

    try {
        RouteNote[] requests =
                {newNote("First message", 0, 0), newNote("Second message", 0, 1),
                        newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};

        for (RouteNote request : requests) {
            appendLogs(logs, "Sending message \"{0}\" at {1}, {2}", request.getMessage(),
                    request.getLocation().getLatitude(),
                    request.getLocation().getLongitude());
            requestObserver.onNext(request);
        }
    } catch (RuntimeException e) {
        // Cancel RPC
        requestObserver.onError(e);
        throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // Receiving happens asynchronously
    if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        throw new RuntimeException(
                "Could not finish rpc within 1 minute, the server is likely down");
    }

    if (failed != null) {
        throw new RuntimeException(failed);
    }

    return logs.toString();
}

与我们的客户端流式示例一样,我们既获得又返回一个 StreamObserver 响应观察器,只不过这次我们在服务器仍在向他们的消息流写入消息时,通过我们方法中的响应观察器发送值。此处读取和写入的语法与我们的客户端流式方法完全相同。尽管每一方都会始终按照消息的写入顺序接收另一方的消息,但客户端和服务器都可以按照任何顺序进行读取和写入 — 流是完全独立运行的。

尝试一下!

请按照 示例目录 README 中的说明构建并运行客户端和服务器。

上次修改时间 2024 年 11 月 25 日:feat: move the $ shell line indicator to scss (#1354) (ab8b3af)