基础教程
gRPC Dart 基础入门教程。
基础教程
本教程为 Dart 开发者提供了关于使用 gRPC 的基础入门介绍。
通过此示例,您将学习如何
- 在
.proto文件中定义服务。 - 使用协议缓冲区编译器生成服务器和客户端代码。
- 通过使用 Dart gRPC API,你将学会编写一个简单的客户端和服务端。
本教程假设你已经阅读过 gRPC 简介 并且熟悉 Protocol Buffers。请注意,本教程中的示例使用 proto3 版本的 Protocol Buffers 语言:你可以在 proto3 语言指南 中了解更多信息。
为什么使用 gRPC?
我们的示例是一个简单的路线映射应用程序,它允许客户端获取其路线上特征的信息,创建其路线摘要,并与服务器及其他客户端交换路线信息(例如交通更新)。
使用 gRPC,我们可以一次在 .proto 文件中定义服务,并生成 gRPC 支持的任何语言的客户端和服务器。这些客户端和服务器可以在各种环境中运行,从大型数据中心内的服务器到您的平板电脑,应有尽有——不同语言和环境之间通信的所有复杂性都由 gRPC 为您处理。我们还可以获得使用 Protocol Buffers 的所有优势,包括高效的序列化、简单的 IDL 和易于更新的接口。
示例代码和设置
本教程的示例代码位于 grpc/grpc-dart/example/route_guide。要下载该示例,请通过运行以下命令克隆 grpc-dart 仓库:
git clone --depth 1 https://github.com/grpc/grpc-dart
然后将当前目录更改为 grpc-dart/example/route_guide:
cd grpc-dart/example/route_guide
你应该已经安装了生成客户端和服务端接口代码所需的工具 —— 如果还没有,请参阅 快速入门 获取设置说明。
定义服务
第一步(正如你从 gRPC 简介 中所了解的)是使用 Protocol Buffers 定义 gRPC 服务以及方法请求和响应类型。你可以在 example/route_guide/protos/route_guide.proto 中查看完整的 .proto 文件。
要定义服务,请在 .proto 文件中指定一个命名的 service
service RouteGuide {
...
}
然后,您在服务定义中定义 rpc 方法,并指定它们的请求和响应类型。gRPC 允许您定义四种服务方法,所有这些方法都在 RouteGuide 服务中使用
简单 RPC:客户端使用存根向服务器发送请求,并等待响应返回,就像普通的函数调用一样。
// Obtains the feature at a given position. rpc GetFeature(Point) returns (Feature) {}服务端流式 RPC:客户端向服务器发送请求,并获得一个用于读取一系列消息的流。客户端会读取返回的流,直到没有更多消息为止。正如您在我们的示例中所见,通过在响应类型前放置
stream关键字,可以指定服务端流式方法。// Obtains the Features available within the given Rectangle. Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field), as the rectangle may cover a large area and contain a // huge number of features. rpc ListFeatures(Rectangle) returns (stream Feature) {}客户端流式 RPC:客户端写入一系列消息并通过提供的流发送给服务器。一旦客户端完成了消息写入,它将等待服务器读取所有消息并返回其响应。通过在请求类型前放置
stream关键字,可以指定客户端流式方法。// Accepts a stream of Points on a route being traversed, returning a // RouteSummary when traversal is completed. rpc RecordRoute(stream Point) returns (RouteSummary) {}双向流式 RPC:双方都使用读写流发送一系列消息。这两个流独立操作,因此客户端和服务器可以按任何他们喜欢的顺序读取和写入:例如,服务器可以等待接收所有客户端消息后再写入其响应,或者交替读取一条消息然后写入一条消息,亦或是其他读写组合。每条流中消息的顺序都会得到保留。通过在请求和响应之前都放置
stream关键字,可以指定这种类型的方法。// Accepts a stream of RouteNotes sent while a route is being traversed, // while receiving other RouteNotes (e.g. from other users). rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
我们的 .proto 文件还包含我们服务方法中使用的所有请求和响应类型的协议缓冲区消息类型定义——例如,这是 Point 消息类型
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
生成客户端和服务器代码
接下来,我们需要从 .proto 服务定义中生成 gRPC 客户端和服务端接口。我们使用带有特定 Dart 插件的 Protocol Buffer 编译器 protoc 来实现。这与我们在 快速入门 中所做的类似。
在 route_guide 示例目录下运行:
protoc -I protos/ protos/route_guide.proto --dart_out=grpc:lib/src/generated
运行此命令会在 route_guide 示例目录下的 lib/src/generated 目录中生成以下文件:
route_guide.pb.dartroute_guide.pbenum.dartroute_guide.pbgrpc.dartroute_guide.pbjson.dart
这些文件包含:
- 所有用于填充、序列化和获取我们的请求与响应消息类型的 Protocol Buffer 代码。
- 一种供客户端调用的接口类型(或 桩/stub),其中包含
RouteGuide服务定义的方法。 - 一种供服务端实现的接口类型,同样包含
RouteGuide服务定义的方法。
创建服务器
首先让我们看看如何创建一个 RouteGuide 服务器。如果您只对创建 gRPC 客户端感兴趣,您可以跳过此部分并直接跳到创建客户端(尽管您可能仍然会觉得它很有趣!)。
使我们的 RouteGuide 服务正常工作需要两部分
- 实现从我们的服务定义生成的服务接口:完成我们服务的实际“工作”。
- 运行一个 gRPC 服务端以监听来自客户端的请求,并将其分发到正确的服务实现。
你可以在 grpc-dart/example/route_guide/lib/src/server.dart 中找到我们的 RouteGuide 服务端示例。让我们仔细看看它是如何工作的。
实现 RouteGuide
如你所见,我们的服务端有一个 RouteGuideService 类,它继承了生成的抽象类 RouteGuideServiceBase。
class RouteGuideService extends RouteGuideServiceBase {
Future<Feature> getFeature(grpc.ServiceCall call, Point request) async {
...
}
Stream<Feature> listFeatures(
grpc.ServiceCall call, Rectangle request) async* {
...
}
Future<RouteSummary> recordRoute(
grpc.ServiceCall call, Stream<Point> request) async {
...
}
Stream<RouteNote> routeChat(
grpc.ServiceCall call, Stream<RouteNote> request) async* {
...
}
...
}
简单 RPC
RouteGuideService 实现了我们所有的服务方法。让我们先看看最简单的方法 GetFeature,它从客户端获取一个 Point,并返回数据库中对应的 Feature 信息。
/// GetFeature handler. Returns a feature for the given location.
/// The [context] object provides access to client metadata, cancellation, etc.
@override
Future<Feature> getFeature(grpc.ServiceCall call, Point request) async {
return featuresDb.firstWhere((f) => f.location == request,
orElse: () => Feature()..location = request);
}
该方法接收一个 RPC 上下文对象和客户端的 Point Protocol Buffer 请求。它返回一个包含响应信息的 Feature Protocol Buffer 对象。在方法中,我们用相关信息填充 Feature,然后将其 return 给 gRPC 框架,框架会将其发送回客户端。
服务器端流式 RPC
现在让我们看看流式 RPC。ListFeatures 是一个服务端流式 RPC,因此我们需要向客户端发送多个 Feature。
/// ListFeatures handler. Returns a stream of features within the given
/// rectangle.
@override
Stream<Feature> listFeatures(
grpc.ServiceCall call, Rectangle request) async* {
final normalizedRectangle = _normalize(request);
// For each feature, check if it is in the given bounding box
for (var feature in featuresDb) {
if (feature.name.isEmpty) continue;
final location = feature.location;
if (_contains(normalizedRectangle, location)) {
yield feature;
}
}
}
如你所见,这次我们不是获取并返回简单的请求和响应对象,而是接收一个请求对象(客户端希望在其中查找 Feature 的 Rectangle),并返回一个 Feature 对象的 Stream。
在方法中,我们按需填充 Feature 对象,并使用 yield 将它们添加到返回的流中。当方法返回时,流会自动关闭,告知 gRPC 我们已完成响应的写入。
如果在此调用中发生任何错误,该错误将作为异常添加到流中,gRPC 层会将其转换为适当的 RPC 状态并通过网络发送。
客户端流式 RPC
现在让我们看一个稍微复杂一点的方法:客户端流式 RPC RecordRoute,我们从客户端接收一个 Point 流,并返回一个包含其行程信息的 RouteSummary。如你所见,这次请求参数是一个流,服务端可以使用它来读取来自客户端的请求消息。服务端像处理简单的 RPC 一样返回单个响应。
/// RecordRoute handler. Gets a stream of points, and responds with statistics
/// about the "trip": number of points, number of known features visited,
/// total distance traveled, and total time spent.
@override
Future<RouteSummary> recordRoute(
grpc.ServiceCall call, Stream<Point> request) async {
int pointCount = 0;
int featureCount = 0;
double distance = 0.0;
Point previous;
final timer = Stopwatch();
await for (var location in request) {
if (!timer.isRunning) timer.start();
pointCount++;
final feature = featuresDb.firstWhereOrNull((f) => f.location == location);
if (feature != null) {
featureCount++;
}
// For each point after the first, add the incremental distance from the
// previous point to the total distance value.
if (previous != null) distance += _distance(previous, location);
previous = location;
}
timer.stop();
return RouteSummary()
..pointCount = pointCount
..featureCount = featureCount
..distance = distance.round()
..elapsedTime = timer.elapsed.inSeconds;
}
在方法体中,我们对请求流使用 await for 来反复读取客户端的请求(在本例中为 Point 对象),直到没有更多消息为止。一旦请求流结束,服务端就可以返回其 RouteSummary。
双向流式 RPC
最后,让我们看看我们的双向流式 RPC RouteChat()。
/// RouteChat handler. Receives a stream of message/location pairs, and
/// responds with a stream of all previous messages at each of those
/// locations.
@override
Stream<RouteNote> routeChat(
grpc.ServiceCall call, Stream<RouteNote> request) async* {
await for (var note in request) {
final notes = routeNotes.putIfAbsent(note.location, () => <RouteNote>[]);
for (var note in notes) yield note;
notes.add(note);
}
}
这次我们接收一个 RouteNote 流,和之前的客户端流式示例一样,它可以用来读取消息。然而,这次我们通过方法返回的流来返回数值,此时客户端仍在向其消息流写入消息。
此处的读写语法与我们的客户端流式和服务端流式方法相同。虽然每一方始终以对方写入的顺序接收消息,但客户端和服务端都可以按任意顺序进行读写 —— 流的操作是完全独立的。
启动服务器
一旦我们实现了所有方法,我们还需要启动一个 gRPC 服务器,以便客户端能够实际使用我们的服务。以下代码片段展示了我们如何为 RouteGuide 服务执行此操作
Future<void> main(List<String> args) async {
final server = grpc.Server.create([RouteGuideService()]);
await server.serve(port: 8080);
print('Server listening...');
}
要构建并启动服务端,我们需要:
- 使用
grpc.Server.create()创建一个 gRPC 服务端实例,并传入一个服务实现列表。 - 在服务端上调用
serve()以开始监听请求,并可选择性地传入监听的地址和端口。服务端将持续异步处理请求,直到在其上调用shutdown()。
创建客户端
在本节中,我们将介绍如何为 RouteGuide 服务创建一个 Dart 客户端。完整的客户端代码可在 grpc-dart/example/route_guide/lib/src/client.dart 获取。
创建存根
要调用服务方法,首先需要创建一个 gRPC 通道 (Channel) 以便与服务端通信。我们通过将服务端地址和端口传递给 ClientChannel() 来创建通道:
final channel = ClientChannel('127.0.0.1',
port: 8080,
options: const ChannelOptions(
credentials: ChannelCredentials.insecure()));
如有必要,你可以使用 ChannelOptions 为通道设置 TLS 选项(例如受信任的证书)。
一旦 gRPC 通道设置完成,我们就需要一个客户端 桩 (Stub) 来执行 RPC。我们通过实例化 RouteGuideClient 来获取它,该类由示例 .proto 文件生成的包提供。
stub = RouteGuideClient(channel,
options: CallOptions(timeout: Duration(seconds: 30)));
当服务需要身份验证凭据(例如 GCE 或 JWT 凭据)时,你可以使用 CallOptions 设置这些凭据。RouteGuide 服务不需要任何凭据。
调用服务方法
现在让我们看看如何调用我们的服务方法。请注意,在 gRPC-Dart 中,RPC 始终是异步的,这意味着 RPC 会返回一个必须被监听的 Future 或 Stream,从而获取服务端的响应或错误。
简单 RPC
调用简单的 RPC GetFeature 几乎与调用本地方法一样简单。
final point = Point()
..latitude = 409146138
..longitude = -746188906;
final feature = await stub.getFeature(point));
如你所见,我们在之前获取的桩上调用该方法。在方法参数中,我们传递一个请求 Protocol Buffer 对象(在本例中为 Point)。我们还可以传递一个可选的 CallOptions 对象,以便在必要时更改 RPC 的行为(例如超时设置)。如果调用没有返回错误,则返回的 Future 将以服务端的响应信息完成。如果出现错误,Future 将以错误完成。
服务器端流式 RPC
在此处,我们调用服务端流式方法 ListFeatures,它返回一个地理 Feature 流。如果你已经阅读了 创建服务端,这看起来可能会很熟悉 —— 流式 RPC 在双方的实现方式是相似的。
final rect = Rectangle()...; // initialize a Rectangle
try {
await for (var feature in stub.listFeatures(rect)) {
print(feature);
}
catch (e) {
print('ERROR: $e');
}
与简单 RPC 一样,我们将请求传递给方法。不同的是,我们没有得到 Future,而是得到了一个 Stream。客户端可以使用该流来读取服务端的响应。
我们对返回的流使用 await for,反复读取服务端对响应 Protocol Buffer 对象(在本例中为 Feature)的回复,直到没有更多消息为止。
客户端流式 RPC
客户端流式方法 RecordRoute 与服务端流式方法类似,区别在于我们向方法传递一个 Stream 并获得一个 Future 返回。
final random = Random();
// Generate a number of random points
Stream<Point> generateRoute(int count) async* {
for (int i = 0; i < count; i++) {
final point = featuresDb[random.nextInt(featuresDb.length)].location;
yield point;
}
}
final pointCount = random.nextInt(100) + 2; // Traverse at least two points
final summary = await stub.recordRoute(generateRoute(pointCount));
print('Route summary: $summary');
由于 generateRoute() 方法是 async* 的,当 gRPC 监听请求流并向服务端发送点消息时,这些点就会被生成。一旦流结束(当 generateRoute() 返回时),gRPC 就知道我们已完成写入并正在等待响应。返回的 Future 将以从服务端接收到的 RouteSummary 消息完成,或者以错误完成。
双向流式 RPC
最后,让我们看看双向流式 RPC RouteChat()。正如 RecordRoute 的情况,我们传递一个用于写入请求消息的流给方法;又如 ListFeatures,我们得到一个可以用来读取响应消息的流。然而,这次我们会在通过方法流发送数值的同时,服务端也在向其消息流写入消息。
Stream<RouteNote> outgoingNotes = ...;
final responses = stub.routeChat(outgoingNotes);
await for (var note in responses) {
print('Got message ${note.message} at ${note.location.latitude}, ${note
.location.longitude}');
}
此处的读写语法与我们的客户端流式和服务端流式方法非常相似。虽然每一方始终以对方写入的顺序接收消息,但客户端和服务端都可以按任意顺序进行读写 —— 流的操作是完全独立的。
尝试一下!
在示例目录下进行操作:
cd example/route_guide
获取包:
dart pub get
运行服务器
dart bin/server.dart
在另一个终端中,运行客户端
dart bin/client.dart
报告问题
如果你发现 Dart gRPC 有任何问题,请在我们的问题追踪器中 提交一个 Issue。