linux
深度解析:Linux消息队列的底层实现原理
一、深度解析:Linux消息队列的底层实现原理
Linux消息队列的底层实现原理
Linux消息队列是一种进程间通信的方式,它允许一个进程向另一个进程发送消息。在Linux系统中,消息队列是通过内核提供的机制来实现的,它具有高效、可靠的特点,被广泛应用于各种领域。要理解Linux消息队列的底层实现原理,首先需要了解其数据结构和内核级实现。
内核数据结构:Linux消息队列的内核级实现是基于数据结构来管理消息的。在内核中,消息队列是由系统全局唯一的一个队列数组struct msgque
来管理的,每个消息队列都有一个唯一的标识符msgid
。消息队列中的消息是通过struct msg
结构来表示的,其中包括消息类型、消息数据和消息长度等信息。
系统调用:用户空间的进程可以通过系统调用来进行消息队列的创建、发送和接收操作。其中,msgget()
系统调用用于创建或获取一个消息队列,msgsnd()
系统调用用于向消息队列发送消息,msgrcv()
系统调用用于从消息队列接收消息。这些系统调用最终会通过内核中的相应函数来完成消息队列操作。
内核实现:内核中的消息队列实现涉及到诸多细节,包括进程间同步、锁机制、内存管理等。其中,内核需要保证消息队列的并发访问安全,防止数据竞争和消息丢失。同时,内核还需要对消息队列的消息进行缓存管理,确保消息的可靠传递和存储。
Linux消息队列的底层实现原理涉及到系统编程、进程间通信、内核数据结构等多个方面的知识,对于想深入了解Linux操作系统内部工作原理的开发人员和系统工程师来说,是一个很有价值的研究课题。
感谢您阅读本文,相信通过本文的介绍,能够更深入地了解Linux消息队列的内部实现原理,对相关领域的开发和应用有所帮助。
二、Linux工作队列和等待队列的区别?
工作队列中是即将要调度到的任务队列,等待队列是暂时被挂起的任务队列,或者有些任务无事可做休眠状态的任务,它们会在某些条件触发时恢复换入工作队列并进入执行状态,同样在工作队列中的任务在某个时刻也可以被换入到等待队列中
三、实现循环队列中入队列主要语句?
要实现循环队列的入队操作,首先需要判断队列是否已满。如果队列已满,则无法入队。如果队列未满,则将元素插入到队尾,并更新队尾指针。
如果队尾指针已经指向队列的最后一个位置,则将队尾指针指向队列的第一个位置,实现循环。
入队操作的主要语句包括判断队列是否已满的条件语句、插入元素的语句以及更新队尾指针的语句。这些语句的具体实现会根据编程语言和数据结构的不同而有所差异。
四、rocketmq延时队列实现原理?
RocketMQ是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠、万亿级容量、灵活可伸缩的消息发布与订阅服务。
它前身是MetaQ,是阿里基于Kafka的设计使用Java进行自主研发的。在2012年,阿里将其开源, 在2016年,阿里将其捐献给Apache软件基金会(Apache Software Foundation,简称为ASF),正式成为孵化项目。2017 年,Apache软件基金会宣布RocketMQ已孵化成为 Apache顶级项目(Top Level Project,简称为TLP ),是国内首个互联网中间件在 Apache上的顶级项目。
延迟消息
生产者把消息发送到消息队列中以后,并不期望被立即消费,而是等待指定时间后才可以被消费者消费,这类消息通常被称为延迟消息。
在RocketMQ中,支持延迟消息,但是不支持任意时间精度的延迟消息,只支持特定级别的延迟消息。如果要支持任意时间精度,不能避免在Broker层面做消息排序,再涉及到持久化的考量,那么消息排序就不可避免产生巨大的性能开销。
消息延迟级别分别为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18个级别。在发送消息时,设置消息延迟级别即可,设置消息延迟级别时有以下3种情况:
设置消息延迟级别等于0时,则该消息为非延迟消息。
设置消息延迟级别大于等于1并且小于等于18时,消息延迟特定时间,如:设置消息延迟级别等于1,则延迟1s;设置消息延迟级别等于2,则延迟5s,以此类推。
设置消息延迟级别大于18时,则该消息延迟级别为18,如:设置消息延迟级别等于20,则延迟2h。
延迟消息示例
首先,写一个消费者,用于消费延迟消息:
再写一个延迟消息的生产者,用于发送延迟消息:
运行生产者以后,就会发送一条延迟消息:
10秒钟后,消费者收到的这条延迟消息:
延迟消息的原理分析
以下分析的RocketMQ源码的版本号是4.7.1,版本不同源码略有差别。
CommitLog
在org.apache.rocketmq.store.CommitLog中,针对延迟消息做了一些处理:
可以看到,每一个延迟消息的主题都被暂时更改为SCHEDULE_TOPIC_XXXX,并且根据延迟级别延迟消息变更了新的队列Id。接下来,处理延迟消息的就是org.apache.rocketmq.store.schedule.ScheduleMessageService。
ScheduleMessageService
ScheduleMessageService是由org.apache.rocketmq.store.DefaultMessageStore进行初始化的,初始化包括构造对象和调用load方法。最后,再执行ScheduleMessageService的start方法:
遍历所有延迟级别,根据延迟级别获得对应队列的偏移量,如果偏移量不存在,则设置为0。然后为每个延迟级别创建定时任务,第一次启动任务延迟为1秒,第二次及以后的启动任务延迟才是延迟级别相应的延迟时间。
然后,又创建了一个定时任务,用于持久化每个队列消费的偏移量。持久化的频率由flushDelayOffsetInterval属性进行配置,默认为10秒。
定时任务
ScheduleMessageService的start方法执行之后,每个延迟级别都创建自己的定时任务,这里的定时任务的具体实现就在DeliverDelayedMessageTimerTask类之中,它核心代码是executeOnTimeup方法之中,我们来看一下主要部分:
如果没有获取到对应的消息队列,则在DELAY_FOR_A_WHILE(默认为100)毫秒后再执行任务。如果获取到了,就继续执行下面操作:
如果没有获取到有效消息,则在DELAY_FOR_A_WHILE(默认为100)毫秒后再执行任务。如果获取到了,就继续执行下面操作:
如果当前消息不到消费的时间,则在countdown毫秒后再执行任务。如果到消费的时间,就继续执行下面操作:
如果获取到消息,则继续执行下面操作:
清除了消息的延迟级别,并且恢复了真正的消息主题和队列Id,重新把消息发送到真正的消息队列上以后,消费者就可以立即消费了。
总结
经过以上对源码的分析,可以总结出延迟消息的实现步骤:
如果消息的延迟级别大于0,则表示该消息为延迟消息,修改该消息的主题为SCHEDULE_TOPIC_XXXX,队列Id为延迟级别减1。
消息进入SCHEDULE_TOPIC_XXXX的队列中。
定时任务根据上次拉取的偏移量不断从队列中取出所有消息。
根据消息的物理偏移量和大小再次获取消息。
根据消息属性重新创建消息,清除延迟级别,恢复原主题和队列Id。
重新发送消息到原主题的队列中,供消费者进行消费。
五、kafka延时队列实现原理?
延迟队列存储的是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
基于消息的延迟:指为每条消息设置不同的延迟时间,那么每当队列中有新消息进入的时候就会重新根据延迟时间排序,当然这也会对性能造成极大的影响。
基于队列的延迟: 设置不同延迟级别的队列,比如5s、10s、30s、1min、5mins、10mins等,每个队列中消息的延迟时间都是相同的,这样免去了延迟排序所要承受的性能之苦,通过一定的扫描策略(比如定时)即可投递超时的消息。
六、kafka延迟队列如何实现?
在发送延时消息的时候并不是先投递到要发送的真实主题(real_topic)中,而是先投递到一些 Kafka 内部的主题(delay_topic)中,这些内部主题对用户不可见,
然后通过一个自定义的服务拉取这些内部主题中的消息,并将满足条件的消息再投递到要发送的真实的主题中,消费者所订阅的还是真实的主题。
七、websocket怎么实现消息队列?
websocket是双向链接的。当成功连接之后,你可以获得一个客户端的socket。在需要主动发送数据的时候,只需要socket.send就可以发送数据了。当然前提是这个socket要依然有效。
八、Linux中的工作队列?
-P 是指定一个打印机, 一个主机可以连好几个打印机。 -P 是指定其中的一个。
-Plj5 是 指定 被主机识别为 lj5 的这个打印机。
lpq/lprm 加上-P的参数表示 可以查看 指定打印机下的打印队列 ,以及删除指 定打印机队列 中的某一个任务。
九、linux消息队列的优缺点?
优点:
消息队列提供了一种从进程向另一个进程发送一个数据块的方法。每个数据块都被认为是用一个类型,接收者进程接收的数据块可以有不同的类型值。我们可以通过发送消息来避免命名管道的同步和阻塞的问题。消息队列与管道不同的事,消息队列是基于消息的,而管道是基于字节流的,且消息队列的读取不一定是先入先出。
缺点:
消息队列和命名管道有一样的不足,就是每个消息的最大长度是有上限的(MSGMAX),每个消息队列的总的字节数是有上限的(MSGMNB),系统上消息队列的总数也是有一个上限(MSGMNI)。
十、rocketmq任意时间队列实现原理?
RocketMQ是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠、万亿级容量、灵活可伸缩的消息发布与订阅服务。
它前身是MetaQ,是阿里基于Kafka的设计使用Java进行自主研发的。在2012年,阿里将其开源, 在2016年,阿里将其捐献给Apache软件基金会(Apache Software Foundation,简称为ASF),正式成为孵化项目。2017 年,Apache软件基金会宣布RocketMQ已孵化成为 Apache顶级项目(Top Level Project,简称为TLP ),是国内首个互联网中间件在 Apache上的顶级项目。
延迟消息
生产者把消息发送到消息队列中以后,并不期望被立即消费,而是等待指定时间后才可以被消费者消费,这类消息通常被称为延迟消息。
在RocketMQ中,支持延迟消息,但是不支持任意时间精度的延迟消息,只支持特定级别的延迟消息。如果要支持任意时间精度,不能避免在Broker层面做消息排序,再涉及到持久化的考量,那么消息排序就不可避免产生巨大的性能开销。
消息延迟级别分别为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18个级别。在发送消息时,设置消息延迟级别即可,设置消息延迟级别时有以下3种情况:
设置消息延迟级别等于0时,则该消息为非延迟消息。
设置消息延迟级别大于等于1并且小于等于18时,消息延迟特定时间,如:设置消息延迟级别等于1,则延迟1s;设置消息延迟级别等于2,则延迟5s,以此类推。
设置消息延迟级别大于18时,则该消息延迟级别为18,如:设置消息延迟级别等于20,则延迟2h。
延迟消息示例
首先,写一个消费者,用于消费延迟消息:
再写一个延迟消息的生产者,用于发送延迟消息:
运行生产者以后,就会发送一条延迟消息:
10秒钟后,消费者收到的这条延迟消息:
延迟消息的原理分析
以下分析的RocketMQ源码的版本号是4.7.1,版本不同源码略有差别。
CommitLog
在org.apache.rocketmq.store.CommitLog中,针对延迟消息做了一些处理:
可以看到,每一个延迟消息的主题都被暂时更改为SCHEDULE_TOPIC_XXXX,并且根据延迟级别延迟消息变更了新的队列Id。接下来,处理延迟消息的就是org.apache.rocketmq.store.schedule.ScheduleMessageService。
ScheduleMessageService
ScheduleMessageService是由org.apache.rocketmq.store.DefaultMessageStore进行初始化的,初始化包括构造对象和调用load方法。最后,再执行ScheduleMessageService的start方法:
遍历所有延迟级别,根据延迟级别获得对应队列的偏移量,如果偏移量不存在,则设置为0。然后为每个延迟级别创建定时任务,第一次启动任务延迟为1秒,第二次及以后的启动任务延迟才是延迟级别相应的延迟时间。
然后,又创建了一个定时任务,用于持久化每个队列消费的偏移量。持久化的频率由flushDelayOffsetInterval属性进行配置,默认为10秒。
定时任务
ScheduleMessageService的start方法执行之后,每个延迟级别都创建自己的定时任务,这里的定时任务的具体实现就在DeliverDelayedMessageTimerTask类之中,它核心代码是executeOnTimeup方法之中,我们来看一下主要部分:
如果没有获取到对应的消息队列,则在DELAY_FOR_A_WHILE(默认为100)毫秒后再执行任务。如果获取到了,就继续执行下面操作:
如果没有获取到有效消息,则在DELAY_FOR_A_WHILE(默认为100)毫秒后再执行任务。如果获取到了,就继续执行下面操作:
如果当前消息不到消费的时间,则在countdown毫秒后再执行任务。如果到消费的时间,就继续执行下面操作:
如果获取到消息,则继续执行下面操作:
清除了消息的延迟级别,并且恢复了真正的消息主题和队列Id,重新把消息发送到真正的消息队列上以后,消费者就可以立即消费了。
总结
经过以上对源码的分析,可以总结出延迟消息的实现步骤:
如果消息的延迟级别大于0,则表示该消息为延迟消息,修改该消息的主题为SCHEDULE_TOPIC_XXXX,队列Id为延迟级别减1。
消息进入SCHEDULE_TOPIC_XXXX的队列中。
定时任务根据上次拉取的偏移量不断从队列中取出所有消息。
根据消息的物理偏移量和大小再次获取消息。
根据消息属性重新创建消息,清除延迟级别,恢复原主题和队列Id。
重新发送消息到原主题的队列中,供消费者进行消费。
热点信息
-
在Python中,要查看函数的用法,可以使用以下方法: 1. 使用内置函数help():在Python交互式环境中,可以直接输入help(函数名)来获取函数的帮助文档。例如,...
-
一、java 连接数据库 在当今信息时代,Java 是一种广泛应用的编程语言,尤其在与数据库进行交互的过程中发挥着重要作用。无论是在企业级应用开发还是...
-
一、idea连接mysql数据库 php connect_error) { die("连接失败: " . $conn->connect_error);}echo "成功连接到MySQL数据库!";// 关闭连接$conn->close();?> 二、idea连接mysql数据库连...
-
要在Python中安装modbus-tk库,您可以按照以下步骤进行操作: 1. 确保您已经安装了Python解释器。您可以从Python官方网站(https://www.python.org)下载和安装最新版本...