不奢望岁月静好 只希望点滴积累

0%

###索引管理

索引创建

1
2
3
4
5
6
7
8
9
10
11
12
db.COLLECTION_NAME.ensureIndex(keys[,options])

keys: 要建立索引的参数列表 key字段名、1升序 -1降序

options: 可选参数
background: Boolean 在后台建立索引
unique: boolean 创建唯一索引、默认false
name String指定索引名称
dropDups Boolean创建唯一索引时、若若出现重复、删除后续出现的相同索引、只保留第一个
sparse Boolean 对文档中不存在的字段数据不启用索引、默认false
v index version 索引的版本号
weights document索引权重值、表示改索引相对其它索引字段的得分权重值

重建索引

1
2
db.collectionName.reIndex()
作用类似于mysqloptimize、修复多次修改产生的文件空洞

查看索引

1
2
db.collectionName.getIndexes()
查看索引大小

删除索引

1
2
3
4
db.collectionName.dropIndex(IndexName)

删除所有索引
db.collectionName.dropIndexes;

基础索引与复合索引

基础索引

1
2
3
4
5
1. 为一个集合中的某个字段创建索引
eg. db.useers.ensureIndex({age:1})

2. 当数据很多时、索引创建会比较慢、可以指定 background: true
eg. db.users.ensureIndex({age:1}, {background: true})

组合索引

1
2
users表的agecity字段、创建联合索引、分表按照升序和降序排列
eg. db.useers.ensureIndex({age:1, city:-1})

查看索引

1
2
db.dbname.getIndexes() 返回当前集合所有的索引
1:升序 -1:降序

文档索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1. mongodb可以为多个字段创建索引、当字段是子文档时、同样可以创建
eg. mongo中有下列数据
{name:"aaa", address:{city:"bj",district:"海淀"}}

可以为address创建索引如下:
db.users.ensureIndex({address:1})

建立索引后、查询时子文档的字段顺序要和查询顺序一致、才可使用索引

// 会使用索引
db.users.find({address:{city:"bj",district:"海淀"}})

// 不会使用索引
db.users.find({address:{district:"海淀",city:"bj"}})

也可以只对子文档的某一个或者部分字段创建索引
db.users.ensureIndex({"address.city":1})

唯一索引与强制索引

创建唯一索引

1
2
3
4
5
添加索引时、指定 unique:true 等效于唯一索引
eg. 为users表的email字段添加唯一索引
db.users.ensureIndex({email:1},{unique:true})

创建唯一索引后、插入重复值时、mongo会报错..

强制使用索引

1
db.users.find({name:'aaa', age:3}).hint({age:1})

MQ消息积压如何处理

优化性能避免消息积压

MQ本身的处理能力是远大于业务系统的处理能力的、主流消息队列的单个节点、消息收发的性能可以达到每秒几万到几十万的水平、水平扩展Broker的实例数可以成倍的提升处理能力、应该更多的关注在消息的收发两端、让业务代码和MQ配合、达到最佳性能

  • 发送端性能优化
    若代码发送消息的性能较弱、很可能是发MQ之前的逻辑太耗时导致、一般设置合理的并发和批量大小、就可以达到很好的性能

MQ的完整交互: P -> Broker -> R
假设单线程发送、每秒处理请求 1000ms/1ms*1条/1ms = 1000 条msg、并不能发挥MQ的实力

  1. 增加batch大小、2. 并发请求 都可以提升性能
    对于关注延时的RPC系统、可以选择增加并发量
    对于关注吞吐量的离线分析系统、它不关心时延、可以选择批量发送
  • 消费端性能优化
    若消费的速度跟不上msg生产的速度、MQ存储被填满之后就会造成无法提供服务、消息丢失、对于整个系统都是严重故障

消费端的性能优化除了优化消费业务逻辑之外、还可以通过简单的水平扩容来增加消费端的并发数来提高整体的消费性能
注意
在扩容consumer实例的同时、必须同步扩容主题中的分区(队列)数量、确保consumer的实例数和分区数相等、因为每个分区只支持单线程消费

  • 消息积压了如何处理?
  1. 若单位时间内发送的消息增多、最快的办法就是通过扩容消费端的实例来提升总体的消费能力,或者可以通过系统降级、减少生产者发送数据
  2. 若是消费和生产速度都无明显变化、需要检查消费端、数不胜数消费失败导致反复消费
  3. 若是消费变慢、可以快速排查消费日志、看看消费线程是不是卡着不动了

