基础教程

Kotlin 中 gRPC 的基础教程介绍。

基础教程

Kotlin 中 gRPC 的基础教程介绍。

本教程为 Kotlin 程序员提供了使用 gRPC 的基本介绍。

通过学习本示例,你将学习如何

  • .proto 文件中定义服务。
  • 使用协议缓冲区编译器生成服务器和客户端代码。
  • 使用 Kotlin gRPC API 为你的服务编写简单的客户端和服务器。

你应该已经熟悉 gRPC 和协议缓冲区;如果不是,请参阅gRPC 简介和 proto3 语言指南

为什么要使用 gRPC?

我们的示例是一个简单的路线地图应用程序,它允许客户端获取有关其路线上特征的信息,创建其路线的摘要,并与服务器和其他客户端交换路线信息,例如交通更新。

借助 gRPC,我们可以使用 .proto 文件定义一次服务,并以任何 gRPC 支持的语言生成客户端和服务器,这些客户端和服务器反过来可以在从大型数据中心内的服务器到你自己的平板电脑的各种环境中运行——gRPC 会为你处理不同语言和环境之间通信的所有复杂性。我们还可以获得使用协议缓冲区的所有优势,包括高效的序列化、简单的 IDL 和轻松的接口更新。

设置

本教程与快速入门具有相同的先决条件。在继续之前,请安装必要的 SDK 和工具。

获取示例代码

示例代码是 grpc-kotlin 仓库的一部分。

  1. 将仓库下载为 zip 文件 并解压,或克隆仓库

    git clone --depth 1 https://github.com/grpc/grpc-kotlin
    
  2. 切换到 examples 目录

    cd grpc-kotlin/examples
    

定义服务

你的第一步(正如你从 gRPC 简介中所知)是使用协议缓冲区定义 gRPC *服务*以及方法*请求*和*响应*类型。

如果你想通过查看完整的 .proto 文件来继续,请参阅 protos/src/main/proto/io/grpc/examples 文件夹中的 routeguide/route_guide.proto

要定义服务,你需要在 .proto 文件中指定一个命名的 service,如下所示

service RouteGuide {
   ...
}

然后,你在服务定义中定义 rpc 方法,指定它们的请求和响应类型。gRPC 允许你定义四种服务方法,所有这些方法都在 RouteGuide 服务中使用

  • 一个简单 RPC,客户端使用存根向服务器发送请求,并等待响应返回,就像一个普通的函数调用一样。

    // Obtains the feature at a given position.
    rpc GetFeature(Point) returns (Feature) {}
    
  • 一个服务器端流式 RPC,客户端向服务器发送请求并获取流以读取一系列消息。客户端从返回的流中读取,直到没有更多消息。正如你在我们的示例中所看到的,你可以通过在响应类型之前放置 stream 关键字来指定服务器端流式方法。

    // Obtains the Features available within the given Rectangle.  Results are
    // streamed rather than returned at once (e.g. in a response message with a
    // repeated field), as the rectangle may cover a large area and contain a
    // huge number of features.
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
    
  • 一个客户端流式 RPC,客户端写入一系列消息并使用提供的流将它们发送到服务器。客户端完成写入消息后,它会等待服务器读取所有消息并返回其响应。你可以通过在请求类型之前放置 stream 关键字来指定客户端流式方法。

    // Accepts a stream of Points on a route being traversed, returning a
    // RouteSummary when traversal is completed.
    rpc RecordRoute(stream Point) returns (RouteSummary) {}
    
  • 一个双向流式 RPC,其中双方都使用读写流发送一系列消息。这两个流独立运行,因此客户端和服务器可以按照它们喜欢的任何顺序进行读写:例如,服务器可以等待接收所有客户端消息,然后再写入其响应,或者它可以交替读取消息然后写入消息,或者其他读取和写入的组合。每个流中消息的顺序都会被保留。你可以通过在请求和响应之前都放置 stream 关键字来指定这种类型的方法。

    // Accepts a stream of RouteNotes sent while a route is being traversed,
    // while receiving other RouteNotes (e.g. from other users).
    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    

