异步回调 API 教程

异步回调 API 教程

本教程向您展示如何使用 gRPC 的异步回调 API 在 C++ 中编写一个简单的服务器和客户端。本教程中使用的示例遵循 RouteGuide 示例

概述

gRPC C++ 提供了两种 API:同步 API 和异步 API。更具体地说,我们有两种异步 API:旧版基于完成队列;新版基于回调,更易于使用。在本教程中,我们将重点介绍基于回调的异步 API(简称回调 API)。您将学习如何使用回调 API 实现以下各类 RPC 的服务器和客户端

  • Unary RPC
  • 服务器端流式 RPC
  • 客户端流式 RPC
  • 双向流式 RPC

示例代码

在本教程中,我们将创建一个路线指引应用。客户端可以获取其路线上的特性信息,创建其路线摘要,并与服务器及其他客户端交换交通更新等路线信息。

下面是 Protocol Buffers 中定义的服务接口。

// Interface exported by the server.
service RouteGuide {
  // A simple RPC.
  //
  // Obtains the feature at a given position.
  //
  // A feature with an empty name is returned if there's no feature at the given
  // position.
  rpc GetFeature(Point) returns (Feature) {}

  // A server-to-client streaming RPC.
  //
  // Obtains the Features available within the given Rectangle.  Results are
  // streamed rather than returned at once (e.g. in a response message with a
  // repeated field), as the rectangle may cover a large area and contain a
  // huge number of features.
  rpc ListFeatures(Rectangle) returns (stream Feature) {}

  // A client-to-server streaming RPC.
  //
  // Accepts a stream of Points on a route being traversed, returning a
  // RouteSummary when traversal is completed.
  rpc RecordRoute(stream Point) returns (RouteSummary) {}

  // A Bidirectional streaming RPC.
  //
  // Accepts a stream of RouteNotes sent while a route is being traversed,
  // while receiving other RouteNotes (e.g. from other users).
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}

同样的示例也使用 同步 API 实现。如果您有兴趣,可以比较这两种实现。

要实现的 Service

由于我们希望使用回调 API 实现 service,因此我们应该实现的 service 接口是 RouteGuide::CallbackService

class RouteGuideImpl final : public RouteGuide::CallbackService {
  ...
};

我们将在以下各节的 Server 子节中实现此 service 中的所有四种 RPC。

Unary RPC

让我们从最简单的 RPC 开始:GetFeature,它是一个 Unary RPC。通过 GetFeature,客户端向服务器发送一个 Point,然后服务器将该 PointFeature 返回给客户端。

服务器

此 RPC 的实现非常简单直接。

  grpc::ServerUnaryReactor* GetFeature(CallbackServerContext* context,
                                       const Point* point,
                                       Feature* feature) override {
    feature->set_name(GetFeatureName(*point, feature_list_));
    feature->mutable_location()->CopyFrom(*point);
    auto* reactor = context->DefaultReactor();
    reactor->Finish(Status::OK);
    return reactor;
  }

设置 Feature 的输出字段后,我们通过 ServerUnaryReactor 返回最终状态。

自定义 Unary Reactor

上面的示例使用了 Default Reactor。如果想处理特定操作(例如 RPC 取消)或在 RPC 完成时异步运行操作,我们也可以在这里使用自定义 Reactor。在下面的示例中,我们为这两个操作添加了日志。

  grpc::ServerUnaryReactor* GetFeature(grpc::CallbackServerContext* context,
                                       const Point* point,
                                       Feature* feature) override {
    class Reactor : public grpc::ServerUnaryReactor {
     public:
      Reactor(const Point& point, const std::vector<Feature>& feature_list,
              Feature* feature) {
        feature->set_name(GetFeatureName(point, feature_list));
        *feature->mutable_location() = point;
        Finish(grpc::Status::OK);
      }

     private:
      void OnDone() override {
        LOG(INFO) << "RPC Completed";
        delete this;
      }

      void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }
    };
    return new Reactor(*point, feature_list_, feature);
  }

对于 ServerUnaryReactor,我们需要覆盖 OnDone(),并且可选地覆盖 OnCancel()

