异步API教程

异步API教程

本教程将向您展示如何使用 gRPC 的异步/非阻塞 API 编写一个简单的 C++ 服务器和客户端。本教程假定您已经熟悉编写简单的同步 gRPC 代码,如基础教程中所述。本教程中使用的示例沿用了快速入门中使用的基本 Greeter 示例。您可以在 grpc/examples/cpp/helloworld 中找到它以及安装说明。

概述

gRPC 使用 CompletionQueue API 进行异步操作。基本工作流程如下:

  • 将 `CompletionQueue` 绑定到 RPC 调用
  • 执行读写操作,并附带一个唯一的 `void*` 标签
  • 调用 `CompletionQueue::Next` 等待操作完成。如果出现标签,则表示相应的操作已完成。

异步客户端

要使用异步客户端调用远程方法,您首先创建通道和存根,就像在同步客户端中那样。一旦有了存根,您可以通过以下方式进行异步调用:

  • 启动 RPC 并为其创建句柄。将 RPC 绑定到 `CompletionQueue`。

    CompletionQueue cq;
    std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc(
        stub_->AsyncSayHello(&context, request, &cq));
    
  • 请求回复和最终状态,并带有唯一的标签

    Status status;
    rpc->Finish(&reply, &status, (void*)1);
    
  • 等待完成队列返回下一个标签。一旦传入相应 `Finish()` 调用的标签返回,回复和状态就绪。

    void* got_tag;
    bool ok = false;
    cq.Next(&got_tag, &ok);
    if (ok && got_tag == (void*)1) {
      // check reply and status
    }
    

您可以在 greeter_async_client.cc 中查看完整的客户端示例。

异步服务器

服务器实现请求带标签的 RPC 调用,然后等待完成队列返回标签。异步处理 RPC 的基本流程是:

  • 构建一个暴露异步服务的服务器

    helloworld::Greeter::AsyncService service;
    ServerBuilder builder;
    builder.AddListeningPort("0.0.0.0:50051", InsecureServerCredentials());
    builder.RegisterService(&service);
    auto cq = builder.AddCompletionQueue();
    auto server = builder.BuildAndStart();
    
  • 请求一个 RPC,提供一个唯一的标签

    ServerContext context;
    HelloRequest request;
    ServerAsyncResponseWriter<HelloReply> responder;
    service.RequestSayHello(&context, &request, &responder, &cq, &cq, (void*)1);
    
  • 等待完成队列返回标签。一旦标签检索成功,上下文、请求和响应器就绪。

    HelloReply reply;
    Status status;
    void* got_tag;
    bool ok = false;
    cq.Next(&got_tag, &ok);
    if (ok && got_tag == (void*)1) {
      // set reply and status
      responder.Finish(reply, status, (void*)2);
    }
    
  • 等待完成队列返回标签。当标签返回时,RPC 完成。

    void* got_tag;
    bool ok = false;
    cq.Next(&got_tag, &ok);
    if (ok && got_tag == (void*)2) {
      // clean up
    }
    

然而,这个基本流程没有考虑到服务器同时处理多个请求的情况。为了解决这个问题,我们完整的异步服务器示例使用一个 `CallData` 对象来维护每个 RPC 的状态,并使用此对象的地址作为调用的唯一标签。

class CallData {
public:
  // Take in the "service" instance (in this case representing an asynchronous
  // server) and the completion queue "cq" used for asynchronous communication
  // with the gRPC runtime.
  CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
      : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
    // Invoke the serving logic right away.
    Proceed();
  }

  void Proceed() {
    if (status_ == CREATE) {
      // As part of the initial CREATE state, we *request* that the system
      // start processing SayHello requests. In this request, "this" acts are
      // the tag uniquely identifying the request (so that different CallData
      // instances can serve different requests concurrently), in this case
      // the memory address of this CallData instance.
      service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
                                this);
      // Make this instance progress to the PROCESS state.
      status_ = PROCESS;
    } else if (status_ == PROCESS) {
      // Spawn a new CallData instance to serve new clients while we process
      // the one for this CallData. The instance will deallocate itself as
      // part of its FINISH state.
      new CallData(service_, cq_);

      // The actual processing.
      std::string prefix("Hello ");
      reply_.set_message(prefix + request_.name());

      // And we are done! Let the gRPC runtime know we've finished, using the
      // memory address of this instance as the uniquely identifying tag for
      // the event.
      responder_.Finish(reply_, Status::OK, this);
      status_ = FINISH;
    } else {
      GPR_ASSERT(status_ == FINISH);
      // Once in the FINISH state, deallocate ourselves (CallData).
      delete this;
    }
  }
}

为简单起见,服务器只对所有事件使用一个完成队列,并在 `HandleRpcs` 中运行主循环来查询队列

void HandleRpcs() {
  // Spawn a new CallData instance to serve new clients.
  new CallData(&service_, cq_.get());
  void* tag;  // uniquely identifies a request.
  bool ok;
  while (true) {
    // Block waiting to read the next event from the completion queue. The
    // event is uniquely identified by its tag, which in this case is the
    // memory address of a CallData instance.
    cq_->Next(&tag, &ok);
    GPR_ASSERT(ok);
    static_cast<CallData*>(tag)->Proceed();
  }
}

关闭服务器

我们一直使用完成队列来获取异步通知。必须注意在服务器关闭*之后*关闭它。

请记住,我们在 `ServerImpl::Run()` 中通过运行 `cq_ = builder.AddCompletionQueue()` 获取了完成队列实例 `cq_`。查看 `ServerBuilder::AddCompletionQueue` 的文档,我们看到:

… 调用者必须在关闭返回的完成队列之前关闭服务器。

有关更多详细信息,请参阅 `ServerBuilder::AddCompletionQueue` 的完整文档字符串。这在我们的示例中意味着 `ServerImpl` 的析构函数看起来像:

~ServerImpl() {
  server_->Shutdown();
  // Always shutdown the completion queue after the server.
  cq_->Shutdown();
}

您可以在 greeter_async_server.cc 中查看完整的服务器示例。