基础教程

Dart 中 gRPC 的基本教程介绍。

基础教程

Dart 中 gRPC 的基本教程介绍。

本教程为 Dart 程序员提供了使用 gRPC 的基本入门指南。

通过学习此示例,你将了解如何

  • .proto 文件中定义服务。
  • 使用协议缓冲区编译器生成服务器和客户端代码。
  • 使用 Dart gRPC API 为你的服务编写一个简单的客户端和服务器。

它假定你已阅读 gRPC 简介 并熟悉 协议缓冲区。请注意,本教程中的示例使用协议缓冲区语言的 proto3 版本:你可以在 proto3 语言指南 中了解更多信息。

为什么要使用 gRPC?

我们的示例是一个简单的路线地图应用程序,它允许客户端获取有关其路线上功能的信息、创建其路线的摘要,以及与服务器和其他客户端交换路线信息(例如交通更新)。

使用 gRPC,我们可以在 .proto 文件中定义一次服务,并以任何 gRPC 支持的语言生成客户端和服务器,这些客户端和服务器又可以在从大型数据中心内的服务器到你自己的平板电脑等环境中运行 — gRPC 为你处理不同语言和环境之间通信的所有复杂性。我们还获得了使用协议缓冲区的所有优点,包括高效的序列化、简单的 IDL 和轻松的接口更新。

示例代码和设置

我们的教程示例代码位于 grpc/grpc-dart/example/route_guide。要下载该示例,请运行以下命令克隆 grpc-dart 存储库

git clone --depth 1 https://github.com/grpc/grpc-dart

然后将当前目录更改为 grpc-dart/example/route_guide

cd grpc-dart/example/route_guide

你应已安装生成客户端和服务器接口代码所需的工具 - 如果没有,请参阅 快速入门 获取设置说明。

定义服务

我们的第一步(正如你从 gRPC 简介 中了解的那样)是使用 协议缓冲区 定义 gRPC 服务 以及方法 请求响应 类型。你可以在 example/route_guide/protos/route_guide.proto 中查看完整的 .proto 文件。

要定义服务,请在 .proto 文件中指定一个名为 service 的服务

service RouteGuide {
   ...
}

然后,你在服务定义中定义 rpc 方法,指定它们的请求和响应类型。gRPC 允许你定义四种类型的服务方法,所有这些方法都在 RouteGuide 服务中使用

  • 一个简单 RPC,其中客户端使用存根向服务器发送请求,并等待响应返回,就像正常的函数调用一样。

    // Obtains the feature at a given position.
    rpc GetFeature(Point) returns (Feature) {}
    
  • 一个服务器端流式 RPC,其中客户端向服务器发送请求,并获取一个流以读取一系列消息。客户端从返回的流中读取,直到没有更多消息为止。正如你在我们的示例中看到的那样,你通过在响应类型之前放置 stream 关键字来指定服务器端流式方法。

    // Obtains the Features available within the given Rectangle.  Results are
    // streamed rather than returned at once (e.g. in a response message with a
    // repeated field), as the rectangle may cover a large area and contain a
    // huge number of features.
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
    
  • 一个客户端流式 RPC,其中客户端写入一系列消息并将其发送到服务器,同样使用提供的流。客户端完成写入消息后,它会等待服务器读取所有消息并返回其响应。你通过在请求类型之前放置 stream 关键字来指定客户端流式方法。

    // Accepts a stream of Points on a route being traversed, returning a
    // RouteSummary when traversal is completed.
    rpc RecordRoute(stream Point) returns (RouteSummary) {}
    
  • 一个双向流式 RPC,其中双方都使用读写流发送一系列消息。这两个流独立运行,因此客户端和服务器可以按任意顺序读取和写入:例如,服务器可以等待接收所有客户端消息,然后再写入其响应,或者它可以交替读取消息然后写入消息,或者读取和写入的某些其他组合。每个流中消息的顺序都将保留。你通过在请求和响应之前都放置 stream 关键字来指定此类型的方法。

    // Accepts a stream of RouteNotes sent while a route is being traversed,
    // while receiving other RouteNotes (e.g. from other users).
    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    

我们的 .proto 文件还包含用于我们的服务方法中使用的所有请求和响应类型的协议缓冲区消息类型定义 - 例如,这是 Point 消息类型

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

