千锋教育-做有情怀、有良心、有品质的职业教育机构

手机站
千锋教育

千锋学习站 | 随时随地免费学

千锋教育

扫一扫进入千锋手机站

领取全套视频
千锋教育

关注千锋学习站小程序
随时随地免费学习课程

当前位置:首页  >  技术干货  > 如何使用golang实现自定义RPC框架

如何使用golang实现自定义RPC框架

来源:千锋教育
发布人:xqq
时间: 2023-12-27 05:03:46 1703624626

如何使用golang实现自定义RPC框架

RPC (Remote Procedure Call)是一种远程调用协议,通过网络传输,使得程序能够像本地调用一样调用远程服务。在现代微服务架构中,RPC协议被广泛使用。golang通过标准库的net/rpc包提供了一套RPC框架,但是这个框架无法满足一些特定的业务需求,本文就来介绍如何使用golang自己实现一个RPC框架。

1. 基本概念

在实现自定义RPC框架之前,需要先了解以下几个基本概念:

- Service:RPC调用的服务,即提供RPC服务的函数集合。

- Method:Service中的方法,即具体的RPC调用方法。

- Codec:序列化和反序列化的方法,将调用的参数和返回值序列化成二进制数据,以便通过网络传输。

- Transport:网络传输协议,用于将序列化后的二进制数据通过网络传输到远程服务。

2. 实现步骤

接下来我们就来实现一个简单的自定义RPC框架,步骤如下:

- 定义Service和Method

- 实现Codec

- 实现Transport

- 完成框架

2.1 定义Service和Method

我们以一个简单的计算器服务为例,在服务端提供两个方法Add和Multiply,客户端可以通过RPC调用这两个方法。

定义服务:

`go

// 定义CalculatorService接口

type CalculatorService interface {

Add(int, int) int

Multiply(int, int) int

}

// 实现具体的CalculatorService

type CalculatorServiceImpl struct {}

func (c *CalculatorServiceImpl) Add(a, b int) int {

return a + b

}

func (c *CalculatorServiceImpl) Multiply(a, b int) int {

return a * b

}

定义Service和Method之后,接下来需要定义一个struct来存储Service和其对应的Method。同时,定义一个Register方法,用于注册新的Service和Method。`gotype Server struct {    services map*service}type service struct {    typ    reflect.Type    method map*methodType}type methodType struct {    method    reflect.Method    ArgType   reflect.Type    ReplyType reflect.Type}func (s *Server) Register(receiver interface{}) error {    service := new(service)    service.typ = reflect.TypeOf(receiver).Elem()    service.method = make(map*methodType)    for i := 0; i < service.typ.NumMethod(); i++ {        method := service.typ.Method(i)        mType := method.Type        if mType.NumIn() != 3 || mType.NumOut() != 1 {            continue        }        argType := mType.In(1)        replyType := mType.In(2)        if !isExportedOrBuiltinType(argType) || !isExportedOrBuiltinType(replyType) {            continue        }        service.method = &methodType{            method:    method,            ArgType:   argType,            ReplyType: replyType,        }    }    s.services = service    return nil}func isExportedOrBuiltinType(t reflect.Type) bool {    pkgPath := t.PkgPath()    return pkgPath == "" || pkgPath == "builtin"}

在Register方法中,循环遍历service.typ中的所有方法,将满足条件的方法添加到service.method中。最后将service添加到Server.services中。

2.2 实现Codec

Codec用于将调用的参数和返回值序列化成二进制数据,以便通过网络传输。

在这里,我们使用golang的标准库encoding/gob实现Codec。Gob是golang标准库中的编解码库,支持任意类型的编解码和传输,比JSON和XML更高效。在实现Codec之前,需要先定义一个request结构体和response结构体,用于存储调用信息和返回信息。

