当前位置: 首页 > news >正文

项目扩展四:交换机和队列的特性完善【自动删除与队列独占的实现】

项目扩展四:交换机和队列的特性完善【自动删除与队列独占的实现】

  • 一、自动删除标志的完善
    • 1.交换机的自动删除
    • 2.如何实现
      • 1.BindManager模块
      • 2.虚拟机模块实现
      • 3.验证
    • 3.队列的自动删除
      • 1.何时进行自动删除
      • 2.实现
        • 1.前置函数的实现
        • 2.实现判断并清理函数
      • 3.调用
      • 4.验证
        • 1.unBind导致的自动删除
        • 2.BasicCancel导致的自动删除
        • 3.反向验证BasicAck的影响
  • 二、队列独占标志的完善
    • 1.什么是独占机制
    • 2.独占机制的自动删除和持久化之间的关系
    • 3.设计
    • 4.proto文件修改
    • 5.Connection和队列模块的成员修改
    • 6.队列访问的限制
      • 1.declare和erase
        • 1.队列模块的修改
        • 2.虚拟机模块
        • 3.信道模块
      • 2.Bind和UnBind
        • 1.虚拟机模块
        • 2.信道模块
      • 3.basicConsume和basicCancel
      • 4.basicAck和basicPull
    • 7.broker服务器修改
    • 8.验证
      • 1.声明,删除队列的验证
      • 2.绑定,解绑队列验证
      • 3.订阅取消订阅验证
      • 4.消息确认和消息拉取验证【顺便验证一下不会影响basicPublish】
      • 5.独占队列的持久化验证
  • 三、独占队列自动删除的实现
    • 1.独占队列自动删除的目的
    • 2.设计
    • 3.实现
      • 1.队列模块:
      • 2.虚拟机模块
        • 1.消息模块
        • 2.绑定信息模块
        • 3.虚拟机模块
      • 3.broker服务器模块
    • 4.测试
      • 1.代码
      • 2.演示

一、自动删除标志的完善

1.交换机的自动删除

何谓自动删除:当该交换机跟所有的队列都解除绑定,且auto_delete字段被设置为true,此时该交换机就会被删除

注意:若该交换机被设置为了durable,那么它在持久化字段当中也会被删除。服务器重启后,该交换机不会被恢复

2.如何实现

首先,先看这个操作涉及到哪些模块:交换机,队列,绑定信息模块
这三个模块都是被虚拟机整合的模块,根据封装屏蔽底层细节这一好处,我们只需要修改虚拟机内部即可

总结一下,就是unBind之后,若auto_delete为true,则检查该交换机的绑定信息是否为空
若为空,则删除该交换机

1.BindManager模块

首先我们要先从BindManager模块增加上
用来判断指定交换机和队列的绑定信息是否为空的函数

// 判断交换机的绑定信息是否为空
bool emptyBindingOfExchange(const std::string &ename)
{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _exchange_map.find(ename);if (iter == _exchange_map.end())return true;return iter->second.empty();
}// 判断队列的绑定信息是否为空
bool emptyBindingOfMsgQueue(const std::string &qname)
{std::unique_lock<std::mutex> ulock(_mutex);// 遍历整张交换机表,查找是否有当前队列for (auto &kv : _exchange_map){if (kv.second.count(qname) > 0){return false;}}return true;
}

2.虚拟机模块实现

bool unBind(const std::string &ename, const std::string &qname)
{// 0. 删交换机// 1. 解绑if (!_bmp->unBind(ename, qname))return false;// 2. 判断是否是auto_deleteExchange::ptr ep = _emp->getExchange(ename);if (ep->auto_delete){// 3. 判断绑定信息是否为空if (_bmp->emptyBindingOfExchange(ename)){return eraseExchange(ename);}}return true;
}

3.验证

创建一个交换机和一个队列,进行绑定,然后解绑
然后获取查找该交换机是否存在,然后看一下SQLite3数据库

int main()
{VirtualHostManager::ptr vhp = std::make_shared<VirtualHostManager>("main.db");vhp->declareVirtualHost("vhost1", "host1/host.db", "host1");vhp->declareExchange("vhost1", "exchange1", DIRECT, true, true, {});vhp->declareMsgQueue("vhost1", "queue1", true, false, true, {});vhp->bind("vhost1", "exchange1", "queue1", "news.sport.#");vhp->unBind("vhost1", "exchange1", "queue1");std::cout << vhp->existsExchange("vhost1", "exchange1") << "\n";return 0;
}

在这里插入图片描述
验证成功

3.队列的自动删除

1.何时进行自动删除

因为队列不仅关乎到与交换机的绑定,他也跟消费者和消息直接相连,只有当这些外部链接全都断开时,这个队列才可以进行自动删除。

因此,当队列与交换机的绑定全都解除,与消费者关联全都解除,与消息的关联全都解除之后,这个队列才可以自动删除

2.实现

首先,我们要看队列的自动删除涉及哪些模块:队列模块,绑定信息模块,消息模块,消费者模块

也就是虚拟机模块和消费者模块,因此我们需要一直追溯修改到信道

我们要看断开连接具体都是哪些函数:
unBind【绑定信息的判断】basicCancel【消费者关联信息的判断】,还有消息的判断,因此我们需要加一个函数用来进行消息的判断

判断某个队列是否具有消息,有两种情况:

  1. 这个队列压根就没有QueueMessageManager
    MessageManager当中是否有该队列关联的QueueMessageManager
  2. 这个队列的消息为空

针对第一种情况:
在MessageManager当中添加如下函数:

bool existQueueMessageManager(const std::string &qname)
{std::unique_lock<std::mutex> ulock(_mutex);return _qmsg_map.count(qname) > 0;
}

针对第二种情况:
在QueueMessageManager当中添加如下函数:

// 该队列当中是否还有待推送/待拉取/待确认的消息,若都没有,则该队列的消息为空
bool empty()
{std::unique_lock<std::mutex> ulock(_mutex);return _waitpublish_map.empty() && _waitack_map.empty();
}

修改完善一下existQueueMessageManager,改名为emptyQueueMessage

bool emptyQueueMessage(const std::string &qname)
{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end())return true;return iter->second->empty();
}
1.前置函数的实现

当我们进行unBind,basicCancel和basicAck的时候都有可能会触发队列的自动删除

而队列的自动删除需要判断以下三个条件:

  1. 该队列的绑定信息为空
  2. 该队列的消费者为空
  3. 该队列相关消息为空