.proto 文件还包含服务方法使用的所有请求和响应类型的协议缓冲区消息类型定义 – 例如,这是 Point 消息类型

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

生成客户端和服务器代码

接下来,你需要从 .proto 服务定义生成 gRPC 客户端和服务器接口。 你可以使用带有特殊 gRPC Kotlin 和 Java 插件的协议缓冲区编译器 protoc 来完成此操作。

当使用 Gradle 或 Maven 时,protoc 构建插件将在构建过程中生成必要的代码。 有关 Gradle 示例,请参阅 stub/build.gradle.kts

如果你从 examples 文件夹运行 ./gradlew installDist,则会从服务定义生成以下文件 – 你可以在 stub/build/generated/source/proto/main 下的子目录中找到生成的文件

  • Feature.javaPoint.javaRectangle.java 和其他文件,其中包含填充、序列化和检索我们的请求和响应消息类型的所有协议缓冲区代码。

    你将在 java/io/grpc/examples/routeguide 子目录中找到这些文件。

  • RouteGuideOuterClassGrpcKt.kt,其中包含,除其他外

    • RouteGuideGrpcKt.RouteGuideCoroutineImplBase,一个用于 RouteGuide 服务器实现的抽象基类,其中包含 RouteGuide 服务中定义的所有方法。
    • RouteGuideGrpcKt.RouteGuideCoroutineStub,客户端用于与 RouteGuide 服务器通信的类。

    你将在 grpckt/io/grpc/examples/routeguide 下找到此 Kotlin 文件。

创建服务器

首先考虑如何创建一个 RouteGuide 服务器。 如果你只对创建 gRPC 客户端感兴趣,请跳到 创建客户端 – 但你可能会发现本节也很有趣!

创建 RouteGuide 服务器时,你需要做两件主要的事情

  • 扩展 RouteGuideCoroutineImplBase 服务基类以执行实际的服务工作。
  • 创建并运行 gRPC 服务器以侦听来自客户端的请求并返回服务响应。

server/src/main/kotlin/io/grpc/examples 文件夹下的 routeguide/RouteGuideServer.kt 中打开示例 RouteGuide 服务器代码。

实现 RouteGuide

如你所见,服务器有一个扩展了生成的服务基类的 RouteGuideService

class RouteGuideService(
  val features: Collection<Feature>,
  /* ... */
) : RouteGuideGrpcKt.RouteGuideCoroutineImplBase() {
  /* ... */
}
简单 RPC

RouteGuideService 实现了所有服务方法。 首先考虑最简单的方法 GetFeature(),它从客户端获取一个 Point,并返回一个从数据库中相应的功能信息构建的 Feature

override suspend fun getFeature(request: Point): Feature =
    features.find { it.location == request } ?:
    // No feature was found, return an unnamed feature.
    Feature.newBuilder().apply { location = request }.build()

该方法接受客户端的 Point 消息请求作为参数,并返回一个 Feature 消息作为响应。该方法使用适当的信息填充 Feature,然后将其返回到 gRPC 框架,后者将其发送回客户端。

服务器端流式 RPC

接下来,考虑其中一个流式 RPC。 ListFeatures() 是一个服务器端流式 RPC,因此服务器可以向客户端发送多个 Feature 消息。

override fun listFeatures(request: Rectangle): Flow<Feature> =
  features.asFlow().filter { it.exists() && it.location in request }

请求对象是一个 Rectangle。 服务器收集并返回给客户端其集合中位于给定 Rectangle 内的所有 Feature 对象。

客户端流式 RPC

现在考虑一些更复杂的情况:客户端流式方法 RecordRoute(),其中服务器从客户端获取一个 Point 对象流,并返回一个包含有关它们通过给定点的行程信息的单个 RouteSummary

override suspend fun recordRoute(requests: Flow<Point>): RouteSummary {
  var pointCount = 0
  var featureCount = 0
  var distance = 0
  var previous: Point? = null
  val stopwatch = Stopwatch.createStarted(ticker)
  requests.collect { request ->
    pointCount++
    if (getFeature(request).exists()) {
      featureCount++
    }
    val prev = previous
    if (prev != null) {
      distance += prev distanceTo request
    }
    previous = request
  }
  return RouteSummary.newBuilder().apply {
    this.pointCount = pointCount
    this.featureCount = featureCount
    this.distance = distance
    this.elapsedTime = Durations.fromMicros(stopwatch.elapsed(TimeUnit.MICROSECONDS))
  }.build()
}

