基础教程

一个关于 Python 中 gRPC 的基础教程介绍。

基础教程

一个关于 Python 中 gRPC 的基础教程介绍。

本教程为 Python 程序员提供了一个使用 gRPC 的基础入门介绍。

通过学习此示例,您将了解如何:

  • .proto 文件中定义服务。
  • 使用协议缓冲区编译器生成服务器和客户端代码。
  • 使用 Python gRPC API 为您的服务编写简单的客户端和服务器。

它假设您已经阅读了 gRPC 简介 并且熟悉 协议缓冲区。您可以在 proto3 语言指南Python 生成代码指南 中找到更多信息。

为什么要使用 gRPC?

我们的示例是一个简单的路线地图应用程序,它允许客户端获取有关其路线上的功能信息,创建其路线的摘要,并与服务器和其他客户端交换路线信息,例如交通更新。

使用 gRPC,我们可以在 .proto 文件中定义一次服务,并生成任何 gRPC 支持的语言的客户端和服务器,这些客户端和服务器又可以在从大型数据中心内的服务器到您自己的平板电脑等各种环境中运行 - 不同语言和环境之间通信的所有复杂性都由 gRPC 为您处理。我们还获得了使用协议缓冲区的所有优势,包括高效的序列化、简单的 IDL 和简单的接口更新。

示例代码和设置

本教程的示例代码位于 grpc/grpc/examples/python/route_guide。要下载该示例,请运行以下命令克隆 grpc 存储库:

git clone -b v1.66.0 --depth 1 --shallow-submodules https://github.com/grpc/grpc

然后将当前目录更改为存储库中的 examples/python/route_guide

cd grpc/examples/python/route_guide

您还应该安装相关的工具来生成服务器和客户端接口代码 - 如果您还没有安装,请按照 快速入门 中的设置说明进行操作。

定义服务

您的第一步(正如您从 gRPC 简介 中所了解的那样)是使用 协议缓冲区定义 gRPC 服务 以及方法请求响应类型。您可以在 examples/protos/route_guide.proto 中查看完整的 .proto 文件。

要定义服务,请在您的 .proto 文件中指定一个名为 service 的名称:

service RouteGuide {
   // (Method definitions not shown)
}

然后,在您的服务定义中定义 rpc 方法,指定它们的请求和响应类型。 gRPC 允许您定义四种服务方法,所有这些方法都用于 RouteGuide 服务中:

  • 一个简单 RPC,客户端使用存根向服务器发送请求,并等待响应返回,就像正常的函数调用一样。

    // Obtains the feature at a given position.
    rpc GetFeature(Point) returns (Feature) {}
    
  • 一个响应流式 RPC,客户端向服务器发送请求,并获取一个流以读取一系列消息。客户端从返回的流中读取,直到没有更多消息。正如您在示例中所看到的,您可以通过在响应类型之前放置 stream 关键字来指定响应流式方法。

    // Obtains the Features available within the given Rectangle.  Results are
    // streamed rather than returned at once (e.g. in a response message with a
    // repeated field), as the rectangle may cover a large area and contain a
    // huge number of features.
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
    
  • 一个请求流式 RPC,客户端写入一系列消息并将它们发送到服务器,同样使用提供的流。客户端完成写入消息后,它会等待服务器读取所有消息并返回其响应。您可以通过在请求类型之前放置 stream 关键字来指定请求流式方法。

    // Accepts a stream of Points on a route being traversed, returning a
    // RouteSummary when traversal is completed.
    rpc RecordRoute(stream Point) returns (RouteSummary) {}
    
  • 一个双向流式 RPC,其中双方都使用读写流发送一系列消息。这两个流是独立运作的,因此客户端和服务器可以按他们喜欢的任何顺序读取和写入:例如,服务器可以等待接收所有客户端消息后再写入其响应,或者它可以交替读取消息然后写入消息,或者其他一些读取和写入的组合。每个流中消息的顺序都会被保留。您可以通过在请求和响应之前都放置 stream 关键字来指定这种类型的方法。

    // Accepts a stream of RouteNotes sent while a route is being traversed,
    // while receiving other RouteNotes (e.g. from other users).
    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    