注意 回调方法(例如 OnDone())应快速返回。切勿在此类回调中执行阻塞工作(例如,等待事件)。

ServerUnaryReactor 的构造函数在 GetFeature() 构造并提供 reactor 以响应已启动的 RPC 时被调用。它收集请求 Point、响应 Featurefeature_list。然后从 Point 获取响应 Feature 并将其添加到 feature_list 中。为了结束 RPC,我们调用 Finish(Status::OK)

OnDone() 响应 RPC 完成。我们将在 OnDone() 中执行最终清理并记录 RPC 结束。

OnCancel() 响应 RPC 取消。在此方法中,我们记录取消的发生。

客户端

注意:为简单起见,本教程不讨论如何创建 Channel 和 Stub。有关详细信息,请参阅基础教程

要启动 GetFeature RPC,除了 ClientContext、请求(即 Point)和响应(即 Feature)之外,客户端还需要将一个回调函数(即 std::function<void(::grpc::Status)>)传递给 stub_->async()->GetFeature()。回调函数将在服务器满足请求且 RPC 完成后被调用。

  bool GetOneFeature(const Point& point, Feature* feature) {
    ClientContext context;
    bool result;
    std::mutex mu;
    std::condition_variable cv;
    bool done = false;
    stub_->async()->GetFeature(
        &context, &point, feature,
        [&result, &mu, &cv, &done, feature, this](Status status) {
          bool ret;
          if (!status.ok()) {
            std::cout << "GetFeature rpc failed." << std::endl;
            ret = false;
          } else if (!feature->has_location()) {
            std::cout << "Server returns incomplete feature." << std::endl;
            ret = false;
          } else if (feature->name().empty()) {
            std::cout << "Found no feature at "
                      << feature->location().latitude() / kCoordFactor_ << ", "
                      << feature->location().longitude() / kCoordFactor_
                      << std::endl;
            ret = true;
          } else {
            std::cout << "Found feature called " << feature->name() << " at "
                      << feature->location().latitude() / kCoordFactor_ << ", "
                      << feature->location().longitude() / kCoordFactor_
                      << std::endl;
            ret = true;
          }
          std::lock_guard<std::mutex> lock(mu);
          result = ret;
          done = true;
          cv.notify_one();
        });
    std::unique_lock<std::mutex> lock(mu);
    cv.wait(lock, [&done] { return done; });
    return result;
  }

回调函数可以为 Unary RPC 执行各种后续工作。例如,上面代码片段中的回调函数检查状态和返回的特性,释放此调用的堆分配对象,最后通知 RPC 已完成。

为简单起见,示例显示了同一个函数等待 RPC 完成的通知,但这并非必需的。

服务器端流式 RPC

现在我们来看一个更复杂的 RPC - ListFeaturesListFeatures 是一个服务器端流式 RPC。客户端向服务器发送一个 Rectangle,然后服务器将返回一个 Feature 序列给客户端,每个 Feature 都以单独的消息发送。

服务器

对于任何流式 RPC,包括服务器端流式 RPC,RPC 处理程序的接口都类似。处理程序没有任何输入参数;返回类型是某种服务器 reactor,它处理单个 RPC 的所有业务逻辑。

下面是 ListFeatures 的处理程序接口。

  grpc::ServerWriteReactor<Feature>* ListFeatures(
      CallbackServerContext* context,
      const routeguide::Rectangle* rectangle);

因为 ListFeatures 是服务器端流式 RPC,所以返回类型应该是 ServerWriteReactorServerWriteReactor 有两个模板参数:Rectangle 是来自客户端的请求类型;Feature 是来自服务器的每个响应消息类型。

