基础教程

Node 中 gRPC 的基础教程介绍。

基础教程

Node 中 gRPC 的基础教程介绍。

本教程为 Node.js 程序员提供了一个使用 gRPC 的基本入门。

通过学习本示例,您将学习如何

  • .proto 文件中定义服务。
  • 使用 Node.js gRPC API 为您的服务编写一个简单的客户端和服务器。

本教程假设您已阅读 gRPC 简介 并熟悉 协议缓冲区。请注意,本教程中的示例使用了协议缓冲区语言的 proto3 版本。您可以在 proto3 语言指南 中找到更多信息。

为什么要使用 gRPC?

我们的示例是一个简单的路线地图应用程序,它允许客户端获取有关其路线上特征的信息,创建其路线的摘要,并与服务器和其他客户端交换诸如交通更新之类的路线信息。

通过 gRPC,我们可以在一个 .proto 文件中定义我们的服务一次,并生成任何 gRPC 支持语言的客户端和服务器,这些客户端和服务器又可以在从大型数据中心内部的服务器到您自己的平板电脑的环境中运行 — gRPC 为您处理了不同语言和环境之间通信的所有复杂性。我们还可以获得使用协议缓冲区的所有优势,包括高效的序列化、简单的 IDL 和轻松的接口更新。

示例代码和设置

我们教程的示例代码位于 grpc/grpc-node/examples/routeguide/dynamic_codegen 中。如果您查看存储库,您会发现 grpc/grpc-node/examples/routeguide/static_codegen 中也有一个非常相似的示例。我们有 route guide 示例的两个版本,因为有两种方法可以生成在 Node.js 中使用协议缓冲区所需的代码 - 一种方法使用 Protobuf.js 在运行时动态生成代码,另一种方法使用协议缓冲区编译器 protoc 静态生成的代码。这些示例的行为完全相同,并且任何一个服务器都可以与任何一个客户端一起使用。正如目录名称所暗示的,我们将在这份文档中使用动态生成代码的版本,但也欢迎您查看静态代码示例。

要下载示例,请运行以下命令克隆 grpc 存储库

git clone -b @grpc/[email protected] --depth 1 --shallow-submodules https://github.com/grpc/grpc-node
cd grpc

然后将当前目录更改为 examples

cd examples

您还应该安装相关的工具来生成服务器和客户端接口代码 - 如果您还没有安装,请按照 快速入门 中的设置说明进行操作。

定义服务

我们的第一步(正如您从 gRPC 简介 中了解的那样)是使用 协议缓冲区 定义 gRPC 服务 和方法请求响应类型。您可以在 examples/protos/route_guide.proto 中查看完整的 .proto 文件。

要定义服务,您需要在 .proto 文件中指定一个命名的 service

service RouteGuide {
   ...
}

然后在您的服务定义中定义 rpc 方法,指定它们的请求和响应类型。gRPC 允许您定义四种服务方法,所有这些方法都在 RouteGuide 服务中使用

  • 简单 RPC,其中客户端使用存根向服务器发送请求,并等待响应返回,就像正常的函数调用一样。

    // Obtains the feature at a given position.
    rpc GetFeature(Point) returns (Feature) {}
    
  • 服务器端流式 RPC,其中客户端向服务器发送请求,并获取一个流来读取一系列消息。客户端从返回的流中读取,直到没有更多消息为止。正如您在我们的示例中所见,您可以通过将 stream 关键字放在响应类型之前来指定服务器端流式方法。

    // Obtains the Features available within the given Rectangle.  Results are
    // streamed rather than returned at once (e.g. in a response message with a
    // repeated field), as the rectangle may cover a large area and contain a
    // huge number of features.
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
    
  • 客户端流式 RPC,其中客户端编写一系列消息并将其发送到服务器,同样使用提供的流。一旦客户端完成编写消息,它将等待服务器读取所有消息并返回其响应。您可以通过将 stream 关键字放在请求类型之前来指定客户端流式方法。

    // Accepts a stream of Points on a route being traversed, returning a
    // RouteSummary when traversal is completed.
    rpc RecordRoute(stream Point) returns (RouteSummary) {}
    
  • 双向流式 RPC,其中双方使用读写流发送一系列消息。两个流独立运行,因此客户端和服务器可以按照他们喜欢的任何顺序进行读取和写入:例如,服务器可以等待接收所有客户端消息,然后再编写其响应,或者它可以交替读取消息,然后写入消息,或者其他读取和写入的组合。每个流中消息的顺序都会被保留。您可以通过将 stream 关键字放在请求和响应之前来指定这种类型的方法。

    // Accepts a stream of RouteNotes sent while a route is being traversed,
    // while receiving other RouteNotes (e.g. from other users).
    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    

