基础教程

Dart 中 gRPC 的基础教程介绍。

基础教程

Dart 中 gRPC 的基础教程介绍。

本教程为 Dart 程序员提供了使用 gRPC 的基础介绍。

通过学习本示例,您将了解如何

  • .proto 文件中定义服务。
  • 使用协议缓冲区编译器生成服务器和客户端代码。
  • 使用 Dart gRPC API 为您的服务编写一个简单的客户端和服务器。

本文假设您已阅读 gRPC 简介 并熟悉 协议缓冲区。请注意,本教程中的示例使用了协议缓冲区语言的 proto3 版本:您可以在 proto3 语言指南 中了解更多信息。

为何使用 gRPC?

我们的示例是一个简单的路线映射应用程序,它允许客户端获取其路线上的特征信息、创建路线摘要以及与服务器和其他客户端交换交通更新等路线信息。

使用 gRPC,我们可以在 .proto 文件中定义服务一次,然后使用 gRPC 支持的任何语言生成客户端和服务器,这些客户端和服务器可以在从大型数据中心内的服务器到您自己的平板电脑等各种环境中运行 — gRPC 为您处理了不同语言和环境之间通信的所有复杂性。我们还获得了使用协议缓冲区的所有优势,包括高效序列化、简单的 IDL 和方便的接口更新。

示例代码和设置

本教程的示例代码位于 grpc/grpc-dart/example/route_guide 中。要下载示例,请运行以下命令克隆 grpc-dart 仓库

git clone --depth 1 https://github.com/grpc/grpc-dart

然后将当前目录更改为 grpc-dart/example/route_guide

cd grpc-dart/example/route_guide

您应该已经安装了生成客户端和服务端接口代码所需的工具 — 如果还没有,请参阅快速入门以获取设置说明。

定义服务

我们的第一步(正如您从gRPC 简介 中了解的那样)是使用 协议缓冲区 定义 gRPC 服务以及方法 请求响应 类型。您可以在 example/route_guide/protos/route_guide.proto 中看到完整的 .proto 文件。

要定义服务,请在您的 .proto 文件中指定一个名为 service 的内容

service RouteGuide {
   ...
}

然后,您可以在服务定义中定义 rpc 方法,指定它们的请求和响应类型。gRPC 允许您定义四种服务方法,所有这些方法都用于 RouteGuide 服务中

  • 一种 简单 RPC,其中客户端使用 stub 向服务器发送请求并等待响应返回,就像普通的函数调用一样。

    // Obtains the feature at a given position.
    rpc GetFeature(Point) returns (Feature) {}
    
  • 一种 服务器端流式 RPC,其中客户端向服务器发送请求并获取一个流来读取一系列消息。客户端从返回的流中读取消息,直到没有更多消息为止。如您在示例中看到的,您可以通过将 stream 关键字放在 响应 类型之前来指定服务器端流式方法。

    // Obtains the Features available within the given Rectangle.  Results are
    // streamed rather than returned at once (e.g. in a response message with a
    // repeated field), as the rectangle may cover a large area and contain a
    // huge number of features.
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
    
  • 一种 客户端流式 RPC,其中客户端写入一系列消息并将其发送到服务器,同样使用提供的流。一旦客户端完成消息写入,它会等待服务器读取所有消息并返回其响应。您可以通过将 stream 关键字放在 请求 类型之前来指定客户端流式方法。

    // Accepts a stream of Points on a route being traversed, returning a
    // RouteSummary when traversal is completed.
    rpc RecordRoute(stream Point) returns (RouteSummary) {}
    
  • 一种 双向流式 RPC,其中双方都使用读写流发送一系列消息。两个流独立运行,因此客户端和服务器可以按任意顺序读写:例如,服务器可以等待接收所有客户端消息后再写入响应,或者它可以交替读取一条消息然后写入一条消息,或者读写的其他组合。每个流中的消息顺序得到保留。您可以通过将 stream 关键字放在请求和响应之前来指定这种类型的方法。

    // Accepts a stream of RouteNotes sent while a route is being traversed,
    // while receiving other RouteNotes (e.g. from other users).
    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    

