基础教程

Kotlin gRPC 基础教程入门。

基础教程

Kotlin gRPC 基础教程入门。

本教程为 Kotlin 程序员提供了一份 gRPC 入门基础指南。

通过此示例,您将学习如何

  • .proto 文件中定义服务。
  • 使用协议缓冲区编译器生成服务器和客户端代码。
  • 使用 Kotlin gRPC API 为你的服务编写一个简单的客户端和服务器。

你应该已经熟悉 gRPC 和协议缓冲区(Protocol Buffers);如果还不熟悉,请参阅 gRPC 简介 以及 proto3 语言指南

为什么使用 gRPC?

我们的示例是一个简单的路线映射应用程序,它允许客户端获取其路线上特征的信息,创建其路线摘要,并与服务器及其他客户端交换路线信息(例如交通更新)。

使用 gRPC,我们可以一次在 .proto 文件中定义服务,并生成 gRPC 支持的任何语言的客户端和服务器。这些客户端和服务器可以在各种环境中运行,从大型数据中心内的服务器到您的平板电脑,应有尽有——不同语言和环境之间通信的所有复杂性都由 gRPC 为您处理。我们还可以获得使用 Protocol Buffers 的所有优势,包括高效的序列化、简单的 IDL 和易于更新的接口。

设置

本教程与 快速入门 具有相同的 先决条件。在继续之前,请安装必要的 SDK 和工具。

获取示例代码

示例代码是 grpc-kotlin 仓库的一部分。

  1. 将仓库下载为 zip 文件 并解压,或者克隆仓库

    git clone --depth 1 https://github.com/grpc/grpc-kotlin
    
  2. 切换到 examples 目录

    cd grpc-kotlin/examples
    

定义服务

你的第一步(如你从 gRPC 简介 中所知)是使用 协议缓冲区 定义 gRPC 服务 以及方法的 请求响应 类型。

如果你想通过查看完整的 .proto 文件来跟随学习,请参考 protos/src/main/proto/io/grpc/examples 文件夹下的 routeguide/route_guide.proto

要定义服务,你需要像这样在 .proto 文件中指定一个命名的 service

service RouteGuide {
   ...
}

然后,您在服务定义中定义 rpc 方法,并指定它们的请求和响应类型。gRPC 允许您定义四种服务方法,所有这些方法都在 RouteGuide 服务中使用

  • 简单 RPC:客户端使用存根向服务器发送请求,并等待响应返回,就像普通的函数调用一样。

    // Obtains the feature at a given position.
    rpc GetFeature(Point) returns (Feature) {}
    
  • 服务端流式 RPC:客户端向服务器发送请求,并获得一个用于读取一系列消息的流。客户端会读取返回的流,直到没有更多消息为止。正如您在我们的示例中所见,通过在响应类型前放置 stream 关键字,可以指定服务端流式方法。

    // Obtains the Features available within the given Rectangle.  Results are
    // streamed rather than returned at once (e.g. in a response message with a
    // repeated field), as the rectangle may cover a large area and contain a
    // huge number of features.
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
    
  • 客户端流式 RPC:客户端写入一系列消息并通过提供的流发送给服务器。一旦客户端完成了消息写入,它将等待服务器读取所有消息并返回其响应。通过在请求类型前放置 stream 关键字,可以指定客户端流式方法。

    // Accepts a stream of Points on a route being traversed, returning a
    // RouteSummary when traversal is completed.
    rpc RecordRoute(stream Point) returns (RouteSummary) {}
    
  • 双向流式 RPC:双方都使用读写流发送一系列消息。这两个流独立操作,因此客户端和服务器可以按任何他们喜欢的顺序读取和写入:例如,服务器可以等待接收所有客户端消息后再写入其响应,或者交替读取一条消息然后写入一条消息,亦或是其他读写组合。每条流中消息的顺序都会得到保留。通过在请求和响应之前都放置 stream 关键字,可以指定这种类型的方法。

    // Accepts a stream of RouteNotes sent while a route is being traversed,
    // while receiving other RouteNotes (e.g. from other users).
    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    

