gRPC基础入门

Keywords: #技术 #Golang #gRPC
Release Date: 2025-02-25
Table of Contents

gRPC基础入门知识点笔记

什么是 RPC

RPC(Remote Procedure Call)远程过程调用,简单的理解是一个节点请求另一个节点提供的服务。

对应 RPC 的是本地过程调用,函数调用是最常见的本地过程调用。

本地过程调用变成远程过程调用也会面临各种问题。这些问题正是 RPC 和分布式服务需要解决的问题。

对于本地过程调用,就比如下方 Python 的一个函数。

def add(a, b):
    total = a + b
    return total

total = add(1, 2)
print(total)

函数调用的过程:

  1. 将 1 和 2 压入 add 函数的栈中。
  2. 进入 add 函数,从栈中去除 1 和 2 分别赋值给 a 和 b。
  3. 执行 a + b,将结果赋值给局部的 total,并压栈。
  4. 将栈中的值取出来,赋值给全局的 total。

如果将以上本地函数调用过程,转化成远程过程调用,就远远没有这么简单了,会复杂很多。

在远程调用时,我们需要执行的函数体是在远程的机器上的,也就是说,add 是在另一个进程中执行的。这就带来了几个新问题:

  1. Call ID 映射

我们怎么告诉远程机器,我们要调用 add,而不是 sub 或者 Foo 呢?

在本地调用中,函数体是直接通过函数指针来指定的,我们调用 add,编译器就自动帮我们调用它相应的函数指针。

但是在远程调用中,函数指针是不行的,因为两个进程的地址空间是完全不一样的。所以,在 RPC 中,所有的函数都必须有自己的一个 ID。这个 ID 在所有进程中都是唯一确定的。

客户端在做远程过程调用时,必须附上这个 ID。然后我们还需要在客户端和服务端分别维护一个 {函数 <–> Call ID} 的对应表。两者的表不一定需要完全相同,但相同的函数对应的 Call ID 必须相同。

当客户端需要进行远程调用时,它就查一下这个表,找出相应的 Call ID,然后把它传给服务端,服务端也通过查表,来确定客户端需要调用的函数,然后执行相应函数的代码。

  1. 序列化和反序列化

客户端怎么把参数值传给远程的函数呢?

在本地调用中,我们只需要把参数压到栈里,然后让函数自己去栈里读就行。

但是在远程过程调用时,客户端跟服务端是不同的进程,不能通过内存来传递参数。甚至有时候客户端和服务端使用的都不是同一种语言(比如服务端用 C++,客户端用 Java 或者 Python)。

这时候就需要客户端把参数先转成一个字节流,传给服务端后,再把字节流转成自己能读取的格式。这个过程叫序列化反序列化。同理,从服务端返回的值也需要序列化反序列化的过程。

  1. 网络传输

远程调用往往用在网络上,客户端和服务端是通过网络连接的。所有的数据都需要通过网络传输,因此就需要有一个网络传输层。

网络传输层需要把 Call ID 和序列化后的参数字节流传给服务端,然后再把序列化后的调用结果传回客户端。只要能完成这两者的,都可以作为传输层使用。

因此,它所使用的协议其实是不限的,能完成传输就行。尽管大部分 RPC 框架都使用TCP 协议,但其实UDP也可以,而gRPC干脆就用了HTTP 2(可以保持长连接,性能更好,更好的选择),Java 的 Netty 也属于这层的东西。也可以基于 UDP/TCP 协议自行封装网络传输协议。

image.png

解决了上面三个机制,就能实现 RPC 了。

总的来说,RPC 最重要的两个点就是:网络传输协议数据编码协议


举个例子,我们有一个电商系统,有一段扣减库存的逻辑,但是库存服务是一个独立的系统(运行在另一个远程服务器中),那么需要进行扣减库存操作就需要远程调用库存服务中的扣减函数,这样就一定会牵扯到网络传输方面的操作,因此需要做成一个 Web 服务(gin、beego、net/httpserver)

这个函数的调用参数就需要用序列化的字节流进行传递,比如常见的 json 格式,当然也有很多其他的协议规范:xml、protobuf、msgpack 等等。

序列化和反序列化是可以选择的,不一定要采用 json、xml、protobuf、msgpack,一些大型公司甚至会采用自己设计实现的传输协议。

需要注意的是,json 协议虽然十分简单,但是它并不是一个高效的协议,因此对于微服务等大型项目中,它不是首选协议。

总结来说,就是将函数调用的过程分布在两台服务器上,两台服务器需要先建立网络连接。调用方将所需要的函数 ID 和函数参数通过某一个协议序列化转递给执行方,执行方接收到序列化的数据流,先进行反序列化,再根据具体的要求执行函数,最后将执行后的结果同样通过序列化数据流传递回调用方。

调用方也可以称为是 client(客户端),执行方也可以称为 server(服务端)。、

客户端:

  1. 建立连接 tcp/http
  2. 将 employee 对象序列化成 json 字符串 - 序列化
  3. 发送 json 字符串 - 调用成功后实际上你接收到的是一个二进制的数据
  4. 等待服务器发送结果
  5. 将服务器返回的数据解析成 PrintResult 对象 - 反序列化

服务端:

  1. 监听网络端口 80
  2. 读取数据 - 二进制的 json 数据
  3. 对数据进行反序列化 Employee 对象
  4. 开始处理业务逻辑
  5. 将处理的结果 PrintResult 序列化成 json 二进制数据 - 序列化
  6. 将数据返回

以下是客户端和服务端具体解决的问题。

client 端解决的问题:

  1. 将这个调用映射为 Call ID。这里假设用最简单的字符串当 Call ID 的方法
  2. 将 Call ID,a 和 b 序列化。可以直接将它们的值以二进制形式打包
  3. 把 2 中得到的数据包发送给 ServerAddr,这需要使用网络传输层
  4. 等待服务器返回结果
  5. 如果服务器调用成功,那么就将结果反序列化,并赋给 total

server 端解决的问题

  1. 在本地维护一个 Call ID 到函数指针的映射 call_id_map,可以用 dict 完成
  2. 等待请求,包括多线程的并发处理能力
  3. 得到一个请求后,将其数据包反序列化,得到 Call ID
  4. 通过在 call_id_map 中查找,得到相应的函数指针
  5. 将 a 和 rb 反序列化后,在本地调用 add 函数,得到结果
  6. 将结果序列化后通过网络返回给 Client

所以要实现一个 RPC 框架,其实只需要按以上流程实现就基本完成了。