疑问点记录

假如、有一个topic、Q为5、Broker为2

  1. 有3个生产者实例、如何对应到5个Q ?
    不用对应、随便发或者指定Q选取规则

  2. 每个消费组都是单独的订阅、拥有队列的全部消息、消费完后消息不会删除

  3. 多个消费组订阅同一个topic彼此不影响、
    eg. 消费组G0 | G1 、G0挂掉、积压很多消息、对G1也没有任何影响

  4. 消费位置
    每个消费组会维护一组消费位置、每个队列对应一个消费位置、并且消费位置和消费者无关、保存在服务端

  5. 如何实现单个队列的并行消费
    eg. MQ中有10条消息、对应的编号是0-9、当前消费位置是5、同时有5、6、7三个消费者拉取消息、5、6、7三个消息给每个消费者一人一条、理想情况下、3个消费者成功响应、消费位置更新为8 实现并行消费
    假如5卡着某个环节不动了、位置5就是消息空洞、为了避免整个队列卡住、将5复制到一个重试队列、然后更新消费位置、消费时优先给出重试队列的数据
    这是实现并行消费的一种实现方式、但是开销很大、不应该作为常规手段、若需要增加消费者的并发数、还是需要扩容队列数

消息队列架构演进

早期mq就是按照队列的数据结构来设计的
早期MQ架构

若有多个生产者往同一队列发送消息、这个队列可以消费到生产者消息的集合、顺序为生产者发送消息的自然顺序
若多个消费者消费一个队列、这些消费者是竞争关系、只消费到队列的一部分数据
需要每个消费者消费全量消息时、只能创建多个Queue、让生产者发送多份数据、并且生产者必须知道有多少消费者、违背了消息队列解耦的初衷

演进架构
发布 - 订阅模型(Publish-Subscribe Pattern)
发布-订阅模型

在P-S架构中、msg发送方称为发布者Publisher、接收方称为订阅者Subscriber、服务端存放消息的容器称为主题Topic、消费之前必须先订阅主题

其实和队列模式没有本质区别、最大的区别在于: 一份数据能不能被消费多次

RabbitMQ的消息模型

RabbitMQ是少数坚持使用队列模型的产品之一、它如何解决多个消费者的问题呢 ?
Exchange: 位于生产者和队列之间、生产者不关心将消息发送给哪个队列、而是将消息发送给Exchange、由Exchange上配置的策略来决定将消息投递到哪些队列中
RabbitMQ消息模型

同一份消息需要被多个消费者消费时、需要配置exchange将消息发送到多个队列、每个队列中存放一份完整的消息数据、

RocketMQ的消息模型

RocketMQ使用的是标准的发布-订阅模型、

普通的MQ都是使用的请求-确认机制、确保消息不会再传递过程中由于网络故障或服务器故障丢失. 在生产端、生产者将消息发给服务端Broker、Broker在收到消息并将消息写入主题或者队列后、会给生产者发送确认的响应、若生产者未收到服务端的确认或者收到失败的响应、会重新发消息; 在消费端、消费者收到消息并完成消费逻辑后也会给服务端发送消费成功的确认、服务端只有在收到消费确认后、才会认为一条消息被成功消费、否则会重发消息

为了保证消息有序性、在消息被成功消费前、下一条消息不能被消费、否则就出现了消息空洞、违背了有序性的原则、所以 MQ增加了队列的概念、只在队列层面保证消息的有序性、主题层面不保证

消息会被不同消费组消费、消费完的消息不会立即删除、MQ为每个队列维护了一个消费位置Consumer Offset、每成功消费一条消息、位置就+1

Kafka的消息模型

Kafka的消费模型和RocketMQ的模型是完全一样的、唯一的区别是 在Kafka中队列这个概念的名称不同、Kafka对应的概念叫 分区 - Partition

注意:
RocketMQ 和 Kafka 的业务模型一样、不代表实现一致、其实实现是完全不同的、就像MySQL和hbase存放数据的单元都是表、实现完全不同、MySQL使用B+树来存储、HBase 使用KV结构存储

