Emove

  • 首页
  • 归档
  • 分类
  • 标签

  • 搜索
context 反射 channel LRU BeanDefinition JVM 装饰者模式 MyBatis 快慢指针 归并排序 链表 hash表 栈 回溯 贪心 主从复制 二分查找 双指针 动态规划 AOF RDB 规范 BASE理论 CAP B树 RocketMQ Sentinel Ribbon Eureka 命令模式 访问者模式 迭代器模式 中介者模式 备忘录模式 解释器模式 状态模式 策略模式 职责链模式 模板方法模式 代理模式 享元模式 桥接模式 外观模式 组合模式 适配器模式 建造者模式 原型模式 工场模式 单例 UML 锁 事务 sql 索引

SpringCloud之RocketMQ

发表于 2020-04-04 | 分类于 微服务 | 2 | 阅读次数 161

SpringCloud之RocketMQ

1、下载安装RocketMQ

  1. 配置环境变量

    变量名:ROCKETMQ_HOME

    变量值:rocketMQ的解压路径

  2. 启动NAMESERVER

  3. 启动BROKER

    1、打开runbroker.cmd文件 在行末%CLASSPATH%处加上双引号

    2、执行

    start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
    
  4. 安装windowsRocketMQ插件

    1. 地址:https://github.com/apache/rocketmq-externals.git

      进入rocket-externals/rocketmq-console/src/main/resources文件夹,配置application.properties文件

      需要配置插件访问地址、已经nameserver地址(127.0.0.1:9876)

      编译插件

    mvn clean package -Dmaven.test.skip=true
    

    执行target下jar包启动

2、简介

  1. RocketMQ是一款分布式,队列模型的消息中间件
  2. 能够保证严格的消息顺序
  3. 提供丰富的消息拉取模式
  4. 高效的的订阅者水平扩展能力
  5. 实时的消息订阅机制
  6. 亿级消息堆积能力

3、网络架构

RocketMQ架构图

4、特性

1、nameserver

  1. nameserver互相独立,无状态,彼此没有通信关系(即节点之间没有任何同步信息),单台nameserver挂掉,不影响其他nameserver,即使全部挂掉,也不影响业务系统使用。
  2. nameserver不会有频繁的读写,所以性能开销非常小,稳定性高。

2、broker

与nameserver的关系

  • 连接
    • 单个broker和所有的nameserver保持长连接
  • 心跳
    • 心跳间隔:每隔30秒(此时间无法更改)向所有nameserver发送心跳,心跳包含了自身的topic配置信息
    • 心跳超时:nameserver每隔10秒(此时间无法更改),扫描所有还存活的broker连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则断开连接
  • 断开
    • 时机:beoker挂掉,心跳超时导致nameserver主动关闭连接
    • 动作:一旦连接断开,nameserver会立即感知,更新topic与队列的对应关系,但不会通知生产者和消费者

负载均衡

  • 一个topic分布在多个broker上,一个broker可以配置多个topic,它们是多对多的关系
  • 如果某个topic消息量很大,应该给它多配制几个队列,并且尽量多分布在不同的broker上,减轻某个broker的压力
  • topic消息量都比较均匀的情况下,如果某个broker上的队列越多,则该broker压力越大

可用性

由于消息分布在各个broker上,一旦某个broker宕机,则该broker上的消息读写都会受到影响。所以RocketMQ提供了master/slave的结构。slave定时从master同步数据,如果master宕机,则slave提供消费服务。但是不能写入消息,此过程对应用透明。由RocketMQ内部解决。

这里有两个关键点:

  • 一旦某个broker master宕机,生产者和消费者需要多久才能发现?受限于RocketMQ的网络连接机制,默认情况下,最多需要30秒,但这个时间可由应用设定参数来缩短时间。这个时间段内,发往该broker的消息都是失败的。而且该broker的消息无法消费,因为此时消费者不知道该broker已经宕机
  • 消费者得到master宕机通知后,转向slave消费(重定向,对于2次开发者透明),但是slave不能保证msater的消息100%都同步过来了,因此会有少量的消息丢失。但是消息最终不会丢失,一旦master恢复,未同步的消息也会被消费掉

