不奢望岁月静好 只希望点滴积累

0%

RocketMQ简介

组件:

nameSrv作用:

1
2
3
4
5
6
7
8
1.加载kv配置、创建nettyServer网络处理对象、
2.开启定时任务进行心跳监测
task1: nameSrv 每10s扫描一次broker、移除处于不激活状态的broker
task2: nameSrv 每10min打印一次kv配置
3.注册jvm钩子函数、监听broker、消息生产者的请求

broker如果挂掉、producer向broker的消息发送是否失败 ? 如何处理 ?
10s之后才会更新nameSrv信息、producer才可以检测到

NameSrv功能实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1. 路由注册、故障剔除
nameSrv 测主要作用是为Producer和Consumer提供topic信息、所以需要存储路由的基础信息、能够管理Broker节点、包括路由注册、故障剔除等

元信息:
topicQueueTable: topic 消息队列路由信息、消息发送时进行负载均衡
brokerAddrTable: broker基础信息、包含BrokerName、所属集群地址、主备broker地址
clusterAddrTable: broker集群信息、存储集群中brokerName
brokerLiveTable: broker状态信息、nameServer每次收到心跳包时更新
filterServerTable: broker上的FilterServer列表、用于消息过滤

一个topic有多个Queue、一个broker默认为每个topic创建4个读Q 4个写Q、多个broker组成一个集群、多台相同brokerName的broker机器组成M-S架构、brokerId=0代表M、

BrokerLiveInfo中的lastUpdateTimestamp存储上次收到broker心跳包的时间

路由注册通过broker和nameSrv的心跳实现、broker每隔30s向所有nameSrv发送心跳、nameSrv收到心跳包更新brokerLiveTable中的BrokerLiveInfo的心跳时间、10s扫描一次brokerLiveTable、若连续120s未收到心跳包、nameSrv会移除该broker

路由删除:
1. nameSrv定时扫描 brokerLiveTable、超过120s未收到心跳包、会移除broker信息
2. broker正常关闭时、主动调用unregisterBroker
1) brokerLiveTable、filterServerTable移除该broker
2) 维护brokerAddrList信息
3) 维护clusterAddrTable信息
4) 遍历topic、从路由信息移除该broker

路由发现:
1. 非实时、topic路由信息变化、nameSrv不会主动推送、而是client主动定时拉取、
从topicQueueTable、brokerAddrTable、filterServerTable中填充TopicQueueData的QueueData、BrokerData、filterServer地址表
2. 若从topic找到对应的路由信息为顺序消息、则从nameServer kvConfig中获取关于顺序消息相关的配置填充路由信息

Borker:

1
2
3
4
提供消息存储、
对于mq的消息存储、一般考虑 消息堆积能力 和 消息存储能力
rocketmq 引入内存映射机制、所有主题的消息顺序存储在同一文件
引入 消息文件过期机制 和 文件存储空间报警机制 - 避免消息无限堆积

Producer启动流程

1
2
3
4
1. 检查producerGroup是否符合要求、改变生产者的instanceName为进程id
2. 创建MQClientInstance实例、整个jvm中只存在一个MQClientManager实例、维护一个MQClientInstance的hashMap表、即: 同一个clientID只会创建一个MQClientInstance
instance是封装了rocketMQ的网络处理API、是Producer、Consumer与NameSrv、Broker打交道的网络通道
3. 将producer注册到MQClientInstance管理中、方便后续进行网络请求、心跳检测等

消息发送过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
支持3种方式: 同步sync、异步async、单向oneway
sync: 同步等待broker将结果返回
async: 指定回调函数、不阻塞producer线程、消息结果通过回调通知
oneway: 不等待消息存储结果、亦不提供回调函数、不关心是否存储成功