其中:

  • Call ID 映射可以直接使用函数字符串,也可以使用整数 ID。映射表一般就是一个哈希表。
  • 序列化反序列化可以自己写,也可以使用 Protobuf 或者 FlatBuffers 之类的。
  • 网络传输库可以自己写 socket,或者用 asio,ZeroMQ,Netty 之类。

实际上真正的开发过程中,除了上面的基本功能以外还需要更多的细节:网络错误、流量控制、超时和重试等。

实现简单的 RPC

使用 Go 语言内置的 net/http 包来实现简单的 RPC 请求和返回(客户端和服务端)。

这里我们设计客户端通过 URL 来指定需要请求的函数 ID,通过 get 参数来指定传递的函数参数。当然也可以是通过 get 参数传递需要请求的函数 ID,通过 post 来传递函数的参数。

网络传输协议为 HTTP(底层是 TCP),数据编码协议为 json,数据传输协议为 URL 参数传递协议。

这里只使用最简单的方式进行演示。后续将直接使用 gRPC 包来进行 RPC 的编写。

服务端的代码:

func main() {
    // 请求格式: http://127.0.0.1:8000/add?a=1&b=2
    // 返回格式: json {"code":200,"data":3}
    // 1. CallID: r.URL.Path
    // 2. 网络传输协议: http
    // 3. 数据传输格式: URL
    http.HandleFunc("/add", func(w http.ResponseWriter, r *http.Request) {
        _ = r.ParseForm()
        fmt.Println(r.URL.Path)
        a, _ := strconv.Atoi(r.Form.Get("a"))
        b, _ := strconv.Atoi(r.Form.Get("b"))
        w.Header().Set("Content-Type", "application/json")
        jsonData, _ := json.Marshal(map[string]int{
            "code": 200,
            "data": a + b,
        })
        _, _ = w.Write(jsonData)
    })
    _ = http.ListenAndServe(":8000", nil)
}

客户端请求的方式,可以是直接通过浏览器输入 http://127.0.0.1:8000/add?a=1&b=2 进行 get 请求,返回结果会直接在浏览器中显示。

image.png

同时也可以写一个客户端的程序,通过 Go 语言的 net/http 包完成 get 请求,当然使用第三方包 github/kirinlabs/HttpRequest 对于这种简单的请求会更加得方便。

func main() {
    resp, err := http.Get("http://127.0.0.1:8000/add?a=1&b=2")
    if err != nil {
        fmt.Printf("请求失败: %v\n", err)
        return
    }
    defer resp.Body.Close()
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        fmt.Printf("读取响应失败: %v\n", err)
        return
    }
    var result map[string]int
    if err := json.Unmarshal(body, &result); err != nil {
        fmt.Printf("JSON 解析失败: %v\n", err)
        return
    }
    fmt.Println(result)
}

但是这样的写法,并不好,我们需要将远程函数调用转化成本地函数调用的形式,这样会更加方便复用。比如以下的写法,更加优秀。

type respData struct {
    Data int `json:"data"`
}

func rpcAdd(a int, b int) int {
    resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:8000/add?a=%d&b=%d", a, b))
    if err != nil {
        fmt.Printf("请求失败: %v\n", err)
        return -1
    }
    defer resp.Body.Close()
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        fmt.Printf("读取响应失败: %v\n", err)
        return -1
    }
    data := respData{}
    _ = json.Unmarshal(body, &data)
    return data.Data
}

func main() {
    c := rpcAdd(1, 2)
    fmt.Println(c)
}

将网络请求和解析的逻辑封装在 rpcAdd 函数中,当主程序需要调用这个远程的函数时,只需要像本地函数一样,将参数通过函数形参传递给 rpcAdd 函数,就可以完成远程的函数调用。同时网络连接部分也可以进行进一步的封装。这也是 RPC 的精髓所在。

当然,以上的示例程序,只是一个非常简单的模拟代码,目的在于学习 RPC 基础的服务端和客户端调用逻辑,距离真正可用的 RPC 框架还有着很大的一段距离,实际工程的 RPC 是不可能这样编写的,还需要考虑性能、网络连接、数据传输等等问题,这里只是为了演示网络传输的流程。

RPC 与 REST 对比

  • RPC 与 REST 的区别是什么?

你一定会觉得这个问题很奇怪,是的,包括我。但是你在网络上一搜,会发现类似对比的文章比比皆是,我在想可能很多初学者由于基础不牢固,才会将不相干的二者拿出来对比吧。既然是这样,那为了让你更加了解陌生的 RPC,就从你熟悉得不能再熟悉的 REST 入手吧。

REST,是 Representational State Transfer 的简写,中文描述表述性状态传递(是指某个瞬间状态的资源数据的快照,包括资源数据的内容、表述格式 (XML、JSON) 等信息。)
REST 是一种软件架构风格。这种风格的典型应用,就是 HTTP。其因为简单、扩展性强的特点而广受开发者的青睐。

而 RPC 呢,是 Remote Procedure Call Protocol 的简写,中文描述是远程过程调用,它可以实现客户端像调用本地服务 (方法) 一样调用服务器的服务 (方法)。
而 RPC 可以基于 TCP/UDP,也可以基于 HTTP 协议进行传输的,按理说它和 REST 不是一个层面意义上的东西,不应该放在一起讨论,但是谁让 REST 这么流行呢,它是目前最流行的一套互联网应用程序的 API 设计标准,某种意义下,我们说 REST 可以其实就是指代 HTTP 协议。

  1. 使用方式不同

从使用上来看,HTTP 接口只关注服务提供方,对于客户端怎么调用并不关心。 接口只要保证有客户端调用时,返回对应的数据就行了。而 RPC 则要求客户端接口保持和服务端的一致。

REST 是服务端把方法写好,客户端并不知道具体方法。客户端只想获取资源,所以发起 HTTP 请求,而服务端接收到请求后根据 URI 经过一系列的路由才定位到方法上面去 RPC 是服务端提供好方法给客户端调用,客户端需要知道服务端的具体类,具体方法,然后像调用本地方法一样直接调用它。

  1. 面向对象不同

从设计上来看,RPC,所谓的远程过程调用,是面向方法的。REST,所谓的 Representational state transfer ,是面向资源的。

除此之外,还有一种叫做 SOA,所谓的面向服务的架构,它是面向消息的,这个接触不多,就不多说了。

  1. 序列化协议不同

接口调用通常包含两个部分,序列化通信协议

通信协议,上面已经提及了,REST 是基于 HTTP 协议,而 RPC 可以基于 TCP/UDP,也可以基于 HTTP 协议进行传输的。

常见的序列化协议,有:json、xml、hession、protobuf、thrift、text、bytes 等,REST 通常使用的是 JSON 或者 XML,而 RPC 使用的是 JSON-RPC,或者 XML-RPC。

