基础教程

gRPC Java 基础教程介绍。

基础教程

gRPC Java 基础教程介绍。

本教程为 Java 程序员提供了一个使用 gRPC 的基础介绍。

通过学习本示例,您将了解如何

  • .proto 文件中定义服务。
  • 使用 protocol buffer 编译器生成服务端和客户端代码。
  • 使用 Java gRPC API 为您的服务编写一个简单的客户端和服务端。

本教程假设您已阅读过 gRPC 介绍 并熟悉 protocol buffers。请注意,本教程中的示例使用了 protocol buffers 语言的 proto3 版本:您可以在 proto3 语言指南Java 生成代码指南 中了解更多信息。

为什么使用 gRPC?

我们的示例是一个简单的路线映射应用程序,它允许客户端获取其路线上的特征信息、创建其路线摘要,并与服务端及其他客户端交换交通更新等路线信息。

使用 gRPC,我们可以在 .proto 文件中一次性定义服务,然后在 gRPC 支持的任何语言中生成客户端和服务端。这些客户端和服务端可以在从大型数据中心内的服务器到您自己的平板电脑等各种环境中运行——gRPC 为您处理了不同语言和环境之间通信的所有复杂性。我们还可以获得使用 protocol buffers 的所有优势,包括高效的序列化、简单的 IDL 以及易于接口更新。

示例代码和设置

本教程的示例代码位于 grpc/grpc-java/examples/src/main/java/io/grpc/examples/routeguide 中。要下载示例,请运行以下命令克隆 grpc-java 仓库的最新版本

git clone -b v1.72.0 --depth 1 https://github.com/grpc/grpc-java

然后将当前目录更改为 grpc-java/examples

cd grpc-java/examples

定义服务

我们的第一步(正如您在 gRPC 介绍 中了解到的)是使用 protocol buffers 定义 gRPC 服务以及方法 请求响应 类型。您可以在 grpc-java/examples/src/main/proto/route_guide.proto 中看到完整的 .proto 文件。

在本示例中,由于我们要生成 Java 代码,因此我们在 .proto 文件中指定了一个 java_package 文件选项

option java_package = "io.grpc.examples.routeguide";

这指定了我们为生成的 Java 类使用的包。如果在 .proto 文件中未给出显式的 java_package 选项,则默认情况下将使用 proto 包(使用“package”关键字指定)。但是,proto 包通常不是好的 Java 包,因为 proto 包不期望以反向域名开头。如果我们从此 .proto 文件生成其他语言的代码,java_package 选项将无效。

要定义服务,我们在 .proto 文件中指定一个命名的 service

service RouteGuide {
   ...
}

然后我们在服务定义内部定义 rpc 方法,指定它们的请求和响应类型。gRPC 允许您定义四种服务方法,所有这些方法都在 RouteGuide 服务中使用

  • 一种简单的 RPC,客户端使用 stub 向服务端发送请求并等待响应返回,就像普通的函数调用一样。

    // Obtains the feature at a given position.
    rpc GetFeature(Point) returns (Feature) {}
    
  • 一种服务端流式 RPC,客户端向服务端发送请求,并获得一个流来读取一系列消息。客户端从返回的流中读取,直到没有更多消息为止。正如您在示例中看到的,您可以通过在响应类型前放置 stream 关键字来指定服务端流式方法。

    // Obtains the Features available within the given Rectangle.  Results are
    // streamed rather than returned at once (e.g. in a response message with a
    // repeated field), as the rectangle may cover a large area and contain a
    // huge number of features.
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
    
  • 一种客户端流式 RPC,客户端写入一系列消息并通过提供的流发送给服务端。一旦客户端完成消息写入,它就会等待服务端读取所有消息并返回其响应。您可以通过在请求类型前放置 stream 关键字来指定客户端流式方法。

    // Accepts a stream of Points on a route being traversed, returning a
    // RouteSummary when traversal is completed.
    rpc RecordRoute(stream Point) returns (RouteSummary) {}
    
  • 一种双向流式 RPC,双方使用读写流发送一系列消息。两个流独立运行,因此客户端和服务端可以按任何顺序读写:例如,服务端可以等待接收完所有客户端消息后再写入响应,或者可以交替地读取一条消息再写入一条消息,或采用读写的其他组合方式。每个流中消息的顺序得到保留。您可以通过在请求和响应前都放置 stream 关键字来指定此类方法。

    // Accepts a stream of RouteNotes sent while a route is being traversed,
    // while receiving other RouteNotes (e.g. from other users).
    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    

