基础教程

一个关于 gRPC 在 Python 中的基础教程简介。

基础教程

一个关于 gRPC 在 Python 中的基础教程简介。

本教程为 Python 程序员提供了一个关于使用 gRPC 的基础入门。

通过学习此示例,你将了解如何

  • .proto 文件中定义服务。
  • 使用 protocol buffer 编译器生成服务器和客户端代码。
  • 使用 Python gRPC API 为你的服务编写一个简单的客户端和服务器。

它假设你已经阅读了gRPC 简介并熟悉protocol buffers。你可以在proto3 语言指南Python 生成代码指南中了解更多信息。

为何使用 gRPC?

我们的示例是一个简单的路线映射应用程序,它允许客户端获取路线上的特征信息、创建路线摘要,并与服务器和其他客户端交换路线信息(如交通更新)。

使用 gRPC,我们可以在一个 .proto 文件中定义一次服务,并使用 gRPC 支持的任何语言生成客户端和服务器,这些客户端和服务器可以在从大型数据中心内的服务器到你自己的平板电脑等各种环境中运行——不同语言和环境之间的所有通信复杂性都由 gRPC 为你处理。我们还获得了使用 protocol buffers 的所有优势,包括高效的序列化、简单的 IDL 和易于更新的接口。

示例代码和设置

本教程的示例代码位于grpc/grpc/examples/python/route_guide。要下载此示例,请运行以下命令克隆 grpc 仓库

git clone -b v1.71.0 --depth 1 --shallow-submodules https://github.com/grpc/grpc

然后将当前目录更改为仓库中的 examples/python/route_guide

cd grpc/examples/python/route_guide

你还应该安装用于生成服务器和客户端接口代码的相关工具——如果你尚未安装,请按照快速入门中的设置说明进行操作。

定义服务

你的第一步(正如你在gRPC 简介中了解到的)是使用protocol buffers定义 gRPC 的 service 以及方法 requestresponse 类型。你可以在examples/protos/route_guide.proto中看到完整的 .proto 文件。

要定义服务,请在你的 .proto 文件中指定一个命名为 service 的块

service RouteGuide {
   // (Method definitions not shown)
}

然后你在服务定义内部定义 rpc 方法,并指定它们的请求和响应类型。gRPC 允许你定义四种服务方法,所有这些方法都在 RouteGuide 服务中使用

  • 一种 简单 RPC,客户端使用 stub 向服务器发送请求并等待响应返回,就像普通的函数调用一样。

    // Obtains the feature at a given position.
    rpc GetFeature(Point) returns (Feature) {}
    
  • 一种 响应流式 RPC,客户端向服务器发送请求并获取一个流,用于读取一系列消息。客户端从返回的流中读取,直到没有更多消息。如示例所示,通过在 response 类型前放置 stream 关键字来指定响应流式方法。

    // Obtains the Features available within the given Rectangle.  Results are
    // streamed rather than returned at once (e.g. in a response message with a
    // repeated field), as the rectangle may cover a large area and contain a
    // huge number of features.
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
    
  • 一种 请求流式 RPC,客户端写入一系列消息并使用提供的流将其发送到服务器。一旦客户端完成写入消息,它会等待服务器读取所有消息并返回其响应。通过在 request 类型前放置 stream 关键字来指定请求流式方法。

    // Accepts a stream of Points on a route being traversed, returning a
    // RouteSummary when traversal is completed.
    rpc RecordRoute(stream Point) returns (RouteSummary) {}
    
  • 一种 双向流式 RPC,客户端和服务器使用读写流发送一系列消息。两个流独立运行,因此客户端和服务器可以按任何顺序读写:例如,服务器可以在写入响应之前等待接收所有客户端消息,或者它可以交替读取一条消息然后写入一条消息,或者读写组合的任何其他方式。每个流中的消息顺序都会保留。通过在 request 和 response 前都放置 stream 关键字来指定此类方法。

    // Accepts a stream of RouteNotes sent while a route is being traversed,
    // while receiving other RouteNotes (e.g. from other users).
    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    

