仿RabitMQ 模拟实现消息队列项目开发文档1(个人项目)
要制作这个项目首先就要理解什么是消息队列,在之前我们学习过阻塞队列阻塞队列的部分优点如下:
这个模型在实际开发过程中是具有很好的优点的,由此也就存在了在不同主机之间的生产消费者模型。
消息队列也就是将这个阻塞队列封装成一个独立的服务器程序,此时其他的主机就可以将消息发布到这个消息队列的主机上,让消费者主机直接从这个消息队列主机中获取消息,生产者主机箱消息队列中存放消息。
所以要实现的这个消息队列也就是一个简单的跨主机的用于生产消费模型当中所使用的服务器,能够接受发布客户端的消息,缓冲起来最后再交给消费客户端进行处理。
开发环境:
技术选型:
环境搭建
首先需要安装wget工具
在ubuntu系统下,首先需要使用下面的指令更新软件源:
sudo apt-get update
然后使用sudo apt-get install wget进行软件的安装,如果使用的是center os的系统那么需要更新一下软件源。使用这个wget就能够完成。这个工具的作用是能够从网络上自动下载文件。
基础工具安装
安装lrzsz
需要安装一个lrzsz工具,这个工具的主要作用是通过终端(串行线、Telnet或SSH会话)在本地系统和远程系统之间进行文件传输,特别适用于与远程服务器进行文件交互。不同系统使用不同的指令安装即可。
安装高版本gcc/g++(7以上)
然后是高版本的gcc工具的安装,在center os系统中默认安装的gcc版本为4.x,是不符合这个项目要求,需要安装一个gcc 7.x版本,这里我安装的版本是gcc 11版本,在center os中通过devtoolset工具来安装高版本的gcc工具(前提是你配置的软件源中具有了这个高版本的gcc的下载链接)
下面的这个指令是center os上的
在center os中使用了上面的指令之后,并不会直接安装完成,而是会在下面的文件路径下存在一个文件包
这个文件包就是高版本的gcc,此时需要让os去这个文件路径下搜索这个gcc编译器,此时就需要加载一下enable文件了:
此时就能够显示出新的gcc编译器,但是这个加载在我新开一个终端的时候这个gcc编译器又没有了,此时就需要将上面的这个指令写入到打开终端后会自动运行的配置文件中去。
如果是在ubuntu中,在终端启动的时候会加载下面几个关键文件:
-
/etc/profile
:这个文件为系统的每个用户设置环境信息,当用户第一次登录时,该文件被执行。它还会从/etc/profile.d
目录的配置文件中搜集shell的设置。此文件默认调用/etc/bash.bashrc
文件。 -
/etc/bash.bashrc
:这个文件为每一个运行bash shell的用户执行。当bash shell被打开时,该文件被读取。它包含了系统级别的bash配置信息。 -
~/.bash_profile
、~/.bash_login
、~/.profile
:这些文件属于用户级别的配置文件。用户登录时,系统会按照顺序查找这些文件并执行第一个找到的文件(如果多个文件都存在)。这些文件允许用户设置专用于自己使用的shell信息,如环境变量等。需要注意的是,默认情况下,~/.bash_profile
会调用~/.bashrc
文件。 -
~/.bashrc
:这个文件包含专用于用户的bash shell的bash信息。当bash shell被打开时(无论是通过终端还是其他方式),该文件都会被读取并执行。因此,它是设置终端自动运行命令和配置终端环境的理想位置。
但是在ubuntu中不需要这么麻烦,在Ubuntu上安装了新版本的gcc(如gcc-11)后,通常不需要像在某些其他系统(如CentOS)中那样,将source enable指令直接写入配置文件中来让新开的终端也能识别到这个新的gcc。Ubuntu的APT包管理器在安装软件包时,会处理好大部分的环境变量和链接问题。
不过,如果你想要确保新开的终端能够默认使用新安装的gcc-11版本,或者系统中安装了多个gcc版本并需要切换默认版本,你可以采取以下几种方法:
1. 更新替代配置(update-alternatives)
Ubuntu提供了update-alternatives
命令来管理系统上多个命令的替代版本。如果系统中安装了多个gcc版本,你可以使用update-alternatives
来设置默认的gcc版本。以下是一个基本的操作流程:
- 更新替代配置列表: 首先,你需要更新替代配置列表,以确保
update-alternatives
知道有哪些gcc版本可供选择。这通常在你安装新版本的gcc时自动完成,但如果没有,你可以手动添加:
sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-11 110 --slave /usr/bin/g++ g++ /usr/bin/g++-11
- 这里的
110
是优先级,数字越大优先级越高。 - 配置默认的gcc版本: 然后,你可以使用
update-alternatives
来配置默认的gcc版本:
sudo update-alternatives --config gcc
- 执行此命令后,系统会列出所有可用的gcc版本,并允许你选择一个作为默认版本。
2. 直接创建符号链接
另一种方法是直接为/usr/bin/gcc
和/usr/bin/g++
创建指向新版本gcc的符号链接。但请注意,这种方法可能会覆盖系统默认的gcc和g++命令,因此在执行之前请确保你了解这样做的后果:
sudo ln -s /usr/bin/gcc-11 /usr/bin/gcc
sudo ln -s /usr/bin/g++-11 /usr/bin/g++
这种方法简单直接,但如果你需要频繁切换gcc版本,或者担心影响到系统其他部分的稳定性,那么使用update-alternatives
可能是更好的选择。
3. 修改环境变量
虽然不是设置默认gcc版本的直接方法,但了解如何通过修改环境变量来指定gcc版本也是有用的。你可以在用户的shell配置文件中(如~/.bashrc
或~/.profile
)添加以下行来指定PATH中gcc的版本:
export PATH=/usr/bin/gcc-11:$PATH
但请注意,这种方法可能不会按预期工作,因为/usr/bin/gcc-11
不是一个包含gcc可执行文件的目录,而是一个指向gcc-11可执行文件的符号链接。正确的做法是在需要时直接使用gcc-11
命令来调用特定版本的gcc。
综上所述,推荐使用update-alternatives
来管理系统上多个gcc版本的默认设置。这样既可以方便地切换默认版本,又不会对系统其他部分造成不必要的干扰。
安装git
使用指令安装即可,因为之后需要从github上下载第三方库。
下一个工具是make一般是具有了的,没有安装即可。
安装cmake
不同的系统下安装即可,ubuntu上使用:
apt-get install cmake
对版本没有要求。
第三方库的安装
protobuf的安装
安装这个第三库之前需要先将这个库的依赖库安装完成。
需要的是下面的几个库:
其中画了蓝色线条的是已经具有了的库。
使用的指令:
这些库的依赖关系由apt-get已经解决了,然后使用下面的指令去github上下载protobuf库:
这里选择了使用github将这个压缩包安装到本地之后,我再上传到服务器上:
也可以使用wget指令
wget https://github.com/protocolbuffers/protobuf/releases/download/v3.20.0/protobuf-all-3.20.0.tar.gz
但是很慢,我更推荐在本地下载好之后再上传到服务器上。
之后就能够得到一个下面这样的压缩包:
然后就是对这个压缩包进行解压了。
然后进入这个已经压缩的文件夹中:
然后执行这个文件夹下这个库的autogen.sh脚本。配置安装环境
然后执行configure
这个configure就是用来检测安装环境的,如果各种环境都具有了那么就会生成makefile文件:
然后开始make进行编译
编译很慢,在编译完成之后适应sudo make install进行安装。
然后在下面的路径下就可以看到这个库相关的头文件:
之后运行下面的指令如果能够运行代表可行了:
出现了上面的运行结果代表安装成功了
mudo库的安装
这个mudo库是陈硕大佬写的一个TCP服务器
可以使用wger或者git将这个库下载到本地。
Git clone方法:
git clone https://github.com/chenshuo/muduo.git
wget方法:
wget https://gitee.com/hansionz/mq/raw/master/resource/muduo master.zip
使用wget下载下来的mudo包是一个zip包,所以需要使用unzip来进行解压
为了让安装不失败,这里还需要安装这个库的依赖环境:
gcc-c++ cmake make zlib zlib-devel boost-devel
上面的这些就是mudo库的安装需要的环境,其中的gcc cmake 和make我这里已经有了需要安装剩下的部分
在center os的环境下使用上面的名字就能够完成安装,但是在ubuntu的环境下需要使用下面的指令来进行安装:
sudo apt install zlib1g zlib1g-dev
sudo apt install libboost-all-dev
然后就可以去进行mudo库的安装了。
使用mudo库中的脚本来进行安装:
也就是对这个代码进行编译,需要编译几分钟
这样就编译完成了,然后执行下面的指令进行安装:
./build.sh install
安装的文件就在当前目录的上一个目录中:
可以看到出现了一个build文件夹:
这个文件夹中还有两个文件夹,这两个文件夹中的内容就是lib和include文件夹了,也就是头文件和动态库了,这些库文件并没有放到系统的目录下面,因为这个库是一个静态库也就没有必要放到系统的目录下面了。
在release-cpp11的bin目录中就有一些样例程序:
运行下面这个程序
这里如果没有将protobuf库放到系统的搜索路径下是会报出无法找到protobuf库的错误的,此时就需要去下面的这个conf文件中加上protobuf库的路径:
这conf文件需要创建在下面这个路径下:(如果存在这个文件就不需要进行创建了,直接将路径写下即可)
在使用两个会话分别去启动protobuf_client和protobuf_server程序就能够在client这边看到下面的信息:
代表这一次客户端和服务端进行了一次通信。到这里mudo库就安装成功了。build这个文件夹不能删除到时候需要使用了,要将build中的内容拷贝到工作目录下
SQLite3的安装
在ubuntu的环境下:首先使用下面的指令进行索引更新:
sudo apt update
执行安装程序
sudo apt install libsqlite3-dev
sudo apt install sqlite3
安装Gtest
首先需要安装,dnf这个软件,如果你的软件源中没有这个软件需要进行软件源的更换,我这里的这个软件源中有这个软件所以我直接进行了安装。
sudo apt install dnf
这个dnf其实也是一个下载安装器,通过dnf来安装下面的东西:
sudo dnf install dnf-plugins-core
sudo dnf install gtest gtest-devel
以上步骤只适用于center os环境,在ubuntu的环境下并不支持dnf所以不能使用上面的方法,这里我选择了另外一种方法去安装gtest库。方法如下:
1.下载Google Test源码: 使用git
从GitHub上克隆Google Test的仓库:
git clone https://github.com/google/googletest.git
2.创建并进入build目录: 在Google Test的源码目录中创建一个名为build
的目录,并进入该目录。
cd googletest
mkdir build
cd build
3.使用cmake编译: 确保已经安装了cmake
和g++
工具。然后,在build
目录中运行cmake
命令来配置构建环境
cmake ..
4.编译和安装: 使用make
命令编译Google Test,并使用sudo make install
命令将其安装到系统中:
make
sudo make install
运行了上面的两个之后会出现下面的内容:
代表这个库已经被安装到了系统的搜索路径下了,如果之后再使用的时候出现了动态库没有找到的问题,就需要去下面的路径下创建conf文件
文件内容就是gtest的lib库的路径
为了测试gtest是否安装成功就需要写测试代码了:
#include <gtest/gtest.h>
int add(int a, int b)
{return a + b;
}
TEST(testCase, test1)
{EXPECT_EQ(add(2, 3), 5);//这个断言来自于gtest用于判断两个数是否相等
}
int main(int argc, char **argv)
{testing::InitGoogleTest(&argc, argv);return RUN_ALL_TESTS();
}
然后去编译运行这个文件
g++ test.cpp -lgtest//最后的-l说明使用了哪个第三方库
出现下面的运行就代表成功安装了gtest
上面的意思就是对于test1测试,我想要的函数值为5,最后运行得到的值也是5符合期望所以运行通过了。
到这里所需要的所有的第三方库/工具就全部安装完成了。
第三方库介绍
protobuf库
ProtoBuf(全称Protocol Buffer)是数据结构序列化和反序列化框架,它具有以下特点: • 语⾔⽆关、平台⽆关:即 ProtoBuf ⽀持 Java、C++、Python 等多种语⾔,⽀持多个平台 • ⾼效:即⽐ XML 更⼩、更快、更为简单 • 扩展性、兼容性好:你可以更新数据结构,⽽不影响和破坏原有的旧程序,这个库是由谷歌完成的一个库,最后不破坏原有的旧程序可以理解为,这个库和原有的旧程序互相之间的独立性是比较高的。
既然这个库的作用是完成序列化和反序列化,那么这个protobuf库就需要将要进行网络通信的信息完成结构化重组,将消息传递到另外一端的时候还要让另外一端也要完成将结构化的信息重新变成原来的信息,以完成信息的传递。
下图就是这个库的工作原理:
需要完成序列化的文件会被这个库编译成为.proto文件,将这个文件传递到另外一端,另外一端在使用这个库将.proto文件恢复成原来的形式,也就完成了序列化和反序列化的工作。
简要步骤:
编写 .proto ⽂件,⽬的是为了定义结构对象(message)及属性内容
使⽤ protoc 编译器编译 .proto ⽂件,⽣成⼀系列接⼝代码,存放在新⽣成头⽂件和源⽂件中 依赖⽣成的接⼝,将编译⽣成的头⽂件包含进我们的代码中,实现对 .proto ⽂件中定义的字段进⾏ 设置和获取,和对 message 对象进⾏序列化和反序列化。
总结一下:
下面是.proto文件的语法规范
.proto⽂件命名规范
创建 .proto ⽂件时,⽂件命名应该使⽤全⼩写字⺟命名,多个字⺟之间⽤ _ 连接。 例如:
lower_snake_case.proto
书写 .proto ⽂件代码时,应使⽤ 2 个空格的缩进
例如我们为通讯录 demo 新建⽂件: contacts.proto
添加注释
向⽂件添加注释,可使⽤ // 或者 /* ... */
下面就来完成一个简单的proto文件,在写这个文件之间还有一个事情需要注意:
Protocol Buffers 语⾔版本3,简称 proto3,是 .proto ⽂件最新的语法版本。proto3 简化了 Protocol Buffers 语⾔,既易于使⽤,⼜可以在更⼴泛的编程语⾔中使⽤。它允许你使⽤ Java,C++,Python等多种语⾔⽣成 protocol buffer 代码。在 .proto ⽂件中,要使⽤ syntax = "proto3"; 来指定⽂件语法为 proto3,并且必须写在除去注释内容的第⼀⾏。 如果没有指定,编译器会使⽤proto2语法。
所以需要在这个文件的头部加上这么一行:
syntax = "proto3";//指定语法版本为3版本
在proto3中也具有和c++一样的命名空间的语法:
package 是⼀个可选的声明符,能表⽰ .proto ⽂件的命名空间,在项⽬中要有唯⼀性。它的作⽤是为 了避免我们定义的消息出现冲突。
在这个简单的通讯录的demo中我就可以使用下面这样的语法来创建一个命名空间
syntax = "proto3";//指定语法版本
package contacts;//声明命名空间(可选的,但是一般都是有的)
下面要做的就是定义结构对象描述,需要使用的关键字为message
消息(message): 要定义的结构化对象,我们可以给这个结构化对象中定义其对应的属性内容。在⽹ 络传输中,我们需要为传输双⽅定制协议。定制协议说⽩了就是定义结构体或者结构化数据,⽐如, tcp,udp 报⽂就是结构化的。再⽐如将数据持久化存储到数据库时,会将⼀系列元数据统⼀⽤对象组 织起来,再进⾏存储。ProtoBuf 就是以 message 的⽅式来⽀持我们定制协议字段,后期帮助我们形成类和⽅法来使⽤。
下面是使用方法:
message 消息类型名{
}
消息类型命名规范:使⽤驼峰命名法,⾸字⺟⼤写。
下面要做的就是要在这个{}之间定义消息字段了。
在 message 中我们可以定义其属性字段,字段定义格式为:字段类型 字段名 = 字段唯⼀编号;
• 字段名称命名规范:全⼩写字⺟,多个字⺟之间⽤ _ 连接。
• 字段类型分为:标量数据类型 和 特殊类型(包括枚举、其他消息类型等)。
• 字段唯⼀编号:⽤来标识字段,⼀旦开始使⽤就不能够再改变。
下图是数据类型:
下面就来定义我的这个通信录的demo的消息字段:
这样proto文件就完成了。下面要做的就是编译这个文件了,编译使用的命令格式:
protoc [--proto_path=IMPORT_PATH] --cpp_out=DST_DIR path/to/file.proto
protoc 是 Protocol Buffer 提供的命令⾏编译⼯具。 --proto_path 指定 被编译的.proto⽂件所在⽬录,可多次指定。可简写成 -I
IMPORT_PATH 。如不指定该参数,则在当前⽬录进⾏搜索。
当某个.proto ⽂件 import 其他 .proto ⽂件时,
或需要编译的 .proto ⽂件不在当前⽬录下,这时就要⽤-I来指定搜索⽬录。
--cpp_out= 指编译后的⽂件为 C++ ⽂件。
OUT_DIR 编译后⽣成⽂件的⽬标路径。
path/to/file.proto 要编译的.proto⽂件。
因为我想将生成的文件就放在当前的工作目录下,所以使用下面的指令就可以了:
protoc --cpp_out =. contacts.proto
这样就生成好了。在这个.cc文件中使用的命名空间就是刚刚在.proto文件中使用的这个命名空间:
在生成的这些代码中还具有很多其它的接口这些接口在之后使用的时候再去做介绍。
最后再整体总结一下:
下面来介绍一下生成的文件中的其它接口,之后再使用这些接口来完成序列化和反序列化的工作。
在生成的.pb.h文件中是具有一个和我的结构体名字一样的类的:
这个类是继承于Message这个类的:
Message这个类又是继承于MessageLite这个类的,为什么要介绍这个类呢?
因为在MessageLite这个类中就提供了序列化和反序列化的接口:
下面就是常用的接口
class MessageLite {
public: //序列化: bool SerializeToOstream(ostream* output) const; // 将序列化后数据写⼊⽂件流 bool SerializeToArray(void *data, int size) const; bool SerializeToString(string* output) const; //反序列化: bool ParseFromIstream(istream* input); // 从流中读取数据,再进⾏反序列化动作 bool ParseFromArray(const void* data, int size); bool ParseFromString(const string& data);
};
上面的这些接口都是提供给外部使用的,至于这个函数的实现则是通过另外一个虚函数实现了的。
下面就可以使用上面的文件来实现序列化和反序列化的操作了。下面来演示一下:
创建一个新的代码文件:
编译运行这个文件:
最后果然出现了我想要的答案,这就是最基本的protobuf库的最简单的使用方式。需要掌握的就是如何编写proto文件,以及常用的接口。
mudo库
Muduo由陈硕⼤佬开发,是⼀个基于⾮阻塞IO和事件驱动的C++⾼并发TCP⽹络编程库。 它是⼀款基于主从Reactor模型的⽹络库,其使⽤的线程模型是one loop per thread, 所谓one loop perthread 指的是:
• ⼀个线程只能有⼀个事件循环(EventLoop), ⽤于响应计时器和IO事件
• ⼀个⽂件描述符只能由⼀个线程进⾏读写,换句话说就是⼀个TCP连接必须归属于某个EventLoop管理
Reactor就是一种时间触发模式,主从Reactor也就是一种主从事件触发模型,主线程中存在一个主Reactor用于事件触发,其它的普通线程具有普通的事件触发。在主Reactor主要用于监控新建连接,以确保整个服务能够比较高效的获取到新建连接,获取了新建连接之后再将这个连接交给子Reactor这些子Reactor会对这些连接进行IO监控,也就是说主Reactor只监控连接,而子Reactor则监控这些连接的IO请求。在子Reactor中如果某一个连接的IO条件满足了就会进行事件的触发,一个子Reactor能够监控的连接是很多的。通过这个模型来实现高并发服务器的框架。当然以上的介绍只是一个简单的介绍。原理这里就不介绍了,这里主要是需要使用起来。
主要需要认识到几个类,只要这几个类认识了,那么就能够完成服务器的搭建了。
第一个类TcpServer
typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
typedef std::function<void(const TcpConnectionPtr &)> ConnectionCallback;
typedef std::function<void(const TcpConnectionPtr &,Buffer *,Timestamp)> MessageCallback;
class InetAddress : public muduo::copyable
{
public:InetAddress(StringArg ip, uint16_t port, bool ipv6 = false);
};
class TcpServer : noncopyable
{public:enum Option{kNoReusePort,kReusePort,};TcpServer(EventLoop *loop,const InetAddress &listenAddr,const string &nameArg,Option option = kNoReusePort);void setThreadNum(int numThreads);void start();/// 当⼀个新连接建⽴成功的时候被调⽤void setConnectionCallback(const ConnectionCallback &cb){connectionCallback_ = cb;}/// 消息的业务处理回调函数---这是收到新连接消息的时候被调⽤的函数void setMessageCallback(const MessageCallback &cb){messageCallback_ = cb;}
};
这个类的构造函数第一个参数需要我们传入一个EventLoop类,这个类会在下面进行介绍,第二个接口也是一个类,只不过这个类的作用在上面已经可以看到了,主要是绑定这个服务器需要监听的ip和端口(port)是什么,下一个参数就是这个服务器的名字,最后一个参数就是一个参数表示是否启动端口重用的功能的参数。所谓的端口重用也就是在TCP中某一个链接如果主动断开了连接,那么这个链接就会进入到time_wait状态,此时这个端口对应的端口是不能被使用的,而如果启动了端口重用功能,那么处于time_wait的端口也可以被使用,这就是构造函数。
下一个函数setThreadNum,要认识这个接口就要明白一些知识:
两个Reactor负责不同的事情,但是一个执行流中只能做一件事情,怎么会有这么多的Reactor呢?所以主从Reactor一定是多执行流的并发
而一个主Reactor下面就可以具有多个从属Reactor(分发的时候采用负载均衡的方式进行分发),而setThreadNum函数就是用来来创建具有多少个从属Reactor的函数,而线程并不是越多越好的,所以根据实际的情况来设定从属Reactor的个数。start函数不用多说,最后还有两个接口:setConnectionCallback和setMessageCallback,这两个函数很重要,这两个函数是由用户我自己写的函数,setConnectionCallback这个函数会在链接被创建的时候被调用,当我希望这个链接被建立的时候就被保存起来,我就可以自己写一个函数将这个链接保存起来,比如一个聊天室,当谁上线了,此时就会通知其他在线的人谁谁谁上线了,这个不就是在链接被创建的时候被建立的吗?我就可以将这个功能封装为一个接口通过上面的回调函数被自动调用,setMessageCallback这个函数会在新连接被调用的时候才会被自动的进行调用。这就是Tcpserver类。
下一个EventLoop类的介绍:
class EventLoop : noncopyable
{
public:/// Loops forever./// Must be called in the same thread as creation of the object.void loop();/// Quits loop./// This is not 100% thread safe, if you call through a raw pointer,/// better to call through shared_ptr<EventLoop> for 100% safety.void quit();TimerId runAt(Timestamp time, TimerCallback cb);/// Runs callback after @c delay seconds./// Safe to call from other threads.TimerId runAfter(double delay, TimerCallback cb);/// Runs callback every @c interval seconds./// Safe to call from other threads.TimerId runEvery(double interval, TimerCallback cb);/// Cancels the timer./// Safe to call from other threads.void cancel(TimerId timerId);
private:std::atomic<bool> quit_;std::unique_ptr<Poller> poller_;mutable MutexLock mutex_;std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);
};
这个类很关键,因为在muto库中具有一个机制:one loop one perthread
一个线程一个事件处理循环,这个事件处理循环也就是上面的这个Eventloop类,这个类中封装了epoll,定时器等等很多东西来进行事件监控,业务处理,所以每一个线程内部都具有一个Eventloop对象,来进行事件监控,业务处理。所以构建Tcpserver的时候需要传入一个Eventloop对象。有了Eventloop对象才会对TCP中的监听套接字进行监控和IO处理。这个类中的接口比较少,我们需要了解的接口也就是loop开始循环接口。下面的几个接口是几个定时器接口。
有了这几个接口就能够完成TCP服务器的搭建了。搭建了服务器之后,收到消息,会进行一个业务处理,但是现在服务端要给客户端回复消息要如何回呢?这就需要知道在Tcpserver中的setMessageCallback中传入的回调函数是具有格式的,格式如下:
这里可以看到这里又出现了一个新的类,这个类解决的就是如何向客户端进行消息的响应。
class TcpConnection : noncopyable,public std::enable_shared_from_this<TcpConnection>
{
public:/// Constructs a TcpConnection with a connected sockfd////// User should not create this object.TcpConnection(EventLoop* loop,const string& name,int sockfd,const InetAddress& localAddr,const InetAddress& peerAddr);bool connected() const { return state_ == kConnected; }bool disconnected() const { return state_ == kDisconnected; }void send(string&& message); // C++11void send(const void* message, int len);void send(const StringPiece& message);// void send(Buffer&& message); // C++11void send(Buffer* message); // this one will swap datavoid shutdown(); // NOT thread safe, no simultaneous callingvoid setContext(const boost::any& context){ context_ = context; }const boost::any& getContext() const{ return context_; }boost::any* getMutableContext(){ return &context_; }void setConnectionCallback(const ConnectionCallback& cb){ connectionCallback_ = cb; }void setMessageCallback(const MessageCallback& cb){ messageCallback_ = cb; }
private:enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };EventLoop* loop_;ConnectionCallback connectionCallback_;MessageCallback messageCallback_;WriteCompleteCallback writeCompleteCallback_;boost::any context_;
};
这个类中的函数主要就需要认识到下面的函数
connected函数,返回当前是否处于连接完成状态的函数
disconnected函数,返回当前是否处于连接断开状态
send函数就是用来进行数据的发送的
setContext函数设置连接上下文的函数,其它的函数基本没有使用过。
最后还有一个Buffer类这个类就是内部这个mudo库中实现的缓冲区类,这个类中就有函数能够将缓冲区中的内容取出。或者是发送数据到缓冲区中,或者追加数据到缓冲区中。
class Buffer : public muduo::copyable
{
public:static const size_t kCheapPrepend = 8;static const size_t kInitialSize = 1024;explicit Buffer(size_t initialSize = kInitialSize): buffer_(kCheapPrepend + initialSize),readerIndex_(kCheapPrepend),writerIndex_(kCheapPrepend);void swap(Buffer& rhs)size_t readableBytes() constsize_t writableBytes() constconst char* peek() constconst char* findEOL() constconst char* findEOL(const char* start) constvoid retrieve(size_t len)void retrieveInt64()void retrieveInt32()void retrieveInt16()void retrieveInt8()string retrieveAllAsString()string retrieveAsString(size_t len)void append(const StringPiece& str)void append(const char* /*restrict*/ data, size_t len)void append(const void* /*restrict*/ data, size_t len)char* beginWrite()const char* beginWrite() constvoid hasWritten(size_t len)void appendInt64(int64_t x)void appendInt32(int32_t x)void appendInt16(int16_t x)void appendInt8(int8_t x)int64_t readInt64()int32_t readInt32()int16_t readInt16()int8_t readInt8()int64_t peekInt64() constint32_t peekInt32() constint16_t peekInt16() constint8_t peekInt8() constvoid prependInt64(int64_t x)void prependInt32(int32_t x)void prependInt16(int16_t x)void prependInt8(int8_t x)void prepend(const void* /*restrict*/ data, size_t len)
private:std::vector<char> buffer_;size_t readerIndex_;size_t writerIndex_;static const char kCRLF[];
};
以上就是搭建一个服务器需要知道的类。
下面就来使用上面的几个类来搭建一个英译汉的服务器,在搭建服务器的时候首先就要将这个库文件挪过去,在安装modo库的时候创建了一个新的build文件夹,这个文件夹中的release-install-cpp11文件夹下,就具有include文件夹和lib文件夹,这两个文件夹就是mudo库的头文件和库文件,要使用这个mudo库就需要将这两个文件夹中的内容拷贝到工作项目中,然后就可以来写这个服务器了:
这个服务器具有两个私有成员变量一个就是服务器,一个就是循环的事件,然后就是对这个类进行初始化了。在完成了初始化之后,在将启动服务器的函数,建立新连接之后的回调函数,对客户端发起的请求进行回应的回调函数的接口。
需要注意一下上图中onMessage和onConnection函数中的第一个参数应该是const muduo::net::TcpConnectionPtr& conn而不是TcpConnection
之后在main函数中调用start函数就可以启动服务器了,下面要做的是需要完成两个回调以及start函数。
下面完成的就是start函数。用于启动整个服务器,_server会监控到来的链接将其交给从属Reactor,从属Reactor(就是_baseloop,其实就是一个epoll)会将这个链接放到IO的监控下,对满足条件的链接进行事件处理
下面要完成的是onConection函数,这个函数会在新链接被建立和断开的时候被调用:
光是完成还不够,需要将这个函数设定到_server的connectioncallback中,让Tcpserver能够在链接建立和断开的时候完成自动的调用。
但是这样写是错误的,因为setConnectionCallback这个函数要求的函数的参数只能是一个TcpConnection,但是因为onConnection是在类中的成员函数,所以这个函数是具有隐藏的this指针的,所以这里需要使用bind函数适配器去完成函数的适配。
这样就能够让服务器在建立链接和断开连接的时候自动调用Onconnection函数了。
但是这样还不够因为当客户端的请求到来的时候还需要服务器做出响应,所以还需要将onMessage函数设定到响应回调函数中,因为处理响应的函数需要具有三个可选参数,所以使用bind的时候需要预留三个位置:
这样就将事件处理函数也交给了,server服务器,因为我想做的是一个简单的汉译英的服务器,所以还需要一个处理业务的函数
这个函数比较简单,
下面要完成的就是onMessage函数了,这个函数要做的事情也就3步,第一步将客户端发送来的信息提取出来,第二步对数据进行处理,最后一步对数据进行回应。
其中Buffer类中的retrieveAllAsString函数就是将客户端发来的所有的信息按照全部按照字符串的形式取出来。conn中的send函数能够将信息发送给客户端
这样这个简单的服务器端就完成了,最后使用main函数创建对象然后启动start,这里我再写一个makefile文件
其中的-I是为了指明这个文件需要的头文件所在的路径,-L是为了指明这个文件需要的库文件所在的路径,之后的-l指明需要连接的静态库是什么,最后这个库肯定是使用了多线程的,所以最后还需要加上连接多线程的库
生成成功,运行之后使用netstat -nltp就可以看到这个服务器的8081端口已经被监视起来了了,等待客户端的请求了
上面的代码其实就是完成了,对客户端的请求做出不同的回应,在这个服务器中的两个成员中_server只是会对新到来的连接进行监控,然后将这个连接交给eventloop而这个eventloop才是epoll会对连接进行IO监控,对满足条件的链接进行事件处理。
下面来进行客户端的搭建
既然要去搭建客户端那么首先就要去了解客户端的类
class TcpClient : noncopyable
{
public:// TcpClient(EventLoop* loop);// TcpClient(EventLoop* loop, const string& host, uint16_t port);TcpClient(EventLoop *loop,const InetAddress &serverAddr,const string &nameArg);~TcpClient(); // force out-line dtor, for std::unique_ptr members.void connect(); // 连接服务器void disconnect(); // 关闭连接void stop();// 获取客⼾端对应的通信连接Connection对象的接⼝,发起connect后,有可能还没有连接建⽴成功TcpConnectionPtr connection() const{MutexLockGuard lock(mutex_);return connection_;}/// 连接服务器成功时的回调函数void setConnectionCallback(ConnectionCallback cb){connectionCallback_ = std::move(cb);}/// 收到服务器发送的消息时的回调函数void setMessageCallback(MessageCallback cb){messageCallback_ = std::move(cb);}private:EventLoop *loop_;ConnectionCallback connectionCallback_;MessageCallback messageCallback_;WriteCompleteCallback writeCompleteCallback_;TcpConnectionPtr connection_ GUARDED_BY(mutex_);
};
class CountDownLatch : noncopyable
{
public:explicit CountDownLatch(int count);void wait(){MutexLockGuard lock(mutex_);while (count_ > 0){condition_.wait();}}void countDown(){MutexLockGuard lock(mutex_);--count_;if (count_ == 0){condition_.notifyAll();}}int getCount() const;private:mutable MutexLock mutex_;Condition condition_ GUARDED_BY(mutex_);int count_ GUARDED_BY(mutex_);
};
首先看构造函数:
可以看到这个构造函数依旧需要一个Eventloop类,来进行事件循环,而第二个就是绑定的服务器的地址信息了,最后一个就是客户端的名字了。
下面两个函数:
void connect(); // 连接服务器 void disconnect(); // 关闭连接,这两个函数和在服务端那里的函数是不同的,
connection函数用于从服务器端获取链接,因为客户端只有和服务端建立了链接才能进行通信。在muduo库中无论是客户端还是服务端对于对端传递过来的信息的处理都是异步的,这也就意味着客户端是无法等待服务端将信息发送过来的。所以在客户端这里也存在回调函数:
我们自己完成两个业务函数,当连接建立的的时候自动调用业务函数,这样就不需要客户端去等待服务端的请求。好处为只需要设定两个回调函数,就不需要处理后面的事情了。但是如果客户端对于服务端发送来的信息,需要等待这个信息的到来的时候就会出现问题。还有一个问题:在上面的函数中: void connect(); // 连接服务器这个函数是非阻塞函数,也就是说,这个函数被调用了一次之后不会等待连接建立成功而是直接就返回了,如果在连接没有建立的时候获取了这个连接,然后send发送信息,这个信息是不会发送成功的,因为连接还没有建立成功,还有可能引发其它的连带问题(获取到的connection是一个空的对象,调用send会让客户端直接崩溃)。
为了不发生这样的问题就需要在了解一个类CountDownLatch
class CountDownLatch : noncopyable
{
public: explicit CountDownLatch(int count); void wait(){ MutexLockGuard lock(mutex_); while (count_ > 0) { condition_.wait(); } } void countDown(){ MutexLockGuard lock(mutex_); --count_; if (count_ == 0) { condition_.notifyAll(); } } int getCount() const;
private: mutable MutexLock mutex_; Condition condition_ GUARDED_BY(mutex_); int count_ GUARDED_BY(mutex_);
};
这个类就是用来做技术同步的,这个类中有一个wait函数就是用来等待的,countDown就是唤醒。所以对于某些必须连接了服务器之后才能继续往下执行的代码,在客户端调用了connect之后,会再去调用CountDownLatch 类中的wait方法,让这个执行流等待,知道这个连接建立成功了,开始执行连接建立成功的回调函数的时候(也就是执行我自己写的方法),在这个方法中再去唤醒这个执行流,让其继续往下运行。客户端需要使用的类和方法主要就是上面说的几个,下面就开始搭建客户端的框架。也就是要将muduo库中的客户端类封装起来,变成符合我要求的类。在写这个类之前还需要知道一个知识点,在写服务端的时候我们知道了在EventLoop类中的loop函数是一个阻塞等待函数,在服务端的时候,因为服务端可能接收到多个客户端的请求,所以需要主线程去执行loop函数,而我的这个客户端并不需要接收来组多个客户端的请求,所以我的这个客户端的主线程并不需要去阻塞循环等待。由此就需要一个新的类:
class EventLoopThread : noncopyable
{public:typedef std::function<void(EventLoop*)> ThreadInitCallback;EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),const string& name = string());~EventLoopThread();EventLoop* startLoop();private:void threadFunc();EventLoop* loop_ GUARDED_BY(mutex_);bool exiting_;Thread thread_;MutexLock mutex_;Condition cond_ GUARDED_BY(mutex_);ThreadInitCallback callback_;
};
这个类就是创建一个新的线程这个线程专门去执行loop函数。这样我的客户端主线程就不会被阻塞等待了。
然后我的客户端需要完成的公共函数为构造函数,和连接服务器端的函数:
同时为了能够在连接建立和收到服务端的回应之后能够正确的进行处理,所以还需要进行回调函数的设置:
到这里基本的框架就完成了。
在初始化_latch的时候将计数设置为1,如果小于1那么第一次调用wait就不会被阻塞,如果大于了1,虽然能够阻塞但是需要多次唤醒,所以设定初始值为1是最好的情况。
到这里就完成了部分的构造函数,以及connect函数,下面去完成当连接建立成功之后会被调用的回调函数:
下面要完成的就是当客户端收到服务端的信息之后要进行的回调函数了
下一个函数向服务端发送信息的函数:
上面的代码就能够完成信息的发送了,但是这样还不够,因为不能确认在发送信息的时候这个连接是否还在,同时为了让上层知道这一次信息的发送是否成功,所以需要将上面这个函数的返回值设定为bool
下面再将上面的两个回调函数设定到_client这个类中即可:
然后在main函数中进行调用即可
下面就来编译运行一下,依旧是将这个客户端放到makefile文件中
下面就来进行一下编译
编译成功.
然后进行通信:两端启动之后进行通信,通信也是成功。
到这里借助mudo库一个简单的客户端和服务端就建立成功了。
下面我们需要完成一个基于mudo库的protobuf的通信协议,上面写的客户端和服务器是根本没有网络通信协议的,也没有考虑TCP粘包的问题,上面的demo没有考虑这么多,但是在正式的项目中必须有网络协议的。否则粘包问题都无法解决,
基于muduo库的protobuf协议
要解决粘包问题这个协议首先就要去描述本次的一个请求有多长,然后取出这么多的数据,由此才能解决粘包问题。
在muduo库中是存在一个已经写好了的protobuf的应用层协议的:我们可以学习这个应用层协议:在client.cc中大佬写了一个自己的类:
class QueryClient : noncopyable
{public:QueryClient(EventLoop* loop,const InetAddress& serverAddr): loop_(loop),client_(loop, serverAddr, "QueryClient"),dispatcher_(std::bind(&QueryClient::onUnknownMessage, this, _1, _2, _3)),codec_(std::bind(&ProtobufDispatcher::onProtobufMessage, &dispatcher_, _1, _2, _3)){dispatcher_.registerMessageCallback<muduo::Answer>(std::bind(&QueryClient::onAnswer, this, _1, _2, _3));dispatcher_.registerMessageCallback<muduo::Empty>(std::bind(&QueryClient::onEmpty, this, _1, _2, _3));client_.setConnectionCallback(std::bind(&QueryClient::onConnection, this, _1));client_.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &codec_, _1, _2, _3));}
上面代码中的registerMessageCallback就是注册一个消息回调,上面的函数注册了四个方法,针对不同的请求进行不同的业务处理,这和我们上面那个demo是一样的,setMessageCallback告诉服务器什么样的请求做出什么样的处理,只不过上面的代码针对的是protobuf的数据进行了处理。ProtobufCodec也是一个类,这个类的构造函数需要传入消息处理函数,并且这个函数内部具有两个方法,一个send方法,一个OnMessage方法,所以这个类的操作就是针对于protobuf进行协议处理的一个类,那么
ProtobufCodec这个类中的消息处理函数是如何进行消息处理的呢?
void ProtobufCodec::onMessage(const TcpConnectionPtr& conn,Buffer* buf,Timestamp receiveTime)
{while (buf->readableBytes() >= kMinMessageLen + kHeaderLen){const int32_t len = buf->peekInt32();if (len > kMaxMessageLen || len < kMinMessageLen){errorCallback_(conn, buf, receiveTime, kInvalidLength);break;}else if (buf->readableBytes() >= implicit_cast<size_t>(len + kHeaderLen)){ErrorCode errorCode = kNoError;MessagePtr message = parse(buf->peek()+kHeaderLen, len, &errorCode);if (errorCode == kNoError && message){messageCallback_(conn, message, receiveTime);buf->retrieve(kHeaderLen+len);}else{errorCallback_(conn, buf, receiveTime, errorCode);break;}}else{break;}}
}
上面的代码是先读取了一串数据,为什么呢?这就需要知道muduo库中的网络通信协议了,协议格式如下:
首先就是4字节长度的len,这个len描述了某一个网络TCP包的长度,然后name_len描述的是第三个type_name整体的数据长度,然后中间的一长条才是protobuf的请求数据(这个请求可能是翻译/加法等等),通过上面的数据格式就解决了粘包问题,所以上面的代码首先就将一个TCP包的整体的长度取出来,然后再取出protobuf段的数据,这个数据就是protobuf请求序列化后的数据,然后进行一次protobuf的反序列化,由此得到了请求/响应的数据。判断取出的信息没有出错之后,调用消息处理函数,这个消息处理函数就是再构造ProtobufCodec类的时候所传入的一个函数,所以说运行逻辑就是当我们的服务器收到一个数据之后会调用ProtobufCodec中的消息处理函数进行处理(对这个数据进行protobuf数据的处理,得到请求/响应),得到请求或者响应之后调用ProtobufDispatcher类中的消息处理函数对请求/响应进行处理,那么这个类中的这个接口又做了什么事情呢?
onProtobufMessage这个方法中就有一个Message的智能指针,通过这个指针读取这个请求的类型,调用不同的数据处理方法,回调函数从哪里来的呢?这个回调函数就来自于构造QueryClient类的时候我们放进去的业务回调函数,内部将我们自己写的回调函数保存起来了,并且根据不同的请求类型有不同的名称,映射了起来。
那么业务处理的流程也就明白了,我们构建业务处理的时候,这个函数黑会被muduo库保留,然后进行映射,当一个请求到来的时候,首先根据请求的类型判断是否具有这个类型的请求的处理方法,如果存在就调用,如果不存在就调用默认的处理方法。有点绕需要画一个图:
首先这个TcpClient中具有一个业务处理函数,在我写的Demo中这个OnMessage是我自己写的函数传递到客户端中,但是在上面的这个库中并不是由我写的,这里传递的是ProtobufCodec中的一个OnMessage函数
然后将这个处理过程设置为了消息回调函数
设置了这个消息回调之后,一旦client收到了消息,就会调用下面的接口进行处理,能够得到一个请求/响应的数据,但是得到了这个请求/响应数据,服务器不知道如何处理啊,由此又有了一个类ProtobufDispatcher,这个类中具有一个映射表,针对于不同的请求具有不同的回调函数
这个映射表是由上图中的这个函数进行注册形成的,这个类中还有一个函数:
然后ProtobufCodec类中的函数在处理一个一个TCP包得到了一个请求/响应之后,就去调用了上面的这个类的业务处理函数:
这个函数再查找表,去寻找我们自己写的业务处理函数,来进行调用。
所以在一开始的构造函数中:
code_c需要绑定一个分发器,因为当得到了请求/响应之后需要分发器,去判断使用什么方法去进行业务处理,而分发器也在一开始就注册了不同的方法,然后服务器再将ProtobufCodec中的消息处理函数设置为消息处理的回调函数,让其去解析协议,得到请求之后,调用分发器进行业务处理,这就是大佬设置的内部的环路模型,下面我们也借助这个库中的工具来完成我们自己的protobuf协议处理函数。下面我们再来定义我自己的proto文件,来编写自己的基于protobuf进行通信的客户端。
下面是编写步骤:
下面首先来完成请求和响应的两个proto文件
首先是请求文件:
然后将其生成出来,然后因为我要完成自己的protobuf处理函数需要使用到大佬的部分文件所以还需要将这些文件移动到我的这个项目的include下一个新建的proto文件夹下:
我需要的大佬的文件路径在我的电脑中如下:
需要移动的文件就是code.cc,code.h,dispatcher.h这三个文件。
下面我们就先来写一个服务端的代码,首先将需要使用到的头文件包含进来:
然后根据大佬的服务端代码首先就是针对于请求的智能指针的一个定义,对于不同的请求定义不同的智能指针类型,因为一个请求完了之后是需要释放对象的,但是生成的proto文件中只是一个类的定义,需要我们自己去定义智能指针。所以下面就需要增加我自己的智能指针类型了(proto文件自动生成了请求结构类,在项目中使用的时候为了方便使用,需要定义智能指针)所以下面我就将proto文件生成的四个不同的类的智能指针类型创建出来放在我的server类中)。
然后根据大佬编写服务器的逻辑,一个server类中需要存在一个服务器对象,一个分发器用于事务处理,以及一个解决粘包,拿取protobuf数据的类。
下面就是这个函数的构造函数了:在构造Tcpserver的时候不要忘了还有一个重要的对象,那就是EventLoop类对象,所以还需要将这个对象也放到成员变量中。并且这个类的初始化要放到_server之前,因为_server初始化需要这个类,先初始化_server会导致崩溃,之后要初始化的是ProtobufDispatcher对象(请求分发器的对象初始化),这个对象的初始化需要传入一个函数,这个方法是用于处理无法识别的请求的。在大佬写的文件中也是定义了一个这样的方法。
上面的代码就是大佬定义的未知请求处理函数。
下面我们需要自己写一个这样的函数:
然后将这个函数绑定给这个分发器做初始化:
最后就是对处理protobuf协议的类对象进行初始化了。
大佬的初始化方法如下:
我们也使用这样的方法进行初始化,这个对象进行初始化需要一个函数,这个函数就是ProtobufDispatcher中的一个函数,并且将分发器作为参数传入到这个函数中,除此之外这个函数还需要三个预留参数
到这里这个三个对象就初始化完成了
到时候服务器收到一个数据包之后会调用ProtobufCodec中的onProtobufMessage方法将这个数据包中的protobuf数据取出来并进行反序列化,然后再通过事件分发器根据这个请求的类型去执行不同的方法。下面的认识就是去完成对应的处理函数了(处理加法请求的函数,处理翻译请求的函数)格式和处理未知请求的函数是一样的格式:
然后将上面的函数在ProtobufDispatcher类对象中创建对应的映射。让收到不同的请求时这个分发器能够调用不同的方法进行处理。
为了能够让服务器在收到一个信息的时候就调用处理方法将一个protobuf信息提取出来,所以需要将ProtobufCodec中的消息处理函数绑定到服务器类的OnMessage函数中
下面就来完成上面的这些事情。
因为函数有点过长了所以没有截图完全。最后还有一个当链接建立/撤销的时候需要调用的处理函数,我也需要绑定到Tcpserver中去,这里我就自己写一个简单的函数了
下面就是服务器的启动函数了:
这是链接在建立/关闭的时候会执行的函数:
然后就是翻译业务的处理函数了,这里就直接使用之前的代码了:
有了这个业务处理函数下面就可以来完成onTranslate函数了:
加法业务的处理函数和上面的步骤基本是一样的
这样一个简单的基于protobuf进行通信的服务端就完成了。
最后在main函数中创建对象启动start函数即可,到这里服务端已经编写完成了,下面要完成的就是客户端代码了。依旧是仿照大佬的设计来进行代码的书写
首先成员对象中肯定要具有client对象,然后和selver一样需要具有分发器和协议处理类。之后因为我的这个客户端不会去连接多个服务端所以使用一个子线程去创建EventLoop类并进行loop监控。
然后就是定义请求和响应的智能指针了,然后完成构造函数和服务端基本都是一样的
下面要做的就是来完成这些函数了。需要注意的是客户端的连接建立/关闭的函数和服务端是不一样的,毕竟为了防止客户端在连接没有建立的时候就发送信息,所以需要在确定连接建立之后再让客户端的主线程进行运行。
然后就是让客户端输入可以创建请求的数据了,也就是建立不同类型的请求了:
建立请求的功能由上面两个函数来完成,下面是当客户端收到了服务端的响应之后会做出的反应:
然后来完成构造请求的两个函数:
这里的发送使用了多态的原理,因为TranslateRequest和AddRequest这两个类的父类都是Message所以这里使用父类的指针去接收子类的对象,后面底层再根据子类的不同调用不同的发送函数。
最后再来完成一个客户端的启动函数:
再在main函数中创建对象进行实验了:
总结一下客户端和服务端一样,因为有了第三方库,所以能够让我们将更多的思想注意到业务逻辑上,而不是序列化,协议处理等等其它方面。其它的方面,服务端和客户端都是差不多一样的,都是使用了第三方库。
到这里客户端也就完成了。
下面画图说明一下初始化服务端那里的各行代码的作用:
muduo库使用起来很方便,但是理解上面的图就更好了。主要记住ProtobufDispatcher这个类是分发器,会根据不同的请求调用不同的业务处理函数。ProtobufCodec则是专门用来获取从底层传递来的数据(网络层状结构)解决TCP包的粘包问题,然后将一个包中的完整的结构对象拿出来,再去调用分发器进行业务处理,由此完成循环。
对于这些类内部的实现更加烧脑。可以去了解。
下面要做的事情就是编译,运行了,下面就是makefile文件的编写了,需要注意要连接上第三方库的组件。并且需要修改一下codec.cc文件,因为这个文件中的需要的两个第三方库已经被我复制到了我现在使用的工作目录下,修改后的头文件如下:
修改的是codec.h以及google-inl.h这两个文件都被我复制到了这个和这个文件的相同目录下:
下面再makefile中联合编译这些文件即可。
需要注意的是因为zlib库并没有默认连接到Linux的项目中,所以最后需要手动-lz去连接zlib库(我写的代码中没有使用到这个库,但是大佬写的代码中应该使用到了这个库,而我现在正在联合编译这个库,所以也需要联动这个库),由此就能够编译成功了:
下面运行一下:
服务端运行之后可以看到8081这个端口已经被成功监控起来了,说明服务端已经没有问题了。
客户端运行后我在客户端执行的两个任务也被成功运行了,到这里这个代码也就写完了。
sqlite数据库
首先要知道什么是sqlite数据库?
SQLite是⼀个进程内的轻量级数据库,它实现了⾃给⾃⾜的、⽆服务器的、零配置的、事务性的 SQL 数据库引擎。它是⼀个零配置的数据库,这意味着与其他数据库不⼀样,我们不需要在系统中配置。 像其他数据库,SQLite 引擎不是⼀个独⽴的进程,可以按应⽤程序需求进⾏静态或动态连接,SQLite 直接访问其存储⽂件,因为这个数据库是一个轻量级的数据库,所以在使用的时候并不需要和MySQL一样,需要运行一个mysql数据库的服务端,然后代码中使用MySQL的客户端来完成增删查改的工作。sqlite是一个本地型的数据库,而mysql是支持远程进行修改的。sqlite主打的就是轻便,不需要进行配置。
下面是为什么使用这个数据库的原因:
- 无服务器架构:
- 不需要一个单独的服务器进程或操作系统(无服务器的)。
- 易于配置:
- SQLite 不需要配置。
- 数据库存储:
- 一个完整的 SQLite 数据库存储在一个单一的跨平台的磁盘文件中。
- 轻量级:
- SQLite 非常小,完全配置时小于 400 KiB,省略可选功能配置时小于 250 KiB。
- 自给自足:
- SQLite 是自给自足的,不需要任何外部的依赖。
- 事务支持:
- SQLite 事务完全兼容 ACID,允许从多个进程或线程安全访问。
- SQL 支持:
- SQLite 支持 SQL92(SQL2)标准的大多数查询语言的功能。
- 编程语言:
- SQLite 使用 ANSI-C 编写,并提供简单易用的 API。
- 跨平台运行:
- SQLite 可在 UNIX(Linux, Mac OS-X, Android, iOS)和 Windows(Win32, WinCE, WinRT)中运行
在我的项目中要使用这个SQLite因为轻量并且兼容性好,以上做了解即可。下面是对sqlite数据库的接口介绍。
下面是官方的文档链接: https://www.sqlite.org/c3ref/funclist.html
可以去看官方的文档,只不过缺点是英文需要翻译。下面是一些常用的接口及其说明:
SQLite3 操作流程
- 查看当前数据库的线程安全状态
- 使用函数:
int sqlite3_threadsafe();
- 返回值:
-
0
-
1
注意:SQLite3 有三种线程安全等级:
- 非线程安全模式
- 线程安全模式:不同的连接在不同的线程/进程间是安全的,单个句柄不能用于多线程间。句柄就是在代码中用于操作sqlite数据库的结构体
- 串行化模式:可以在不同的线程/进程间使用同一个句柄。
这三种模式在启动数据库的时候可以进行设置,下面就进行了说明
- 创建/打开数据库文件,并返回操作句柄
- 使用函数:
int sqlite3_open(const char *filename, sqlite3 **ppDb);
- 成功返回
SQLITE_OK
- 若在编译阶段启动了线程安全,则可以通过参数选择线程安全等级:
- 使用函数:
int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, const char *zVfs);
- flags 参数:
-
SQLITE_OPEN_READWRITE
-
SQLITE_OPEN_CREATE
-
SQLITE_OPEN_NOMUTEX
-
SQLITE_OPEN_FULLMUTEX
- 返回值:
SQLITE_OK
- 执行 SQL 语句
- 使用函数:
int sqlite3_exec(sqlite3*, char *sql, int (***callback)(void***,int,char**,char**), void* arg, char **err);
- callback 函数:
- 类型:
int (*callback)(void*, int, char**, char**)
- 参数:
-
void*
- 设置的在回调时传入的 arg
-
int
-
char**
-
char**
- 返回值:成功处理的情况下必须返回
0
,返回非 0
会触发 ABORT
- 返回值:
SQLITE_OK
- 销毁句柄
- 使用函数:
int sqlite3_close(sqlite3* db);
- 成功返回
SQLITE_OK
- 推荐使用:
int sqlite3_close_v2(sqlite3*);
- 无论如何都会返回
SQLITE_OK
获取错误信息
- 使用函数:
const char
sqlite3_errmsg(sqlite3
db);
下面我们在代码中封装上面的接口成为一个类,用于完成对数据库的增删查改操作。
首先定义我要封装的这个类要完成的功能:
首先来说明一下exec函数,这个函数用于对数据库进行操作,而通过对上面的SQLite数据库的操作函数可以知道,要执行一个sql语句需要向这个函数提供一个函数指针,那么我们这里自然也要将这个函数指针作为参数做到exec函数中。其实这个函数指针就是用于对查询语句做的,因为一般来说,非查询语句,直接执行完成之后告诉你执行结果即可,但是查询语句需要告诉我的是查询结果,对于这个查询结果的保存是需要借助一个函数的。
未来这个arg指针很关键,这个arg是要传入到回调函数中的。将查询出来的结果需要进行保存,就是通过这个arg来实现的。
下面就来实现一下上面设置的接口:
首先是构造和open函数:
然后是exec(sql执行函数)和close函数以及成员变量:
代码并不难,只需要了解接口参数的作用然后按照参数进行传递即可。上面的类很简单因为没有涉及到对事务的操作,比如开始事务,保存事务回滚点,回滚事务等等操作其实都是可以做成一个函数的,但是因为在我的这个项目中对于事务的相关操作并没有被使用到,所以就没有对上面的操作进行封装了。
下面要做的事情就是通过测试用例来对上面的类进行测试了
首先测试的是创建打开和插入的功能:
注意我上图中的autoincrement使用错误了,在sqlite数据库中INTEGER PRIMARY KEY创建的主键才能使用自增长的功能。也就是下面的sql:
然后使用sqlit3应用程序打开这个数据库文件进行查询:
成功执行。至于其它的修改删除语句和增加语句一样,只要sql不错,那么是一定可以进行执行的,这里就不测试了,关键点在于查询的操作要如何完成,其实和MySQL一样,都是通过我传入的空间,将数据库查询到的结果从缓冲区拷贝到我传入的空间,如何传入我的空间呢?就是通过回调函数:
然后是主函数:
最后就是打印结果了,可以看到所有的结果都打印出来了,只不过我这里的打印方式存在问题导致了格式不好看。
这就说明我封装的这个类是没有问题的,可以使用即可。
GTEST
首先GTEST是什么呢?GTest是⼀个跨平台的 C++单元测试框架,由google公司发布。gtest是为了在不同平台上为编写C++单 元测试⽽⽣成的。它提供了丰富的断⾔、致命和⾮致命判断、参数化等等。我们最主要的就是了解GTEST的基本使用即可。GTEST的使用分为两个大类:
第一个简单的宏断言(但是c语言中就存在断言,所以这个功能并不是主要的功能)。第二个就是事件机制,具有全局的或者是单独用例的事件机制具体介绍在下面。
首先来介绍GTEST中的宏断言是如何使用的。
GTest中的断⾔的宏可以分为两类:
• ASSERT_系列:如果当前点检测失败则退出当前函数
• EXPECT_系列:如果当前点检测失败则继续往下执⾏
下面是具体的断言:
// bool值检查
ASSERT_TRUE(参数),期待结果是true
ASSERT_FALSE(参数),期待结果是false
//数值型数据检查
ASSERT_EQ(参数1,参数2),传⼊的是需要⽐较的两个数 equal
ASSERT_NE(参数1,参数2),not equal,不等于才返回true
ASSERT_LT(参数1,参数2),less than,⼩于才返回true
ASSERT_GT(参数1,参数2),greater than,⼤于才返回true
ASSERT_LE(参数1,参数2),less equal,⼩于等于才返回true
ASSERT_GE(参数1,参数2),greater equal,⼤于等于才返回true
下面使用代码来了解断言的使用,需要注意GTEST的断言是在GTEST的单元测试框架中使用的,所以这里为了使用这个宏我会写一个GTEST的单元测试框架。
然后就是在main函数中启动这些测试了。并且因为这里我设置的是ASSER_系列的断言,所以在断言失败之后就不会往下运行了,所以最后打印出一个hello。下面来完成main函数:
运行截图:
可以看到在小于比较的那里,因为断言失败了,所以最后没有打印hello并且这个断言还会告诉你是哪里出现了错误。如果使用另外一个系列的断言那么即使断言失败了依旧会运行:
可以看到虽然第一次的断言失败了,但是hello依旧打印了出来。最后总结一下断言宏的使用:
那么为什么不使用c语言自带的assert呢?c语言中的还不需要使用单元测试框架。但是断言只不过是gtest最基本的使用而已,gtest最重要的应该是对于事件机制。
要使用GTEST的事件机制首先就要知道gtest中的事件机制是什么
GTest中的事件机制是指在测试前和测试后提供给⽤⼾⾃⾏添加操作的机制,⽽且该机制也可以让同⼀ 测试套件下的测试⽤例共享数据。GTest框架中事件的结构层次:
但是光看上面的图是无法感受GTEST中的事件机制的。需要继续往下理解,在GTEST中提供了三种常见的事件:
全局事件,TestSuite事件,TestCase事件。
首先来介绍第一种全局事件: 这种事件就是:针对整个测试程序。实现全局的事件机制
上图中也简单说明了一下什么是测试套件,下面来写代码认识一下全局测试事件,代码逻辑:需要创建⼀个⾃⼰的类,然后继承 testing::Environment类,然后分别实现成员函数 SetUp 和 TearDown ,同时在main函数内调用testing::AddGlobalTestEnvironment(new MyEnvironment); 函数添加全局 的事件机制
SetUp接口会完成测试环境的初始化,TearDown 会在所有的单元测试都执行完成之后再去进行执行进行环境的清理
class HashTestEnv : public testing::Environment
{
public:virtual void SetUp() override{std::cout << "测试前:提前准备数据!!\n";dict.insert(std::make_pair("Hello", "你好"));dict.insert(std::make_pair("hello", "你好"));dict.insert(std::make_pair("雷吼", "你好"));}virtual void TearDown() override{std::cout << "测试结束后:清理数据!!\n";dict.clear();}
};
再解释一下上面的代码:
下面使用上面的代码来写一段代码进行全局测试套件的测试:
然后main函数中需要增加一些东西:
运行结果:
可以看到确实在单元测试启动前先进行了测试环境的初始化。
那么这有什么用呢?有了这个东西就能够在不修改源代码的情况下去完成对一些代码的测试如下:这里我设定了一个全局的map,然后在初始化的时候进行值的插入,然后故意让一个测试无法通过:
main函数:
最后的运行结果:
可以看到两个测试用例,都是先进行的环境初始化,然后再去进行的测试,而没有通过测试的那个例子也告诉了我为什么没有通过,最后在进行环境的清理。
对于这些测试套件理解最重要的一点就是能够帮我们完成环境的初始化准备,以及测试环境完成后的清理,不需要去进行源代码的修改。虽然这个全局事件会存在不同的测试用例之间的相互影响,但是gtest还存在其它的测试事件,能够解决这个问题,下面我们就来介绍这个事件:
这个事件的特点就是虽然初始化的测试环境是一样的,但是各个单独的测试是不会互相影响的。这就是独立的测试套件。
TestSuite事件:针对⼀个个测试套件。测试套件的事件机制我们同样需要去创建⼀个类,继承⾃testing::Test ,实现两个静态函数 SetUpTestCase 和 TearDownTestCase ,该测试套件的事件机制不需要像全局事件机制⼀样在 main 注册,⽽是需要将我们平时使⽤的 TEST 宏改为 TEST_F 宏。
◦ SetUpTestCase() 函数是在测试套件第⼀个测试⽤例开始前执⾏
◦ TearDownTestCase() 函数是在测试套件最后⼀个测试⽤例结束后执⾏
◦ 需要注意TEST_F的第⼀个参数是我们创建的类名,也就是当前测试套件的名称,这样在TEST_F宏的测试套件中就可以访问类中的成员了。可以看到这个测试套件依旧是会在所有测试之前进行环境的初始化,而在所有测试结束后进行环境的清理。但是在类中可以设定独立的成员,这样各个单元测试之间就不会相互影响了。简单说就是下面的意思:
下面是这个测试套件的创建以及注意事项:
下面就是写代码来进行测试了。
如果是全局的测试套件,那么这里的第二个单元测试是会报错的,但是这里运行之后:
可以看到所有的单元测试都没有出现错误,说明了不同的单元测试中,map的值是不一样的。但是如果现在我有一个这样的环境呢?现在我存在一个全局的测试对象,并且这个全局测试对象,对所有的单元测试都是具有公共资源的,而另外一个对象则是每一个单元测试中都具有不同的初始化值,要怎么办呢?这就要引出在这个单元测试套件类中依旧可以使用SetUp 和 TearDown这两个函数了,这两个函数都是用于在每一个单元测试前,对单元测试进行初始化和环境清理的。这样上面的代码我就没有必要在单元测试中对类进行初始化了。
再次进行测试
可以开单到第二个测试因为值为2所以测试不通过,而第一个测试成功通过,并且在每每一次单元测试之前都进行了环境初始化,结束后进行环境清理。
这样写的好处就是,未来所有测试对象公共的资源我就可以放到SetUpTestCase()中进去初始化,而各个测试单元之间需要进行单独初始化的对象我就可以放到SetUp中,而清理函数则也是互相对应,一个用于清理全局的公共资源,一个用于清理某一个单元测试所作的环境,并且测试后的结果也很清晰,不需要像打印测试一样,往上翻看打印结果(并且这种测试还要修改源代码)。以上就是gtest的单元测试框架。
C++11异步操作实现线程池
这里需要使用c++11中的异步操作来完成一个线程池来完成一些异步任务,想要实现一个线程池很简单,只需要有一组工作线程,再加上线程安全的任务队列,即可完成,工作线程不断的从任务队列中取出任务然后执行,这就是最简单的线程池。但是最关键的点在于现在一个任务在线程池中进行了执行,但是这个任务的执行结果我要如何拿到呢?如果我不关注某一个任务的结果,那么这个线程池很容易就能够实现,但是我关注任务的执行结果呢?这就需要使用到c++11中的异步操作了,来完成这个工作了。选哟使用到的模板类:
std::future是C++11标准库中的⼀个模板类,它表示⼀个异步操作的结果。当我们在多线程编程中使⽤异步任务时,std::future可以帮助我们在需要的时候获取任务的执⾏结果。std::future的⼀个重要特性是能够阻塞当前线程,直到异步操作完成,从⽽确保我们在获取结果时不会遇到未完成的操作。这个类能够在我想要得到这个异步操作任务的结果的时候,帮助我取得这个任务的结果,并且这个模板类中具有同步的机制,当我想要结果的这个任务还没有结果的时候,这个模板类就会阻塞直到这个任务被执行,std::future才能得到结果。对于这个事务先有一个功能上的了解。再简单介绍一下这个东西的使用场景:
• 异步任务: 当我们需要在后台执⾏⼀些耗时操作时,如⽹络请求或计算密集型任务等,std::future可以⽤来表⽰这些异步任务的结果。通过将任务与主线程分离,我们可以实现任务的并⾏处理,从⽽提⾼程序的执⾏效率
• 并发控制: 在多线程编程中,我们可能需要等待某些任务完成后才能继续执⾏其他操作。通过使⽤std::future,我们可以实现线程之间的同步,确保任务完成后再获取结果并继续执⾏后续操作
• 结果获取:std::future提供了⼀种安全的⽅式来获取异步任务的结果。我们可以使⽤std::future::get()函数来获取任务的结果,此函数会阻塞当前线程,直到异步操作完成。这样,在调⽤get()函数时,我们可以确保已经获取到了所需的结果。
下面是官方文档对这个类的介绍:
然后是这个类中具有三个函数来使用这个类
async实现异步
这个函数就是异步的意思,简单的理解就是异步的去调用一个函数,并且这个函数内部会阻塞等待这个函数异步的执行,这个函数会返回一个future对象用于让我们获取函数执行的结果。这个函数还有两个策略一个async策略一个是deffered策略。
deffered策略的关键点:
- 任务并不立即执行:使用
deferred
- 主线程执行:任务将在调用
get()
或 wait()
- 适用场景:
deferred
aync策略的 关键点:
- 任务在新线程中执行:使用
std::launch::async
- 并行性:主线程可以继续执行其他操作,直到你需要获取结果时才会等待。
- 资源管理:使用
std::async
- 使用
std::launch::async
- 这使得你可以实现并行处理,提高程序的响应性和性能。
并且使用async策略的时候,如果我想要这个任务的执行结果的时候这个任务没有准备好,此时主线程就会等待子线程执行。
下面就来写代码使用这两种策略:
打印结果应该是先打印---------然后打印加法执行,然后是结果。
而如果我将策略换成async
这就证明了两个策略的不同点,一个是创建了子线程,一个就是主线程去执行函数,并且创建子线程执行的那个策略在主线程要结果,而子线程还没有执行完成的时候,因为同步的机制会让主线程阻塞等待。
promise实现异步
下面是第二种future函数(其实也是一个模板类)的使用方式:
这个模板类会返回一个future对象,这个future对象和这个模板类之间会具有一个共享空间,使用例子:
首先就定义了一个模板类对象,然后调用这个类对象中的方法获取future对象,此时上图中的fut和prom之间就具有了同步关系,然后porm对象设置的值,可以通过future对象进行获取。光说没有体现出异步结果保存的效果,但是上面的代码则体现了异步的效果,上面的代码通过启动一个子线程,然后将future对象传递过去,因为同步机制,当子线程需要获取值的时候,如果主线程中的porm没有设置值,就会被阻塞,一直等待到主线程中的porm设置了值。两者之间是具有同步关系的,这就和使用地址的方式不一样,对于使用地址来获取的方法,如果另外一个线程没有执行完成,那么这个地址中的值是不正确的,但是主线程还是会拿取这个值,这就导致了错误的发生。
下面我也来写一个代码,使用一下这个promise:
如果使用传递一个整型的地址的话,如果主线程在子线程之前执行就有可能出现错误,因为你不能确定主线程和子线程谁先执行。
packaged_task实现异步
首先是官方文档:
简单理解就是这个类将一个函数进行了封装,然后通过返回的future对象返回这个函数的执行结果
看例子:
如果你将这个对象当作一个纯粹的可调用对象又不正确,不能当作一个线程的入口函数或者async的一个异步执行任务,也不能完全当作普通的函数来使用但是自己又能和函数一样可以直接执行,还是通过自己写代码来进行理解
首先就是如同普通函数一样去使用:
但是这样的执行并不是异步执行的,那么我能否将这个task当作可调用对象传递到之前学习的async的async策略中进行执行呢?
虽然没有语法报错但是可行吗?
不可行,编译直接报错。
那么我能否将这个函数当作新线程的执行函数呢?
最后也是不可行的:
那么我要如何在子线程中使用这个函数呢?就需要通过指针来实现了,在传递给新线程的时候将这个函数当作参数传递给子线程再去让子线程去执行。这里为了防止出现生命周期的问题,所以创建的package_task对象需要在堆上进行建立。
这样才能让这个对象在另外一个线程进行执行:
总的来说,看起来复杂,但是难度并不高。到这里c++11中的三种能够获取结果的异步方法就全部说明完毕了
线程池的实现
现在具有三种异步方法的实现,选择哪一种作为可以获取结果的线程池的异步实现方案呢?这里选择了package_task加上future来实现的。选择这种方法首先是这种方法的实现比较简单,而promise方法,不太适合,因为promise需要占据一个参数,以此来获取最后线程的结果。async因为内部自带了自己的工作线程,和线程池结合不太合适,所以这里就选择了pakage_task+future实现线程池。线程池的功能就是让用户传入一个可执行函数,以及这个函数的参数,然后让线程池去执行这个函数即可
要实现这个线程池就需要实现这个线程池管理的成员,以及管理的操作了。
下面就是写代码来实现一个基本的框架,首先就是将这个类需要的成员变量写出来:
然后函数:
并且互斥锁和条件变量是要放在线程的前面进行初始化的,如果线程先初始化完成了就会去任务队列中使用互斥锁进行等待,此时如果锁或者条件变量没有初始化完成,就会引发问题。
这里我们先来思考push函数的参数要如何设置,push函数的作用是将,用户传递过来的函数和参数一起push到任务池中,并且因为我的这个线程池还要告诉告诉用户自己的这个任务执行的情况怎么样了,所以需要返回一个future对象。由此产生了两个问题,首先就是我如何收取用户的参数呢?第二个就是我如何得到用户传入的这个函数执行后的返回值呢?
对于获取用户传入的函数以及参数可以通过模板+参数包来实现
但是还有一个问题就是如何得到用户这个函数的返回值呢?可以使用auto+推导的方式得到返回值
那么为什么这个任务池不是package_task的池子呢?首先因为package_task不能完全当作一个函数来直接进行传递,而之前的package_task的例子中执行的是一个匿名函数,在匿名函数中再去调用package_task的。在这个线程池中每一个package_task最后都会生成一个匿名函数,保存在任务池中。上图中的右值引用,是为了保持传入参数的属性。
下面首先来完成构造函数:构造函数的目的就是根据传入的参数创建足够数量的进程
这样构造函数就完成了。然后是stop函数,也就是线程池启动的函数。
然后我们来完成push函数,push函数的步骤也就是:
将获取的用户函数封装为一个异步函数(package_task),然后对这个异步函数封装为匿名函数之后放入到任务队列中。
首先来完成第一步将用户的函数封装为异步函数,但是这一步就出现了问题,那就是对于用户的这个函数的类型我是完全不知道的啊,这里就需要通过将用户的参数和函数都绑定为一个新函数的方式进行解决了(函数的参数类型通过推导可以获取)。
上图中的->后面的代码出现了错误,正确的形式应该是std::funture<decltype(func(args...))>,通过decltype(func(args...))得到用户函数的返回值,然后就能够构建我push函数的返回值了,还有一个auto func这一句代码也错误了,正确形式应该是: auto tmpfunc = std::bind(std::forward<F>(func), std::forward<Args>(args)...);
这样创建的异步任务是符合规定的吗?很显然并不符合规定,因为这样创建的异步对象只是一个局部的对象,一旦出了这个作用域就会被销毁。所以这里需要使用shared_ptr创建在堆上的异步对象,然后将任务函数push到任务队列中,然后唤醒一个线程去进行执行即可
最后就来完成线程的入口函数,这个函数的功能就是当任务池不为空,或者_stop被置位的时候唤醒。
下面线程要做的事情就是拿出任务然后去执行了,但是如果一个线程一次只拿一个任务的话就会面临频繁的上锁那任务解锁的操作,增大锁的竞争。
所以做下面的操作,创建一个临时的任务数组,因为这个数组是局部的,所以在出了作用域之后就会被销毁,然后在加锁期间让其和任务队列中的内容进行交换,这样一个线程就拿到了所有的任务,然后去执行这些任务。
因为所有的线程都是在执行完任务之后,再去执行下一组任务,所以所有的子线程执行的函数都是死循环,只要线程池没有被停止就会不断的运行。
这样就解决了频繁加锁然后获取任务的问题,减少了锁的冲突问题。任务被执行的过程是不需要被加锁保护的。线程池只是保证任务数组的操作是线程安全的即可。上面的{}是为了确定锁的保护范围。
下面就要进行功能测试了:
成功执行:
从1开始加,第一次就是2,一直到从1+到18,所以最后的答案是19。最后再优化一下:
这个代码使用很多的技术:参数绑定,类型推导,参数包的使用,完美转发,future和package_task的使用,以及再线程池中定义函数类型,再用函数类型来组成任务池的技术。
到这里线程池就写完了。
下面就可以正式的开始项目的书写了
成功执行:
从1开始加,第一次就是2,一直到从1+到18,所以最后的答案是19。最后再优化一下:
这个代码使用很多的技术:参数绑定,类型推导,参数包的使用,完美转发,future和package_task的使用,以及再线程池中定义函数类型,再用函数类型来组成任务池的技术。
到这里线程池就写完了。
项目的准备工作也就完成了。