MQ的事务实现

Kafka 和 RocketMQ 都提供了事务的实现
在消息队列上开启一个事务、给服务器发送一个半消息状态、它包含完整的消息内容、但对于消费者不可见、本地事务提交后再提交消息事务
但: 如果提交消息事务失败怎么办 ?

  1. kafka的方案简单粗暴、直接抛异常、用户自行处理
  2. RocketMQ提供了事务反查机制、若Broker未收到提交或者回滚的请求、Broker会定期去Producer上反查本地事务对应状态、来决定事务提交还是回滚
    image.png

消息可靠性保证

检测消息丢失
1.若基础设施比较完善、一般会有分布式链路追踪系统、可以追踪消息流
2.利用消息有序性校验: 给每个发出的消息附加一个连续的序号、若序号不连续、就可以判定为消息丢失
注意: 多个producer 和 多个consumer的情况、需要判定每个分区的序列有效性
如何保证
1.生产阶段、根据不同mq的确认机制、进行对应的处理 eg: kafka -> 捕获异常
2.存储阶段、正常broker运行的时候、不会出现消息丢失、但若broker出现了故障或者宕机、是可能丢消息的. 单节点broker可以配置broker在收到消息后立即落盘、broker是集群的时候、可以配置至少2个节点收到消息再确认
3.消费阶段、同生产阶段、消费端业务逻辑成功后再回传确认
有可能消息再网络传输过程中发生错误、发送方收不到确认就会重发消息、所以Broker 和 Consumer 都可能会收到重复消息、需要注意接口的幂等性

业务变相实现
1.利用数据库的唯一约束实现幂等、
2.为更新的数据设置前置条件、满足某种条件才更新
3.记录并检查操作(需要考虑唯一ID的生成和多步操作的原子性)

思考

幂等方案、不止是可以解决消息重复的问题、也同样适用于其它场景.eg: 将HTTP服务设计成幂等的、解决前端或者APP重复提交表单数据问题、也可以将一个微服务设计成幂等的、解决RPC框架自动重试导致的重复调用问题

组件:

nameSrv作用:

1
2
3
4
5
6
7
8
1.加载kv配置、创建nettyServer网络处理对象、
2.开启定时任务进行心跳监测
task1: nameSrv 每10s扫描一次broker、移除处于不激活状态的broker
task2: nameSrv 每10min打印一次kv配置
3.注册jvm钩子函数、监听broker、消息生产者的请求

broker如果挂掉、producer向broker的消息发送是否失败 ? 如何处理 ?
10s之后才会更新nameSrv信息、producer才可以检测到

NameSrv功能实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1. 路由注册、故障剔除
nameSrv 测主要作用是为Producer和Consumer提供topic信息、所以需要存储路由的基础信息、能够管理Broker节点、包括路由注册、故障剔除等

元信息:
topicQueueTable: topic 消息队列路由信息、消息发送时进行负载均衡
brokerAddrTable: broker基础信息、包含BrokerName、所属集群地址、主备broker地址
clusterAddrTable: broker集群信息、存储集群中brokerName
brokerLiveTable: broker状态信息、nameServer每次收到心跳包时更新
filterServerTable: broker上的FilterServer列表、用于消息过滤

一个topic有多个Queue、一个broker默认为每个topic创建4个读Q 4个写Q、多个broker组成一个集群、多台相同brokerName的broker机器组成M-S架构、brokerId=0代表M、

BrokerLiveInfo中的lastUpdateTimestamp存储上次收到broker心跳包的时间

路由注册通过broker和nameSrv的心跳实现、broker每隔30s向所有nameSrv发送心跳、nameSrv收到心跳包更新brokerLiveTable中的BrokerLiveInfo的心跳时间、10s扫描一次brokerLiveTable、若连续120s未收到心跳包、nameSrv会移除该broker

路由删除:
1. nameSrv定时扫描 brokerLiveTable、超过120s未收到心跳包、会移除broker信息
2. broker正常关闭时、主动调用unregisterBroker
1) brokerLiveTable、filterServerTable移除该broker
2) 维护brokerAddrList信息
3) 维护clusterAddrTable信息
4) 遍历topic、从路由信息移除该broker