既然我们都决定把这个函数统一起来,它位于信道模块,因此我们需要虚拟机模块提供判断队列相关消息是否为空以及队列绑定信息是否为空的函数

VirtualHost

bool emptyBindingOfMsgQueue(const std::string& qname)
{return _bmp->emptyBindingOfMsgQueue(qname);
}bool emptyMessageOfMsgQueue(const std::string& qname)
{return _mmp->emptyQueueMessage(qname);
}

VirtualHostManager

bool emptyBindingOfMsgQueue(const std::string& vname,const std::string& qname)
{std::ostringstream oss;oss << "判断指定队列绑定信息是否为空失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->emptyBindingOfMsgQueue(qname);
}bool emptyMessageOfMsgQueue(const std::string& vname,const std::string& qname)
{std::ostringstream oss;oss << "判断指定队列绑定消息是否为空失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->emptyMessageOfMsgQueue(qname);
}

同理,在消费者管理模块也要增加一个判断某个队列关联的消费者是否存在

QueueConsumerManager

bool empty()
{std::unique_lock<std::mutex> ulock(_mutex);return _consumer_map.empty();
}

ConsumerManager

bool emptyOfConsumer(const std::string &vname, const std::string &qname)
{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _consumer_map.find({vname, qname});if (iter == _consumer_map.end())return true;return iter->second->empty();
}
2.实现判断并清理函数
void check_deleteQueue(const std::string& vname,const std::string& qname)
{// 1. 判断该队列auto_delete是否为trueMsgQueue::ptr mqp=_vhost_manager_ptr->getMsgQueue(vname,qname);if(!mqp->auto_delete) return;// 2. 判断该队列的绑定信息是否为空if(!_vhost_manager_ptr->emptyBindingOfMsgQueue(vname,qname)) return;// 3. 判断该队列关联消息是否为空if(!_vhost_manager_ptr->emptyMessageOfMsgQueue(vname,qname)) return;// 4. 判断该队列关联消费者是否为空if(!_consumer_manager_ptr->emptyOfConsumer(vname,qname)) return;// 5. 删除该队列// 注意: 销毁队列的消费者管理句柄_consumer_manager_ptr->destroyQueueConsumerManager(vname, qname);_vhost_manager_ptr->eraseMsgQueue(vname, qname);
}

3.调用

下面我们在Channel的unBind,basicAck和basicCancel部分进行调用

void unBind(const UnbindRequestPtr &req)
{bool ret = _vhost_manager_ptr->unBind(req->vhost_name(), req->exchange_name(), req->queue_name());if(ret){check_deleteQueue(req->vhost_name(),req->queue_name());}basicResponse(req->req_id(), req->channel_id(), ret);
}
void basicAck(const BasicAckRequestPtr &req)
{bool ret = _vhost_manager_ptr->basicAck(req->vhost_name(), req->queue_name(), req->msg_id());if(ret){check_deleteQueue(req->vhost_name(),req->queue_name());}basicResponse(req->req_id(), req->channel_id(), ret);
}
// 正常情况下肯定是当前信道所关联的消费者才会调用取消订阅这个函数
void basicCancel(const BasicCancelRequestPtr &req)
{bool ret = _consumer_manager_ptr->removeConsumer(req->vhost_name(), req->queue_name(), req->consumer_tag());_consumer.reset();if(ret){check_deleteQueue(req->vhost_name(),req->queue_name());}basicResponse(req->req_id(), req->channel_id(), ret);
}

4.验证

#include "connection.hpp"
using namespace ns_mq;void ConsumerCb(const Channel::ptr &cp, const std::string &consumer_tag, const BasicProperities *bp, const std::string &body)
{std::cout << "void ConsumerCallback(const std::string& consumer_tag,const BasicProperities* bp,const std::string& body) 被调用\n";if (bp != nullptr){if (cp->BasicAck(bp->msg_id()))cout << "消息ACK成功\n";elsecout << "消息ACK失败\n";}else{cout << "消息没有基础属性,无法ACK\n";}
}void ProductorCb(const BasicPublishResponsePtr &resp)
{std::cout << "void ProductorCallback(const BasicPublishResponsePtr& resp) 被调用\n";
}int main()
{Connection conn("127.0.0.1", 8888, std::make_shared<AsyncWorker>());Channel::ptr cp = conn.getChannel();cp->declareVirtualHost("host1", "./host1/resource.db", "./host1/message");cp->declareExchange("vhost1", "exchange1", FANOUT, true, false, {});cp->declareMsgQueue("vhost1", "queue1", true, false, true, {});cp->bind("vhost1", "exchange1", "queue1", "news.science.#");// 自动确认了cp->BasicConsume("vhost1", "consumer1", "queue1",std::bind(ConsumerCb, cp, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), false);cp->ProductorBind("productor1", ProductorCb);BasicProperities bp;bp.set_msg_id(UUIDHelper::uuid());bp.set_mode(DURABLE);bp.set_routing_key("news.science.computer");cp->BasicPublish("vhost1", "exchange1", &bp, "Hello World", PUSH);std::this_thread::sleep_for(std::chrono::seconds(3)); // 休眠3s,自动cp->BasicCancel();cp->unBind("vhost1", "exchange1", "queue1");cp->ProductorUnBind();conn.returnChannel(cp);return 0;
}
1.unBind导致的自动删除

上面的ACK和Cancel都比unBind调用的早,因此这是测试UnBind的
在这里插入图片描述
验证成功

2.BasicCancel导致的自动删除
cp->unBind("vhost1", "exchange1", "queue1");
cp->BasicCancel();

这样就最后才调用BasicCancel了:
在这里插入图片描述
验证成功

3.反向验证BasicAck的影响

跟上面一样按照顺序验证行不通,因为ACK的时候这个信道必须要跟对应的消费者处于关联状态
也就是说ACK必须在Cancel之前调用,否则就会失败

所以我们反向验证,发布消息时发布到拉取链表当中,不去ACK他,那么即使队列的绑定信息和消费者全都解除了
但是队列依然存在

cp->BasicPublish("host1", "exchange1", &bp, "Hello World", PULL);

队列还在,验证成功
在这里插入图片描述
在这里插入图片描述

二、队列独占标志的完善

1.什么是独占机制

