基础教程

Dart 中 gRPC 的基本教程介绍。

基础教程

Dart 中 gRPC 的基本教程介绍。

本教程为 Dart 程序员提供了使用 gRPC 的基本介绍。

通过此示例,您将学习如何

  • .proto 文件中定义服务。
  • 使用协议缓冲区编译器生成服务器和客户端代码。
  • 使用 Dart gRPC API 为你的服务编写一个简单的客户端和服务器。

本教程假设你已阅读 gRPC 简介 并熟悉 协议缓冲区。请注意,本教程中的示例使用协议缓冲区语言的 proto3 版本:你可以在 proto3 语言指南 中了解更多信息。

为什么使用 gRPC?

我们的示例是一个简单的路线映射应用程序,它允许客户端获取其路线上特征的信息,创建其路线摘要,并与服务器及其他客户端交换路线信息(例如交通更新)。

使用 gRPC,我们可以在一个 .proto 文件中定义服务,并使用 gRPC 支持的任何语言生成客户端和服务器,这些客户端和服务器又可以在从大型数据中心内的服务器到你自己的平板电脑等各种环境中运行 — gRPC 为你处理了不同语言和环境之间通信的所有复杂性。我们还获得了使用协议缓冲区的所有优势,包括高效的序列化、简单的 IDL 和易于更新的接口。

示例代码和设置

我们教程的示例代码位于 grpc/grpc-dart/example/route_guide。要下载示例,请运行以下命令克隆 grpc-dart 仓库

git clone --depth 1 https://github.com/grpc/grpc-dart

然后将当前目录更改为 grpc-dart/example/route_guide

cd grpc-dart/example/route_guide

你应该已经安装了生成客户端和服务器接口代码所需的工具 — 如果还没有,请参阅快速入门获取设置说明。

定义服务

我们的第一步(正如你从 gRPC 简介 中所了解的)是使用 协议缓冲区 定义 gRPC 服务以及方法请求响应类型。你可以在 example/route_guide/protos/route_guide.proto 中查看完整的 .proto 文件。

要定义服务,请在 .proto 文件中指定一个命名的 service

service RouteGuide {
   ...
}

然后,您在服务定义中定义 rpc 方法,并指定它们的请求和响应类型。gRPC 允许您定义四种服务方法,所有这些方法都在 RouteGuide 服务中使用

  • 简单 RPC:客户端使用存根向服务器发送请求,并等待响应返回,就像普通的函数调用一样。

    // Obtains the feature at a given position.
    rpc GetFeature(Point) returns (Feature) {}
    
  • 一种服务器端流式 RPC,客户端向服务器发送请求并获取一个流以读取一系列消息。客户端从返回的流中读取,直到没有更多消息。正如你在我们的示例中看到的,通过在响应类型之前放置 stream 关键字来指定服务器端流式方法。

    // Obtains the Features available within the given Rectangle.  Results are
    // streamed rather than returned at once (e.g. in a response message with a
    // repeated field), as the rectangle may cover a large area and contain a
    // huge number of features.
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
    
  • 一种客户端流式 RPC,客户端写入一系列消息并将其发送到服务器,同样使用提供的流。一旦客户端完成消息写入,它会等待服务器读取所有消息并返回其响应。通过在请求类型之前放置 stream 关键字来指定客户端流式方法。

    // Accepts a stream of Points on a route being traversed, returning a
    // RouteSummary when traversal is completed.
    rpc RecordRoute(stream Point) returns (RouteSummary) {}
    
  • 一种双向流式 RPC,双方使用读写流发送一系列消息。这两个流独立运行,因此客户端和服务器可以按其喜欢的任何顺序进行读写:例如,服务器可以等待接收所有客户端消息后再写入其响应,或者它可以交替读取一条消息然后写入一条消息,或者读写操作的其他组合。每个流中的消息顺序都得到保留。通过在请求和响应之前都放置 stream 关键字来指定这种类型的方法。

    // Accepts a stream of RouteNotes sent while a route is being traversed,
    // while receiving other RouteNotes (e.g. from other users).
    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    

我们的 .proto 文件还包含我们服务方法中使用的所有请求和响应类型的协议缓冲区消息类型定义——例如,这是 Point 消息类型

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