路由发现:
1. 非实时、topic路由信息变化、nameSrv不会主动推送、而是client主动定时拉取、
从topicQueueTable、brokerAddrTable、filterServerTable中填充TopicQueueData的QueueData、BrokerData、filterServer地址表
2. 若从topic找到对应的路由信息为顺序消息、则从nameServer kvConfig中获取关于顺序消息相关的配置填充路由信息

Borker:

1
2
3
4
提供消息存储、
对于mq的消息存储、一般考虑 消息堆积能力 和 消息存储能力
rocketmq 引入内存映射机制、所有主题的消息顺序存储在同一文件
引入 消息文件过期机制 和 文件存储空间报警机制 - 避免消息无限堆积

Producer启动流程

1
2
3
4
1. 检查producerGroup是否符合要求、改变生产者的instanceName为进程id
2. 创建MQClientInstance实例、整个jvm中只存在一个MQClientManager实例、维护一个MQClientInstance的hashMap表、即: 同一个clientID只会创建一个MQClientInstance
instance是封装了rocketMQ的网络处理API、是Producer、Consumer与NameSrv、Broker打交道的网络通道
3. 将producer注册到MQClientInstance管理中、方便后续进行网络请求、心跳检测等

消息发送过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
支持3种方式: 同步sync、异步async、单向oneway
sync: 同步等待broker将结果返回
async: 指定回调函数、不阻塞producer线程、消息结果通过回调通知
oneway: 不等待消息存储结果、亦不提供回调函数、不关心是否存储成功

1. 消息长度验证、消息长度最大4M
2. 查找路由信息
首次发msg时、本地未缓存topic路由信息、查询nameSrv获取、未找到时会尝试创建、若路由信息变化则更新broker地址列表
3. queue选择
2m-2s架构、queueNum=4、rocketmq会如何选择 ?
消息发送时会多次执行queue选择算法、lastBrokerName是上次send failed的broker、第一次执行Q选择时、last为null、直接在所有Q中选择、
再次执行时、会先判断、选择brokerName!=lastBrokerName的来进行规避

如果再次选择、得到的依然是不可用的Queue呢 ?会再次重试、造成资源浪费
broker故障不会立即摘除(nameSrv 10s才检测一次、Producer也是30s才更新路由信息、最快感知也需要30s)、

broker延迟故障机制
1) 获取一个小小队列
2) 验证是否可用
3) 若可用、移除latencyFaultTolerance关于该topic的条目、表明broker故障恢复

消息发送

1. 消息队列如何负载 ?
2. 如何实现高可用?
3. 批量消息如何实现一致性?

Consumer消息消费

1
2
3
4
5
6
7
8
9
10
11
集群模式: 同一topic下一条消息只能允许被其中一个消费者消费、消费进度保存在broker端
广播模式: 同一topic下一条消息可以被同一消费组的所有消费者消费、消息消费进度保存在消费端



消息队列负载与重新分布
消息消费模式
消息拉取方式
消息进度反馈
消息过滤
顺序消息

消费者启动流程

1
2
3
4
5
6
7
1. 构建订阅主题信息SubscriptionData并加入到RebalanceImpl的订阅消息中、订阅关系来自:
1) 调用subscribe方法
2) 订阅重试主题消息 以消费组为单位 命名: %RETRY%+消费组名
2. 初始化MQClientInstance、RebalanceImpl等
3. 初始化消费进度、broadcast保存在broker、cluster保存在consumer
4. 根据是否顺序消费、创建消费端消费线程服务、ConsumeMessageService主要负责消息消费、内部维护一个线程池
5. 向MQClientInstance注册消费者、并启动MQInstance、在一个JVM中的所有消费者、生产者持有同一个MQClientInstance、只启动一次

消息可用性保障

1
2
3
4
5
6
7
8
9
1. Broker 正常关机
2. Broker 异常Crash
3. OS Crash
4. 机器断电、可立即恢复供电
5. 机器无法开机、可能是CPU、主板、内存等关键设备损坏
6. 磁盘设备损坏