处理 RPC 的复杂性被委托给了 ServerWriteReactor。下面是实现 ServerWriteReactor 以处理 ListFeatures RPC 的方法。

  grpc::ServerWriteReactor<Feature>* ListFeatures(
      CallbackServerContext* context,
      const routeguide::Rectangle* rectangle) override {
    class Lister : public grpc::ServerWriteReactor<Feature> {
     public:
      Lister(const routeguide::Rectangle* rectangle,
             const std::vector<Feature>* feature_list)
          : left_((std::min)(rectangle->lo().longitude(),
                             rectangle->hi().longitude())),
            right_((std::max)(rectangle->lo().longitude(),
                              rectangle->hi().longitude())),
            top_((std::max)(rectangle->lo().latitude(),
                            rectangle->hi().latitude())),
            bottom_((std::min)(rectangle->lo().latitude(),
                               rectangle->hi().latitude())),
            feature_list_(feature_list),
            next_feature_(feature_list_->begin()) {
        NextWrite();
      }

      void OnWriteDone(bool ok) override {
        if (!ok) {
          Finish(Status(grpc::StatusCode::UNKNOWN, "Unexpected Failure"));
        }
        NextWrite();
      }

      void OnDone() override {
        LOG(INFO) << "RPC Completed";
        delete this;
      }

      void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }

     private:
      void NextWrite() {
        while (next_feature_ != feature_list_->end()) {
          const Feature& f = *next_feature_;
          next_feature_++;
          if (f.location().longitude() >= left_ &&
              f.location().longitude() <= right_ &&
              f.location().latitude() >= bottom_ &&
              f.location().latitude() <= top_) {
            StartWrite(&f);
            return;
          }
        }
        // Didn't write anything, all is done.
        Finish(Status::OK);
      }
      const long left_;
      const long right_;
      const long top_;
      const long bottom_;
      const std::vector<Feature>* feature_list_;
      std::vector<Feature>::const_iterator next_feature_;
    };
    return new Lister(rectangle, &feature_list_);
  }

不同的 reactor 有不同的回调方法。我们需要覆盖我们感兴趣的方法来实现我们的 RPC。对于 ListFeatures,我们需要覆盖 OnWriteDone()OnDone(),并且可选地覆盖 OnCancel

ServerWriteReactor 的构造函数在 ListFeatures() 构造并提供 reactor 以响应已启动的 RPC 时被调用。它收集 rectangle 中的所有 Featurefeatures_to_send_ 中,然后开始发送它们(如果存在)。

OnWriteDone() 响应写入完成。如果写入成功完成,我们将继续发送下一个 Feature,直到 features_to_send_ 为空,此时我们将调用 Finish(Status::OK) 来结束调用。

OnDone() 响应 RPC 完成。我们将在 OnDone() 中执行最终清理。

OnCancel() 响应 RPC 取消。在此方法中,我们记录取消的发生。

客户端

与服务器端类似,客户端需要实现某种客户端 reactor 来与服务器交互。客户端 reactor 封装了处理 RPC 所需的所有操作。

由于 ListFeatures 是服务器端流式,我们应该实现一个 ClientReadReactor,其名称与 ServerWriteReactor 对称。

    class Reader : public grpc::ClientReadReactor<Feature> {
     public:
      Reader(RouteGuide::Stub* stub, float coord_factor,
             const routeguide::Rectangle& rect)
          : coord_factor_(coord_factor) {
        stub->async()->ListFeatures(&context_, &rect, this);
        StartRead(&feature_);
        StartCall();
      }
      void OnReadDone(bool ok) override {
        if (ok) {
          std::cout << "Found feature called " << feature_.name() << " at "
                    << feature_.location().latitude() / coord_factor_ << ", "
                    << feature_.location().longitude() / coord_factor_
                    << std::endl;
          StartRead(&feature_);
        }
      }
      void OnDone(const Status& s) override {
        std::unique_lock<std::mutex> l(mu_);
        status_ = s;
        done_ = true;
        cv_.notify_one();
      }
      Status Await() {
        std::unique_lock<std::mutex> l(mu_);
        cv_.wait(l, [this] { return done_; });
        return std::move(status_);
      }

     private:
      ClientContext context_;
      float coord_factor_;
      Feature feature_;
      std::mutex mu_;
      std::condition_variable cv_;
      Status status_;
      bool done_ = false;
    };

ClientReadReactor 使用一个参数 Feature 进行模板化,这是来自服务器的响应消息类型。

Reader 的构造函数中,我们将 ClientContext&rectangle_(请求对象)和 Reader 传递给 RPC 方法 stub->async()->ListFeatures()。然后我们将 &feature_ 传递给 StartRead() 以指定存储接收到响应的位置。最后,我们调用 StartCall() 来激活 RPC!