.proto 文件还包含服务方法使用的所有请求和响应类型的协议缓冲区消息类型定义——例如,这是 Point 消息类型。

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

生成客户端和服务器代码

接下来,你需要从 .proto 服务定义生成 gRPC 客户端和服务器接口。你可以使用带有特殊 gRPC Kotlin 和 Java 插件的协议缓冲区编译器 protoc 来完成此操作。

使用 Gradle 或 Maven 时,protoc 构建插件会在构建过程中自动生成必要的代码。有关 Gradle 的示例,请参阅 stub/build.gradle.kts

如果你在示例文件夹中运行 ./gradlew installDist,系统会从服务定义中生成以下文件——你可以在 stub/build/generated/source/proto/main 下的子目录中找到生成的文件。

  • Feature.javaPoint.javaRectangle.java 等,它们包含了填充、序列化和检索请求与响应消息类型的所有协议缓冲区代码。

    你可以在 java/io/grpc/examples/routeguide 子目录中找到这些文件。

  • RouteGuideOuterClassGrpcKt.kt,其中包含了以下内容:

    • RouteGuideGrpcKt.RouteGuideCoroutineImplBase:供 RouteGuide 服务器实现的抽象基类,包含 RouteGuide 服务中定义的所有方法。
    • RouteGuideGrpcKt.RouteGuideCoroutineStub:客户端用于与 RouteGuide 服务器通信的类。

    你可以在 grpckt/io/grpc/examples/routeguide 下找到此 Kotlin 文件。

创建服务器

首先考虑如何创建 RouteGuide 服务器。如果你只对创建 gRPC 客户端感兴趣,可以跳过此节直接阅读 创建客户端 —— 尽管你可能也会发现本节内容很有趣!

创建 RouteGuide 服务器时,主要需要完成两件事:

  • 扩展 RouteGuideCoroutineImplBase 服务基类以执行实际的服务工作。
  • 创建并运行 gRPC 服务器以监听来自客户端的请求,并返回服务响应。

server/src/main/kotlin/io/grpc/examples 文件夹下打开示例 RouteGuide 服务器代码 routeguide/RouteGuideServer.kt

实现 RouteGuide

如你所见,服务器有一个继承自生成服务基类的 RouteGuideService 类。

class RouteGuideService(
  val features: Collection<Feature>,
  /* ... */
) : RouteGuideGrpcKt.RouteGuideCoroutineImplBase() {
  /* ... */
}
简单 RPC

RouteGuideService 实现了所有服务方法。首先考虑最简单的方法 GetFeature(),它从客户端获取一个 Point,并返回一个根据数据库中相应特征信息构建的 Feature

override suspend fun getFeature(request: Point): Feature =
    features.find { it.location == request } ?:
    // No feature was found, return an unnamed feature.
    Feature.newBuilder().apply { location = request }.build()

该方法接收客户端的 Point 消息请求作为参数,并返回一个 Feature 消息作为响应。该方法会使用适当的信息填充 Feature,然后将其返回给 gRPC 框架,由框架将其发送回客户端。

服务器端流式 RPC

接下来,考虑流式 RPC 之一。ListFeatures() 是一个服务器端流式 RPC,因此服务器可以向客户端发回多个 Feature 消息。

override fun listFeatures(request: Rectangle): Flow<Feature> =
  features.asFlow().filter { it.exists() && it.location in request }

请求对象是一个 Rectangle。服务器收集并返回给客户端所有位于该 Rectangle 内的特征对象。

客户端流式 RPC

现在考虑一个稍微复杂一点的情况:客户端流式方法 RecordRoute()。服务器从客户端接收一个 Point 对象流,并返回一个包含其经过各点的旅程信息的 RouteSummary

override suspend fun recordRoute(requests: Flow<Point>): RouteSummary {
  var pointCount = 0
  var featureCount = 0
  var distance = 0
  var previous: Point? = null
  val stopwatch = Stopwatch.createStarted(ticker)
  requests.collect { request ->
    pointCount++
    if (getFeature(request).exists()) {
      featureCount++
    }
    val prev = previous
    if (prev != null) {
      distance += prev distanceTo request
    }
    previous = request
  }
  return RouteSummary.newBuilder().apply {
    this.pointCount = pointCount
    this.featureCount = featureCount
    this.distance = distance
    this.elapsedTime = Durations.fromMicros(stopwatch.elapsed(TimeUnit.MICROSECONDS))
  }.build()
}

