基础教程

gRPC C++ 基础教程简介。

基础教程

gRPC C++ 基础教程简介。

本教程为 C++ 程序员提供了 gRPC 的基础入门知识。

通过学习本示例,您将了解如何:

  • .proto 文件中定义服务。
  • 使用 Protocol Buffer 编译器生成服务器和客户端代码。
  • 使用 C++ gRPC API 为您的服务编写简单的客户端和服务器。

本教程假设您已阅读 gRPC 简介 并熟悉 Protocol Buffer。请注意,本教程中的示例使用了 Protocol Buffer 语言的 proto3 版本:您可以在 proto3 语言指南C++ 生成代码指南 中了解更多信息。

为什么使用 gRPC?

我们的示例是一个简单的路线图应用,它允许客户端获取路线上特征点的信息、创建路线摘要,并与服务器及其他客户端交换流量更新等路线信息。

使用 gRPC,我们可以在一个 .proto 文件中一次性定义服务,然后生成支持 gRPC 的任何语言的客户端和服务器代码,这些代码可以在从大型数据中心内的服务器到您自己的平板电脑的各种环境中运行——不同语言和环境之间通信的所有复杂性都由 gRPC 为您处理。我们还获得了使用 Protocol Buffer 的所有优势,包括高效的序列化、简单的 IDL 以及方便的接口更新。

示例代码和设置

示例代码位于 grpc 仓库的 examples/cpp/route_guide 目录下。获取示例代码并构建 gRPC

  1. 遵循快速入门指南 从源代码构建和本地安装 gRPC

  2. 从仓库文件夹中,切换到 route guide 示例目录

    cd examples/cpp/route_guide
    
  3. 运行 cmake

    mkdir -p cmake/build
    cd cmake/build
    cmake -DCMAKE_PREFIX_PATH=$MY_INSTALL_DIR ../..
    

定义服务

我们的第一步(正如您在 gRPC 简介 中了解到的)是使用 Protocol Buffer 定义 gRPC 服务以及方法 请求响应 类型。您可以在 examples/protos/route_guide.proto 中查看完整的 .proto 文件。

要定义服务,您需要在 .proto 文件中指定一个命名的 service

service RouteGuide {
   ...
}

然后,您在服务定义内部定义 rpc 方法,指定它们的请求和响应类型。gRPC 允许您定义四种类型的服务方法,所有这些类型都在 RouteGuide 服务中使用

  • 一种简单 RPC,客户端使用存根向服务器发送请求并等待响应返回,就像普通函数调用一样。

    // Obtains the feature at a given position.
    rpc GetFeature(Point) returns (Feature) {}
    
  • 一种服务器端流式 RPC,客户端向服务器发送请求并获取一个流,以便从中读取一系列消息。客户端从返回的流中读取,直到没有更多消息为止。正如您在我们的示例中看到的,您可以通过在响应类型之前放置 stream 关键字来指定服务器端流式方法。

    // Obtains the Features available within the given Rectangle.  Results are
    // streamed rather than returned at once (e.g. in a response message with a
    // repeated field), as the rectangle may cover a large area and contain a
    // huge number of features.
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
    
  • 一种客户端流式 RPC,客户端写入一系列消息并将其发送到服务器,同样使用提供的流。一旦客户端完成消息写入,它会等待服务器读取所有消息并返回响应。您可以通过在请求类型之前放置 stream 关键字来指定客户端流式方法。

    // Accepts a stream of Points on a route being traversed, returning a
    // RouteSummary when traversal is completed.
    rpc RecordRoute(stream Point) returns (RouteSummary) {}
    
  • 一种双向流式 RPC,客户端和服务器双方都使用读写流发送一系列消息。这两个流独立操作,因此客户端和服务器可以按任何顺序读写:例如,服务器可以等到接收所有客户端消息后再写入响应,或者它可以交替读取一条消息然后写入一条消息,或以其他读写组合方式进行。每个流中的消息顺序都得到保留。您可以通过在请求和响应之前都放置 stream 关键字来指定这种类型的方法。

    // Accepts a stream of RouteNotes sent while a route is being traversed,
    // while receiving other RouteNotes (e.g. from other users).
    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    

我们的 .proto 文件还包含 Protocol Buffer 消息类型定义,用于我们的服务方法中使用的所有请求和响应类型——例如,这是 Point 消息类型

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

生成客户端和服务器代码

