RSS

所以你想优化 gRPC - 第 1 部分

关于 gRPC 的一个常见问题是如何使其快速运行。gRPC 库为用户提供了对高性能 RPC 的访问,但如何实现这一点并不总是很清楚。因为这个问题很常见,所以我想我会尝试展示我在调整程序时的思考过程。

设置

考虑一个由多个其他程序使用的基本键值服务。该服务需要对并发访问安全,以防同时发生多个更新。它需要能够扩展以使用可用的硬件。最后,它需要快速。gRPC 非常适合这种类型的服务;让我们看看实现它的最佳方法。

对于这篇博文,我编写了一个使用 gRPC Java 的示例 客户端和服务器。该程序分为三个主要类和一个描述 API 的 protobuf 文件

  • KvClient 是键值系统的模拟用户。它随机创建、检索、更新和删除键和值。它使用的键和值的大小也使用 指数分布 随机决定。
  • KvService 是键值服务的实现。它由 gRPC 服务器安装,以处理客户端发出的请求。为了模拟将键和值存储在磁盘上,它在处理请求时添加了短暂的休眠。读取和写入将遇到 10 和 50 毫秒的延迟,以使示例更像持久数据库。
  • KvRunner 协调客户端和服务器之间的交互。它是主入口点,在进程中启动客户端和服务器,并等待客户端执行其工作。运行器工作 60 秒,然后记录完成的 RPC 数量。
  • kvstore.proto 是我们服务的协议缓冲区定义。它精确地描述了客户端可以从服务中期望什么。为了简单起见,我们将使用创建、检索、更新和删除作为操作(通常称为 CRUD)。这些操作使用由任意字节组成的键和值。虽然它们有些类似于 REST,但我们保留在未来偏离并添加更复杂操作的权利。

Protocol buffers(protobuf)不是使用 gRPC 的必要条件,但它们是定义服务接口和生成客户端和服务器代码的非常方便的方式。生成的代码充当应用程序逻辑和核心 gRPC 库之间的粘合代码。我们称 gRPC 客户端调用的代码为存根

起点

客户端

现在我们知道了程序应该做什么,我们可以开始研究程序的性能了。如上所述,客户端进行随机 RPC 调用。例如,这是发出创建请求的代码

private void doCreate(KeyValueServiceBlockingStub stub) {
  ByteString key = createRandomKey();
  try {
    CreateResponse res = stub.create(
        CreateRequest.newBuilder()
            .setKey(key)
            .setValue(randomBytes(MEAN_VALUE_SIZE))
            .build());
    if (!res.equals(CreateResponse.getDefaultInstance())) {
      throw new RuntimeException("Invalid response");
    }
  } catch (StatusRuntimeException e) {
    if (e.getStatus().getCode() == Code.ALREADY_EXISTS) {
      knownKeys.remove(key);
      logger.log(Level.INFO, "Key already existed", e);
    } else {
      throw e;
    }
  }
}

创建一个随机键以及一个随机值。请求被发送到服务器,客户端等待响应。当响应返回时,代码会检查响应是否符合预期,如果不符合,则抛出异常。虽然键是随机选择的,但它们必须是唯一的,因此我们需要确保每个键都没有被使用。为了解决这个问题,代码会跟踪它创建的键,以避免两次创建同一个键。但是,可能另一个客户端已经创建了特定的键,所以我们会记录它并继续。否则,会抛出异常。

我们在这里使用阻塞式 gRPC API,它发出请求并等待响应。这是最简单的 gRPC 存根,但它在运行时会阻塞线程。这意味着从客户端的角度来看,一次最多只能有一个 RPC 在进行中。

服务器

在服务器端,请求由服务处理程序接收。

private final Map<ByteBuffer, ByteBuffer> store = new HashMap<>();

@Override
public synchronized void create(
    CreateRequest request, StreamObserver<CreateResponse> responseObserver) {
  ByteBuffer key = request.getKey().asReadOnlyByteBuffer();
  ByteBuffer value = request.getValue().asReadOnlyByteBuffer();
  simulateWork(WRITE_DELAY_MILLIS);
  if (store.putIfAbsent(key, value) == null) {
    responseObserver.onNext(CreateResponse.getDefaultInstance());
    responseObserver.onCompleted();
    return;
  }
  responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
}

服务从请求中提取键和值作为 ByteBuffer。它获取服务本身的锁,以确保并发请求不会损坏存储。在模拟写入磁盘访问后,它将其存储在键值 Map 中。

与客户端代码不同,服务处理程序是非阻塞的,这意味着它不会像函数调用那样返回值。相反,它会调用 responseObserver 上的 onNext() 将响应发送回客户端。请注意,此调用也是非阻塞的,这意味着消息可能尚未发送。为了表明我们已完成消息,会调用 onCompleted()

性能

由于代码是安全且正确的,让我们看看它的性能如何。在我的测量中,我使用我的 Ubuntu 系统,配备 12 核处理器和 32 GB 内存。让我们构建并运行代码