生成客户端和服务器代码

接下来,我们需要从 .proto 服务定义中生成 gRPC 客户端和服务器接口。我们使用协议缓冲区编译器 protoc 和一个特殊的 Dart 插件来完成此操作。这与我们在快速入门中做的类似。

route_guide 示例目录中运行

protoc -I protos/ protos/route_guide.proto --dart_out=grpc:lib/src/generated

运行此命令会在 route_guide 示例目录下的 lib/src/generated 目录中生成以下文件

  • route_guide.pb.dart
  • route_guide.pbenum.dart
  • route_guide.pbgrpc.dart
  • route_guide.pbjson.dart

这包含

  • 所有用于填充、序列化和检索我们的请求和响应消息类型的协议缓冲区代码
  • 一个接口类型(或存根),供客户端调用 RouteGuide 服务中定义的方法。
  • 一个接口类型,供服务器实现,也包含 RouteGuide 服务中定义的方法。

创建服务器

首先让我们看看如何创建一个 RouteGuide 服务器。如果您只对创建 gRPC 客户端感兴趣,您可以跳过此部分并直接跳到创建客户端(尽管您可能仍然会觉得它很有趣!)。

使我们的 RouteGuide 服务正常工作需要两部分

  • 实现从我们的服务定义生成的服务接口:完成我们服务的实际“工作”。
  • 运行 gRPC 服务器以侦听来自客户端的请求,并将其分派到正确的服务实现。

你可以在 grpc-dart/example/route_guide/lib/src/server.dart 中找到我们的示例 RouteGuide 服务器。让我们仔细看看它是如何工作的。

实现 RouteGuide

如你所见,我们的服务器有一个 RouteGuideService 类,它继承自生成的抽象类 RouteGuideServiceBase

class RouteGuideService extends RouteGuideServiceBase {
  Future<Feature> getFeature(grpc.ServiceCall call, Point request) async {
    ...
  }

  Stream<Feature> listFeatures(
      grpc.ServiceCall call, Rectangle request) async* {
    ...
  }

  Future<RouteSummary> recordRoute(
      grpc.ServiceCall call, Stream<Point> request) async {
    ...
  }

  Stream<RouteNote> routeChat(
      grpc.ServiceCall call, Stream<RouteNote> request) async* {
    ...
  }

  ...
}
简单 RPC

RouteGuideService 实现了我们所有的服务方法。让我们首先看看最简单的类型 GetFeature,它只是从客户端获取一个 Point,并从其数据库中以 Feature 形式返回相应的特征信息。

/// GetFeature handler. Returns a feature for the given location.
/// The [context] object provides access to client metadata, cancellation, etc.
@override
Future<Feature> getFeature(grpc.ServiceCall call, Point request) async {
  return featuresDb.firstWhere((f) => f.location == request,
      orElse: () => Feature()..location = request);
}

该方法被传递一个 RPC 的上下文对象和客户端的 Point 协议缓冲区请求。它返回一个包含响应信息的 Feature 协议缓冲区对象。在该方法中,我们用适当的信息填充 Feature,然后将其 return 给 gRPC 框架,gRPC 框架将其发送回客户端。

服务器端流式 RPC

现在让我们看看我们的一个流式 RPC。ListFeatures 是一个服务器端流式 RPC,因此我们需要向客户端发送多个 Feature

/// ListFeatures handler. Returns a stream of features within the given
/// rectangle.
@override
Stream<Feature> listFeatures(
    grpc.ServiceCall call, Rectangle request) async* {
  final normalizedRectangle = _normalize(request);
  // For each feature, check if it is in the given bounding box
  for (var feature in featuresDb) {
    if (feature.name.isEmpty) continue;
    final location = feature.location;
    if (_contains(normalizedRectangle, location)) {
      yield feature;
    }
  }
}

如你所见,这次我们的方法不再是获取并返回简单的请求和响应对象,而是获取一个请求对象(客户端想要在其中查找 FeatureRectangle)并返回一个 Feature 对象 Stream

