第七章:消息管理模块

news/2025/2/26 7:34:01

目录

第一节:代码实现

        1-1.消息持久化管理思想

        1-2.MessageMapper类

         1-3.QueueMessage类

        1-4.MessageManager

第二节:单元测试

下期预告:


        消息管理模块在mqserver下实现。

第一节:代码实现

        消息管理首先需要消息类,它之前在mq/mq_msg.pd.h中已经定义了,所以不用再定义,包含头文件即可。

        创建一个名为mq_message.hpp的文件,并添加"老生常谈"的内容:

#ifndef __M_MESSAGE_H__
#define __M_MESSAGE_H__
#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_msg.pb.h"

#include <iostream>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <list>

namespace zd
{};

#endif

        1-1.消息持久化管理思想

        消息的管理是每个队列各自私有一个文件,这样某个队列操作时就不会影响其他队列了,文件名设置为"队列名.mqd"。

        当一条持久化消息被推送后,就需要删除它,但是并不是直接将它的从文件抹去,而是将其存储的有效性标志设置为"0",它就变成了无效消息。

        当消息大于2000且无效消息占比超过50%时,就会启动垃圾回收函数,它会将文件中的消息全部读取,如果读取到的消息是有效的,就保存在一个临时文件(.mqd.tmp)中,如果是无效的就不做处理,继续读取下一条。

        当消息读取完毕后,就删除原来的.mqd文件,将临时文件重命名成"队列名.mpd"。

        消息在文件中存储时,为了解决读取太长或者太短的问题,会先存储一个长度,这个长度占用4字节的数据,然后存储消息,即:

长度|消息|长度|消息|长度|消息|长度|消息|长度|消息|长度|消息.............

        这样就可以保存每次读取的消息完整了。

        1-2.MessageMapper类

        先在前面定义一些符号,方便使用:

    #define DATAFILE_SUBFIX ".mqd"    // 数据文件后缀
    #define TMPFILE_SUBFIX ".mqd.tmp" // 临时文件后缀

    using MessagePtr = std::shared_ptr<Message>;

        一个MessageMapper实例仅仅管理一个队列的持久化消息,所以它的构造函数是路径和队列名:

    // 一个消息队列的持久化管理类
    class MessageMapper
    {
    public:
        // base_dir:数据文件路径
        MessageMapper(std::string& base_dir,const std::string& qname):
        _qname(qname)
        {
            // 使用FileHelper创建 /base_dir/qname.mpd 文件
            if(base_dir.back() != '/')
                base_dir.push_back('/');
            _datafile = base_dir+qname+DATAFILE_SUBFIX;
            _tmpfile = base_dir+qname+TMPFILE_SUBFIX;
            assert(FileHelper::createDirectory(base_dir));
            // 文件不存在才创建,否则会清空文件内容
            if(FileHelper(_datafile).exists() == false)
            {
                // 创建文件
                FileHelper::createDirectory(FileHelper::parentDirectory(_datafile));
                FileHelper::createFile(_datafile);
            }
        }
    private:
        std::string _qname;
        std::string _datafile; // 含路径的数据文件名
        std::string _tmpfile;  // 含路径的临时文件名
    };

        创建数据文件的接口:

        // 创建数据文件
        bool createMsgFile()
        {
            bool ret = FileHelper::createFile(_datafile);
            if(ret == false)
            {
                LOG("创建队列文件失败,文件名%s",_datafile.c_str());
                return false;
            }
            return true;
        }

        清理队列数据的接口:

        // 删除数据文件/临时文件
        void removeMsgFile()
        {
            FileHelper::removeFile(_datafile);
            FileHelper::removeFile(_tmpfile);
        }

        向数据文件持久化消息的接口:

        // 向数据文件插入消息
        bool insert(MessagePtr& msg)
        {
            return insert(_datafile,msg);
        }
    private:
        bool insert(const std::string& filename,MessagePtr& msg)
        {
            // 1.序列化消息的属性和内容
            std::string body = msg->payload().SerializeAsString();
            // 2.获取长度  
            FileHelper helper(filename);
            size_t fsize = helper.size();
            size_t len = body.size();
            // 3.写入数据
                // 3.1.写入4字节长度
            if(helper.write((char*)&len,fsize,sizeof(size_t)) == false)
            {
                LOG("向队列文件写入失败,文件:%s",filename.c_str());
                return false;
            }
                // 3.2.写入主体内容
            if(helper.write(body.c_str(),fsize+sizeof(size_t),body.size()) == false)
            {
                LOG("向队列文件写入失败,文件:%s",filename.c_str());
                return false;
            }
            // 4.更新msg的实际存储位置和长度
            msg->set_offset(fsize+sizeof(size_t));
            msg->set_length(body.size());

            return true;
        }

        从数据文件中删除消息,只需要将有效位变成"0"即可:

        // 删除消息
        bool remove(MessagePtr& msg)
        {
            // 1.有效标志设置为false,0
            msg->mutable_payload()->set_valid("0");
            // 2.序列化msg
            std::string body = msg->payload().SerializeAsString();
            // 判断长度,防止后面的消息被覆盖
            if(body.size() != msg->length())
            {
                LOG("不能修改消息有效性,因为新旧数据长度不一致");
                return false;
            }

            // 3.写入修改后的数据
            FileHelper helper(_datafile);
            if(helper.write(body.c_str(),msg->offset(),body.size()) == false)
            {
                LOG("向队列文件写入失败,文件:%s",_datafile.c_str());
                return false;
            }
            return true;
        }

         最后是最重要的加载历史消息的函数,它也是垃圾回收函数,一定要弄懂它的工作逻辑:

        // 加载历史有效消息/垃圾回收
        std::list<MessagePtr> gc()
        {
            // 0.创建临时文件
            FileHelper::createFile(_tmpfile);

            std::list<MessagePtr> result;
            // 1.读取出文件中所有有效数据
            FileHelper helper(_datafile);
            size_t offset = 0;
            size_t fsize = helper.size();
            while(offset < fsize)
            {
                // 先读取消息长度
                size_t len;
                if(helper.read((char*)&len,offset,sizeof(size_t))==false)
                {
                    LOG("读取消息长度时发生错误");
                    abort();
                }
                offset+=sizeof(size_t);
                
                // 再读取实际内容
                std::string body;
                body.resize(len);
                if(helper.read(&body[0],offset,len)==false)
                {
                    LOG("读取消息主体时发生错误");
                    abort();
                }
                offset+=len;

                // 反序列化
                MessagePtr msgp = std::make_shared<Message>();
                msgp->mutable_payload()->ParseFromString(body);
                
                // 如果是无效消息,直接读取下一个
                if(msgp->payload().valid() == "0")
                {
                    LOG("加载到无效消息 %s",msgp->payload().body().c_str());
                    continue;
                }
                // 如果是有效消息,保存起来
                else
                {
                    LOG("加载到有效消息 %s",msgp->payload().body().c_str());
                    result.push_back(msgp);
                }
            }
            // 2.将有效消息序列化存储到临时文件
            for(auto& msg:result)
            {
                if(insert(_tmpfile,msg) == false)
                {
                    LOG("向临时文件写入失败");
                    return result;
                }
            }
            // 3. 删除原来的文件
            if(helper.removeFile()==false)
            {
                LOG("源文件删除失败");
                return result;
            }
            // 4.将临时文件"转正"
            if(FileHelper::rename(_tmpfile,_datafile) == false)
            {
                LOG("临时文件名修改失败");
                return result;
            }
            return result;
        }

        这个函数不仅在垃圾回收时会调用,在服务器初始化,拉取历史消息时也会调用,它会只保存有效消息,这与垃圾回收的理念不谋而合。

         1-3.QueueMessage类

        该类有一个MessageMapper句柄,它不仅管理单个队列的持久化消息,也会管理非持久化消息。

        构造函数和成员变量如下:

