通过muduo库函数实现protobuf通信协议
文章目录
- 实例
在muduo库的简单介绍中,实现的服务器和客户端是直接进行发送消息,并没有考虑序列化和粘包问题等。。但是在实际的网路通信中,由于TCP通信时面向字节流的,所以序列化和解决粘包问题是必须的。所以我们在通信前就需要自己定制协议,在muduo库中,陈硕大佬已经定制好了一套基于protobuf序列化的协议,我们只需要通过接口进行使用就可以了。
下面就是这个协议的规则:
因此我们主要介绍通过相关的类,能够把muduo库中提供的这一套内容使用起来就可以了。
- ProtobufCodec类
它是muduo库中对于protobuf协议的处理类,其内部实现了onMessage回调接口,对于接收到的数据进行基于protobuf协议的请求处理,然后将解析出的信息,存放到对应请求的protobuf请求类对象中,然后最终调用设置进去的消息处理回调函数进行对应请求的处理。 - ProtobufDispatcher类
这个类是非常重要的一个类,这是⼀个protobuf请求的分发处理类,在ProtobufCodec类对消息进行处理之后,会把反序列化好的消息通过这个类的分发器,根据请求类型选择对应的回调函数进行处理,它内部的onProtobufMessage接⼝就是给上边ProtobufCodec::messageCallback_设置的回调函数,相当于ProtobufCodec中onMessage接⼝会设置给服务器作为消息回调函数,其内部对于接收到的数据进行基于protobuf协议的解析,得到请求后,通过ProtobufDispatcher::onProtobufMessage接口进行请求分发处理,也就是确定当前请求应该⽤哪⼀个注册的业务函数进行处理。因此在创建服务器的时候我们还要给这个类中注册一系列收到消息之后的处理函数,来进行消息处理。
实例
那么接下来就通过一个例子来进行说明
首先定制我们的protobuf文件
syntax = "proto3";package proto;message TranslateRequest
{string mess = 1;
}message TranslateResponse
{string mess = 1;
}message AddRequest
{int32 num1 = 1;int32 num2 = 2;
}message AddResponse
{int32 result = 1;
}
然后我们需要从安装的muduo库的muduo/examples/protobuf/codec路径中参考server.cc和client.cc来编写我们的代码,这里我们需要把codec.cc codec.h dispatcher.h 拷贝到我们的项目路径中。除了这个以外我们还需要一个google-in.h,这个在别的路径下,也需要自己拷贝一下。
client.cc
#include "protof_muduo/codec.h"
#include "protof_muduo/dispatcher.h"#include <muduo/base/Logging.h>
#include <muduo/base/Mutex.h>
#include <muduo/net/EventLoopThread.h>
#include <muduo/net/TcpClient.h>
#include <muduo/base/CountDownLatch.h>
#include "translate_add.pb.h"#include <iostream>
#include <unistd.h>
#include <memory>const std::string ip = "127.0.0.1";
int port = 8888;class Client
{
public:typedef std::shared_ptr<proto::TranslateResponse> TranslateResponsePtr;typedef std::shared_ptr<proto::AddResponse> AddResponsePtr;Client(const std::string& ip, int port):_latch(1), _client(_loopthread.startLoop(), muduo::net::InetAddress(ip, port), "Client"),_dispatcher(std::bind(&Client::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)){// 注册回调函数_dispatcher.registerMessageCallback<proto::TranslateResponse>(std::bind(&Client::onTranslate, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<proto::AddResponse>(std::bind(&Client::onAdd, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &_codec, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setConnectionCallback(std::bind(&Client::onConnection, this, std::placeholders::_1));}void Conncet(){_client.connect();// 需要等链接建立成功后再返回_latch.wait();}void Translate(const std::string& mess){// 1. 构建请求proto::TranslateRequest req;req.set_mess(mess);send(&req);}void Add(int a, int b){proto::AddRequest req;req.set_num1(a);req.set_num2(b);send(&req);}
private:bool send(const google::protobuf::Message* mess){// 连接没有断开在进行发送if(_con->connected()) {std::cout << "mess" << std::endl;if(mess) _codec.send(_con, (*mess)); return true;}return false;}void onConnection(const muduo::net::TcpConnectionPtr& con){if(con->connected()) {_con = con;_latch.countDown();std::cout << "连接成功" << std::endl;}else {std::cout << "连接断开" << std::endl;}}void onTranslate(const muduo::net::TcpConnectionPtr& con, const TranslateResponsePtr& message, muduo::Timestamp ts){std::cout << message->mess() << std::endl;}void onAdd(const muduo::net::TcpConnectionPtr& con, const AddResponsePtr& message, muduo::Timestamp ts){std::cout << message->result() << std::endl;}void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message,muduo::Timestamp ts){LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}
private:muduo::CountDownLatch _latch;muduo::net::TcpConnectionPtr _con;muduo::net::EventLoopThread _loopthread;muduo::net::TcpClient _client;ProtobufDispatcher _dispatcher;ProtobufCodec _codec;
};int main()
{Client client(ip,port);client.Conncet(); client.Translate("type");client.Add(4,4);sleep(10);return 0;
}
server.cc
#include "protof_muduo/codec.h"
#include "protof_muduo/dispatcher.h"#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"
#include "translate_add.pb.h"#include <iostream>
#include <unistd.h>
#include <memory>int port = 8888;class Server
{
public:typedef std::shared_ptr<proto::TranslateRequest> TranslateRequestPtr;typedef std::shared_ptr<proto::AddRequest> AddRequestPtr;Server(int port):server_(&_event, muduo::net::InetAddress("0.0.0.0", port), "server", muduo::net::TcpServer::kReusePort), dispatcher_(std::bind(&Server::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2,std::placeholders::_3)),codec_(std::bind(&ProtobufDispatcher::onProtobufMessage, &dispatcher_, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)){// 注册回调函数dispatcher_.registerMessageCallback<proto::TranslateRequest>(std::bind(&Server::onTranslate, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));dispatcher_.registerMessageCallback<proto::AddRequest>(std::bind(&Server::onAdd, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));server_.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &codec_, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));server_.setConnectionCallback(std::bind(&Server::onConnection, this, std::placeholders::_1));}void start(){server_.start();_event.loop();}
private:void onConnection(const muduo::net::TcpConnectionPtr& con){if(con->connected()) std::cout << "新连接到来" << std::endl;else std::cout << "一个连接关闭" << std::endl;}std::string Translate(const std::string& word){std::unordered_map<std::string, std::string> dict = {{"类型", "type"},{"type", "类型"},{"转移", "transfer"},{"transfer", "转移"}};auto it = dict.find(word);if(it == dict.end()) return "Unknow";return it->second;}void onTranslate(const muduo::net::TcpConnectionPtr& con, const TranslateRequestPtr& message, muduo::Timestamp ts){std::cout << "收到翻译请求 " << std::endl;// 1. 拿到消息std::string mess = message->mess();// 2. 处理拿到结果std::string res = Translate(mess);// 构建响应proto::TranslateResponse resp;resp.set_mess(res);// 发送会客户端codec_.send(con, resp);}void onAdd(const muduo::net::TcpConnectionPtr& con, const AddRequestPtr& message, muduo::Timestamp ts){std::cout << "收到加法请求 " << std::endl;// 1. 拿到消息int num1 = message->num1();int num2 = message->num2();// 2. 处理int res = num1 + num2;// 3. 构建响应proto::AddResponse resp;resp.set_result(res);// 4. 进行响应codec_.send(con, resp);}void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message,muduo::Timestamp ts){LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}
private:muduo::net::EventLoop _event;muduo::net::TcpServer server_;ProtobufDispatcher dispatcher_;ProtobufCodec codec_;
};int main()
{Server server(port);server.start();return 0;
}
makefile
all:server client
server:server.cc protof_muduo/codec.cc translate_add.pb.ccg++ -o $@ $^ -std=c++11 -lpthread -lmuduo_net -lmuduo_base -lprotobuf -lz
client:client.cc protof_muduo/codec.cc translate_add.pb.ccg++ -o $@ $^ -std=c++11 -lpthread -lmuduo_net -lmuduo_base -lprotobuf -lz.PHONY:clean
clean:rm -rf server clent