异步回调 API 教程

异步回调 API 教程

本教程将演示如何使用 gRPC 的异步回调 API 编写一个简单的 C++ 服务器和客户端。本教程中使用的示例遵循 RouteGuide 示例

概述

gRPC C++ 提供了两种类型的 API:同步 API 和异步 API。具体来说,我们有两种异步 API:旧版基于完成队列(completion-queue)的 API,以及更易于使用的新版基于回调的 API。在本教程中,我们将重点介绍基于回调的异步 API(简称回调 API)。你将学习如何使用回调 API 为以下几种 RPC 实现服务器和客户端。

  • 一元 RPC
  • 服务器端流式 RPC
  • 客户端流式 RPC
  • 双向流式 RPC

示例代码

在本教程中,我们将创建一个路线引导应用程序。客户端可以获取路线上的特征信息,创建路线摘要,并与服务器及其他客户端交换交通更新等路线信息。

以下是在 Protocol Buffers 中定义的服务接口。

// Interface exported by the server.
service RouteGuide {
  // A simple RPC.
  //
  // Obtains the feature at a given position.
  //
  // A feature with an empty name is returned if there's no feature at the given
  // position.
  rpc GetFeature(Point) returns (Feature) {}

  // A server-to-client streaming RPC.
  //
  // Obtains the Features available within the given Rectangle.  Results are
  // streamed rather than returned at once (e.g. in a response message with a
  // repeated field), as the rectangle may cover a large area and contain a
  // huge number of features.
  rpc ListFeatures(Rectangle) returns (stream Feature) {}

  // A client-to-server streaming RPC.
  //
  // Accepts a stream of Points on a route being traversed, returning a
  // RouteSummary when traversal is completed.
  rpc RecordRoute(stream Point) returns (RouteSummary) {}

  // A Bidirectional streaming RPC.
  //
  // Accepts a stream of RouteNotes sent while a route is being traversed,
  // while receiving other RouteNotes (e.g. from other users).
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}

同样的示例也使用同步 API 进行了实现。如果你感兴趣,可以对比这两种实现方式。

需要实现的服务

由于我们要使用回调 API 来实现服务,因此我们需要实现的服务接口是 RouteGuide::CallbackService

class RouteGuideImpl final : public RouteGuide::CallbackService {
  ...
};

我们将在以下各节的“服务器”小节中实现该服务中的所有四个 RPC。

一元 RPC

让我们从最简单的 RPC 开始:GetFeature,这是一个一元 RPC。在 GetFeature 中,客户端向服务器发送一个 Point,然后服务器向客户端返回该 PointFeature

服务器

此 RPC 的实现非常简单直接。

  grpc::ServerUnaryReactor* GetFeature(CallbackServerContext* context,
                                       const Point* point,
                                       Feature* feature) override {
    feature->set_name(GetFeatureName(*point, feature_list_));
    feature->mutable_location()->CopyFrom(*point);
    auto* reactor = context->DefaultReactor();
    reactor->Finish(Status::OK);
    return reactor;
  }

设置完 Feature 的输出字段后,我们通过 ServerUnaryReactor 返回最终状态。

自定义一元 Reactor

上面的示例使用了默认 Reactor。如果我们想要处理特定操作(例如 RPC 取消或在 RPC 完成时异步运行某个操作),也可以在这里使用自定义 Reactor。在下面的示例中,我们为这两种操作添加了日志。

  grpc::ServerUnaryReactor* GetFeature(grpc::CallbackServerContext* context,
                                       const Point* point,
                                       Feature* feature) override {
    class Reactor : public grpc::ServerUnaryReactor {
     public:
      Reactor(const Point& point, const std::vector<Feature>& feature_list,
              Feature* feature) {
        feature->set_name(GetFeatureName(point, feature_list));
        *feature->mutable_location() = point;
        Finish(grpc::Status::OK);
      }

     private:
      void OnDone() override {
        LOG(INFO) << "RPC Completed";
        delete this;
      }

      void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }
    };
    return new Reactor(*point, feature_list_, feature);
  }

对于 ServerUnaryReactor,我们需要重写 OnDone(),并可选择重写 OnCancel()