通过以上几点,我们知道了 REST 和 RPC 之间有很明显的差异。

  • 为什么要采用 RPC 呢?

那到底为何要使用 RPC,单纯的依靠 RESTful API 不可以吗?为什么要搞这么多复杂的协议,渣渣表示真的学不过来了。

关于这一点,以下几点仅是我的个人猜想,仅供交流:

RPC 和 REST 两者的定位不同,REST 面向资源,更注重接口的规范,因为要保证通用性更强,所以对外最好通过 REST。而 RPC 面向方法,主要用于函数方法的调用,可以适合更复杂通信需求的场景。RESTful API 客户端与服务端之间采用的是同步机制,当发送 HTTP 请求时,客户端需要等待服务端的响应。当然对于这一点是可以通过一些技术来实现异步的机制的。采用 RESTful API,客户端与服务端之间虽然可以独立开发,但还是存在耦合。比如,客户端在发送请求的时,必须知道服务器的地址,且必须保证服务器正常工作。而 rpc + ralbbimq 中间件可以实现低耦合的分布式集群架构。

说了这么多,我们该如何选择这两者呢?我总结了如下两点,供你参考:

REST 接口更加规范,通用适配性要求高,建议对外的接口都统一成 REST。而组件内部的各个模块,可以选择 RPC,一个是不用耗费太多精力去开发和维护多套的 HTTP 接口,一个 RPC 的调用性能更高(见下条)从性能角度看,由于 HTTP 本身提供了丰富的状态功能与扩展功能,但也正由于 HTTP 提供的功能过多,导致在网络传输时,需要携带的信息更多,从性能角度上讲,较为低效。而 RPC 服务网络传输上仅传输与业务内容相关的数据,传输数据更小,性能更高。

  • 为什么一定要 rpc,不能只学 http 协议和 restful 协议吗?
  1. rpc 可以基于 tcp 直接开发自己的协议,这个是可以保持长连接的,tcp 的传输效率高,并且可以一直维持链接
  2. 自定义协议可以优化数据的传输

如果我们只是开发 web 网站或者一些服务的使用者,那么我们用 restful 看起来已经足够了,但是 rpc 的这种模式在大量的服务中都有,比如 redis 协议, rabbitmq 的 AMQP 协议,聊天软件的协议,也就是说我们想要开发一个 redis 的客户端,我们只需要用我们喜欢的语言实现 redis 定义的协议就行了,这对于开发服务来说非常有用,一般这种协议的价值在于我们自己开发的服务之间需要通信的时候 - 那你会问了,自己开发的组件之间协作,直接调用函数不就行了吗?

对了,有些人已经反映过来了:分布式系统,分布式系统中非常常用,比如 openstack 中。还有就是微服务!

所以掌握 rpc 开发,对于进阶和分布式开发就变得非常重要。

http 协议 1. x 一般情况下一个来回就关闭连接,虽然提供了 keep-alive 可以保持长连接,但是依然不方便,所以就出现了 http 2.0, http 2.0 基本上可以当做 tcp 协议使用了。所以后面讲解到的 grpc 就会使用 http 2.0 开发

RPC 架构技术要点

RPC 技术在架构设计上有四部分组成,分别是:客户端、客户端存根、服务端、服务端存根。

  • 客户端 (Client):服务调用发起方,也称为服务消费者。
  • 客户端存根 (Client Stub):该程序运行在客户端所在的计算机机器上,主要用来存储要调用的服务器的地址,另外,该程序还负责将客户端请求远端服务器程序的数据信息打包成数据包,通过网络发送给服务端 Stub 程序;其次,还要接收服务端 Stub 程序发送的调用结果数据包,并解析返回给客户端。
  • 服务端 (Server):远端的计算机机器上运行的程序,其中有客户端要调用的方法。
  • 服务端存根 (Server Stub):接收客户 Stub 程序通过网络发送的请求消息数据包,并调用服务端中真正的程序功能方法,完成功能调用;其次,将服务端执行调用的结果进行数据处理打包发送给客户端 Stub 程序。

了解完了 RPC 技术的组成结构我们来看一下具体是如何实现客户端到服务端的调用的。实际上,如果我们想要在网络中的任意两台计算机上实现远程调用过程,要解决很多问题,比如:

  • 两台物理机器在网络中要建立稳定可靠的通信连接。
  • 两台服务器的通信协议的定义问题,即两台服务器上的程序如何识别对方的请求和返回结果。也就是说两台计算机必须都能够识别对方发来的信息,并且能够识别出其中的请求含义和返回含义,然后才能进行处理。这其实就是通信协议所要完成的工作。

image.png

在上述图中,通过 1-10 的步骤图解的形式,说明了 RPC 每一步的调用过程。具体描述为:

  • 1、客户端想要发起一个远程过程调用,首先通过调用本地客户端 Stub 程序的方式调用想要使用的功能方法名;
  • 2、客户端 Stub 程序接收到了客户端的功能调用请求,将客户端请求调用的方法名,携带的参数等信息做序列化操作,并打包成数据包。
  • 3、客户端 Stub 查找到远程服务器程序的 IP 地址,调用 Socket 通信协议,通过网络发送给服务端。
  • 4、服务端 Stub 程序接收到客户端发送的数据包信息,并通过约定好的协议将数据进行反序列化,得到请求的方法名和请求参数等信息。
  • 5、服务端 Stub 程序准备相关数据,调用本地 Server 对应的功能方法进行,并传入相应的参数,进行业务处理。
  • 6、服务端程序根据已有业务逻辑执行调用过程,待业务执行结束,将执行结果返回给服务端 Stub 程序。
  • 7、服务端 Stub 程序将程序调用结果按照约定的协议进行序列化,并通过网络发送回客户端 Stub 程序。
  • 8、客户端 Stub 程序接收到服务端 Stub 发送的返回数据,对数据进行反序列化操作,并将调用返回的数据传递给客户端请求发起者。
  • 9、客户端请求发起者得到调用结果,整个 RPC 调用过程结束。

以下是 rpc 需要使用到的术语

通过上文一系列的文字描述和讲解,我们已经了解了 RPC 的由来和 RPC 整个调用过程。我们可以看到 RPC 是一系列操作的集合,其中涉及到很多对数据的操作,以及网络通信。因此,我们对 RPC 中涉及到的技术做一个总结和分析:

  • 1、动态代理技术: 上文中我们提到的 Client Stub 和 Sever Stub 程序,在具体的编码和开发实践过程中,都是使用动态代理技术自动生成的一段程序。
  • 2、序列化和反序列化: 在 RPC 调用的过程中,我们可以看到数据需要在一台机器上传输到另外一台机器上。在互联网上,所有的数据都是以字节的形式进行传输的。而我们在编程的过程中,往往都是使用数据对象,因此想要在网络上将数据对象和相关变量进行传输,就需要对数据对象做序列化和反序列化的操作。
    • 序列化:把对象转换为字节序列的过程称为对象的序列化,也就是编码的过程。
    • 反序列化:把字节序列恢复为对象的过程称为对象的反序列化,也就是解码的过程。

