基础教程
Go 中 gRPC 的基础教程介绍。
基础教程
本教程为 Go 程序员提供了使用 gRPC 的基础入门。
通过本示例,您将学习如何
- 在
.proto
文件中定义服务。 - 使用协议缓冲区编译器生成服务器和客户端代码。
- 使用 Go gRPC API 为您的服务编写一个简单的客户端和服务器。
本教程假设您已阅读 gRPC 简介 并且熟悉 协议缓冲区。请注意,本教程中的示例使用协议缓冲区语言的 proto3 版本:您可以在 proto3 语言指南 和 Go 生成的代码指南 中找到更多信息。
为什么要使用 gRPC?
我们的示例是一个简单的路线映射应用程序,它允许客户端获取有关其路线上要素的信息,创建其路线的摘要,并与服务器和其他客户端交换路线信息(如交通更新)。
使用 gRPC,我们可以在 .proto
文件中定义一次服务,并生成任何 gRPC 支持的语言的客户端和服务器,这些客户端和服务器又可以在从大型数据中心内部的服务器到您自己的平板电脑的环境中运行——gRPC 为您处理不同语言和环境之间通信的所有复杂性。我们还获得了使用协议缓冲区的所有优点,包括高效的序列化、简单的 IDL 和易于更新的接口。
设置
您应该已经安装了生成客户端和服务器接口代码所需的工具——如果您还没有安装,请参阅 先决条件 部分的 快速入门 以获取设置说明。
获取示例代码
示例代码是 grpc-go 存储库的一部分。
将存储库下载为 zip 文件 并解压缩,或克隆存储库
git clone -b v1.69.2 --depth 1 https://github.com/grpc/grpc-go
切换到示例目录
cd grpc-go/examples/route_guide
定义服务
我们的第一步(正如您从 gRPC 简介 中了解到的)是使用 协议缓冲区定义 gRPC 服务 以及方法请求和响应类型。有关完整的 .proto
文件,请参阅 routeguide/route_guide.proto。
要定义服务,您需要在 .proto
文件中指定一个名为 service
的内容
service RouteGuide {
...
}
然后,您在服务定义中定义 rpc
方法,指定其请求和响应类型。gRPC 允许您定义四种服务方法,所有这些方法都在 RouteGuide
服务中使用
一个简单 RPC,客户端使用存根向服务器发送请求,并等待返回响应,就像普通的函数调用一样。
// Obtains the feature at a given position. rpc GetFeature(Point) returns (Feature) {}
一个服务器端流式 RPC,客户端向服务器发送请求,并获取一个流来读取一系列消息。客户端从返回的流中读取,直到没有更多消息为止。正如您在我们的示例中看到的,您可以通过在响应类型之前放置
stream
关键字来指定服务器端流式方法。// Obtains the Features available within the given Rectangle. Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field), as the rectangle may cover a large area and contain a // huge number of features. rpc ListFeatures(Rectangle) returns (stream Feature) {}
客户端流式 RPC,客户端写入一系列消息并使用提供的流将其发送到服务器。一旦客户端完成消息写入,它会等待服务器读取所有消息并返回响应。您可以通过在请求类型之前放置
stream
关键字来指定客户端流式方法。// Accepts a stream of Points on a route being traversed, returning a // RouteSummary when traversal is completed. rpc RecordRoute(stream Point) returns (RouteSummary) {}
双向流式 RPC,双方都使用读写流发送一系列消息。这两个流独立运行,因此客户端和服务器可以按任意顺序读取和写入:例如,服务器可以等待接收所有客户端消息后再写入其响应,或者它可以交替读取一条消息然后写入一条消息,或其他一些读取和写入的组合。每个流中消息的顺序都会被保留。您可以通过在请求和响应之前都放置
stream
关键字来指定这种类型的方法。// Accepts a stream of RouteNotes sent while a route is being traversed, // while receiving other RouteNotes (e.g. from other users). rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
我们的 .proto
文件还包含我们服务方法中使用的所有请求和响应类型的协议缓冲区消息类型定义 - 例如,这里是 Point
消息类型
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
生成客户端和服务器代码
接下来,我们需要从我们的 .proto
服务定义生成 gRPC 客户端和服务器接口。我们使用协议缓冲区编译器 protoc
和一个特殊的 gRPC Go 插件来完成此操作。这与我们在快速入门中所做的类似。
从 examples/route_guide
目录中,运行以下命令
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
routeguide/route_guide.proto
运行此命令会在 routeguide 目录中生成以下文件
route_guide.pb.go
,其中包含所有用于填充、序列化和检索请求和响应消息类型的协议缓冲区代码。route_guide_grpc.pb.go
,其中包含以下内容- 一个接口类型(或存根),供客户端调用,其中包含在
RouteGuide
服务中定义的方法。 - 一个接口类型,供服务器实现,也包含在
RouteGuide
服务中定义的方法。
- 一个接口类型(或存根),供客户端调用,其中包含在
创建服务器
首先,让我们看看如何创建 RouteGuide
服务器。如果您只对创建 gRPC 客户端感兴趣,您可以跳过此部分,直接转到创建客户端(尽管您可能会觉得它很有趣!)。
要使我们的 RouteGuide
服务正常工作,需要完成两个部分
- 实现从我们的服务定义生成的服务接口:执行我们服务的实际“工作”。
- 运行 gRPC 服务器以侦听来自客户端的请求,并将它们分派到正确的服务实现。
您可以在 server/server.go 中找到我们的示例 RouteGuide
服务器。让我们仔细看看它是如何工作的。
实现 RouteGuide
如您所见,我们的服务器有一个 routeGuideServer
结构体类型,它实现了生成的 RouteGuideServer
接口
type routeGuideServer struct {
...
}
...
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
...
}
...
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
...
}
...
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
...
}
...
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
...
}
...
简单 RPC
routeGuideServer
实现了我们所有的服务方法。让我们先来看最简单的类型 GetFeature
,它只是从客户端获取一个 Point
,并从其数据库中返回 Feature
中的相应功能信息。
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
return feature, nil
}
}
// No feature was found, return an unnamed feature
return &pb.Feature{Location: point}, nil
}
该方法会传入一个用于 RPC 的上下文对象和客户端的 Point
协议缓冲区请求。它返回一个包含响应信息的 Feature
协议缓冲区对象和一个 error
。在该方法中,我们使用适当的信息填充 Feature
,然后 return
它和一个 nil
错误,以告知 gRPC 我们已经完成处理 RPC,并且可以将 Feature
返回给客户端。
服务器端流式 RPC
现在让我们看一下我们的一个流式 RPC。ListFeatures
是一个服务器端流式 RPC,因此我们需要向客户端发送多个 Feature
。
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
for _, feature := range s.savedFeatures {
if inRange(feature.Location, rect) {
if err := stream.Send(feature); err != nil {
return err
}
}
}
return nil
}
如您所见,这次在我们的方法参数中,我们没有获得简单的请求和响应对象,而是获得了一个请求对象(客户端希望在其中查找 Feature
的 Rectangle
)和一个特殊的 RouteGuide_ListFeaturesServer
对象来写入我们的响应。
在该方法中,我们填充尽可能多的 Feature
对象,并将它们使用其 Send()
方法写入 RouteGuide_ListFeaturesServer
。最后,就像在我们的简单 RPC 中一样,我们返回一个 nil
错误,以告知 gRPC 我们已经完成写入响应。如果此调用中发生任何错误,我们将返回一个非 nil
错误;gRPC 层会将其转换为适当的 RPC 状态以在网络上传输。
客户端流式 RPC
现在让我们看一下更复杂的内容:客户端流式方法 RecordRoute
,我们从客户端获取一个 Point
流,并返回一个包含其行程信息的 RouteSummary
。如您所见,这次该方法根本没有请求参数。相反,它获得了一个 RouteGuide_RecordRouteServer
流,服务器可以使用该流来读取和写入消息 - 它可以使用其 Recv()
方法接收客户端消息,并使用其 SendAndClose()
方法返回其单个响应。
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
var pointCount, featureCount, distance int32
var lastPoint *pb.Point
startTime := time.Now()
for {
point, err := stream.Recv()
if err == io.EOF {
endTime := time.Now()
return stream.SendAndClose(&pb.RouteSummary{
PointCount: pointCount,
FeatureCount: featureCount,
Distance: distance,
ElapsedTime: int32(endTime.Sub(startTime).Seconds()),
})
}
if err != nil {
return err
}
pointCount++
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
featureCount++
}
}
if lastPoint != nil {
distance += calcDistance(lastPoint, point)
}
lastPoint = point
}
}
在该方法体中,我们使用 RouteGuide_RecordRouteServer
的 Recv()
方法重复读取客户端的请求到请求对象(在本例中为 Point
),直到没有更多消息:服务器需要在每次调用后检查从 Recv()
返回的错误。如果此错误为 nil
,则流仍然正常,并且可以继续读取;如果为 io.EOF
,则消息流已结束,服务器可以返回其 RouteSummary
。如果它具有任何其他值,我们将按原样返回错误,以便它将被 gRPC 层转换为 RPC 状态。
双向流式 RPC
最后,让我们看一下我们的双向流式 RPC RouteChat()
。
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
key := serialize(in.Location)
... // look for notes to be sent to client
for _, note := range s.routeNotes[key] {
if err := stream.Send(note); err != nil {
return err
}
}
}
}
这次我们获得了一个 RouteGuide_RouteChatServer
流,就像在我们的客户端流式示例中一样,它可以用于读取和写入消息。但是,这次我们通过我们方法的流返回值,而客户端仍在向他们的消息流写入消息。
此处读取和写入的语法与我们的客户端流式方法非常相似,只是服务器使用流的 Send()
方法而不是 SendAndClose()
,因为它要写入多个响应。尽管每一方总是按写入顺序获取另一方的消息,但客户端和服务器都可以按任何顺序读取和写入 — 流完全独立运行。
启动服务器
一旦我们实现了所有方法,我们还需要启动一个 gRPC 服务器,以便客户端可以实际使用我们的服务。以下代码片段显示了我们如何为我们的 RouteGuide
服务执行此操作
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
var opts []grpc.ServerOption
...
grpcServer := grpc.NewServer(opts...)
pb.RegisterRouteGuideServer(grpcServer, newServer())
grpcServer.Serve(lis)
要构建和启动服务器,我们
- 使用指定我们希望用于侦听客户端请求的端口
lis, err := net.Listen(...)
. - 使用
grpc.NewServer(...)
创建 gRPC 服务器的实例。 - 向 gRPC 服务器注册我们的服务实现。
- 使用我们的端口详细信息在服务器上调用
Serve()
,以执行阻塞等待,直到进程被终止或调用Stop()
。
创建客户端
在本节中,我们将介绍如何为我们的 RouteGuide
服务创建一个 Go 客户端。您可以在 grpc-go/examples/route_guide/client/client.go 中看到我们完整的示例客户端代码。
创建存根
要调用服务方法,我们首先需要创建一个 gRPC 通道来与服务器通信。我们通过将服务器地址和端口号传递给 grpc.NewClient()
来创建它,如下所示
var opts []grpc.DialOption
...
conn, err := grpc.NewClient(*serverAddr, opts...)
if err != nil {
...
}
defer conn.Close()
当服务需要身份验证凭据时,您可以使用 DialOptions
在 grpc.NewClient
中设置身份验证凭据(例如,TLS、GCE 凭据或 JWT 凭据)。RouteGuide
服务不需要任何凭据。
一旦 gRPC 通道设置完毕,我们需要一个客户端存根来执行 RPC。我们使用示例 .proto
文件生成的 pb
包提供的 NewRouteGuideClient
方法来获取它。
client := pb.NewRouteGuideClient(conn)
调用服务方法
现在让我们看看如何调用我们的服务方法。请注意,在 gRPC-Go 中,RPC 以阻塞/同步模式运行,这意味着 RPC 调用会等待服务器响应,并返回响应或错误。
简单 RPC
调用简单的 RPC GetFeature
几乎和调用本地方法一样直接。
feature, err := client.GetFeature(context.Background(), &pb.Point{409146138, -746188906})
if err != nil {
...
}
如您所见,我们在之前获取的存根上调用该方法。在我们的方法参数中,我们创建并填充一个请求协议缓冲区对象(在我们的例子中是 Point
)。我们还传递一个 context.Context
对象,该对象允许我们在必要时更改 RPC 的行为,例如超时/取消正在进行的 RPC。如果调用没有返回错误,那么我们可以从第一个返回值中读取服务器的响应信息。
log.Println(feature)
服务器端流式 RPC
这里是我们调用服务器端流式方法 ListFeatures
的地方,它返回地理 Feature
的流。如果您已经阅读过创建服务器,那么其中一些内容可能看起来很熟悉 - 流式 RPC 在两侧的实现方式类似。
rect := &pb.Rectangle{ ... } // initialize a pb.Rectangle
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
...
}
for {
feature, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
}
log.Println(feature)
}
与简单的 RPC 中一样,我们向方法传递一个上下文和一个请求。然而,我们不是返回一个响应对象,而是返回一个 RouteGuide_ListFeaturesClient
的实例。客户端可以使用 RouteGuide_ListFeaturesClient
流来读取服务器的响应。
我们使用 RouteGuide_ListFeaturesClient
的 Recv()
方法重复读取服务器对响应协议缓冲区对象(在本例中为 Feature
)的响应,直到没有更多消息为止:客户端需要在每次调用后检查从 Recv()
返回的错误 err
。如果 nil
,则流仍然正常,可以继续读取;如果为 io.EOF
,则消息流已结束;否则必须存在 RPC 错误,该错误通过 err
传递。
客户端流式 RPC
客户端流式方法 RecordRoute
与服务器端方法类似,只是我们只向方法传递一个上下文并返回一个 RouteGuide_RecordRouteClient
流,我们可以使用该流来写入和读取消息。
// Create a random number of random points
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
stream, err := client.RecordRoute(context.Background())
if err != nil {
log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
}
for _, point := range points {
if err := stream.Send(point); err != nil {
log.Fatalf("%v.Send(%v) = %v", stream, point, err)
}
}
reply, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Route summary: %v", reply)
RouteGuide_RecordRouteClient
有一个 Send()
方法,我们可以使用该方法向服务器发送请求。一旦我们使用 Send()
完成向流写入客户端的请求后,我们需要在流上调用 CloseAndRecv()
,以告知 gRPC 我们已完成写入并期望收到响应。我们从 CloseAndRecv()
返回的 err
中获取 RPC 状态。如果状态为 nil
,那么 CloseAndRecv()
的第一个返回值将是有效的服务器响应。
双向流式 RPC
最后,让我们看看我们的双向流式 RPC RouteChat()
。与 RecordRoute
的情况一样,我们只向方法传递一个上下文对象,并返回一个可以用来写入和读取消息的流。然而,这次我们通过我们方法的流返回值,而服务器仍在向他们的消息流写入消息。
stream, err := client.RouteChat(context.Background())
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive a note : %v", err)
}
log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
}
}()
for _, note := range notes {
if err := stream.Send(note); err != nil {
log.Fatalf("Failed to send a note: %v", err)
}
}
stream.CloseSend()
<-waitc
此处读取和写入的语法与我们的客户端流式方法非常相似,只是我们在完成调用后使用流的 CloseSend()
方法。尽管每一方总是按照它们写入的顺序获取对方的消息,但客户端和服务器都可以按任意顺序读取和写入 — 流完全独立运行。
试一试!
从 examples/route_guide
目录执行以下命令
运行服务器
go run server/server.go
从另一个终端,运行客户端
go run client/client.go
您将看到如下输出
Getting feature for point (409146138, -746188906)
name:"Berkshire Valley Management Area Trail, Jefferson, NJ, USA" location:<latitude:409146138 longitude:-746188906 >
Getting feature for point (0, 0)
location:<>
Looking for features within lo:<latitude:400000000 longitude:-750000000 > hi:<latitude:420000000 longitude:-730000000 >
name:"Patriots Path, Mendham, NJ 07945, USA" location:<latitude:407838351 longitude:-746143763 >
...
name:"3 Hasta Way, Newton, NJ 07860, USA" location:<latitude:410248224 longitude:-747127767 >
Traversing 56 points.
Route summary: point_count:56 distance:497013163
Got message First message at point(0, 1)
Got message Second message at point(0, 2)
Got message Third message at point(0, 3)
Got message First message at point(0, 1)
Got message Fourth message at point(0, 1)
Got message Second message at point(0, 2)
Got message Fifth message at point(0, 2)
Got message Third message at point(0, 3)
Got message Sixth message at point(0, 3)