注意 回调方法(如 OnDone())应该能够快速返回。切勿在此类回调中执行阻塞操作(例如等待某个事件)。

ServerUnaryReactor 的构造函数在 GetFeature() 构建并提供响应启动 RPC 的 Reactor 时被调用。它会收集请求 Point、响应 Featurefeature_list。然后,它从 Point 获取响应 Feature 并将其添加到 feature_list 中。为了完成 RPC,我们调用 Finish(Status::OK)

OnDone() 响应 RPC 的完成。我们将在 OnDone() 中执行最终的清理工作,并记录 RPC 的结束。

OnCancel() 响应 RPC 的取消。在此方法中,我们记录取消操作的发生。

客户端

注意:为简洁起见,本教程不讨论如何创建通道(channel)和存根(stub)。请参阅 基础教程 获取相关信息。

要启动 GetFeature RPC,除了 ClientContext、请求(即 Point)和响应(即 Feature)外,客户端还需要将一个回调(即 std::function<void(::grpc::Status)>)传递给 stub_->async()->GetFeature()。该回调将在服务器完成请求且 RPC 结束后被调用。

  bool GetOneFeature(const Point& point, Feature* feature) {
    ClientContext context;
    bool result;
    std::mutex mu;
    std::condition_variable cv;
    bool done = false;
    stub_->async()->GetFeature(
        &context, &point, feature,
        [&result, &mu, &cv, &done, feature, this](Status status) {
          bool ret;
          if (!status.ok()) {
            std::cout << "GetFeature rpc failed." << std::endl;
            ret = false;
          } else if (!feature->has_location()) {
            std::cout << "Server returns incomplete feature." << std::endl;
            ret = false;
          } else if (feature->name().empty()) {
            std::cout << "Found no feature at "
                      << feature->location().latitude() / kCoordFactor_ << ", "
                      << feature->location().longitude() / kCoordFactor_
                      << std::endl;
            ret = true;
          } else {
            std::cout << "Found feature called " << feature->name() << " at "
                      << feature->location().latitude() / kCoordFactor_ << ", "
                      << feature->location().longitude() / kCoordFactor_
                      << std::endl;
            ret = true;
          }
          std::lock_guard<std::mutex> lock(mu);
          result = ret;
          done = true;
          cv.notify_one();
        });
    std::unique_lock<std::mutex> lock(mu);
    cv.wait(lock, [&done] { return done; });
    return result;
  }

回调可以为一元 RPC 执行各种后续工作。例如,上述片段中的回调会检查状态和返回的特征,释放为此调用分配的堆内存对象,最后通知 RPC 已完成。

为简洁起见,该示例显示了同一个函数在等待 RPC 完成的通知,但这并非必须。

服务器端流式 RPC

现在让我们看看一个更复杂的 RPC - ListFeaturesListFeatures 是一个服务器流式 RPC。客户端向服务器发送一个 Rectangle,服务器将向客户端返回一系列 Feature,每个特征都在单独的消息中发送。

服务器

对于任何流式 RPC(包括服务器流式 RPC),RPC 处理程序的接口都是相似的。处理程序没有任何输入参数;返回类型是某种服务器 Reactor,它负责处理一个 RPC 的所有业务逻辑。

以下是 ListFeatures 的处理程序接口。

  grpc::ServerWriteReactor<Feature>* ListFeatures(
      CallbackServerContext* context,
      const routeguide::Rectangle* rectangle);

由于 ListFeatures 是一个服务器流式 RPC,返回类型应为 ServerWriteReactorServerWriteReactor 有两个模板参数:Rectangle 是来自客户端的请求类型;Feature 是来自服务器的每个响应消息的类型。

