基础教程

gRPC C++ 基础教程简介。

基础教程

gRPC C++ 基础教程简介。

本教程为 C++ 程序员提供了 gRPC 使用的基础介绍。

通过此示例,您将学习如何

  • .proto 文件中定义服务。
  • 使用协议缓冲区编译器生成服务器和客户端代码。
  • 使用 C++ gRPC API 为您的服务编写一个简单的客户端和服务器。

它假设您已经阅读了 gRPC 简介,并熟悉 协议缓冲区。请注意,本教程中的示例使用了 proto3 版本的协议缓冲区语言:您可以在 proto3 语言指南C++ 生成代码指南中了解更多信息。

为什么使用 gRPC?

我们的示例是一个简单的路线映射应用程序,它允许客户端获取其路线上特征的信息,创建其路线摘要,并与服务器及其他客户端交换路线信息(例如交通更新)。

使用 gRPC,我们可以将服务定义一次写入 .proto 文件,然后以 gRPC 支持的任何语言生成客户端和服务器,这些客户端和服务器又可以在从大型数据中心内的服务器到您自己的平板电脑等各种环境中运行——不同语言和环境之间通信的所有复杂性都由 gRPC 为您处理。我们还获得了使用协议缓冲区的所有优点,包括高效序列化、简单的 IDL 和易于更新的接口。

示例代码和设置

示例代码位于 grpc 仓库的 examples/cpp/route_guide 下。获取示例代码并构建 gRPC

  1. 按照快速入门说明从源代码构建并本地安装 gRPC

  2. 从仓库文件夹中,切换到 route guide 示例目录

    cd examples/cpp/route_guide
    
  3. 运行 cmake

    mkdir -p cmake/build
    cd cmake/build
    cmake -DCMAKE_PREFIX_PATH=$MY_INSTALL_DIR ../..
    

定义服务

我们的第一步(如您从 gRPC 简介中所知)是使用 协议缓冲区定义 gRPC 服务以及方法请求响应类型。您可以在 examples/protos/route_guide.proto 中查看完整的 .proto 文件。

要定义服务,请在 .proto 文件中指定一个命名的 service

service RouteGuide {
   ...
}

然后,您在服务定义中定义 rpc 方法,并指定它们的请求和响应类型。gRPC 允许您定义四种服务方法,所有这些方法都在 RouteGuide 服务中使用

  • 简单 RPC:客户端使用存根向服务器发送请求,并等待响应返回,就像普通的函数调用一样。

    // Obtains the feature at a given position.
    rpc GetFeature(Point) returns (Feature) {}
    
  • 服务器端流式 RPC:客户端向服务器发送请求,并获得一个流来读取一系列消息。客户端从返回的流中读取,直到没有更多消息。如我们的示例所示,您可以通过在响应类型前放置 stream 关键字来指定服务器端流式方法。

    // Obtains the Features available within the given Rectangle.  Results are
    // streamed rather than returned at once (e.g. in a response message with a
    // repeated field), as the rectangle may cover a large area and contain a
    // huge number of features.
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
    
  • 客户端流式 RPC:客户端写入一系列消息并将其发送到服务器,同样使用提供的流。一旦客户端完成写入消息,它就会等待服务器读取所有消息并返回其响应。您可以通过在请求类型前放置 stream 关键字来指定客户端流式方法。

    // Accepts a stream of Points on a route being traversed, returning a
    // RouteSummary when traversal is completed.
    rpc RecordRoute(stream Point) returns (RouteSummary) {}
    
  • 双向流式 RPC:双方使用读写流发送一系列消息。这两个流独立运行,因此客户端和服务器可以以任何顺序读写:例如,服务器可以等待接收所有客户端消息后再写入其响应,或者它可以交替地读取一条消息然后写入一条消息,或者其他一些读写组合。每个流中的消息顺序都得到保留。您可以通过在请求和响应前放置 stream 关键字来指定此类型的方法。

    // Accepts a stream of RouteNotes sent while a route is being traversed,
    // while receiving other RouteNotes (e.g. from other users).
    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    

我们的 .proto 文件还包含我们服务方法中使用的所有请求和响应类型的协议缓冲区消息类型定义——例如,这是 Point 消息类型

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

生成客户端和服务器代码