我们的 .proto 文件还包含服务方法中使用的所有请求和响应类型的 protocol buffer 消息类型定义 - 例如,这里是 Point 消息类型

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

生成客户端和服务端代码

接下来,我们需要根据 .proto 服务定义生成 gRPC 客户端和服务端接口。我们使用 protocol buffer 编译器 protoc 和一个特殊的 gRPC Java 插件来完成此操作。您需要使用 proto3 编译器(它支持 proto2 和 proto3 语法)来生成 gRPC 服务。

使用 Gradle 或 Maven 时,protoc 构建插件可以在构建过程中生成必要的代码。您可以参考 grpc-java README 了解如何从您自己的 .proto 文件生成代码。

根据我们的服务定义生成以下类

  • Feature.javaPoint.javaRectangle.java 等,其中包含所有 protocol buffer 代码,用于填充、序列化和检索我们的请求和响应消息类型。
  • RouteGuideGrpc.java,其中包含(以及其他一些有用的代码)
    • 一个供 RouteGuide 服务端实现的基类,RouteGuideGrpc.RouteGuideImplBase,包含 RouteGuide 服务中定义的所有方法。
    • 客户端可以用来与 RouteGuide 服务端通信的 stub 类。

创建服务端

首先,我们来看看如何创建 RouteGuide 服务端。如果您只对创建 gRPC 客户端感兴趣,可以跳过本节,直接前往创建客户端(不过您可能仍然觉得它有趣!)。

实现我们的 RouteGuide 服务功能包含两个部分

  • 覆盖根据我们的服务定义生成的服务基类:执行我们服务的实际“工作”。
  • 运行 gRPC 服务端,监听来自客户端的请求并返回服务响应。

您可以在 grpc-java/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideServer.java 中找到我们的 RouteGuide 服务端示例。让我们仔细看看它是如何工作的。

实现 RouteGuide

如您所见,我们的服务端有一个 RouteGuideService 类,它扩展了生成的抽象类 RouteGuideGrpc.RouteGuideImplBase

private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
...
}
简单的 RPC

RouteGuideService 实现了我们所有的服务方法。我们先看看最简单的方法 GetFeature(),它只是从客户端获取一个 Point,并从其数据库中返回相应的特征信息(以 Feature 形式)。

@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  responseObserver.onNext(checkFeature(request));
  responseObserver.onCompleted();
}

...

private Feature checkFeature(Point location) {
  for (Feature feature : features) {
    if (feature.getLocation().getLatitude() == location.getLatitude()
        && feature.getLocation().getLongitude() == location.getLongitude()) {
      return feature;
    }
  }

  // No feature was found, return an unnamed feature.
  return Feature.newBuilder().setName("").setLocation(location).build();
}

getFeature() 方法接受两个参数

  • Point:请求
  • StreamObserver<Feature>:响应观察者,这是服务端用来调用并返回响应的特殊接口。

要将响应返回给客户端并完成调用,请执行以下操作:

  1. 我们构造并填充一个 Feature 响应对象返回给客户端,如我们的服务定义中所指定。在此示例中,我们在一个单独的私有方法 checkFeature() 中执行此操作。
  2. 我们使用响应观察者的 onNext() 方法返回 Feature
  3. 我们使用响应观察者的 onCompleted() 方法来指明我们已经处理完 RPC。
服务端流式 RPC

接下来我们看看其中一个流式 RPC。ListFeatures 是一个服务端流式 RPC,因此我们需要向客户端发送多个 Feature

private final Collection<Feature> features;

...

@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }

    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      responseObserver.onNext(feature);
    }
  }
  responseObserver.onCompleted();
}

与简单的 RPC 一样,此方法获取一个请求对象(客户端想在其中找到 FeatureRectangle)和一个 StreamObserver 响应观察者。