我们的 .proto 文件还包含了在我们的服务方法中使用的所有请求和响应类型的协议缓冲区消息类型定义 - 例如,这是 Point 消息类型

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

从 proto 文件加载服务描述符

Node.js 库从运行时加载的 .proto 文件动态生成服务描述符和客户端存根定义。

要加载 .proto 文件,只需 require gRPC proto 加载器库并使用其 loadSync() 方法,然后将输出传递给 gRPC 库的 loadPackageDefinition 方法。

var PROTO_PATH = __dirname + '/../../protos/route_guide.proto';
var grpc = require('@grpc/grpc-js');
var protoLoader = require('@grpc/proto-loader');
// Suggested options for similarity to existing grpc.load behavior
var packageDefinition = protoLoader.loadSync(
    PROTO_PATH,
    {keepCase: true,
     longs: String,
     enums: String,
     defaults: true,
     oneofs: true
    });
var protoDescriptor = grpc.loadPackageDefinition(packageDefinition);
// The protoDescriptor object has the full package hierarchy
var routeguide = protoDescriptor.routeguide;

完成此操作后,存根构造函数位于 routeguide 命名空间 (protoDescriptor.routeguide.RouteGuide) 中,服务描述符(用于创建服务器)是存根的属性 (protoDescriptor.routeguide.RouteGuide.service);

创建服务器

首先,让我们看看如何创建 RouteGuide 服务器。如果您只对创建 gRPC 客户端感兴趣,您可以跳过此部分,直接转到 创建客户端(尽管您可能会觉得它也很有趣!)。

要使我们的 RouteGuide 服务正常工作,需要完成两个部分

  • 实现从我们的服务定义生成的服务接口:执行我们服务的实际“工作”。
  • 运行 gRPC 服务器以监听来自客户端的请求并返回服务响应。

您可以在 examples/routeguide/dynamic_codegen/route_guide_server.js 中找到我们的示例 RouteGuide 服务器。让我们仔细看看它是如何工作的。

实现 RouteGuide

如您所见,我们的服务器有一个从 RouteGuide.service 描述符对象生成的 Server 构造函数。

var Server = new grpc.Server();

在这种情况下,我们正在实现 RouteGuide异步版本,它提供了我们的默认 gRPC 服务器行为。

route_guide_server.js 中的函数实现了我们所有的服务方法。让我们先来看最简单的类型,getFeature,它只是从客户端获取一个 Point,并从其数据库中返回相应的特征信息,格式为 Feature

function checkFeature(point) {
  var feature;
  // Check if there is already a feature object for the given point
  for (var i = 0; i < feature_list.length; i++) {
    feature = feature_list[i];
    if (feature.location.latitude === point.latitude &&
        feature.location.longitude === point.longitude) {
      return feature;
    }
  }
  var name = '';
  feature = {
    name: name,
    location: point
  };
  return feature;
}
function getFeature(call, callback) {
  callback(null, checkFeature(call.request));
}

该方法传递一个 RPC 的调用对象,该对象将 Point 参数作为属性,以及一个我们可以传递返回的 Feature 的回调。在方法体中,我们填充一个与给定点对应的 Feature,并将其传递给回调,第一个参数为 null,表示没有错误。