处理 RPC 的复杂性被委托给了 ServerWriteReactor。下面是我们如何实现 ServerWriteReactor 来处理 ListFeatures RPC。

  grpc::ServerWriteReactor<Feature>* ListFeatures(
      CallbackServerContext* context,
      const routeguide::Rectangle* rectangle) override {
    class Lister : public grpc::ServerWriteReactor<Feature> {
     public:
      Lister(const routeguide::Rectangle* rectangle,
             const std::vector<Feature>* feature_list)
          : left_((std::min)(rectangle->lo().longitude(),
                             rectangle->hi().longitude())),
            right_((std::max)(rectangle->lo().longitude(),
                              rectangle->hi().longitude())),
            top_((std::max)(rectangle->lo().latitude(),
                            rectangle->hi().latitude())),
            bottom_((std::min)(rectangle->lo().latitude(),
                               rectangle->hi().latitude())),
            feature_list_(feature_list),
            next_feature_(feature_list_->begin()) {
        NextWrite();
      }

      void OnWriteDone(bool ok) override {
        if (!ok) {
          Finish(Status(grpc::StatusCode::UNKNOWN, "Unexpected Failure"));
        }
        NextWrite();
      }

      void OnDone() override {
        LOG(INFO) << "RPC Completed";
        delete this;
      }

      void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }

     private:
      void NextWrite() {
        while (next_feature_ != feature_list_->end()) {
          const Feature& f = *next_feature_;
          next_feature_++;
          if (f.location().longitude() >= left_ &&
              f.location().longitude() <= right_ &&
              f.location().latitude() >= bottom_ &&
              f.location().latitude() <= top_) {
            StartWrite(&f);
            return;
          }
        }
        // Didn't write anything, all is done.
        Finish(Status::OK);
      }
      const long left_;
      const long right_;
      const long top_;
      const long bottom_;
      const std::vector<Feature>* feature_list_;
      std::vector<Feature>::const_iterator next_feature_;
    };
    return new Lister(rectangle, &feature_list_);
  }

不同的 Reactor 具有不同的回调方法。我们需要重写我们感兴趣的方法来实现我们的 RPC。对于 ListFeatures,我们需要重写 OnWriteDone()OnDone(),以及可选的 OnCancel

ServerWriteReactor 的构造函数在 ListFeatures() 构建并提供响应启动 RPC 的 Reactor 时被调用。它将 rectangle 内的所有 Feature 收集到 features_to_send_ 中,并在有数据时开始发送。

OnWriteDone() 响应写入完成。如果写入成功完成,我们将继续发送下一个 Feature,直到 features_to_send_ 为空,此时我们将调用 Finish(Status::OK) 以完成调用。

OnDone() 响应 RPC 的完成。我们将在 OnDone() 中执行最终的清理工作。

OnCancel() 响应 RPC 的取消。我们在此方法中记录取消操作的发生。

客户端

与服务器端类似,客户端也需要实现某种客户端 Reactor 来与服务器交互。客户端 Reactor 封装了处理 RPC 所需的所有操作。

由于 ListFeatures 是服务器流式的,我们应该实现一个 ClientReadReactor,其名称与 ServerWriteReactor 对称。

    class Reader : public grpc::ClientReadReactor<Feature> {
     public:
      Reader(RouteGuide::Stub* stub, float coord_factor,
             const routeguide::Rectangle& rect)
          : coord_factor_(coord_factor) {
        stub->async()->ListFeatures(&context_, &rect, this);
        StartRead(&feature_);
        StartCall();
      }
      void OnReadDone(bool ok) override {
        if (ok) {
          std::cout << "Found feature called " << feature_.name() << " at "
                    << feature_.location().latitude() / coord_factor_ << ", "
                    << feature_.location().longitude() / coord_factor_
                    << std::endl;
          StartRead(&feature_);
        }
      }
      void OnDone(const Status& s) override {
        std::unique_lock<std::mutex> l(mu_);
        status_ = s;
        done_ = true;
        cv_.notify_one();
      }
      Status Await() {
        std::unique_lock<std::mutex> l(mu_);
        cv_.wait(l, [this] { return done_; });
        return std::move(status_);
      }

     private:
      ClientContext context_;
      float coord_factor_;
      Feature feature_;
      std::mutex mu_;
      std::condition_variable cv_;
      Status status_;
      bool done_ = false;
    };

ClientReadReactor 模板化了一个参数 Feature,它是来自服务器的响应消息类型。