你的 .proto 文件还包含我们服务方法中使用的所有请求和响应类型的 protocol buffer 消息类型定义——例如,这是 Point 消息类型

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

生成客户端和服务器代码

接下来,你需要从你的 .proto 服务定义中生成 gRPC 客户端和服务器接口。

首先,安装 grpcio-tools 包

pip install grpcio-tools

使用以下命令生成 Python 代码

python -m grpc_tools.protoc -I../../protos --python_out=. --pyi_out=. --grpc_python_out=. ../../protos/route_guide.proto

请注意,由于我们已经在示例目录中提供了一个版本的生成代码,运行此命令会重新生成相应的文件,而不是创建新文件。生成的代码文件名为 route_guide_pb2.pyroute_guide_pb2_grpc.py,包含:

  • route_guide.proto 中定义的消息类
  • route_guide.proto 中定义的服务类
    • RouteGuideStub,客户端可以使用它来调用 RouteGuide RPC
    • RouteGuideServicer,定义了 RouteGuide 服务实现的接口
  • route_guide.proto 中定义的服务函数
    • add_RouteGuideServicer_to_server,将 RouteGuideServicer 添加到 grpc.Server

使用自定义包路径生成 gRPC 接口

要使用自定义包路径生成 gRPC 客户端接口,可以使用 -I 参数以及 grpc_tools.protoc 命令。这种方法允许你为生成的文件指定自定义包名。

以下是使用自定义包路径生成 gRPC 客户端接口的示例命令

python -m grpc_tools.protoc -Igrpc/example/custom/path=../../protos \
  --python_out=. --grpc_python_out=. \
  ../../protos/route_guide.proto

生成的文件将放置在 ./grpc/example/custom/path/ 目录中

  • ./grpc/example/custom/path/route_guide_pb2.py
  • ./grpc/example/custom/path/route_guide_pb2_grpc.py

通过此设置,生成的 route_guide_pb2_grpc.py 文件将使用自定义包结构正确导入 protobuf 定义,如下所示:

import grpc.example.custom.path.route_guide_pb2 as route_guide_pb2

通过遵循这种方法,你可以确保文件会根据指定的包路径正确地相互调用。此方法允许你为 gRPC 客户端接口维护自定义包结构。

创建服务器

首先,我们来看看如何创建 RouteGuide 服务器。如果你只对创建 gRPC 客户端感兴趣,可以跳过本节,直接前往创建客户端(不过你可能仍然会觉得本节有趣!)。

创建和运行 RouteGuide 服务器分为两个工作项:

  • 实现从我们的服务定义生成的 servicer 接口,该接口包含执行服务实际“工作”的函数。
  • 运行 gRPC 服务器以监听来自客户端的请求并传输响应。

你可以在examples/python/route_guide/route_guide_server.py中找到示例 RouteGuide 服务器代码。

实现 RouteGuide

route_guide_server.py 有一个 RouteGuideServicer 类,它继承了生成的类 route_guide_pb2_grpc.RouteGuideServicer

# RouteGuideServicer provides an implementation of the methods of the RouteGuide service.
class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer):

RouteGuideServicer 实现了所有 RouteGuide 服务方法。

简单 RPC

首先,我们来看看最简单的类型,GetFeature,它只从客户端获取一个 Point,并从其数据库中返回相应的特征信息,以 Feature 形式。

def GetFeature(self, request, context):
    feature = get_feature(self.db, request)
    if feature is None:
        return route_guide_pb2.Feature(name="", location=request)
    else:
        return feature

该方法被传递一个用于 RPC 的 route_guide_pb2.Point 请求,以及一个提供 RPC 特定信息(如超时限制)的 grpc.ServicerContext 对象。它返回一个 route_guide_pb2.Feature 响应。

响应流式 RPC

现在我们来看看下一个方法。ListFeatures 是一个响应流式 RPC,它向客户端发送多个 Feature

def ListFeatures(self, request, context):
    left = min(request.lo.longitude, request.hi.longitude)
    right = max(request.lo.longitude, request.hi.longitude)
    top = max(request.lo.latitude, request.hi.latitude)
    bottom = min(request.lo.latitude, request.hi.latitude)
    for feature in self.db:
        if (
            feature.location.longitude >= left
            and feature.location.longitude <= right
            and feature.location.latitude >= bottom
            and feature.location.latitude <= top
        ):
            yield feature