`go

type request struct {

ServiceMethod string // 形如"Service.Method"

Seq uint64 // 请求序列号

Args byte // 客户端传递的参数

}

type response struct {

Seq uint64 // 请求序列号

ServiceMethod string // 形如"Service.Method"

Error string // 存储错误信息

Reply byte // 存储响应参数

}

接下来实现Codec,具体实现代码如下:`gotype Codec struct {    conn io.ReadWriteCloser    dec  *gob.Decoder    enc  *gob.Encoder    mutex sync.Mutex    ids   uint64    pending map*call}type call struct {    req  *request    resp *response    done chan *call}func (c *Codec) WriteRequest(method string, args interface{}) (uint64, error) {    c.mutex.Lock()    defer c.mutex.Unlock()    id := c.ids    c.ids++    req := &request{        ServiceMethod: method,        Seq:           id,    }    buf := bytes.NewBuffer(nil)    enc := gob.NewEncoder(buf)    if err := enc.Encode(args); err != nil {        return 0, err    }    req.Args = buf.Bytes()    call := &call{        req:  req,        resp: new(response),        done: make(chan *call),    }    c.pending = call    if err := c.enc.Encode(req); err != nil {        delete(c.pending, id)        return 0, err    }    return id, nil}func (c *Codec) ReadResponseHeader() (*rpc.Response, error) {    c.mutex.Lock()    defer c.mutex.Unlock()    var resp response    if err := c.dec.Decode(&resp); err != nil {        return nil, err    }    call := c.pending    delete(c.pending, resp.Seq)    call.resp = &resp    call.done <- call    return &rpc.Response{        ServiceMethod: resp.ServiceMethod,        Seq:           resp.Seq,        Error:         errors.New(resp.Error),    }, nil}func (c *Codec) ReadResponseBody(x interface{}) error {    c.mutex.Lock()    defer c.mutex.Unlock()    call := <-c.pending.done    if call.resp.Error != "" {        return errors.New(call.resp.Error)    }    dec := gob.NewDecoder(bytes.NewBuffer(call.resp.Reply))    return dec.Decode(x)}

在上面的代码中,我们使用了一个pending map来存储请求的序列号和请求的返回值。在WriteRequest方法中,我们将请求信息编码成二进制数据,然后将请求信息和该请求的channel存储到pending中。在ReadResponseHeader和ReadResponseBody方法中,我们根据pending中的请求序列号获取该请求对应的call,然后将call.resp进行解码后返回。

2.3 实现Transport

Transport用于将序列化后的二进制数据通过网络传输到远程服务。

在golang中,可以使用net包来实现简单的Socket编程。在这里,我们通过net.Dial建立连接后,将Codec中序列化后的数据通过Socket发送到远程服务端。