class QueueMessage
    {
    public:
        using ptr = std::shared_ptr<QueueMessage>;

        QueueMessage(std::string& base_dir,const std::string& qname):
        _mapper(base_dir,qname),
        _qname(qname),
        _valid_count(0),
        _total_count(0)
        {}
    private:
        std::mutex _mtx;
        std::string _qname;
        size_t _valid_count; // 持久化有效消息的数量
        size_t _total_count; // 持久化消息的数量,有效+无效
        std::list<MessagePtr> _waitpush_msgs; // 待推送消息
        std::unordered_map<std::string,MessagePtr> _durable_msgs; // 持久化消息,不包括无效消息
        std::unordered_map<std::string,MessagePtr> _waitack_msgs; // 待确认消息
        MessageMapper _mapper; // 持久化管理句柄
    };

        (1)_waitpush_msg:记录队列中还没有推送给订阅者的消息

        (2)_durable_msgs:记录队列中还未推送的有效的持久化消息

        (3)_waitack_msgs:记录已经推送,等待订阅者确认收到,然后删除的消息。

        第一个函数是恢复历史消息的函数,封装MessageMapper句柄的gc()接口,然后将提取到的消息(在文件中的必然是持久化消息),保存在 _durable_msgs 中:

        // 恢复历史消息
        bool recovery()
        {
            // 恢复历史消息
            std::unique_lock<std::mutex> lock(_mtx);
            _waitpush_msgs = _mapper.gc();
            for(auto& msg:_waitpush_msgs)
            {
                _durable_msgs.insert(std::make_pair(msg->payload().properties().id(),msg));
            }

            _valid_count = _total_count = _durable_msgs.size();
            return true;
        }

        然后是向队列插入消息的函数,一般来说,调用者应该给我们一个消息属性 bp ,如果给了,就把消息设置成给的属性。

        如果没有给,就设置成一些默认值。

        消息在内存(_watipush_msg)中管理起来之后,如果需要持久化,还要在数据文件中保存起来。

        // 向队列插入消息
        bool insert(const BasicProperties* bp,const std::string& body,bool queue_durable)
        {
            // 1.构造消息对象
            MessagePtr msg = std::make_shared<Message>();
            msg->mutable_payload()->set_body(body);
            if(bp != nullptr)
            {   
                // 如果消息设置为持久化,还需要队列也是持久化才进行持久化存储
                DeliveryMode mode = queue_durable ? bp->delivery_mode() : DeliveryMode::UNDURABLE;
                msg->mutable_payload()->mutable_properties()->set_id(bp->id());
                msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);
                msg->mutable_payload()->mutable_properties()->set_routing_key(bp->routing_key());
            }
            else
            {
                // 如果消息没有设置是否持久化,那么队列持久化,消息默认持久化
                DeliveryMode mode = queue_durable ? DeliveryMode::DURABLE : DeliveryMode::UNDURABLE;
                msg->mutable_payload()->mutable_properties()->set_id(UUIDHelper::uuid());
                msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);
                msg->mutable_payload()->mutable_properties()->set_routing_key("");
            }

            std::unique_lock<std::mutex> lock(_mtx);

            // 2.判断是否需要持久化
            if(msg->payload().properties().delivery_mode() == DeliveryMode::DURABLE)
            {
                msg->mutable_payload()->set_valid("1");
                // 3.持久化存储
                if(_mapper.insert(msg) == false)
                {
                    LOG("消息持久化失败,消息内容:%s",body.c_str());
                    return false;
                }
                // 添加到持久化管理
                _durable_msgs.insert(std::make_pair(msg->payload().properties().id(),msg));

                _valid_count+=1;
                _total_count+=1;
            }
            
            // 4.添加到待推送
            _waitpush_msgs.push_back(msg);
            return true;
        }

        其次是获取队首消息的接口,它是队列推送消息的第一步——取出消息:

        // 获取待推送队首的消息
        zd::MessagePtr front()
        {
            std::unique_lock<std::mutex> lock(_mtx);

            if(_waitpush_msgs.empty()) return nullptr;
            // 取出要推送的消息
            MessagePtr msg = _waitpush_msgs.front();
            _waitpush_msgs.pop_front();

            // 将消息放到待确认,等待确认后删除
            _waitack_msgs.insert(std::make_pair(msg->payload().properties().id(),msg));

            return msg;
        }

        然后是删除消息的接口,当服务器收到订阅者的应答后,就会调用它删除消息:

        // 删除一条消息
        bool remove(const std::string& msg_id)
        {
            std::unique_lock<std::mutex> lock(_mtx);
            // 1.从待确认中删除
            auto it = _waitack_msgs.find(msg_id);
            if(it == _waitack_msgs.end())
            {
                // 没有不需要删除
                LOG("要删除的待确认消息不存在");
                return true;
            }

            //!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
            // _waitack_msgs.erase(msg_id);
            // 此处就调用是不对的,调用之后消息对象就自动析构了,
            // 而持久化删除时还需要消息的长度(length)和存储位置(offset)
            //!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

            // 2.从持久化中删除
            if(it->second->payload().properties().delivery_mode() == DeliveryMode::DURABLE)
            {
                if(_mapper.remove(it->second) == false)
                {
                    LOG("消息去持久化失败");
                    return false;
                }
                _durable_msgs.erase(msg_id);
                
                // 文件中有效少1,无效多1,所以_total_count不变
                _valid_count-=1;
                // 判断是否需要进行垃圾回收
                gc();
            }
            _waitack_msgs.erase(msg_id);
            return true;
        }

         remove()接口的gc(),就是封装了MessageMapper中的gc(),专门用来进行垃圾回收:

    private:
        bool GccCheck()
        {
            // 消息数大于2000 && 有效消息比例小于50%
            return _total_count > 2000 && _valid_count*10/_total_count < 0.5*10;
        }

        void gc()
        {
            if(GccCheck())
            {
                auto msgs = _mapper.gc();

                // 遍历加载的所有有效消息,并将新位置更新到_durable_msgs中
                for(auto& msg:msgs)
                {
                    auto it = _durable_msgs.find(msg->payload().properties().id());
                    if(it == _durable_msgs.end())
                    {
                        LOG("垃圾回收后,有一条持久化消息未在内存中进行管理");
                        // 该消息已经在文件了,只在_waitpush_msgs和_durable_msgs中进行添加即可
                        _waitpush_msgs.push_back(msg);
                        _durable_msgs.insert(std::pair<std::string,zd::MessagePtr>(msg->payload().properties().id(),msg));
                    }
                    else
                    {
                        it->second->set_offset(msg->offset());
                        it->second->set_length(msg->length());
                    }
                }

                _valid_count = _total_count = msgs.size();
            }
        }

        最后是一些与单元测试有关的接口,实际工作中并不使用:

        size_t waitpush_count()
        {
            std::unique_lock<std::mutex> lock(_mtx);
            return _waitpush_msgs.size();
        }
        size_t waitack_count()
        {
            std::unique_lock<std::mutex> lock(_mtx);
            return _waitack_msgs.size();
        }
        size_t durable_count()
        {
            std::unique_lock<std::mutex> lock(_mtx);
            return _durable_msgs.size();
        }
        size_t total()
        {
            std::unique_lock<std::mutex> lock(_mtx);
            return _total_count;
        }
        void clear()
        {
            _mapper.removeMsgFile(); 
            _waitack_msgs.clear();
            _waitpush_msgs.clear();
            _durable_msgs.clear();
        }

        1-4.MessageManager

        一个队列可以管理之后,就可以实现管理多个队列的模块了。

        MessageManager有一个unordered_map成员,它将队列名与该队列的管理句柄映射起来:

    // 管理所有队列
    class MessageManager
    {
    public:

        using ptr = std::shared_ptr<MessageManager>;
        
        MessageManager(const std::string& base_dir):
        _base_dir(base_dir)
        {}
    private:
        std::mutex _mtx;
        std::string _base_dir;
        std::unordered_map<std::string,QueueMessage::ptr> _queue_msgs;
    };

        添加一个队列管理句柄的接口:

        // 添加一个消息队列
        void initQueueMesssge(const std::string& qname)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mtx);
                auto it = _queue_msgs.find(qname);
                if(it != _queue_msgs.end())
                {
                    return;
                }
                qmp = std::make_shared<QueueMessage>(_base_dir,qname);
                _queue_msgs.insert(std::make_pair(qname,qmp));
            }
            // 恢复历史数据,如果有的话
            qmp->recovery();
        }

        上述代码的_mtx并不会把 qmp->recovery(); 锁起来,因为每个队列的管理是独立的,队列执行操作自己不影响 MessageManager 操作其他队列,否则线程还需要等待队列操作完毕才能释放锁,这种写法提升了效率。

        移除一个队列的管理句柄:

        // 删除一个消息队列
        void destoryQueueMessage(const std::string& qname)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mtx);
                auto it = _queue_msgs.find(qname);
                if(it != _queue_msgs.end())
                {
                    return;
                }
                qmp = it->second;
                _queue_msgs.erase(qname);
            }
            qmp->clear();
        }

        向指定队列新增消息:

        // 向指定消息队列插入消息
        bool insert(const std::string& qname,BasicProperties* bp,const std::string& body,bool queue_durable)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mtx);
                auto it = _queue_msgs.find(qname);
                if(it == _queue_msgs.end())
                {
                    LOG("向消息队列插入消息失败,消息队列 %s 不存在",qname.c_str());
                    return false;
                }
                qmp = it->second;
            }
            return qmp->insert(bp,body,queue_durable);
        }

        获取指定队列的队首消息:

        // 获取指定队列队首消息
        MessagePtr front(const std::string& qname)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mtx);
                auto it = _queue_msgs.find(qname);
                if(it == _queue_msgs.end())
                {
                    LOG("向消息队列获取消息失败,消息队列 %s 不存在",qname.c_str());
                    return nullptr;
                }
                qmp = it->second;
            }
            return qmp->front();
        }

        确认/删除指定队列的消息:

        // 确认指定队列中的消息
        bool ack(const std::string& qname,const std::string& msg_id)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mtx);
                auto it = _queue_msgs.find(qname);
                if(it == _queue_msgs.end())
                {
                    LOG("向消息队列确认消息失败,消息队列 %s 不存在",qname.c_str());
                    return false;
                }
                qmp = it->second;
            }
            return qmp->remove(msg_id);
        }

        单元测试相关接口:

        size_t waitpush_count(const std::string& qname)
        {
            {
                std::unique_lock<std::mutex> lock(_mtx);
                auto it = _queue_msgs.find(qname);
                if(it == _queue_msgs.end())
                {
                    LOG("获取消息队列待推送消息数量失败,消息队列 %s 不存在",qname.c_str());
                    return 0;
                }
            }
            return _queue_msgs[qname]->waitpush_count();
        }
        size_t waitack_count(const std::string& qname)
        {
            QueueMessage::ptr qmp;
            {
                std::unique_lock<std::mutex> lock(_mtx);
                auto it = _queue_msgs.find(qname);
                if(it == _queue_msgs.end())
                {
                    LOG("获取消息队列待确认消息数量失败,消息队列 %s 不存在",qname.c_str());
                    return 0;
                }
                qmp = it->second;
            }
            
            return qmp->waitack_count();
        }
        size_t durable_count(const std::string& qname)
        {
            {
                std::unique_lock<std::mutex> lock(_mtx);
                auto it = _queue_msgs.find(qname);
                if(it == _queue_msgs.end())
                {   
                    LOG("获取消息队列持久化有效消息数量失败,消息队列 %s 不存在",qname.c_str());
                    return 0;
                }
            }
            return _queue_msgs[qname]->durable_count();
        }
        size_t total(const std::string& qname)
        {
            {
                std::unique_lock<std::mutex> lock(_mtx);
                auto it = _queue_msgs.find(qname);
                if(it == _queue_msgs.end())
                {
                    LOG("获取消息队列总持久化消息数量失败,消息队列 %s 不存在",qname.c_str());
                    return 0;
                }
            }
            return _queue_msgs[qname]->total();
        }

        void clear()
        {
            std::unique_lock<std::mutex> lock(_mtx);
            for(auto& qmsg:_queue_msgs)
            {
                qmsg.second->clear();
            }
        }