这里的请求消息是一个 route_guide_pb2.Rectangle,客户端想在此矩形范围内查找 Feature。该方法不是返回单个响应,而是生成零个或多个响应。

请求流式 RPC

请求流式方法 RecordRoute 使用请求值的迭代器,并返回单个响应值。

def RecordRoute(self, request_iterator, context):
    point_count = 0
    feature_count = 0
    distance = 0.0
    prev_point = None

    start_time = time.time()
    for point in request_iterator:
        point_count += 1
        if get_feature(self.db, point):
            feature_count += 1
        if prev_point:
            distance += get_distance(prev_point, point)
        prev_point = point

    elapsed_time = time.time() - start_time
    return route_guide_pb2.RouteSummary(
        point_count=point_count,
        feature_count=feature_count,
        distance=int(distance),
        elapsed_time=int(elapsed_time),
    )
双向流式 RPC

最后,我们来看看双向流式方法 RouteChat

def RouteChat(self, request_iterator, context):
    prev_notes = []
    for new_note in request_iterator:
        for prev_note in prev_notes:
            if prev_note.location == new_note.location:
                yield prev_note
        prev_notes.append(new_note)

此方法的语义是请求流式方法和响应流式方法的组合。它被传递一个请求值的迭代器,并且本身也是一个响应值的迭代器。

启动服务器

实现所有 RouteGuide 方法后,下一步是启动 gRPC 服务器,以便客户端能够实际使用你的服务

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    route_guide_pb2_grpc.add_RouteGuideServicer_to_server(RouteGuideServicer(), server)
    server.add_insecure_port("[::]:50051")
    server.start()
    server.wait_for_termination()

服务器的 start() 方法是非阻塞的。将实例化一个新线程来处理请求。调用 server.start() 的线程在此期间通常没有其他工作可做。在这种情况下,你可以调用 server.wait_for_termination() 以干净地阻塞调用线程,直到服务器终止。

创建客户端

你可以在examples/python/route_guide/route_guide_client.py中查看完整的客户端示例代码。

创建 Stub

要调用服务方法,我们首先需要创建一个 stub

我们实例化从我们的 .proto 文件生成的 route_guide_pb2_grpc 模块中的 RouteGuideStub 类。

channel = grpc.insecure_channel('localhost:50051')
stub = route_guide_pb2_grpc.RouteGuideStub(channel)

调用服务方法

对于返回单个响应的 RPC 方法(“响应-一元”方法),gRPC Python 支持同步(阻塞)和异步(非阻塞)两种控制流语义。对于响应流式 RPC 方法,调用会立即返回一个响应值的迭代器。对此迭代器的 next() 方法的调用会阻塞,直到迭代器生成相应的响应可用为止。

简单 RPC

对简单 RPC GetFeature 的同步调用几乎与调用本地方法一样简单。RPC 调用会等待服务器响应,然后返回响应或引发异常

feature = stub.GetFeature(point)

GetFeature 的异步调用类似,但就像在线程池中异步调用本地方法一样

feature_future = stub.GetFeature.future(point)
feature = feature_future.result()
响应流式 RPC

调用响应流式 ListFeatures 与使用序列类型类似

for feature in stub.ListFeatures(rectangle):
请求流式 RPC

调用请求流式 RecordRoute 与将迭代器传递给本地方法类似。与上面同样返回单个响应的简单 RPC 一样,它可以同步或异步调用

route_summary = stub.RecordRoute(point_iterator)
route_summary_future = stub.RecordRoute.future(point_iterator)
route_summary = route_summary_future.result()
双向流式 RPC

调用双向流式 RouteChat(就像在服务侧一样)结合了请求流式和响应流式的语义

for received_route_note in stub.RouteChat(sent_route_note_iterator):

试一试!

运行服务器

python route_guide_server.py

在另一个终端中,运行客户端

python route_guide_client.py