消息清理

  • 扫描间隔:默认10秒,由broker配置参数cleanResourceInterval决定
  • 空间阈值:物理文件不能无限制的一直存储在磁盘,当磁盘空间达到阈值时,不再接收消息,broker打印出日志,消息发送失败,阈值为固定值85%
  • 清理时间:默认每天凌晨4点,由broker配置参数deleteWhen决定;或者磁盘空间达到阈值
  • 文件保留时长:默认72小时,由broker配置参数fileReservedTime决定

读写性能

  • 文件内存映射方式操作文件,避免read/write系统调用和实时文件读写,性能非常高
  • 永远一个文件在写,其他文件在读
  • 顺序写,随机读
  • 利用Linux的sendfile“零拷贝”和mmap内存映射机制,将消息内容直接输出到Socket管道,避免了系统调用

系统特性

  • 大内存,内存越大性能越高,否则系统swap会成为性能瓶颈
  • IO密集
  • CPU load高,使用率低,因为CPU占用后,大部分时间在IO WAIT上
  • 磁盘可靠性要求高,为了兼顾安全和性能,采用RAID 10阵列
  • 磁盘读取速度要求高,要求高转速大容量磁盘

3、消费者

与nameserver关系

  • 连接
    • 单个消费者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,消费者会自动连接下一个nameserver,直到有可用连接为止,并能自动连接
  • 心跳: 与nameserver没有心跳
  • 轮询时间
    • 默认情况下,消费者每30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。该时间由DefaultMQPushConsumer的pollNameServerInterval参数决定,可手动配置。

与broker关系

  • 连接:单个消费者和该消费者关联的所有broker保持长连接
  • 心跳
    • 默认情况下,消费者每30秒向所有broker发送心跳,该时间由DefaultMQPushConsumer的heartbeatBrokerInterval参数决定。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,从时间无法更改)没有发送心跳数据,则关闭连接,并向该消费者各组的所有消费者发出通知,分组内消费者重新分配队列继续消费
  • 断开
    • 时间:消费者挂掉;心跳超时导致broker主动关闭连接
    • 动作:一旦连接断开,broker会立即感知到,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费

负载均衡

集群消费模式下,一个消费者集群多台机器共同消费一个topic的多个队列一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其他消费者会接替挂掉的消费者继续消费。

消费机制

  • 本地队列
    • 消费者不间断从broker拉取消息,消息拉取到本地队列,然后本地消费线程消费本地消费队列,只是一个异步过程,拉取线程不会等待本地消费线程,这种模式实时性非常高(本地消息队列达到解耦的效果,响应时间减少)。对消费者对本地队列有一个保护,因此本地消息队列不能无限大,否则可能会占用大量内存,本地队列大小由DefaultMQPushConsumer的pullThresholdForQueue属性控制,默认1000,可以手动设置
  • 轮询间隔
    • 消息拉取线程拉取间隔时间由DefaultMQPushConsumer的pullInterval属性控制,默认为0,可手动设置
  • 消息消费数量
    • 监听器每次接受本地队列的消息条数由DefaultMQPushConsumer的consumerMessageBatchMaxSize属性控制,默认为1,可手动设置

消费进度存储
每隔一段时间,将各个队列的消费进度存储到对应的broker上,该时间由DefaultMQPushConsumer的persistConsumerOffsetInterval属性控制,默认为5秒,可手动设置

问题:如果一个topic在某个broker上有3个队列,一个消费者消费3个队列,那么该消费者和这个broker有几个连接?
一个连接,消费单位与队列相关,消费连接只和broker相关,事实上,消费者将所有队列的消息拉取任务放到本地的队列,挨个拉取,拉取完毕后,又将拉取任务放到队尾,然后执行下一个拉取任务

4、生产者

