RPC原理

即RPC字面意思,远程过程调用,跨机器调用方法,实际它的作用更大,常常用于网络通信中,例如分布式缓存,数据库等

网络通信以二进制传输数据,而protobuf序列化刚好生成二进制,既节约了空间,又实现网络通信条件,

与序列化协议的效率、性能、序列化协议后的体积相比,其通用性和兼容性的优先级会更高,因为他是会直接关系到服务调用的稳定性和可用率的

rpc一般用于高并发的场景,所以常结合IO多路复合,React模型来使用

零拷贝技术:sendifle,共享内存

simple mprpc

记录应用mprpc框架的一个聊天室实现

mprpc

protobuf使用:
protc xxx.proto –cpp_out=./

所有的message都有一个父类Message
因此才有
所有的service都有一个父类Service

一个login rpc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
syntax = "proto3";

package fixbug;

option cc_generic_services = true;

message ResultCode
{
int32 errcode = 1;
bytes errmsg = 2;
}

message LoginRequest
{
bytes name = 1;
bytes pwd = 2;
}

message LoginResponse
{
ResultCode result = 1;
bool sucess = 2;
}

service UserServiceRpc
{
rpc Login(LoginRequest) returns(LoginResponse);
}

首先生成UserServiceRpc类,这个类会实现这两个虚函数

1
2
3
4
5
6
7
8
virtual void Login(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::fixbug::LoginRequest* request,
::fixbug::LoginResponse* response,
::google::protobuf::Closure* done);
virtual void Register(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::fixbug::RegisterRequest* request,
::fixbug::RegisterResponse* response,
::google::protobuf::Closure* done);

在user的main函数里调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 整个程序启动以后,想使用mprpc框架来享受rpc服务调用,一定需要先调用框架的初始化函数(只初始化一次)
MprpcApplication::Init(argc, argv);

// 演示调用远程发布的rpc方法Login
fixbug::UserServiceRpc_Stub stub(new MprpcChannel());
// rpc方法的请求参数
fixbug::LoginRequest request;
request.set_name("zhang san");
request.set_pwd("123456");
// rpc方法的响应
fixbug::LoginResponse response;
// 发起rpc方法的调用 同步的rpc调用过程 MprpcChannel::callmethod
stub.Login(nullptr, &request, &response, nullptr); // RpcChannel->RpcChannel::callMethod 集中来做所有rpc方法调用的参数序列化和网络发送

// 一次rpc调用完成,读调用的结果
if (0 == response.result().errcode())
{
std::cout << "rpc login response success:" << response.sucess() << std::endl;
}
else
{
std::cout << "rpc login response error : " << response.result().errmsg() << std::endl;
}

mprpc框架包含user-stub/server-stub和RPC

UserServiceRpc_Stub继承UserServiceRpc,接收一个Channel,
在stub中调用Login()方法实际是调用channel里的CallMethod方法,

1
2
3
4
5
6
7
void UserServiceRpc_Stub::Login(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::fixbug::LoginRequest* request,
::fixbug::LoginResponse* response,
::google::protobuf::Closure* done) {
channel_->CallMethod(descriptor()->method(0),
controller, request, response, done);
}

这个CallMethod方法接收一个descriptor的method(i)(描述一个服务对象的方法列表)
因此在user端,mprpc框架的内容就是实现自己的Channel类并覆盖CallMethod方法

在server的main函数调用

1
2
3
4
5
6
7
8
9
// 调用框架的初始化操作
MprpcApplication::Init(argc, argv);

// provider是一个rpc网络服务对象。把UserService对象发布到rpc节点上
RpcProvider provider;
provider.NotifyService(new UserService());

// 启动一个rpc服务发布节点 Run以后,进程进入阻塞状态,等待远程的rpc调用请求
provider.Run();

provider即mprpc框架在server端的实现,在这里框架内容:反序列并解析调用本地login方法,然后包装序列化,调用网络框架发送
首先将本地实现的service服务对象发布,以便在run中提供rpc远程调用服务
在provider提供onMessage给网络框架进行读写事件回调