现在让我们来看一个更复杂的东西 - 流式 RPC。listFeatures 是一个服务器端流式 RPC,因此我们需要向客户端发送多个 Feature

function listFeatures(call) {
  var lo = call.request.lo;
  var hi = call.request.hi;
  var left = _.min([lo.longitude, hi.longitude]);
  var right = _.max([lo.longitude, hi.longitude]);
  var top = _.max([lo.latitude, hi.latitude]);
  var bottom = _.min([lo.latitude, hi.latitude]);
  // For each feature, check if it is in the given bounding box
  _.each(feature_list, function(feature) {
    if (feature.name === '') {
      return;
    }
    if (feature.location.longitude >= left &&
        feature.location.longitude <= right &&
        feature.location.latitude >= bottom &&
        feature.location.latitude <= top) {
      call.write(feature);
    }
  });
  call.end();
}

如您所见,这次我们没有在方法参数中获取调用对象和回调,而是获取一个实现 Writable 接口的 call 对象。在该方法中,我们创建我们需要返回的尽可能多的 Feature 对象,使用其 write() 方法将它们写入 call。最后,我们调用 call.end() 以表明我们已发送所有消息。

如果您查看客户端流式方法 RecordRoute,您会发现它与一元调用非常相似,只不过这次 call 参数实现了 Reader 接口。每次有新数据时,call'data' 事件都会触发,并且当所有数据都已读取时,'end' 事件会触发。与一元情况一样,我们通过调用回调来响应

call.on('data', function(point) {
  // Process user data
});
call.on('end', function() {
  callback(null, result);
});

最后,让我们看看我们的双向流式 RPC RouteChat()

function routeChat(call) {
  call.on('data', function(note) {
    var key = pointKey(note.location);
    /* For each note sent, respond with all previous notes that correspond to
     * the same point */
    if (route_notes.hasOwnProperty(key)) {
      _.each(route_notes[key], function(note) {
        call.write(note);
      });
    } else {
      route_notes[key] = [];
    }
    // Then add the new note to the list
    route_notes[key].push(JSON.parse(JSON.stringify(note)));
  });
  call.on('end', function() {
    call.end();
  });
}

这次我们获得一个实现 Duplexcall,可用于读取写入消息。此处读取和写入的语法与我们的客户端流式和服务器流式方法完全相同。尽管每一方都将始终按照写入顺序获取对方的消息,但客户端和服务器都可以按任意顺序读取和写入 - 这些流是完全独立运行的。

启动服务器

一旦我们实现了所有方法,我们还需要启动一个 gRPC 服务器,以便客户端可以实际使用我们的服务。以下代码片段显示了我们如何为 RouteGuide 服务执行此操作

function getServer() {
  var server = new grpc.Server();
  server.addService(routeguide.RouteGuide.service, {
    getFeature: getFeature,
    listFeatures: listFeatures,
    recordRoute: recordRoute,
    routeChat: routeChat
  });
  return server;
}
var routeServer = getServer();
routeServer.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), () => {
  routeServer.start();
});

如您所见,我们通过以下步骤构建和启动服务器

  1. RouteGuide 服务描述符创建 Server 构造函数。
  2. 实现服务方法。
  3. 通过使用方法实现调用 Server 构造函数来创建服务器实例。
  4. 使用实例的 bind() 方法指定我们要用于监听客户端请求的地址和端口。
  5. 在实例上调用 start() 以启动 RPC 服务器。

创建客户端

在本节中,我们将介绍如何为我们的 RouteGuide 服务创建 Node.js 客户端。您可以在 examples/routeguide/dynamic_codegen/route_guide_client.js 中查看我们的完整示例客户端代码。

创建存根

要调用服务方法,我们首先需要创建一个存根。为此,我们只需要调用 RouteGuide 存根构造函数,并指定服务器地址和端口。

new routeguide.RouteGuide('localhost:50051', grpc.credentials.createInsecure());

调用服务方法

现在让我们看看如何调用我们的服务方法。请注意,所有这些方法都是异步的:它们使用事件或回调来检索结果。

简单 RPC