这次,我们获取需要返回给客户端的尽可能多的 Feature 对象(在本例中,我们根据它们是否在我们请求的 Rectangle 内部,从服务的特征集合中选择它们),然后使用响应观察者的 onNext() 方法依次将它们写入。最后,与简单 RPC 一样,我们使用响应观察者的 onCompleted() 方法告知 gRPC 我们已完成响应的写入。

客户端流式 RPC

现在我们来看看稍微复杂一些的方法:客户端流式方法 RecordRoute(),它从客户端获取一个 Point 流,并返回一个包含其行程信息的单个 RouteSummary

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    long startTime = System.nanoTime();

    @Override
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      // For each point after the first, add the incremental distance from the previous point
      // to the total distance value.
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  };
}

如您所见,与之前的方法类型一样,我们的方法接受一个 StreamObserver 响应观察者参数,但这次它返回一个供客户端写入其 PointStreamObserver

在方法体中,我们实例化一个匿名 StreamObserver 来返回,在该观察者中,我们

  • 覆盖 onNext() 方法,以便每次客户端向消息流写入一个 Point 时,获取特征及其他信息。
  • 覆盖 onCompleted() 方法(在客户端完成消息写入时调用),以填充和构建我们的 RouteSummary。然后,我们调用我们方法自身的响应观察者的 onNext() 方法,传入我们的 RouteSummary,然后调用其 onCompleted() 方法,从而从服务端完成调用。
双向流式 RPC

最后,我们来看看我们的双向流式 RPC RouteChat()

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

与我们的客户端流式示例一样,我们既接收又返回一个 StreamObserver 响应观察者,不同之处在于这次我们在客户端仍在向消息流写入消息时,通过我们方法的响应观察者返回值。这里的读写语法与我们的客户端流式和服务端流式方法完全相同。尽管每一方总是会按照消息写入的顺序接收对方的消息,但客户端和服务端都可以按任何顺序读写——流完全独立运行。

启动服务端

实现所有方法后,我们还需要启动一个 gRPC 服务端,以便客户端能够实际使用我们的服务。以下代码片段展示了如何为我们的 RouteGuide 服务执行此操作

public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  this.port = port;
  server = serverBuilder.addService(new RouteGuideService(features))
      .build();
}
...
public void start() throws IOException {
  server.start();
  logger.info("Server started, listening on " + port);
 ...
}

如您所见,我们使用 ServerBuilder 构建并启动我们的服务端。

为此,我们需要

  1. 使用构建器的 forPort() 方法指定要用来监听客户端请求的地址和端口。
  2. 创建我们的服务实现类 RouteGuideService 的实例,并将其传递给构建器的 addService() 方法。
  3. 在构建器上调用 build()start() 来为我们的服务创建并启动一个 RPC 服务器。

创建客户端

在本节中,我们将介绍如何创建 RouteGuide 服务的客户端。您可以在 grpc-java/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideClient.java 中查看我们的完整示例客户端代码。

实例化 stub

要调用服务方法,我们首先需要创建一个 存根 (stub),或者更准确地说,是两个存根。

  • 一个 阻塞式/同步式 存根:这意味着 RPC 调用会等待服务器响应,要么返回响应,要么抛出异常。
  • 一个 非阻塞式/异步式 存根,它向服务器发起非阻塞调用,响应会异步返回。某些类型的流式调用只能使用异步存根进行。

首先我们需要为我们的存根创建一个 gRPC 通道 (channel),指定我们要连接的服务器地址和端口。

public RouteGuideClient(String host, int port) {
  this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
}

/** Construct client for accessing RouteGuide server using the existing channel. */
public RouteGuideClient(ManagedChannelBuilder<?> channelBuilder) {
  channel = channelBuilder.build();
  blockingStub = RouteGuideGrpc.newBlockingStub(channel);
  asyncStub = RouteGuideGrpc.newStub(channel);
}

我们使用 ManagedChannelBuilder 来创建通道。

现在我们可以使用通道来创建我们的存根,利用我们从 `.proto` 文件生成的 RouteGuideGrpc 类中提供的 newStubnewBlockingStub 方法。

blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);