您的 .proto 文件还包含用于我们服务方法中使用的所有请求和响应类型的协议缓冲区消息类型定义 - 例如,这是 Point 消息类型

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

生成客户端和服务器代码

接下来,您需要从您的 .proto 服务定义生成 gRPC 客户端和服务器接口。

首先,安装 grpcio-tools 包

pip install grpcio-tools

使用以下命令生成 Python 代码

python -m grpc_tools.protoc -I../../protos --python_out=. --pyi_out=. --grpc_python_out=. ../../protos/route_guide.proto

请注意,由于我们已经在示例目录中提供了生成代码的版本,因此运行此命令会重新生成相应的文件,而不是创建新文件。 生成的代码文件名为 route_guide_pb2.pyroute_guide_pb2_grpc.py,包含

  • route_guide.proto 中定义的消息的类
  • route_guide.proto 中定义的服务类
    • RouteGuideStub,客户端可以使用它来调用 RouteGuide RPC
    • RouteGuideServicer,它定义了 RouteGuide 服务实现的接口
  • route_guide.proto 中定义的服务函数
    • add_RouteGuideServicer_to_server,它将 RouteGuideServicer 添加到 grpc.Server

使用自定义包路径生成 gRPC 接口

要使用自定义包路径生成 gRPC 客户端接口,可以将 -I 参数与 grpc_tools.protoc 命令一起使用。此方法允许您为生成的文件指定自定义包名称。

这是一个使用自定义包路径生成 gRPC 客户端接口的示例命令

python -m grpc_tools.protoc -Igrpc/example/custom/path=../../protos \
  --python_out=. --grpc_python_out=. \
  ../../protos/route_guide.proto

生成的文件将放置在 ./grpc/example/custom/path/ 目录中

  • ./grpc/example/custom/path/route_guide_pb2.py
  • ./grpc/example/custom/path/route_guide_pb2_grpc.py

通过此设置,生成的 route_guide_pb2_grpc.py 文件将使用自定义包结构正确导入 protobuf 定义,如下所示

import grpc.example.custom.path.route_guide_pb2 as route_guide_pb2

通过遵循此方法,您可以确保文件将根据指定的包路径正确相互调用。此方法允许您为 gRPC 客户端接口维护自定义包结构。

创建服务器

首先,让我们看看如何创建 RouteGuide 服务器。如果您只对创建 gRPC 客户端感兴趣,则可以跳过本节,直接转到 创建客户端(尽管您可能会觉得它很有趣!)。

创建和运行 RouteGuide 服务器分为两个工作项

  • 使用执行服务实际“工作”的函数来实现从我们的服务定义生成的 servicer 接口。
  • 运行 gRPC 服务器以侦听来自客户端的请求并传输响应。

您可以在 examples/python/route_guide/route_guide_server.py 中找到示例 RouteGuide 服务器。

实现 RouteGuide

route_guide_server.py 有一个 RouteGuideServicer 类,该类是生成的类 route_guide_pb2_grpc.RouteGuideServicer 的子类

# RouteGuideServicer provides an implementation of the methods of the RouteGuide service.
class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer):

RouteGuideServicer 实现了所有 RouteGuide 服务方法。

简单 RPC

让我们先看一下最简单的类型,GetFeature,它只是从客户端获取一个 Point,并从其数据库中返回相应的要素信息,返回结果类型为Feature

def GetFeature(self, request, context):
    feature = get_feature(self.db, request)
    if feature is None:
        return route_guide_pb2.Feature(name="", location=request)
    else:
        return feature

该方法被传递一个用于 RPC 的 route_guide_pb2.Point 请求,以及一个 grpc.ServicerContext 对象,该对象提供 RPC 特有的信息,例如超时限制。它返回一个 route_guide_pb2.Feature 响应。

响应流式 RPC

现在让我们看看下一个方法。ListFeatures 是一个响应流式 RPC,它向客户端发送多个 Feature

def ListFeatures(self, request, context):
    left = min(request.lo.longitude, request.hi.longitude)
    right = max(request.lo.longitude, request.hi.longitude)
    top = max(request.lo.latitude, request.hi.latitude)
    bottom = min(request.lo.latitude, request.hi.latitude)
    for feature in self.db:
        if (
            feature.location.longitude >= left
            and feature.location.longitude <= right
            and feature.location.latitude >= bottom
            and feature.location.latitude <= top
        ):
            yield feature