我们常见的 Json,XML 等相关框架都可以对数据做序列化和反序列化编解码操作。后面我们要学习的 Protobuf 协议,这也是一种数据编解码的协议,在 RPC 框架中使用的更广泛。

net/rpc 快速上手

Go 语言是有一个内置的 RPC 包:net/rpc,它虽然功能简单但是十分灵活。

以下是一个简单的服务端程序例子:

type HelloService struct{}

// HelloService的结构体方法
func (s *HelloService) Hello(request string, reply *string) error {
    //返回值是通过修改reply的值
    *reply = "hello, " + request
    return nil
}

func main() {
    //1. 实例化一个server
    listener, _ := net.Listen("tcp", ":1234")
    //2. 注册处理逻辑 handler
    _ = rpc.RegisterName("HelloService", &HelloService{})
    //3. 启动服务
    conn, _ := listener.Accept() // 只处理一次逻辑
    rpc.ServeConn(conn)
}

这里将处理的函数方法封装成一个结构体方法,这是为了方便后续建立更多同名方法,通过结构体可以区分不同的同名方法,起到限制作用域的功能。

对于客户端程序也是十分简单:

func main() {
    //建立连接
    client, err := rpc.Dial("tcp", "localhost:1234")
    if err != nil {
        panic("连接失败")
    }
    // var reply *string = new(string) 也是可以的,&可以去掉
    var reply string //string有默认值
    err = client.Call("HelloService.Hello", "bobby", &reply)
    if err != nil {
        panic("调用失败")
    }
    fmt.Println(reply)
}

观察以上两个服务端和客户端的程序代码,我们可以发现一连串的代码大部分都是 net 包,比如 net.Listenlistener.Accept()net.Dial ,好像和 rpc 没有关系,那么我们能不能只使用 net 包呢?

当然不行。RPC 调用中有几个重要的问题需要解决:

  1. Call ID
  2. 序列化和反序列化(编码和解码)

而这几个重要的问题,则是通过 net/rpc 内部自动完成了,所以一定还需要一个 rpc 框架,而不能只是使用网络传输的框架。

同时,以上的代码还有一个问题,就是客户端的函数调用并没有封装为本地函数调用的形式。在后续客户端有大量方法需要调用的时候,写起来就十分麻烦,没有一个统一的接口来处理逻辑。

而在上面的客户端代码中,我们始终是通过 client.Call 这个接口来调用所有的远程函数方法,这样的代码是不好的。我们希望调用方式改为:

client.Hello("bobby", &reply)

而不是上述的:

client.Call("HelloService.Hello", "bobby", &reply)

前面部分我们讲过,RPC 的精髓就是将远程函数调用封装为像本地函数调用一样方便,没有割裂感。因此,后续我们还需要对上面的代码调用进行一定程度的封装,使 RPC 调用更加方便。

序列化协议改为 json

这里还有一个问题值得思考:既然上述的 RPC 程序主要都是网络请求,那么是否可以跨语言调用呢?本质上可以跨语言

这里就需要注意两点:

  1. Go 语言的 RPC 的序列化协议是什么?Gob 协议
  2. 能否将 Go 语言的 Gob 协议替换成常见的序列化协议?可以的

实际上,将 Go 语言内置的 RPC 框架的序列化协议改成常见的 json 格式十分简单,只需要将上方服务端中的:

rpc.ServeConn(conn)

更改为:

rpc.ServeCodec(jsonrpc.NewServerCodec(conn))

再将客户端的代码更改为:

func main() {
    conn, err := net.Dial("tcp", "localhost:1234")
    if err != nil {
        panic("连接失败")
    }
    var reply string //string有默认值
    client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
    err = client.Call("HelloService.Hello", "bobby", &reply)
    if err != nil {
        panic("调用失败")
    }
    fmt.Println(reply)
}

即改为使用 jsonrpc 包的功能。

以上的服务端依旧只能处理一个请求,就会自动结束,可以加上一个死循环让它能够不断接收请求。同时为了让多个请求到达时并发处理,可以将 RPC 调用改成 goruntine。

func main() {
    listener, _ := net.Listen("tcp", ":1234")
    _ = rpc.RegisterName("HelloService", &HelloService{})
    for {
        conn, _ := listener.Accept()
        go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
    }
}

既然 RPC 程序可以跨语言调用,那么我们就可以使用 Python 来完成上述客户端调用的例子。

只需要通过 Python 传递一个固定的 json 格式数据给 Go 的服务端,就可以完成。打印传递的 json 格式参数,可以清楚传递的格式为:

{"method": "HelloService.Hello", "params": ["bobby"], "id": 0}

返回格式为:

{'id': 0, 'result': 'hello, bobby', 'error': None}

由于 Python 中的 request 库是 http 协议的,而我们的 go 服务端是使用 tcp 协议的,因此不匹配,我们则需要使用 socket 库进行网络编程。

Python 代码如下:

import json
import socket

data =  {
    "id":0,
    "params":["bobby"],
    "method":"HelloService.Hello"
}

client = socket.create_connection(("localhost", 1234))
client.sendall(json.dumps(data).encode())

resp = client.recv(1024)
resp = json.loads(resp.decode())

print(resp['result'])

因此,只要是数据编码协议网络传输协议一致,任何编程语言都可以进行 RPC 互相调用。

网络传输协议改为 http

只需要将服务端和客户端使用的的 net 改为 net/http ,再将相应的代码进行一定的更改,就可以把 tcp 协议更改为 http 协议。正如在实现简单的 RPC 中做的一样。当然 gin 框架也是可以的。

Go 语言的服务端代码如下:

type HelloService struct{}

func (s *HelloService) Hello(request string, reply *string) error {
    *reply = "hello, " + request
    return nil
}

func main() {
    _ = rpc.RegisterName("HelloService", &HelloService{})
    http.HandleFunc("/hello", func(w http.ResponseWriter, r *http.Request) {
        var conn io.ReadWriteCloser = struct {
            io.Writer
            io.ReadCloser
        }{
            ReadCloser: r.Body,
            Writer:     w,
        }
        rpc.ServeRequest(jsonrpc.NewServerCodec(conn))
    })
    // rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
    http.ListenAndServe(":1234", nil)
}

