当前位置: 首页 > news >正文

【Linux-进程间通信】消息队列

消息队列的引入以及基本概念

在Linux操作系统中,消息队列(Message Queue)是一种进程间通信(Inter-process Communication, IPC)机制,允许一个进程向另一个进程发送和接收消息。消息队列在操作系统内核中维护,允许消息以队列的形式存储,并且可以在发送和接收之间进行排队。这种机制在需要多个进程协作时非常有用,比如在客户端和服务器之间通信。

消息队列的引入主要是为了实现解耦、异步处理、削峰填谷、容错性以及系统的高可用性。在现代的分布式系统中,服务之间的直接依赖可能会导致系统复杂度增加,尤其是在面对高并发、大数据量处理时。消息队列能够作为中间层,通过在生产者和消费者之间插入一个队列,让系统各个模块之间相互独立、松耦合,从而提高系统的稳定性和性能。

1. 解耦

在传统系统中,模块A可能会直接调用模块B的接口并等待其处理结果。这种强依赖增加了系统的耦合性,使得模块之间的修改和升级变得困难。通过引入消息队列,模块A可以将消息放入队列,不需要知道B是如何处理的,B也可以独立地从队列中消费消息并处理。

2. 异步处理

很多场景下,生产者需要处理大量请求,如果每次请求都需要等待消费者完成处理,系统响应时间将被拉长。消息队列允许生产者将消息发送到队列中,然后立即返回,由消费者在后台异步处理消息,极大提升了系统的响应速度。

3. 削峰填谷

在面对流量激增的场景中,系统可能会出现瞬时的负载过高,导致宕机或服务不可用。通过消息队列,生产者可以将大量的请求写入队列,而消费者可以根据自身的处理能力从队列中读取消息。即使系统在某一时刻产生了大量消息,消费者也可以有序处理,不至于崩溃。

4. 容错性

消息队列通常支持消息的持久化和重试机制,即使在消费者处理消息时出现异常,也可以确保消息不会丢失。在消费者恢复后,它可以继续从队列中消费未处理的消息,保障系统的容错性。

消息队列的基本概念

1.消息:消息是消息队列中传递的基本单位,通常包含数据的负载和一些元数据,如消息的唯一ID、时间戳等。消息可以是字符串、JSON、XML或者任何其他格式的数据。

2.生产者:生产者是发送消息到队列的一方,它将消息按照指定的格式写入消息队列。生产者不需要关心消息的处理方式或处理时间,因而可以专注于业务逻辑。

3.消费者:消费者是从消息队列中接收消息并进行处理的一方。消费者可以是单个进程或多个进程,并且可以同时处理来自多个生产者的消息。

4.队列:队列是消息的存储结构,通常是先进先出(FIFO)的,但有时可以根据优先级或其他规则处理消息。队列中的消息可以存储一定时间,直到被消费或者过期。

5.Broker:消息中介(Broker)是负责管理消息的传递与存储的服务,它在生产者和消费者之间充当中间层,接收消息并确保它们被正确发送到目标消费者。常见的消息中介包括RabbitMQ、Kafka、ActiveMQ等。

消息队列与命名管道和共享内存的不同

消息队列的原理

消息队列的核心思想是将消息存储在一个有序队列中,发送进程将消息放入队列,接收进程从队列中读取消息。消息队列由操作系统内核维护,消息按照顺序或者优先级进行存放。消息队列的设计使得不同进程不需要同时存在就可以实现消息传递,这是异步通信的基础。

 系统中可能存在大量的消息队列,内核需要使用如下数据结构对消息队列进行管理

消息队列工作流程

生产者进程:通过msgsnd()将消息放入队列,并指定消息类型。如果消息队列已满,生产者进程可以选择等待队列有空位或直接返回错误。

消费者进程:通过msgrcv()从队列中读取消息,并指定需要的消息类型。如果没有符合条件的消息,消费者可以选择阻塞等待消息的到来或直接返回错误。

消息传递机制:消息队列在内核中排队,确保消息按照指定的顺序或优先级传递给接收者。