1-4 在同步刷盘机制下、可以确保不丢失消息、异步刷盘模式下丢失少量消息
56属于单点故障、一旦发生、节点上的消息全部丢失、若开启了异步复制可保证只丢失少量消息、双写机制下 不会丢失消息

消息延迟

1
在正常不发生消息堆积的情况下、以长轮询方式实现准实时消息推送

消息堆积

1
rocketmq消息存储使用磁盘文件(内存映射)并且在物理布局上为多个大小相等的文件组成逻辑文件组、可无限循环使用、提供消息过期机制、默认保留3

消息过滤:

1
2
1. broker端过滤、
2. consumer过滤、

msg status 说明:

1
2
3
4
SEND_OK, // 发送成功
FLUSH_DISK_TIMEOUT, // 在规定时间内没有完成刷盘、在 flushDiskType 为 SYNC_FLUSH 时才出现
FLUSH_SLAVE_TIMEOUT, // 在主备方式下、broker被设置为 sync_master方式时、未在指定时间内完成主从同步
SLAVE_NOT_AVAILABLE, // 在主备方式下、broker被设置为 sync_master方式时、未找到slave

疑问

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1. broker收到消息会先写commitlog、为什么producer发消息的时候会选择一个Queue去发 ?而不是broker ?再由broker分发到Queue ?

2. 写commitlog的时候、同步刷盘指针和异步刷盘指针写的是同一个commitlog文件、如何保证消息的有序存储?

3. 文件清除何时进行 ?

4. 消息队列如何进行负载均衡?

5. 消息发送如何保证高可用

6. 批量消息如何保持一致性

7. broker失效后120s才能将该broker从路由表中移除、若生产者获取到的路由信息包含已宕机的broker如何处理?

8.

使用场景

  • 异步处理
  • 流量控制(令牌桶)
  • 服务解耦

局限

  • 增加系统复杂度
  • 增加部分延迟
  • 可能导致数据不一致

思考

  1. CPU的运算速度 远大于 内存读写速度 –> CPU缓存(L1 / L2 / L3)
  2. 内存的读写速度 远大于 硬盘 –> 内存缓存系统 (redis / memcache 等) 和 本地缓存 、线程缓存
  3. 上游系统的处理速度大于下游依赖系统 –> mq(缓存上游请求)
  4. 网络请求、磁盘操作比较耗时 –> 出现了缓存系统 / 协程 / 多线程等概念来解决
  5. CPU写内存的速度小于写cache -> 先写cache、定时刷新 -> 出现了线程同步问题
  6. 写内存的速度大于写磁盘 –> redis 、mysql等系统应用的日志都是先写cache、按照指定规则刷新 –> 有了系统崩溃短时间内log不能恢复的问题
  7. 生产者和消费者速度不一致 –> 有了broker暂存消息

启动:
进入到编译target目录
eg. 我的下载路径是 /Users/nj/build/rocketmq-all-4.4.0
cd /Users/nj/build/rocketmq-all-4.4.0/distribution/target/apache-rocketmq/

1.启动namesrv
sh bin/mqnamesrv &

  1. 启动broker
    sh bin/mqbroker &

  2. 测试:
    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
    报错:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    17:28:23.123 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
    org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, TopicTest1
    See http://rocketmq.apache.org/docs/faq/ for further details.
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:610)
    17:28:29.595 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[] result: true
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1223)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1173)
    at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:214)
    at com.lct.quickstart.Producer.main(Producer.java:56)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

原因:
mac本地搭建使用VPN时、会有两个内网ip、用telnet找到可以访问的那个、
修改broker.conf
指定brokerIP的地址:
brokerIP1 = 192.168.10.54
brokerIP2 = 192.168.10.54

查看broker启动参数:
sh bin/mqbroker -m

mqAdmin使用:
####查看集群情况
./mqadmin clusterList -n 127.0.0.1:9876

####查看broker状态
./mqadmin brokerStatus -n 127.0.0.1:9876 -b 192.168.10.54:10911

####查看topic列表
./mqadmin topicList -n 192.168.10.54:9876

####查看topic状态
./mqadmin topicStatus -n 127.0.0.1:9876 -t PushTopic

####查看topic路由
./mqadmin topicRoute -n 127.0.0.1:9876 -t PushTopic