第二节:单元测试

        在mqtest目录下创建mq_message_test.cc的文件,并测试以下内容:

#include "../mqserver/mq_message.hpp"
#include <gtest/gtest.h>
#include <iostream>
#include <unordered_map>

zd::MessageManager::ptr mmp;
// 全局测试套件------------------------------------------------
// 自己初始化自己的环境,使不同单元测试之间解耦
class MessageTest :public testing::Environment
{
public:
    // 全部单元测试之前调用一次
    virtual void SetUp() override
    {
        // std::cout << "单元测试执行前的环境初始化" << std::endl;
        mmp = std::make_shared<zd::MessageManager>("./data/");
        mmp->initQueueMesssge("q1");
    }   

    // 全部单元测试之后调用一次
    virtual void TearDown() override
    {
        // std::cout << "单元测试执行后的环境清理" << std::endl;
        // mmp->clear();
    }
};

// 单元测试
// 测试名称与类名称相同,则会先调用SetUp
TEST(MessageTest,MessageTest_test1_Test)
{
    std::cout << "单元测试-1" << std::endl;
    // 插入一些消息
    zd::BasicProperties bp;
    bp.set_delivery_mode(zd::DeliveryMode::DURABLE);
    bp.set_id(zd::UUIDHelper::uuid());
    bp.set_routing_key("");
    mmp->insert("q1",&bp,"Hello World-1",zd::DeliveryMode::DURABLE);
    mmp->insert("q1",nullptr,"Hello World-2",zd::DeliveryMode::DURABLE);
    mmp->insert("q1",nullptr,"Hello World-3",zd::DeliveryMode::DURABLE);
    mmp->insert("q1",nullptr,"Hello World-4",zd::DeliveryMode::DURABLE);

    ASSERT_EQ(mmp->durable_count("q1"),4);
    ASSERT_EQ(mmp->total("q1"),4);
    ASSERT_EQ(mmp->waitack_count("q1"),0);
    ASSERT_EQ(mmp->waitpush_count("q1"),4);

    // 删除一条消息
    mmp->front("q1");
    mmp->ack("q1",bp.id());
    ASSERT_EQ(mmp->durable_count("q1"),3); 
    ASSERT_EQ(mmp->total("q1"),4); 
    ASSERT_EQ(mmp->waitack_count("q1"),0);
    ASSERT_EQ(mmp->waitpush_count("q1"),3);
}