System V 消息队列的主要函数

msgget

msgget 是一个在 POSIX 系统中用于消息队列操作的函数。该函数用于获取一个消息队列的标识符,或者创建一个新的消息队列

 

参数:

  • key: 这是一个整型值,通常由 ftok 函数生成,用于唯一标识一个消息队列。如果这个值设置为 IPC_PRIVATE,则创建一个只能由调用进程访问的消息队列。

  • msgflg: 这是一个位掩码,用于指定创建消息队列时的权限和标志。它可以是以下值的按位或操作组合:

msgflg选项描述哦
IPC_CREAT如果消息队列不存在,则创建一个。
IPC_EXCL: 与 IPC_CREAT 一起使用确保创建一个新的消息队列。如果队列已经存在,则调用失败。
权限位这些位定义了谁可以读写该消息队列,类似于文件权限位,例如 0644

返回值:

  • 成功时,返回消息队列的标识符(一个非负整数)。

  • 失败时,返回 -1,并设置 errno 以指示错误。

#include <iostream>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>int main()
{key_t key = ftok("/home/wuxu/day27/func", 66);int msgid = msgget(key, IPC_CREAT | 0666);if (msgid == -1){perror("msgget");exit(1);}return 0;
}

可以借助ipcs -q查看系统中存在的消息队列

msgrcv

 

msgsnd 函数用于将一条消息发送到指定的消息队列。

参数:

  • msqid: 消息队列标识符,由 msgget 函数返回。

  • msgp: 指向消息结构的指针。消息结构通常定义如下:

    struct msgbuf {long mtype;       /* 消息类型 */char mtext[1];    /* 消息数据 */
    };

注意,mtext 字段的大小应该至少为 msgsz,并且结构体的大小应该足够大以包含 mtext 中的数据。

  • msgsz: msgpmtext 字段的大小,即要发送的数据的实际字节数。

  • msgflg: 控制消息发送的标志。如果设置为 0msgsnd 将阻塞直到消息队列有足够的空间来放置消息。可以设置的标志包括:IPC_NOWAIT: 如果消息队列已满,msgsnd 将不等待,而是立即返回错误。

返回值:

  • 成功时返回 0。

  • 失败时返回 -1,并设置 errno 以指示错误。

msgsnd

msgrcv 函数用于从指定的消息队列中接收一条消息。

 

参数:

  • msgsz: msgpmtext 字段的最大大小,即可以接收的数据的最大字节数。

  • msgtyp: 指定要接收的消息类型。可以是以下之一:

msgtyp描述
具体的消息类型值...
0表示接收队列中的第一个消息
>0表示接收类型等于或大于 msgtyp 的第一个消息
<0表示接收类型小于或等于 msgtyp 绝对值的最小类型消息
  • msgflg: 控制消息接收的标志

msgflg描述
0msgrcv 将阻塞直到消息队列中有消息可接收
IPC_NOWAIT如果没有消息可接收,msgrcv 将不等待,而是立即返回错误
MSG_NOERROR如果消息的大小大于 msgsz,则消息将被截断

返回值:

  • 成功时返回实际读取的消息字节数。

  • 失败时返回 -1,并设置 errno 以指示错误。

msgctl

msgctl是POSIX系统调用之一,用于控制消息队列的行为。它允许你获取消息队列的当前状态、设置消息队列的属性,或者删除消息队列

参数说明:

  • msqid:消息队列的标识符,由msgget函数返回。

  • cmd:要执行的命令

  • buf:指向msqid_ds结构体的指针,该结构体用于保存消息队列的属性或状态信息。如果cmd参数是IPC_RMID,则此参数可以为NULL

返回值:

  • 0:成功。如果函数调用成功,msgctl返回0。

  • -1:失败。如果函数调用失败,msgctl返回-1,并且设置全局变量errno来指示错误类型。可以使用perrorstrerror函数来获取与errno相关的错误信息。