接下来,我们需要根据 .proto 服务定义生成 gRPC 客户端和服务器接口。我们使用协议缓冲区编译器 protoc 和一个特殊的 gRPC C++ 插件来完成此操作。

为简单起见,我们提供了一个 CMakeLists.txt,它会为您运行 protoc,并带上适当的插件、输入和输出(如果您想自己运行,请确保您已安装 protoc 并首先按照 gRPC 代码安装说明操作)

make route_guide.grpc.pb.o

实际上会运行

protoc -I ../../protos --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` ../../protos/route_guide.proto
protoc -I ../../protos --cpp_out=. ../../protos/route_guide.proto

运行此命令将在当前目录中生成以下文件

  • route_guide.pb.h,声明您生成的邮件类的头文件
  • route_guide.pb.cc,包含您的消息类的实现
  • route_guide.grpc.pb.h,声明您生成的服务类的头文件
  • route_guide.grpc.pb.cc,包含您的服务类的实现

这些文件包含

  • 所有用于填充、序列化和检索我们的请求和响应消息类型的协议缓冲区代码

  • 一个名为 RouteGuide 的类,包含

    • 一个远程接口类型(或存根),供客户端调用 RouteGuide 服务中定义的方法。
    • 服务器需要实现的两个抽象接口,也包含 RouteGuide 服务中定义的方法。

创建服务器

首先让我们看看如何创建一个 RouteGuide 服务器。如果您只对创建 gRPC 客户端感兴趣,您可以跳过此部分并直接跳到创建客户端(尽管您可能仍然会觉得它很有趣!)。

使我们的 RouteGuide 服务正常工作需要两部分

  • 实现从我们的服务定义生成的服务接口:完成我们服务的实际“工作”。
  • 运行 gRPC 服务器以侦听来自客户端的请求并返回服务响应。

您可以在 examples/cpp/route_guide/route_guide_server.cc 中找到我们的 RouteGuide 服务器示例。让我们仔细看看它是如何工作的。

实现 RouteGuide

如您所见,我们的服务器有一个 RouteGuideImpl 类,它实现了生成的 RouteGuide::Service 接口

class RouteGuideImpl final : public RouteGuide::Service {
...
}

在这种情况下,我们正在实现 RouteGuide同步版本,它提供了我们默认的 gRPC 服务器行为。也可以实现一个异步接口 RouteGuide::AsyncService,它允许您进一步自定义服务器的线程行为,尽管在本教程中我们不会探讨这一点。

RouteGuideImpl 实现了我们所有的服务方法。让我们首先看看最简单的类型 GetFeature,它只是从客户端获取一个 Point,并从其数据库中返回相应的特征信息到一个 Feature 中。

Status GetFeature(ServerContext* context, const Point* point,
                  Feature* feature) override {
  feature->set_name(GetFeatureName(*point, feature_list_));
  feature->mutable_location()->CopyFrom(*point);
  return Status::OK;
}

该方法会传入一个 RPC 的上下文对象、客户端的 Point 协议缓冲区请求,以及一个要填充响应信息的 Feature 协议缓冲区。在方法中,我们用适当的信息填充 Feature,然后返回 OK 状态以告知 gRPC 我们已完成处理 RPC,并且 Feature 可以返回给客户端。

请注意,所有服务方法都可以(也将会!)同时从多个线程调用。您必须确保您的方法实现是线程安全的。在我们的示例中,feature_list_ 在构造后从不更改,因此其设计本身就是安全的。但是,如果 feature_list_ 在服务生命周期内发生变化,我们就需要同步对该成员的访问。

现在让我们看一个更复杂一点的例子——流式 RPC。ListFeatures 是一个服务器端流式 RPC,因此我们需要向客户端发送多个 Feature

Status ListFeatures(ServerContext* context, const Rectangle* rectangle,
                    ServerWriter<Feature>* writer) override {
  auto lo = rectangle->lo();
  auto hi = rectangle->hi();
  long left = std::min(lo.longitude(), hi.longitude());
  long right = std::max(lo.longitude(), hi.longitude());
  long top = std::max(lo.latitude(), hi.latitude());
  long bottom = std::min(lo.latitude(), hi.latitude());
  for (const Feature& f : feature_list_) {
    if (f.location().longitude() >= left &&
        f.location().longitude() <= right &&
        f.location().latitude() >= bottom &&
        f.location().latitude() <= top) {
      writer->Write(f);
    }
  }
  return Status::OK;
}

如您所见,这次我们不是在方法参数中获取简单的请求和响应对象,而是获取一个请求对象(客户端想要查找 FeatureRectangle)和一个特殊的 ServerWriter 对象。在该方法中,我们填充需要返回的任意数量的 Feature 对象,使用 ServerWriterWrite() 方法将它们写入 ServerWriter。最后,与我们的简单 RPC 一样,我们 return Status::OK 以告知 gRPC 我们已完成写入响应。

如果您查看客户端流式方法 RecordRoute,您会发现它非常相似,只不过这次我们获得的是 ServerReader 而不是请求对象和单个响应。我们使用 ServerReaderRead() 方法重复读取客户端的请求到请求对象(在此例中为 Point),直到没有更多消息为止:服务器需要在每次调用后检查 Read() 的返回值。如果为 true,则流仍然有效,可以继续读取;如果为 false,则消息流已结束。

while (stream->Read(&point)) {
  ...//process client input
}

最后,让我们看看我们的双向流式 RPC RouteChat()

Status RouteChat(ServerContext* context,
                  ServerReaderWriter<RouteNote, RouteNote>* stream) override {
  RouteNote note;
  while (stream->Read(&note)) {
    std::unique_lock<std::mutex> lock(mu_);
    for (const RouteNote& n : received_notes_) {
      if (n.location().latitude() == note.location().latitude() &&
          n.location().longitude() == note.location().longitude()) {
        stream->Write(n);
      }
    }
    received_notes_.push_back(note);
  }

  return Status::OK;
}

这次我们得到了一个 ServerReaderWriter,它可用于读写消息。这里的读写语法与我们的客户端流式和服务器流式方法完全相同。尽管双方总是会按照消息写入的顺序接收对方的消息,但客户端和服务器都可以以任何顺序读写——流完全独立运行。

请注意,由于 received_notes_ 是一个实例变量,并且可以被多个线程访问,因此我们在这里使用互斥锁来保证独占访问。

启动服务器

一旦我们实现了所有方法,我们还需要启动一个 gRPC 服务器,以便客户端能够实际使用我们的服务。以下代码片段展示了我们如何为 RouteGuide 服务执行此操作

void RunServer(const std::string& db_path) {
  std::string server_address("0.0.0.0:50051");
  RouteGuideImpl service(db_path);

  ServerBuilder builder;
  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  builder.RegisterService(&service);
  std::unique_ptr<Server> server(builder.BuildAndStart());
  std::cout << "Server listening on " << server_address << std::endl;
  server->Wait();
}

如您所见,我们使用 ServerBuilder 构建并启动服务器。为此,我们执行以下操作

  1. 创建服务实现类 RouteGuideImpl 的实例。
  2. 创建工厂类 ServerBuilder 的实例。
  3. 使用构建器的 AddListeningPort() 方法指定我们要用于侦听客户端请求的地址和端口。
  4. 向构建器注册我们的服务实现。
  5. 调用构建器的 BuildAndStart() 方法来创建并启动我们服务的 RPC 服务器。
  6. 在服务器上调用 Wait() 以进行阻塞等待,直到进程被终止或调用 Shutdown()

创建客户端

在本节中,我们将介绍如何为我们的 RouteGuide 服务创建 C++ 客户端。您可以在 examples/cpp/route_guide/route_guide_client.cc 中查看我们完整的客户端示例代码。

创建存根

要调用服务方法,我们首先需要创建一个存根

首先,我们需要为存根创建一个 gRPC 通道,指定我们要连接的服务器地址和端口——在本例中,我们将不使用 SSL

grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials());

现在我们可以使用该通道通过 .proto 文件中生成的 RouteGuide 类提供的 NewStub 方法来创建我们的存根。

public:
 RouteGuideClient(std::shared_ptr<ChannelInterface> channel,
                  const std::string& db)
     : stub_(RouteGuide::NewStub(channel)) {
   ...
 }

调用服务方法

现在让我们看看如何调用我们的服务方法。请注意,在本教程中我们调用的是每个方法的阻塞/同步版本:这意味着 RPC 调用会等待服务器响应,并会返回响应或抛出异常。

简单 RPC

调用简单的 RPC GetFeature 几乎与调用本地方法一样简单。

Point point;
Feature feature;
point = MakePoint(409146138, -746188906);
GetOneFeature(point, &feature);

...

bool GetOneFeature(const Point& point, Feature* feature) {
  ClientContext context;
  Status status = stub_->GetFeature(&context, point, feature);
  ...
}

如您所见,我们创建并填充一个请求协议缓冲区对象(在本例中为 Point),并创建一个响应协议缓冲区对象供服务器填充。我们还为我们的调用创建一个 ClientContext 对象——您可以在此对象上可选地设置 RPC 配置值,例如截止日期,但目前我们将使用默认设置。请注意,此对象不能在调用之间重用。最后,我们在存根上调用方法,传入上下文、请求和响应。如果方法返回 OK,那么我们就可以从响应对象中读取服务器返回的响应信息。

std::cout << "Found feature called " << feature->name()  << " at "
          << feature->location().latitude()/kCoordFactor_ << ", "
          << feature->location().longitude()/kCoordFactor_ << std::endl;
流式 RPC

现在让我们看看我们的流式方法。如果您已经阅读过创建服务器,其中一些内容可能看起来非常熟悉——流式 RPC 在两端都以类似的方式实现。以下是我们调用服务器端流式方法 ListFeatures 的位置,该方法返回地理 Feature 的流

std::unique_ptr<ClientReader<Feature> > reader(
    stub_->ListFeatures(&context, rect));
while (reader->Read(&feature)) {
  std::cout << "Found feature called "
            << feature.name() << " at "
            << feature.location().latitude()/kCoordFactor_ << ", "
            << feature.location().longitude()/kCoordFactor_ << std::endl;
}
Status status = reader->Finish();

我们不向方法传递上下文、请求和响应,而是传递上下文和请求,并返回一个 ClientReader 对象。客户端可以使用 ClientReader 读取服务器的响应。我们使用 ClientReaderRead() 方法反复读取服务器的响应到响应协议缓冲区对象(在本例中为 Feature),直到没有更多消息:客户端需要在每次调用后检查 Read() 的返回值。如果为 true,则流仍然有效,可以继续读取;如果为 false,则消息流已结束。最后,我们对流调用 Finish() 以完成调用并获取 RPC 状态。

客户端流式方法 RecordRoute 类似,只不过我们向方法传递一个上下文和响应对象,并返回一个 ClientWriter

std::unique_ptr<ClientWriter<Point> > writer(
    stub_->RecordRoute(&context, &stats));
for (int i = 0; i < kPoints; i++) {
  const Feature& f = feature_list_[feature_distribution(generator)];
  std::cout << "Visiting point "
            << f.location().latitude()/kCoordFactor_ << ", "
            << f.location().longitude()/kCoordFactor_ << std::endl;
  if (!writer->Write(f.location())) {
    // Broken stream.
    break;
  }
  std::this_thread::sleep_for(std::chrono::milliseconds(
      delay_distribution(generator)));
}
writer->WritesDone();
Status status = writer->Finish();
if (status.IsOk()) {
  std::cout << "Finished trip with " << stats.point_count() << " points\n"
            << "Passed " << stats.feature_count() << " features\n"
            << "Travelled " << stats.distance() << " meters\n"
            << "It took " << stats.elapsed_time() << " seconds"
            << std::endl;
} else {
  std::cout << "RecordRoute rpc failed." << std::endl;
}

一旦我们使用 Write() 完成了客户端请求的写入流,我们需要在流上调用 WritesDone() 以告知 gRPC 我们已完成写入,然后调用 Finish() 以完成调用并获取 RPC 状态。如果状态为 OK,我们最初传递给 RecordRoute() 的响应对象将填充服务器的响应。

最后,让我们看看我们的双向流式 RPC RouteChat()。在这种情况下,我们只需将上下文传递给方法,然后返回一个 ClientReaderWriter,我们可以使用它来读写消息。

std::shared_ptr<ClientReaderWriter<RouteNote, RouteNote> > stream(
    stub_->RouteChat(&context));

这里的读写语法与我们的客户端流式和服务器流式方法完全相同。尽管双方总是会按照消息写入的顺序接收对方的消息,但客户端和服务器都可以以任何顺序读写——流完全独立运行。

尝试一下!

构建客户端和服务器

make

运行服务器

./route_guide_server --db_path=path/to/route_guide_db.json

在另一个终端中,运行客户端

./route_guide_client --db_path=path/to/route_guide_db.json