如何优化 gRPC - 第一部分
关于 gRPC 的一个常见问题是如何使其快速。gRPC 库为用户提供了高性能 RPC 的访问能力,但如何实现这一点并不总是很清楚。由于这个问题非常常见,我想通过展示我在调优程序时的思考过程来解答它。
环境搭建
考虑一个由多个其他程序使用的基本键值服务。该服务需要能够安全地进行并发访问,以防同时发生多次更新。它需要能够扩展以利用可用的硬件。最后,它需要快速。gRPC 非常适合这种类型的服务;让我们看看实现它的最佳方法。
为了这篇博客文章,我用 gRPC Java 写了一个示例客户端和服务器。该程序分为三个主要类和一个描述 API 的 protobuf 文件。
- KvClient 是一个模拟键值系统的用户。它随机创建、检索、更新和删除键值对。它使用的键和值的大小也使用指数分布随机决定。
- KvService 是键值服务的实现。它由 gRPC 服务器安装,用于处理客户端发出的请求。为了模拟将键值对存储到磁盘,它在处理请求时会添加短暂的睡眠。读操作和写操作会分别延迟 10 毫秒和 50 毫秒,以使示例更像持久化数据库的行为。
- KvRunner 协调客户端和服务器之间的交互。它是主入口点,在进程内启动客户端和服务器,并等待客户端执行其工作。该运行器工作 60 秒,然后记录完成了多少个 RPC。
- kvstore.proto 是我们服务的协议缓冲区定义。它精确地描述了客户端可以从服务中获得什么。为了简单起见,我们将使用 Create(创建)、Retrieve(检索)、Update(更新)和 Delete(删除)作为操作(通常称为 CRUD)。这些操作使用由任意字节组成的键和值。虽然它们有些类似于 REST,但我们保留在未来进行修改并添加更复杂操作的权利。
协议缓冲区(protos)不是使用 gRPC 的必需品,但它们是定义服务接口并生成客户端和服务端代码的一种非常方便的方式。生成的代码充当应用程序逻辑和 gRPC 核心库之间的胶水代码。我们将 gRPC 客户端调用的代码称为 stub。
起点
客户端
现在我们知道了程序应该做什么,我们可以开始查看程序的性能表现了。如上所述,客户端进行随机 RPC 调用。例如,这是用于发起创建请求的代码
private void doCreate(KeyValueServiceBlockingStub stub) {
ByteString key = createRandomKey();
try {
CreateResponse res = stub.create(
CreateRequest.newBuilder()
.setKey(key)
.setValue(randomBytes(MEAN_VALUE_SIZE))
.build());
if (!res.equals(CreateResponse.getDefaultInstance())) {
throw new RuntimeException("Invalid response");
}
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Code.ALREADY_EXISTS) {
knownKeys.remove(key);
logger.log(Level.INFO, "Key already existed", e);
} else {
throw e;
}
}
}
会创建一个随机键和随机值。请求发送到服务器,客户端等待响应。当响应返回时,代码会检查是否符合预期,如果不符合,则抛出异常。虽然键是随机选择的,但它们需要是唯一的,所以我们需要确保每个键尚未被使用。为了解决这个问题,代码会跟踪已创建的键,以免重复创建相同的键。然而,可能存在另一个客户端已经创建了某个键的情况,所以我们记录日志并继续。否则,就会抛出异常。
这里我们使用了阻塞式 gRPC API,它会发出请求并等待响应。这是最简单的 gRPC stub,但在运行时会阻塞线程。这意味着从客户端的角度来看,一次最多只能有一个 RPC 在进行中。
服务端
在服务端,请求由服务处理器接收
private final Map<ByteBuffer, ByteBuffer> store = new HashMap<>();
@Override
public synchronized void create(
CreateRequest request, StreamObserver<CreateResponse> responseObserver) {
ByteBuffer key = request.getKey().asReadOnlyByteBuffer();
ByteBuffer value = request.getValue().asReadOnlyByteBuffer();
simulateWork(WRITE_DELAY_MILLIS);
if (store.putIfAbsent(key, value) == null) {
responseObserver.onNext(CreateResponse.getDefaultInstance());
responseObserver.onCompleted();
return;
}
responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
}
服务从请求中将键和值提取为 ByteBuffer
对象。它获取服务自身的锁,以确保并发请求不会损坏存储。在模拟写操作的磁盘访问后,它将键值对存储在 Map
中。
与客户端代码不同,服务处理器是非阻塞式的,这意味着它不像函数调用那样返回一个值。取而代之的是,它在 responseObserver
上调用 onNext()
将响应发送回客户端。请注意,这个调用也是非阻塞的,这意味着消息可能尚未发送出去。为了表明消息处理完成,会调用 onCompleted()
。
性能
既然代码是安全且正确的,让我们看看它的性能如何。我的测量环境是我的 Ubuntu 系统,配备 12 核处理器和 32GB 内存。让我们构建并运行代码
./gradlew installDist
time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 1:10:07 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 1:11:07 PM io.grpc.examples.KvRunner runClient
INFO: Did 16.55 RPCs/s
real 1m0.927s
user 0m10.688s
sys 0m1.456s
哇!对于这样一台强大的机器来说,每秒只能完成大约 16 个 RPC。它几乎没有使用任何 CPU 资源,而且我们不知道它使用了多少内存。我们需要找出为什么它这么慢。
优化
分析
在进行任何更改之前,让我们先了解一下程序在做什么。进行优化时,我们需要知道代码把时间花在了哪里,以便知道我们可以优化什么。在早期阶段,我们还不需要性能分析工具,只需对程序进行推理即可。
客户端启动后,会串行地发出 RPC 调用,持续大约一分钟。每次迭代,它都会随机决定执行什么操作
void doClientWork(AtomicBoolean done) {
Random random = new Random();
KeyValueServiceBlockingStub stub = KeyValueServiceGrpc.newBlockingStub(channel);
while (!done.get()) {
// Pick a random CRUD action to take.
int command = random.nextInt(4);
if (command == 0) {
doCreate(stub);
continue;
}
/* ... */
rpcCount++;
}
}
这意味着一次最多只能有一个 RPC 处于活动状态。每个 RPC 都必须等待前一个完成。那么每个 RPC 需要多长时间才能完成呢?从阅读服务器代码可知,大多数操作都是写操作,耗时约 50 毫秒。在最高效率下,这段代码每秒最多能执行约 20 个操作
20 queries = 1000ms / (50 ms / query)
我们的代码每秒能执行大约 16 次查询,这看起来是合理的。我们可以通过查看运行代码所使用的 time
命令的输出来验证这一假设。服务器在 simulateWork 方法中执行查询时会进入睡眠状态。这意味着程序在等待 RPC 完成时大部分时间处于空闲状态。
我们可以通过查看上述命令输出中的 real
时间和 user
时间来确认这一点。它们显示,实际(wall clock)时间为 1 分钟,而 CPU 时间为 10 秒。我的强大、多核 CPU 只有 16% 的时间在忙碌。因此,如果我们能让程序在这段时间内完成更多工作,似乎就可以完成更多的 RPC。
假设
现在我们可以清楚地说明我们认为的问题是什么,并提出解决方案。加快程序速度的一种方法是确保 CPU 不处于空闲状态。为此,我们并发地发出工作请求。
在 gRPC Java 中,有三种类型的 stub:阻塞式、非阻塞式和 listenable future。我们已经在客户端中看到了阻塞式 stub,在服务端看到了非阻塞式 stub。listenable future API 是两者的折衷,提供了类似阻塞式和非阻塞式的行为。只要我们不阻塞线程等待工作完成,我们就可以启动新的 RPC,而不必等待旧的完成。
实验
为了验证我们的假设,我们修改客户端代码以使用 listenable future API。这意味着我们需要更多地考虑代码中的并发性。例如,在客户端跟踪已知键时,我们需要安全地读取、修改和写入这些键。我们还需要确保在出现错误时停止发起新的 RPC(适当的错误处理将在未来的文章中介绍)。最后,我们需要更新并发执行的 RPC 数量,因为更新可能发生在另一个线程中。
进行所有这些更改会增加代码的复杂性。这是你在优化代码时需要考虑的权衡。通常来说,代码的简洁性与优化是对立的。Java 不以简洁著称。尽管如此,下面的代码仍然可读,并且函数中的程序流程仍然大致从上到下。这是修改后的doCreate() 方法
private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error) {
ByteString key = createRandomKey();
ListenableFuture<CreateResponse> res = stub.create(
CreateRequest.newBuilder()
.setKey(key)
.setValue(randomBytes(MEAN_VALUE_SIZE))
.build());
res.addListener(() -> rpcCount.incrementAndGet(), MoreExecutors.directExecutor());
Futures.addCallback(res, new FutureCallback<CreateResponse>() {
@Override
public void onSuccess(CreateResponse result) {
if (!result.equals(CreateResponse.getDefaultInstance())) {
error.compareAndSet(null, new RuntimeException("Invalid response"));
}
synchronized (knownKeys) {
knownKeys.add(key);
}
}
@Override
public void onFailure(Throwable t) {
Status status = Status.fromThrowable(t);
if (status.getCode() == Code.ALREADY_EXISTS) {
synchronized (knownKeys) {
knownKeys.remove(key);
}
logger.log(Level.INFO, "Key already existed", t);
} else {
error.compareAndSet(null, t);
}
}
});
}
stub 已修改为 KeyValueServiceFutureStub
,它在调用时会生成一个 Future
对象,而不是响应本身。gRPC Java 使用了它的一个扩展,称为 ListenableFuture
,它允许在 future 完成时添加回调。对于这个程序来说,我们不太关心获取响应。相反,我们更关心 RPC 是否成功。考虑到这一点,代码主要检查错误,而不是处理响应。
所做的第一个更改是记录 RPC 数量的方式。我们不再在主循环外部增加计数器,而是在 RPC 完成时增加它。
接下来,我们为每个 RPC 创建一个新对象,该对象处理成功和失败两种情况。因为在 RPC 回调被调用时,doCreate()
方法可能已经完成,所以我们需要一种除了抛出异常之外的方式来传播错误。相反,我们尝试原子地更新一个引用。主循环会不时检查是否发生了错误,如果存在问题则停止。
代码最后,注意只有当 RPC 实际完成时才将键添加到 knownKeys
,并且只有当已知失败时才将其移除。我们对该变量进行同步,以确保两个线程不会冲突。注意:尽管对 knownKeys
的访问是线程安全的,但仍然存在 竞态条件(race conditions)。可能出现的情况是,一个线程读取 knownKeys
,第二个线程从 knownKeys
删除,然后第一个线程使用之前读取的键发起一个 RPC。对键进行同步只确保了一致性,而非正确性。正确修复这个问题超出了本文的范围,因此我们只记录事件并继续。如果运行此程序,您会看到一些这样的日志语句。
运行代码
如果您启动并运行此程序,您会发现它不起作用
WARNING: An exception was thrown by io.grpc.netty.NettyClientStream$Sink$1.operationComplete()
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
...
什么?!我为什么要给您展示失败的代码?原因在于,在现实生活中,修改代码往往第一次尝试不会成功。在这个例子中,程序耗尽了内存。当程序耗尽内存时,会开始发生奇怪的事情。通常,根本原因很难找到,并且存在许多干扰信息(red herrings)。一个令人困惑的错误消息显示“unable to create new native thread”(无法创建新的原生线程),尽管我们在代码中没有创建任何新线程。在解决这些问题时,经验比调试更有帮助。因为我调试过很多 OOM(内存溢出)问题,我碰巧知道 Java 会告诉我们“压垮骆驼的最后一根稻草”。我们的程序开始使用了远超预期的内存,但最终失败的分配恰好(碰巧)发生在线程创建时。
那么发生了什么?启动新的 RPC 没有反压机制。在阻塞版本中,新的 RPC 必须等到上一个完成才能启动。虽然慢,但这同时也阻止了我们创建大量我们没有内存容量来容纳的 RPC。在 listenable future 版本中,我们需要考虑这一点。
为了解决这个问题,我们可以对活跃的 RPC 数量施加一个自我限制。在启动新的 RPC 之前,我们将尝试获取一个许可。如果获取到许可,RPC 就可以启动。否则,我们将等待直到有许可可用。当 RPC 完成(无论成功还是失败)时,我们归还许可。为了 实现 这一点,我们将使用一个 Semaphore
private final Semaphore limiter = new Semaphore(100);
private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error)
throws InterruptedException {
limiter.acquire();
ByteString key = createRandomKey();
ListenableFuture<CreateResponse> res = stub.create(
CreateRequest.newBuilder()
.setKey(key)
.setValue(randomBytes(MEAN_VALUE_SIZE))
.build());
res.addListener(() -> {
rpcCount.incrementAndGet();
limiter.release();
}, MoreExecutors.directExecutor());
/* ... */
}
现在代码成功运行,并且不会耗尽内存。
结果
再次构建并运行代码看起来好多了
./gradlew installDist
time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 2:40:47 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 2:41:47 PM io.grpc.examples.KvRunner runClient
INFO: Did 24.283 RPCs/s
real 1m0.923s
user 0m12.772s
sys 0m1.572s
我们的代码每秒处理的 RPC 数量比之前提高了 46%。我们还可以看到,CPU 使用量比之前增加了约 20%。正如我们所见,我们的假设是正确的,并且修复起作用了。所有这一切都无需对服务器进行任何更改。此外,我们无需使用任何特殊的性能分析器或追踪器就能进行测量。
这些数字合理吗?我们预计变动(创建、更新和删除)RPC 的发出概率各约为 1/4。读取操作的发出概率也是 1/4,但耗时较短。平均 RPC 时间应该约为加权平均 RPC 时间。
.25 * 50ms (create)
.25 * 10ms (retrieve)
.25 * 50ms (update)
+.25 * 50ms (delete)
------------
40ms
平均每 RPC 耗时 40ms,我们预计每秒的 RPC 数量为
25 次查询 = 1000ms / (40 ms / 次查询)
这与新代码的表现大致相符。服务器仍然是串行处理请求,所以未来似乎还有更多工作要做。但就目前而言,我们的优化似乎奏效了。
结论
优化 gRPC 代码有很多机会。为了利用这些机会,您需要理解您的代码正在做什么,以及它应该做什么。本文展示了如何着手和思考优化的基本方法。始终确保在更改前后进行测量,并使用这些测量结果来指导您的优化。
在第二部分中,我们将继续优化代码的服务器部分。