Reader 的构造函数中,我们将 ClientContext&rectangle_(请求对象)和 Reader 传递给 RPC 方法 stub->async()->ListFeatures()。然后,我们将 &feature_ 传递给 StartRead() 以指定存储接收到的响应的位置。最后,调用 StartCall() 来激活 RPC!

OnReadDone() 响应读取完成。如果读取成功完成,我们将继续读取下一个 Feature,直到失败为止(由 okfalse 指示)。

OnDone() 响应 RPC 的完成。它检查 RPC 状态结果,并通知正在等待 OnDone() 的条件变量。

Await() 不是 ClientReadReactor 的方法。这只是为了简单起见而添加的,以便示例知道 RPC 已完成。或者,如果不需要完成通知,OnDone() 只需在清理后返回即可,例如释放堆分配的对象。

要启动 RPC,客户端只需实例化一个 ReadReactor 并等待 RPC 完成。

    routeguide::Rectangle rect;
    Feature feature;

    rect.mutable_lo()->set_latitude(400000000);
    rect.mutable_lo()->set_longitude(-750000000);
    rect.mutable_hi()->set_latitude(420000000);
    rect.mutable_hi()->set_longitude(-730000000);
    std::cout << "Looking for features between 40, -75 and 42, -73"
              << std::endl;

    Reader reader(stub_.get(), kCoordFactor_, rect);
    Status status = reader.Await();
    if (status.ok()) {
      std::cout << "ListFeatures rpc succeeded." << std::endl;
    } else {
      std::cout << "ListFeatures rpc failed." << std::endl;
    }

客户端流式 RPC

一旦你理解了上一节中服务器流式 RPC 的概念,你应该会发现客户端流式 RPC 很简单。

RecordRoute 是我们将要讨论的客户端流式 RPC。客户端向服务器发送一系列 Point,服务器将在客户端完成发送 Point 后返回一个 RouteSummary

服务器

客户端流式 RPC 的 RPC 处理程序接口没有任何输入参数,其返回类型是一个服务器 Reactor,即 ServerReadReactor

ServerReadReactor 有两个模板参数:Point 是来自客户端的每个请求消息的类型;RouteSummary 是来自服务器的响应类型。

ServerWriteReactor 类似,ServerReadReactor 是处理 RPC 的类。

grpc::ServerReadReactor<Point>* RecordRoute(CallbackServerContext* context,
                                              RouteSummary* summary) override {
    class Recorder : public grpc::ServerReadReactor<Point> {
     public:
      Recorder(RouteSummary* summary, const std::vector<Feature>* feature_list)
          : start_time_(system_clock::now()),
            summary_(summary),
            feature_list_(feature_list) {
        StartRead(&point_);
      }

      void OnReadDone(bool ok) override {
        if (ok) {
          point_count_++;
          if (!GetFeatureName(point_, *feature_list_).empty()) {
            feature_count_++;
          }
          if (point_count_ != 1) {
            distance_ += GetDistance(previous_, point_);
          }
          previous_ = point_;
          StartRead(&point_);
        } else {
          summary_->set_point_count(point_count_);
          summary_->set_feature_count(feature_count_);
          summary_->set_distance(static_cast<long>(distance_));
          auto secs = std::chrono::duration_cast<std::chrono::seconds>(
              system_clock::now() - start_time_);
          summary_->set_elapsed_time(secs.count());
          Finish(Status::OK);
        }
      }

      void OnDone() override {
        LOG(INFO) << "RPC Completed";
        delete this;
      }

      void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }

     private:
      system_clock::time_point start_time_;
      RouteSummary* summary_;
      const std::vector<Feature>* feature_list_;
      Point point_;
      int point_count_ = 0;
      int feature_count_ = 0;
      float distance_ = 0.0;
      Point previous_;
    };
    return new Recorder(summary, &feature_list_);
  }

ServerReadReactor 的构造函数在 RecordRoute() 构建并提供响应启动 RPC 的 Reactor 时被调用。构造函数存储 RouteSummary* 以便稍后返回响应,并通过调用 StartRead(&point_) 发起读取操作。

OnReadDone() 响应读取完成。如果读取成功完成,我们将使用新收到的 Point 更新统计信息,并继续读取下一个 Point,直到失败为止(由 okfalse 指示)。读取失败后,服务器会将响应设置到 summary_ 中,并调用 Finish(Status::OK) 来完成 RPC。