OnReadDone() 响应读取完成。如果读取成功完成,我们将继续读取下一个 Feature,直到失败(由 okfalse 指示)。读取失败时,服务器会将响应设置到 summary_ 中并调用 Finish(Status::OK) 以结束 RPC。

OnDone() 响应 RPC 完成。它检查 RPC 状态结果并通知等待 OnDone() 的条件变量。

Await() 不是 ClientReadReactor 的方法。这只是为简单起见添加的,以便示例知道 RPC 已完成。或者,如果无需完成通知,OnDone() 只需在清理后返回,例如,释放堆分配的对象。

要发起 RPC,客户端只需实例化一个 ReadReactor 并等待 RPC 完成。

    routeguide::Rectangle rect;
    Feature feature;

    rect.mutable_lo()->set_latitude(400000000);
    rect.mutable_lo()->set_longitude(-750000000);
    rect.mutable_hi()->set_latitude(420000000);
    rect.mutable_hi()->set_longitude(-730000000);
    std::cout << "Looking for features between 40, -75 and 42, -73"
              << std::endl;

    Reader reader(stub_.get(), kCoordFactor_, rect);
    Status status = reader.Await();
    if (status.ok()) {
      std::cout << "ListFeatures rpc succeeded." << std::endl;
    } else {
      std::cout << "ListFeatures rpc failed." << std::endl;
    }

客户端流式 RPC

一旦您理解了上一节中服务器端流式 RPC 的概念,就会发现客户端流式 RPC 很容易学习。

RecordRoute 是我们将讨论的客户端流式 RPC。客户端向服务器发送一个 Point 序列,服务器将在客户端完成发送 Point 后返回一个 RouteSummary

服务器

客户端流式 RPC 的 RPC 处理程序接口没有任何输入参数,其返回类型是一个服务器 reactor,即 ServerReadReactor

ServerReadReactor 有两个模板参数:Point 是来自客户端的每个请求消息类型;RouteSummary 是来自服务器的响应类型。

ServerWriteReactor 类似,ServerReadReactor 是处理 RPC 的类。

grpc::ServerReadReactor<Point>* RecordRoute(CallbackServerContext* context,
                                              RouteSummary* summary) override {
    class Recorder : public grpc::ServerReadReactor<Point> {
     public:
      Recorder(RouteSummary* summary, const std::vector<Feature>* feature_list)
          : start_time_(system_clock::now()),
            summary_(summary),
            feature_list_(feature_list) {
        StartRead(&point_);
      }

      void OnReadDone(bool ok) override {
        if (ok) {
          point_count_++;
          if (!GetFeatureName(point_, *feature_list_).empty()) {
            feature_count_++;
          }
          if (point_count_ != 1) {
            distance_ += GetDistance(previous_, point_);
          }
          previous_ = point_;
          StartRead(&point_);
        } else {
          summary_->set_point_count(point_count_);
          summary_->set_feature_count(feature_count_);
          summary_->set_distance(static_cast<long>(distance_));
          auto secs = std::chrono::duration_cast<std::chrono::seconds>(
              system_clock::now() - start_time_);
          summary_->set_elapsed_time(secs.count());
          Finish(Status::OK);
        }
      }

      void OnDone() override {
        LOG(INFO) << "RPC Completed";
        delete this;
      }

      void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }

     private:
      system_clock::time_point start_time_;
      RouteSummary* summary_;
      const std::vector<Feature>* feature_list_;
      Point point_;
      int point_count_ = 0;
      int feature_count_ = 0;
      float distance_ = 0.0;
      Point previous_;
    };
    return new Recorder(summary, &feature_list_);
  }

ServerReadReactor 的构造函数在 RecordRoute() 构造并提供 reactor 以响应已启动的 RPC 时被调用。构造函数存储 RouteSummary* 以便稍后返回响应,并通过调用 StartRead(&point_) 发起读取操作。