// 单元测试全部结束后调用TearDown
 
// ----------------------------------------------------------
int main(int argc,char** argv)
{
    testing::InitGoogleTest(&argc,argv);

    testing::AddGlobalTestEnvironment(new MessageTest); // 注册Test的所有单元测试

    if(RUN_ALL_TESTS() != 0) // 运行所有单元测试
    {
        printf("单元测试失败!\n");
    }
    return 0;
}

        因为使用了BasicProperties构造了一个类,所以编译时还需要加上 mq_msg.pb.cc文件:

mq_message_test:mq_message_test.cc ../mqcommon/mq_msg.pb.cc
	g++ -std=c++14 $^ -o $@ -lgtest -lprotobuf

        执行结果:

                                

        再测试拉取历史数据的功能,将insert全部注释掉,保留第一条消息的insert(因为它被删除了,重新插入一下),将删除消息也注释掉:

TEST(MessageTest,MessageTest_test1_Test)
{
    std::cout << "单元测试-1" << std::endl;
    // 插入一些消息
    zd::BasicProperties bp;
    bp.set_delivery_mode(zd::DeliveryMode::DURABLE);
    bp.set_id(zd::UUIDHelper::uuid());
    bp.set_routing_key("");
    mmp->insert("q1",&bp,"Hello World-1",zd::DeliveryMode::DURABLE);
    // mmp->insert("q1",nullptr,"Hello World-2",zd::DeliveryMode::DURABLE);
    // mmp->insert("q1",nullptr,"Hello World-3",zd::DeliveryMode::DURABLE);
    // mmp->insert("q1",nullptr,"Hello World-4",zd::DeliveryMode::DURABLE);

    ASSERT_EQ(mmp->durable_count("q1"),4);
    ASSERT_EQ(mmp->total("q1"),4);
    ASSERT_EQ(mmp->waitack_count("q1"),0);
    ASSERT_EQ(mmp->waitpush_count("q1"),4);

    // 删除一条消息
    // mmp->front("q1");
    // mmp->ack("q1",bp.id());
    // ASSERT_EQ(mmp->durable_count("q1"),3); 
    // ASSERT_EQ(mmp->total("q1"),4); 
    // ASSERT_EQ(mmp->waitack_count("q1"),0);
    // ASSERT_EQ(mmp->waitpush_count("q1"),3);
}

        执行结果:

        ​​​​​​​        ​​​​​​​        

        第一条是无效消息,后三条都是有效消息,加上又插入的1条消息,共四条消息,仍然不会报错。

        那么消息管理模块也完成了。