server类实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class UserService : public fixbug::UserServiceRpc // 使用在rpc服务发布端(rpc服务提供者)
{
public:
bool Login(std::string name, std::string pwd)
{
std::cout << "doing local service: Login" << std::endl;
std::cout << "name:" << name << " pwd:" << pwd << std::endl;
return false;
}

/*
重写基类UserServiceRpc的虚函数 下面这些方法都是框架直接调用的
1. caller ===> Login(LoginRequest) => muduo => callee
2. callee ===> Login(LoginRequest) => 交到下面重写的这个Login方法上了
*/
void Login(::google::protobuf::RpcController* controller,
const ::fixbug::LoginRequest* request,
::fixbug::LoginResponse* response,
::google::protobuf::Closure* done)
{
// 框架给业务上报了请求参数LoginRequest,应用获取相应数据做本地业务
std::string name = request->name();
std::string pwd = request->pwd();

// 做本地业务
bool login_result = Login(name, pwd);

// 把响应写入 包括错误码、错误消息、返回值
fixbug::ResultCode *code = response->mutable_result();
code->set_errcode(0);
code->set_errmsg("");
response->set_sucess(login_result);

// 执行回调操作 执行响应对象数据的序列化和网络发送(都是由框架来完成的)
done->Run();
}
}

mprpc框架channel和provider实现

Channel代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/*
header_size + service_name method_name args_size + args
*/
// 所有通过stub代理对象调用的rpc方法,都走到这里了,统一做rpc方法调用的数据数据序列化和网络发送
void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf:: Closure* done)
{
const google::protobuf::ServiceDescriptor* sd = method->service();
std::string service_name = sd->name(); // service_name
std::string method_name = method->name(); // method_name

// 获取参数的序列化字符串长度 args_size
uint32_t args_size = 0;
std::string args_str;
if (request->SerializeToString(&args_str))
{
args_size = args_str.size();
}
else
{
controller->SetFailed("serialize request error!");
return;
}

// 定义rpc的请求header
mprpc::RpcHeader rpcHeader;
rpcHeader.set_service_name(service_name);
rpcHeader.set_method_name(method_name);
rpcHeader.set_args_size(args_size);

uint32_t header_size = 0;
std::string rpc_header_str;
if (rpcHeader.SerializeToString(&rpc_header_str))
{
header_size = rpc_header_str.size();
}
else
{
controller->SetFailed("serialize rpc header error!");
return;
}

// 组织待发送的rpc请求的字符串
std::string send_rpc_str;
send_rpc_str.insert(0, std::string((char*)&header_size, 4)); // header_size
send_rpc_str += rpc_header_str; // rpcheader
send_rpc_str += args_str; // args

// 打印调试信息
std::cout << "============================================" << std::endl;
std::cout << "header_size: " << header_size << std::endl;
std::cout << "rpc_header_str: " << rpc_header_str << std::endl;
std::cout << "service_name: " << service_name << std::endl;
std::cout << "method_name: " << method_name << std::endl;
std::cout << "args_str: " << args_str << std::endl;
std::cout << "============================================" << std::endl;

// 使用tcp编程,完成rpc方法的远程调用
int clientfd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == clientfd)
{
char errtxt[512] = {0};
sprintf(errtxt, "create socket error! errno:%d", errno);
controller->SetFailed(errtxt);
return;
}

// 读取配置文件rpcserver的信息
std::string ip = "127.0.0.1";
uint16_t port = 8000;

struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = inet_addr(ip.c_str());

// 连接rpc服务节点
if (-1 == connect(clientfd, (struct sockaddr*)&server_addr, sizeof(server_addr)))
{
close(clientfd);
char errtxt[512] = {0};
sprintf(errtxt, "connect error! errno:%d", errno);
controller->SetFailed(errtxt);
return;
}

// 发送rpc请求
if (-1 == send(clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0))
{
close(clientfd);
char errtxt[512] = {0};
sprintf(errtxt, "send error! errno:%d", errno);
controller->SetFailed(errtxt);
return;
}

// 接收rpc请求的响应值
char recv_buf[1024] = {0};
int recv_size = 0;
if (-1 == (recv_size = recv(clientfd, recv_buf, 1024, 0)))
{
close(clientfd);
char errtxt[512] = {0};
sprintf(errtxt, "recv error! errno:%d", errno);
controller->SetFailed(errtxt);
return;
}