注意:rpc.ServeCodec 变为了 rpc.ServeRequest

Go 语言的客户端代码如下:

func main() {
    data := struct {
        Id     int      `json:"id"` // 注意:字段需要大写开头才能被序列化
        Params []string `json:"params"`
        Method string   `json:"method"`
    }{
        Id:     0,
        Params: []string{"bobby"},
        Method: "HelloService.Hello",
    }
    jsonData, _ := json.Marshal(data)
    resp, err := http.Post("http://localhost:1234/hello", "application/json", bytes.NewBuffer(jsonData))
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        panic(err)
    }
    println(string(body))
}

Python 语言的客户端代码如下:

import requests

data = {
    "id": 0,
    "params": ['bobby'],
    "method": "HelloService.Hello"
}

resp = requests.post("http://localhost:1234/hello", json=data)
print(resp.text)

后续还可以有改进的空间,即将代码进一步封装,调用时将不用传递 CallID 等参数,使其调用过程更接近本地的函数调用。

封装为本地调用的效果

在以上的代码中,我们都是使用 client.Call 这个方法配合函数名称作为参数,才能正确调用远程的函数。但是在 Python 中却可以直接使用例如 client.Hello 这样的方法直接调用远程对应的函数(这里的 Hello 就是远程函数的名称或者叫做函数调用 ID),而不需要将函数 ID 作为参数传递,这是怎么实现的呢?

熟悉 Python 的高级特性就可以知道,Python 可以通过 _getattr_ 这个魔法方法去 hook 住这个逻辑,而这在 Go 中是会报错的。

Go 语言本身没有像 Python 这样的魔法方法和特性,因此需要自己编写这样的方式。怎么写才比较合理?

net/rpc 快速上手 一开始的第一个程序代码为例。

我们可以先把 HelloService.Hello 中的 HelloService 的固定名称提出为一个全局常量。因为服务端和客户端的代码在实际运行时是在两个不同的服务器中,因此如果需要两个程序都能访问到这个全局常量,就需要新创建一个 handler/handler.go 文件,在其中编写这个全局常量,并且运行时在客户端和服务端都带上这个文件。

后续将原先的 HelloService.Hello 改为 handle.HelloService+".Hello" 这样的形式。

这样改进的目的就是,我们只想编写业务逻辑的代码,而不想过多关注每个远程函数的名称。

进一步,我们也可以把网络请求传输和 RPC 的部分,也封装成一个新的文件中,这样就可以把不同的逻辑代码拆开,而服务端和客户端的代码只需要关注业务逻辑代码。

比如,可以创建一个 client_proxy/client_proxy.go 文件,在其中编写以下代码:

type HelloServiceStub struct {
    *rpc.Client
}

func NewHelloServiceClient(protcol, address string) HelloServiceStub {
    conn, err := rpc.Dial(protcol, address)
    if err != nil {
        panic("connect error!")
    }
    return HelloServiceStub{conn}
}

func (c *HelloServiceStub) Hello(request string, reply *string) error {
    err := c.Call(hanlder.HelloServiceName+".Hello", request, reply)
    if err != nil {
        return err
    }
    return nil
}

值得注意的是,在 Go 语言中没有类、对象就意味着没有初始化方法。因此对一个结构体类进行初始化,常常使用一个函数 (开头为 New),比如上方的 NewHelloServiceClient,如果需要初始化一个对象,就可以通过这个函数返回一个该类型的对象。这在 Go 语言中十分十分常见

接着,client/client.go 的代码就可以写为以下的形式:

func main() {
    client := client_proxy.NewHelloServiceClient("tcp", "localhost:1234")
    var reply string 
    err := client.Hello("bobby", &reply)
    if err != nil {
        panic("调用失败")
    }
    fmt.Println(reply)
}

这样就可以像本地函数调用一样的方式,去调用远程的函数。

对于服务端代码,我们同样可以把函数具体的处理逻辑放入 handler/handler.go 中,再把服务端网络连接与 RPC 注册返回的处理逻辑放入 server_proxy/server_proxy.go 中。

于是,handler/handler.go 的代码内容如下:

// 防止名称冲突的问题
const HelloServiceName = "handler/HelloService"

type NewHelloService struct{}

// 我们更加关注这个方法名而不是它的结构体名
func (s *NewHelloService) Hello(request string, reply *string) error {
    //返回值是通过修改reply的值
    *reply = "hello, " + request
    return nil
}

server_proxy/server_proxy.go 的代码内容如下:

type HelloServicer interface {
    Hello(request string, reply *string) error
}

func RegisterHelloService(srv HelloServicer) error {
    return rpc.RegisterName(hanlder.HelloServiceName, srv)
}

这样的写法是为了将 server_proxy.gohandler.go 进行解耦,使其耦合性降低。

我们关心的是函数方法名而不是结构体名,这就需要妙用接口。在 Go 语言中,只要实现了这个接口中的所有方法,就默认自动实现了这个接口。

在上方的代码中,NewHelloService 拥有这个 Hello 方法,则它就实现了 HelloServicer 这个接口,所以在使用这个 RegisterHelloService 函数时,只需要传递 NewHelloService 这个结构体,到了 server_proxy.go 代码中就可以使用接口代替。

这样就实现了两个程序的解耦,当 handler.go 中的 NewHelloService 结构体更改了名称,只要还有 hello 这个方法,在 server_proxy.go 中就不需要做任何的代码修改。

server/server.go 的代码内容如下:

func main() {
    //1. 实例化一个server
    listener, _ := net.Listen("tcp", ":1234")
    //2. 注册处理逻辑 handler
    _ = server_proxy.RegisterHelloService(&hanlder.NewHelloService{})
    //3. 启动服务
    for {
        conn, _ := listener.Accept() //当一个新的连接进来的时候,
        go rpc.ServeConn(conn)
    }
}

以上这些概念在 grpc 中都有对应,实际上我们简单模拟了 gRPC 的调用流程。在后续学习就会简单一些。

这时我们会有个问题:server_proxy 和 client_proxy 能否自动生成?并且为多种语言生成?

:这些都能满足!这就是 protobuf + gRPC 的一整套解决方案。这也是后续学习的内容。

什么是 gRPC

gRPC 是 Google 开源的一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计。目前提供 C、Java 和 Go 语言版本,分别是:grpcgrpc-javagrpc-go. 其中 C 版本支持 CC++Node.jsPythonRubyObjective-CPHP 和 C# 支持。

我们不需要担心使用 http 2.0 会出现性能问题,因为 http 2.0 相对于 http 1.0 已经针对性能进行了很深的改进。