`go

type Transport struct {

conn io.ReadWriteCloser

}

func (t *Transport) Dial(network, address string) error {

conn, err := net.Dial(network, address)

if err != nil {

return err

}

t.conn = conn

return nil

}

func (t *Transport) Close() error {

return t.conn.Close()

}

func (t *Transport) Codec() rpc.ClientCodec {

return &Codec{

conn: t.conn,

dec: gob.NewDecoder(t.conn),

enc: gob.NewEncoder(t.conn),

pending: make(map*call),

}

}

2.4 完成框架最后,我们完成自定义RPC框架的实现。具体代码如下:`gotype Server struct {    services map*service}type service struct {    typ    reflect.Type    method map*methodType}type methodType struct {    method    reflect.Method    ArgType   reflect.Type    ReplyType reflect.Type}func (s *Server) Register(receiver interface{}) error {    service := new(service)    service.typ = reflect.TypeOf(receiver).Elem()    service.method = make(map*methodType)    for i := 0; i < service.typ.NumMethod(); i++ {        method := service.typ.Method(i)        mType := method.Type        if mType.NumIn() != 3 || mType.NumOut() != 1 {            continue        }        argType := mType.In(1)        replyType := mType.In(2)        if !isExportedOrBuiltinType(argType) || !isExportedOrBuiltinType(replyType) {            continue        }        service.method = &methodType{            method:    method,            ArgType:   argType,            ReplyType: replyType,        }    }    s.services = service    return nil}func (s *Server) ServeCodec(codec rpc.ServerCodec) error {    for {        req, err := codec.ReadRequestHeader()        if err != nil {            if err != io.EOF && err != io.ErrUnexpectedEOF {                log.Println("rpc server:", err)            }            return err        }        serviceMethod := req.ServiceMethod        dot := strings.LastIndex(serviceMethod, ".")        if dot < 0 {            err := errors.New("rpc server: service/method request ill-formed: " + serviceMethod)            log.Println(err.Error())            resp := &rpc.Response{                ServiceMethod: serviceMethod,                Seq:           req.Seq,                Error:         err.Error(),            }            codec.WriteResponse(resp, nil)            continue        }        serviceName, methodName := serviceMethod, serviceMethod        service, ok := s.services        if !ok {            err := errors.New("rpc server: can't find service " + serviceName)            log.Println(err.Error())            resp := &rpc.Response{                ServiceMethod: serviceMethod,                Seq:           req.Seq,                Error:         err.Error(),            }            codec.WriteResponse(resp, nil)            continue        }        mtype, ok := service.method        if !ok {            err := errors.New("rpc server: can't find method " + methodName)            log.Println(err.Error())            resp := &rpc.Response{                ServiceMethod: serviceMethod,                Seq:           req.Seq,                Error:         err.Error(),            }            codec.WriteResponse(resp, nil)            continue        }        argv := reflect.New(mtype.ArgType)        replyv := reflect.New(mtype.ReplyType).Elem()        if err = codec.ReadRequestBody(argv.Interface()); err != nil {            log.Println("rpc server: ", err)            resp := &rpc.Response{                ServiceMethod: serviceMethod,                Seq:           req.Seq,                Error:         err.Error(),            }            codec.WriteResponse(resp, nil)            continue        }        // Call the service method.        returnValues := mtype.method.Func.Call(reflect.Value{            reflect.ValueOf(service),            reflect.ValueOf(argv.Interface()),            replyv,        })        // The return value for the method is an error.        errInter := returnValues.Interface()        if errInter != nil {            err := errInter.(error)            log.Println("rpc server: ", err)            resp := &rpc.Response{                ServiceMethod: serviceMethod,                Seq:           req.Seq,                Error:         err.Error(),            }            codec.WriteResponse(resp, nil)            continue        }        resp := &rpc.Response{            ServiceMethod: serviceMethod,            Seq:           req.Seq,            Error:         "",        }        if err = codec.WriteResponse(resp, replyv.Interface()); err != nil {            log.Println("rpc server: ", err)        }    }}func (s *Server) ServeTransport(transport *Transport) error {    codec := transport.Codec()    defer transport.Close()    return s.ServeCodec(codec)}func isExportedOrBuiltinType(t reflect.Type) bool {    pkgPath := t.PkgPath()    return pkgPath == "" || pkgPath == "builtin"}

在上面的代码中,我们定义了一个Server结构体,用于注册Service和Method,同时实现ServeCodec和ServeTransport方法,用于在服务端处理RPC请求。

3. 测试

完成自定义RPC框架的实现之后,我们需要对其进行测试。下面我们将分别在服务端和客户端使用该RPC框架。

服务端代码:

`go

func main() {

server := new(Server)

server.services = make(map*service)

_ = server.Register(&CalculatorServiceImpl{})

transport := new(Transport)

_ = transport.Dial("tcp", "localhost:8080")

defer transport.Close()

log.Fatal(server.ServeTransport(transport))

}

在服务端,我们首先通过Server.Register方法注册了一个CalculatorServiceImpl服务,然后使用Transport.Dial方法连接到特定的地址。客户端代码:`gofunc main() {    transport := new(Transport)    _ = transport.Dial("tcp", "localhost:8080")    defer transport.Close()    client := rpc.NewClientWithCodec(transport.Codec())    var res int    err := client.Call("CalculatorService.Add", int{10, 20}, &res)    if err != nil {        log.Fatal(err)    }    log.Printf("Add(10, 20) = %d", res)    var mul int    err = client.Call("CalculatorService.Multiply", int{10, 20}, &mul)    if err != nil {        log.Fatal(err)    }    log.Printf("Multiply(10, 20) = %d", mul)}

在客户端,我们首先通过Transport.Dial方法连接到服务端,然后通过rpc.NewClientWithCodec方法创建一个客户端,并使用client.Call方法调用服务端的方法。

最后,我们启动服务端和客户端,可以看到客户端成功调用了服务端提供的Add和Multiply方法。

4. 总结

本文介绍了如何使用golang实现自定义RPC框架,包括定义Service和Method,实现Codec和Transport,完成框架等步骤,并通过一个简单的计算器服务对该RPC框架进行了测试。该自定义RPC框架适用于一些特定的业务需求,可以满足不同的RPC调用场景。

以上就是IT培训机构千锋教育提供的相关内容,如果您有web前端培训鸿蒙开发培训python培训linux培训,java培训,UI设计培训等需求,欢迎随时联系千锋教育。

tags:
声明:本站稿件版权均属千锋教育所有,未经许可不得擅自转载。
10年以上业内强师集结,手把手带你蜕变精英
请您保持通讯畅通,专属学习老师24小时内将与您1V1沟通
免费领取
今日已有369人领取成功
刘同学 138****2860 刚刚成功领取
王同学 131****2015 刚刚成功领取
张同学 133****4652 刚刚成功领取
李同学 135****8607 刚刚成功领取
杨同学 132****5667 刚刚成功领取
岳同学 134****6652 刚刚成功领取
梁同学 157****2950 刚刚成功领取
刘同学 189****1015 刚刚成功领取
张同学 155****4678 刚刚成功领取
邹同学 139****2907 刚刚成功领取
董同学 138****2867 刚刚成功领取
周同学 136****3602 刚刚成功领取
相关推荐HOT