// 反序列化rpc调用的响应数据
// std::string response_str(recv_buf, 0, recv_size); // bug出现问题,recv_buf中遇到\0后面的数据就存不下来了,导致反序列化失败
// if (!response->ParseFromString(response_str))
if (!response->ParseFromArray(recv_buf, recv_size))
{
close(clientfd);
char errtxt[512] = {0};
sprintf(errtxt, "parse error! response_str:%s", recv_buf);
controller->SetFailed(errtxt);
return;
}

close(clientfd);
}

OnMesaage代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// 已建立连接用户的读写事件回调 如果远程有一个rpc服务的调用请求,那么OnMessage方法就会响应
void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn,
muduo::net::Buffer *buffer,
muduo::Timestamp)
{
// 网络上接收的远程rpc调用请求的字符流 Login args
std::string recv_buf = buffer->retrieveAllAsString();

// 从字符流中读取前4个字节的内容
uint32_t header_size = 0;
recv_buf.copy((char*)&header_size, 4, 0);

// 根据header_size读取数据头的原始字符流,反序列化数据,得到rpc请求的详细信息
std::string rpc_header_str = recv_buf.substr(4, header_size);
mprpc::RpcHeader rpcHeader;
std::string service_name;
std::string method_name;
uint32_t args_size;
if (rpcHeader.ParseFromString(rpc_header_str))
{
// 数据头反序列化成功
service_name = rpcHeader.service_name();
method_name = rpcHeader.method_name();
args_size = rpcHeader.args_size();
}
else
{
// 数据头反序列化失败
std::cout << "rpc_header_str:" << rpc_header_str << " parse error!" << std::endl;
return;
}

// 获取rpc方法参数的字符流数据
std::string args_str = recv_buf.substr(4 + header_size, args_size);

// 打印调试信息
std::cout << "============================================" << std::endl;
std::cout << "header_size: " << header_size << std::endl;
std::cout << "rpc_header_str: " << rpc_header_str << std::endl;
std::cout << "service_name: " << service_name << std::endl;
std::cout << "method_name: " << method_name << std::endl;
std::cout << "args_str: " << args_str << std::endl;
std::cout << "============================================" << std::endl;

// 获取service对象和method对象
auto it = m_serviceMap.find(service_name);
if (it == m_serviceMap.end())
{
std::cout << service_name << " is not exist!" << std::endl;
return;
}

auto mit = it->second.m_methodMap.find(method_name);
if (mit == it->second.m_methodMap.end())
{
std::cout << service_name << ":" << method_name << " is not exist!" << std::endl;
return;
}

google::protobuf::Service *service = it->second.m_service; // 获取service对象 new UserService
const google::protobuf::MethodDescriptor *method = mit->second; // 获取method对象 Login

// 生成rpc方法调用的请求request和响应response参数
google::protobuf::Message *request = service->GetRequestPrototype(method).New();
if (!request->ParseFromString(args_str))
{
std::cout << "request parse error, content:" << args_str << std::endl;
return;
}
google::protobuf::Message *response = service->GetResponsePrototype(method).New();

// 给下面的method方法的调用,绑定一个Closure的回调函数
google::protobuf::Closure *done = google::protobuf::NewCallback<RpcProvider,
const muduo::net::TcpConnectionPtr&,
google::protobuf::Message*>
(this,
&RpcProvider::SendRpcResponse,
conn, response);

// 在框架上根据远端rpc请求,调用当前rpc节点上发布的方法
// new UserService().Login(controller, request, response, done)
service->CallMethod(method, nullptr, request, response, done);
}

解析request,然后依然是通过callmethod调用,只不过在user端已经有stub,而在server端则是自己实现一个stub,然后再Closure里面调用SendRpcResponse,

1
2
3
4
5
std::string response_str;
if (response->SerializeToString(&response_str)) // response进行序列化
// 序列化成功后,通过网络把rpc方法执行的结果发送会rpc的调用方
conn->send(response_str);
conn->shutdown(); // 模拟http的短链接服务,由rpcprovider主动断开连接

额外补充

为了防止粘包问题,每个包的结构应该如下:
header_size(4个字节) + header_str + args_str
这里定义了一个header的message,以便在反序列化直接获取下面三个字段

1
2
3
4
5
6
message RpcHeader
{
bytes service_name = 1;
bytes method_name = 2;
uint32 args_size = 3;
}