调用简单的 RPC GetFeature 几乎与调用本地异步方法一样简单。

var point = {latitude: 409146138, longitude: -746188906};
stub.getFeature(point, function(err, feature) {
  if (err) {
    // process error
  } else {
    // process feature
  }
});

如您所见,我们创建并填充了一个请求对象。最后,我们在存根上调用该方法,并将请求和回调传递给它。如果没有错误,我们可以从我们的响应对象中读取来自服务器的响应信息。

console.log('Found feature called "' + feature.name + '" at ' +
    feature.location.latitude/COORD_FACTOR + ', ' +
    feature.location.longitude/COORD_FACTOR);
流式 RPC

现在让我们看一下我们的流式方法。如果您已经阅读了 创建服务器,其中一些内容可能看起来非常熟悉 - 流式 RPC 在两侧的实现方式相似。这是我们调用服务器端流式方法 ListFeatures 的地方,它返回一个地理 Feature

var call = client.listFeatures(rectangle);
  call.on('data', function(feature) {
      console.log('Found feature called "' + feature.name + '" at ' +
          feature.location.latitude/COORD_FACTOR + ', ' +
          feature.location.longitude/COORD_FACTOR);
  });
  call.on('end', function() {
    // The server has finished sending
  });
  call.on('error', function(e) {
    // An error has occurred and the stream has been closed.
  });
  call.on('status', function(status) {
    // process status
  });

我们不是将请求和回调传递给该方法,而是传递一个请求并取回一个 Readable 流对象。客户端可以使用 Readable'data' 事件读取服务器的响应。此事件会随着每个 Feature 消息对象触发,直到没有更多消息为止。'data' 回调中的错误不会导致流关闭。'error' 事件表示发生错误并且流已关闭。'end' 事件表示服务器已完成发送且未发生错误。只会发出 'error''end' 中的一个。最后,当服务器发送状态时,'status' 事件会触发。

客户端流式方法 RecordRoute 类似,只不过我们在那里将回调传递给该方法并取回一个 Writable

var call = client.recordRoute(function(error, stats) {
  if (error) {
    callback(error);
  }
  console.log('Finished trip with', stats.point_count, 'points');
  console.log('Passed', stats.feature_count, 'features');
  console.log('Travelled', stats.distance, 'meters');
  console.log('It took', stats.elapsed_time, 'seconds');
});
function pointSender(lat, lng) {
  return function(callback) {
    console.log('Visiting point ' + lat/COORD_FACTOR + ', ' +
        lng/COORD_FACTOR);
    call.write({
      latitude: lat,
      longitude: lng
    });
    _.delay(callback, _.random(500, 1500));
  };
}
var point_senders = [];
for (var i = 0; i < num_points; i++) {
  var rand_point = feature_list[_.random(0, feature_list.length - 1)];
  point_senders[i] = pointSender(rand_point.location.latitude,
                                 rand_point.location.longitude);
}
async.series(point_senders, function() {
  call.end();
});

一旦我们使用 write() 完成向流写入客户端的请求,我们需要在流上调用 end(),以告知 gRPC 我们已完成写入。如果状态为 OK,则 stats 对象将填充服务器的响应。

最后,让我们看看我们的双向流式 RPC routeChat()。在这种情况下,我们只需将上下文传递给该方法,并取回一个 Duplex 流对象,该对象可用于写入和读取消息。

var call = client.routeChat();

此处读取和写入的语法与我们的客户端流式方法和服务端流式方法完全相同。尽管每一方总是会按照写入的顺序接收到另一方的消息,但客户端和服务器都可以按照任何顺序读取和写入——流的操作是完全独立的。

试试看!

构建客户端和服务器

npm install

运行服务器

node ./routeguide/dynamic_codegen/route_guide_server.js --db_path=./routeguide/dynamic_codegen/route_guide_db.json

从另一个终端,运行客户端

node ./routeguide/dynamic_codegen/route_guide_client.js --db_path=./routeguide/dynamic_codegen/route_guide_db.json