1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| 1. 存储设计 commitLog: 消息存储文件、所有topic的消息都存储在commitLog 大小默认1G、以文件中第一个偏移量为文件名、偏移量小于20位、用0补全 consumeQueue: 消息消费队列、消息到达commitLog后悔异步转发到Queue、 indexFile: 消息索引文件、主要存储key与offset的对应关系 事务状态服务: 存储每条消息的事务状态 定时消息服务: 每一个延迟级别对应一个消息消费队列、存储延迟队列的消息拉取进度 2. msg发送存储流程 若当前broker挂掉或者broker为slave或者不支持写入时、拒绝写入 若topic的长度>256个字符、或者msg的属性长度>65535时、拒绝写入 若消息的延迟级别>0, 将消息的原主题名称和原消息队列id存入消息属性、使用延迟消息队列schedule_topic transientStorePoolEnable 是否开启缓存池. rocketmq单独创建一个mappedByteBuffer内存缓存池、临时存储数据、数据会先写入该内存映射、然后由commit线程复制到与物理文件对应的内存映射中、主要是提供一种内存锁定、将当前堆外内存一直锁定在内存中、避免被进程将该内存交互到磁盘、提高存储性能 3. 存储文件组织与内存映射机制 doAppend只是将消息存储到byteBuffer中、然后创建AppendMessageResult、只是将消息存储在MappedFile对应的内存映射buffer中、并未刷到磁盘 handleDiskFlush方法会处理数据持久化 handleHA会处理m-s 4. 存储文件 commitlog: 消息存储目录 config运行期间配置信息 consumequque 消息消费队列存储目录 index 消息索引文件存储目录 abort 启动时创建、正常退出之前删除 checkpoint 文件检测点、存储commitlog文件最后一次刷盘时间戳、consumeQueue最后一次刷盘时间、index文件最后一次刷盘时间戳 5. 消费队列、索引文件构成和机制 6. 文件恢复机制 rocketmq将消息全量存储在commitlog中、然后异步转发任务更新consumeQueue、index文件、若消息成功存储在commitLog转发任务执行失败、eg. broker宕机、则存储三者不一致的情况、commitlog中的消息可能永远不会被消费、rocketmq是如何保障最终一致性的 ? 1) 判断上次退出是否正常、在broker启动时check是否存储abort文件、若存在、说明是非正常退出、需要修复 2) 加载延迟队列 3) 加载commitlog文件 4) 加载消息消费队列 5) 加载checkpoint文件、 6) 加载索引文件、索引文件上次刷盘时间<该索引文件最大的消息时间戳时 说明索引文件不完备、立即销毁 7) 根据broker是否正常停止、执行不同的恢复策略 7. 刷盘机制 rocketMQ的刷盘是基于NIO的内存映射机制MappedByteBuffer、消息存储时先将消息追加到内存、再根据配置的刷盘策略在不同时间进行刷写磁盘。 同步刷盘时: 消息追加到内存后悔同步调用force()方法 消息生产者将在消息服务器端将消息内容追加到内存映射文件后、需同步将内存内容立刻刷到磁盘、调用内存映射文件MapppedByteBuffer的force方法可将内存中的数据写入磁盘 异步刷盘时: 消息写到内存后、会立即返回给producer、mq单独起一个线程按照固定频率刷盘 若开启 transientStorePoolEnable机制、RocketMQ会单独申请一个与目标物理文件commitlog相同大小的堆外内存、它将会使用内存锁定、不会被置换到虚拟内存中、消息先追加到堆外内存、然后提交到与物理文件的内存映射内存中、再flush到磁盘 若未开启transientStorePoolEnable机制、消息直接追加到与物理文件直接映射内存中、然后刷写到磁盘 1) 先将消息直接追加到ByteBuffer(堆外内存DirectlyByteBuffer), WrotePosition随消息增加不断向后移动 2) CommiteRealTimeService线程默认每200ms将ByteBuffer中新追加的内容(WritePosition-commitedPosition)提交到MappedByteBuffer中 3) MappedByteBuffer在内存中追加提交的内存、wrotePosition向前后移动、然后返回 4) commit操作成功返回、将commitedPosition想前后移动本次提交的内容长度、此时wrotePosition指针依然可以向前推进 5) flushRealTimeService线程默认每500ms将MappedByteBuffer中新追加的内容wrotePosition-上次刷写位置 flushedPositiont通过调研MappedByteBuffer#force方法讲数据刷写到磁盘 8. 文件删除机制 rocketMQ操作commitLog、conssumeQueue是基于内存映射机制并在启动的时候加载commitLog、consumeQueue下所有的文件、为了避免内存与磁盘浪费、引入文件过期机制: 超过一定时间间隔内没有更新的文件被认为过期文件、默认72h、可通过fileReservedTime来修改 满足以下条件会删除: 1) 指定删除文件的时间点、固定时间执行删除 默认凌晨4点 2) 磁盘不足时、主动触发过期文件删除操作 磁盘分区使用率超过90%不可写入 3) 预留、手工触发
|