OnReadDone() 响应读取完成。如果读取成功完成(即 oktrue),我们将继续读取下一个 Point;否则,我们将记录读取全部完成,并结束 RPC。至于读取成功时新收到的 Point,我们使用它更新统计数据,并继续读取下一个 Point,直到失败(由 okfalse 指示)。读取失败时,服务器会将响应设置到 summary_ 中并调用 Finish(Status::OK) 以结束 RPC。

OnDone() 响应 RPC 完成。我们将在 OnDone() 中执行最终清理。

OnCancel() 响应 RPC 取消。在此方法中,我们记录取消的发生。

客户端

毫不意外地,我们需要为客户端实现一个客户端 reactor,该客户端 reactor 称为 ClientWriteReactor

    class Recorder : public grpc::ClientWriteReactor<Point> {
     public:
      Recorder(RouteGuide::Stub* stub, float coord_factor,
               const std::vector<Feature>* feature_list)
          : coord_factor_(coord_factor),
            feature_list_(feature_list),
            generator_(
                std::chrono::system_clock::now().time_since_epoch().count()),
            feature_distribution_(0, feature_list->size() - 1),
            delay_distribution_(500, 1500) {
        stub->async()->RecordRoute(&context_, &stats_, this);
        // Use a hold since some StartWrites are invoked indirectly from a
        // delayed lambda in OnWriteDone rather than directly from the reaction
        // itself
        AddHold();
        NextWrite();
        StartCall();
      }
      void OnWriteDone(bool ok) override {
        // Delay and then do the next write or WritesDone
        alarm_.Set(
            std::chrono::system_clock::now() +
                std::chrono::milliseconds(delay_distribution_(generator_)),
            [this](bool /*ok*/) { NextWrite(); });
      }
      void OnDone(const Status& s) override {
        std::unique_lock<std::mutex> l(mu_);
        status_ = s;
        done_ = true;
        cv_.notify_one();
      }
      Status Await(RouteSummary* stats) {
        std::unique_lock<std::mutex> l(mu_);
        cv_.wait(l, [this] { return done_; });
        *stats = stats_;
        return std::move(status_);
      }

     private:
      void NextWrite() {
        if (points_remaining_ != 0) {
          const Feature& f =
              (*feature_list_)[feature_distribution_(generator_)];
          std::cout << "Visiting point "
                    << f.location().latitude() / coord_factor_ << ", "
                    << f.location().longitude() / coord_factor_ << std::endl;
          StartWrite(&f.location());
          points_remaining_--;
        } else {
          StartWritesDone();
          RemoveHold();
        }
      }
      ClientContext context_;
      float coord_factor_;
      int points_remaining_ = 10;
      Point point_;
      RouteSummary stats_;
      const std::vector<Feature>* feature_list_;
      std::default_random_engine generator_;
      std::uniform_int_distribution<int> feature_distribution_;
      std::uniform_int_distribution<int> delay_distribution_;
      grpc::Alarm alarm_;
      std::mutex mu_;
      std::condition_variable cv_;
      Status status_;
      bool done_ = false;
    };

ClientWriteReactor 使用一个参数 Point 进行模板化,这是来自客户端的请求消息类型。

Recorder 的构造函数中,我们将 ClientContext&stats_(响应对象)和 Recorder 传递给 RPC 方法 stub->async()->RecordRoute()。然后我们添加一个操作来发送 points_to_send_ 中的第一个 Point(如果存在)。注意,如果在 RPC 启动时没有要发送的内容,我们需要调用 StartWritesDone() 来通知服务器我们已完成写入。最后,调用 StartCall() 激活 RPC。

OnWriteDone() 响应写入完成。如果写入成功完成,我们将继续写入下一个 Point,直到 points_to_send_ 为空。对于最后一个要发送的 Point,我们调用 StartWriteLast() 来捎带写入完成的信号。StartWriteLast() 实质上与 StartWrite()StartWritesDone() 的组合相同,但更高效。

OnDone() 响应 RPC 完成。它检查 RPC 状态结果和 stats_ 中的响应,并通知等待 OnDone() 的条件变量。

Await() 不是 ClientWriteReactor 的方法。我们添加 Await() 以便调用者可以等待 RPC 完成。