下期预告:

        交换机管理模块、队列管理模块、绑定管理模块、消息管理模块都完成了,之后将这四个模块整合成虚拟机模块。

        虚拟机模块实际上很简单,只需要封装四个子模块的接口即可。 


http://www.niftyadmin.cn/n/5868310.html

相关文章

交换机与路由器连接方式

交换机和路由器连接的三种主要方式如下&#xff1a; 一、直连连接 这是最简单直接的连接方式。通过一根网线将交换机的一个端口与路由器的一个LAN端口相连。这种连接方式适用于小型网络&#xff0c;其中交换机负责局域网内部的数据交换&#xff0c;而路由器则负责将内部网络连接…

回归分析中的回归含义的理解

“回归”这个词在回归分析中有着特定的历史背景和统计意义&#xff0c;它的含义与现代汉语中的“回归”有所不同。以下是详细的解释&#xff1a; 1. 回归的起源 历史背景&#xff1a;回归分析最早由英国统计学家弗朗西斯高尔顿&#xff08;Francis Galton&#xff09;在19世纪…

SpringBoot使用Jasypt对YML文件配置内容进行加密(例:数据库密码加密)

SpringBoot使用Jasypt对YML文件配置内容进行加密&#xff08;例&#xff1a;数据库密码加密&#xff09; 前言 在SpringBoot的项目开发中&#xff0c;大多数情况下 yml 配置文件中存储的密码均以明文形式展示&#xff0c;这种方式显然存在较大的安全隐患。一旦有开发人员离职&…

