RPC原理 即RPC字面意思,远程过程调用,跨机器调用方法,实际它的作用更大,常常用于网络通信中,例如分布式缓存,数据库等
网络通信以二进制传输数据,而protobuf序列化刚好生成二进制,既节约了空间,又实现网络通信条件,
与序列化协议的效率、性能、序列化协议后的体积相比,其通用性和兼容性的优先级会更高,因为他是会直接关系到服务调用的稳定性和可用率的
rpc一般用于高并发的场景,所以常结合IO多路复合,React模型来使用
零拷贝技术:sendifle,共享内存
simple mprpc 记录应用mprpc框架的一个聊天室实现
mprpc
protobuf使用: protc xxx.proto –cpp_out=./
所有的message都有一个父类Message 因此才有 所有的service都有一个父类Service
一个login rpc 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 syntax = "proto3" ; package fixbug;option cc_generic_services = true ;message ResultCode { int32 errcode = 1 ; bytes errmsg = 2 ; } message LoginRequest { bytes name = 1 ; bytes pwd = 2 ; } message LoginResponse { ResultCode result = 1 ; bool sucess = 2 ; } service UserServiceRpc { rpc Login(LoginRequest) returns (LoginResponse) ; }
首先生成UserServiceRpc类,这个类会实现这两个虚函数
1 2 3 4 5 6 7 8 virtual void Login (::PROTOBUF_NAMESPACE_ID::RpcController* controller, const ::fixbug::LoginRequest* request, ::fixbug::LoginResponse* response, ::google::protobuf::Closure* done) ;virtual void Register (::PROTOBUF_NAMESPACE_ID::RpcController* controller, const ::fixbug::RegisterRequest* request, ::fixbug::RegisterResponse* response, ::google::protobuf::Closure* done) ;
在user的main函数里调用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 MprpcApplication::Init (argc, argv); fixbug::UserServiceRpc_Stub stub (new MprpcChannel()) ;fixbug::LoginRequest request; request.set_name ("zhang san" ); request.set_pwd ("123456" ); fixbug::LoginResponse response; stub.Login (nullptr , &request, &response, nullptr ); if (0 == response.result ().errcode ()){ std::cout << "rpc login response success:" << response.sucess () << std::endl; } else { std::cout << "rpc login response error : " << response.result ().errmsg () << std::endl; }
mprpc框架包含user-stub/server-stub和RPC
UserServiceRpc_Stub继承UserServiceRpc,接收一个Channel, 在stub中调用Login()方法实际是调用channel里的CallMethod方法,
1 2 3 4 5 6 7 void UserServiceRpc_Stub::Login (::PROTOBUF_NAMESPACE_ID::RpcController* controller, const ::fixbug::LoginRequest* request, ::fixbug::LoginResponse* response, ::google::protobuf::Closure* done) { channel_->CallMethod (descriptor ()->method (0 ), controller, request, response, done); }
这个CallMethod方法接收一个descriptor的method(i)(描述一个服务对象的方法列表) 因此在user端,mprpc框架的内容就是实现自己的Channel类并覆盖CallMethod方法
在server的main函数调用 1 2 3 4 5 6 7 8 9 MprpcApplication::Init (argc, argv); RpcProvider provider; provider.NotifyService (new UserService ()); provider.Run ();
provider即mprpc框架在server端的实现,在这里框架内容:反序列并解析调用本地login方法,然后包装序列化,调用网络框架发送 首先将本地实现的service服务对象发布,以便在run中提供rpc远程调用服务 在provider提供onMessage给网络框架进行读写事件回调
server类实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 class UserService : public fixbug::UserServiceRpc // 使用在rpc服务发布端(rpc服务提供者) { public: bool Login(std::string name, std::string pwd) { std::cout << "doing local service: Login" << std::endl; std::cout << "name:" << name << " pwd:" << pwd << std::endl; return false; } /* 重写基类UserServiceRpc的虚函数 下面这些方法都是框架直接调用的 1. caller ===> Login(LoginRequest) => muduo => callee 2. callee ===> Login(LoginRequest) => 交到下面重写的这个Login方法上了 */ void Login(::google::protobuf::RpcController* controller, const ::fixbug::LoginRequest* request, ::fixbug::LoginResponse* response, ::google::protobuf::Closure* done) { // 框架给业务上报了请求参数LoginRequest,应用获取相应数据做本地业务 std::string name = request->name(); std::string pwd = request->pwd(); // 做本地业务 bool login_result = Login(name, pwd); // 把响应写入 包括错误码、错误消息、返回值 fixbug::ResultCode *code = response->mutable_result(); code->set_errcode(0); code->set_errmsg(""); response->set_sucess(login_result); // 执行回调操作 执行响应对象数据的序列化和网络发送(都是由框架来完成的) done->Run(); } }
mprpc框架channel和provider实现 Channel代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 void MprpcChannel::CallMethod (const google::protobuf::MethodDescriptor* method, google::protobuf::RpcController* controller, const google::protobuf::Message* request, google::protobuf::Message* response, google::protobuf:: Closure* done) {const google::protobuf::ServiceDescriptor* sd = method->service ();std::string service_name = sd->name (); std::string method_name = method->name (); uint32_t args_size = 0 ;std::string args_str; if (request->SerializeToString (&args_str)){ args_size = args_str.size (); } else { controller->SetFailed ("serialize request error!" ); return ; } mprpc::RpcHeader rpcHeader; rpcHeader.set_service_name (service_name); rpcHeader.set_method_name (method_name); rpcHeader.set_args_size (args_size); uint32_t header_size = 0 ;std::string rpc_header_str; if (rpcHeader.SerializeToString (&rpc_header_str)){ header_size = rpc_header_str.size (); } else { controller->SetFailed ("serialize rpc header error!" ); return ; } std::string send_rpc_str; send_rpc_str.insert (0 , std::string ((char *)&header_size, 4 )); send_rpc_str += rpc_header_str; send_rpc_str += args_str; std::cout << "============================================" << std::endl; std::cout << "header_size: " << header_size << std::endl; std::cout << "rpc_header_str: " << rpc_header_str << std::endl; std::cout << "service_name: " << service_name << std::endl; std::cout << "method_name: " << method_name << std::endl; std::cout << "args_str: " << args_str << std::endl; std::cout << "============================================" << std::endl; int clientfd = socket (AF_INET, SOCK_STREAM, 0 );if (-1 == clientfd){ char errtxt[512 ] = {0 }; sprintf (errtxt, "create socket error! errno:%d" , errno); controller->SetFailed (errtxt); return ; } std::string ip = "127.0.0.1" ; uint16_t port = 8000 ;struct sockaddr_in server_addr;server_addr.sin_family = AF_INET; server_addr.sin_port = htons (port); server_addr.sin_addr.s_addr = inet_addr (ip.c_str ()); if (-1 == connect (clientfd, (struct sockaddr*)&server_addr, sizeof (server_addr))){ close (clientfd); char errtxt[512 ] = {0 }; sprintf (errtxt, "connect error! errno:%d" , errno); controller->SetFailed (errtxt); return ; } if (-1 == send (clientfd, send_rpc_str.c_str (), send_rpc_str.size (), 0 )){ close (clientfd); char errtxt[512 ] = {0 }; sprintf (errtxt, "send error! errno:%d" , errno); controller->SetFailed (errtxt); return ; } char recv_buf[1024 ] = {0 };int recv_size = 0 ;if (-1 == (recv_size = recv (clientfd, recv_buf, 1024 , 0 ))){ close (clientfd); char errtxt[512 ] = {0 }; sprintf (errtxt, "recv error! errno:%d" , errno); controller->SetFailed (errtxt); return ; } if (!response->ParseFromArray (recv_buf, recv_size)){ close (clientfd); char errtxt[512 ] = {0 }; sprintf (errtxt, "parse error! response_str:%s" , recv_buf); controller->SetFailed (errtxt); return ; } close (clientfd);}
OnMesaage代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 void RpcProvider::OnMessage (const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buffer, muduo::Timestamp) {std::string recv_buf = buffer->retrieveAllAsString (); uint32_t header_size = 0 ;recv_buf.copy ((char *)&header_size, 4 , 0 ); std::string rpc_header_str = recv_buf.substr (4 , header_size); mprpc::RpcHeader rpcHeader; std::string service_name; std::string method_name; uint32_t args_size;if (rpcHeader.ParseFromString (rpc_header_str)){ service_name = rpcHeader.service_name (); method_name = rpcHeader.method_name (); args_size = rpcHeader.args_size (); } else { std::cout << "rpc_header_str:" << rpc_header_str << " parse error!" << std::endl; return ; } std::string args_str = recv_buf.substr (4 + header_size, args_size); std::cout << "============================================" << std::endl; std::cout << "header_size: " << header_size << std::endl; std::cout << "rpc_header_str: " << rpc_header_str << std::endl; std::cout << "service_name: " << service_name << std::endl; std::cout << "method_name: " << method_name << std::endl; std::cout << "args_str: " << args_str << std::endl; std::cout << "============================================" << std::endl; auto it = m_serviceMap.find (service_name);if (it == m_serviceMap.end ()){ std::cout << service_name << " is not exist!" << std::endl; return ; } auto mit = it->second.m_methodMap.find (method_name);if (mit == it->second.m_methodMap.end ()){ std::cout << service_name << ":" << method_name << " is not exist!" << std::endl; return ; } google::protobuf::Service *service = it->second.m_service; const google::protobuf::MethodDescriptor *method = mit->second; google::protobuf::Message *request = service->GetRequestPrototype (method).New (); if (!request->ParseFromString (args_str)){ std::cout << "request parse error, content:" << args_str << std::endl; return ; } google::protobuf::Message *response = service->GetResponsePrototype (method).New (); google::protobuf::Closure *done = google::protobuf::NewCallback <RpcProvider, const muduo::net::TcpConnectionPtr&, google::protobuf::Message*> (this , &RpcProvider::SendRpcResponse, conn, response); service->CallMethod (method, nullptr , request, response, done); }
解析request,然后依然是通过callmethod调用,只不过在user端已经有stub,而在server端则是自己实现一个stub,然后再Closure里面调用SendRpcResponse,
1 2 3 4 5 std::string response_str; if (response->SerializeToString (&response_str)) conn->send (response_str); conn->shutdown ();
额外补充 为了防止粘包问题,每个包的结构应该如下: header_size(4个字节) + header_str + args_str 这里定义了一个header的message,以便在反序列化直接获取下面三个字段
1 2 3 4 5 6 message RpcHeader { bytes service_name = 1; bytes method_name = 2; uint32 args_size = 3; }