🚛RocketMQ5.x教程-从安装到实战到经典面试题

1.RocketMQ介绍

RocketMQ是一款由阿里巴巴开源的分布式消息中间件。它具有低延迟、高吞吐量、高可用性和高可靠性等特点,适用于构建具有海量消息堆积和异步解耦功能的应用系统。

1.1.基本概念

●生产者(Producer):也称为消息发布者,是RocketMQ中用来构建并传输消息到服务端的运行实体。
●主题(Topic):Topic是RocketMQ中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息;Topic是一个逻辑概念,并不是实际的消息容器;
●消息队列(MessageQueue):队列是RocketMQ中消息存储和传输的实际容器,也是消息的最小存储单元。
●消费者(Consumer):也称为消息订阅者,是RocketMQ中用来接收并处理消息的运行实体。
●消费者组(ConsumerGroup):消费者组是RocketMQ中承载多个消费行为一致的消费者负载均衡分组。和消费者不同,消费者组是一个逻辑概念。
●NameServer:可以理解成注册中心,负责更新和发现Broker服务。在NameServer的集群中,NameServer与NameServer之间是没有任何通信的,它是无状态的。
●Broker:可以理解为消息中转角色,负责消息的存储和转发,接收生产者产生的消息并持久化消息;当用户发送的消息被发送到Broker时,Broker会将消息转发到与之关联的Topic中,以便让更多的接收者进行处理。

1.2.消息模型

image.png

1.3.部署模型

image.png

2.下载RocketMQ

RocketMQ的官网地址:https://rocketmq.apache.org/
Github地址:https://github.com/apache/rocketmq
下载地址:https://rocketmq.apache.org/zh/download/
当前最新的版本为5.1.0,本教程安装5.1.0版本。

3.安装RocketMQ

3.1.安装前需要准备一个CentOS7的Linux机器,使用的Linux版本如下:

3.2.安装JDK

推荐使用JDK1.8版本。可以使用课件资料包或者自行下载tar.gz包。

1.统一规划目录,创建app文件夹,在创建jdk文件夹,将jdk的包上载到 /app/jdk目录下然后解压

2.配置jdk环境变量,将JAVA_HOME变量加上;将path路径替换成相应配置

3.更新配置并查看jdk版本,显示以下信息则安装成功。

3.3.安装RocketMQ

3.3.1.将安装包上载到 /app/rocketMQ目录下

3.3.2.配置rocketMQ环境变量

3.3.3.更新配置

ROCKETMQ_HOME的环境变量是必须要单独配置的,如果不配置的话,启动NameSever和Broker都会报错。这个环境变量的作用是用来加载$ROCKETMQ_HOME/conf下的除broker.conf以外的几个配置文件。所以实际情况中,可以不按这个配置,但是一定要能找到配置文件。这样RocketMQ就安装完成了。

3.4.RocketMQ工作原理

官网5.0版本速览链接:https://rocketmq.apache.org/zh/version/
RocketMQ5.0 引入了全新的弹性无状态代理模式,将当前的Broker职责进行拆分,对于客户端协议适配、权限管理、消费管理等计算逻辑进行抽离,独立无状态的代理角色提供服务,Broker则继续专注于存储能力的持续优化。值得注意的是RocketMQ 5.0的全新模式是和4.0的极简架构模式相容相通的,5.0的代理架构完全可以以Local模式运行,实现与4.0架构完全一致的效果。开发者可以根据自身的业务场景自由选择架构部署,本教程也是部署的Local模式。
RocketMQ启动流程如下:
■启动NameServer
■启动Broker

3.5.NameServer服务搭建

启动NameServer非常简单,在$ROCKETMQ_HOME/bin目录下有个mqnamesrv。直接执行这个脚本就可以启动RocketMQ的NameServer服务。
由于RocketMQ默认预设的JVM内存是4G,这是RocketMQ给我们的最佳配置。但是通常我们用虚拟机的话都是不够4G内存的,所以需要调整下JVM内存
大小。修改的方式是直接修改runserver.sh。

3.5.1.修改NameServer启动配置

3.5.2.启动NameServer

NameServer的配置修改完成,然后我们用静默启动的方式启动NameServer服务,启动完成后在nohup.out里看到这一条关键日志就是启动成功。并且使用jps指令可以看到有一个NamesrvStartup进程。

3.6.Broker服务搭建

启动Broker的脚本是runbroker.sh。Broker的默认预设内存是8G,启动前,如果内存不够,同样需要调整下JVM内存。修改的方式是直接修改runbroker.sh。

