RSS

你想优化 gRPC 吗?- 第二部分

gRPC 有多快?如果你理解现代客户端和服务器是如何构建的,它会非常快。在第一部分中,我展示了如何轻松获得 60% 的改进。在这篇文章中,我将展示如何获得 10000% 的改进。

设置

就像在第一部分中一样,我们将从一个现有的、基于 Java 的键值服务开始。该服务将提供并发访问,用于创建、读取、更新和删除键值。所有代码都可以在这里看到,如果你想尝试一下。

服务器并发

我们来看看 KvService 类。该服务处理客户端发送的 RPC,确保它们都不会意外地破坏存储的状态。为了确保这一点,该服务使用 synchronized 关键字来确保一次只有一个 RPC 处于活动状态

private final Map<ByteBuffer, ByteBuffer> store = new HashMap<>();

@Override
public synchronized void create(
    CreateRequest request, StreamObserver<CreateResponse> responseObserver) {
  ByteBuffer key = request.getKey().asReadOnlyByteBuffer();
  ByteBuffer value = request.getValue().asReadOnlyByteBuffer();
  simulateWork(WRITE_DELAY_MILLIS);
  if (store.putIfAbsent(key, value) == null) {
    responseObserver.onNext(CreateResponse.getDefaultInstance());
    responseObserver.onCompleted();
    return;
  }
  responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
}

虽然这段代码是线程安全的,但付出了高昂的代价:一次只能有一个 RPC 处于活动状态!我们需要某种方法允许多个操作同时安全地进行。否则,程序就无法利用所有可用的处理器。

打破锁

为了解决这个问题,我们需要更多地了解 RPC 的语义。我们对 RPC 工作方式了解得越多,就可以进行更多的优化。对于键值服务,我们注意到对不同键的操作互不干扰。当我们更新键“foo”时,它不会影响存储在键“bar”上的值。但是,我们的服务器编写方式使得对任何键的操作都必须相互同步。如果我们可以让对不同键的操作并发进行,我们的服务器就可以处理更大的负载。

有了这个想法后,我们需要弄清楚如何修改服务器。synchronized 关键字会使 Java 获取 this(即 KvService 的实例)上的一个锁。锁在进入 create 方法时获取,并在返回时释放。我们需要同步的原因是为了保护 store Map。由于它被实现为 HashMap,对其的修改会改变内部数组。由于 HashMap 的内部状态如果没有正确同步就会被破坏,我们不能直接移除方法上的同步。

然而,Java 在这里提供了一个解决方案:ConcurrentHashMap。该类提供了安全地并发访问 Map 内容的能力。例如,在我们的用法中,我们想检查一个键是否存在。如果不存在,我们想添加它;否则,我们想返回一个错误。putIfAbsent 方法会原子地检查一个值是否存在,如果不存在则添加它,并告诉我们是否成功。

并发 Map 提供了关于 putIfAbsent 安全性的更强保证,因此我们可以将 HashMap 替换为 ConcurrentHashMap 并移除 synchronized

private final ConcurrentMap<ByteBuffer, ByteBuffer> store = new ConcurrentHashMap<>();

@Override
public void create(
    CreateRequest request, StreamObserver<CreateResponse> responseObserver) {
  ByteBuffer key = request.getKey().asReadOnlyByteBuffer();
  ByteBuffer value = request.getValue().asReadOnlyByteBuffer();
  simulateWork(WRITE_DELAY_MILLIS);
  if (store.putIfAbsent(key, value) == null) {
    responseObserver.onNext(CreateResponse.getDefaultInstance());
    responseObserver.onCompleted();
    return;
  }
  responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
}

初试不成

更新 create 相当容易。对 retrievedelete 做同样的操作也很容易。然而,update 方法有点棘手。我们来看看它在做什么

@Override
public synchronized void update(
    UpdateRequest request, StreamObserver<UpdateResponse> responseObserver) {
  ByteBuffer key = request.getKey().asReadOnlyByteBuffer();
  ByteBuffer newValue = request.getValue().asReadOnlyByteBuffer();
  simulateWork(WRITE_DELAY_MILLIS);
  ByteBuffer oldValue = store.get(key);
  if (oldValue == null) {
    responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
    return;
  }
  store.replace(key, oldValue, newValue);
  responseObserver.onNext(UpdateResponse.getDefaultInstance());
  responseObserver.onCompleted();
}