请求参数是以 Kotlin Flow 表示的客户端请求消息流。 与简单 RPC 情况一样,服务器返回单个响应。

双向流式 RPC

最后,考虑双向流式 RPC RouteChat()

override fun routeChat(requests: Flow<RouteNote>): Flow<RouteNote> =
  flow {
    // could use transform, but it's currently experimental
    requests.collect { note ->
      val notes: MutableList<RouteNote> = routeNotes.computeIfAbsent(note.location) {
        Collections.synchronizedList(mutableListOf<RouteNote>())
      }
      for (prevNote in notes.toTypedArray()) { // thread-safe snapshot
        emit(prevNote)
      }
      notes += note
    }
  }

与客户端流式示例类似,对于此方法,服务器将 RouteNote 对象流作为 Flow 获取。 但是,这次服务器客户端仍在向消息流写入消息的同时,通过该方法返回的流返回 RouteNote 实例。

启动服务器

一旦所有服务器方法都已实现,你需要代码来创建 gRPC 服务器实例,如下所示

class RouteGuideServer(
    val port: Int,
    val features: Collection<Feature> = Database.features(),
    val server: Server =
      ServerBuilder.forPort(port)
        .addService(RouteGuideService(features)).build()
) {

  fun start() {
    server.start()
    println("Server started, listening on $port")
    /* ... */
  }
  /* ... */
}

fun main(args: Array<String>) {
  val port = 8980
  val server = RouteGuideServer(port)
  server.start()
  server.awaitTermination()
 }

服务器实例使用 ServerBuilder 构建并启动,如下所示

  1. 使用 forPort() 指定服务器将侦听客户端请求的端口。
  2. 创建服务实现类 RouteGuideService 的实例,并将其传递给构建器的 addService() 方法。
  3. 在构建器上调用 build()start() 以创建并启动路由指南服务的 RPC 服务器。
  4. 在服务器上调用 awaitTermination() 以阻止主函数,直到应用程序收到终止信号。

创建客户端

在本节中,你将了解 RouteGuide 服务的客户端。

有关完整的客户端代码,请打开 client/src/main/kotlin/io/grpc/examples 文件夹下的 routeguide/RouteGuideClient.kt

实例化存根

要调用服务方法,你首先需要使用 ManagedChannelBuilder 创建 gRPC 通道。 你将使用此通道与服务器通信。

val channel = ManagedChannelBuilder.forAddress("localhost", 8980).usePlaintext().build()

设置 gRPC 通道后,你需要一个客户端存根来执行 RPC。 通过实例化 RouteGuideCoroutineStub 来获取它,该存根可从 .proto 文件生成的包中获得。

val stub = RouteGuideCoroutineStub(channel)

调用服务方法

现在考虑如何调用服务方法。

简单 RPC

调用简单的 RPC GetFeature() 与调用本地方法一样简单

val request = point(latitude, longitude)
val feature = stub.getFeature(request)

存根方法 getFeature() 执行相应的 RPC,暂停直到 RPC 完成

suspend fun getFeature(latitude: Int, longitude: Int) {
  val request = point(latitude, longitude)
  val feature = stub.getFeature(request)
  if (feature.exists()) { /* ... */ }
}
服务器端流式 RPC

接下来,考虑服务器端流式 ListFeatures() RPC,它返回地理特征流

suspend fun listFeatures(lowLat: Int, lowLon: Int, hiLat: Int, hiLon: Int) {
  val request = Rectangle.newBuilder()
    .setLo(point(lowLat, lowLon))
    .setHi(point(hiLat, hiLon))
    .build()
  var i = 1
  stub.listFeatures(request).collect { feature ->
    println("Result #${i++}: $feature")
  }
}

存根 listFeatures() 方法以 Flow<Feature> 实例的形式返回特征流。 当服务器提供的特征可用时,流 collect() 方法允许客户端处理它们。