调用服务方法

现在让我们看看如何调用我们的服务方法。

简单的 RPC

在阻塞式存根上调用简单的 RPC GetFeature 就像调用本地方法一样直接。

Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
Feature feature;
try {
  feature = blockingStub.getFeature(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

我们创建并填充一个请求协议缓冲区对象(在本例中是 Point),将其传递给阻塞式存根上的 getFeature() 方法,然后获得一个 Feature

如果发生错误,它会被编码为 Status 对象,我们可以从 StatusRuntimeException 中获取它。

服务端流式 RPC

接下来,我们来看看服务器端流式调用 ListFeatures,它返回一个地理 Feature 流。

Rectangle request =
    Rectangle.newBuilder()
        .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

如您所见,这与我们刚才介绍的简单 RPC 非常相似,不同之处在于该方法不是返回单个 Feature,而是返回一个 Iterator,客户端可以使用它来读取所有返回的 Feature

客户端流式 RPC

现在来看一些更复杂的内容:客户端流式方法 RecordRoute,我们向服务器发送一个 Point 流,并获得一个 RouteSummary。对于此方法,我们需要使用异步存根。如果您已经阅读了 创建服务器,其中的一些内容可能看起来非常熟悉——异步流式 RPC 在客户端和服务器端的实现方式类似。

public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
  info("*** RecordRoute");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }

    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }

    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

  StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
  try {
    // Send numPoints points randomly selected from the features list.
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
          RouteGuideUtil.getLongitude(point));
      requestObserver.onNext(point);
      // Sleep for a bit before sending the next one.
      Thread.sleep(rand.nextInt(1000) + 500);
      if (finishLatch.getCount() == 0) {
        // RPC completed or errored before we finished sending.
        // Sending further requests won't error, but they will just be thrown away.
        return;
      }
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

如您所见,要调用此方法,我们需要创建一个 StreamObserver,它实现了一个特殊的接口,供服务器用于发送 RouteSummary 响应。在我们的 StreamObserver 中,我们

  • 覆盖 onNext() 方法,以便在服务器将 RouteSummary 写入消息流时打印出返回的信息。
  • 覆盖 onCompleted() 方法(在 服务器 端完成调用时调用),以递减一个 CountDownLatch,我们可以通过它来检查服务器是否已完成写入。

然后我们将 StreamObserver 传递给异步存根的 recordRoute() 方法,并获得我们自己的 StreamObserver 请求观察者,用于写入要发送到服务器的 Point。一旦我们完成写入点,我们就使用请求观察者的 onCompleted() 方法来告诉 gRPC 我们已在客户端完成写入。完成后,我们检查 CountDownLatch 以确认服务器端已完成。

双向流式 RPC

最后,我们来看看我们的双向流式 RPC RouteChat()

public void routeChat() throws Exception {
  info("*** RoutChat");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteNote> requestObserver =
      asyncStub.routeChat(new StreamObserver<RouteNote>() {
        @Override
        public void onNext(RouteNote note) {
          info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
              .getLatitude(), note.getLocation().getLongitude());
        }

        @Override
        public void onError(Throwable t) {
          Status status = Status.fromThrowable(t);
          logger.log(Level.WARNING, "RouteChat Failed: {0}", status);
          finishLatch.countDown();
        }

        @Override
        public void onCompleted() {
          info("Finished RouteChat");
          finishLatch.countDown();
        }
      });

  try {
    RouteNote[] requests =
        {newNote("First message", 0, 0), newNote("Second message", 0, 1),
            newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};

    for (RouteNote request : requests) {
      info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
          .getLatitude(), request.getLocation().getLongitude());
      requestObserver.onNext(request);
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

与我们的客户端流式示例一样,我们既获取也返回一个 StreamObserver 响应观察者,不同之处在于这次我们在服务器仍在向其消息流写入消息时,通过我们方法的响应观察者发送值。这里的读写语法与我们的客户端流式方法完全相同。尽管每一方总是会按消息写入的顺序收到另一方的消息,但客户端和服务器可以按任何顺序读写——流是完全独立运行的。

试试看!

按照 示例目录 README 中的说明构建并运行客户端和服务器。