./gradlew installDist
time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 1:10:07 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 1:11:07 PM io.grpc.examples.KvRunner runClient
INFO: Did 16.55 RPCs/s

real	1m0.927s
user	0m10.688s
sys	0m1.456s

哎呀!对于如此强大的机器,它每秒只能处理大约 16 个 RPC。它几乎没有使用我们的 CPU,而且我们不知道它使用了多少内存。我们需要弄清楚为什么它如此之慢。

优化

分析

在我们进行任何更改之前,让我们先了解程序在做什么。在优化时,我们需要知道代码将时间花费在哪里,以便知道我们可以优化什么。在这个早期阶段,我们还不需要性能分析工具,我们只需要推断程序即可。

客户端启动并串行发出 RPC 大约一分钟。每次迭代,它随机决定要执行什么操作

void doClientWork(AtomicBoolean done) {
  Random random = new Random();
  KeyValueServiceBlockingStub stub = KeyValueServiceGrpc.newBlockingStub(channel);

  while (!done.get()) {
    // Pick a random CRUD action to take.
    int command = random.nextInt(4);
    if (command == 0) {
      doCreate(stub);
      continue;
    }
    /* ... */
    rpcCount++;
  }
}

这意味着在任何时候最多只能有一个 RPC 处于活动状态。每个 RPC 都必须等待前一个 RPC 完成。而且每个 RPC 需要多长时间才能完成?从读取服务器代码来看,大多数操作都在进行写入,这大约需要 50 毫秒。在最高效率下,此代码每秒最多可以执行大约 20 个操作

20 次查询 = 1000 毫秒 / (50 毫秒 / 查询)

我们的代码在一秒内可以执行大约 16 次查询,所以这看起来差不多是对的。我们可以通过查看用于运行代码的 time 命令的输出来快速检查此假设。当在 simulateWork 方法中运行查询时,服务器会进入休眠状态。这意味着程序在等待 RPC 完成时应该大部分时间处于空闲状态。

我们可以通过查看上面命令的 realuser 时间来确认情况确实如此。它们表明 挂钟时间 为 1 分钟,而 cpu 时间为 10 秒。我强大的多核 CPU 只有 16% 的时间处于繁忙状态。因此,如果我们可以让程序在这段时间内完成更多的工作,似乎我们可以完成更多的 RPC。

假设

现在我们可以清楚地说明我们认为的问题是什么,并提出一个解决方案。加速程序的一种方法是确保 CPU 不会空闲。为此,我们并发地发出工作。

在 gRPC Java 中,有三种类型的存根:阻塞、非阻塞和可监听的 Future。我们已经在客户端中看到了阻塞存根,并在服务器中看到了非阻塞存根。可监听的 Future API 是两者之间的折衷方案,提供类似阻塞和非阻塞的行为。只要我们不阻塞线程等待工作完成,我们就可以启动新的 RPC,而无需等待旧的 RPC 完成。

实验

为了测试我们的假设,让我们修改客户端代码以使用可监听的 Future API。这意味着我们需要更多地考虑代码中的并发性。例如,当在客户端跟踪已知键时,我们需要安全地读取、修改和写入键。我们还需要确保在发生错误时停止创建新的 RPC(适当的错误处理将在以后的文章中介绍)。最后,我们需要更新并发执行的 RPC 数量,因为更新可能发生在另一个线程中。

进行所有这些更改会增加代码的复杂性。这是你在优化代码时需要考虑的权衡。一般来说,代码的简洁性与优化是矛盾的。Java 并不以简洁而闻名。也就是说,下面的代码仍然是可读的,并且程序流程仍然大致是从函数顶部到底部。这是修改后的 doCreate() 方法

private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error) {
  ByteString key = createRandomKey();
  ListenableFuture<CreateResponse> res = stub.create(
      CreateRequest.newBuilder()
          .setKey(key)
          .setValue(randomBytes(MEAN_VALUE_SIZE))
          .build());
  res.addListener(() -> rpcCount.incrementAndGet(), MoreExecutors.directExecutor());
  Futures.addCallback(res, new FutureCallback<CreateResponse>() {
    @Override
    public void onSuccess(CreateResponse result) {
      if (!result.equals(CreateResponse.getDefaultInstance())) {
        error.compareAndSet(null, new RuntimeException("Invalid response"));
      }
      synchronized (knownKeys) {
        knownKeys.add(key);
      }
    }

    @Override
    public void onFailure(Throwable t) {
      Status status = Status.fromThrowable(t);
      if (status.getCode() == Code.ALREADY_EXISTS) {
        synchronized (knownKeys) {
          knownKeys.remove(key);
        }
        logger.log(Level.INFO, "Key already existed", t);
      } else {
        error.compareAndSet(null, t);
      }
    }
  });
}

存根已修改为 KeyValueServiceFutureStub,它在调用时产生 Future 而不是响应本身。gRPC Java 使用此扩展的名为 ListenableFuture 的扩展,它允许在 Future 完成时添加回调。为了这个程序的缘故,我们不太关心获取响应。相反,我们更关心 RPC 是否成功。考虑到这一点,代码主要检查错误,而不是处理响应。

