基础教程
Python 中 gRPC 的基础教程介绍。
基础教程
本教程为 Python 程序员提供了使用 gRPC 的基础介绍。
通过此示例,您将学习如何
- 在
.proto
文件中定义服务。 - 使用协议缓冲区编译器生成服务器和客户端代码。
- 使用 Python gRPC API 编写你的服务的简单客户端和服务器。
本教程假定你已经阅读了 gRPC 介绍 并熟悉 协议缓冲区。你可以在 proto3 语言指南 和 Python 生成代码指南 中了解更多信息。
为什么使用 gRPC?
我们的示例是一个简单的路线映射应用程序,它允许客户端获取其路线上特征的信息,创建其路线摘要,并与服务器及其他客户端交换路线信息(例如交通更新)。
通过 gRPC,我们可以在一个 .proto
文件中定义服务一次,并在 gRPC 支持的任何语言中生成客户端和服务器,这些客户端和服务器又可以在从大型数据中心内的服务器到你自己的平板电脑等各种环境中运行—— gRPC 会为你处理不同语言和环境之间通信的所有复杂性。我们还获得了使用协议缓冲区的所有优点,包括高效序列化、简单的 IDL 和易于接口更新。
示例代码和设置
本教程的示例代码位于 grpc/grpc/examples/python/route_guide。要下载示例,请运行以下命令克隆 grpc
仓库
git clone -b v1.74.0 --depth 1 --shallow-submodules https://github.com/grpc/grpc
然后将当前目录更改为仓库中的 examples/python/route_guide
cd grpc/examples/python/route_guide
您还应该安装生成服务器和客户端接口代码的相关工具——如果尚未安装,请按照快速入门中的设置说明进行操作。
定义服务
你的第一步(正如你从 gRPC 介绍 中所知)是使用 协议缓冲区 定义 gRPC 服务 以及方法 请求 和 响应 类型。你可以在 examples/protos/route_guide.proto
中查看完整的 .proto
文件。
要定义服务,请在 .proto
文件中指定一个命名的 service
service RouteGuide {
// (Method definitions not shown)
}
然后,您在服务定义中定义 rpc
方法,并指定它们的请求和响应类型。gRPC 允许您定义四种服务方法,所有这些方法都在 RouteGuide
服务中使用
简单 RPC:客户端使用存根向服务器发送请求,并等待响应返回,就像普通的函数调用一样。
// Obtains the feature at a given position. rpc GetFeature(Point) returns (Feature) {}
一种 响应流式 RPC,客户端向服务器发送请求并获得一个流以读取一系列消息。客户端从返回的流中读取,直到没有更多消息为止。如示例所示,你通过在 响应 类型前放置
stream
关键字来指定响应流式方法。// Obtains the Features available within the given Rectangle. Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field), as the rectangle may cover a large area and contain a // huge number of features. rpc ListFeatures(Rectangle) returns (stream Feature) {}
一种 请求流式 RPC,客户端写入一系列消息并将其发送到服务器,同样使用提供的流。一旦客户端完成消息写入,它会等待服务器读取所有消息并返回其响应。你通过在 请求 类型前放置
stream
关键字来指定请求流式方法。// Accepts a stream of Points on a route being traversed, returning a // RouteSummary when traversal is completed. rpc RecordRoute(stream Point) returns (RouteSummary) {}
一种 双向流式 RPC,双方使用读写流发送一系列消息。这两个流独立运行,因此客户端和服务器可以按其喜欢的任何顺序读写:例如,服务器可以等待接收所有客户端消息后再写入其响应,或者它可以交替读取消息然后写入消息,或者进行其他读写组合。每个流中的消息顺序都得到保留。你通过在请求和响应前都放置
stream
关键字来指定此类型的方法。// Accepts a stream of RouteNotes sent while a route is being traversed, // while receiving other RouteNotes (e.g. from other users). rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
你的 .proto
文件还包含我们服务方法中使用的所有请求和响应类型的协议缓冲区消息类型定义—— 例如,这是 Point
消息类型
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
生成客户端和服务器代码
接下来,你需要从 .proto
服务定义生成 gRPC 客户端和服务器接口。
首先,安装 grpcio-tools 包
pip install grpcio-tools
使用以下命令生成 Python 代码
python -m grpc_tools.protoc -I../../protos --python_out=. --pyi_out=. --grpc_python_out=. ../../protos/route_guide.proto
请注意,由于我们已经在示例目录中提供了一个版本的生成代码,运行此命令将重新生成相应文件,而不是创建新文件。生成的代码文件名为 route_guide_pb2.py
和 route_guide_pb2_grpc.py
,并包含
route_guide.proto
中定义的消息的类route_guide.proto
中定义的服务类RouteGuideStub
,客户端可用于调用 RouteGuide RPCRouteGuideServicer
,它定义了 RouteGuide 服务实现的接口
route_guide.proto
中定义的服务的函数add_RouteGuideServicer_to_server
,它将 RouteGuideServicer 添加到grpc.Server
注意
pb2 中的2
表示生成的代码遵循协议缓冲区 Python API 第 2 版。第 1 版已废弃。它与协议缓冲区语言版本无关,后者由 .proto
文件中的 syntax = "proto3"
或 syntax = "proto2"
指示。生成带自定义包路径的 gRPC 接口
要生成带自定义包路径的 gRPC 客户端接口,你可以结合 grpc_tools.protoc
命令使用 -I
参数。这种方法允许你为生成的文件指定一个自定义包名。
以下是生成带自定义包路径的 gRPC 客户端接口的示例命令
python -m grpc_tools.protoc -Igrpc/example/custom/path=../../protos \
--python_out=. --grpc_python_out=. \
../../protos/route_guide.proto
生成的文件将放置在 ./grpc/example/custom/path/
目录中
./grpc/example/custom/path/route_guide_pb2.py
./grpc/example/custom/path/route_guide_pb2_grpc.py
通过此设置,生成的 route_guide_pb2_grpc.py
文件将使用自定义包结构正确导入 protobuf 定义,如下所示
import grpc.example.custom.path.route_guide_pb2 as route_guide_pb2
通过遵循此方法,你可以确保文件将根据指定的包路径正确地相互调用。此方法允许你为 gRPC 客户端接口维护自定义包结构。
创建服务器
首先,让我们看看如何创建 RouteGuide
服务器。如果你只对创建 gRPC 客户端感兴趣,可以跳过此部分,直接转到 创建客户端 (不过你可能仍然会觉得它很有趣!)。
创建并运行 RouteGuide
服务器分为两个工作项
- 实现从我们的服务定义生成的 servicer 接口,其中包含执行服务实际“工作”的函数。
- 运行 gRPC 服务器以侦听来自客户端的请求并传输响应。
你可以在 examples/python/route_guide/route_guide_server.py 中找到 RouteGuide
服务器示例。
实现 RouteGuide
route_guide_server.py
有一个 RouteGuideServicer
类,它继承自生成的类 route_guide_pb2_grpc.RouteGuideServicer
# RouteGuideServicer provides an implementation of the methods of the RouteGuide service.
class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer):
RouteGuideServicer
实现了所有 RouteGuide
服务方法。
简单 RPC
我们先来看看最简单的类型 GetFeature
,它只是从客户端获取一个 Point
,然后从其数据库中以 Feature
形式返回相应的特征信息。
def GetFeature(self, request, context):
feature = get_feature(self.db, request)
if feature is None:
return route_guide_pb2.Feature(name="", location=request)
else:
return feature
该方法接收一个用于 RPC 的 route_guide_pb2.Point
请求,以及一个提供 RPC 特定信息(例如超时限制)的 grpc.ServicerContext
对象。它返回一个 route_guide_pb2.Feature
响应。
响应流式 RPC
现在让我们看看下一个方法。ListFeatures
是一个响应流式 RPC,它向客户端发送多个 Feature
。
def ListFeatures(self, request, context):
left = min(request.lo.longitude, request.hi.longitude)
right = max(request.lo.longitude, request.hi.longitude)
top = max(request.lo.latitude, request.hi.latitude)
bottom = min(request.lo.latitude, request.hi.latitude)
for feature in self.db:
if (
feature.location.longitude >= left
and feature.location.longitude <= right
and feature.location.latitude >= bottom
and feature.location.latitude <= top
):
yield feature
在这里,请求消息是一个 route_guide_pb2.Rectangle
,客户端希望在此范围内查找 Feature
。该方法不返回单个响应,而是生成零个或多个响应。
请求流式 RPC
请求流式方法 RecordRoute
使用一个 迭代器 的请求值并返回单个响应值。
def RecordRoute(self, request_iterator, context):
point_count = 0
feature_count = 0
distance = 0.0
prev_point = None
start_time = time.time()
for point in request_iterator:
point_count += 1
if get_feature(self.db, point):
feature_count += 1
if prev_point:
distance += get_distance(prev_point, point)
prev_point = point
elapsed_time = time.time() - start_time
return route_guide_pb2.RouteSummary(
point_count=point_count,
feature_count=feature_count,
distance=int(distance),
elapsed_time=int(elapsed_time),
)
双向流式 RPC
最后,让我们看看双向流式方法 RouteChat
。
def RouteChat(self, request_iterator, context):
prev_notes = []
for new_note in request_iterator:
for prev_note in prev_notes:
if prev_note.location == new_note.location:
yield prev_note
prev_notes.append(new_note)
此方法的语义是请求流式方法和响应流式方法的组合。它接收一个请求值迭代器,并且本身也是一个响应值迭代器。
启动服务器
一旦你实现了所有 RouteGuide
方法,下一步就是启动 gRPC 服务器,以便客户端能够实际使用你的服务
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
route_guide_pb2_grpc.add_RouteGuideServicer_to_server(RouteGuideServicer(), server)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()
服务器的 start()
方法是非阻塞的。将实例化一个新线程来处理请求。调用 server.start()
的线程在此期间通常没有其他工作可做。在这种情况下,你可以调用 server.wait_for_termination()
来清晰地阻塞调用线程,直到服务器终止。
创建客户端
你可以在 examples/python/route_guide/route_guide_client.py 中查看完整的客户端示例代码。
创建存根
要调用服务方法,我们首先需要创建一个存根。
我们实例化了从我们的 .proto
文件生成的 route_guide_pb2_grpc
模块中的 RouteGuideStub
类。
channel = grpc.insecure_channel('localhost:50051')
stub = route_guide_pb2_grpc.RouteGuideStub(channel)
调用服务方法
对于返回单个响应的 RPC 方法(“响应-一元”方法),gRPC Python 支持同步(阻塞)和异步(非阻塞)控制流语义。对于响应流式 RPC 方法,调用会立即返回一个响应值迭代器。对该迭代器的 next()
方法的调用会阻塞,直到迭代器生成的响应可用为止。
简单 RPC
对简单 RPC GetFeature
的同步调用几乎和调用本地方法一样简单直接。RPC 调用会等待服务器响应,并会返回响应或抛出异常
feature = stub.GetFeature(point)
对 GetFeature
的异步调用类似,但就像在线程池中异步调用本地方法一样
feature_future = stub.GetFeature.future(point)
feature = feature_future.result()
响应流式 RPC
调用响应流式 ListFeatures
类似于使用序列类型
for feature in stub.ListFeatures(rectangle):
请求流式 RPC
调用请求流式 RecordRoute
类似于将迭代器传递给本地方法。与上面同样返回单个响应的简单 RPC 一样,它可以同步或异步调用
route_summary = stub.RecordRoute(point_iterator)
route_summary_future = stub.RecordRoute.future(point_iterator)
route_summary = route_summary_future.result()
双向流式 RPC
调用双向流式 RouteChat
具有(与服务侧一样)请求流式和响应流式语义的组合
for received_route_note in stub.RouteChat(sent_route_note_iterator):
尝试一下!
运行服务器
python route_guide_server.py
在另一个终端中,运行客户端
python route_guide_client.py