OnDone() 响应 RPC 的完成。我们将在 OnDone() 中执行最终的清理工作。

OnCancel() 响应 RPC 的取消。我们在此方法中记录取消操作的发生。

客户端

不出所料,我们需要为客户端侧实现一个客户端 Reactor,该客户端 Reactor 被称为 ClientWriteReactor

    class Recorder : public grpc::ClientWriteReactor<Point> {
     public:
      Recorder(RouteGuide::Stub* stub, float coord_factor,
               const std::vector<Feature>* feature_list)
          : coord_factor_(coord_factor),
            feature_list_(feature_list),
            generator_(
                std::chrono::system_clock::now().time_since_epoch().count()),
            feature_distribution_(0, feature_list->size() - 1),
            delay_distribution_(500, 1500) {
        stub->async()->RecordRoute(&context_, &stats_, this);
        // Use a hold since some StartWrites are invoked indirectly from a
        // delayed lambda in OnWriteDone rather than directly from the reaction
        // itself
        AddHold();
        NextWrite();
        StartCall();
      }
      void OnWriteDone(bool ok) override {
        // Delay and then do the next write or WritesDone
        alarm_.Set(
            std::chrono::system_clock::now() +
                std::chrono::milliseconds(delay_distribution_(generator_)),
            [this](bool /*ok*/) { NextWrite(); });
      }
      void OnDone(const Status& s) override {
        std::unique_lock<std::mutex> l(mu_);
        status_ = s;
        done_ = true;
        cv_.notify_one();
      }
      Status Await(RouteSummary* stats) {
        std::unique_lock<std::mutex> l(mu_);
        cv_.wait(l, [this] { return done_; });
        *stats = stats_;
        return std::move(status_);
      }

     private:
      void NextWrite() {
        if (points_remaining_ != 0) {
          const Feature& f =
              (*feature_list_)[feature_distribution_(generator_)];
          std::cout << "Visiting point "
                    << f.location().latitude() / coord_factor_ << ", "
                    << f.location().longitude() / coord_factor_ << std::endl;
          StartWrite(&f.location());
          points_remaining_--;
        } else {
          StartWritesDone();
          RemoveHold();
        }
      }
      ClientContext context_;
      float coord_factor_;
      int points_remaining_ = 10;
      Point point_;
      RouteSummary stats_;
      const std::vector<Feature>* feature_list_;
      std::default_random_engine generator_;
      std::uniform_int_distribution<int> feature_distribution_;
      std::uniform_int_distribution<int> delay_distribution_;
      grpc::Alarm alarm_;
      std::mutex mu_;
      std::condition_variable cv_;
      Status status_;
      bool done_ = false;
    };

ClientWriteReactor 模板化了一个参数 Point,它是来自客户端的请求消息类型。

Recorder 的构造函数中,我们将 ClientContext&stats_(响应对象)和 Recorder 传递给 RPC 方法 stub->async()->RecordRoute()。然后,我们添加一个操作来发送 points_to_send_ 中的第一个 Point(如果有的话)。注意,如果在启动 RPC 时没有任何东西要发送,我们需要调用 StartWritesDone() 来通知服务器我们已经完成了写入。最后,调用 StartCall() 激活 RPC。

OnWriteDone() 响应写入完成。如果写入成功完成,我们将继续写入下一个 Point,直到 points_to_send_ 为空。对于要发送的最后一个 Point,我们调用 StartWriteLast() 来附带写入已完成的信号。StartWriteLast() 的效果等同于结合使用 StartWrite()StartWritesDone(),但效率更高。

OnDone() 响应 RPC 的完成。它检查 RPC 状态结果和 stats_ 中的响应,并通知正在等待 OnDone() 的条件变量。

Await() 不是 ClientWriteReactor 的方法。我们添加 Await() 是为了让调用者可以等待直到 RPC 完成。