在这里插入图片描述
独占队列只能被第一次声明该队列的连接所访问:
即:队列的绑定、解绑、订阅、取消订阅、拉取消息、声明、删除只能被该连接所成功执行

注意:
basicPublish不会收到独占队列的影响,因为他只跟交换机有关,而交换机能否发到队列取决于是否绑定,交换机类型,binding_key,routing_key

2.独占机制的自动删除和持久化之间的关系

在这里插入图片描述
注意:RabbitMQ是允许在同一个虚拟机当中拥有同名队列的(无论是独占还是非独占),但是实际当中严重不推荐这么做
就像是菱形虚拟继承+多态一样,坚决严格杜绝使用

3.设计

独占队列是根据连接来进行区分的,因此独占队列跟连接要建立一对一或者多对一的关系

那就在队列模块当中耦合进连接Connection::ptr对象?还是耦合进TcpConnectionPtr对象呢?

可见,无论耦合哪种,都会极大程度上增大模块间的耦合度

但是连接关系是必要的,那怎么办?
在中间加一层
其实这个中间层较为隐晦,是Connection里面的唯一标识

  • 因此我们给Connection对象加一个标识符字段:connection_id,用UUID进行生成
    在这里插入图片描述

4.proto文件修改

因为这一修改的范围下到队列,上到连接,所以我们直接把proto通信协议改掉
给DeclareMsgQueueRequest,EraseMsgQueueRequest,BindRequest,UnbindRequest,BasicConsumeRequest,BasicCancelRequest,BasicAckRequest,BasicPullRequest
都要加上string connection_id这一字段

//4. 队列的声明与删除
message DeclareMsgQueueRequest
{string req_id = 1;string channel_id = 2;string vhost_name = 3;string queue_name = 4;bool durable = 5;bool exclusive = 6;bool auto_delete = 7;map<string,string> args = 8;string connection_id = 9;
}message EraseMsgQueueRequest
{string req_id = 1;string channel_id = 2;string vhost_name = 3;string queue_name = 4;string connection_id = 5;
}
//5. 队列的绑定与解除绑定
message BindRequest
{string req_id = 1;string channel_id = 2;string vhost_name = 3;string exchange_name = 4;string queue_name = 5;string binding_key = 6;string connection_id = 7;
}message UnbindRequest
{string req_id = 1;string channel_id = 2;string vhost_name = 3;string exchange_name = 4;string queue_name = 5;string connection_id = 6;
}
//6. 队列的订阅与取消订阅
message BasicConsumeRequest
{string req_id = 1;string channel_id = 2;string vhost_name = 3;string consumer_tag = 4;string queue_name = 5;bool auto_ack = 6;      
//注意:我们无需将消费处理回调函数传给服务器,因为服务器需要使用自己内部实现并绑定的消费处理回调函数string connection_id = 7;
}message BasicCancelRequest
{string req_id = 1;string channel_id = 2;string vhost_name = 3;string consumer_tag = 4;string queue_name = 5;string connection_id = 6;
}
message BasicAckRequest
{string req_id = 1;string channel_id = 2;string vhost_name = 3;string queue_name = 4;string msg_id = 5;string connection_id = 6;
}//8. 消息的拉取
message BasicPullRequest
{string req_id = 1;string channel_id = 2;string vhost_name = 3;string queue_name = 4;string consumer_tag = 5;string connection_id = 6;
}
protoc --cpp_out=. mq_proto.proto

5.Connection和队列模块的成员修改

  1. 在Connection模块当中增加成员
std::string _connection_id;
: _connection_id(UUIDHelper::uuid())//构造函数初始化
  1. 在MsgQueue模块当中增加成员
std::string connection_id;

注意:因为持久化恢复之后的独占队列一开始是处于空闲状态的,因此独占队列当中的connection_id成员无需持久化

6.队列访问的限制

1.declare和erase

connection_id是队列成员的一部分,所以我们必须要下到队列模块进行限制

因为独占队列持久化恢复之后是处于“空闲”状态的,而string的默认值就是空串,所以我们声明队列时需要这么判断:

实在不放心的话,可以在队列的持久化恢复模块给上这个:

static int SelectCallback(void *arg, int column, char **rows, char **fields)
{MsgQueueMap *mqmp = static_cast<MsgQueueMap *>(arg);MsgQueue::ptr mqp = std::make_shared<MsgQueue>();mqp->name = rows[0];mqp->durable = static_cast<bool>(std::stoi(rows[1]));mqp->exclusive = static_cast<bool>(std::stoi(rows[2]));mqp->auto_delete = static_cast<bool>(std::stoi(rows[3]));mqp->setArgs(rows[4]);mqp->connection_id = "";mqmp->insert(std::make_pair(mqp->name, mqp));return 0;
}
1.队列模块的修改
bool declareMsgQueue(const std::string &qname, bool qdurable, bool qexclusive, bool qauto_delete,const google::protobuf::Map<std::string, std::string> &qargs, const std::string &connection_id)
{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _mqp.find(qname);if (iter != _mqp.end()){MsgQueue::ptr mp = iter->second;// 独占队列,只允许第一次声明他的连接所访问使用,因此对于其他连接,直接连声明也不允许if (mp->exclusive && mp->connection_id != "" && mp->connection_id != connection_id){return false;}// 如果该独占队列的connection_id是空的,那么需要更新if (mp->exclusive)mp->connection_id = connection_id;return true;}MsgQueue::ptr mp = std::make_shared<MsgQueue>(qname, qdurable, qexclusive, qauto_delete, qargs, connection_id);if (qdurable){if (!_mapper.insert(mp)){default_error("声明队列持久化数据失败, 队列名: %s",qname.c_str());return false;}}_mqp.insert(std::make_pair(qname, mp));return true;
}bool eraseMsgQueue(const std::string &qname, const std::string &connection_id)
{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _mqp.find(qname);if (iter == _mqp.end())return true;MsgQueue::ptr mp = iter->second;// 判断exclusive和connection_idif (mp->exclusive && mp->connection_id != connection_id){default_error("删除队列失败,因为该连接不是该独占队列的所有者");return false;}if (mp->durable){if (!_mapper.erase(mp->name)){default_error("删除队列持久化数据失败, 队列名: %s",mp->name.c_str());return false;}}_mqp.erase(iter);return true;
}

那能不能允许查独占队列呢?
允许啊,没什么问题,因为你不是对该队列进行访问和操作,你只能拿到该队列的ptr,啥也干不了

2.虚拟机模块

declare队列时,要先初始化该队列的消息管理模块,因此在这里也需要进行限制

这么写就比较优雅了:只有声明队列成功,才允许初始化队列消息管理模块

// 声明/删除队列
bool declareMsgQueue(const std::string &qname, bool qdurable, bool qexclusive, bool qauto_delete,const google::protobuf::Map<std::string, std::string> &qargs,const std::string& connection_id)
{bool ret=_mqmp->declareMsgQueue(qname, qdurable, qexclusive, qauto_delete, qargs);if(ret){// 初始化该队列的消息管理模块句柄_mmp->initQueueMessageManager(qname);}return ret;
}

删除也是如此,只有删除成功,才可以删除队列消息管理模块和队列绑定信息

bool eraseMsgQueue(const std::string &qname,const std::string& connection_id)
{bool ret=_mqmp->eraseMsgQueue(qname);if(ret){// 删除该队列的所有绑定信息if (!_bmp->removeMsgQueueBindings(qname)){default_error("删除队列失败,因为删除队列的所有绑定信息失败, 队列名:%s",qname.c_str());return false;}// 删除该队列的消息管理模块句柄_mmp->destroyQueueMessageManager(qname);}return ret;
}

虚拟机管理模块:只需要传参一下即可

// 声明/删除队列
bool declareMsgQueue(const std::string &vname, const std::string &qname, bool qdurable, bool qexclusive, bool qauto_delete,const google::protobuf::Map<std::string, std::string> &qargs, const std::string &connection_id)
{std::ostringstream oss;oss << "声明队列失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->declareMsgQueue(qname, qdurable, qexclusive, qauto_delete, qargs, connection_id);
}bool eraseMsgQueue(const std::string &vname, const std::string &qname, const std::string &connection_id)
{std::ostringstream oss;oss << "删除队列失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->eraseMsgQueue(qname, connection_id);
}
3.信道模块

声明队列时要初始化队列消费者管理模块,删除队列时要删除队列消费者管理模块

void declareMsgQueue(const DeclareMsgQueueRequestPtr &req)
{bool ret = _vhost_manager_ptr->declareMsgQueue(req->vhost_name(), req->queue_name(), req->durable(), req->exclusive(),req->auto_delete(), req->args(), req->connection_id());if (ret){// 注意: 初始化队列的消费者管理句柄!!!!_consumer_manager_ptr->initQueueConsumerManager(req->vhost_name(), req->queue_name());}basicResponse(req->req_id(), req->channel_id(), ret);
}void eraseMsgQueue(const EraseMsgQueueRequestPtr &req)
{bool ret = _vhost_manager_ptr->eraseMsgQueue(req->vhost_name(), req->queue_name(), req->connection_id());if (ret){// 注意: 销毁队列的消费者管理句柄_consumer_manager_ptr->destroyQueueConsumerManager(req->vhost_name(), req->queue_name());}basicResponse(req->req_id(), req->channel_id(), ret);
}

因为虚拟机的eraseMsgQueue增加了一个参数,因此检查并自动删除队列函数也需要跟着改,所以:

void check_deleteQueue(const std::string &vname, const std::string &qname, const std::string &connection_id)
{// 1. 判断该队列auto_delete是否为trueMsgQueue::ptr mqp = _vhost_manager_ptr->getMsgQueue(vname, qname);if (!mqp->auto_delete)return;// 2. 判断该队列的绑定信息是否为空if (!_vhost_manager_ptr->emptyBindingOfMsgQueue(vname, qname))return;// 3. 判断该队列关联消息是否为空if (!_vhost_manager_ptr->emptyMessageOfMsgQueue(vname, qname))return;// 4. 判断该队列关联消费者是否为空if (!_consumer_manager_ptr->emptyOfConsumer(vname, qname))return;// 5. 删除该队列// 注意: 销毁队列的消费者管理句柄_consumer_manager_ptr->destroyQueueConsumerManager(vname, qname);_vhost_manager_ptr->eraseMsgQueue(vname, qname, connection_id);
}

2.Bind和UnBind

Bind和UnBind不牵扯队列具体信息,因此绑定信息模块无需修改,只需要修改虚拟机和信道模块即可

1.虚拟机模块

因为虚拟机模块当中有很多函数都需要进行独占标志判断,因此我们单独提出一个函数来完成这个任务

bool check_queue(const std::string &qname, const std::string &connection_id)
{MsgQueue::ptr mp = _mqmp->getMsgQueue(qname);if (mp.get() == nullptr)return false;if (mp->exclusive && mp->connection_id != connection_id)return false;return true;
}

因为bind,unbind,basicConsume,basicCancel,basicPull,basicAck都要求队列必须存在,而declare和erase不要求队列必须存在,因此我们就不修改declare和erase了

// 绑定/解绑队列
bool bind(const std::string &ename, const std::string &qname, const std::string &binding_key, const std::string &connection_id)
{if (!check_queue(qname, connection_id)){default_error("绑定队列失败,因为该连接不是该独占队列的所有者");return false;}// 因为不知道该交换机,队列是否存在// 也不知道他们是否持久化,所以先查找他们Exchange::ptr ep = _emp->getExchange(ename);if (ep.get() == nullptr){default_error("绑定交换机与队列失败,因为该交换机不存在, 交换机名称: %s",ename.c_str());return false;}MsgQueue::ptr mqp = _mqmp->getMsgQueue(qname);if (mqp.get() == nullptr){default_error("绑定交换机与队列失败,因为该队列不存在, 队列名称:  %s",qname.c_str());return false;}return _bmp->bind(ename, qname, binding_key, ep->durable && mqp->durable);
}bool unBind(const std::string &ename, const std::string &qname,const std::string& connection_id)
{if (!check_queue(qname, connection_id)){default_error("解除绑定队列失败,,因为该连接不是该独占队列的所有者");return false;}// 0. 删交换机// 1. 解绑if (!_bmp->unBind(ename, qname))return false;// 2. 判断是否是auto_deleteExchange::ptr ep = _emp->getExchange(ename);if (ep->auto_delete){// 3. 判断绑定信息是否为空if (_bmp->emptyBindingOfExchange(ename)){return eraseExchange(ename);}}return true;
}

虚拟机管理模块:

// 绑定/解绑队列
bool bind(const std::string &vname, const std::string &ename, const std::string &qname, const std::string &binding_key,const std::string& connection_id)
{std::ostringstream oss;oss << "绑定队列失败,因为虚拟机不存在, 队列名称: " << qname << ", 交换机名称: " << ename << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->bind(ename, qname, binding_key,connection_id);
}bool unBind(const std::string &vname, const std::string &ename, const std::string &qname,const std::string& connection_id)
{std::ostringstream oss;oss << "解绑队列失败,因为虚拟机不存在, 队列名称: " << qname << ", 交换机名称: " << ename << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->unBind(ename, qname,connection_id);
}
2.信道模块

因为队列的绑定和解除绑定只跟虚拟机模块有关,因此对于信道模块,我们只需要修改传参即可

void bind(const BindRequestPtr &req)
{bool ret = _vhost_manager_ptr->bind(req->vhost_name(), req->exchange_name(), req->queue_name(), req->binding_key(), req->connection_id());basicResponse(req->req_id(), req->channel_id(), ret);
}void unBind(const UnbindRequestPtr &req)
{bool ret = _vhost_manager_ptr->unBind(req->vhost_name(), req->exchange_name(), req->queue_name(), req->connection_id());if (ret){check_deleteQueue(req->vhost_name(), req->queue_name(), req->connection_id());}basicResponse(req->req_id(), req->channel_id(), ret);
}

3.basicConsume和basicCancel

队列的订阅和取消订阅跟消费者管理模块是相关的,但是消费者管理模块不牵扯队列的具体信息,因此无需修改消费者管理模块,只需要修改信道模块即可

因此我们需要在信道模块复用一下虚拟机模块当中的check_queue函数,因此需要虚拟机管理模块封装一下接口

bool check_queue(const std::string &vname, const std::string &qname, const std::string &connection_id)
{std::ostringstream oss;oss << "进行指定队列独占标志检验失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->check_queue(qname, connection_id);
}
void basicConsume(const BasicConsumeRequestPtr &req)
{if(!_vhost_manager_ptr->check_queue(req->vhost_name(),req->queue_name(),req->connection_id())){default_error("当前信道的队列订阅失败,因为未通过该独占队列的检验审核");basicResponse(req->req_id(), req->channel_id(), false);return;}// 1. 看看当前信道有无关联什么消费者if (_consumer.get() != nullptr){default_error("当前信道的队列订阅失败,因为当前信道已经关联了消费者,信道ID:%s ,消费者tag:%s",_channel_id.c_str(),_consumer->_consumer_tag.c_str());basicResponse(req->req_id(), req->channel_id(), false);return;}// 2. 判断队列是否存在if (!_vhost_manager_ptr->existMsgQueue(req->vhost_name(), req->queue_name())){default_error("当前信道的队列订阅失败,因为队列不存在,队列名:%s",req->queue_name().c_str());basicResponse(req->req_id(), req->channel_id(), false);return;}// 3. 创建消费者_consumer = _consumer_manager_ptr->createConsumer(req->vhost_name(), req->queue_name(), req->consumer_tag(),::std::bind(&Channel::consumeCallback, this, ::std::placeholders::_1, ::std::placeholders::_2, ::std::placeholders::_3), req->auto_ack());// 4. 返回响应basicResponse(req->req_id(), req->channel_id(), true);
}// 正常情况下肯定是当前信道所关联的消费者才会调用取消订阅这个函数
void basicCancel(const BasicCancelRequestPtr &req)
{if(!_vhost_manager_ptr->check_queue(req->vhost_name(),req->queue_name(),req->connection_id())){default_error("当前信道的取消队列订阅失败,因为未通过该独占队列的检验审核");basicResponse(req->req_id(), req->channel_id(), false);return;}bool ret = _consumer_manager_ptr->removeConsumer(req->vhost_name(), req->queue_name(), req->consumer_tag());_consumer.reset();if (ret){check_deleteQueue(req->vhost_name(), req->queue_name(),req->connection_id());}basicResponse(req->req_id(), req->channel_id(), ret);
}

4.basicAck和basicPull

void basicAck(const BasicAckRequestPtr &req)
{if(!_vhost_manager_ptr->check_queue(req->vhost_name(),req->queue_name(),req->connection_id())){default_error("当前信道的消息确认失败,因为未通过该独占队列的检验审核");basicResponse(req->req_id(), req->channel_id(), false);return;}bool ret = _vhost_manager_ptr->basicAck(req->vhost_name(), req->queue_name(), req->msg_id());if (ret){check_deleteQueue(req->vhost_name(), req->queue_name(),req->connection_id());}basicResponse(req->req_id(), req->channel_id(), ret);
}
void basicPull(const BasicPullRequestPtr &req)
{if(!_vhost_manager_ptr->check_queue(req->vhost_name(),req->queue_name(),req->connection_id())){default_error("当前信道的消息拉取失败,因为未通过该独占队列的检验审核");basicResponse(req->req_id(), req->channel_id(), false);return;}// 1. 拿到该消费者Consumer::ptr cp = _consumer_manager_ptr->getConsumer(req->vhost_name(), req->queue_name(), req->consumer_tag());if (cp.get() == nullptr){default_error("拉取消息失败,因为消费者不存在,消费者tag:%s",req->consumer_tag().c_str());basicResponse(req->req_id(), req->channel_id(), false);return;}// 2. 拿到消息MessagePtr mp = _vhost_manager_ptr->basicPull(req->vhost_name(), req->queue_name());if (mp.get() == nullptr){default_error("拉取消息失败,因为该队列没有待推送消息,队列名:%s",req->queue_name().c_str());basicResponse(req->req_id(), req->channel_id(), false);return;}// 3. 封装异步任务,抛入线程池auto func = [cp, mp, req, this](){// 3. 调用该消费者的消费处理回调函数cp->_callback(cp->_consumer_tag, mp->mutable_valid()->mutable_properities(), mp->valid().body());// 4. auto_ack的问题if (cp->_auto_ack){this->_vhost_manager_ptr->basicAck(req->vhost_name(), req->queue_name(), mp->valid().properities().msg_id());}};_pool_ptr->put(func);// 4. 基础相应basicResponse(req->req_id(), req->channel_id(), true);
}

7.broker服务器修改

为了给用户提供良好的使用体验,我们Connection的connection_id是内部自己维护的,无需用户在客户端填写,因此客户端无需任何改动

因此我们需要在broker服务器模块的On…Callback等诸多函数当中设置进这个字段,因此就需要Connection模块提供一个get接口返回内部id值

std::string connection_id()
{return _connection_id;
}
// 3. 设置connection_id
req->set_connection_id(myconn->connection_id());
// 4. 声明/删除队列
void OnDeclareMsgQueue(const muduo::net::TcpConnectionPtr &conn, const DeclareMsgQueueRequestPtr &req, muduo::Timestamp)
{// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("声明队列时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("声明队列失败,因为获取信道失败");return;}// 3. 设置connection_idreq->set_connection_id(myconn->connection_id());mychannel->declareMsgQueue(req);
}void OnEraseMsgQueue(const muduo::net::TcpConnectionPtr &conn, const EraseMsgQueueRequestPtr &req, muduo::Timestamp)
{// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("删除队列时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("删除队列失败,因为获取信道失败");return;}// 3. 设置connection_idreq->set_connection_id(myconn->connection_id());mychannel->eraseMsgQueue(req);
}// 5. 绑定/解绑队列
void OnBind(const muduo::net::TcpConnectionPtr &conn, const BindRequestPtr &req, muduo::Timestamp)
{// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("绑定队列时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("绑定队列失败,因为获取信道失败");return;}// 3. 设置connection_idreq->set_connection_id(myconn->connection_id());mychannel->bind(req);
}void OnUnbind(const muduo::net::TcpConnectionPtr &conn, const UnbindRequestPtr &req, muduo::Timestamp)
{// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("解除绑定队列时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("解除绑定队列失败,因为获取信道失败");return;}// 3. 设置connection_idreq->set_connection_id(myconn->connection_id());mychannel->unBind(req);
}// 6. 订阅/取消订阅队列
void OnBasicConsume(const muduo::net::TcpConnectionPtr &conn, const BasicConsumeRequestPtr &req, muduo::Timestamp)
{// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("订阅队列时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("订阅队列失败,因为获取信道失败");return;}// 3. 设置connection_idreq->set_connection_id(myconn->connection_id());mychannel->basicConsume(req);
}void OnBasicCancel(const muduo::net::TcpConnectionPtr &conn, const BasicCancelRequestPtr &req, muduo::Timestamp)
{// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("取消订阅队列时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("取消订阅队列失败,因为获取信道失败");return;}// 3. 设置connection_idreq->set_connection_id(myconn->connection_id());mychannel->basicCancel(req);
}void OnBasicAck(const muduo::net::TcpConnectionPtr &conn, const BasicAckRequestPtr &req, muduo::Timestamp)
{// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("确认消息时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("确认消息失败,因为获取信道失败");return;}// 3. 设置connection_idreq->set_connection_id(myconn->connection_id());mychannel->basicAck(req);
}// 8. 消息拉取
void OnBasicPull(const muduo::net::TcpConnectionPtr &conn, const BasicPullRequestPtr &req, muduo::Timestamp)
{// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("确认消息时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("确认消息失败,因为获取信道失败");return;}// 3. 设置connection_idreq->set_connection_id(myconn->connection_id());mychannel->basicPull(req);
}

8.验证

1.声明,删除队列的验证

开两个客户端【进程】,一个先跑,创建独占队列,然后另一个跑,声明+删除

先跑的给个10s,后跑给个2s

代码:

#include "connection.hpp"
using namespace ns_mq;// 声明独占队列
int main(int argc, char *argv[])
{if (argc != 2){std::cout << "Usage: " << argv[0] << " 声明队列后休眠的秒数\n";return 1;}int second = std::stoi(argv[1]);Connection conn("127.0.0.1", 8888, std::make_shared<AsyncWorker>());Channel::ptr cp = conn.getChannel();cp->declareVirtualHost("host1", "./host1/resource.db", "./host1/message");cp->declareMsgQueue("host1", "exclusive_queue1", true, true, false, {});// 休眠second秒std::this_thread::sleep_for(std::chrono::seconds(second));cp->eraseMsgQueue("host1", "exclusive_queue1");conn.returnChannel(cp);return 0;
}

在这里插入图片描述

2.绑定,解绑队列验证

int main(int argc, char *argv[])
{if (argc != 2){std::cout << "Usage: " << argv[0] << " 声明队列后休眠的秒数\n";return 1;}int second = std::stoi(argv[1]);Connection conn("127.0.0.1", 8888, std::make_shared<AsyncWorker>());Channel::ptr cp = conn.getChannel();cp->declareVirtualHost("host1", "./host1/resource.db", "./host1/message");cp->declareExchange("host1","exchange1",TOPIC,true,false,{});cp->declareMsgQueue("host1", "exclusive_queue1", true, true, false, {});cp->bind("host1","exchange1","exclusive_queue1","news.sport.#");// 休眠second秒std::this_thread::sleep_for(std::chrono::seconds(second));cp->unBind("host1","exchange1","exclusive_queue1");cp->eraseMsgQueue("host1", "exclusive_queue1");conn.returnChannel(cp);return 0;
}

在这里插入图片描述

3.订阅取消订阅验证

void Callback(const Channel::ptr &cp, const std::string &consumer_tag, const BasicProperities *bp, const std::string &body)
{std::cout << consumer_tag << "消费了消息:" << body << "\n";if (bp != nullptr){cp->BasicAck(bp->msg_id());}else{std::cout << "无法ACK该消息,因为消息没有基础属性BasicProperities\n";}
}int main(int argc, char *argv[])
{if (argc != 2){std::cout << "Usage: " << argv[0] << " 声明队列后休眠的秒数\n";return 1;}int second = std::stoi(argv[1]);Connection conn("127.0.0.1", 8888, std::make_shared<AsyncWorker>());Channel::ptr cp = conn.getChannel();cp->declareVirtualHost("host1", "./host1/resource.db", "./host1/message");cp->declareExchange("host1", "exchange1", TOPIC, true, false, {});cp->declareMsgQueue("host1", "exclusive_queue1", true, true, false, {});cp->bind("host1", "exchange1", "exclusive_queue1", "news.sport.#");cp->BasicConsume("host1", "consumer" + std::to_string(second), "exclusive_queue1",std::bind(Callback,cp,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3),false);// 休眠second秒std::this_thread::sleep_for(std::chrono::seconds(second));cp->BasicCancel();cp->unBind("host1", "exchange1", "exclusive_queue1");cp->eraseMsgQueue("host1", "exclusive_queue1");conn.returnChannel(cp);return 0;
}

在这里插入图片描述

4.消息确认和消息拉取验证【顺便验证一下不会影响basicPublish】

这里要给一个生产者:

#include "connection.hpp"
using namespace ns_mq;void Callback(const Channel::ptr &cp, BasicPublishResponsePtr resp) {cout<<"我是Callback(const Channel::ptr &cp, BasicPublishResponsePtr resp)!!!!!\n";
}int main()
{Connection conn("127.0.0.1", 8888, std::make_shared<AsyncWorker>());Channel::ptr cp = conn.getChannel();cp->declareVirtualHost("host1", "./host1/resource.db", "./host1/message");cp->declareExchange("host1", "exchange1", TOPIC, true, false, {});cp->declareMsgQueue("host1", "exclusive_queue1", true, true, false, {});cp->bind("host1", "exchange1", "exclusive_queue1", "news.sport.#");std::cout << "A\n";cp->ProductorBind("productor1", std::bind(Callback, cp, std::placeholders::_1));std::cout << "B\n";BasicProperities bp;bp.set_msg_id(UUIDHelper::uuid());bp.set_mode(DURABLE);bp.set_routing_key("news.sport.basketball");std::cout << "C\n";cp->BasicPublish("host1", "exchange1", &bp, "Hello World", PULL);std::cout << "D\n";cp->ProductorUnBind();std::cout << "E\n";conn.returnChannel(cp);std::cout << "F\n";std::this_thread::sleep_for(std::chrono::seconds(10));
/*
防止异步编程当中的这种问题:异步任务未完成,但 std::future 已被销毁:如果主线程(或任何持有 std::future 的线程)
在异步任务完成之前销毁了 std::future 对象,并且随后某个线程尝试通过该 std::future 访问结果,就会发生错误*/return 0;
}
#include "connection.hpp"using namespace ns_mq;int second = 0;void Callback(const Channel::ptr &cp, const std::string &consumer_tag, const BasicProperities *bp, const std::string &body)
{std::cout << consumer_tag << "消费了消息:" << body << "\n";if (bp != nullptr){cp->BasicAck(bp->msg_id());}else{std::cout << "无法ACK该消息,因为消息没有基础属性BasicProperities\n";}
}int main(int argc, char *argv[])
{if (argc != 2){std::cout << "Usage: " << argv[0] << " 声明队列后休眠的秒数\n";return 1;}second = std::stoi(argv[1]);Connection conn("127.0.0.1", 8888, std::make_shared<AsyncWorker>());Channel::ptr cp = conn.getChannel();cp->declareVirtualHost("host1", "./host1/resource.db", "./host1/message");cp->declareExchange("host1", "exchange1", TOPIC, true, false, {});cp->declareMsgQueue("host1", "exclusive_queue1", true, true, false, {});cp->bind("host1", "exchange1", "exclusive_queue1", "news.sport.#");cp->BasicConsume("host1", "consumer" + std::to_string(second), "exclusive_queue1", std::bind(Callback, cp, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), false);std::this_thread::sleep_for(std::chrono::seconds(second));cp->BasicPull();cp->BasicCancel();cp->unBind("host1", "exchange1", "exclusive_queue1");cp->eraseMsgQueue("host1", "exclusive_queue1");conn.returnChannel(cp);std::this_thread::sleep_for(std::chrono::seconds(10));return 0;
}

这里另一个消费者连队列的订阅都失败了,更别提消息确认了
在这里插入图片描述
下面是消息拉取:
把PUSH改为PULL,并且给消费者增加一个basicPull的调用
在这里插入图片描述
首先,先把消息发布了,这个消息是有效的,然后重启服务器
然后开始验证消息拉取:
在这里插入图片描述

5.独占队列的持久化验证

其实刚才那个就已经验证了,因为第一个消费者能够声明生产者持久化的那个队列(因为消息的确确认成功了)

三、独占队列自动删除的实现

1.独占队列自动删除的目的

独占队列主要是用来确保队列在创建它的连接关闭时自动删除
这个特性在测试环境中非常有用,
因为它可以帮助自动化测试脚本在测试完成后清理其产生的资源,避免测试间的干扰

2.设计

独占队列的自动删除是在连接断开之后自动触发的,跟普通的基于信道就能提供的服务是不一样的

因此,我们这个自动删除需要在broker服务器模块完成
而内存级别队列的删除,事关内存级别绑定信息,消息,消费者的删除,因此涉及到虚拟机管理模块和消费者管理模块

而broker服务器本身就有虚拟机管理模块和消费者管理模块,因此直接在broker服务器模块统领执行独占队列的自动删除即可

【因为连接关闭时,信道也早就关闭了,所以这个操作如果牵扯连接和信道,那么纯属浪费,而且不符合高内聚和低耦合】

因为这个操作是以TCP连接为单位的,不应牵扯信道

3.实现

1.队列模块:

遍历所有队列,删除独占标志为true,且connection_id匹配成功的队列
返回vector< string >里面是被删除的队列名,这些队列名需要用于删除绑定信息等等队列相关信息

// 基于连接删除独占队列
std::vector<std::string> eraseExclusiveMsgQueueByConnection(const std::string &connection_id)
{std::unique_lock<std::mutex> ulock(_mutex);std::vector<std::string> vec;for (auto iter = _mqp.begin(); iter != _mqp.end();){MsgQueue::ptr mp = iter->second;// 小心迭代器失效问题if (mp->exclusive && mp->connection_id == connection_id){vec.push_back(mp->name);iter=_mqp.erase(iter);}else ++iter;}return vec;
}

2.虚拟机模块

因为独占队列的删除只会删除内存级队列,不会影响持久化,所以我们需要在消息模块增加一个只删除内存级消息,不删除持久化消息的接口

1.消息模块

队列消息模块:

// 内存级别删除
void clearOnlyInMemory()
{std::unique_lock<std::mutex> ulock(_mutex);_waitpush_list.clear();_waitpull_list.clear();_waitpublish_map.clear();_waitack_map.clear();_durable_map.clear();_valid_count = _total_count = 0;
}

消息总体管理模块:

void destroyQueueMessageManagerOnlyInMemory(const std::string &qname)
{QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){return;}qmmp = iter->second;_qmsg_map.erase(iter);}qmmp->clearOnlyInMemory();
}
2.绑定信息模块

绑定信息模块也是如此,需要提供一个内存级删除接口
BindingManager

bool removeMsgQueueBindingsOnlyInMemory(const std::string& qname)
{std::unique_lock<std::mutex> ulock(_mutex);// 遍历整张交换机表,删除队列信息for (auto &kv : _exchange_map){MsgQueueBindingMap &mqbm = kv.second;mqbm.erase(qname);}return true;
}
3.虚拟机模块

这里依然需要返回那个vector

std::vector<std::string> eraseExclusiveMsgQueueByConnection(const std::string &connection_id)
{std::vector<std::string> vec = _mqmp->eraseExclusiveMsgQueueByConnection(connection_id);// 遍历vector,删除所有绑定信息for(auto& qname:vec){// 必须是基于内存级删除该队列的所有绑定信息if (!_bmp->removeMsgQueueBindingsOnlyInMemory(qname)){default_info("删除独占队列的所有绑定信息失败, 队列名: %s",qname.c_str());return;}// 注意: 必须是基于内存级删除该队列的消息管理模块句柄!_mmp->destroyQueueMessageManagerOnlyInMemory(qname);}return vec;
}

虚拟机管理模块:在这里它需要遍历所有的虚拟机进行复用
因为消费者是以虚拟机名称和队列名称作为key进行组织的,所以需要返回pair

std::vector<std::pair<std::string,std::string>> eraseExclusiveMsgQueueByConnection(const std::string &connection_id)
{std::unique_lock<std::mutex> ulock(_mutex);std::vector<std::pair<std::string,std::string>> vec_sum;for(auto& kv:_vhmap){std::vector<std::string> vec_single= kv.second->eraseExclusiveMsgQueueByConnection(connection_id);for(auto& qname:vec_single){vec_sum.push_back(std::make_pair(kv.first,qname));}}return vec_sum;
}

3.broker服务器模块

因为消费者不会进行持久化,所以无需修改消费者模块

void OnConnection(const muduo::net::TcpConnectionPtr &conn)
{std::ostringstream oss;if (conn->connected()){// 创建连接_connection_manager_ptr->createConnection(conn, _codec, _consumer_manager_ptr, _vhost_manager_ptr, _pool_ptr);default_info("连接建立成功");}else{// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("断开连接,查看并删除独占队列时,没有找到连接对应的Connection对象");return;}// 2. 删除该连接对应的独占队列std::vector<std::pair<std::string, std::string>> vec = _vhost_manager_ptr->eraseExclusiveMsgQueueByConnection(myconn->connection_id());for (auto &kv : vec){// 注意: 销毁队列的消费者管理句柄_consumer_manager_ptr->destroyQueueConsumerManager(kv.first, kv.second);}// 3. 关闭连接_connection_manager_ptr->destroyConnection(conn);default_info("连接断开成功");}
}

4.测试

由于它是内存级删除,所以我们测试能否将消息发布到该队列即可

1.代码

消费者:声明虚拟机,交换机,独占队列,并进行绑定
不手动删除队列

#include "connection.hpp"using namespace ns_mq;int main()
{Connection conn("127.0.0.1", 8888, std::make_shared<AsyncWorker>());Channel::ptr cp = conn.getChannel();cp->declareVirtualHost("host1", "./host1/resource.db", "./host1/message");cp->declareExchange("host1", "exchange1", TOPIC, true, false, {});cp->declareMsgQueue("host1", "exclusive_queue1", true, true, false, {});cp->bind("host1", "exchange1", "exclusive_queue1", "news.sport.#");std::this_thread::sleep_for(std::chrono::seconds(2));conn.returnChannel(cp);std::this_thread::sleep_for(std::chrono::seconds(10));return 0;
}

生产者:直接向对应交换机发送消息

void Callback(const Channel::ptr &cp, BasicPublishResponsePtr resp) {cout << resp->productor_tag() << " 收到消息的确认应答,消息体:" << resp->body() << ",消息处理情况:" << (resp->ok() ? "成功" : "失败") << ","<< resp->status_str() << "\n";
}int main()
{Connection conn("127.0.0.1", 8888, std::make_shared<AsyncWorker>());Channel::ptr cp = conn.getChannel();cp->ProductorBind("productor1", std::bind(Callback, cp, std::placeholders::_1));BasicProperities bp;bp.set_msg_id(UUIDHelper::uuid());bp.set_mode(DURABLE);bp.set_routing_key("news.sport.basketball");cp->BasicPublish("host1", "exchange1", &bp, "Hello World", PULL);std::this_thread::sleep_for(std::chrono::seconds(10));cp->ProductorUnBind();conn.returnChannel(cp);std::this_thread::sleep_for(std::chrono::seconds(10));
/*
防止异步编程当中的这种问题:异步任务未完成,但 std::future 已被销毁:如果主线程(或任何持有 std::future 的线程)
在异步任务完成之前销毁了 std::future 对象,并且随后某个线程尝试通过该 std::future 访问结果,就会发生错误
*/return 0;
}

2.演示

先启动消费者,然后启动生产者,如果对应消息发布失败,则证明对应队列自动删除成功
在这里插入图片描述
在这里插入图片描述
可见,当创建独占队列的消费者连接关闭之后,生产者便无法向那个独占队列发送消息了,因为他在内存级别被删除了

因此,我们成功在内存级别进行了删除

以上就是项目扩展四:交换机和队列的特性完善【自动删除与队列独占的实现】的全部内容


http://www.mrgr.cn/news/33607.html

相关文章:

  • Java是怎么处理死锁的
  • hive-拉链表
  • LeetCode讲解篇之238. 除自身以外数组的乘积
  • torch模型量化方法总结
  • HarmonyOS元服务与卡片
  • Spring AOP - 配置文件方式实现
  • Linux进阶命令-rsync daemon
  • 【通讯协议】S32K142芯片——LIN通信的学习和配置
  • 解决docker指令卡住的场景之一
  • KTH5702系列 低功耗、高精度 2D 霍尔旋转位置传感器 车规AEC-Q100
  • 01 基础request
  • linux之进程信号
  • 【网络安全】依赖混淆漏洞实现RCE
  • java Nio的应用
  • OpenCV特征检测(9)检测图像中直线的函数HoughLines()的使用
  • 命名管道详解
  • 用最容易理解的方法,实现LRU、LFU算法
  • C#如何把写好的类编译成dll文件
  • ArcGIS核密度分析(栅格处理范围与掩膜分析)
  • NLP:命名实体识别及案例(Bert微调)