在该方法中,我们填充需要返回的任意数量的 Feature 对象,并使用 yield 将它们添加到返回的流中。当方法返回时,流会自动关闭,告知 gRPC 我们已完成响应的写入。

如果在此调用中发生任何错误,该错误将作为异常添加到流中,并且 gRPC 层会将其转换为适当的 RPC 状态,以便通过网络发送。

客户端流式 RPC

现在让我们看一个更复杂一点的:客户端流式方法 RecordRoute,我们从客户端获取一个 Point 流,并返回一个包含其行程信息的单个 RouteSummary。如你所见,这次请求参数是一个流,服务器可以使用它来读取来自客户端的请求消息。服务器返回其单个响应,就像在简单 RPC 的情况下一样。

/// RecordRoute handler. Gets a stream of points, and responds with statistics
/// about the "trip": number of points, number of known features visited,
/// total distance traveled, and total time spent.
@override
Future<RouteSummary> recordRoute(
    grpc.ServiceCall call, Stream<Point> request) async {
  int pointCount = 0;
  int featureCount = 0;
  double distance = 0.0;
  Point previous;
  final timer = Stopwatch();

  await for (var location in request) {
    if (!timer.isRunning) timer.start();
    pointCount++;
    final feature = featuresDb.firstWhereOrNull((f) => f.location == location);
    if (feature != null) {
      featureCount++;
    }
    // For each point after the first, add the incremental distance from the
    // previous point to the total distance value.
    if (previous != null) distance += _distance(previous, location);
    previous = location;
  }
  timer.stop();
  return RouteSummary()
    ..pointCount = pointCount
    ..featureCount = featureCount
    ..distance = distance.round()
    ..elapsedTime = timer.elapsed.inSeconds;
}

在方法体中,我们使用请求流中的 await for 来重复读取客户端的请求(在本例中是 Point 对象),直到没有更多消息。一旦请求流完成,服务器就可以返回其 RouteSummary

双向流式 RPC

最后,让我们看看我们的双向流式 RPC RouteChat()

/// RouteChat handler. Receives a stream of message/location pairs, and
/// responds with a stream of all previous messages at each of those
/// locations.
@override
Stream<RouteNote> routeChat(
    grpc.ServiceCall call, Stream<RouteNote> request) async* {
  await for (var note in request) {
    final notes = routeNotes.putIfAbsent(note.location, () => <RouteNote>[]);
    for (var note in notes) yield note;
    notes.add(note);
  }
}

这次我们得到一个 RouteNote 流,与我们的客户端流式示例一样,它可用于读取消息。然而,这次我们通过方法返回的流返回值,而客户端仍在向消息流写入消息。

这里的读写语法与我们的客户端流式和服务器流式方法相同。尽管每一方总是会按照消息写入的顺序接收对方的消息,但客户端和服务器都可以按任何顺序读写 — 流是完全独立运行的。

启动服务器

一旦我们实现了所有方法,我们还需要启动一个 gRPC 服务器,以便客户端能够实际使用我们的服务。以下代码片段展示了我们如何为 RouteGuide 服务执行此操作

Future<void> main(List<String> args) async {
  final server = grpc.Server.create([RouteGuideService()]);
  await server.serve(port: 8080);
  print('Server listening...');
}

要构建和启动服务器,我们

  1. 使用 grpc.Server.create() 创建一个 gRPC 服务器实例,并提供服务实现的列表。
  2. 在服务器上调用 serve() 开始侦听请求,可以选择传入要侦听的地址和端口。服务器将继续异步处理请求,直到对其调用 shutdown()

创建客户端

在本节中,我们将介绍如何为我们的 RouteGuide 服务创建 Dart 客户端。完整的客户端代码可从 grpc-dart/example/route_guide/lib/src/client.dart 获取。

创建存根

要调用服务方法,我们首先需要创建一个 gRPC 通道来与服务器通信。我们通过将服务器地址和端口号传递给 ClientChannel() 来创建它,如下所示

final channel = ClientChannel('127.0.0.1',
    port: 8080,
    options: const ChannelOptions(
        credentials: ChannelCredentials.insecure()));

如有必要,你可以使用 ChannelOptions 为通道设置 TLS 选项(例如,受信任的证书)。