我们的 .proto 文件还包含用于我们服务方法中所有请求和响应类型的协议缓冲区消息类型定义 — 例如,这是 Point 消息类型

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

生成客户端和服务端代码

接下来,我们需要从 .proto 服务定义中生成 gRPC 客户端和服务端接口。我们使用协议缓冲区编译器 protoc 和一个特殊的 Dart 插件来完成此操作。这与我们在快速入门中执行的操作类似。

route_guide 示例目录中运行

protoc -I protos/ protos/route_guide.proto --dart_out=grpc:lib/src/generated

运行此命令会在 route_guide 示例目录下的 lib/src/generated 目录中生成以下文件

  • route_guide.pb.dart
  • route_guide.pbenum.dart
  • route_guide.pbgrpc.dart
  • route_guide.pbjson.dart

这包含

  • 填充、序列化和检索我们的请求和响应消息类型的所有协议缓冲区代码
  • 一个接口类型(或 stub),供客户端调用 RouteGuide 服务中定义的方法。
  • 一个供服务器实现的接口类型,也包含 RouteGuide 服务中定义的方法。

创建服务器

首先,让我们看看如何创建一个 RouteGuide 服务器。如果您只对创建 gRPC 客户端感兴趣,可以跳过此部分并直接转到创建客户端(尽管您可能仍然觉得它很有趣!)。

要让我们的 RouteGuide 服务正常工作,需要两部分

  • 实现从服务定义生成的服务接口:执行我们服务的实际“工作”。
  • 运行 gRPC 服务器来侦听客户端请求并将其分派到正确的服务实现。

您可以在 grpc-dart/example/route_guide/lib/src/server.dart 中找到我们的示例 RouteGuide 服务器。让我们仔细看看它是如何工作的。

实现 RouteGuide

如您所见,我们的服务器有一个 RouteGuideService 类,它扩展了生成的抽象类 RouteGuideServiceBase

class RouteGuideService extends RouteGuideServiceBase {
  Future<Feature> getFeature(grpc.ServiceCall call, Point request) async {
    ...
  }

  Stream<Feature> listFeatures(
      grpc.ServiceCall call, Rectangle request) async* {
    ...
  }

  Future<RouteSummary> recordRoute(
      grpc.ServiceCall call, Stream<Point> request) async {
    ...
  }

  Stream<RouteNote> routeChat(
      grpc.ServiceCall call, Stream<RouteNote> request) async* {
    ...
  }

  ...
}
简单 RPC

RouteGuideService 实现了我们所有的服务方法。我们先来看看最简单的类型,GetFeature,它只是从客户端获取一个 Point,并从其数据库中返回相应的特征信息,作为 Feature

/// GetFeature handler. Returns a feature for the given location.
/// The [context] object provides access to client metadata, cancellation, etc.
@override
Future<Feature> getFeature(grpc.ServiceCall call, Point request) async {
  return featuresDb.firstWhere((f) => f.location == request,
      orElse: () => Feature()..location = request);
}

该方法被传递一个用于 RPC 的上下文对象和客户端的 Point 协议缓冲区请求。它返回一个包含响应信息的 Feature 协议缓冲区对象。在该方法中,我们用适当的信息填充 Feature,然后将其 return 给 gRPC 框架,该框架会将其发送回客户端。

服务器端流式 RPC

现在我们来看看我们的一个流式 RPC。ListFeatures 是一个服务器端流式 RPC,因此我们需要向客户端发送回多个 Feature

/// ListFeatures handler. Returns a stream of features within the given
/// rectangle.
@override
Stream<Feature> listFeatures(
    grpc.ServiceCall call, Rectangle request) async* {
  final normalizedRectangle = _normalize(request);
  // For each feature, check if it is in the given bounding box
  for (var feature in featuresDb) {
    if (feature.name.isEmpty) continue;
    final location = feature.location;
    if (_contains(normalizedRectangle, location)) {
      yield feature;
    }
  }
}

如您所见,这次我们不再在方法中获取和返回简单的请求和响应对象,而是获取一个请求对象(客户端希望在其中查找 FeatureRectangle),并返回一个 Feature 对象的 Stream