请求参数是一个表示为 Kotlin Flow 的客户端请求消息流。服务器像在简单 RPC 情况下一样返回单个响应。

双向流式 RPC

最后,考虑双向流式 RPC RouteChat()

override fun routeChat(requests: Flow<RouteNote>): Flow<RouteNote> =
  flow {
    // could use transform, but it's currently experimental
    requests.collect { note ->
      val notes: MutableList<RouteNote> = routeNotes.computeIfAbsent(note.location) {
        Collections.synchronizedList(mutableListOf<RouteNote>())
      }
      for (prevNote in notes.toTypedArray()) { // thread-safe snapshot
        emit(prevNote)
      }
      notes += note
    }
  }

与客户端流式示例类似,对于此方法,服务器接收 RouteNote 对象流作为 Flow。然而这一次,服务器通过方法返回的流发回 RouteNote 实例,同时客户端仍在向消息流写入消息。

启动服务器

一旦所有服务器方法实现完毕,你需要编写代码来创建一个 gRPC 服务器实例,类似于这样:

class RouteGuideServer(
    val port: Int,
    val features: Collection<Feature> = Database.features(),
    val server: Server =
      ServerBuilder.forPort(port)
        .addService(RouteGuideService(features)).build()
) {

  fun start() {
    server.start()
    println("Server started, listening on $port")
    /* ... */
  }
  /* ... */
}

fun main(args: Array<String>) {
  val port = 8980
  val server = RouteGuideServer(port)
  server.start()
  server.awaitTermination()
 }

使用 ServerBuilder 构建并启动服务器实例,如下所示:

  1. 使用 forPort() 指定服务器将监听客户端请求的端口。
  2. 创建服务实现类 RouteGuideService 的实例,并将其传递给构建器的 addService() 方法。
  3. 在构建器上调用 build()start(),为路由引导服务创建并启动 RPC 服务器。
  4. 在服务器上调用 awaitTermination() 以阻塞主函数,直到应用程序收到终止信号。

创建客户端

在本节中,你将查看 RouteGuide 服务的客户端。

如需完整客户端代码,请在 client/src/main/kotlin/io/grpc/examples 文件夹下打开 routeguide/RouteGuideClient.kt

实例化存根(Stub)

要调用服务方法,首先需要使用 ManagedChannelBuilder 创建一个 gRPC 通道(Channel)。你将使用此通道与服务器进行通信。

val channel = ManagedChannelBuilder.forAddress("localhost", 8980).usePlaintext().build()

一旦 gRPC 通道设置完成,你需要一个客户端 存根(Stub) 来执行 RPC。通过实例化 RouteGuideCoroutineStub 来获取它,该类可从根据 .proto 文件生成的包中获得。

val stub = RouteGuideCoroutineStub(channel)

调用服务方法

现在考虑你将如何调用服务方法。

简单 RPC

调用简单 RPC GetFeature() 就像调用本地方法一样直接:

val request = point(latitude, longitude)
val feature = stub.getFeature(request)

存根方法 getFeature() 执行相应的 RPC,并挂起直到 RPC 完成。

suspend fun getFeature(latitude: Int, longitude: Int) {
  val request = point(latitude, longitude)
  val feature = stub.getFeature(request)
  if (feature.exists()) { /* ... */ }
}
服务器端流式 RPC

接下来,考虑服务器端流式 ListFeatures() RPC,它返回一个地理特征流:

suspend fun listFeatures(lowLat: Int, lowLon: Int, hiLat: Int, hiLon: Int) {
  val request = Rectangle.newBuilder()
    .setLo(point(lowLat, lowLon))
    .setHi(point(hiLat, hiLon))
    .build()
  var i = 1
  stub.listFeatures(request).collect { feature ->
    println("Result #${i++}: $feature")
  }
}