生成客户端和服务器代码

接下来,我们需要从我们的 .proto 服务定义中生成 gRPC 客户端和服务器接口。我们使用协议缓冲区编译器 protoc 和一个特殊的 Dart 插件来执行此操作。这与我们在 快速入门 中所做的类似。

route_guide 示例目录运行

protoc -I protos/ protos/route_guide.proto --dart_out=grpc:lib/src/generated

运行此命令会在 route_guide 示例目录下的 lib/src/generated 目录中生成以下文件

  • route_guide.pb.dart
  • route_guide.pbenum.dart
  • route_guide.pbgrpc.dart
  • route_guide.pbjson.dart

这包含

  • 所有用于填充、序列化和检索我们的请求和响应消息类型的协议缓冲区代码
  • 客户端用于调用在 RouteGuide 服务中定义的方法的接口类型(或存根)。
  • 服务器用于实现,同样包含在 RouteGuide 服务中定义的方法的接口类型。

创建服务器

首先,让我们看看如何创建一个 RouteGuide 服务器。如果您只对创建 gRPC 客户端感兴趣,您可以跳过本节,直接转到 创建客户端(尽管您可能也会觉得它很有趣!)。

要使我们的 RouteGuide 服务工作,需要完成两个部分

  • 实现从我们的服务定义生成的服务接口:执行我们服务的实际“工作”。
  • 运行一个 gRPC 服务器来监听来自客户端的请求,并将它们分派到正确的服务实现。

您可以在 grpc-dart/example/route_guide/lib/src/server.dart 中找到我们的示例 RouteGuide 服务器。让我们仔细看看它是如何工作的。

实现 RouteGuide

如您所见,我们的服务器有一个 RouteGuideService 类,它继承了生成的抽象 RouteGuideServiceBase

class RouteGuideService extends RouteGuideServiceBase {
  Future<Feature> getFeature(grpc.ServiceCall call, Point request) async {
    ...
  }

  Stream<Feature> listFeatures(
      grpc.ServiceCall call, Rectangle request) async* {
    ...
  }

  Future<RouteSummary> recordRoute(
      grpc.ServiceCall call, Stream<Point> request) async {
    ...
  }

  Stream<RouteNote> routeChat(
      grpc.ServiceCall call, Stream<RouteNote> request) async* {
    ...
  }

  ...
}
简单 RPC

RouteGuideService 实现了我们所有的服务方法。让我们先看看最简单的类型 GetFeature,它只是从客户端获取一个 Point,并从其数据库中返回相应的特征信息,类型为 Feature

/// GetFeature handler. Returns a feature for the given location.
/// The [context] object provides access to client metadata, cancellation, etc.
@override
Future<Feature> getFeature(grpc.ServiceCall call, Point request) async {
  return featuresDb.firstWhere((f) => f.location == request,
      orElse: () => Feature()..location = request);
}

该方法会传递一个 RPC 的上下文对象和客户端的 Point 协议缓冲区请求。它返回一个包含响应信息的 Feature 协议缓冲区对象。在该方法中,我们用适当的信息填充 Feature,然后将其 return 给 gRPC 框架,该框架将其发送回客户端。

服务器端流式 RPC

现在让我们看看我们的一个流式 RPC。ListFeatures 是一个服务器端流式 RPC,所以我们需要向我们的客户端发回多个 Feature

/// ListFeatures handler. Returns a stream of features within the given
/// rectangle.
@override
Stream<Feature> listFeatures(
    grpc.ServiceCall call, Rectangle request) async* {
  final normalizedRectangle = _normalize(request);
  // For each feature, check if it is in the given bounding box
  for (var feature in featuresDb) {
    if (feature.name.isEmpty) continue;
    final location = feature.location;
    if (_contains(normalizedRectangle, location)) {
      yield feature;
    }
  }
}

如您所见,这次我们在方法中获得一个请求对象(我们的客户端想要在其中找到 FeatureRectangle),而不是获取和返回简单的请求和响应对象,并返回一个 Feature 对象的 Stream

在该方法中,我们填充我们需要返回的尽可能多的 Feature 对象,使用 yield 将它们添加到返回的流中。当方法返回时,流会自动关闭,告诉 gRPC 我们已经完成写入响应。