3.6.1.修改broker启动脚本配置

3.6.2.修改broker配置文件

3.6.3.启动broker服务

Broker的配置修改完成,然后我们用静默启动的方式启动Broker服务,同样是检查nohup.out日志, 并且jps指令可以看到一个BrokerStartup进程。

3.7.测试RocketMQ消息发送与消费

在RocketMQ的安装包中,提供了一个tools.sh工具可以用来在命令行快速验证RocketMQ服务。

1.我们在bin录下执行以下命令测试消息发送,默认会发1000条消息,发送完成自动关闭

2.出现以下提示则代表消息发送成功

3.执行以下命令测试消息接收,Consumer执行不会自动关闭,会一直挂起等待新消息过来;

4.出现以下提示则代表消息接收成功

3.8.关闭RocketMQ服务

在bin目录下通过脚本关闭服务:

1.关闭Broker

2.关闭NameServer

3.查看服务

至此RockMQ单机测试成功,接下来搭建集群。

4.RocketMQ集群架构

刚才的演示中,我们已经体验到了RocketMQ是如何工作的。我们回头看RocketMQ的集群架构,就能够有更全面的理解了。

4.1.RocketMQ集群架构解析

一个完整的RocketMQ集群中,有如下几种角色 :
●Producer:消息的发送者;举例:发信者
●Consumer:消息接收者;举例:收信者
●Broker:暂存和传输消息;举例:邮局
●NameServer:管理Broker;举例:各个邮局的管理机构
●Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息 。
●Message Queue:相当于是Topic的分区;用于并行发送和接收消息。

4.2.RocketMQ集群搭建

准备三台虚机,并配置机器名。可以利用安装好的虚机通过克隆出另外两个机器。

4.2.1.系统配置

1.使用vi /etc/hosts命令,配置机器名,在文件末尾加上以下配置:
2.服务之间设置免密登陆,三个机器都使用ssh-keygen生成秘钥。提示录入直接回车即可
3.三个机器都使用以下命令分发给其他机器,输入yes,然后输入密码;这样可以直接某个机器使用ssh或者scp到另外的机器。
4.停止并禁用防火墙或者删除防火墙,我这边使用的是删除防火墙。

4.2.2.配置RocketMQ主从集群

使用conf/2m-2s-async下的配置文件搭建一个2主2从异步刷盘的集群。设计的集群情况如下:

机器名

nemaeServer节点部署

broker节点部署

worker1

nameserver

worker2

nameserver

broker-a,broker-b-s

worker3

nameserver

broker-b,broker-a-s

4.2.2.1.配置方式:conf目录下存在三种配置方式

●2m-2s-async:2主2从异步刷盘(吞吐量较大,但是消息可能丢失)
●2m-2s-sync:2主2从同步刷盘(吞吐量会下降,但是消息更安全)
●2m-noslave:2主无从(单点故障),然后还可以直接配置broker.conf,进行单点环境配置
而dleger就是用来实现主从切换的。集群中的节点会基于Raft协议随机选举出一个leader,其他的就都是follower。通常正式环境都会采用这种方式来搭建集群。

4.2.2.2.搭建2主2从模式,配置2m-2s-async目录Broker文件:

这样2主2从的集群配置基本就完成了。搭建过程中需要注意的配置项:
■同一机器上两个实例的store目录不能相同,否则会报错 Lock failed,MQ already started
■同一机器上两个实例的listenPort也不能相同。否则会报端口占用的错
■如果是多网卡的机器,比如云服务器,那么需要在broker.conf中增加brokerIP1属性,指定所在机器的外网网卡地址。

4.2.3.启动集群

由于我们之前已经在worker1单机部署过,所以相关的启动jvm参数已经调整过,如果是新配置需要注意jvm参数根据实际的内存大小分配。其他两个机器是克隆过来的所以无需在进行调整,nameServer不需要进行配置,直接启动nameServer即可。这也看出nameserver是无状态的。
RocketMQ5.X版本兼容之前旧版本的启动方式,即如下部署方式:

4.2.3.1.启动worker1、worker2、worker3的nameServer,并观察启动日志
4.2.3.2.worker2上启动broker-a节点与broker-b-s节点
4.2.3.3.worker3上启动broker-b节点与broker-a-s节点
4.2.3.4.使用测试工具测试消息收发

RocketMQ5.X版本兼容之前旧版本部署完成。在部署新版之前先通过maven安装一个rocketmq-dashboard可视化界面查看我们的集群。

4.2.4.安装rocketmq-dashboard

