如果你想优化 gRPC - 第 1 部分
一个常见的 gRPC 问题是如何使其变得快速。gRPC 库为用户提供了高性能 RPC 的访问权限,但如何实现这一点并不总是清晰的。因为这个问题足够常见,所以我决定尝试展示我在调整程序时的思考过程。
设置
考虑一个被多个其他程序使用的基本键值服务。该服务需要支持并发访问,以防多个更新同时发生。它需要能够扩展以利用可用的硬件。最后,它需要快速。gRPC 非常适合这种类型的服务;让我们来看看实现它的最佳方式。
对于这篇博文,我使用 gRPC Java 编写了一个示例客户端和服务器。该程序分为三个主要类和一个描述 API 的 protobuf 文件。
- KvClient 是键值系统的一个模拟用户。它随机创建、检索、更新和删除键值。它使用的键和值的大小也是使用指数分布随机决定的。
- KvService 是键值服务的实现。它由 gRPC 服务器安装,用于处理客户端发出的请求。为了模拟将键值存储在磁盘上,它在处理请求时会添加短暂的休眠。读写操作将分别经历10毫秒和50毫秒的延迟,使示例更像一个持久化数据库。
- KvRunner 协调客户端和服务器之间的交互。它是主要的入口点,在进程中启动客户端和服务器,并等待客户端执行其工作。Runner 工作60秒,然后记录完成了多少个 RPC。
- kvstore.proto 是我们服务的协议缓冲区定义。它准确地描述了客户端可以从服务中获得什么。为了简单起见,我们将使用创建(Create)、检索(Retrieve)、更新(Update)和删除(Delete)作为操作(通常称为 CRUD)。这些操作处理由任意字节组成的键和值。虽然它们有点像 REST,但我们保留在未来进行修改并添加更复杂操作的权利。
协议缓冲区(protos)并不是使用 gRPC 的必需条件,但它们是定义服务接口和生成客户端与服务器代码的一种非常方便的方式。生成的代码充当应用程序逻辑和 gRPC 核心库之间的胶水代码。我们称 gRPC 客户端调用的代码为 stub。
起点
客户端
现在我们知道了程序应该做什么,我们可以开始查看程序的性能。如上所述,客户端会随机发出 RPC。例如,这是发出创建请求的代码:
private void doCreate(KeyValueServiceBlockingStub stub) {
ByteString key = createRandomKey();
try {
CreateResponse res = stub.create(
CreateRequest.newBuilder()
.setKey(key)
.setValue(randomBytes(MEAN_VALUE_SIZE))
.build());
if (!res.equals(CreateResponse.getDefaultInstance())) {
throw new RuntimeException("Invalid response");
}
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Code.ALREADY_EXISTS) {
knownKeys.remove(key);
logger.log(Level.INFO, "Key already existed", e);
} else {
throw e;
}
}
}
一个随机键和一个随机值被创建。请求被发送到服务器,客户端等待响应。当响应返回时,代码会检查它是否符合预期,如果不符则抛出异常。虽然键是随机选择的,但它们需要是唯一的,因此我们需要确保每个键都没有被使用过。为了解决这个问题,代码会跟踪已创建的键,以免重复创建相同的键。但是,如果另一个客户端已经创建了某个特定的键,我们则会记录下来并继续。否则,将抛出异常。
我们在这里使用**阻塞式** gRPC API,它发出请求并等待响应。这是最简单的 gRPC stub,但在运行时会阻塞线程。这意味着从客户端的角度看,最多只能同时处理**一个** RPC。
服务器
在服务器端,请求由服务处理器接收。
private final Map<ByteBuffer, ByteBuffer> store = new HashMap<>();
@Override
public synchronized void create(
CreateRequest request, StreamObserver<CreateResponse> responseObserver) {
ByteBuffer key = request.getKey().asReadOnlyByteBuffer();
ByteBuffer value = request.getValue().asReadOnlyByteBuffer();
simulateWork(WRITE_DELAY_MILLIS);
if (store.putIfAbsent(key, value) == null) {
responseObserver.onNext(CreateResponse.getDefaultInstance());
responseObserver.onCompleted();
return;
}
responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
}
服务从请求中提取键和值作为 `ByteBuffer`。它会锁定服务本身,以确保并发请求不会破坏存储。在模拟写入的磁盘访问后,它将其存储在键值 `Map` 中。
与客户端代码不同,服务处理器是**非阻塞的**,这意味着它不像函数调用那样返回一个值。相反,它会在 `responseObserver` 上调用 `onNext()` 以将响应发送回客户端。请注意,这个调用也是非阻塞的,这意味着消息可能尚未发送。为了表示我们已经处理完消息,会调用 `onCompleted()`。
性能
既然代码是安全且正确的,让我们看看它的性能如何。为了我的测量,我使用了我的 Ubuntu 系统,它有一个12核处理器和32GB内存。让我们构建并运行代码:
./gradlew installDist
time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 1:10:07 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 1:11:07 PM io.grpc.examples.KvRunner runClient
INFO: Did 16.55 RPCs/s
real 1m0.927s
user 0m10.688s
sys 0m1.456s
哎呀!对于这样一台强大的机器,它每秒只能处理大约16个 RPC。它几乎没有用到任何 CPU,我们也不知道它使用了多少内存。我们需要找出它为什么这么慢。
优化
分析
在进行任何更改之前,我们先来理解程序正在做什么。在优化时,我们需要知道代码将时间花费在哪里,才能知道可以优化什么。在这个早期阶段,我们还不需要性能分析工具,只需对程序进行推理即可。
客户端启动后,会串行地发出大约一分钟的 RPC。每次迭代,它都会随机决定要执行什么操作:
void doClientWork(AtomicBoolean done) {
Random random = new Random();
KeyValueServiceBlockingStub stub = KeyValueServiceGrpc.newBlockingStub(channel);
while (!done.get()) {
// Pick a random CRUD action to take.
int command = random.nextInt(4);
if (command == 0) {
doCreate(stub);
continue;
}
/* ... */
rpcCount++;
}
}
这意味着**任何时候最多只有一个 RPC 处于活动状态**。每个 RPC 都必须等待前一个完成。那么每个 RPC 完成需要多长时间?从服务器代码来看,大多数操作都在进行写入,这大约需要50毫秒。在最高效率下,这段代码每秒最多可以执行大约20个操作:
20 次查询 = 1000ms / (50 毫秒/查询)
我们的代码每秒可以执行大约16次查询,所以这看起来是正确的。我们可以通过查看用于运行代码的 `time` 命令的输出,来核对这个假设。服务器在执行查询时会在simulateWork方法中进入休眠。这意味着程序在等待 RPC 完成时大部分时间应该处于空闲状态。
我们可以通过查看上述命令的 `real` 时间和 `user` 时间来确认这一点。它们表示“墙钟时间”为1分钟,而“CPU 时间”为10秒。我强大的多核 CPU 只有16%的时间处于繁忙状态。因此,如果我们能让程序在这段时间内做更多的工作,似乎我们就能完成更多的 RPC。
假设
现在我们可以清楚地说明我们认为的问题,并提出解决方案。一种加速程序的方法是确保 CPU 不空闲。为此,我们并发地发出工作。
在 gRPC Java 中,有三种类型的 stub:阻塞式、非阻塞式和可监听的 Future。我们已经在客户端看到了阻塞式 stub,在服务器端看到了非阻塞式 stub。可监听的 Future API 介于两者之间,提供了类似于阻塞和非阻塞的行为。只要我们不阻塞线程等待工作完成,我们就可以启动新的 RPC,而无需等待旧的 RPC 完成。
实验
为了验证我们的假设,让我们修改客户端代码以使用可监听的 Future API。这意味着我们需要更多地考虑代码中的并发性。例如,在客户端跟踪已知键时,我们需要安全地读取、修改和写入键。我们还需要确保在发生错误时停止发出新的 RPC(正确的错误处理将在未来的文章中介绍)。最后,我们需要更新并发发出的 RPC 数量,因为更新可能发生在另一个线程中。
进行所有这些更改会增加代码的复杂性。这是您在优化代码时需要考虑的权衡。通常,代码的简洁性与优化是矛盾的。Java 并不以简洁著称。尽管如此,下面的代码仍然可读,并且程序流程在函数中仍然大致是从上到下。这是修订后的doCreate()方法:
private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error) {
ByteString key = createRandomKey();
ListenableFuture<CreateResponse> res = stub.create(
CreateRequest.newBuilder()
.setKey(key)
.setValue(randomBytes(MEAN_VALUE_SIZE))
.build());
res.addListener(() -> rpcCount.incrementAndGet(), MoreExecutors.directExecutor());
Futures.addCallback(res, new FutureCallback<CreateResponse>() {
@Override
public void onSuccess(CreateResponse result) {
if (!result.equals(CreateResponse.getDefaultInstance())) {
error.compareAndSet(null, new RuntimeException("Invalid response"));
}
synchronized (knownKeys) {
knownKeys.add(key);
}
}
@Override
public void onFailure(Throwable t) {
Status status = Status.fromThrowable(t);
if (status.getCode() == Code.ALREADY_EXISTS) {
synchronized (knownKeys) {
knownKeys.remove(key);
}
logger.log(Level.INFO, "Key already existed", t);
} else {
error.compareAndSet(null, t);
}
}
});
}
stub 已被修改为 `KeyValueServiceFutureStub`,它在调用时会产生一个 `Future`,而不是直接返回响应。gRPC Java 使用它的一个扩展,称为 `ListenableFuture`,它允许在 Future 完成时添加回调。对于本程序而言,我们并不那么关心获取响应。相反,我们更关心 RPC 是否成功。考虑到这一点,代码主要检查错误,而不是处理响应。
第一个改变是 RPC 数量的记录方式。我们不再在主循环之外增加计数器,而是在 RPC 完成时增加它。
接下来,我们为每个 RPC 创建一个新对象,该对象处理成功和失败两种情况。由于 `doCreate()` 在 RPC 回调被调用时已经完成,我们需要一种方式来传播错误,而不是通过抛出异常。相反,我们尝试原子地更新一个引用。主循环将偶尔检查是否发生了错误,如果出现问题则停止。
最后,代码小心地仅在 RPC 实际完成时才将键添加到 `knownKeys` 中,并且仅在已知失败时才将其删除。我们对变量进行同步以确保两个线程不会冲突。注意:尽管 `knownKeys` 的访问是线程安全的,但仍然存在竞态条件。一个线程可能从 `knownKeys` 读取,第二个线程从 `knownKeys` 删除,然后第一个线程使用第一个键发出 RPC。对键的同步仅确保其一致性,而不是正确性。正确地修复这个问题超出了本文的范围,因此我们只是记录事件并继续。如果您运行此程序,您会看到一些这样的日志语句。
运行代码
如果你启动并运行这个程序,你会发现它无法工作。
WARNING: An exception was thrown by io.grpc.netty.NettyClientStream$Sink$1.operationComplete()
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
...
什么?!我为什么要给你看一个失败的代码?原因是在现实生活中,一次改动往往不能一次成功。在这种情况下,程序内存不足。当程序内存不足时,会发生奇怪的事情。通常,根本原因很难找到,而且会充斥着误导性的线索。一个令人困惑的错误消息说“无法创建新的原生线程”,即使我们没有在代码中创建任何新线程。经验对于解决这些问题而非调试非常有帮助。由于我调试过很多次内存溢出,我碰巧知道 Java 会告诉我们压垮骆驼的最后一根稻草。我们的程序开始使用更多的内存,但最终失败的内存分配碰巧发生在线程创建中。
那么发生了什么?*没有阻止新 RPC 启动的回压。* 在阻塞版本中,新的 RPC 必须等到上一个完成才能启动。虽然慢,但也阻止了我们创建大量没有内存可用的 RPC。我们需要在可监听的 Future 版本中考虑到这一点。
为了解决这个问题,我们可以对活跃 RPC 的数量施加一个自我限制。在启动一个新的 RPC 之前,我们将尝试获取一个许可证。如果获得,RPC 可以启动。否则,我们将等待直到有可用许可证。当一个 RPC 完成(成功或失败)时,我们返回许可证。为了实现这一点,我们将使用 `Semaphore`:
private final Semaphore limiter = new Semaphore(100);
private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error)
throws InterruptedException {
limiter.acquire();
ByteString key = createRandomKey();
ListenableFuture<CreateResponse> res = stub.create(
CreateRequest.newBuilder()
.setKey(key)
.setValue(randomBytes(MEAN_VALUE_SIZE))
.build());
res.addListener(() -> {
rpcCount.incrementAndGet();
limiter.release();
}, MoreExecutors.directExecutor());
/* ... */
}
现在代码成功运行,并且不再出现内存不足的情况。
结果
再次构建并运行代码看起来好多了:
./gradlew installDist
time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 2:40:47 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 2:41:47 PM io.grpc.examples.KvRunner runClient
INFO: Did 24.283 RPCs/s
real 1m0.923s
user 0m12.772s
sys 0m1.572s
我们的代码每秒完成的 RPC 比以前**增加了46%**。我们还可以看到,CPU 使用率比以前增加了大约20%。正如我们所看到的,我们的假设被证明是正确的,并且修复奏效了。所有这一切都是在不改变服务器的情况下发生的。此外,我们无需使用任何特殊的分析器或跟踪器就能进行测量。
这些数字合理吗?我们预计变异(创建、更新和删除)RPC 大约以 1/4 的概率发出。读取也以 1/4 的时间发出,但花费时间较短。平均 RPC 时间应该约为加权平均 RPC 时间:
.25 * 50ms (create)
.25 * 10ms (retrieve)
.25 * 50ms (update)
+.25 * 50ms (delete)
------------
40ms
平均每次 RPC 耗时 40 毫秒,我们预计每秒的 RPC 数量为:
25 次查询 = 1000 毫秒 / (40 毫秒 / 查询)
这与新代码看到的结果大致相符。服务器仍在串行处理请求,所以看来我们未来还有更多工作要做。但就目前而言,我们的优化似乎已经奏效。
结论
优化 gRPC 代码有很多机会。要利用这些机会,你需要了解你的代码正在做什么,以及你的代码应该做什么。这篇博文展示了如何着手思考优化的基础知识。始终确保在更改前后进行测量,并利用这些测量结果指导你的优化。
在第二部分,我们将继续优化代码的服务器部分。