实现通过网络来传输数据、需要网络通信类库, 大部分网络通信基础类库都是同步的. 一个TCP连接建立后、用户代码会得到用于收发数据的通道、每个通道会在内存开辟两片区域用于收发数据的缓存

发数据比较简单:
用户代码在发送时写入的数据会暂存在缓存中、然后操作系统会通过网卡、把发送缓存中的数据传输到对端的服务器上
只要缓存不满、或者发送数据的速度未超过网卡传输速度的上限、发送数据的操作耗时就只是写入一次内存的时间、同步发送即可、无需异步

接收数据比较麻烦、它不知道什么时候会收到数据
同步IO: 用一个线程阻塞、等待数据、数据到来、操作系统会先把数据写入到接收缓存、然后给接收数据的线程发通知、接收线程收到通知结束等待、开始读取数据、处理完将继续阻塞、等待下次数据到来
image.png

同步IO处理少量连接没问题、但同时处理大量连接的时候、每个连接都会阻塞一个线程来等待数据、这些连接都在收发数据的时候就会有大量的线程来抢占CPU的世界、造成频繁的CPU上下文切换、导致CPU的负载升高、系统性能下降

期望值:
事先定义好收到数据后的处理逻辑、将它作为一个回调方法、收到数据后、网络通信框架直接回调这个方法就好

Java网络模型

极客时间留言区看到一个很不错的评论、收藏下
举例: 有一个养鸡的农场、里边养着各个农户Thread的鸡Socket、每个农户都在农场中建立了自己的鸡舍SocketCahnnel

  1. BIO: Bolck IO, 每个农户盯着自己的鸡舍、有下蛋、就去捡
  2. NIO: No-Block IO, 单 Selector、农户们花钱请了一个饲养员 Selector, 并且告诉饲养员Register 若哪家的鸡下蛋要向农户报告 select keys
  3. NIO: No-Block IO - 多Selector、鸡舍增多时、一个饲养员巡视(轮询)一次的时间增大、延迟较大、多请几个、每个饲养员分配几个鸡舍管理、减小延迟
  4. epoll: 饲养员不巡查鸡舍、听到有鸡打鸣(活跃连接)就知道下单了
  5. AIO: Asynchronous IO、鸡下单后、饲养员负责取蛋、通知农户来取即可、不需要农户自己到鸡舍取蛋

序列化和反序列化

TCP连接上、传输数据的基本形式是二进制流、0和1、在一般编程语言或者框架提供的api中、传输数据的基本形式是字节Byte、本质上是二进制流

编写的程序是结构化的数据, eg. 类或者结构体
显然, 要使用网络框架的API来传输结构化的数据、必须先实现结构化的数据和字节流之间的双向转换

文件内保存数据的形式也是二进制序列、所以、也需要序列化结构化数据才能实现

如何选择序列化方式

需要权衡的因素:

  • 序列化后的数据易于阅读
  • 实现复杂度低
  • 序列化和发序列化越快越好
  • 序列化后的信息密度越大越好、即同一个结构化刷数据、序列化后占用的空间越小越好

而 1和4 是矛盾的、2 和 3是矛盾的、所以需要根据业务场景合理选择

思考

在内存中存放的数据、最基础的存储单元也是二进制比特、也就是说应用程序操作的对象、在内存中也是使用二进制存储的、既然都是二进制、为什么不直接把二进制数据通过网络发送出去 ?

内存里的内容、不通用、不同系统不同语言的组织可能都是不一样的、而且存在很多引用、指针、并不是直接数据块
序列化、反序列化其实是约定一种标准、大家都遵守就能实现跨语言、跨平台

内存管理

高并发下的内存管理技巧

  1. 优化代码中处理请求的业务逻辑、减少创建一次性对象、eg. 可以将受到请求的Request对象在业务流程中尽量传递下去、而不是执行一个步骤就创建一个内容和Request对象差不多的新对象

  2. 使用频繁的对象、可以考虑建立一个对象池、收到请求后在对象池内申请一个对象、使用完放回对象池

  3. 可能的话、直接使用更大内存的服务器、也可以非常有效的缓解这个问题