将键更新为新值需要与 store 进行两次交互

  1. 检查键是否存在。
  2. 将旧值更新为新值。

遗憾的是,ConcurrentMap 没有一个直接的方法可以做到这一点。由于我们可能不是唯一修改 Map 的人,我们需要处理我们的假设可能已经改变的可能性。我们读出了旧值,但当我们替换它时,它可能已经被删除了。

为了解决这个问题,如果 replace 失败,我们再试一次。如果替换成功,它返回 true。(ConcurrentMap 保证操作不会破坏内部结构,但不能保证它们会成功!)我们将使用一个 do-while 循环

@Override
public void update(
    UpdateRequest request, StreamObserver<UpdateResponse> responseObserver) {
  // ...
  ByteBuffer oldValue;
  do {
    oldValue = store.get(key);
    if (oldValue == null) {
      responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
      return;
    }
  } while (!store.replace(key, oldValue, newValue));
  responseObserver.onNext(UpdateResponse.getDefaultInstance());
  responseObserver.onCompleted();
}

如果代码看到 null,就希望失败,但如果存在非 null 的旧值,则不会失败。需要注意的是,如果另一个 RPC 在 store.get() 调用和 store.replace() 调用之间修改了值,它将失败。这对我们来说是一个非致命错误,所以我们只需再试一次。一旦成功放入新值,服务就可以向用户响应。

还有一种可能性可能发生:两个 RPC 可能更新相同的值并覆盖彼此的工作。虽然这对某些应用程序来说可能没问题,但对于提供事务性的 API 来说是不合适的。如何在本文中解决这个问题超出了讨论范围,但请注意它可能发生。

衡量性能

在上一篇文章中,我们修改了客户端以使其成为异步的,并使用 gRPC ListenableFuture API。为了避免内存不足,客户端被修改为一次最多有 100 个活动的 RPC。正如我们现在从服务器代码中看到的,性能瓶颈在于获取锁。由于我们已经移除了这些锁,我们期望看到 100 倍的改进。每个 RPC 完成的工作量相同,但同时发生的 RPC 数量更多。让我们看看我们的假设是否成立

之前

./gradlew installDist
time ./build/install/kvstore/bin/kvstore
Apr 16, 2018 10:38:42 AM io.grpc.examples.KvRunner runClient
INFO: Did 24.067 RPCs/s

real	1m0.886s
user	0m9.340s
sys	0m1.660s

之后

Apr 16, 2018 10:36:48 AM io.grpc.examples.KvRunner runClient
INFO: Did 2,449.8 RPCs/s

real	1m0.968s
user	0m52.184s
sys	0m20.692s

哇!从每秒 24 个 RPC 增加到每秒 2400 个 RPC。而且我们不必改变我们的 API 或客户端。这就是理解代码和 API 语义的重要性。通过利用键值 API 的属性,即对不同键的操作的独立性,代码现在快得多。

这段代码一个值得注意的现象是结果中的 user 时间。之前用户时间只有 9 秒,这意味着在代码运行的 60 秒中,CPU 只活跃了 9 秒。之后,使用量增加了 5 倍多,达到 52 秒。原因是更多的 CPU 核心处于活动状态。KvServer 通过休眠几毫秒来模拟工作。在实际应用中,它会做有用的工作,而不会有如此巨大的变化。它不是根据 RPC 的数量来扩展,而是根据核心的数量来扩展。因此,如果你的机器有 12 个核心,你就会看到 12 倍的改进。但这仍然不错!

更多错误

如果你自己运行这段代码,你会看到更多以下形式的日志垃圾信息

Apr 16, 2018 10:38:40 AM io.grpc.examples.KvClient$3 onFailure
INFO: Key not found
io.grpc.StatusRuntimeException: NOT_FOUND

原因是新版本的代码使得 API 级别的竞态条件更加明显。随着 RPC 数量增加 100 倍,更新和删除相互冲突的可能性更大。为了解决这个问题,我们需要修改 API 定义。请继续关注下一篇文章,我们将展示如何解决此问题。

结论

有很多机会可以优化你的 gRPC 代码。为了利用这些机会,你需要了解你的代码在做什么。这篇文章展示了如何将基于锁的服务转换为低竞争、无锁服务。在更改前后务必进行测量。