第一个改动是 RPC 数量的记录方式。我们不再在主循环外部增加计数器,而是在 RPC 完成时增加计数器。

接下来,我们为每个 RPC 创建一个新对象,该对象处理成功和失败两种情况。因为当 RPC 回调被调用时,doCreate() 已经完成,我们需要一种方法来传播错误,而不是抛出异常。相反,我们尝试原子地更新引用。主循环会偶尔检查是否发生错误,如果出现问题则停止。

最后,代码会小心地仅在 RPC 实际完成时才将键添加到 knownKeys 中,并且仅在已知失败时才将其删除。我们对该变量进行同步,以确保两个线程不会冲突。注意:虽然对 knownKeys 的访问是线程安全的,但仍然存在竞态条件。一个线程可能从 knownKeys 读取,第二个线程从 knownKeys 删除,然后第一个线程使用第一个键发出 RPC。对键进行同步只能确保其一致性,而不能保证其正确性。正确修复此问题不在本文的范围内,因此我们只是记录该事件并继续。如果您运行此程序,将会看到一些这样的日志语句。

运行代码

如果您启动并运行此程序,您会发现它无法工作。

WARNING: An exception was thrown by io.grpc.netty.NettyClientStream$Sink$1.operationComplete()
java.lang.OutOfMemoryError: unable to create new native thread
	at java.lang.Thread.start0(Native Method)
	at java.lang.Thread.start(Thread.java:714)
	...

什么?!为什么我要向您展示失败的代码?原因是现实生活中,更改通常不会在第一次尝试时就成功。在这种情况下,程序耗尽了内存。当程序耗尽内存时,会开始发生奇怪的事情。通常,根本原因很难找到,并且会出现许多误导信息。一个令人困惑的错误消息说“无法创建新的本地线程”,即使我们在代码中没有创建任何新线程。解决这些问题,经验比调试更有帮助。因为我调试过很多 OOM,我恰好知道 Java 会告诉我们压垮骆驼的最后一根稻草。我们的程序开始使用更多的内存,但最终失败的分配恰好发生在线程创建中。

那么发生了什么?没有对启动新 RPC 进行反压。 在阻塞版本中,新的 RPC 必须在上一个 RPC 完成后才能启动。虽然速度较慢,但它也阻止了我们创建大量没有内存的 RPC。我们需要在可监听的 Future 版本中考虑这一点。

为了解决这个问题,我们可以对活动 RPC 的数量施加一个自我限制。在启动新的 RPC 之前,我们将尝试获取许可。如果我们获得许可,则可以启动 RPC。如果没有,我们将等待直到有许可可用。当 RPC 完成(无论成功还是失败)时,我们返回许可。为了实现 这一点,我们将使用 Semaphore

private final Semaphore limiter = new Semaphore(100);

private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error)
    throws InterruptedException {
  limiter.acquire();
  ByteString key = createRandomKey();
  ListenableFuture<CreateResponse> res = stub.create(
      CreateRequest.newBuilder()
          .setKey(key)
          .setValue(randomBytes(MEAN_VALUE_SIZE))
          .build());
  res.addListener(() ->  {
    rpcCount.incrementAndGet();
    limiter.release();
  }, MoreExecutors.directExecutor());
  /* ... */
}

现在代码成功运行,并且不会耗尽内存。

结果

再次构建并运行代码看起来好多了。

./gradlew installDist
time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 2:40:47 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 2:41:47 PM io.grpc.examples.KvRunner runClient
INFO: Did 24.283 RPCs/s

real	1m0.923s
user	0m12.772s
sys	0m1.572s

我们的代码每秒比以前多执行 46% 的 RPC。我们还可以看到,我们使用的 CPU 比以前多大约 20%。正如我们所见,我们的假设证明是正确的,并且修复工作奏效了。所有这一切都发生在没有对服务器进行任何更改的情况下。此外,我们能够在不使用任何特殊分析器或跟踪器的情况下进行测量。

这些数字有意义吗?我们预计每次发出突变(创建、更新和删除)RPC 的概率约为 1/4。读取也以 1/4 的时间发出,但不需要那么长时间。平均 RPC 时间应约为加权平均 RPC 时间。

  .25 * 50ms (create)
  .25 * 10ms (retrieve)
  .25 * 50ms (update)
 +.25 * 50ms (delete)
------------
        40ms

平均每次 RPC 40 毫秒,我们预计每秒的 RPC 数量为

25 个查询 = 1000 毫秒 / (40 毫秒 / 查询)

这与我们在新代码中看到的大致相同。服务器仍然串行处理请求,因此我们似乎在将来还有更多工作要做。但就目前而言,我们的优化似乎奏效了。

结论

有很多机会可以优化您的 gRPC 代码。要利用这些机会,您需要了解您的代码正在做什么以及您的代码应该做什么。这篇文章展示了如何进行优化和思考优化的基本知识。始终确保在更改前后进行测量,并使用这些测量结果来指导您的优化。

第 2 部分中,我们将继续优化代码的服务器部分。