与nameserver的关系

  • 连接
    • 单个生产者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,生产者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连
  • 轮询时间
    • 默认情况下,生产者每个30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,生产者最多要30秒才能感知,在此期间,发往该broker的消息发送失败,该时间由DefaultMQProducer的pollNameServerInterval参数决定
  • 心跳:与nameserver没有心跳

与broker关系

  • 连接
    • 单个生产者和该生产者关联的所有broker保持长连接
  • 心跳
    • 默认情况下,生产者每30秒向所有的broker发送心跳,该时间由DefaultMQProducer的heartbeatBrokerInterval参数决定,可手动配置。broker每个10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接。
  • 断开
    • 移除broker上的生产者信息

负载均衡

生产者时间没有关系,每个生产者向队列轮流发送消息

5、Broker集群配置方式及优缺点

1、单个Master

这种方式风险比较大,一旦Broker重启或者宕机时,会导致整个服务不可用。

2、多Master方式

一个集群无Slave,全是Master

  • 优点:配置简单,单个Master宕机或者重启维护对应用无影响,在磁盘配置为RAID 10时,即使机器宕机不可恢复情况下,由于RAID 10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高
  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

3、多Master多Slave模式,异步复制

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,准备有短暂消息延迟,毫秒级

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受到影响,因为Master宕机后,消费者仍然可以从Slave消费,此过程对应用透明。不需要人工干预。性能同多Master模式几乎一样。
  • 缺点:Master宕机,磁盘损坏情况,会丢失少量消息。

4、多Master多Slave模式,同步双写

每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,主从都写成功,向应用返回成功。

  • 优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
  • 缺点:性能比异步复制模式略点,大约低10%左右,发送单个消息的RT会略高。主机宕机后,备机不能自动切换为主机

6、常用配置参数

1、客户端的公共配置类: ClientConfig

参数名默认值说明
NamesrvAddr NameServer地址列表,多个NameServer地址用分号分隔开
ClientIP本机IP客户端本机IP地址,某些机器会发生无法识别客户端IP地址的情况,需要在代码中强制指定
instanceNameDEFAULT客户端实例名称,客户端创建的多个Producer,Consumer实际是共用一个内部实例(这个实例包含网络连接,线程资源等)
clientCallbackExecutorThreads4通信层异步回调线程数
pollNameServerInteval30000轮询Name Server 间隔时间,单位毫秒
heartbeatBrokerInterval30000向Broker发送心跳间隔时间,单位毫秒
persistConsumerOffsetInterval5000持久化Consumer消费进度间隔时间,单位毫秒

2、Producer配置

参数名默认值说明
producerGroupDEFAULT_PRODUCERProducer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。
createTopicKeyTBW102在发送消息时,自动创建服务器不存在的topic,需要指定key
defaultTopicQueueNums10000发送消息超时时间,单位毫秒
compressMsgBodyOverHowmuch4096消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节
retryAnotherBrokerWhenNotStoreOKFALSE如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送
maxMessageSize131072客户端限制的消息大小,超过报错,同时服务端也会限制(默认128K)
transactionCheckListener 事物消息回查监听器,如果发送事务消息,必须设置
checkThreadPoolMinSize1Broker回查Producer事务状态时,线程池大小
checkThreadPoolMaxSize1Broker回查Producer事务状态时,线程池大小
checkRequestHoldMax2000Broker回查Producer事务状态时,Producer本地缓冲请求队列大小

3、PushConsumer配置

参数名默认值说明
consumerGroupDEFAULT_CONSUMERConsumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应将它们归为同一组
messageModelCLUSTERING消息模型,支持以下两种1.集群消费2.广播消费
consumeFromWhereCONSUME_FROM_LAST_OFFSETConsumer启动后,默认从什么位置开始消费
allocateMessageQueueStrategyAllocateMessageQueueAveragelyRebalance算法实现策略
Subscription{}订阅关系
messageListener 消息监听器
offsetStore 消费进度存储
consumeThreadMin10消费线程池最小数量
consumeThreadMax20消费线程池最大数量
consumeConcurrentlyMaxSpan2000单队列并行消费允许的最大跨度
pullThresholdForQueue1000拉消息本地队列缓存消息最大数
Pullinterval0拉消息间隔,由于是长轮询,所以为0,但是如果应用了流控,也可以设置大于0的值,单位毫秒
consumeMessageBatchMaxSize1批量消费,一次消费多少条消息
pullBatchSize32批量拉消息,一次最多拉多少条

