RPC原理 即RPC字面意思,远程过程调用,跨机器调用方法,实际它的作用更大,常常用于网络通信中,例如分布式缓存,数据库等
当一个程序的用户量上来时,响应变慢,从单体到多台服务器进行负载均衡,提高用户并发量,但是每个服务器都包含所有代码致使每次变动时所有服务器都需要重新编译,多次部署;将功能拆分放在各个服务器,用过rpc透明的调用远地方法
核心是解决分布式网络通信的复杂问题
rpc通信流程
为什么要用c++写rpc
高性能:c++以其高效的内存管理和底层控制能力,可以满足rpc通信的低开销
系统开发:很多底层基础设施(如数据库、中间件、分布式存储系统)都是用c++开发
跨平台:同一通信接口
灵活性与可扩展性:C++允许开发者灵活地调整底层实现。例如:可以定制序列化协议(如Protobuf、Thrift)、网络传输方式(如TCP、UDP、QUIC)等,以满足不同场景的需求。
哪里用到rpc
微服务架构:定制高效的通信框架
实时通信:低延迟和高吞吐
分布式存储与计算
嵌入式:资源限制,轻量级
跨语言调用:c++rpc框架支持多语言绑定
协议 浏览器使用http协议进行传输,http不适合用于rpc上,1,无状态,每次请求都需要重新连接;2,http数据包臃肿,包含许多无用内容
rpc一般使用tcp协议传输,tcp基于流传输,为了服务端正确接收解码数据,需要自定义传输协议
解决tcp粘包问题:协议长度(比如4字节)+ 协议体
解决编码解码:协议头(比如协议长度、消息类型、协议版本、序列化方式) + 协议体
为了后续功能拓展,协议头一般会加上扩展字段
序列化 Json 基于key-value方式存储数据,文本存储,可读性高,适合适合前后端通信,配置文件等需要快速开发 和易读性 场景
缺点:
体积大,存在key值冗余,进行序列化的额外空间开销比较大、速度慢
JSON没有类型,但像Java这种强类型语言,需要通过反射统一解决,所以性能不会太好。
直接 eval()
解析可能导致代码注入,不安全。
Hessian 二进制存储,编码紧凑;java生态最完整,动态类型,通过object反序列化,常用于java的rpc通信;兼容性高,字段顺序无关,字段增减不影响旧版解析。
缺点:
不直接支持 Linked
系列、Locale
类等(需扩展序列化器)。
Byte/Short反序列化的时候变成Integer。
非 Java 语言支持较弱。
XML 文本存储,标签嵌套存储数据,仅用于 传统企业系统 (如银行报文)、复杂文档存储 (如 EPUB 电子书)等遗留场景
缺点:
标签冗余,体积最大,序列化时间极慢,
DOM/SAX 解析器性能差,内存占用高。
DOM解析时内存溢出,用SAX解析器
protobuf 二进制存储,数据结构化存储格式;强类型(需预定义 IDL),不需要反射;兼容性高,字段编号机制,序列化的时间开销和空间开销都非常小;适用于高并发场景
缺点:
开发成本高 :需预编译生成代码,对于具有反射和动态能力的语言是不必要的
灵活性差 :修改 IDL 需重新生成代码,不适合频繁变更的结构。
字段编号机制:
每个字段必须分配 唯一数字编号 (如 int32 id = 1;
)
编号范围:1~536,870,911 (其中 19000~19999 为协议保留,不可用)
字段编号一旦定义,禁止修改 (否则破坏兼容性)
删除字段时需 保留其编号 (通过 reserved
关键字声明
协议
兼容性机制
典型问题
JSON
依赖字段名
字段重命名或删除导致解析失败
XML
依赖标签名和顺序
标签结构调整破坏兼容性
Hessian
依赖字段名和类型签名
类型签名变更导致反序列化错误
构造入参和返回值时避免一些问题: 对象构造得过于复杂 :减少依赖关系和属性
对象过于庞大:
使用序列化框架不支持的类作为入参类: 尽量使用原生对象
对象有复杂的继承关系: 大多数序列化时会寻找不停的寻找父类
与序列化协议的效率、性能、序列化协议后的体积相比,其通用性和兼容性的优先级会更高,因为他是会直接关系到服务调用的稳定性和可用率的
考虑序列化协议的优先级从高到低:安全性,通用性,兼容性,性能,时间开销,空间开销
零拷贝 用户每次读写数据都要陷入内核态,在用户空间和内核空间来回拷贝,每次都要cpu上下文切换,浪费cpu和性能
mmap+write
sendfile
1 2 3 struct stat stat_buf; fstat(filefd, &stat_buf); sendfile(connfd, filefd, NULL, stat_buf.st_size);
这里filedfd必须是真实的文件,connfd必须是socket
客户端异步调用,负载均衡,异常重试,健康检测,熔断机制等
simple mprpc 记录应用mprpc框架的一个聊天室实现
mprpc
protobuf使用: protc xxx.proto –cpp_out=./
所有的message都有一个父类Message 因此才有 所有的service都有一个父类Service
一个login rpc 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 syntax = "proto3" ; package fixbug;option cc_generic_services = true ;message ResultCode { int32 errcode = 1 ; bytes errmsg = 2 ; } message LoginRequest { bytes name = 1 ; bytes pwd = 2 ; } message LoginResponse { ResultCode result = 1 ; bool sucess = 2 ; } service UserServiceRpc { rpc Login(LoginRequest) returns (LoginResponse) ; }
首先生成UserServiceRpc类,这个类会实现这两个虚函数
1 2 3 4 5 6 7 8 virtual void Login (::PROTOBUF_NAMESPACE_ID::RpcController* controller, const ::fixbug::LoginRequest* request, ::fixbug::LoginResponse* response, ::google::protobuf::Closure* done) ;virtual void Register (::PROTOBUF_NAMESPACE_ID::RpcController* controller, const ::fixbug::RegisterRequest* request, ::fixbug::RegisterResponse* response, ::google::protobuf::Closure* done) ;
在user的main函数里调用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 MprpcApplication::Init (argc, argv); fixbug::UserServiceRpc_Stub stub (new MprpcChannel()) ;fixbug::LoginRequest request; request.set_name ("zhang san" ); request.set_pwd ("123456" ); fixbug::LoginResponse response; stub.Login (nullptr , &request, &response, nullptr ); if (0 == response.result ().errcode ()){ std::cout << "rpc login response success:" << response.sucess () << std::endl; } else { std::cout << "rpc login response error : " << response.result ().errmsg () << std::endl; }
mprpc框架包含user-stub/server-stub和RPC
UserServiceRpc_Stub继承UserServiceRpc,接收一个Channel, 在stub中调用Login()方法实际是调用channel里的CallMethod方法,
1 2 3 4 5 6 7 void UserServiceRpc_Stub::Login (::PROTOBUF_NAMESPACE_ID::RpcController* controller, const ::fixbug::LoginRequest* request, ::fixbug::LoginResponse* response, ::google::protobuf::Closure* done) { channel_->CallMethod (descriptor ()->method (0 ), controller, request, response, done); }
这个CallMethod方法接收一个descriptor的method(i)(描述一个服务对象的方法列表) 因此在user端,mprpc框架的内容就是实现自己的Channel类并覆盖CallMethod方法
在server的main函数调用 1 2 3 4 5 6 7 8 9 MprpcApplication::Init (argc, argv); RpcProvider provider; provider.NotifyService (new UserService ()); provider.Run ();
provider即mprpc框架在server端的实现,在这里框架内容:反序列并解析调用本地login方法,然后包装序列化,调用网络框架发送 首先将本地实现的service服务对象发布,以便在run中提供rpc远程调用服务 在provider提供onMessage给网络框架进行读写事件回调
server类实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 class UserService : public fixbug::UserServiceRpc // 使用在rpc服务发布端(rpc服务提供者) { public: bool Login(std::string name, std::string pwd) { std::cout << "doing local service: Login" << std::endl; std::cout << "name:" << name << " pwd:" << pwd << std::endl; return false; } /* 重写基类UserServiceRpc的虚函数 下面这些方法都是框架直接调用的 1. caller ===> Login(LoginRequest) => muduo => callee 2. callee ===> Login(LoginRequest) => 交到下面重写的这个Login方法上了 */ void Login(::google::protobuf::RpcController* controller, const ::fixbug::LoginRequest* request, ::fixbug::LoginResponse* response, ::google::protobuf::Closure* done) { // 框架给业务上报了请求参数LoginRequest,应用获取相应数据做本地业务 std::string name = request->name(); std::string pwd = request->pwd(); // 做本地业务 bool login_result = Login(name, pwd); // 把响应写入 包括错误码、错误消息、返回值 fixbug::ResultCode *code = response->mutable_result(); code->set_errcode(0); code->set_errmsg(""); response->set_sucess(login_result); // 执行回调操作 执行响应对象数据的序列化和网络发送(都是由框架来完成的) done->Run(); } }
mprpc框架channel和provider实现 Channel代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 void MprpcChannel::CallMethod (const google::protobuf::MethodDescriptor* method, google::protobuf::RpcController* controller, const google::protobuf::Message* request, google::protobuf::Message* response, google::protobuf:: Closure* done) {const google::protobuf::ServiceDescriptor* sd = method->service ();std::string service_name = sd->name (); std::string method_name = method->name (); uint32_t args_size = 0 ;std::string args_str; if (request->SerializeToString (&args_str)){ args_size = args_str.size (); } else { controller->SetFailed ("serialize request error!" ); return ; } mprpc::RpcHeader rpcHeader; rpcHeader.set_service_name (service_name); rpcHeader.set_method_name (method_name); rpcHeader.set_args_size (args_size); uint32_t header_size = 0 ;std::string rpc_header_str; if (rpcHeader.SerializeToString (&rpc_header_str)){ header_size = rpc_header_str.size (); } else { controller->SetFailed ("serialize rpc header error!" ); return ; } std::string send_rpc_str; send_rpc_str.insert (0 , std::string ((char *)&header_size, 4 )); send_rpc_str += rpc_header_str; send_rpc_str += args_str; std::cout << "============================================" << std::endl; std::cout << "header_size: " << header_size << std::endl; std::cout << "rpc_header_str: " << rpc_header_str << std::endl; std::cout << "service_name: " << service_name << std::endl; std::cout << "method_name: " << method_name << std::endl; std::cout << "args_str: " << args_str << std::endl; std::cout << "============================================" << std::endl; int clientfd = socket (AF_INET, SOCK_STREAM, 0 );if (-1 == clientfd){ char errtxt[512 ] = {0 }; sprintf (errtxt, "create socket error! errno:%d" , errno); controller->SetFailed (errtxt); return ; } std::string ip = "127.0.0.1" ; uint16_t port = 8000 ;struct sockaddr_in server_addr;server_addr.sin_family = AF_INET; server_addr.sin_port = htons (port); server_addr.sin_addr.s_addr = inet_addr (ip.c_str ()); if (-1 == connect (clientfd, (struct sockaddr*)&server_addr, sizeof (server_addr))){ close (clientfd); char errtxt[512 ] = {0 }; sprintf (errtxt, "connect error! errno:%d" , errno); controller->SetFailed (errtxt); return ; } if (-1 == send (clientfd, send_rpc_str.c_str (), send_rpc_str.size (), 0 )){ close (clientfd); char errtxt[512 ] = {0 }; sprintf (errtxt, "send error! errno:%d" , errno); controller->SetFailed (errtxt); return ; } char recv_buf[1024 ] = {0 };int recv_size = 0 ;if (-1 == (recv_size = recv (clientfd, recv_buf, 1024 , 0 ))){ close (clientfd); char errtxt[512 ] = {0 }; sprintf (errtxt, "recv error! errno:%d" , errno); controller->SetFailed (errtxt); return ; } if (!response->ParseFromArray (recv_buf, recv_size)){ close (clientfd); char errtxt[512 ] = {0 }; sprintf (errtxt, "parse error! response_str:%s" , recv_buf); controller->SetFailed (errtxt); return ; } close (clientfd);}
OnMesaage代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 void RpcProvider::OnMessage (const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buffer, muduo::Timestamp) {std::string recv_buf = buffer->retrieveAllAsString (); uint32_t header_size = 0 ;recv_buf.copy ((char *)&header_size, 4 , 0 ); std::string rpc_header_str = recv_buf.substr (4 , header_size); mprpc::RpcHeader rpcHeader; std::string service_name; std::string method_name; uint32_t args_size;if (rpcHeader.ParseFromString (rpc_header_str)){ service_name = rpcHeader.service_name (); method_name = rpcHeader.method_name (); args_size = rpcHeader.args_size (); } else { std::cout << "rpc_header_str:" << rpc_header_str << " parse error!" << std::endl; return ; } std::string args_str = recv_buf.substr (4 + header_size, args_size); std::cout << "============================================" << std::endl; std::cout << "header_size: " << header_size << std::endl; std::cout << "rpc_header_str: " << rpc_header_str << std::endl; std::cout << "service_name: " << service_name << std::endl; std::cout << "method_name: " << method_name << std::endl; std::cout << "args_str: " << args_str << std::endl; std::cout << "============================================" << std::endl; auto it = m_serviceMap.find (service_name);if (it == m_serviceMap.end ()){ std::cout << service_name << " is not exist!" << std::endl; return ; } auto mit = it->second.m_methodMap.find (method_name);if (mit == it->second.m_methodMap.end ()){ std::cout << service_name << ":" << method_name << " is not exist!" << std::endl; return ; } google::protobuf::Service *service = it->second.m_service; const google::protobuf::MethodDescriptor *method = mit->second; google::protobuf::Message *request = service->GetRequestPrototype (method).New (); if (!request->ParseFromString (args_str)){ std::cout << "request parse error, content:" << args_str << std::endl; return ; } google::protobuf::Message *response = service->GetResponsePrototype (method).New (); google::protobuf::Closure *done = google::protobuf::NewCallback <RpcProvider, const muduo::net::TcpConnectionPtr&, google::protobuf::Message*> (this , &RpcProvider::SendRpcResponse, conn, response); service->CallMethod (method, nullptr , request, response, done); }
解析request,然后依然是通过callmethod调用,只不过在user端已经有stub,而在server端则是自己实现一个stub,然后再Closure里面调用SendRpcResponse,
1 2 3 4 5 std::string response_str; if (response->SerializeToString (&response_str)) conn->send (response_str); conn->shutdown ();
额外补充 为了防止粘包问题,每个包的结构应该如下: header_size(4个字节) + header_str + args_str 这里定义了一个header的message,以便在反序列化直接获取下面三个字段
1 2 3 4 5 6 message RpcHeader { bytes service_name = 1; bytes method_name = 2; uint32 args_size = 3; }