要启动 RPC,客户端只需实例化一个 Recorder 并等待 RPC 完成。

    Recorder recorder(stub_.get(), kCoordFactor_, &feature_list_);
    RouteSummary stats;
    Status status = recorder.Await(&stats);
    if (status.ok()) {
      std::cout << "Finished trip with " << stats.point_count() << " points\n"
                << "Passed " << stats.feature_count() << " features\n"
                << "Travelled " << stats.distance() << " meters\n"
                << "It took " << stats.elapsed_time() << " seconds"
                << std::endl;
    } else {
      std::cout << "RecordRoute rpc failed." << std::endl;
    }

双向流式 RPC

最后,我们将看看双向流式 RPC RouteChat。在这种情况下,客户端向服务器发送一系列 RouteNote。每次在某个 Point 发送一个 RouteNote 时,服务器都会返回所有客户端之前在该 Point 发送过的所有 RouteNote 序列。

服务器

同样,双向流式 RPC 的 RPC 处理程序接口没有任何输入参数,其返回类型是一个服务器 Reactor,即 ServerBidiReactor

ServerBidiReactor 有两个模板参数,它们都是 RouteNote,因为 RouteNote 是请求和响应的消息类型。毕竟,RouteChat 的目的就是让客户端聊天并相互共享信息!

既然我们已经讨论了 ServerWriteReactorServerReadReactorServerBidiReactor 就非常直观了。

  grpc::ServerBidiReactor<RouteNote, RouteNote>* RouteChat(
      CallbackServerContext* context) override {
    class Chatter : public grpc::ServerBidiReactor<RouteNote, RouteNote> {
     public:
      Chatter(absl::Mutex* mu, std::vector<RouteNote>* received_notes)
          : mu_(mu), received_notes_(received_notes) {
        StartRead(&note_);
      }

      void OnReadDone(bool ok) override {
        if (ok) {
          // Unlike the other example in this directory that's not using
          // the reactor pattern, we can't grab a local lock to secure the
          // access to the notes vector, because the reactor will most likely
          // make us jump threads, so we'll have to use a different locking
          // strategy. We'll grab the lock locally to build a copy of the
          // list of nodes we're going to send, then we'll grab the lock
          // again to append the received note to the existing vector.
          mu_->Lock();
          std::copy_if(received_notes_->begin(), received_notes_->end(),
                       std::back_inserter(to_send_notes_),
                       [this](const RouteNote& note) {
                         return note.location().latitude() ==
                                    note_.location().latitude() &&
                                note.location().longitude() ==
                                    note_.location().longitude();
                       });
          mu_->Unlock();
          notes_iterator_ = to_send_notes_.begin();
          NextWrite();
        } else {
          Finish(Status::OK);
        }
      }
      void OnWriteDone(bool /*ok*/) override { NextWrite(); }

      void OnDone() override {
        LOG(INFO) << "RPC Completed";
        delete this;
      }

      void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }

     private:
      void NextWrite() {
        if (notes_iterator_ != to_send_notes_.end()) {
          StartWrite(&*notes_iterator_);
          notes_iterator_++;
        } else {
          mu_->Lock();
          received_notes_->push_back(note_);
          mu_->Unlock();
          StartRead(&note_);
        }
      }

      RouteNote note_;
      absl::Mutex* mu_;
      std::vector<RouteNote>* received_notes_ ABSL_GUARDED_BY(mu_);
      std::vector<RouteNote> to_send_notes_;
      std::vector<RouteNote>::iterator notes_iterator_;
    };
    return new Chatter(&mu_, &received_notes_);
  }

ServerBidiReactor 的构造函数在 RouteChat() 构建并提供响应启动 RPC 的 Reactor 时被调用。构造函数通过调用 StartRead(&received_note_) 发起读取操作。

OnReadDone() 响应读取完成。如果读取成功完成(即 oktrue),我们将继续读取下一个 RouteNote;否则,我们将记录所有读取已完成,并结束 RPC。对于成功读取后新收到的 RouteNote,我们将其添加到 received_notes_ 中,并将之前在同一 Point 收到的笔记追加到 to_send_notes_ 中。每当 to_send_notes_ 不为空时,我们就开始发送 to_send_notes_ 中的 RouteNote