同时在 gRPC 框架中也使用了一个高性能的序列化协议 protobuf,比 json 的性能高很多。

landing-2.svg

什么是 protobuf

protobuf 全称是 Protocol Buffer ,它是 Google 设计的一种轻量高效的结构化数据存储格式,性能比 Json、XML 强很多。它同样也是一个通用性的协议,并不是只有 Go 语言才能用。

习惯用 Json、XML 数据存储格式的大家,相信大多都没听过 Protocol Buffer

protobuf 经历了 protobuf2protobuf3,pb 3 比 pb 2 简化了很多,目前主流的版本是 pb 3。

对应 protobuf 的还有很多协议,比如 java 中的 dubbormihessian,还有 python 中的 messagepack ,这些协议都比 JsonXML 更高效。如果你懂了这些协议,完全有能力自己去实现一个类似的高性能协议。

image.png

根据以上的优缺点,我们可以想到:一般对于内部接口、微服务、RPC 等场景,我们推荐使用像 protobuf 这样的高性能协议。而对于外部接口、网络传输、开放接口等场景,就更推荐使用像是 JsonXML 这样通用性高的协议。

gRPC 快速上手

开发环境搭建

Quick start | Go | gRPC

先在 Github 中 protobuf Releases 下载对应工具。并将 /bin/protoc.exe 添加进环境变量中。

注意:protoc 的版本需要和 golang/protobuf 保持一致。

下载 Go 依赖库:

go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

注意:安装过程中会提示说 go get 会慢慢被弃用,不是错误只是提示,Go 的新版本依赖安装会慢慢弃用 go get 方式安装,以后一律采用 go install 方式安装第三方依赖。

导入 gRPC 自动下载所需要的文件:

import "google.golang.org/grpc"

为什么会需要 protoc 这个工具以及 protoc-gen-go 这个插件包呢?

这是因为 protobuf 与 Json、XML 都是通用性的协议,它不关注使用什么编程语言,任何编程语言只要遵守这个协议,都能实现这个协议。如果每个语言每次用到 protobuf 协议都需要写一次协议的实现,就会十分麻烦,因此官方给出了 protoc 这个工具,配合各个语言官方的第三方包,就可以帮助我们生成对应语言实现该协议的源代码。

同时为什么需要做成插件的形式?这是因为方便让我们自己生成所需要的源码,并可以根据自身的需求修改这个源码。如果像是 Go 语言第三方库这样的形式引入代码中,我们便不能对生成的代码进行自定义的修改。

proto 文件编写

你可以将 protobuf 简单地理解为一个新的语言和结构,它以 .proto 结尾。

如果在代码编辑器中没有该语言的提示和高亮,则可以在插件商店中安装一个 proto 插件,如 vscode 中的 vscode-proto3

接着就可以编写一个 helloworld.proto 文件:

syntax = "proto3";
option go_package = ".;proto"; // 必须要指定包名
service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;  //1表示字段的序号不是值
}

message HelloReply {
  string message = 1;
}

第二行,新版本的 protoc 和 protobuf 应该写成 option go_package = "./;proto";

可以使用以下命令行指令,生成 Go 语言文件:

protoc --go_out=. --go-grpc_out=require_unimplemented_servers=false:. helloworld.proto

这样在当前目录下就会生成一个 helloworld.pb.gohelloworld_grpc.pb.go 的 Go 语言文件。

使用时只需要在代码中引入 proto 这个包,就可以使用 HelloRequestHelloReply 这两个结构体,这两个结构体会将其中的信息自动转换为 protobuf 协议的数据。

package main

import (
    "fmt"
    "main/proto"
)

func main() {
    req := proto.HelloRequest{
        Name: "Bobby",
    }
    fmt.Println(req)
}

接下来,我们可以将数据变得更加复杂一些。

helloworld.proto 中的 HelloRequest 增加一些字段:

message HelloRequest {
  string name = 1;  //1表示字段的序号不是值
  int32 age = 2;
  repeated string courses = 3;  //repeated表示是一个数组
}

编写以下 protobuf 和 Json 协议对比的代码,两者都使用相同的结构体数据。

package main

import (
    "encoding/json"
    "fmt"
    pb "main/proto"

    "google.golang.org/protobuf/proto"
)

func main() {
    protobufStruct := pb.HelloRequest{
        Name:    "Bobby",
        Age:     20,
        Courses: []string{"Math", "Science", "English"},
    }
    jsonStruct := struct {
        Name    string   `json:"name"`
        Age     int32    `json:"age"`
        Courses []string `json:"courses"`
    }{
        Name:    protobufStruct.Name,
        Age:     protobufStruct.Age,
        Courses: protobufStruct.Courses,
    }
    protobufByte, _ := proto.Marshal(&protobufStruct)
    jsonByte, _ := json.Marshal(&jsonStruct)
    fmt.Printf("protobuf协议的数据长度: %d\n", len(protobufByte))
    fmt.Printf("json协议的数据长度: %d\n", len(jsonByte))
}

最终输出为:

protobuf协议的数据长度: 33
json协议的数据长度: 64

我们可以很直观地发现,protobuf 协议的压缩比远远高于 Json 协议,大致为两倍的差距,这也是为什么我们在微服务等内部场景中,更推荐使用 protobuf 协议。

那么 protobuf 协议具体是如何做到高性能和高压缩比的,可以作为后续学习的路径。

同时从上方的代码写法中可以发现,google.golang.org/protobuf/proto 这个工具的用法与 encoding/json 的用法相似,都可以通过 MarshalUnmarshal 将数据编码与反编码。

生成 API 接口

在上方的实例中,我们实际上只是使用了 helloworld.proto 文件中的

message HelloRequest {
  string name = 1;  //1表示字段的序号不是值
  int32 age = 2;
  repeated string courses = 3;  //repeated表示是一个数组
}

结构体,那么其他代码是干什么用的呢?

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply); // SayHello接口
}

比如上方这段代码,可以为服务端和客户端生成一个接口,在 helloworld_grpc.pb.go 文件中。

type GreeterClient interface {
    SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
}

type GreeterServer interface {
    SayHello(context.Context, *HelloRequest) (*HelloReply, error)
}

根据上面的学习,我们清楚 protobuf 只是一个序列化协议,那么为什么我们通过编写 helloworld.proto 这样一个 proto 文件就可以为我们同时生成客户端和服务端的 API 接口?

实际上,这是通过 gRPC 生成的,而不是原本 protobuf 的功能。我们在开发环境搭建中安装的 google.golang.org/grpc/cmd/protoc-gen-go-grpc 包帮助我们生成了 API 接口,而 protobuf 只是专注于将数据编码按照协议规范进行转化,它只是提供了我们在 proto 文件中编写的那个结构体。