1
2
3
4
5
1. 120s更新一次nameSrv地址表
2. 30s或指定配置时间pollNameServerInterval 更新一次 topic路由信息
3. 30s或指定时间 heartbeatBrokerInterval 更新一次broker信息
4. 5s或persistConsumerOffsetInterval时间、持久化一次消费偏移量信息
5. 每分钟动态调整一个threadpool大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
rocketmq消息格式
totalSize: 消息长度、4字节
magicCode: 魔术 4字节
bodycrc: 消息体crc校验码 4字节
queueId: 消息队列id 4字节
flag: 消息flag、mq不处理、
ququeoffset: 消息在队列中的偏移量
physicaloffset: 消息在commitlog中的偏移量
sysflag: 消息系统flag、eg 是否压缩、是否是事务消息等 4字节
bronTimeStamp: 生产者调用消息发送API时的时间戳 8字节
bornHost: 消息发送者ip、端口 8字节
storeTimeStamp: 消息存储时间戳 8字节
storeHostAddress: broker服务器ip+port 8字节
reconsumetimes: 消息重试次数 4字节
bodylength: 消息长度
body: 消息内容
topicLength: 主题存储长度
topic: 主题
propertiesLength: 消息属性长度、2字节
Properties: 消息属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

1. 存储设计
commitLog: 消息存储文件、所有topic的消息都存储在commitLog
大小默认1G、以文件中第一个偏移量为文件名、偏移量小于20位、用0补全

consumeQueue: 消息消费队列、消息到达commitLog后悔异步转发到Queue、
indexFile: 消息索引文件、主要存储key与offset的对应关系

事务状态服务: 存储每条消息的事务状态
定时消息服务: 每一个延迟级别对应一个消息消费队列、存储延迟队列的消息拉取进度

2. msg发送存储流程
若当前broker挂掉或者broker为slave或者不支持写入时、拒绝写入
若topic的长度>256个字符、或者msg的属性长度>65535时、拒绝写入

若消息的延迟级别>0, 将消息的原主题名称和原消息队列id存入消息属性、使用延迟消息队列schedule_topic

transientStorePoolEnable 是否开启缓存池. rocketmq单独创建一个mappedByteBuffer内存缓存池、临时存储数据、数据会先写入该内存映射、然后由commit线程复制到与物理文件对应的内存映射中、主要是提供一种内存锁定、将当前堆外内存一直锁定在内存中、避免被进程将该内存交互到磁盘、提高存储性能


3. 存储文件组织与内存映射机制
doAppend只是将消息存储到byteBuffer中、然后创建AppendMessageResult、只是将消息存储在MappedFile对应的内存映射buffer中、并未刷到磁盘
handleDiskFlush方法会处理数据持久化
handleHA会处理m-s

4. 存储文件
commitlog: 消息存储目录
config运行期间配置信息
consumequque 消息消费队列存储目录
index 消息索引文件存储目录
abort 启动时创建、正常退出之前删除
checkpoint 文件检测点、存储commitlog文件最后一次刷盘时间戳、consumeQueue最后一次刷盘时间、index文件最后一次刷盘时间戳

5. 消费队列、索引文件构成和机制
6. 文件恢复机制
rocketmq将消息全量存储在commitlog中、然后异步转发任务更新consumeQueue、index文件、若消息成功存储在commitLog转发任务执行失败、eg. broker宕机、则存储三者不一致的情况、commitlog中的消息可能永远不会被消费、rocketmq是如何保障最终一致性的 ?
1) 判断上次退出是否正常、在broker启动时check是否存储abort文件、若存在、说明是非正常退出、需要修复
2) 加载延迟队列
3) 加载commitlog文件
4) 加载消息消费队列
5) 加载checkpoint文件、
6) 加载索引文件、索引文件上次刷盘时间<该索引文件最大的消息时间戳时 说明索引文件不完备、立即销毁
7) 根据broker是否正常停止、执行不同的恢复策略

7. 刷盘机制
rocketMQ的刷盘是基于NIO的内存映射机制MappedByteBuffer、消息存储时先将消息追加到内存、再根据配置的刷盘策略在不同时间进行刷写磁盘。
同步刷盘时: 消息追加到内存后悔同步调用force()方法
消息生产者将在消息服务器端将消息内容追加到内存映射文件后、需同步将内存内容立刻刷到磁盘、调用内存映射文件MapppedByteBuffer的force方法可将内存中的数据写入磁盘