这里的请求消息是一个 route_guide_pb2.Rectangle,客户端希望在其中查找 Feature。该方法不会返回单个响应,而是产生零个或多个响应。

请求流式 RPC

请求流式方法 RecordRoute 使用请求值的 迭代器,并返回单个响应值。

def RecordRoute(self, request_iterator, context):
    point_count = 0
    feature_count = 0
    distance = 0.0
    prev_point = None

    start_time = time.time()
    for point in request_iterator:
        point_count += 1
        if get_feature(self.db, point):
            feature_count += 1
        if prev_point:
            distance += get_distance(prev_point, point)
        prev_point = point

    elapsed_time = time.time() - start_time
    return route_guide_pb2.RouteSummary(
        point_count=point_count,
        feature_count=feature_count,
        distance=int(distance),
        elapsed_time=int(elapsed_time),
    )
双向流式 RPC

最后,让我们看看双向流式方法 RouteChat

def RouteChat(self, request_iterator, context):
    prev_notes = []
    for new_note in request_iterator:
        for prev_note in prev_notes:
            if prev_note.location == new_note.location:
                yield prev_note
        prev_notes.append(new_note)

此方法的语义是请求流式方法和响应流式方法的组合。它被传递一个请求值的迭代器,并且它本身就是响应值的迭代器。

启动服务器

在您实现了所有 RouteGuide 方法后,下一步是启动 gRPC 服务器,以便客户端可以实际使用您的服务

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    route_guide_pb2_grpc.add_RouteGuideServicer_to_server(RouteGuideServicer(), server)
    server.add_insecure_port("[::]:50051")
    server.start()
    server.wait_for_termination()

服务器 start() 方法是非阻塞的。将实例化一个新线程来处理请求。调用 server.start() 的线程通常在此期间没有任何其他工作要做。在这种情况下,您可以调用 server.wait_for_termination() 来干净地阻止调用线程,直到服务器终止。

创建客户端

您可以在 examples/python/route_guide/route_guide_client.py 中查看完整的客户端示例代码。

创建存根

要调用服务方法,我们首先需要创建一个存根

我们实例化从我们的 .proto 生成的 route_guide_pb2_grpc 模块的 RouteGuideStub 类。

channel = grpc.insecure_channel('localhost:50051')
stub = route_guide_pb2_grpc.RouteGuideStub(channel)

调用服务方法

对于返回单个响应(“响应单向”方法)的 RPC 方法,gRPC Python 支持同步(阻塞)和异步(非阻塞)控制流语义。对于响应流式 RPC 方法,调用会立即返回一个响应值的迭代器。对该迭代器的 next() 方法的调用会阻塞,直到迭代器要产生的响应可用为止。

简单 RPC

对简单 RPC GetFeature 的同步调用几乎与调用本地方法一样简单。 RPC 调用会等待服务器响应,并且要么返回响应,要么引发异常

feature = stub.GetFeature(point)

GetFeature 的异步调用类似,但类似于在线程池中异步调用本地方法

feature_future = stub.GetFeature.future(point)
feature = feature_future.result()
响应流式 RPC

调用响应流式 ListFeatures 类似于使用序列类型

for feature in stub.ListFeatures(rectangle):
请求流式 RPC

调用请求流式 RecordRoute 类似于将迭代器传递给本地方法。像上面也返回单个响应的简单 RPC 一样,它可以同步或异步调用

route_summary = stub.RecordRoute(point_iterator)
route_summary_future = stub.RecordRoute.future(point_iterator)
route_summary = route_summary_future.result()
双向流式 RPC

调用双向流式 RouteChat 具有(与服务侧的情况相同)请求流式和响应流式语义的组合

for received_route_note in stub.RouteChat(sent_route_note_iterator):

亲自尝试!

运行服务器

python route_guide_server.py

从另一个终端,运行客户端

python route_guide_client.py
上次修改时间 2024 年 11 月 25 日: feat: move the $ shell line indicator to scss (#1354) (ab8b3af)