RabbitMQ
在哪些场景可能用到消息队列:
- 异步发送(验证码、短信、邮件…)
- MYSQL和Redis , ES之间的数据同步
- 分布式事务
- 削峰填谷
RabbitMQ-如何保证消息不丢失
第一个是开启生产者确认机制,确保生产者的消息能到达队列,如果报错可以先记录到日志中,再去修复数据
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功
消息失败之后如何处理呢?
- 回调方法即时重发
- 记录日志
- 保存到数据库然后定时重发,成功发送后即刻删除表中的数据
第二个是开启持久化功能,确保消息未消费前在队列中不会丢失,其中的交换机、队列、和消息都要做持久化
第三个是开启消费者确认机制为auto,由spring确认消息处理成功后完成ack,当然也需要设置一定的重试次数,我们当时设置了3次,如果重试3次还没有收到消息,就将失败后的消息投递到异常交换机,交由人工处理
RabbitMQ-消费重复问题
我们当时消费者是设置了自动确认机制,当服务还没来得及给MQ确认的时候,服务宕机了,导致服务重启之后,又消费了一次消息。这样就重复消费了(或者也可能是因为网络波动)
因为我们当时处理的支付(订单|业务唯一标识),它有一个业务的唯一标识,我们再处理消息时,先到数据库查询一下,这个数据是否存在,如果不存在,说明没有处理过,这个时候就可以正常处理这个消息了。如果已经存在这个数据了,就说明消息重复消费了,我们就不需要再消费了
解决方案:
每条消息设置一个唯一的标识id
幂等方案:【 分布式锁、数据库锁(悲观锁、乐观锁) 】
RabbitMQ-死信交换机/延迟队列
延迟队列:进入队列的消息会被延迟消费的队列 延迟队列=死信交换机+TTL(生存时间)
场景:超时订单、限时优惠、定时发布
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息,超时无人消费
- 要投递的队列消息堆积满了,最早的消息可能成为死信
DelayExchange插件,需要安装在RabbitMQ中
RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html
RabbitMQ-消息堆积
解决消息堆积有三种种思路:
- 增加更多消费者,提高消费速度
- 在消费者内开启线程池加快消息处理速度
- 扩大队列容积,提高堆积上限
扩大队列容积可以使用RabbitMQ惰性队列。
- 在声明队列的时候可以设置属性x-queue-mode为lazy,即为惰性队列
- 基于磁盘存储,消息上限高
- 性能比较稳定,但基于磁盘存储,受限于磁盘IO,时效性会降低
惰性队列的流程特点:
①接收到消息后直接存入磁盘而非内存
②消费者要消费消息时才会从磁盘中读取并加载到内存
③支持数百万条的消息存储
RabbitMQ的高可用机制(未看)
镜像集群、仲裁队列
Kafka
Kafka-如何保证消息不丢失
生产者发送-丢失
1.异步发送 2.消息重试
broker存储-丢失
可以将acks=all.
消费者接受-丢失
Kafka-重复消费问题
kafka消费消息都是按照offset进行标记消费的,消费者默认是自动按期提交已经消费的偏移量,默认是每隔5s提交一次,如果出现重平衡的情况,可能会重复消费或丢失数据。我们一般都会禁用掉自动提价偏移量,改为手动提交,当消费成功以后再报告给broker消费的位置,这样就可以避免消息丢失和重复消费了
为了消息的幂等,我们也可以设置唯一主键来进行区分,或者是加锁,数据库的锁,或者是redis分布式锁,都能解决幂等的问题
Kafka-保证消费的顺序性
一个topic的数据可能存储在不同的分区中,每个分区都有一个按照顺序的存储的偏移量,如果消费者关联了多个分区不能保证顺序性
把消息都存储同一个分区下就行了,可以通过指定分区号或指定key。(因为默认情况下分区也是通过key的hashcode值来选择分区的)
Kafka-高可用机制
集群+分区备份
选举新Leader:
如果leader失效后,需要选出新的leader,选举的原则如下:
第一:选举时优先从ISR中选定,因为这个列表中follower的数据是与leader同步的
第二:如果ISR列表中的follower都不行了,就只能从其他follower中选取
ISR(in-sync replica)需要同步复制保存的follower
Kafka-文件存储机制
橙色的itheima是主题topic(逻辑概念),蓝色的itheima-0是分区(物理概念),分区如果文件过大会分段存储segment
每个分段都在磁盘上以索引(xxxx.index)和日志文件(xxxx.log)的形式存储,这样分段的好处是,第一能够减少单个文件内容的大小,查找数据方便,第二方便kafka进行日志清理。
Kafka-数据清理机制
在kafka中提供了两个日志的清理策略:
第一,根据消息的保留时间,当消息保存的时间超过了指定的时间,就会触发清理,默认是168小时( 7天)
第二,是根据topic存储的数据大小,当topic所占的日志文件大小大于一定的阈值,则开始删除最久的消息。这个默认是关闭的
这两个策略都可以通过kafka的broker中的配置文件进行设置
Kafka-高性能设计
Kafka 高性能,是多方面协同的结果,包括宏观架构、分布式存储、ISR 数据同步、以及高效的利用磁盘、操作系统特性等。主要体现有这么几点:
消息分区:不受单台服务器的限制,可以不受限的处理更多的数据
顺序读写:磁盘顺序读写,提升读写效率
页缓存:把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问
零拷贝:减少上下文切换及数据拷贝
消息压缩:减少磁盘IO和网络IO
分批发送:将消息打包批量发送,减少网络开销
其中的页拷贝
将数据原本需要复制四次,改为了复制两次
原来四次:从磁盘文件->页缓存,再到kafka,再到socket缓冲区,再到网卡
改了之后直接从页缓存到网卡
**