4.2.4.1.在1号机通过maven安装dashboard,所以要先安装maven服务
4.2.4.2.配置maven环境变量
4.2.4.3.更新环境配置,查看maven是否成功
4.2.4.4.修改maven仓库配置
4.2.4.5.安装dashboard
4.2.4.6.编译dashboard
4.2.4.7.启动dashboard
4.2.4.8.启动成功后访问:http://192.168.43.134:8080/#/ 操作dashboard界面:

4.2.5.部署5.x版本-Local模式

Apache RocketMQ 5.0 版本完成基本消息收发,包括 NameServer、Broker、Proxy 组件。 在 5.0 版本中 Proxy 和 Broker 根据实际诉求可以分为 Local 模式和 Cluster 模式,一般情况下如果没有特殊需求,或者遵循从早期版本平滑升级的思路,可以选用Local模式。
●在 Local 模式下,Broker 和 Proxy 是同进程部署,只是在原有 Broker 的配置基础上新增 Proxy 的简易配置就可以运行。
●在 Cluster 模式下,Broker 和 Proxy 分别部署,即在原有的集群基础上,额外再部署 Proxy 即可。

4.2.5.1.关闭worker2,worker3的broker服务
4.2.5.2.使用Local方式部署,每个机器只能部署一个broker,否则会出现端口占用的异常,这里启用worker2的broker-a与worker3的broker-b节点。
4.2.5.3.测试消息收发
4.2.5.4.登录dashboard页面查看注册成功
4.2.5.5.其他部署模式

官网还提供了其他部署模式,有兴趣的小伙伴可以自行研究,官网部署方式:https://rocketmq.apache.org/zh/docs/deploymentOperations/01deploy
集群部署搭建的过程我们到此结束,接下来我们使用官方提供的exmaple代码进行实战。

5.官方API实战

实战之前,我们需要先搭建一个基于Maven的springboot项目,只需要加入以下依赖:
接下来使用IDEA搭建一个Maven项目:

工程创建后,我们添加Pom依赖:

这样工程就搭建完成了,接下来我们进入API实战。

5.1.基本样例

消息生产者分别通过三种方式发送消息:
●同步发送:等待消息返回后再继续进行下面的操作。
●异步发送:不等待消息返回直接进入后续流程。broker将结果返回后调用callback函数,并使用CountDownLatch计数。
●单向发送:只负责发送,不管消息是否发送成功。
消费者消费消息分两种:
●拉模式:消费者主动去Broker上拉取消息。
●推模式:消费者等待Broker把消息推送过来。
通常情况下,用推模式比较简单。需要注意DefaultMQPullConsumerImpl这个消费者类已标记为过期,但是还是可以使用的。替换的类是DefaultLitePullConsumerImpl。
●LitePullConsumerSubscribe:随机获取一个queue消息
●LitePullConsumerAssign:指定一个queue消息

5.2.顺序消息

顺序消息指生产者局部有序发送到一个queue,但多个queue之间是全局无序的。
●顺序消息生产者样例:通过MessageQueueSelector将消息有序发送到同一个queue中。
●顺序消息消费者样例:通过MessageListenerOrderly消费者每次读取消息都只从一个queue中获取(通过加锁的方式实现)。

5.3.广播消息

广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费模式。
●MessageModel.BROADCASTING:广播消息。一条消息会发给所有订阅了对应主题的消费者,不管消费者是不是同一个消费者组。
●MessageModel.CLUSTERING:集群消息。每一条消息只会被同一个消费者组中的一个实例消费。

5.4.延迟消息

延迟消息实现的效果就是在调用producer.send方法后,消息并不会立即发送出去,而是会等一段时间再发送出去。这是RocketMQ特有的一个功能。
●message.setDelayTimeLevel(3):预定日常定时发送。1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h;可以在dashboard中broker配置查看。
●msg.setDelayTimeMs(10L):指定时间定时发送。默认支持最大延迟时间为3天,可以根据broker配置:timerMaxDelaySec修改。

5.5.批量消息

批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。
批量消息的使用限制:
●消息大小不能超过4M,虽然源码注释不能超1M,但是实际使用不超过4M即可。平衡整体的性能,建议保持1M左右。
●相同的Topic,
●相同的waitStoreMsgOK
●不能是延迟消息、事务消息等

5.6.过滤消息

在大多数情况下,可以使用Message的Tag属性来简单快速的过滤信息。

5.6.1.使用Tag方式过滤(通过consumer.subscribe(“TagFilterTest”, “TagA || TagC”)实现):