RGMII(Reduced Gigabit Media Independent Interface)详解

一、RGMII的定义与作用 RGMII&#xff08;精简版千兆介质无关接口&#xff09;是一种用于千兆以太网&#xff08;1Gbps&#xff09;的高效接口标准&#xff0c;旨在减少传统GMII接口的引脚数量&#xff0c;同时保持相同的传输速率。其核心作用包括&#xff1a; 减少引脚数量&a…

K8s部署主从结构MySQL服务

01 介绍 RC、Deployment、DaemonSet都是面向无状态的服务,它们所管理的Pod的IP、名字、启停顺序等都是随机分配的,而StatefulSet,管理所有有状态的服务。 StatefulSet为了解决有状态服务的问题,它所管理的Pod拥有固定的Pod名称,一定的启停顺序,在StatefulSet中,Pod名字…

灵犀互娱游戏测试开发一面面经

阿里的子公司, 做的是游戏业务, 所以投递的时候把简历上加上了自己的游戏经历. 面试官大哥也围绕着游戏问了一些问题, 面试体验很好~~ 1. 介绍一下自己 (巴拉巴拉一顿说) 2. 看你的简历上有写关于用友的实习, 可以介绍一下那边的业务吗? 还有为什么那边用的c#来写自动化脚…

Hadoop 常用命令汇总

Hadoop 常用命令汇总 查看帮助信息查看指定目录文件列表上传文件下载文件移动文件/重命名拷贝文件查找文件查看内容其他命令 HDFS 文件操作命令风格有两种&#xff0c;两种命令效果一样 hadoop fs 开头 hdfs dfs 开头 查看帮助信息 hadoop fs -help [cmd] 查看指定目录文件列表…

深入探讨分布式事务解决方案:从二阶段提交到现代模式

在当今的软件开发领域&#xff0c;随着微服务架构和分布式系统的普及&#xff0c;如何保证跨多个服务或数据库的操作的一致性和可靠性成为了开发者面临的重要挑战之一。分布式事务的概念应运而生&#xff0c;旨在解决这些系统中数据一致性的问题。然而&#xff0c;实现分布式事…