异步刷盘时: 消息写到内存后、会立即返回给producer、mq单独起一个线程按照固定频率刷盘
若开启 transientStorePoolEnable机制、RocketMQ会单独申请一个与目标物理文件commitlog相同大小的堆外内存、它将会使用内存锁定、不会被置换到虚拟内存中、消息先追加到堆外内存、然后提交到与物理文件的内存映射内存中、再flush到磁盘
若未开启transientStorePoolEnable机制、消息直接追加到与物理文件直接映射内存中、然后刷写到磁盘
1) 先将消息直接追加到ByteBuffer(堆外内存DirectlyByteBuffer), WrotePosition随消息增加不断向后移动
2) CommiteRealTimeService线程默认每200ms将ByteBuffer中新追加的内容(WritePosition-commitedPosition)提交到MappedByteBuffer中
3) MappedByteBuffer在内存中追加提交的内存、wrotePosition向前后移动、然后返回
4) commit操作成功返回、将commitedPosition想前后移动本次提交的内容长度、此时wrotePosition指针依然可以向前推进
5) flushRealTimeService线程默认每500ms将MappedByteBuffer中新追加的内容wrotePosition-上次刷写位置 flushedPositiont通过调研MappedByteBuffer#force方法讲数据刷写到磁盘

8. 文件删除机制
rocketMQ操作commitLog、conssumeQueue是基于内存映射机制并在启动的时候加载commitLog、consumeQueue下所有的文件、为了避免内存与磁盘浪费、引入文件过期机制:
超过一定时间间隔内没有更新的文件被认为过期文件、默认72h、可通过fileReservedTime来修改
满足以下条件会删除:
1) 指定删除文件的时间点、固定时间执行删除 默认凌晨4
2) 磁盘不足时、主动触发过期文件删除操作 磁盘分区使用率超过90%不可写入
3) 预留、手工触发

专业术语

  • Producer

    消息生产者、负责生产消息、一般由业务系统负责生产消息

  • Consumer

    消息消费者、负责消费消息、一般由后台系统负责异步消费

    • Push Consumer

      Consumer的一种、通常向consumer对象注册一个Listener接口、一旦收到消息、Consumer对象立即回调Listener接口方法

    • Pull Consumer

      Consumer的一种、应用通常主动调用Consumer的拉取消息放啊从Broker拉取消息、主动权由应用控制

  • Producer Group

    一类Producer的集合名称、这类Producer通常发送一类消息、且发送逻辑一致

  • Consumer Group

    一类Consumer的集合名称、消费同一类消息、消费逻辑一致

  • Broker

    消息中转角色、负责存储消息、转发消息、一般也称为Server、在JMS规范中称为Provider

  • 广播消费

    一条消息被多个Consumer消费、即使这些Consumer属于同一个group、可以认为在消息划分方面无意义

  • 集群消费

    一个Consumer Group中的Consumer实例平均分摊消费消息、eg. 某个topic有9条消息、其中一个Consumer Group有3个实例、则 每个实例只消费其中的3条消息

  • 顺序消息

    消费消息的顺序要同发送消息的顺序一致、在RocketMQ中、主要指的是局部顺序、即: 同一类消息为满足顺序性、必须Producer单线程顺序发送、且发送到同一个队列、

  • 普通顺序消费

RocketMQ物理部署结构.png

RocketMQ网络部署特点

  • NameServer 几乎无状态节点、可集群部署、节点间无任何信息同步

  • Broker部署相对复杂、分为master和slave、0表示master、非0表示slave、

    每个Broker与Name Server集群中的所有节点建立长连接、定时注册Topic信息到所有的Name Server

  • Producer 与NameServer中的随机一个节点建立长连接、定期从NameServer取topic信息、并向提供topic服务的Master建立长连接、定时向master发送心跳、Producer完全无状态、可集群部署

  • Consumer与NameServer中随机一个节点建立长连接、从NameServer取topic路由信息、并向提供topic服务的Master、Slave建立长连接、定时向master、slave发送心跳、即可以从master订阅消息、也可以从slave订阅消息

RocketMQ逻辑部署结构

(逻辑部署结构.png