Tag是RocketMQ中特有的一个消息属性。
RocketMQ的最佳实践中就建议使用RocketMQ时,一个应用可以就用一个Topic,而应用中的不同业务就用Tag来区分。
Tag方式有一个很大的限制,就是一个消息只能有一个Tag,这在一些比较复杂的场景就有点不足了。 这时候可以使用SQL表达式来对消息进行过滤。

5.6.2.使用Sql方式过滤(通过MessageSelector.bySql(String sql)参数实现):

这里面的sql语句是按照SQL92标准来执行的。sql中可以使用的参数有默认的TAGS和一个在生产者中加入的自定义属性。
SQL92语法:
RocketMQ只定义了一些基本语法来支持这个特性。我们可以很容易地扩展它。
●数值比较,比如:>,>=,<,<=,BETWEEN,=;
●字符比较,比如:=,<>,IN;
●IS NULL ,IS NOT NULL;
●逻辑符号 AND,OR,NOT;
常量支持类型为:
●数值,比如:123,3.1415;
●字符,比如:’abc’,必须用单引号包裹起来;
●NULL,特殊的常量
●布尔值,TRUE 或 FALSE
使用注意:
●只有推模式的消费者可以使用SQL过滤。拉模式是用不了的;
●另外消息过滤是在Broker端进行的,提升网络传输性能,但是broker服务会比较繁忙。(consumer将过滤条件推送给broker端)

5.7.事务消息

这个事务消息是RocketMQ提供的一个非常有特色的功能,需要着重理解。

5.7.1.什么是事务消息

事务消息是在分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个操作的原子性,也就是这两个操作一起成功或者一起失败。

5.7.2.事务消息的实现机制

事务消息机制的关键是在发送消息时会将消息转为一个half半消息,并存入RocketMQ内部的一个Topic(RMQ_SYS_TRANS_HALF_TOPIC),这个Topic对消费者是不可见的。再经过一系列事务检查通过后,再将消息转存到目标Topic,这样对消费者就可见了。

5.7.3.事务消息的编程模型

事务消息只保证消息发送者的本地事务与发消息这两个操作的原子性,因此,事务消息的示例只涉及到消息发送者,对于消息消费者来说,并没有什么特别的。
事务消息的关键是在TransactionMQProducer中指定了一个TransactionListener事务监听器,这个事务监听器就是事务消息的关键控制器。

5.7.4.事务消息的使用限制

●事务消息不支持延迟消息和批量消息。
●为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的transactionCheckMax参数来修改此限制。如果已经检查某条消息超过N次的话(N = transactionCheckMax)则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。可以通过重写AbstractTransactionCheckListener类来修改这个行为。
●事务性消息可能不止一次被检查或消费。

6.RocketMQ使用中常见的问题

6.1.RocketMQ如何保证消息不丢失

我们将消息流程分为三大部分,每一部分都有可能会丢失数据。
●生产阶段:Producer通过网络将消息发送给Broker,这个发送可能会发生丢失。比如网络延迟不可达等。
●存储阶段:Broker肯定是先把消息放到内存的,然后根据刷盘策略持久化到硬盘中。刚收到Producer的消息,放入内存,但是异常宕机了,导致消息丢失。
●消费阶段:消费失败。比如先提交ack再消费,处理过程中出现异常,该消息就出现了丢失。
解决方案:
●生产阶段:使用同步发送失败重试机制;异步发送重写回调方法检查发送结果;Ack确认机制。
●存储阶段:同步刷盘机制;集群模式采用同步复制。
●消费阶段:正常消费处理完成才提交ACK;如果处理异常返回重试标识。
除了上述,在生产阶段与消费者阶段部分消息还需要确保消息顺序消费。

6.2.RocketMQ的消息持久化机制