在该方法中,我们根据需要填充任意数量的 Feature 对象,并使用 yield 将它们添加到返回的流中。当方法返回时,流会自动关闭,告诉 gRPC 我们已完成响应写入。

如果在此调用中发生任何错误,该错误将作为异常添加到流中,并且 gRPC 层会将其转换为适当的 RPC 状态,并通过网络发送。

客户端流式 RPC

现在我们来看一个稍微复杂一点的:客户端流式方法 RecordRoute,其中我们从客户端获取一个 Point 流,并返回一个包含其行程信息的单个 RouteSummary。如您所见,这次请求参数是一个流,服务器可以使用它从客户端读取请求消息。服务器返回其单个响应,就像简单 RPC 的情况一样。

/// RecordRoute handler. Gets a stream of points, and responds with statistics
/// about the "trip": number of points, number of known features visited,
/// total distance traveled, and total time spent.
@override
Future<RouteSummary> recordRoute(
    grpc.ServiceCall call, Stream<Point> request) async {
  int pointCount = 0;
  int featureCount = 0;
  double distance = 0.0;
  Point previous;
  final timer = Stopwatch();

  await for (var location in request) {
    if (!timer.isRunning) timer.start();
    pointCount++;
    final feature = featuresDb.firstWhereOrNull((f) => f.location == location);
    if (feature != null) {
      featureCount++;
    }
    // For each point after the first, add the incremental distance from the
    // previous point to the total distance value.
    if (previous != null) distance += _distance(previous, location);
    previous = location;
  }
  timer.stop();
  return RouteSummary()
    ..pointCount = pointCount
    ..featureCount = featureCount
    ..distance = distance.round()
    ..elapsedTime = timer.elapsed.inSeconds;
}

在方法体中,我们在请求流中使用 await for 重复读取客户端的请求(在本例中为 Point 对象),直到没有更多消息为止。请求流完成后,服务器可以返回其 RouteSummary

双向流式 RPC

最后,我们来看看我们的双向流式 RPC RouteChat()

/// RouteChat handler. Receives a stream of message/location pairs, and
/// responds with a stream of all previous messages at each of those
/// locations.
@override
Stream<RouteNote> routeChat(
    grpc.ServiceCall call, Stream<RouteNote> request) async* {
  await for (var note in request) {
    final notes = routeNotes.putIfAbsent(note.location, () => <RouteNote>[]);
    for (var note in notes) yield note;
    notes.add(note);
  }
}

这次我们获取一个 RouteNote 流,正如在我们的客户端流式示例中一样,它可以用于读取消息。然而,这次我们在客户端仍在向 他们的 消息流写入消息的同时,通过我们方法返回的流返回值。

这里的读写语法与我们的客户端流式和服务器端流式方法相同。尽管每一方总是会按照消息写入的顺序接收对方的消息,但客户端和服务器都可以按任意顺序读写 — 这些流完全独立运行。

启动服务器

一旦我们实现了所有方法,我们还需要启动一个 gRPC 服务器,以便客户端能够实际使用我们的服务。以下代码片段展示了如何为我们的 RouteGuide 服务执行此操作

Future<void> main(List<String> args) async {
  final server = grpc.Server.create([RouteGuideService()]);
  await server.serve(port: 8080);
  print('Server listening...');
}

要构建和启动服务器,我们

  1. 使用 grpc.Server.create() 创建 gRPC 服务器实例,并提供服务实现的列表。
  2. 在服务器上调用 serve() 以开始侦听请求,可以选择传入要侦听的地址和端口。服务器将继续异步处理请求,直到在其上调用 shutdown() 为止。

创建客户端

在本节中,我们将介绍如何为我们的 RouteGuide 服务创建一个 Dart 客户端。完整的客户端代码可在 grpc-dart/example/route_guide/lib/src/client.dart 获取。

创建 stub

要调用服务方法,我们首先需要创建一个 gRPC 通道 来与服务器通信。我们通过将服务器地址和端口号传递给 ClientChannel() 来创建它,如下所示

final channel = ClientChannel('127.0.0.1',
    port: 8080,
    options: const ChannelOptions(
        credentials: ChannelCredentials.insecure()));

如有必要,您可以使用 ChannelOptions 为通道设置 TLS 选项(例如,受信任的证书)。