客户端流式 RPC

客户端流式 RecordRoute() RPC 将 Point 消息流发送到服务器,并返回单个 RouteSummary

suspend fun recordRoute(points: Flow<Point>) {
  println("*** RecordRoute")
  val summary = stub.recordRoute(points)
  println("Finished trip with ${summary.pointCount} points.")
  println("Passed ${summary.featureCount} features.")
  println("Travelled ${summary.distance} meters.")
  val duration = summary.elapsedTime.seconds
  println("It took $duration seconds.")
}

该方法从与随机选择的功能列表关联的点生成路线点。 随机选择是从先前加载的功能集合中进行的

fun generateRoutePoints(features: List<Feature>, numPoints: Int): Flow<Point> = flow {
  for (i in 1..numPoints) {
    val feature = features.random(random)
    println("Visiting point ${feature.location.toStr()}")
    emit(feature.location)
    delay(timeMillis = random.nextLong(500L..1500L))
  }
}

请注意,流点是延迟发出的,也就是说,仅在服务器请求它们时才发出。 一旦将点发送到流,点生成器就会暂停,直到服务器请求下一个点。

双向流式 RPC

最后,考虑双向流式 RPC RouteChat()。 与 RecordRoute() 的情况一样,你将一个流传递给存根方法,你使用该流向服务器写入请求消息; 与 ListFeatures() 类似,你返回一个流,你可以使用该流从服务器读取响应消息。 但是,这次你在服务器也在向消息流写入消息的同时,通过我们方法的流发送值。

suspend fun routeChat() {
  val requests = generateOutgoingNotes()
  stub.routeChat(requests).collect { note ->
    println("Got message \"${note.message}\" at ${note.location.toStr()}")
  }
  println("Finished RouteChat")
}

private fun generateOutgoingNotes(): Flow<RouteNote> = flow {
  val notes = listOf(/* ... */)
  for (note in notes) {
    println("Sending message \"${note.message}\" at ${note.location.toStr()}")
    emit(note)
    delay(500)
  }
}

此处读取和写入的语法与客户端和服务器端流式方法非常相似。 尽管每一方始终会按照写入的顺序获取另一方的消息,但客户端和服务器都可以按任意顺序读取和写入 - 这些流完全独立运行。

试一试!

grpc-kotlin/examples 目录运行以下命令

  1. 编译客户端和服务器

    ./gradlew installDist
    
  2. 运行服务器

    ./server/build/install/server/bin/route-guide-server
    Server started, listening on 8980
    
  3. 从另一个终端,运行客户端

    ./client/build/install/client/bin/route-guide-client
    

你将看到如下所示的客户端输出

*** GetFeature: lat=409146138 lon=-746188906
Found feature called "Berkshire Valley Management Area Trail, Jefferson, NJ, USA" at 40.9146138, -74.6188906
*** GetFeature: lat=0 lon=0
Found no feature at 0.0, 0.0
*** ListFeatures: lowLat=400000000 lowLon=-750000000 hiLat=420000000 liLon=-730000000
Result #1: name: "Patriots Path, Mendham, NJ 07945, USA"
location {
  latitude: 407838351
  longitude: -746143763
}
...
Result #64: name: "3 Hasta Way, Newton, NJ 07860, USA"
location {
  latitude: 410248224
  longitude: -747127767
}

*** RecordRoute
Visiting point 40.0066188, -74.6793294
...
Visiting point 40.4318328, -74.0835638
Finished trip with 10 points.
Passed 3 features.
Travelled 93238790 meters.
It took 9 seconds.
*** RouteChat
Sending message "First message" at 0.0, 0.0
Sending message "Second message" at 0.0, 0.0
Got message "First message" at 0.0, 0.0
Sending message "Third message" at 1.0, 0.0
Sending message "Fourth message" at 1.0, 1.0
Sending message "Last message" at 0.0, 0.0
Got message "First message" at 0.0, 0.0
Got message "Second message" at 0.0, 0.0
Finished RouteChat
上次修改时间为 2024 年 11 月 25 日:feat: move the $ shell line indicator to scss (#1354) (ab8b3af)