SpringCloud之RocketMQ
1、下载安装RocketMQ
-
配置环境变量
变量名:ROCKETMQ_HOME
变量值:rocketMQ的解压路径
-
启动NAMESERVER
-
启动BROKER
1、打开runbroker.cmd文件 在行末%CLASSPATH%处加上双引号
2、执行
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
-
安装windowsRocketMQ插件
-
地址:https://github.com/apache/rocketmq-externals.git
进入rocket-externals/rocketmq-console/src/main/resources文件夹,配置application.properties文件
需要配置插件访问地址、已经nameserver地址(127.0.0.1:9876)
编译插件
mvn clean package -Dmaven.test.skip=true
执行target下jar包启动
-
2、简介
- RocketMQ是一款分布式,队列模型的消息中间件
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
3、网络架构
4、特性
1、nameserver
nameserver
互相独立,无状态,彼此没有通信关系(即节点之间没有任何同步信息),单台nameserver
挂掉,不影响其他nameserver
,即使全部挂掉,也不影响业务系统使用。nameserver
不会有频繁的读写,所以性能开销非常小,稳定性高。
2、broker
与nameserver的关系
- 连接
- 单个
broker
和所有的nameserver
保持长连接
- 单个
- 心跳
- 心跳间隔:每隔30秒(此时间无法更改)向所有
nameserver
发送心跳,心跳包含了自身的topic配置信息 - 心跳超时:
nameserver
每隔10秒(此时间无法更改),扫描所有还存活的broker
连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则断开连接
- 心跳间隔:每隔30秒(此时间无法更改)向所有
- 断开
- 时机:
beoker
挂掉,心跳超时导致nameserver
主动关闭连接 - 动作:一旦连接断开,
nameserver
会立即感知,更新topic
与队列的对应关系,但不会通知生产者和消费者
- 时机:
负载均衡
- 一个
topic
分布在多个broker
上,一个broker
可以配置多个topic
,它们是多对多的关系 - 如果某个
topic
消息量很大,应该给它多配制几个队列,并且尽量多分布在不同的broker
上,减轻某个broker的压力 topic
消息量都比较均匀的情况下,如果某个broker
上的队列越多,则该broker
压力越大
可用性
由于消息分布在各个broker
上,一旦某个broker
宕机,则该broker
上的消息读写都会受到影响。所以RocketMQ提供了master/slave
的结构。slave
定时从master
同步数据,如果master
宕机,则slave
提供消费服务。但是不能写入消息,此过程对应用透明。由RocketMQ内部解决。
这里有两个关键点:
- 一旦某个
broker master
宕机,生产者和消费者需要多久才能发现?受限于RocketMQ的网络连接机制,默认情况下,最多需要30秒,但这个时间可由应用设定参数来缩短时间。这个时间段内,发往该broker
的消息都是失败的。而且该broker
的消息无法消费,因为此时消费者不知道该broker
已经宕机 - 消费者得到
master
宕机通知后,转向slave
消费(重定向,对于2次开发者透明),但是slave
不能保证msater
的消息100%都同步过来了,因此会有少量的消息丢失。但是消息最终不会丢失,一旦master
恢复,未同步的消息也会被消费掉
消息清理
- 扫描间隔:默认10秒,由
broker
配置参数cleanResourceInterval
决定 - 空间阈值:物理文件不能无限制的一直存储在磁盘,当磁盘空间达到阈值时,不再接收消息,
broker
打印出日志,消息发送失败,阈值为固定值85% - 清理时间:默认每天凌晨4点,由
broker
配置参数deleteWhen
决定;或者磁盘空间达到阈值 - 文件保留时长:默认72小时,由
broker
配置参数fileReservedTime
决定
读写性能
- 文件内存映射方式操作文件,避免
read/write
系统调用和实时文件读写,性能非常高 - 永远一个文件在写,其他文件在读
- 顺序写,随机读
- 利用Linux的
sendfile
“零拷贝”和mmap
内存映射机制,将消息内容直接输出到Socket
管道,避免了系统调用
系统特性
- 大内存,内存越大性能越高,否则系统
swap
会成为性能瓶颈 - IO密集
- CPU
load
高,使用率低,因为CPU占用后,大部分时间在IOWAIT
上 - 磁盘可靠性要求高,为了兼顾安全和性能,采用RAID 10阵列
- 磁盘读取速度要求高,要求高转速大容量磁盘
3、消费者
与nameserver关系
- 连接
- 单个消费者和一台
nameserver
保持长连接,定时查询topic
配置信息,如果该nameserver
挂掉,消费者会自动连接下一个nameserver
,直到有可用连接为止,并能自动连接
- 单个消费者和一台
- 心跳: 与
nameserver
没有心跳 - 轮询时间
- 默认情况下,消费者每30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。该时间由
DefaultMQPushConsumer
的pollNameServerInterval
参数决定,可手动配置。
- 默认情况下,消费者每30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。该时间由
与broker关系
- 连接:单个消费者和该消费者关联的所有broker保持长连接
- 心跳
- 默认情况下,消费者每30秒向所有broker发送心跳,该时间由
DefaultMQPushConsumer
的heartbeatBrokerInterval
参数决定。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,从时间无法更改)没有发送心跳数据,则关闭连接,并向该消费者各组的所有消费者发出通知,分组内消费者重新分配队列继续消费
- 默认情况下,消费者每30秒向所有broker发送心跳,该时间由
- 断开
- 时间:消费者挂掉;心跳超时导致broker主动关闭连接
- 动作:一旦连接断开,broker会立即感知到,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费
负载均衡
集群消费模式下,一个消费者集群多台机器共同消费一个topic
的多个队列一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其他消费者会接替挂掉的消费者继续消费。
消费机制
- 本地队列
- 消费者不间断从
broker
拉取消息,消息拉取到本地队列,然后本地消费线程消费本地消费队列,只是一个异步过程,拉取线程不会等待本地消费线程,这种模式实时性非常高(本地消息队列达到解耦的效果,响应时间减少)。对消费者对本地队列有一个保护,因此本地消息队列不能无限大,否则可能会占用大量内存,本地队列大小由DefaultMQPushConsumer
的pullThresholdForQueue
属性控制,默认1000,可以手动设置
- 消费者不间断从
- 轮询间隔
- 消息拉取线程拉取间隔时间由
DefaultMQPushConsumer
的pullInterval
属性控制,默认为0,可手动设置
- 消息拉取线程拉取间隔时间由
- 消息消费数量
- 监听器每次接受本地队列的消息条数由
DefaultMQPushConsumer
的consumerMessageBatchMaxSize
属性控制,默认为1,可手动设置
- 监听器每次接受本地队列的消息条数由
消费进度存储
每隔一段时间,将各个队列的消费进度存储到对应的broker上,该时间由DefaultMQPushConsumer
的persistConsumerOffsetInterval
属性控制,默认为5秒,可手动设置
问题:如果一个topic在某个broker上有3个队列,一个消费者消费3个队列,那么该消费者和这个broker有几个连接?
一个连接,消费单位与队列相关,消费连接只和broker相关,事实上,消费者将所有队列的消息拉取任务放到本地的队列,挨个拉取,拉取完毕后,又将拉取任务放到队尾,然后执行下一个拉取任务
4、生产者
与nameserver的关系
- 连接
- 单个生产者和一台
nameserver
保持长连接,定时查询topic
配置信息,如果该nameserver
挂掉,生产者会自动连接下一个nameserver
,直到有可用连接为止,并能自动重连
- 单个生产者和一台
- 轮询时间
- 默认情况下,生产者每个30秒从
nameserver
获取所有topic
的最新队列情况,这意味着某个broker
如果宕机,生产者最多要30秒才能感知,在此期间,发往该broker的消息发送失败,该时间由DefaultMQProducer
的pollNameServerInterval
参数决定
- 默认情况下,生产者每个30秒从
- 心跳:与
nameserver没有心跳
与broker关系
- 连接
- 单个生产者和该生产者关联的所有
broker
保持长连接
- 单个生产者和该生产者关联的所有
- 心跳
- 默认情况下,生产者每30秒向所有的
broker
发送心跳,该时间由DefaultMQProducer
的heartbeatBrokerInterval
参数决定,可手动配置。broker
每个10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接。
- 默认情况下,生产者每30秒向所有的
- 断开
- 移除
broker
上的生产者信息
- 移除
负载均衡
生产者时间没有关系,每个生产者向队列轮流发送消息
5、Broker集群配置方式及优缺点
1、单个Master
这种方式风险比较大,一旦Broker
重启或者宕机时,会导致整个服务不可用。
2、多Master方式
一个集群无Slave
,全是Master
- 优点:配置简单,单个
Master
宕机或者重启维护对应用无影响,在磁盘配置为RAID 10时,即使机器宕机不可恢复情况下,由于RAID 10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高 - 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
3、多Master多Slave模式,异步复制
每个Master
配置一个Slave
,有多对Master-Slave
,HA采用异步复制方式,准备有短暂消息延迟,毫秒级
- 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受到影响,因为
Master
宕机后,消费者仍然可以从Slave
消费,此过程对应用透明。不需要人工干预。性能同多Master
模式几乎一样。 - 缺点:
Master
宕机,磁盘损坏情况,会丢失少量消息。
4、多Master多Slave模式,同步双写
每个Master
配置一个Slave,有多对Master-Slave
,HA采用同步双写方式,主从都写成功,向应用返回成功。
- 优点:数据与服务都无单点,
Master
宕机情况下,消息无延迟,服务可用性与数据可用性都非常高 - 缺点:性能比异步复制模式略点,大约低10%左右,发送单个消息的RT会略高。主机宕机后,备机不能自动切换为主机
6、常用配置参数
1、客户端的公共配置类: ClientConfig
参数名 | 默认值 | 说明 |
---|---|---|
NamesrvAddr | NameServer地址列表,多个NameServer地址用分号分隔开 | |
ClientIP | 本机IP | 客户端本机IP地址,某些机器会发生无法识别客户端IP地址的情况,需要在代码中强制指定 |
instanceName | DEFAULT | 客户端实例名称,客户端创建的多个Producer,Consumer实际是共用一个内部实例(这个实例包含网络连接,线程资源等) |
clientCallbackExecutorThreads | 4 | 通信层异步回调线程数 |
pollNameServerInteval | 30000 | 轮询Name Server 间隔时间,单位毫秒 |
heartbeatBrokerInterval | 30000 | 向Broker发送心跳间隔时间,单位毫秒 |
persistConsumerOffsetInterval | 5000 | 持久化Consumer消费进度间隔时间,单位毫秒 |
2、Producer配置
参数名 | 默认值 | 说明 |
---|---|---|
producerGroup | DEFAULT_PRODUCER | Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。 |
createTopicKey | TBW102 | 在发送消息时,自动创建服务器不存在的topic,需要指定key |
defaultTopicQueueNums | 10000 | 发送消息超时时间,单位毫秒 |
compressMsgBodyOverHowmuch | 4096 | 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节 |
retryAnotherBrokerWhenNotStoreOK | FALSE | 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送 |
maxMessageSize | 131072 | 客户端限制的消息大小,超过报错,同时服务端也会限制(默认128K) |
transactionCheckListener | 事物消息回查监听器,如果发送事务消息,必须设置 | |
checkThreadPoolMinSize | 1 | Broker回查Producer事务状态时,线程池大小 |
checkThreadPoolMaxSize | 1 | Broker回查Producer事务状态时,线程池大小 |
checkRequestHoldMax | 2000 | Broker回查Producer事务状态时,Producer本地缓冲请求队列大小 |
3、PushConsumer配置
参数名 | 默认值 | 说明 |
---|---|---|
consumerGroup | DEFAULT_CONSUMER | Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应将它们归为同一组 |
messageModel | CLUSTERING | 消息模型,支持以下两种1.集群消费2.广播消费 |
consumeFromWhere | CONSUME_FROM_LAST_OFFSET | Consumer启动后,默认从什么位置开始消费 |
allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 |
Subscription | {} | 订阅关系 |
messageListener | 消息监听器 | |
offsetStore | 消费进度存储 | |
consumeThreadMin | 10 | 消费线程池最小数量 |
consumeThreadMax | 20 | 消费线程池最大数量 |
consumeConcurrentlyMaxSpan | 2000 | 单队列并行消费允许的最大跨度 |
pullThresholdForQueue | 1000 | 拉消息本地队列缓存消息最大数 |
Pullinterval | 0 | 拉消息间隔,由于是长轮询,所以为0,但是如果应用了流控,也可以设置大于0的值,单位毫秒 |
consumeMessageBatchMaxSize | 1 | 批量消费,一次消费多少条消息 |
pullBatchSize | 32 | 批量拉消息,一次最多拉多少条 |
4、PullConsumer配置
参数 | 默认值 | 说明 |
---|---|---|
consumerGroup | Conusmer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 | |
brokerSuspendMaxTimeMillis | 20000 | 长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒 |
consumerPullTimeoutMillis | 10000 | 非长轮询,拉消息超时时间,单位毫秒 |
consumerTimeoutMillisWhenSuspend | 30000 | 长轮询,Consumer拉消息请求咋broker挂起超过指定时间,客户端认为超时,单位毫秒 |
messageModel | BROADCASTING | 消息模型,支持以下两种:1集群消费 2广播模式 |
messageQueueListener | 监听队列变化 | |
offsetStore | 消费进度存储 | |
registerTopics | 注册的topic集合 | |
allocateMessageQueueStrategy | Rebalance算法实现策略 |
5、Broker配置
参数名 | 默认值 | 说明 |
---|---|---|
consumerGroup | Conusmer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 | |
listenPort | 10911 | Broker对外服务的监听端口 |
namesrvAddr | Null | Name Server地址 |
brokerIP | 本机IP | 本机IP地址,默认系统自动识别,但是某些多网卡机器会存在识别错误的情况,这种情况下可以人工配置。 |
brokerName | 本机主机名 | |
brokerClusterName | DefaultCluster | Broker所属哪个集群 |
brokerId | 0 | BrokerId,必须是大等于0的整数,0表示Master,>0表示Slave,一个Master可以挂多个Slave,Master和Slave通过BrokerName来配对 |
storePathCommitLog | $HOME/store/commitlog | commitLog存储路径 |
storePathConsumeQueue | $HOME/store/consumequeue | 消费队列存储路径 |
storePathIndex | $HOME/store/index | 消息索引存储队列 |
deleteWhen | 4 | 删除时间时间点,默认凌晨4点 |
fileReservedTime | 48 | 文件保留时间,默认48小时 |
maxTransferBytesOnMessageInMemory | 262144 | 单次pull消息(内存)传输的最大字节数 |
maxTransferCountOnMessageInMemory | 32 | 单次pull消息(内存)传输的最大条数 |
maxTransferBytesOnMessageInMemory | 65535 | 单次pull消息(磁盘)传输的最大字节数 |
maxTransferCountOnMessageInDisk | 8 | 单次pull消息(磁盘)传输的最大条数 |
messageIndexEnable | TRUE | 是否开启消息索引功能 |
messageIndexSafe | FALSE | 是否提供安全的消息索引机制,索引保证不丢 |
brokerRole | ASYNC_MASTER | Broker的角色-ASYNC_MASTER异步复制Master-SYNC_MASTER同步双写Master-SLAVE |
flushDiskType | ASYNC_FLUSH | 刷盘方式 -ASYNC_FLUSH异步刷盘 -SYNC_FLUSH同步刷盘 |
cleanFileForciblyEnable | TRUE | 磁盘满,且无过期文件情况下TRUE表示强制删除文件,优先保证服务可用 |