接下来我们需要从 .proto 服务定义生成 gRPC 客户端和服务器接口。我们使用 Protocol Buffer 编译器 protoc 以及一个特殊的 gRPC C++ 插件来完成此操作。

为简单起见,我们提供了一个 CMakeLists.txt,它会为您运行 protoc,并使用适当的插件、输入和输出(如果您想自己运行,请确保您已安装 protoc 并首先遵循了 gRPC 代码安装说明

make route_guide.grpc.pb.o

实际运行的是

protoc -I ../../protos --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` ../../protos/route_guide.proto
protoc -I ../../protos --cpp_out=. ../../protos/route_guide.proto

运行此命令会在当前目录中生成以下文件:

  • route_guide.pb.h,声明生成的消息类的头文件
  • route_guide.pb.cc,包含消息类实现的源文件
  • route_guide.grpc.pb.h,声明生成的服务类的头文件
  • route_guide.grpc.pb.cc,包含服务类实现的源文件

这些文件包含:

  • 所有用于填充、序列化和检索我们的请求和响应消息类型的 Protocol Buffer 代码

  • 一个名为 RouteGuide 的类,其中包含:

    • 客户端用于调用 RouteGuide 服务中定义的方法的远程接口类型(或存根)。
    • 服务器要实现的两个抽象接口,也包含 RouteGuide 服务中定义的方法。

创建服务器

首先,我们来看看如何创建一个 RouteGuide 服务器。如果您只对创建 gRPC 客户端感兴趣,可以跳过本节,直接转到创建客户端(尽管您可能还是会觉得这部分内容很有趣!)。

让我们的 RouteGuide 服务正常工作需要两个部分:

  • 实现从服务定义生成的服务接口:完成服务的实际“工作”。
  • 运行 gRPC 服务器,监听客户端请求并返回服务响应。

您可以在 examples/cpp/route_guide/route_guide_server.cc 中找到我们的示例 RouteGuide 服务器代码。让我们仔细看看它是如何工作的。

实现 RouteGuide

正如您所见,我们的服务器有一个 RouteGuideImpl 类,它实现了生成的 RouteGuide::Service 接口

class RouteGuideImpl final : public RouteGuide::Service {
...
}

在本例中,我们正在实现 RouteGuide同步版本,这提供了我们默认的 gRPC 服务器行为。也可以实现一个异步接口 RouteGuide::AsyncService,它允许您进一步自定义服务器的线程行为,尽管在本教程中我们不会对此进行探讨。

RouteGuideImpl 实现了我们所有的服务方法。我们首先来看看最简单的类型 GetFeature,它仅从客户端获取一个 Point,然后从其数据库中返回相应的特征信息到 Feature 中。

Status GetFeature(ServerContext* context, const Point* point,
                  Feature* feature) override {
  feature->set_name(GetFeatureName(*point, feature_list_));
  feature->mutable_location()->CopyFrom(*point);
  return Status::OK;
}

方法接收 RPC 的上下文对象、客户端的 Point Protocol Buffer 请求以及用于填充响应信息的 Feature Protocol Buffer。在方法中,我们使用适当的信息填充 Feature,然后返回 OK 状态,以告知 gRPC 我们已完成此 RPC 的处理,并且可以将 Feature 返回给客户端。

请注意,所有服务方法都可以(并且会!)同时从多个线程调用。您必须确保您的方法实现是线程安全的。在我们的示例中,feature_list_ 在构造后永不更改,因此设计上是安全的。但是如果 feature_list_ 在服务的生命周期内会发生变化,我们就需要同步对此成员的访问。

现在我们来看看更复杂一些的内容——流式 RPC。ListFeatures 是一个服务器端流式 RPC,因此我们需要向客户端发送多个 Feature

Status ListFeatures(ServerContext* context, const Rectangle* rectangle,
                    ServerWriter<Feature>* writer) override {
  auto lo = rectangle->lo();
  auto hi = rectangle->hi();
  long left = std::min(lo.longitude(), hi.longitude());
  long right = std::max(lo.longitude(), hi.longitude());
  long top = std::max(lo.latitude(), hi.latitude());
  long bottom = std::min(lo.latitude(), hi.latitude());
  for (const Feature& f : feature_list_) {
    if (f.location().longitude() >= left &&
        f.location().longitude() <= right &&
        f.location().latitude() >= bottom &&
        f.location().latitude() <= top) {
      writer->Write(f);
    }
  }
  return Status::OK;
}

正如您所见,这次在方法参数中,我们没有收到简单的请求和响应对象,而是收到一个请求对象(客户端希望在其中查找 FeatureRectangle)和一个特殊的 ServerWriter 对象。在方法中,我们填充需要返回的尽可能多的 Feature 对象,并使用 ServerWriterWrite() 方法将它们写入流。最后,就像在简单 RPC 中一样,我们返回 Status::OK 以告知 gRPC 我们已完成响应写入。

如果您查看客户端流式方法 RecordRoute,您会发现它非常相似,只不过这次我们获得的是 ServerReader,而不是请求对象和单个响应。我们使用 ServerReaderRead() 方法反复读取客户端的请求到一个请求对象(在本例中是 Point),直到没有更多消息为止:服务器需要在每次调用后检查 Read() 的返回值。如果为 true,流仍然正常,可以继续读取;如果为 false,则消息流已结束。

while (stream->Read(&point)) {
  ...//process client input
}

最后,我们来看看我们的双向流式 RPC RouteChat()

Status RouteChat(ServerContext* context,
                  ServerReaderWriter<RouteNote, RouteNote>* stream) override {
  RouteNote note;
  while (stream->Read(&note)) {
    std::unique_lock<std::mutex> lock(mu_);
    for (const RouteNote& n : received_notes_) {
      if (n.location().latitude() == note.location().latitude() &&
          n.location().longitude() == note.location().longitude()) {
        stream->Write(n);
      }
    }
    received_notes_.push_back(note);
  }

  return Status::OK;
}

这次我们获得一个 ServerReaderWriter,可用于读写消息。这里的读写语法与客户端流式和服务器端流式方法完全相同。尽管双方总是按写入顺序接收对方的消息,但客户端和服务器都可以按任何顺序读写——流是完全独立操作的。

请注意,由于 received_notes_ 是一个实例变量,可以由多个线程访问,因此我们在此使用互斥锁来保证独占访问。

启动服务器

一旦我们实现了所有方法,还需要启动 gRPC 服务器,以便客户端能够实际使用我们的服务。以下代码片段展示了如何为我们的 RouteGuide 服务执行此操作

void RunServer(const std::string& db_path) {
  std::string server_address("0.0.0.0:50051");
  RouteGuideImpl service(db_path);

  ServerBuilder builder;
  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  builder.RegisterService(&service);
  std::unique_ptr<Server> server(builder.BuildAndStart());
  std::cout << "Server listening on " << server_address << std::endl;
  server->Wait();
}

正如您所见,我们使用 ServerBuilder 构建并启动服务器。为此,我们执行以下步骤:

  1. 创建我们的服务实现类 RouteGuideImpl 的实例。
  2. 创建工厂类 ServerBuilder 的实例。
  3. 使用 builder 的 AddListeningPort() 方法指定我们要用于监听客户端请求的地址和端口。
  4. 使用 builder 注册我们的服务实现。
  5. 在 builder 上调用 BuildAndStart() 来为我们的服务创建并启动一个 RPC 服务器。
  6. 在服务器上调用 Wait() 进行阻塞等待,直到进程被杀死或调用 Shutdown()

创建客户端

在本节中,我们将介绍如何为我们的 RouteGuide 服务创建 C++ 客户端。您可以在 examples/cpp/route_guide/route_guide_client.cc 中查看完整的示例客户端代码。

创建存根

要调用服务方法,我们首先需要创建一个存根

首先,我们需要为我们的存根创建一个 gRPC 通道,指定我们要连接的服务器地址和端口——在本例中,我们将不使用 SSL

grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials());

现在我们可以使用通道,通过在从 .proto 生成的 RouteGuide 类中提供的 NewStub 方法来创建我们的存根。

public:
 RouteGuideClient(std::shared_ptr<ChannelInterface> channel,
                  const std::string& db)
     : stub_(RouteGuide::NewStub(channel)) {
   ...
 }

调用服务方法

现在我们来看看如何调用我们的服务方法。请注意,在本教程中,我们调用的是每种方法的阻塞/同步版本:这意味着 RPC 调用会等待服务器响应,并会返回响应或抛出异常。

简单 RPC

调用简单 RPC GetFeature 几乎就像调用本地方法一样直接。

Point point;
Feature feature;
point = MakePoint(409146138, -746188906);
GetOneFeature(point, &feature);

...

bool GetOneFeature(const Point& point, Feature* feature) {
  ClientContext context;
  Status status = stub_->GetFeature(&context, point, feature);
  ...
}

正如您所见,我们创建并填充了一个请求 Protocol Buffer 对象(在本例中是 Point),并创建一个响应 Protocol Buffer 对象供服务器填充。我们还为调用创建了一个 ClientContext 对象——您可以选择在此对象上设置 RPC 配置值,例如截止时间,但目前我们将使用默认设置。请注意,此对象不能在调用之间重复使用。最后,我们在存根上调用该方法,将上下文、请求和响应传递给它。如果方法返回 OK,那么我们可以从我们的响应对象中读取服务器返回的响应信息。

std::cout << "Found feature called " << feature->name()  << " at "
          << feature->location().latitude()/kCoordFactor_ << ", "
          << feature->location().longitude()/kCoordFactor_ << std::endl;
流式 RPCs

现在我们来看看我们的流式方法。如果您已经阅读了创建服务器一节,其中一些内容可能看起来非常熟悉——流式 RPCs 在客户端和服务器端以类似的方式实现。这里是我们调用服务器端流式方法 ListFeatures 的地方,它返回地理位置 Feature 的流

std::unique_ptr<ClientReader<Feature> > reader(
    stub_->ListFeatures(&context, rect));
while (reader->Read(&feature)) {
  std::cout << "Found feature called "
            << feature.name() << " at "
            << feature.location().latitude()/kCoordFactor_ << ", "
            << feature.location().longitude()/kCoordFactor_ << std::endl;
}
Status status = reader->Finish();

并非向方法传递上下文、请求和响应,我们传递上下文和请求,并获得一个 ClientReader 对象。客户端可以使用 ClientReader 读取服务器的响应。我们使用 ClientReaderRead() 方法重复读取服务器响应到一个响应 Protocol Buffer 对象(在本例中是 Feature),直到没有更多消息为止:客户端需要在每次调用后检查 Read() 的返回值。如果为 true,则流仍然正常,可以继续读取;如果为 false,则消息流已结束。最后,我们在流上调用 Finish() 以完成调用并获取我们的 RPC 状态。

客户端流式方法 RecordRoute 也很类似,只不过我们向方法传递上下文和响应对象,并获得一个 ClientWriter

std::unique_ptr<ClientWriter<Point> > writer(
    stub_->RecordRoute(&context, &stats));
for (int i = 0; i < kPoints; i++) {
  const Feature& f = feature_list_[feature_distribution(generator)];
  std::cout << "Visiting point "
            << f.location().latitude()/kCoordFactor_ << ", "
            << f.location().longitude()/kCoordFactor_ << std::endl;
  if (!writer->Write(f.location())) {
    // Broken stream.
    break;
  }
  std::this_thread::sleep_for(std::chrono::milliseconds(
      delay_distribution(generator)));
}
writer->WritesDone();
Status status = writer->Finish();
if (status.IsOk()) {
  std::cout << "Finished trip with " << stats.point_count() << " points\n"
            << "Passed " << stats.feature_count() << " features\n"
            << "Travelled " << stats.distance() << " meters\n"
            << "It took " << stats.elapsed_time() << " seconds"
            << std::endl;
} else {
  std::cout << "RecordRoute rpc failed." << std::endl;
}

一旦我们使用 Write() 将客户端请求写入流完成,我们需要在流上调用 WritesDone() 以告知 gRPC 我们已完成写入,然后调用 Finish() 来完成调用并获取 RPC 状态。如果状态为 OK,我们最初传递给 RecordRoute() 的响应对象将填充服务器的响应。

最后,我们来看看双向流式 RPC RouteChat()。在这种情况下,我们只需将上下文传递给方法,并获得一个 ClientReaderWriter,我们可以用它来读写消息。

std::shared_ptr<ClientReaderWriter<RouteNote, RouteNote> > stream(
    stub_->RouteChat(&context));

这里的读写语法与客户端流式和服务器端流式方法完全相同。尽管双方总是按写入顺序接收对方的消息,但客户端和服务器都可以按任何顺序读写——流是完全独立操作的。

动手尝试!

构建客户端和服务器

make

运行服务器

./route_guide_server --db_path=path/to/route_guide_db.json

从另一个终端运行客户端

./route_guide_client --db_path=path/to/route_guide_db.json