RocketMQ的消息持久化机制是指将消息存储在磁盘上,以确保消息能够可靠地存储和检索。RocketMQ 的消息持久化机制涉及到以下三个角色:CommitLog、ConsumeQueue 和 IndexFile。
●CommitLog:消息真正的存储文件,所有的消息都存在 CommitLog文件中。
RocketMQ默认会将消息数据先存储到内存中的一个缓冲区,每当缓冲区中积累了一定量的消息或者一定时间后,就会将缓冲区中的消息批量写入到磁盘上的 CommitLog 文件中。消息在写入 CommitLog 文件后就可以被消费者消费了。
Commitlog文件的大小固定1G,写满之后生成新的文件,并且采用的是顺序写的方式。
●ConsumeQueue:消息消费逻辑队列,类似数据库的索引文件。
RocketMQ 中每个主题下的每个消息队列都会对应一个 ConsumeQueue。ConsumeQueue存储了消息的offset以及该offset对应的消息在CommitLog文件中的位置信息,便于消费者快速定位并消费消息。
每个ConsumeQueue文件固定由30万个固定大小20byte的数据块组成;据块的内容包括:msgPhyOffset(8byte,消息在文件中的起始位置)+msgSize(4byte,消息在文件中占用的长度)+msgTagCode(8byte,消息的tag的Hash值)。
●IndexFile:消息索引文件,主要存储消息Key与offset的对应关系,提升消息检索速度。
如果生产者在发送消息时设置了消息Key,那么RocketMQ会将消息Key值和消息的物理偏移量(offset)存储在IndexFile文件中,这样当消费者需要根据消息Key查询消息时,就可以直接在IndexFile文件中查找对应的offset,然后通过 ConsumeQueue文件快速定位并消费消息。
IndexFile文件大小固定400M,可以保存2000W个索引。
三个角色构成的消息存储结构如下:

消息存储过程:

6.3.RocketMQ如何保证消息顺序

RocketMQ架构本身是无法保证消息有序的,但是提供了相应的API保证消息有序消费。RocketMQ API利用FIFO先进先出的特性,保证生产者消息有序进入同一队列,消费者在同一队列消费就能达到消息的有序消费。
●使用MessageQueueSelector编写有序消息生产者
有序消息生产者会按照一定的规则将消息发送到同一个队列中,从而保证同一个队列中的消息是有序的。RocketMQ 并不保证整个主题内所有队列的消息都是按照发送顺序排列的。
●使用MessageListenerOrderly进行顺序消费与之对应的MessageListenerConcurrently并行消费(push模式)
MessageListenerOrderly是RocketMQ 专门提供的一种顺序消费的接口,它可以让消费者按照消息发送的顺序,一个一个地处理消息。这个接口支持按照消息的重试次数进行顺序消费、订单ID等作为消息键来实现顺序消费、批量消费等操作。
通过加锁的方式实现(有超时机制),一个队列同时只有一个消费者;并且存在一个定时任务,每隔一段时间就会延长锁的时间,直到整个消息队列全部消费结束。
●消费端自己保证消息顺序消费(pull模式)
●消费者并发消费时设置消费线程为1
RocketMQ 的消费者可以开启多个消费线程同时消费同一个队列中的消息,如果要保证消息的顺序,需要将消费线程数设置为1。这样,在同一个队列中,每个消息只会被单个消费者线程消费,从而保证消息的顺序性
rokectMQ消息模型:

6.4.RocketMQ的事务消息原理

RocketMQ 的事务消息是一种保证消息可靠性的机制。在RocketMQ中,事务消息的实现原理主要是通过两个发送阶段和一个确认阶段来实现的。
●发送消息的预处理阶段:在发送事务消息之前,RocketMQ 会将消息的状态设置为“Preparing”,并将消息存储到消息存储库中。
●执行本地事务:当预处理阶段完成后,消息发送者需要执行本地事务,并返回执行结果(commit 或 rollback)。
●消息的二次确认阶段:根据本地事务的执行结果,如果是 commit,则 RocketMQ 将消息的状态设置为“Committing”;否则将消息的状态设置为“Rollback”。
●完成事务:最后在消息的消费者消费该消息时,RocketMQ 会根据消息的状态来决定是否提交该消息。如果消息的状态是“Committing”,则直接提交该消息;否则忽略该消息。
需要注意的是,如果在消息发送的过程中出现异常或者网络故障等问题,RocketMQ 会触发消息回查机制。在回查过程中,RocketMQ 会调用消息发送方提供的回查接口来确认事务的提交状态,从而解决消息投递的不确定性。

7.部署过程相关的命令集汇总

8.机器配置后的完整截图

8.1.hosts配置:cat /etc/hosts

三个机器都是如此

8.2.bash_profile配置:cat ~/.bash_profile

8.2.1worker1:
8.2.2.worker2、worker3:

8.3.conf/broker.conf配置文件:cat borker.conf

8.3.1.worker1:borker.conf

8.3.2.主2从架构:cd conf/2m-2s-async/

8.3.2.1.worker2:broker-a.properties
8.3.2.2worker2:broker-b-s.properties
8.3.2.3.worker3:broker-b.properties
8.3.2.4.worker3:broker-a-s.properties

8.4.Maven配置(worker1)

8.4.1.本地仓库路径

8.4.2.阿里云配置

8.5.整体文件目录结构