gRPC 通道 设置好后,我们需要一个客户端 stub 来执行 RPC。我们通过实例化 RouteGuideClient 来获得它,该类由示例 .proto 文件生成的包提供。

stub = RouteGuideClient(channel,
    options: CallOptions(timeout: Duration(seconds: 30)));

当服务需要时,您可以使用 CallOptions 设置身份验证凭据(例如,GCE 凭据或 JWT 凭据)。RouteGuide 服务不需要任何凭据。

调用服务方法

现在我们来看看如何调用我们的服务方法。请注意,在 gRPC-Dart 中,RPC 总是异步的,这意味着 RPC 返回一个 FutureStream,必须对其进行监听才能从服务器获取响应或错误。

简单 RPC

调用简单 RPC GetFeature 几乎与调用本地方法一样直接。

final point = Point()
  ..latitude = 409146138
  ..longitude = -746188906;
final feature = await stub.getFeature(point));

如您所见,我们在之前获得的 stub 上调用该方法。在方法参数中,我们传递一个请求协议缓冲区对象(在本例中为 Point)。我们还可以传递一个可选的 CallOptions 对象,如有必要,它允许我们更改 RPC 的行为,例如超时。如果调用不返回错误,返回的 Future 会通过服务器的响应信息完成。如果发生错误,Future 将通过错误完成。

服务器端流式 RPC

这里是我们调用服务器端流式方法 ListFeatures 的地方,它返回一个地理 Feature 流。如果您已经阅读了创建服务器,其中一些内容可能看起来非常熟悉 — 流式 RPC 在两侧的实现方式类似。

final rect = Rectangle()...; // initialize a Rectangle

try {
  await for (var feature in stub.listFeatures(rect)) {
    print(feature);
  }
catch (e) {
  print('ERROR: $e');
}

与简单 RPC 一样,我们向方法传递一个请求。然而,这次我们不是获取一个 Future,而是获取一个 Stream。客户端可以使用该流读取服务器的响应。

我们在返回的流上使用 await for 重复读取服务器的响应,直到没有更多消息(在本例中为 Feature 对象)。

客户端流式 RPC

客户端流式方法 RecordRoute 与服务器端方法类似,不同之处在于我们向该方法传递一个 Stream 并获得一个 Future

final random = Random();

// Generate a number of random points
Stream<Point> generateRoute(int count) async* {
  for (int i = 0; i < count; i++) {
    final point = featuresDb[random.nextInt(featuresDb.length)].location;
    yield point;
  }
}

final pointCount = random.nextInt(100) + 2; // Traverse at least two points

final summary = await stub.recordRoute(generateRoute(pointCount));
print('Route summary: $summary');

由于 generateRoute() 方法是 async*,点将在 gRPC 侦听请求流并将点消息发送到服务器时生成。一旦流完成(当 generateRoute() 返回时),gRPC 就知道我们已完成写入并正在等待接收响应。返回的 Future 将要么通过从服务器接收到的 RouteSummary 消息完成,要么发生错误。

双向流式 RPC

最后,我们来看看我们的双向流式 RPC RouteChat()。与 RecordRoute 的情况一样,我们向该方法传递一个流,我们将在其中写入请求消息;并且像在 ListFeatures 中一样,我们获得一个可以用来读取响应消息的流。然而,这次我们将通过我们方法返回的流发送值,而服务器也在向 它们的 消息流写入消息。

Stream<RouteNote> outgoingNotes = ...;

final responses = stub.routeChat(outgoingNotes);
await for (var note in responses) {
  print('Got message ${note.message} at ${note.location.latitude}, ${note
      .location.longitude}');
}

这里的读写语法与我们的客户端流式和服务器端流式方法非常相似。尽管每一方总是会按照消息写入的顺序接收对方的消息,但客户端和服务器都可以按任意顺序读写 — 这些流完全独立运行。

试试看!

在示例目录中操作

cd example/route_guide

获取包

dart pub get

运行服务器

dart bin/server.dart

在另一个终端中,运行客户端

dart bin/client.dart

报告问题

如果您在使用 Dart gRPC 时遇到问题,请在我们的问题跟踪器中提交问题