OnWriteDone() 响应写入完成。如果写入成功完成,我们将继续发送下一个 RouteNote,直到 to_send_notes_ 为空。此时,我们将继续读取下一个 RouteNote,或者如果读取也已完成,则结束 RPC。

OnDone() 响应 RPC 的完成。我们将在 OnDone() 中执行最终的清理工作。

OnCancel() 响应 RPC 的取消。我们在此方法中记录取消操作的发生。

客户端

是的,双向流式 RPC 的客户端 Reactor 是 ClientBidiReactor

    class Chatter : public grpc::ClientBidiReactor<RouteNote, RouteNote> {
     public:
      explicit Chatter(RouteGuide::Stub* stub)
          : notes_{MakeRouteNote("First message", 0, 0),
                   MakeRouteNote("Second message", 0, 1),
                   MakeRouteNote("Third message", 1, 0),
                   MakeRouteNote("Fourth message", 0, 0)},
            notes_iterator_(notes_.begin()) {
        stub->async()->RouteChat(&context_, this);
        NextWrite();
        StartRead(&server_note_);
        StartCall();
      }
      void OnWriteDone(bool ok) override {
        if (ok) {
          NextWrite();
        }
      }
      void OnReadDone(bool ok) override {
        if (ok) {
          std::cout << "Got message " << server_note_.message() << " at "
                    << server_note_.location().latitude() << ", "
                    << server_note_.location().longitude() << std::endl;
          StartRead(&server_note_);
        }
      }
      void OnDone(const Status& s) override {
        std::unique_lock<std::mutex> l(mu_);
        status_ = s;
        done_ = true;
        cv_.notify_one();
      }
      Status Await() {
        std::unique_lock<std::mutex> l(mu_);
        cv_.wait(l, [this] { return done_; });
        return std::move(status_);
      }

     private:
      void NextWrite() {
        if (notes_iterator_ != notes_.end()) {
          const auto& note = *notes_iterator_;
          std::cout << "Sending message " << note.message() << " at "
                    << note.location().latitude() << ", "
                    << note.location().longitude() << std::endl;
          StartWrite(&note);
          notes_iterator_++;
        } else {
          StartWritesDone();
        }
      }
      ClientContext context_;
      const std::vector<RouteNote> notes_;
      std::vector<RouteNote>::const_iterator notes_iterator_;
      RouteNote server_note_;
      std::mutex mu_;
      std::condition_variable cv_;
      Status status_;
      bool done_ = false;
    };

ClientBidiReactor 模板化了两个参数,即请求和响应的消息类型,在 RouteChat RPC 的情况下,它们都是 RouteNote

Chatter 的构造函数中,我们将 ClientContextChatter 传递给 RPC 方法 stub->async()->RouteChat()。然后,我们添加一个操作来发送 notes_ 中的第一个 RouteNote(如果有的话)。注意,如果在启动 RPC 时没有任何东西要发送,我们需要调用 StartWritesDone() 来通知服务器我们已经完成了写入。我们还调用 StartRead() 来添加读取操作。最后,调用 StartCall() 激活 RPC。

OnReadDone() 响应读取完成。如果读取成功完成,我们将继续读取下一个 RouteNote,直到失败为止(由 okfalse 指示)。

OnWriteDone() 响应写入完成。如果写入成功完成,我们将继续写入下一个 RouteNote,直到 notes_ 为空。对于要发送的最后一个 RouteNote,我们调用 StartWriteLast() 来附带写入已完成的信号。StartWriteLast() 的效果等同于结合使用 StartWrite()StartWritesDone(),但效率更高。

OnDone() 响应 RPC 的完成。它检查 RPC 状态结果和消息统计信息,并通知正在等待 OnDone() 的条件变量。

Await() 不是 ClientBidiReactor 的方法。我们添加 Await() 是为了让调用者可以等待直到 RPC 完成。

要启动 RPC,客户端只需实例化一个 Chatter 并等待 RPC 完成。

    Chatter chatter(stub_.get());
    Status status = chatter.Await();
    if (!status.ok()) {
      std::cout << "RouteChat rpc failed." << std::endl;
    }
最后修改于 2024 年 6 月 18 日:C++ 回调 API 教程 (#1305) (18b12d6)