要发起 RPC,客户端只需实例化一个 Recorder 并等待 RPC 完成。

    Recorder recorder(stub_.get(), kCoordFactor_, &feature_list_);
    RouteSummary stats;
    Status status = recorder.Await(&stats);
    if (status.ok()) {
      std::cout << "Finished trip with " << stats.point_count() << " points\n"
                << "Passed " << stats.feature_count() << " features\n"
                << "Travelled " << stats.distance() << " meters\n"
                << "It took " << stats.elapsed_time() << " seconds"
                << std::endl;
    } else {
      std::cout << "RecordRoute rpc failed." << std::endl;
    }

双向流式 RPC

最后,我们将看双向流式 RPC RouteChat。在这种情况下,客户端向服务器发送一个 RouteNote 序列。每次发送一个位于 PointRouteNote 时,服务器将返回一个位于同一 PointRouteNote 序列,这些 RouteNote 是之前由所有客户端发送的。

服务器

同样,双向流式 RPC 的 RPC 处理程序接口没有任何输入参数,其返回类型是一个服务器 reactor,即 ServerBidiReactor

ServerBidiReactor 有两个模板参数,两者都是 RouteNote,因为 RouteNote 是请求和响应的消息类型。毕竟,RouteChat 意味着让客户端彼此聊天和共享信息!

既然我们已经讨论了 ServerWriteReactorServerReadReactorServerBidiReactor 应该非常简单直接。

  grpc::ServerBidiReactor<RouteNote, RouteNote>* RouteChat(
      CallbackServerContext* context) override {
    class Chatter : public grpc::ServerBidiReactor<RouteNote, RouteNote> {
     public:
      Chatter(absl::Mutex* mu, std::vector<RouteNote>* received_notes)
          : mu_(mu), received_notes_(received_notes) {
        StartRead(&note_);
      }

      void OnReadDone(bool ok) override {
        if (ok) {
          // Unlike the other example in this directory that's not using
          // the reactor pattern, we can't grab a local lock to secure the
          // access to the notes vector, because the reactor will most likely
          // make us jump threads, so we'll have to use a different locking
          // strategy. We'll grab the lock locally to build a copy of the
          // list of nodes we're going to send, then we'll grab the lock
          // again to append the received note to the existing vector.
          mu_->Lock();
          std::copy_if(received_notes_->begin(), received_notes_->end(),
                       std::back_inserter(to_send_notes_),
                       [this](const RouteNote& note) {
                         return note.location().latitude() ==
                                    note_.location().latitude() &&
                                note.location().longitude() ==
                                    note_.location().longitude();
                       });
          mu_->Unlock();
          notes_iterator_ = to_send_notes_.begin();
          NextWrite();
        } else {
          Finish(Status::OK);
        }
      }
      void OnWriteDone(bool /*ok*/) override { NextWrite(); }

      void OnDone() override {
        LOG(INFO) << "RPC Completed";
        delete this;
      }

      void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }

     private:
      void NextWrite() {
        if (notes_iterator_ != to_send_notes_.end()) {
          StartWrite(&*notes_iterator_);
          notes_iterator_++;
        } else {
          mu_->Lock();
          received_notes_->push_back(note_);
          mu_->Unlock();
          StartRead(&note_);
        }
      }

      RouteNote note_;
      absl::Mutex* mu_;
      std::vector<RouteNote>* received_notes_ ABSL_GUARDED_BY(mu_);
      std::vector<RouteNote> to_send_notes_;
      std::vector<RouteNote>::iterator notes_iterator_;
    };
    return new Chatter(&mu_, &received_notes_);
  }

ServerBidiReactor 的构造函数在 RouteChat() 构造并提供 reactor 以响应已启动的 RPC 时被调用。构造函数通过调用 StartRead(&received_note_) 发起读取操作。

OnReadDone() 响应读取完成。如果读取成功完成(即 oktrue),我们将继续读取下一个 RouteNote;否则,我们将记录读取全部完成,并结束 RPC。至于读取成功时新收到的 RouteNote,我们将其添加到 received_notes_ 中,并将之前在同一 Point 接收到的 notes 附加到 to_send_notes_ 中。每当 to_send_notes_ 变为非空时,我们开始发送 to_send_notes_ 中的 RouteNote