如果在此调用中发生任何错误,该错误将作为异常添加到流中,并且 gRPC 层会将其转换为适当的 RPC 状态,以便在线路上发送。

客户端流式 RPC

现在让我们看看稍微复杂一点的内容:客户端流式方法 RecordRoute,我们从中获取一个来自客户端的 Point 流,并返回一个包含他们行程信息的单个 RouteSummary。如您所见,这次请求参数是一个流,服务器可以使用该流来读取客户端的请求消息。服务器像在简单 RPC 情况中一样返回其单个响应。

/// RecordRoute handler. Gets a stream of points, and responds with statistics
/// about the "trip": number of points, number of known features visited,
/// total distance traveled, and total time spent.
@override
Future<RouteSummary> recordRoute(
    grpc.ServiceCall call, Stream<Point> request) async {
  int pointCount = 0;
  int featureCount = 0;
  double distance = 0.0;
  Point previous;
  final timer = Stopwatch();

  await for (var location in request) {
    if (!timer.isRunning) timer.start();
    pointCount++;
    final feature = featuresDb.firstWhereOrNull((f) => f.location == location);
    if (feature != null) {
      featureCount++;
    }
    // For each point after the first, add the incremental distance from the
    // previous point to the total distance value.
    if (previous != null) distance += _distance(previous, location);
    previous = location;
  }
  timer.stop();
  return RouteSummary()
    ..pointCount = pointCount
    ..featureCount = featureCount
    ..distance = distance.round()
    ..elapsedTime = timer.elapsed.inSeconds;
}

在方法体中,我们在请求流中使用 await for 来重复读取客户端的请求(在本例中为 Point 对象),直到没有更多消息为止。请求流完成后,服务器可以返回其 RouteSummary

双向流式 RPC

最后,让我们看看我们的双向流式 RPC RouteChat()

/// RouteChat handler. Receives a stream of message/location pairs, and
/// responds with a stream of all previous messages at each of those
/// locations.
@override
Stream<RouteNote> routeChat(
    grpc.ServiceCall call, Stream<RouteNote> request) async* {
  await for (var note in request) {
    final notes = routeNotes.putIfAbsent(note.location, () => <RouteNote>[]);
    for (var note in notes) yield note;
    notes.add(note);
  }
}

这次我们得到一个 RouteNote 的流,它像在我们的客户端流式示例中一样,可以用来读取消息。但是,这次当客户端仍在向他们的消息流写入消息时,我们通过该方法返回的流返回值。

这里的读取和写入语法与我们的客户端流式和服务器流式方法相同。尽管每一方总是按照写入顺序获取另一方的消息,但客户端和服务器都可以按任何顺序读取和写入 —— 这些流是完全独立运行的。

启动服务器

一旦我们实现了所有的方法,我们还需要启动一个 gRPC 服务器,以便客户端可以实际使用我们的服务。以下代码片段显示了我们如何为我们的 RouteGuide 服务执行此操作

Future<void> main(List<String> args) async {
  final server = grpc.Server.create([RouteGuideService()]);
  await server.serve(port: 8080);
  print('Server listening...');
}

要构建并启动服务器,我们需要

  1. 使用 grpc.Server.create() 创建 gRPC 服务器的实例,并提供一个服务实现列表。
  2. 调用服务器上的 serve() 以开始监听请求,可以选择传入要监听的地址和端口。服务器将继续异步处理请求,直到在其上调用 shutdown() 为止。

创建客户端

在本节中,我们将研究如何为我们的 RouteGuide 服务创建一个 Dart 客户端。完整的客户端代码可从 grpc-dart/example/route_guide/lib/src/client.dart 获取。

创建存根

要调用服务方法,我们首先需要创建一个 gRPC 通道来与服务器通信。我们通过将服务器地址和端口号传递给 ClientChannel() 来创建它,如下所示

final channel = ClientChannel('127.0.0.1',
    port: 8080,
    options: const ChannelOptions(
        credentials: ChannelCredentials.insecure()));

如有必要,您可以使用 ChannelOptions 为通道设置 TLS 选项(例如,受信任的证书)。