1. 消息长度验证、消息长度最大4M
2. 查找路由信息
首次发msg时、本地未缓存topic路由信息、查询nameSrv获取、未找到时会尝试创建、若路由信息变化则更新broker地址列表
3. queue选择
2m-2s架构、queueNum=4、rocketmq会如何选择 ?
消息发送时会多次执行queue选择算法、lastBrokerName是上次send failed的broker、第一次执行Q选择时、last为null、直接在所有Q中选择、
再次执行时、会先判断、选择brokerName!=lastBrokerName的来进行规避

如果再次选择、得到的依然是不可用的Queue呢 ?会再次重试、造成资源浪费
broker故障不会立即摘除(nameSrv 10s才检测一次、Producer也是30s才更新路由信息、最快感知也需要30s)、

broker延迟故障机制
1) 获取一个小小队列
2) 验证是否可用
3) 若可用、移除latencyFaultTolerance关于该topic的条目、表明broker故障恢复

消息发送

1. 消息队列如何负载 ?
2. 如何实现高可用?
3. 批量消息如何实现一致性?

Consumer消息消费

1
2
3
4
5
6
7
8
9
10
11
集群模式: 同一topic下一条消息只能允许被其中一个消费者消费、消费进度保存在broker端
广播模式: 同一topic下一条消息可以被同一消费组的所有消费者消费、消息消费进度保存在消费端



消息队列负载与重新分布
消息消费模式
消息拉取方式
消息进度反馈
消息过滤
顺序消息

消费者启动流程

1
2
3
4
5
6
7
1. 构建订阅主题信息SubscriptionData并加入到RebalanceImpl的订阅消息中、订阅关系来自:
1) 调用subscribe方法
2) 订阅重试主题消息 以消费组为单位 命名: %RETRY%+消费组名
2. 初始化MQClientInstance、RebalanceImpl等
3. 初始化消费进度、broadcast保存在broker、cluster保存在consumer
4. 根据是否顺序消费、创建消费端消费线程服务、ConsumeMessageService主要负责消息消费、内部维护一个线程池
5. 向MQClientInstance注册消费者、并启动MQInstance、在一个JVM中的所有消费者、生产者持有同一个MQClientInstance、只启动一次

消息可用性保障

1
2
3
4
5
6
7
8
9
1. Broker 正常关机
2. Broker 异常Crash
3. OS Crash
4. 机器断电、可立即恢复供电
5. 机器无法开机、可能是CPU、主板、内存等关键设备损坏
6. 磁盘设备损坏

1-4 在同步刷盘机制下、可以确保不丢失消息、异步刷盘模式下丢失少量消息
56属于单点故障、一旦发生、节点上的消息全部丢失、若开启了异步复制可保证只丢失少量消息、双写机制下 不会丢失消息

消息延迟

1
在正常不发生消息堆积的情况下、以长轮询方式实现准实时消息推送

消息堆积

1
rocketmq消息存储使用磁盘文件(内存映射)并且在物理布局上为多个大小相等的文件组成逻辑文件组、可无限循环使用、提供消息过期机制、默认保留3

消息过滤:

1
2
1. broker端过滤、
2. consumer过滤、

msg status 说明:

1
2
3
4
SEND_OK, // 发送成功
FLUSH_DISK_TIMEOUT, // 在规定时间内没有完成刷盘、在 flushDiskType 为 SYNC_FLUSH 时才出现
FLUSH_SLAVE_TIMEOUT, // 在主备方式下、broker被设置为 sync_master方式时、未在指定时间内完成主从同步
SLAVE_NOT_AVAILABLE, // 在主备方式下、broker被设置为 sync_master方式时、未找到slave

疑问

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1. broker收到消息会先写commitlog、为什么producer发消息的时候会选择一个Queue去发 ?而不是broker ?再由broker分发到Queue ?

2. 写commitlog的时候、同步刷盘指针和异步刷盘指针写的是同一个commitlog文件、如何保证消息的有序存储?

3. 文件清除何时进行 ?

4. 消息队列如何进行负载均衡?

5. 消息发送如何保证高可用

6. 批量消息如何保持一致性

7. broker失效后120s才能将该broker从路由表中移除、若生产者获取到的路由信息包含已宕机的broker如何处理?

8.