当我们去除了 google.golang.org/grpc/cmd/protoc-gen-go-grpc 这个包,执行上述生成指令,它依旧可以生成 Go 语言代码,但是只有一个 helloworld.pb.go 文件。

总的来说:

  • google.golang.org/protobuf/cmd/protoc-gen-go 帮助生成了 helloworld.pb.go 文件,其中包含 HelloRequestHelloReply 结构体信息。
  • google.golang.org/grpc/cmd/protoc-gen-go-grpc 帮助生成了 helloworld_grpc.pb.go 文件,其中包含 SayHello 的接口信息。

protoc-gen-go-grpcprotoc-gen-go 的一个插件。

因此,我们在执行以下生成命令时,需要指定使用什么插件,如 grpc

protoc --go_out=. --go-grpc_out=require_unimplemented_servers=false:. helloworld.proto
  • --go_out=.:指定生成 Go 语言文件,并放置在当前目录 .
  • --go-grpc_out=require_unimplemented_servers=false:.:指定生成 gRPC 服务代码,并且即使某些服务方法没有实现,代码也可以正常编译。生成的 gRPC 代码将被放置在当前目录 .
  • helloworld.proto:指定输入的 .proto 文件

在完全掌握微服务后,我们可以尝试阅读 protoc-gen-go 的源码,并编写一个对应的插件程序,用于满足我们项目的生成代码需求。

gRPC 快速体验

在搞懂 proto 生成原理后,我们就可以开始学习 gRPC 的使用。

同样我们在项目根目录先建立一个 proto 文件夹,在其中编写 helloworld.proto 文件:

syntax = "proto3";
option go_package = ".;proto";
service Greeter {
    rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
    string name = 1;
}

message HelloReply {
    string message = 1;
}

执行生成指令,生成 helloworld.pb.gohelloworld_grpc.pb.go 文件。

protoc --go_out=. --go-grpc_out=require_unimplemented_servers=false:. helloworld.proto

接着就可以在项目根目录中分布创建 server/server.goclient/client.go,开始编写服务端和客户端代码。

服务端代码如下:

type Server struct{}

// 实现helloworld_grpc.pb.go中的GreeterServer接口
func (s *Server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloReply,
    error) {
    return &proto.HelloReply{
        Message: "hello " + request.Name,
    }, nil
}

func main() {
    // 1. 实例化一个服务
    g := grpc.NewServer()
    // 2. 注册处理逻辑
    proto.RegisterGreeterServer(g, &Server{})
    // 3. 启动服务
    lis, err := net.Listen("tcp", "0.0.0.0:8088")
    if err != nil {
        panic("failed to listen:" + err.Error())
    }
    err = g.Serve(lis) // 不会直接退出
    if err != nil {
        panic("failed to start grpc:" + err.Error())
    }
}

服务端的代码如下:

func main()  {
    conn,err := grpc.Dial("127.0.0.1:8080",grpc.WithInsecure())
    if err!=nil{
        panic(err)
    }
    defer conn.Close()
    
    c := hello.NewGreeterClient(conn)
    r, err := c.SayHello(context.Background(),&hello.HelloRequest{Name:"bobby"})
    if err!=nil{
        panic(err)
    }
    fmt.Println(r.Message)
}

通过以上代码,我们可以发现,其主逻辑与 Go 语言内置的 RPC 框架是几乎一样的,但是 gRPC 帮我们封装好了客户端和服务端的 API 接口,我们只需要实现并调用。

Go 语言内置 RPC 框架的服务端注册绑定写法:

rpc.RegisterName("HelloService", &HelloService{})

Go 语言内置 RPC 框架的客户端调用逻辑写法:

client.Call("HelloService.Hello", "bobby", &reply)

gRPC 服务端注册绑定写法:

proto.RegisterGreeterServer(g, &Server{})

gRPC 客户端调用逻辑写法:

c.SayHello(context.Background(),&hello.HelloRequest{Name:"bobby"})

对比发现,gRPC 的风格更加简洁方便,不再需要通过设置参数的方式来指定所需要执行的函数名,这在远程调用的场景中十分重要。

同时 gRPC 结合 protobuf 协议,帮我们完成了数据编码的序列化与反序列化的过程,简化了开发流程,我们只需要专注于业务逻辑的编写。

gRPC 四种数据模式

以上简单介绍了 gRPC 的简单使用,接着再介绍 gRPC 中的 stream (流模式)。srteam 顾名思义就是一种流,可以源源不断的推送数据,很适合传输一些大数据,或者服务端和客户端长时间数据交互,比如客户端可以向服务端订阅一个数据,服务端就可以利用 stream ,源源不断地推送数据。

gRPC 共有四种数据模式:

  1. 简单模式(Simple RPC)
  2. 服务端数据流模式(Server-side streaming RPC)
  3. 客户端数据流模式(Client-side streaming RPC)
  4. 双向数据流模式(Bidrectional streaming RPC)

简单模式:这种模式最为传统,即客户端发起一次请求,服务端响应一个数据,这和大家平时熟悉的 RPC 没有什么大的区别,所以不再详细介绍。

服务端数据流模式:这种模式是客户端发起一次请求,服务端返回一段连续的数据流。典型的例子是客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断地返回给客户端。

客户端数据流模式:与服务端数据流模式相反,客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应。典型的例子是物联网终端向服务器报送数据。

双向数据流模式:顾名思义,客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,也就是可以实现实时交互。典型的例子是聊天机器人。

具体的代码如何编写,如下所示。

如果要使用流模式,则需要在编写 proto 文件时,加上 stream 关键字。

syntax = "proto3";

option go_package="./;proto";
service Greeter {
    //服务端流模式
    rpc GetStream(StreamReqData) returns (stream StreamResData); 
    //客户端流模式
    rpc PutStream(stream StreamReqData) returns (StreamResData); 
    //双向流模式
    rpc AllStream(stream StreamReqData) returns (stream StreamResData); 
}

message StreamReqData {
    string data = 1;
}

message StreamResData {
    string data = 1;
}

使用 protoc 指令生成 Go 语言文件:

protoc --go_out=. --go-grpc_out=require_unimplemented_servers=false:. stream.proto

接着就可以开始编写服务端和客户端的代码。

服务端代码:

const PORT = ":12345"

type Server struct {
}