一旦 gRPC 通道设置好,我们需要一个客户端存根来执行 RPC。我们通过实例化 RouteGuideClient 来获取它,该实例由从示例 .proto 文件生成的包提供。

stub = RouteGuideClient(channel,
    options: CallOptions(timeout: Duration(seconds: 30)));

当服务需要身份验证凭据时,您可以使用 CallOptions 设置身份验证凭据(例如,GCE 凭据或 JWT 凭据)。RouteGuide 服务不需要任何凭据。

调用服务方法

现在让我们看看如何调用我们的服务方法。请注意,在 gRPC-Dart 中,RPC 始终是异步的,这意味着 RPC 返回一个 FutureStream,必须监听它们才能从服务器获取响应或错误。

简单 RPC

调用简单的 RPC GetFeature 几乎与调用本地方法一样简单。

final point = Point()
  ..latitude = 409146138
  ..longitude = -746188906;
final feature = await stub.getFeature(point));

如您所见,我们在之前获取的存根上调用该方法。在我们的方法参数中,我们传递一个请求协议缓冲区对象(在本例中为 Point)。我们还可以传递一个可选的 CallOptions 对象,该对象允许我们在必要时更改 RPC 的行为,例如超时。如果调用没有返回错误,则返回的 Future 会完成,并包含来自服务器的响应信息。如果出现错误,Future 将完成并带有该错误。

服务器端流式 RPC

这是我们调用服务器端流式方法 ListFeatures 的地方,它返回一个地理 Feature 的流。如果您已经阅读了 创建服务器,那么其中一些可能看起来非常熟悉 - 流式 RPC 在两端的实现方式相似。

final rect = Rectangle()...; // initialize a Rectangle

try {
  await for (var feature in stub.listFeatures(rect)) {
    print(feature);
  }
catch (e) {
  print('ERROR: $e');
}

与简单的 RPC 一样,我们向该方法传递一个请求。但是,我们得到的不是 Future,而是 Stream。客户端可以使用该流来读取服务器的响应。

我们在返回的流上使用 await for 来重复读取服务器对响应协议缓冲区对象(在本例中为 Feature)的响应,直到没有更多消息为止。

客户端流式 RPC

客户端流式方法 RecordRoute 与服务器端方法类似,只是我们向该方法传递一个 Stream 并返回一个 Future

final random = Random();

// Generate a number of random points
Stream<Point> generateRoute(int count) async* {
  for (int i = 0; i < count; i++) {
    final point = featuresDb[random.nextInt(featuresDb.length)].location;
    yield point;
  }
}

final pointCount = random.nextInt(100) + 2; // Traverse at least two points

final summary = await stub.recordRoute(generateRoute(pointCount));
print('Route summary: $summary');

由于 generateRoute() 方法是 async*,因此当 gRPC 监听请求流并将点消息发送到服务器时,这些点将会被生成。一旦流完成(当 generateRoute() 返回时),gRPC 就会知道我们已经完成了写入,并正在等待接收响应。返回的 Future 将会完成,要么是收到来自服务器的 RouteSummary 消息,要么是一个错误。

双向流式 RPC

最后,让我们看一下我们的双向流式 RPC RouteChat()。就像 RecordRoute 的情况一样,我们向该方法传递一个流,我们将在其中写入请求消息,并且像在 ListFeatures 中一样,我们得到一个可以用来读取响应消息的流。然而,这次我们将在方法流中发送值,同时服务器也会向他们的消息流中写入消息。

Stream<RouteNote> outgoingNotes = ...;

final responses = stub.routeChat(outgoingNotes);
await for (var note in responses) {
  print('Got message ${note.message} at ${note.location.latitude}, ${note
      .location.longitude}');
}

此处读取和写入的语法与我们的客户端和服务端流式方法非常相似。尽管每一方总是按照消息被写入的顺序获取对方的消息,但客户端和服务器都可以以任何顺序读取和写入——流完全独立运行。

试一试!

从示例目录开始工作

cd example/route_guide

获取包

dart pub get

运行服务器

dart bin/server.dart

从不同的终端运行客户端

dart bin/client.dart

报告问题

如果您发现 Dart gRPC 存在问题,请在我们的问题跟踪器中提交一个 issue