异步 API 教程
异步 API 教程
本教程演示如何使用 gRPC 的异步/非阻塞 API 在 C++ 中编写一个简单的服务器和客户端。它假设您已经熟悉编写简单的同步 gRPC 代码,如 基础教程 中所述。本教程中使用的示例沿用了 Greeter 示例,该示例在快速入门中使用。您可以在 grpc/examples/cpp/helloworld 中找到它以及安装说明。
概述
gRPC 使用 CompletionQueue API 进行异步操作。基本工作流程如下:
- 将
CompletionQueue
绑定到 RPC 调用 - 执行类似于读取或写入的操作,使用唯一的
void*
标记 - 调用
CompletionQueue::Next
等待操作完成。如果出现标记,则表示相应的操作已完成。
异步客户端
要使用异步客户端调用远程方法,首先需要像在 同步客户端 中一样创建通道和存根。一旦有了存根,就可以按照以下步骤进行异步调用:
启动 RPC 并为其创建一个句柄。将 RPC 绑定到
CompletionQueue
。CompletionQueue cq; std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc( stub_->AsyncSayHello(&context, request, &cq));
请求回复和最终状态,使用唯一的标记
Status status; rpc->Finish(&reply, &status, (void*)1);
等待完成队列返回下一个标记。一旦返回传递到相应
Finish()
调用中的标记,回复和状态就绪。void* got_tag; bool ok = false; cq.Next(&got_tag, &ok); if (ok && got_tag == (void*)1) { // check reply and status }
您可以在 greeter_async_client.cc 中查看完整的客户端示例。
异步服务器
服务器实现使用标记请求 RPC 调用,然后等待完成队列返回标记。异步处理 RPC 的基本流程是:
构建一个导出异步服务的服务器
helloworld::Greeter::AsyncService service; ServerBuilder builder; builder.AddListeningPort("0.0.0.0:50051", InsecureServerCredentials()); builder.RegisterService(&service); auto cq = builder.AddCompletionQueue(); auto server = builder.BuildAndStart();
请求一个 RPC,提供唯一的标记
ServerContext context; HelloRequest request; ServerAsyncResponseWriter<HelloReply> responder; service.RequestSayHello(&context, &request, &responder, &cq, &cq, (void*)1);
等待完成队列返回标记。一旦检索到标记,上下文、请求和响应器就绪。
HelloReply reply; Status status; void* got_tag; bool ok = false; cq.Next(&got_tag, &ok); if (ok && got_tag == (void*)1) { // set reply and status responder.Finish(reply, status, (void*)2); }
等待完成队列返回标记。当标记返回时,RPC 完成。
void* got_tag; bool ok = false; cq.Next(&got_tag, &ok); if (ok && got_tag == (void*)2) { // clean up }
但是,这个基本流程没有考虑到服务器同时处理多个请求。为了处理这种情况,我们完整的异步服务器示例使用 CallData
对象来维护每个 RPC 的状态,并使用此对象的地址作为调用的唯一标记。
class CallData {
public:
// Take in the "service" instance (in this case representing an asynchronous
// server) and the completion queue "cq" used for asynchronous communication
// with the gRPC runtime.
CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
// Invoke the serving logic right away.
Proceed();
}
void Proceed() {
if (status_ == CREATE) {
// As part of the initial CREATE state, we *request* that the system
// start processing SayHello requests. In this request, "this" acts are
// the tag uniquely identifying the request (so that different CallData
// instances can serve different requests concurrently), in this case
// the memory address of this CallData instance.
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
this);
// Make this instance progress to the PROCESS state.
status_ = PROCESS;
} else if (status_ == PROCESS) {
// Spawn a new CallData instance to serve new clients while we process
// the one for this CallData. The instance will deallocate itself as
// part of its FINISH state.
new CallData(service_, cq_);
// The actual processing.
std::string prefix("Hello ");
reply_.set_message(prefix + request_.name());
// And we are done! Let the gRPC runtime know we've finished, using the
// memory address of this instance as the uniquely identifying tag for
// the event.
responder_.Finish(reply_, Status::OK, this);
status_ = FINISH;
} else {
GPR_ASSERT(status_ == FINISH);
// Once in the FINISH state, deallocate ourselves (CallData).
delete this;
}
}
}
为了简单起见,服务器仅对所有事件使用一个完成队列,并在 HandleRpcs
中运行一个主循环来查询队列
void HandleRpcs() {
// Spawn a new CallData instance to serve new clients.
new CallData(&service_, cq_.get());
void* tag; // uniquely identifies a request.
bool ok;
while (true) {
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
cq_->Next(&tag, &ok);
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
}
关闭服务器
我们一直在使用完成队列来获取异步通知。必须小心在服务器也关闭后将其关闭。
请记住,我们在 ServerImpl::Run()
中通过运行 cq_ = builder.AddCompletionQueue()
获取了完成队列实例 cq_
。查看 ServerBuilder::AddCompletionQueue
的文档,我们看到:
… 调用者需要在关闭返回的完成队列之前关闭服务器。
有关更多详细信息,请参阅 ServerBuilder::AddCompletionQueue
的完整文档字符串。这在我们的示例中意味着 ServerImpl
的析构函数如下所示:
~ServerImpl() {
server_->Shutdown();
// Always shutdown the completion queue after the server.
cq_->Shutdown();
}
您可以在 greeter_async_server.cc 中查看完整的服务器示例。