一旦 gRPC 通道设置好,我们需要一个客户端存根来执行 RPC。我们通过实例化 RouteGuideClient 来获取它,该类由示例 .proto 文件生成的包提供。

stub = RouteGuideClient(channel,
    options: CallOptions(timeout: Duration(seconds: 30)));

当服务需要时,你可以使用 CallOptions 来设置身份验证凭据(例如,GCE 凭据或 JWT 凭据)。RouteGuide 服务不需要任何凭据。

调用服务方法

现在让我们看看如何调用服务方法。请注意,在 gRPC-Dart 中,RPC 总是异步的,这意味着 RPC 会返回一个 FutureStream,必须对其进行监听才能从服务器获取响应或错误。

简单 RPC

调用简单的 RPC GetFeature 几乎与调用本地方法一样简单。

final point = Point()
  ..latitude = 409146138
  ..longitude = -746188906;
final feature = await stub.getFeature(point));

如你所见,我们在之前获得的存根上调用该方法。在我们的方法参数中,我们传递一个请求协议缓冲区对象(在本例中是 Point)。我们还可以传递一个可选的 CallOptions 对象,如果需要,它允许我们更改 RPC 的行为,例如超时。如果调用没有返回错误,返回的 Future 将完成并包含来自服务器的响应信息。如果存在错误,Future 将完成并包含该错误。

服务器端流式 RPC

这里是我们调用服务器端流式方法 ListFeatures 的地方,它返回一个地理 Feature 流。如果你已经阅读了创建服务器,其中一些内容可能看起来很熟悉——流式 RPC 在两端以类似的方式实现。

final rect = Rectangle()...; // initialize a Rectangle

try {
  await for (var feature in stub.listFeatures(rect)) {
    print(feature);
  }
catch (e) {
  print('ERROR: $e');
}

与简单 RPC 中一样,我们向方法传递一个请求。然而,这次我们没有返回 Future,而是返回一个 Stream。客户端可以使用该流来读取服务器的响应。

我们使用返回流上的 await for 来重复读取服务器的响应到一个响应协议缓冲区对象(在本例中是一个 Feature),直到没有更多消息。

客户端流式 RPC

客户端流式方法 RecordRoute 类似于服务器端方法,不同之处在于我们向方法传递一个 Stream 并返回一个 Future

final random = Random();

// Generate a number of random points
Stream<Point> generateRoute(int count) async* {
  for (int i = 0; i < count; i++) {
    final point = featuresDb[random.nextInt(featuresDb.length)].location;
    yield point;
  }
}

final pointCount = random.nextInt(100) + 2; // Traverse at least two points

final summary = await stub.recordRoute(generateRoute(pointCount));
print('Route summary: $summary');

由于 generateRoute() 方法是 async*,点将在 gRPC 监听请求流并将点消息发送到服务器时生成。一旦流完成(当 generateRoute() 返回时),gRPC 就知道我们已完成写入并期望接收响应。返回的 Future 将完成并包含从服务器接收到的 RouteSummary 消息,或者一个错误。

双向流式 RPC

最后,让我们看看我们的双向流式 RPC RouteChat()。与 RecordRoute 的情况一样,我们向方法传递一个将写入请求消息的流,并且像在 ListFeatures 中一样,我们返回一个可用于读取响应消息的流。然而,这次我们将通过我们方法自己的流发送值,而服务器也正在向它们的消息流写入消息。

Stream<RouteNote> outgoingNotes = ...;

final responses = stub.routeChat(outgoingNotes);
await for (var note in responses) {
  print('Got message ${note.message} at ${note.location.latitude}, ${note
      .location.longitude}');
}

这里的读写语法与我们的客户端流式和服务器端流式方法非常相似。尽管每一方总是会按照消息写入的顺序接收对方的消息,但客户端和服务器都可以按任何顺序读写 — 流是完全独立运行的。

尝试一下!

从示例目录开始工作

cd example/route_guide

获取包

dart pub get

运行服务器

dart bin/server.dart

在另一个终端中,运行客户端

dart bin/client.dart

报告问题

如果你发现 Dart gRPC 的问题,请在我们的问题跟踪器中提交问题。