// 服务端流模式
func (s *Server) GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer) error {
    for i := 0; i < 10; i++ { // 发送10次数据
        // 发送当前时间数据
        res.Send(&proto.StreamResData{
            Data: fmt.Sprintf("server: %s", time.Now().Format("2006-01-02 15:04:05")),
        })
        time.Sleep(time.Second) // 每隔一秒发送一次
    }
    return nil
}

// 客户端流模式
func (s *Server) PutStream(res proto.Greeter_PutStreamServer) error {
    for {
        if data, err := res.Recv(); err != nil {
            fmt.Println(err)
            break
        } else {
            fmt.Println(data.Data)
        }
    }
    return nil
}

// 双向流模式
func (s *Server) AllStream(res proto.Greeter_AllStreamServer) error {
    wg := sync.WaitGroup{}
    wg.Add(2)
    // 接收数据的goroutine
    go func() {
        defer wg.Done()
        for {
            data, err := res.Recv()
            if err != nil {
                fmt.Println(err)
                break
            }
            fmt.Println(data.Data)
        }
    }()
    // 发送数据的goroutine
    go func() {
        defer wg.Done()
        for i := 0; i < 10; i++ { // 发送10次数据
            _ = res.Send(&proto.StreamResData{
                Data: fmt.Sprintf("server: 双向数据流 %d", i),
            })
            time.Sleep(time.Second) // 每隔一秒发送一次
        }
    }()
    wg.Wait() // 等待两个goroutine结束
    return nil
}

func main() {
    //监听端口
    lis, err := net.Listen("tcp", PORT)
    if err != nil {
        panic(err)
    }
    //创建一个grpc 服务器
    s := grpc.NewServer()
    //注册事件
    proto.RegisterGreeterServer(s, &Server{})
    //处理链接
    err = s.Serve(lis)
    if err != nil {
        panic(err)
    }
}

需要注意的是,流模式中实现接口的这三个方法,其参数与简单模式中的并不相同。

简单模式中的参数形式如下:

func (s *Server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloReply,
error)

而流模式中的参数形式如下:

func (s *Server) GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer) error

客户端的代码:

const ADDRESS = "localhost:12345"

func main() {
    conn, err := grpc.Dial(ADDRESS, grpc.WithInsecure())
    if err != nil {
        panic(err)
    }
    defer conn.Close()
    c := proto.NewGreeterClient(conn)
    // 1. 服务端流模式
    reqStreamData := &proto.StreamReqData{
        Data: "client",
    }
    res, _ := c.GetStream(context.Background(), reqStreamData)
    for {
        data, err := res.Recv()
        if err != nil {
            fmt.Println(err)
            break
        }
        fmt.Println(data.Data)
    }
    // 2. 客户端流模式
    putRes, _ := c.PutStream(context.Background())
    for i := 0; i < 10; i++ { // 发送10次数据
        _ = putRes.Send(&proto.StreamReqData{
            Data: fmt.Sprintf("client: %s", time.Now().Format("2006-01-02 15:04:05")),
        })
        time.Sleep(time.Second) // 每隔一秒发送一次
    }
    // 3. 双向流模式
    allRes, _ := c.AllStream(context.Background())
    wg := sync.WaitGroup{}
    wg.Add(2)
    // 接收数据的goroutine
    go func() {
        defer wg.Done()
        for {
            data, err := allRes.Recv()
            if err != nil {
                fmt.Println(err)
                break
            }
            fmt.Println(data.Data)
        }
    }()
    // 发送数据的goroutine
    go func() {
        defer wg.Done()
        for i := 0; i < 10; i++ { // 发送10次数据
            _ = allRes.Send(&proto.StreamReqData{
                Data: fmt.Sprintf("client: 双向数据流 %d", i),
            })
            time.Sleep(time.Second) // 每隔一秒发送一次
        }
    }()
    wg.Wait() // 等待两个goroutine结束
    // 也可以使用select{}阻塞主goroutine
    // select{}
}
  1. 在使用服务端数据流模式时,服务端会源源不断向客户端发送当前时间数据。
  2. 在使用客户端数据流模式时,客户端会源源不断向服务端发送当前时间数据。
  3. 在使用双向数据流模式时,服务端和客户端都能向对方源源不断地发送字符数据。

客户端输出:

PS D:\Desktop\stream_gprc\client> go run client.go
server: 2025-02-22 17:13:13
server: 2025-02-22 17:13:14
server: 2025-02-22 17:13:15
server: 2025-02-22 17:13:16
server: 2025-02-22 17:13:17
server: 2025-02-22 17:13:18
server: 2025-02-22 17:13:19
server: 2025-02-22 17:13:20
server: 2025-02-22 17:13:21
server: 2025-02-22 17:13:22
EOF
server: 双向数据流 0
server: 双向数据流 1
server: 双向数据流 2
server: 双向数据流 3
server: 双向数据流 4
server: 双向数据流 5
server: 双向数据流 6
server: 双向数据流 7
server: 双向数据流 8
server: 双向数据流 9

服务端输出:

PS D:\Desktop\stream_gprc\server> go run server.go
client: 2025-02-22 17:13:23
client: 2025-02-22 17:13:24
client: 2025-02-22 17:13:25
client: 2025-02-22 17:13:26
client: 2025-02-22 17:13:27
client: 2025-02-22 17:13:28
client: 2025-02-22 17:13:29
client: 2025-02-22 17:13:30
client: 2025-02-22 17:13:31
client: 2025-02-22 17:13:32
client: 双向数据流 0
client: 双向数据流 1
client: 双向数据流 2
client: 双向数据流 3
client: 双向数据流 4
client: 双向数据流 5
client: 双向数据流 6
client: 双向数据流 7
client: 双向数据流 8
client: 双向数据流 9

项目代码结构:

├───client
│       client.go
│       go.mod
│       go.sum
├───proto
│       go.mod
│       go.sum
│       stream.pb.go
│       stream.proto
│       stream_grpc.pb.go
└───server
        go.mod
        go.sum
        server.go

值得一提的是,在使用双向数据流模式的代码中,服务端和客户端都使用了两个 goroutine 来发送和接收数据,这样就可以使程序更加灵活,不要求接收和发送的顺序,即使是网络阻塞使发送和接收数据的顺序变化,依然可以成功执行这段逻辑代码。

观察以上服务端和客户端的代码,我们可以发现,这与传统的 socket 编程十分类似,比如都使用了 SendRecv 两个方法来发送和接收数据。

实际上,这段代码底层就是使用 socket 编程的模式,只是在 socket 的基础上再封装了一层,加上了 RPC 相关的功能,同时数据编码格式使用了 protobuf 协议,而在传统 socket 编程中,并没有规定需要使用什么数据格式,也就是说 gRPC 流模式就是建立在 socket 层上的一个应用层封装。