4、PullConsumer配置

参数默认值说明
consumerGroup Conusmer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组
brokerSuspendMaxTimeMillis20000长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒
consumerPullTimeoutMillis10000非长轮询,拉消息超时时间,单位毫秒
consumerTimeoutMillisWhenSuspend30000长轮询,Consumer拉消息请求咋broker挂起超过指定时间,客户端认为超时,单位毫秒
messageModelBROADCASTING消息模型,支持以下两种:1集群消费 2广播模式
messageQueueListener 监听队列变化
offsetStore 消费进度存储
registerTopics 注册的topic集合
allocateMessageQueueStrategy Rebalance算法实现策略

5、Broker配置

参数名默认值说明
consumerGroup Conusmer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组
listenPort10911Broker对外服务的监听端口
namesrvAddrNullName Server地址
brokerIP本机IP本机IP地址,默认系统自动识别,但是某些多网卡机器会存在识别错误的情况,这种情况下可以人工配置。
brokerName本机主机名
brokerClusterNameDefaultClusterBroker所属哪个集群
brokerId0BrokerId,必须是大等于0的整数,0表示Master,>0表示Slave,一个Master可以挂多个Slave,Master和Slave通过BrokerName来配对
storePathCommitLog$HOME/store/commitlogcommitLog存储路径
storePathConsumeQueue$HOME/store/consumequeue消费队列存储路径
storePathIndex$HOME/store/index消息索引存储队列
deleteWhen4删除时间时间点,默认凌晨4点
fileReservedTime48文件保留时间,默认48小时
maxTransferBytesOnMessageInMemory262144单次pull消息(内存)传输的最大字节数
maxTransferCountOnMessageInMemory32单次pull消息(内存)传输的最大条数
maxTransferBytesOnMessageInMemory65535单次pull消息(磁盘)传输的最大字节数
maxTransferCountOnMessageInDisk8单次pull消息(磁盘)传输的最大条数
messageIndexEnableTRUE是否开启消息索引功能
messageIndexSafeFALSE是否提供安全的消息索引机制,索引保证不丢
brokerRoleASYNC_MASTERBroker的角色-ASYNC_MASTER异步复制Master-SYNC_MASTER同步双写Master-SLAVE
flushDiskTypeASYNC_FLUSH刷盘方式 -ASYNC_FLUSH异步刷盘 -SYNC_FLUSH同步刷盘
cleanFileForciblyEnableTRUE磁盘满,且无过期文件情况下TRUE表示强制删除文件,优先保证服务可用
# context # 反射 # channel # LRU # BeanDefinition # JVM # 装饰者模式 # MyBatis # 快慢指针 # 归并排序 # 链表 # hash表 # 栈 # 回溯 # 贪心 # 主从复制 # 二分查找 # 双指针 # 动态规划 # AOF # RDB # 规范 # BASE理论 # CAP # B树 # RocketMQ # Sentinel # Ribbon # Eureka # 命令模式 # 访问者模式 # 迭代器模式 # 中介者模式 # 备忘录模式 # 解释器模式 # 状态模式 # 策略模式 # 职责链模式 # 模板方法模式 # 代理模式 # 享元模式 # 桥接模式 # 外观模式 # 组合模式 # 适配器模式 # 建造者模式 # 原型模式 # 工场模式 # 单例 # UML # 锁 # 事务 # sql # 索引
SpringCloud之Sentinel
寻找两个有序数组的中位数
  • 文章目录
  •   |  
  • 概览
林亦庭

林亦庭

less can be more

87 文章
11 分类
54 标签
RSS
Github
Creative Commons
© 2021 林亦庭
粤ICP备20029050号