OnWriteDone() 响应写入完成。如果写入成功完成,我们将继续发送下一个 RouteNote,直到 to_send_notes_ 为空,此时我们将继续读取下一个 RouteNote,如果读取也已完成,则结束 RPC。

OnDone() 响应 RPC 完成。我们将在 OnDone() 中执行最终清理。

OnCancel() 响应 RPC 取消。在此方法中,我们记录取消的发生。

客户端

是的,双向流式 RPC 的客户端 reactor 是 ClientBidiReactor

    class Chatter : public grpc::ClientBidiReactor<RouteNote, RouteNote> {
     public:
      explicit Chatter(RouteGuide::Stub* stub)
          : notes_{MakeRouteNote("First message", 0, 0),
                   MakeRouteNote("Second message", 0, 1),
                   MakeRouteNote("Third message", 1, 0),
                   MakeRouteNote("Fourth message", 0, 0)},
            notes_iterator_(notes_.begin()) {
        stub->async()->RouteChat(&context_, this);
        NextWrite();
        StartRead(&server_note_);
        StartCall();
      }
      void OnWriteDone(bool ok) override {
        if (ok) {
          NextWrite();
        }
      }
      void OnReadDone(bool ok) override {
        if (ok) {
          std::cout << "Got message " << server_note_.message() << " at "
                    << server_note_.location().latitude() << ", "
                    << server_note_.location().longitude() << std::endl;
          StartRead(&server_note_);
        }
      }
      void OnDone(const Status& s) override {
        std::unique_lock<std::mutex> l(mu_);
        status_ = s;
        done_ = true;
        cv_.notify_one();
      }
      Status Await() {
        std::unique_lock<std::mutex> l(mu_);
        cv_.wait(l, [this] { return done_; });
        return std::move(status_);
      }

     private:
      void NextWrite() {
        if (notes_iterator_ != notes_.end()) {
          const auto& note = *notes_iterator_;
          std::cout << "Sending message " << note.message() << " at "
                    << note.location().latitude() << ", "
                    << note.location().longitude() << std::endl;
          StartWrite(&note);
          notes_iterator_++;
        } else {
          StartWritesDone();
        }
      }
      ClientContext context_;
      const std::vector<RouteNote> notes_;
      std::vector<RouteNote>::const_iterator notes_iterator_;
      RouteNote server_note_;
      std::mutex mu_;
      std::condition_variable cv_;
      Status status_;
      bool done_ = false;
    };

ClientBidiReactor 使用两个参数进行模板化,即请求和响应的消息类型,对于 RPC RouteChat,两者都是 RouteNote

Chatter 的构造函数中,我们将 ClientContextChatter 传递给 RPC 方法 stub->async()->RouteChat()。然后我们添加一个操作来发送 notes_ 中的第一个 RouteNote(如果存在)。注意,如果在 RPC 启动时没有要发送的内容,我们需要调用 StartWritesDone() 来通知服务器我们已完成写入。我们还调用 StartRead() 来添加一个读取操作。最后,调用 StartCall() 激活 RPC。

OnReadDone() 响应读取完成。如果读取成功完成,我们将继续读取下一个 RouteNote,直到失败(由 okfalse 指示)。

OnWriteDone() 响应写入完成。如果写入成功完成,我们将继续写入下一个 RouteNote,直到 notes_ 为空。对于最后一个要发送的 RouteNote,我们调用 StartWriteLast() 来捎带写入完成的信号。StartWriteLast() 实质上与 StartWrite()StartWritesDone() 的组合相同,但更高效。

OnDone() 响应 RPC 完成。它检查 RPC 状态结果和消息统计,并通知等待 OnDone() 的条件变量。

Await() 不是 ClientBidiReactor 的方法。我们添加 Await() 以便调用者可以等待 RPC 完成。

要发起 RPC,客户端只需实例化一个 Chatter 并等待 RPC 完成。

    Chatter chatter(stub_.get());
    Status status = chatter.Await();
    if (!status.ok()) {
      std::cout << "RouteChat rpc failed." << std::endl;
    }
上次修改于 2024 年 6 月 18 日:C++ Callback API Tutorial (#1305) (18b12d6)