存根方法 listFeatures()Flow<Feature> 实例的形式返回特征流。流的 collect() 方法允许客户端在服务器提供的特征可用时对其进行处理。

客户端流式 RPC

客户端流式 RecordRoute() RPC 向服务器发送一个 Point 消息流,并获得一个 RouteSummary 返回。

suspend fun recordRoute(points: Flow<Point>) {
  println("*** RecordRoute")
  val summary = stub.recordRoute(points)
  println("Finished trip with ${summary.pointCount} points.")
  println("Passed ${summary.featureCount} features.")
  println("Travelled ${summary.distance} meters.")
  val duration = summary.elapsedTime.seconds
  println("It took $duration seconds.")
}

该方法从与随机选择的特征列表关联的点生成路由点。随机选择是从先前加载的特征集合中进行的。

fun generateRoutePoints(features: List<Feature>, numPoints: Int): Flow<Point> = flow {
  for (i in 1..numPoints) {
    val feature = features.random(random)
    println("Visiting point ${feature.location.toStr()}")
    emit(feature.location)
    delay(timeMillis = random.nextLong(500L..1500L))
  }
}

请注意,流点是延迟发射的,也就是说,只有在服务器请求时才发射。一旦一个点被发射到流中,点生成器就会挂起,直到服务器请求下一个点。

双向流式 RPC

最后,考虑双向流式 RPC RouteChat()。正如 RecordRoute() 的情况,你向存根方法传递一个用于写入请求消息的流;就像 ListFeatures() 一样,你会得到一个用于读取响应消息的流。然而,这次你通过方法返回的流发送值,同时服务器也在向消息流写入消息。

suspend fun routeChat() {
  val requests = generateOutgoingNotes()
  stub.routeChat(requests).collect { note ->
    println("Got message \"${note.message}\" at ${note.location.toStr()}")
  }
  println("Finished RouteChat")
}

private fun generateOutgoingNotes(): Flow<RouteNote> = flow {
  val notes = listOf(/* ... */)
  for (note in notes) {
    println("Sending message \"${note.message}\" at ${note.location.toStr()}")
    emit(note)
    delay(500)
  }
}

这里的读写语法与客户端和服务端流式方法非常相似。尽管每一方都会始终按写入顺序接收对方的消息,但客户端和服务器都可以按任何顺序读取和写入——流是完全独立操作的。

尝试一下!

grpc-kotlin/examples 目录下运行以下命令:

  1. 编译客户端和服务器

    ./gradlew installDist
    
  2. 运行服务器

    ./server/build/install/server/bin/route-guide-server
    Server started, listening on 8980
    
  3. 在另一个终端中,运行客户端

    ./client/build/install/client/bin/route-guide-client
    

你将看到如下客户端输出:

*** GetFeature: lat=409146138 lon=-746188906
Found feature called "Berkshire Valley Management Area Trail, Jefferson, NJ, USA" at 40.9146138, -74.6188906
*** GetFeature: lat=0 lon=0
Found no feature at 0.0, 0.0
*** ListFeatures: lowLat=400000000 lowLon=-750000000 hiLat=420000000 liLon=-730000000
Result #1: name: "Patriots Path, Mendham, NJ 07945, USA"
location {
  latitude: 407838351
  longitude: -746143763
}
...
Result #64: name: "3 Hasta Way, Newton, NJ 07860, USA"
location {
  latitude: 410248224
  longitude: -747127767
}

*** RecordRoute
Visiting point 40.0066188, -74.6793294
...
Visiting point 40.4318328, -74.0835638
Finished trip with 10 points.
Passed 3 features.
Travelled 93238790 meters.
It took 9 seconds.
*** RouteChat
Sending message "First message" at 0.0, 0.0
Sending message "Second message" at 0.0, 0.0
Got message "First message" at 0.0, 0.0
Sending message "Third message" at 1.0, 0.0
Sending message "Fourth message" at 1.0, 1.0
Sending message "Last message" at 0.0, 0.0
Got message "First message" at 0.0, 0.0
Got message "Second message" at 0.0, 0.0
Finished RouteChat