cmd命令描述
IPC_STAT获取消息队列的当前状态,并将其存储在buf指向的msqid_ds结构体中
IPC_SET设置消息队列的属性,属性值由buf指向的msqid_ds结构体提供
IPC_RMID删除消息队列

msqid_ds结构体定义在<sys/msg.h>头文件中,它包含以下字段:

 【示例】获取消息队列的状态

#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>int main() {key_t key;int msgid;struct msqid_ds msq_ds;int result;// 生成keykey = ftok("/home/wuxu/day28/func", 5);// 获取消息队列标识符msgid = msgget(key, 0666 | IPC_CREAT);if (msgid == -1) {perror("msgget failed");exit(1);}// 获取消息队列状态result = msgctl(msgid, IPC_STAT, &msq_ds);if (result == -1) {perror("msgctl failed");exit(1);}printf("Message queue status:\n");printf("msg_perm.uid = %d\n", msq_ds.msg_perm.uid);printf("msg_perm.gid = %d\n", msq_ds.msg_perm.gid);printf("msg_qnum = %ld\n", msq_ds.msg_qnum);printf("msg_qbytes = %ld\n", msq_ds.msg_qbytes);printf("msg_lspid = %ld\n", msq_ds.msg_lspid);printf("msg_lrpid = %ld\n", msq_ds.msg_lrpid);printf("msg_stime = %ld\n", msq_ds.msg_stime);printf("msg_rtime = %ld\n", msq_ds.msg_rtime);printf("msg_ctime = %ld\n", msq_ds.msg_ctime);return 0;
}

设置消息队列状态

#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>int main()
{sleep(2);key_t key;int msgid;struct msqid_ds msq_ds;int result;// 生成keykey = ftok("/home/wuxu/day28/func", 65);// 获取消息队列标识符msgid = msgget(key, 0666 | IPC_CREAT);if (msgid == -1){perror("msgget failed");exit(1);}// 设置消息队列属性memset(&msq_ds, 0, sizeof(msq_ds));msq_ds.msg_perm.uid = getuid(); // 设置所有者为当前用户msq_ds.msg_perm.gid = getgid(); // 设置组为当前用户组msq_ds.msg_qbytes = 1024;       // 设置队列最大长度为1024字节result = msgctl(msgid, IPC_SET, &msq_ds);if (result == -1){perror("msgctl failed");exit(1);}printf("Message queue attributes set successfully.\n");return 0;
}

 


http://www.mrgr.cn/news/67349.html

相关文章:

  • IDEA启动提示Downloading pre-built shared indexes
  • 讲解JVM日志的查看及解决系统频繁GC问题
  • 【大数据技术基础 | 实验七】HBase实验:部署HBase
  • 【软考】错题分析1105
  • 青少年编程与数学 02-003 Go语言网络编程 02课题、网络分层模型
  • 智慧商城项目-VUE2
  • LLMs之Leaderboard:Chatbot Arena的简介、使用方法、案例应用之详细攻略
  • SIwave:释放 TDR(时域反射计)向导的强大功能
  • C++ | Leetcode C++题解之第543题二叉树的直径
  • 【1个月速成Java】基于Android平台开发个人记账app学习日记——第8天,完成注册登录并保存到数据库
  • GEE 使用 JavaScript 中的 API 自动删除文件夹内的所有资产
  • verilog-HDL基础
  • 超实惠的租借服务器训练深度学习方法
  • Renesas R7FA8D1BH (Cortex®-M85) 存储空间介绍
  • C语言 | Leetcode C语言题解之第543题二叉树的直径
  • SIwave:释放信号网络分析仪的强大功能
  • 使用AMD GPU进行图像分类的ResNet模型
  • ArcGIS006:ArcMap常用操作151-200例动图演示
  • 龙芯交叉编译openssl
  • Scala的包及其导入
  • Renesas R7FA8D1BH (Cortex®-M85) Flash的功能介绍
  • 【LeetCode】【算法】155. 最小栈
  • 11.6日志
  • RTMP推流H264和AAC
  • 计算机网络综合题
  • 【c++语言程序设计】字符串与浅层复制(深拷贝与浅拷贝)