[toc]
0.JavaGuide
这里面的东西也是推荐要看一看的。
1.Java
ArrayList源码
1.用空参创建的集合,在底层创建一个默认长度为0的数组。
2.添加第一个元素时,底层会创建一个新的长度为10的数组。
3.存满时,会扩容1.5倍。
4.如果一次添加多个元素,1.5倍还放不下,则新创建数组的长度以实际为准。
ctrl+shift+t 可以查ArrayList源码
ctrl + o 搜索指定的方法。
源码:add函数 add(e,elementData,size)
参数一:当前要添加的元素 参数二:集合底层的数组名字 参数三:集合的长度/当前元素应存入的位置
源码 newLength(int oldLength,int minGrowth,int prefGrowth)
int prefLength=oldLength+Math.max(minGrowth,prefGrowth);
扩容因子1.5,一方面是位运算比较快,一方面是1.5可以让之前的内存空间可以得到复用。
HashMap源码
加载因子0.75, 超过时会扩容为2倍。扩容之后桶下标要重新计算。
树化:当容量大于等于64时,且一个Key下面的链表长度大于8。 将链表转为红黑树
退化:扩容会拆分,当树的元素个数小于等于6,会退化为链表。remove之前,如果root.root.left, root.right, root.left.left如果有一个为null,则会退化为链表
数组容量为什么是2的n次幂? 计算索引时,如果是2的n次幂,可以使用位与运算代替取模,效率更高;扩容时oldCap & hash ==0 的元素留在原来位置,否则新位置=旧位置+oldCap
例子:容量16时,10&16=0,26&16=16.这俩都在10的位置。(新位置)26=10+16
但是这样可能会导致哈希分布性不好,所以也推荐选质数。比如HashTable{0->11->23->47->95}
HashMap不是线程安全的,多线程操作会造成数据错乱现象。
HashMap的key可以为Null,其他的map不行
作为key的对象,必须要实现hashCode和equals,并且Key的内容不能修改
String对象的hashCode()如何设计才能让散列的更加均匀?
每个字符串的hashCode足够独特,$S_0$×$31^{n-1}$+……+$S_{n-1}$×$31^0$
31×h又可以被优化为h<<5 - h
2.计算机网络
HTTP
1)从输入URL到页面展示到底发生了什么?(打开一个网页,整个过程会使用哪些协议)
注意是OSPF协议。
- DNS解析
- TCP连接
- 发送HTTP请求
- 服务器处理请求并返回HTTP报文
- 浏览器解析渲染页面
- 连接结束
DNS解析:一个网址到IP地址的转换。
浏览器中有DNS缓存,DNS负载均衡可以把最近的IP地址返回给用户。DNS端口是53
2)HTTP 和 HTTPS 的区别
端口号 :HTTP 默认是 80,HTTPS 默认是 443
3)HTTP1.0 和 HTTP1.1的区别
连接方式:HTTP/1.0 为短连接,HTTP/1.1 支持长连接。
状态响应码: HTTP/1.1 中新加入了大量的状态码,
缓存机制:HTTP/1.1 则引入了更多的缓存控制策略
带宽: HTTP/1.1它允许只请求资源的某个部分,即返回码是 206(Partial Content)
Host 头(Host Header)处理 :HTTP/1.1 引入了 Host 头字段,允许在同一 IP 地址上托管多个域名,从而支持虚拟主机的功能。
4)HTTP1.1 和 HTTP2.0 的区别
IO 多路复用(Multiplexing) :HTTP/2.0 在同一连接上可以同时传输多个请求和响应(可以看作是 HTTP/1.1 中长链接的升级版本)。HTTP/1.1 则使用串行方式,每个请求和响应都需要独立的连接。这使得 HTTP/2.0 在处理多个请求时更加高效,减少了网络延迟和提高了性能。
二进制帧(Binary Frames) :HTTP/2.0 使用二进制帧进行数据传输,而 HTTP/1.1 则使用文本格式的报文。二进制帧更加紧凑和高效,减少了传输的数据量和带宽消耗。
头部压缩(Header Compression) :HTTP/1.1 支持Body
压缩,Header
不支持压缩。HTTP/2.0 支持对Header
压缩,减少了网络开销。
服务器推送(Server Push):HTTP/2.0 支持服务器推送,可以在客户端请求一个资源时,将其他相关资源一并推送给客户端,从而减少了客户端的请求次数和延迟。而 HTTP/1.1 需要客户端自己发送请求来获取相关资源。
5)HTTP2.0 和 HTTP 3.0的区别
传输协议 :HTTP/2.0 是基于 TCP 协议实现的,HTTP/3.0 新增了 QUIC(Quick UDP Internet Connections) 协议来实现可靠的传输,提供与 TLS/SSL 相当的安全性,具有较低的连接和传输延迟。你可以将 QUIC 看作是 UDP 的升级版本,在其基础上新增了很多功能比如加密、重传等等。HTTP/3.0 之前名为 HTTP-over-QUIC,从这个名字中我们也可以发现,HTTP/3 最大的改造就是使用了 QUIC。
连接建立 :HTTP/2.0 需要经过经典的 TCP 三次握手过程(一般是 3 个 RTT)。由于 QUIC 协议的特性,HTTP/3.0 可以避免 TCP 三次握手的延迟,允许在第一次连接时发送数据(0 个 RTT ,零往返时间)。
队头阻塞 :HTTP/2.0 多请求复用一个 TCP 连接,一旦发生丢包,就会阻塞住所有的 HTTP 请求。由于 QUIC 协议的特性,HTTP/3.0 在一定程度上解决了队头阻塞(Head-of-Line blocking, 简写:HOL blocking)问题,一个连接建立多个不同的数据流,这些数据流之间独立互不影响,某个数据流发生丢包了,其数据流不受影响(本质上是多路复用+轮询)。
错误恢复 :HTTP/3.0 具有更好的错误恢复机制,当出现丢包、延迟等网络问题时,可以更快地进行恢复和重传。而 HTTP/2.0 则需要依赖于 TCP 的错误恢复机制。
安全性 :HTTP/2.0 和 HTTP/3.0 在安全性上都有较高的要求,支持加密通信,但在实现上有所不同。HTTP/2.0 使用 TLS 协议进行加密,而 HTTP/3.0 基于 QUIC 协议,包含了内置的加密和身份验证机制,可以提供更强的安全性。
TCP/IP
1)TCP 与 UDP 的区别
- 是否面向连接 :UDP 在传送数据之前不需要先建立连接。而 TCP 提供面向连接的服务,在传送数据之前必须先建立连接,数据传送结束后要释放连接。
- 是否是可靠传输:远地主机在收到 UDP 报文后,不需要给出任何确认,并且不保证数据不丢失,不保证是否顺序到达。TCP 提供可靠的传输服务,TCP 在传递数据之前,会有三次握手来建立连接,而且在数据传递时,有确认、窗口、重传、拥塞控制机制。通过 TCP 连接传输的数据,无差错、不丢失、不重复、并且按序到达。
- 是否有状态 :这个和上面的“是否可靠传输”相对应。TCP 传输是有状态的,这个有状态说的是 TCP 会去记录自己发送消息的状态比如消息是否发送了、是否被接收了等等。为此 ,TCP 需要维持复杂的连接状态表。而 UDP 是无状态服务,简单来说就是不管发出去之后的事情了(这很渣男!)。
- 传输效率 :由于使用 TCP 进行传输的时候多了连接、确认、重传等机制,所以 TCP 的传输效率要比 UDP 低很多。
- 传输形式 : TCP 是面向字节流的,UDP 是面向报文的。
- 首部开销 :TCP 首部开销(20 ~ 60 字节)比 UDP 首部开销(8 字节)要大。
- 是否提供广播或多播服务 :TCP 只支持点对点通信,UDP 支持一对一、一对多、多对一、多对多;
2) TCP 的三次握手的过程
三次握手能够保证自己和对方的发送和接收功能都是正常的。
3)四次挥手
为什么不能把服务器发送的 ACK 和 FIN 合并起来,变成三次挥手?
因为服务器收到客户端断开连接的请求时,可能还有一些数据没有发完,这时先回复 ACK,表示接收到了断开连接的请求。等到数据发完之后再发 FIN,断开服务器到客户端的数据传送.
当被动关闭方(服务端)在 TCP 挥手过程中,「没有数据要发送」并且「开启了 TCP 延迟确认机制」,那么第二和第三次挥手就会合并传输,这样就出现了三次挥手。
然后因为 TCP 延迟确认机制是默认开启的,所以导致我们抓包时,看见三次挥手的次数比四次挥手还多。
当发送没有携带数据的 ACK,它的网络效率也是很低的,因为它也有 40 个字节的 IP 头 和 TCP 头,但却没有携带数据报文。 为了解决 ACK 传输效率低问题,所以就衍生出了 TCP 延迟确认。 TCP 延迟确认的策略:
- 当有响应数据要发送时,ACK 会随着响应数据一起立刻发送给对方
- 当没有响应数据要发送时,ACK 将会延迟一段时间,以等待是否有响应数据可以一起发送
- 如果在延迟等待发送 ACK 期间,对方的第二个数据报文又到达了,这时就会立刻发送 ACK
4)TCP传输可靠性保证
5) NAT的作用
NAT(Network Address Translation,网络地址转换) 主要用于在不同网络之间转换 IP 地址。它允许将私有 IP 地址(如在局域网中使用的 IP 地址)映射为公有 IP 地址(在互联网中使用的 IP 地址)或者反向映射,从而实现局域网内的多个设备通过单一公有 IP 地址访问互联网。
NAT 不光可以缓解 IPv4 地址资源短缺的问题,还可以隐藏内部网络的实际拓扑结构,使得外部网络无法直接访问内部网络中的设备,从而提高了内部网络的安全性。
3.设计模式
软件设计原则:
开闭原则:对拓展开放,对修改封闭。
里式代换原则:任何基类可以出现的地方,子类一定可以出现,反之不一定。
依赖倒转原则:高层模块不应该依赖低层模块,两者都应该依赖其抽象。
接口隔离原则:客户端不应该被迫依赖于它不使用的方法,一个类对另一个类的依赖应该建立在最小的接口上。
迪米特法则:只和你的直接朋友交谈,不跟 “陌生人” 说话
合成复用原则:尽量先使用组合或者聚合等关联关系来实现,其次才考虑使用继承关系来实现。
设计模式分类:
创建型:专注于对象的创建
结构型:关注类和对象的组织
行为型:关注对象之间的相互交互
创建型模式:单例模式、工厂模式、原型模式、建造者模式
结构型模式:代理模式、适配器模式、装饰器模式、桥接模式、外观模式、组合模式、享元模式
行为型模式:模板方法模式、策略模式、命令模式、职责链模式、状态模式、观察者模式、中介者模式、迭代器模式、访问者模式、备忘录模式、解释器模式
单例模式
一个类只有一个实例
掌握单例模式常见五种实现方式
了解jdk中有哪些地方体现了单例模式
- 饿汉式:类加载就会导致该单实例对象被创建
- 懒汉式:类加载不会导致该单实例对象被创建,而是首次使用该对象时才会创建
工厂模式
简单工厂模式:
简单咖啡工厂类,用来生产咖啡`
`*/`
`public class SimpleCoffeeFactory {`
`public Coffee createCoffee(String type) {`
`// 声明Coffee类型的变量,根据不同类型创建不同的coffee子类对象`
`Coffee coffee = null;`
`if ("american".equals(type)) {`
`coffee = new AmericanCoffee();`
`} else if ("latte".equals(type)) {`
`coffee = new LatteCoffee();`
`} else {`
`throw new RuntimeException("对不起,您所点的咖啡没有");`
`}`
`return coffee;`
`}`
`}
你传入type,然后工厂来生产。如果要增加新产品直接修改工厂类,容易扩展,但也违反了“开闭”原则。
工厂方法模式:
分为抽象工厂,具体工厂;抽象产品,具体产品。
无需对原工厂修改,满足开闭原则。但是有新产品就要新建工厂。
抽象工厂模式:
工厂方法模式:一个工厂生产一种类对象的模式。
抽象工厂模式:一个工厂可以生产多种类对象的模式。
意大利风味的甜品工厂 ,生产拿铁咖啡和提拉米苏甜品
美式风味的甜品工厂 ,生产美式咖啡和抹茶慕斯
有咖啡和甜品两个产品族。
策略模式
策略模式的主要角色如下:
- 抽象策略(Strategy)类:这是一个抽象角色,通常由一个接口或抽象类实现。此角色给出所有的具体策略类所需的接口。
- 具体策略(Concrete Strategy)类:实现了抽象策略定义的接口,提供具体的算法实现或行为。
- 环境(Context)类:持有一个策略类的引用,最终给客户端调用。
优点:
- 策略类之间可以自由切换:由于策略类都实现同一个接口,所以使它们之间可以自由切换。
- 易于扩展:增加一个新的策略只需要添加一个具体的策略类即可,基本不需要改变原有的代码,符合“开闭原则“
- 避免使用多重条件选择语句(if else),充分体现面向对象设计思想。
实例:
Arrays.sort(data, new Comparator<Integer>() {
public int compare(Integer o1, Integer o2) {
return o2 - o1;
}
});
其中Comparator 充当的是抽象策略角色,而具体的子实现类充当的是具体策略角色。
不如我的发票管理的策略模式。
责任链模式
职责链模式主要包含以下角色:
- 抽象处理者(Handler)角色:定义一个处理请求的接口,包含抽象处理方法和一个后继连接。
- 具体处理者(Concrete Handler)角色:实现抽象处理者的处理方法,判断能否处理本次请求,如果可以处理请求则处理,否则将该请求转给它的后继者。
- 客户类(Client)角色:创建处理链,并向链头的具体处理者对象提交请求,它不关心处理细节和请求的传递过程。
最经典的就是请假流程控制系统
优点:
降低了对象之间的耦合度,该模式降低了请求发送者和接收者的耦合度。
增强了系统的可扩展性,可以根据需要增加新的请求处理类,满足开闭原则。
增强了给对象指派职责的灵活性,当工作流程发生变化,可以动态地改变链内的成员或者修改它们的次序,也可动态地新增或者删除责任。
责任链简化了对象之间的连接,一个对象只需保持一个指向其后继者的引用,不需保持其他所有处理者的引用,这避免了使用众多的 if 或者 if···else 语句。
责任分担,每个类只需要处理自己该处理的工作,不能处理的传递给下一个对象完成,明确各类的责任范围,符合类的单一职责原则。
代理模式
代理模式的作用:保护、增强、解耦
当访问对象不适合或者不能直接引用目标对象,可以提供一个代理以控制对该对象的访问,代理对象作为访问对象和目标对象之间的中介。
Java 中的代理按照代理类生成时机不同又分为静态代理和动态代理;
静态代理代理类在编译期就生成,而动态代理代理类则是在 Java 运行时动态生成;
动态代理又有 JDK 代理和 CGLib 代理两种。
静态代理:火车站是目标对象,代售点是代理对象。
JDK 动态代理要求必须定义接口,因为它只能对接口进行代理。
获取代理对象的工厂类 ProxyFactory。
CGLIB 动态代理可以不代理接口,直接代理类。
使用场景:远程代理我们只能访问接口。
4.MySQL
MySQL基础语法
执行顺序
MySQL Full join的实现
因为MySQL不支持FULL JOIN,下面是替代方法/
1.UNION 会自动去重
select * from t1 left join t2 on t1.id = t2.id
union
select * from t1 right join t2 on t1.id = t2.id;
2.UNION ALL 的方式 (取左表的全部和右表独有)
select * from t1 left join t2 on t1.id = t2.id
union all
select * from t1 right join t2 on t1.id = t2.id
where t1.id is null;
sql实战
阿里开发手册:
【强制】在 varchar 字段上建立索引时,必须指定索引长度,没必要对全字段建立索引,根据实际文本区
分度决定索引长度。
说明:索引的长度与区分度是一对矛盾体,一般对字符串类型数据,长度为 20 的索引,区分度会高达 90%以上,可以使用 count(distinct left(列名,索引长度)) / count(*) 的区分度来确定。
时间日期处理
SELECT DATE_FORMAT(NOW() , '%Y年%m月%d日 %H时%i分%s秒');
SELECT NOW();
SELECT * FROM ucenter_member WHERE DATE(gmt_create) = '2019-01-02';
SELECT * FROM ucenter_member WHERE DATE_FORMAT(gmt_create , '%Y-%m-%d') = '2019-01-02';
行转列
SELECT * FROM t_score;
需求:行转列显示学生直观显示学生各科成绩
SELECT stuid ,
SUM(IF(SUBJECT = 'Java基础' , score , NULL)) 'Java基础',
SUM(IF(SUBJECT = 'mysql' , score , NULL)) 'mysql',
SUM(IF(SUBJECT = 'Javaweb' , score , NULL)) 'Javaweb',
SUM(IF(SUBJECT = 'ssm' , score , NULL)) 'ssm'
FROM t_score
GROUP BY stuid;
删除重复行
SELECT * FROM t_score ORDER BY stuid,SUBJECT;
需求:每个学生同一学科有多个成绩的,保留分数高的
DELETE FROM t_score WHERE id NOT IN(
SELECT tmp.id FROM
(SELECT id FROM t_score t1 JOIN (
SELECT stuid , SUBJECT , MAX(score) m_score
FROM t_score
GROUP BY stuid , SUBJECT) t2
ON t1.`stuid` = t2.stuid
AND t1.`subject` = t2.subject
AND t1.`score` = t2.m_score)tmp
);
存储引擎
存储引擎对比
MySQL 5.5.5 之前,MyISAM 是 MySQL 的默认存储引擎。5.5.5 版本之后,InnoDB 是 MySQL 的默认存储引擎。
只有 InnoDB 支持事务。
存储引擎是基于表的,而不是数据库。我们可以为不同的数据库的表配置不同的存储引擎
MyISAM 的性能还行,各种特性也还不错(比如全文索引、压缩、空间函数等)。但是,MyISAM 不支持事务和行级锁,而且最大的缺陷就是崩溃后无法安全恢复。
重点区分还是行级锁,事务,外键,MVCC,崩溃恢复,读性能,写性能
- InnoDB 支持行级别的锁粒度,MyISAM 不支持,只支持表级别的锁粒度。
- MyISAM 不提供事务支持。InnoDB 提供事务支持,实现了 SQL 标准定义了四个隔离级别。
- MyISAM 不支持外键,而 InnoDB 支持。
- MyISAM 不支持 MVCC,而 InnoDB 支持。
- 虽然 MyISAM 引擎和 InnoDB 引擎都是使用 B+Tree 作为索引结构,但是两者的实现方式不太一样。
- MyISAM 不支持数据库异常崩溃后的安全恢复,而 InnoDB 支持。依赖于redo日志。
- InnoDB 的性能比 MyISAM 更强大。随着 CPU 核数的增加,InnoDB 的读写能力呈线性增长。MyISAM 因为读写不能并发,它的处理能力跟核数没关系。如果有更新和删除操作,应该首先考虑InnoDB.
还有一些其他的存储引擎,各有特点。
Archive引擎:用于数据存档,只支持INSERT和SELECT操作
Blackhole引擎:丢弃写操作,读操作会返回空内容
CSV引擎:存储数据时,以逗号分隔各个数据项
Memory引擎:置于内存的表
Federated引擎:访问远程表
Merge引擎:管理多个MyISAM表构成的集合
NDB引擎:Mysql集群专用存储引擎
页,区,段,表
InnoDB中页的大小默认为16KB,以页作为磁盘和内存之间交互的基本单位。
查看mysql文件页大小(16K):
SHOW GLOBAL STATUS LIKE '%page_size%'
为什么mysql页文件默认16K?
假设我们一行数据大小为1K,那么一页就能存16条数据,也就是一个叶子节点能存16条数据;再看非叶子节点,假设主键ID为bigint类型,那么长度为8B,指针大小在Innodb源码中为6B,一共就是14B,那么一页里就可以存储16K/14B=1170个(主键+指针)
一颗高度为2的B+树能存储的数据为:1170*16=18720条,
一颗高度为3的B+树能存储的数据为:1170*1170*16=21902400(千万级)
因为每次是将这个记录所在的页加载到内存中进行读取。一个页中有上千条记录。所以普通索引和唯一索引的性能差异不大。
引入 区 的概念,一个区就是在物理位置上连续的 64个页。因为InnoDB 中的页大小默认是16KB,所以 一个区的大小是64*16KB= 1MB 。
InnoDB对 B+树的 叶子节点 和 非叶子节点 进行了区别对待,也就是说叶子节点有自己独有的区,非叶子节点也有自 己独有的区。存放叶子节点的区的集合就算是一个 段( segment) ,存放非叶子节点的区的集合也算是 一个段。
段其实不对应表空间中某一个连续的物理区域,而是一个逻辑上的概念,由若干个零散的页面以及一些 完整的区组成。
为了考虑以完整的区为单位分配给某个段对于 数据量较小 的表太浪费存储空间的这种情况,InnoDB提 出了一个 碎片(fragment)区 的概念。在一个碎片区中,并不是所有的页都是为了存储同一个段的数据 而存在的,而是碎片区中的页可以用于不同的目的,比如有些页用于段A,有些页用于段B,有些页甚至 哪个段都不属于。 碎片区直属于表空间 ,并不属于任何一个段。
表空间可以看做是InnoDB存储引擎逻辑结构的最高层,所有的数据都存放在表空间中。 表空间是一个 逻辑容器 ,表空间存储的对象是段,在一个表空间中可以有一个或多个段,但是一个段只 能属于一个表空间。
索引底层原理
索引(Index)是帮助MySQL高效获取数据的数据结构.
优点:查的快,降低IO成本; 通过创建唯一索引,可以保证数据的唯一性; 加速表与表之间的连接,减低分组和排序子句的时间。
缺点:创建索引和维护索引要耗费时间; 索引需要占磁盘空间; 索引会降低更新表的速度。
B+树
底层数据结构:B+树,很适合排序查找和范围查找。
B树
B+树
而且B+树的非叶子节点不存数据,则可以比B树存更多的数据。这使得B+树更加矮胖
B+树优点:B+树查询效率更稳定(要访问到叶子节点); B+树的查询效率更高(一般比B树更矮胖);B+树的磁盘读写代价更低(因为内部非叶子节点存储的东西更少)
B+树的存储能力如何?为何说一般查找行记录,最多只需1~3次磁盘IO? InnoDB存储引擎中页的大小为16KB,一个int类型或者一个指针也就4个字节或者8个字节。所以一个页可以存储16KB/(8B+8B)=1000个键值。则深度为3的树就可以存储$10^3$×*$10^3$×$10^3$=一亿条数据
InnoDB的B+树索引的注意事项
1.根页面位置不动 2.非叶子节点目录项记录唯一 3.一个页面最少存储两条记录
MyISAM索引:将数据和索引分开存储,索引树的叶子节点存的是主键值+数据记录地址
MyISAM与InnoDB的对比:
1.InnoDB中,根据主键值对聚簇索引进行一次查询即可。而在MyISAM中,要进行一次回表。MyISAM的回表操作是十分 快速 的。
2.InnoDB的数据文件本身就是索引文件,而MyISAM索引文件和数据文件是 分离的 ,索引文件仅保存数 据记录的地址。
3.InnoDB的非聚簇索引data域存储相应记录 主键的值 ,而MyISAM索引记录的是 地址 。
4.InnoDB要求表 必须有主键 ( MyISAM可以没有 )。如果没有显式指定,则MySQL系统会自动选择一个 可以非空且唯一标识数据记录的列作为主键。再没有的话可以生成一个隐含字段。
事务与锁
1)事务基础知识
在MySQL中,只有InnoDB是支持事务的。
事务的ACID特性:
原子性(atomicity): 要么全部提交,要么全部失败回滚。
一致性(consistency):指事务执行前后,数据从一个 合法性状态 变换到另外一个 合法性状态 。
隔离性(isolation): 指一个事务的执行 不能被其他事务干扰 ,即一个事务内部的操作及使用的数据对 并发 的 其他事务是隔离的,并发执行的各个事务之间不能互相干扰。
持久性(durability): 一个事务一旦被提交,它对数据库中数据的改变就是 永久性的 ,接下来的其他操作和数据库 故障不应该对其有任何影响。
事务的状态:
事务的使用: BEGIN; COMMIT; ROLLBACK;
数据并发问题 : 脏写,脏读,不可重复读,幻读
1)脏写(Dirty Write): 事务Session A 修改了 另一个 未提交 事务Session B 修改过 的数据
- 脏读(Dirty Read): Session A 读取 了已经被 Session B 更新 但还 没有被提交 的字段。 之后若 Session B 回滚 ,Session A 读取 的内容就是 临时且无效 的.
- 不可重复读(Non-Repeatable Read):Session A 读取 了一个字段,然后 Session B 更新 了该字段。 之后 Session A 再次读取 同一个字段, 值就不同 了。
4)幻读(Phantom): Session A 从一个表中 读取 了一个字段, 然后 Session B 在该表中 插 入 了一些新的行。 之后, 如果 Session A 再次读取 同一个表, 就会多出几行。
四种隔离级别:未提交读,提交读,可重复读,序列化
脏写的问题太严重了,不论是哪种隔离级别,都不允许脏写的情况发生。
MySQL的默认隔离级别为REPEATABLE READ。
2)MySQL事务日志-redo,undo,bin
事务有4种特性:原子性、一致性、隔离性和持久性。那么事务的四种特性到底是基于什么机制实现呢?
事务的隔离性由 锁机制 实现。 而事务的原子性、一致性和持久性由事务的 redo 日志和undo 日志来保证。
REDO LOG 称为 重做日志 ,提供再写入操作,恢复提交事务修改的页操作,用来保证事务的持 久性。
UNDO LOG 称为 回滚日志 ,回滚行记录到某个特定版本,用来保证事务的原子性、一致性。
redo log:只需要把 修改 了哪些东西 记录一下 就好.
好处:redo日志降低了刷盘频率 redo日志占用的空间非常小
特点:redo日志是顺序写入磁盘的 事务执行过程中,redo log不断记录
redo log的组成:(保存在内存中的) 重做日志的缓冲 (redo log buffer),(保存在磁盘中的)重做日志文件 (redo log file)
redo log 的流程:
刷盘:就是将redo log buffer的东西写入到磁盘中(即redo log file)中。
InnoDB给出 innodb_flush_log_at_trx_commit 参数,该参数控制 commit提交事务 时,如何将 redo log buffer 中的日志刷新到 redo log file 中。
它支持三种策略:
设置为0 :表示每次事务提交时不进行刷盘操作。(系统默认master thread每隔1s进行一次重做日 志的同步)
设置为1 :表示每次事务提交时都将进行同步,刷盘操作( 默认值 )
设置为2 :表示每次事务提交时都只把 redo log buffer 内容写入 page cache,不进行同步。由os自己决定什么时候同步到磁盘文件。
redo log是事务持久性的保证,undo log是事务原子性的保证。在事务中 更新数据 的 前置操作 其实是要 先写入一个 undo log 。
undo log 的作用: 1. 回滚 2.MVCC
undo log是逻辑日志,对事务回滚时,只是将数据库逻辑地恢复到原来的样子。
redo log是物理日志,记录的是数据页的物理变化,undo log不是redo log的逆过程.
binlog(归档日志)保证了 MySQL 集群架构的数据一致性。主节点写bin log 日志,从节点读bin log 日志。
两阶段提交
redo log 在事务执行过程中可以不断写入,而 binlog 只有在提交事务时才写入,所以 redo log 与 binlog 的写入时机不一样。会出问题。所以将redo log拆为prepare阶段和commit阶段。
3)锁
事务的隔离性由 锁来实现
怎么解决脏读 、 不可重复读 、 幻读 这些问题呢?其实有两种可选的解决方案:
方案一:读操作利用多版本并发控制( MVCC ,下章讲解),写操作进行 加锁 。
方案二:读、写操作都采用 加锁 的方式。
锁的分类:
按对数据的操作类型划分:读锁/共享锁(S), 写锁/排他锁(X)
按锁的粒度划分: 表级锁, 行级锁, 页级锁
按对锁的态度:悲观锁,乐观锁
按加锁方式: 隐式锁, 显示锁
Java中 synchronized 和 ReentrantLock 等独占锁就是 悲观锁思想的实现
乐观锁:在更新 的时候会判断一下在此期间别人有没有去更新这个数据。在程序上,我们可以采用 版本号机制 或者 CAS机制 实现。在Java中 java.util.concurrent.atomic 包下的原子变量类就是使用了乐观锁的CAS实现的。
发生死锁应该怎么办?:1,直接进入等待,直到超时。2,发起死锁检测,发现死锁后,主动回滚死锁链条中的某一个事务。
InnoDB 行锁是通过对索引数据页上的记录加锁实现的,MySQL InnoDB 支持三种行锁定方式:
- 记录锁(Record Lock):也被称为记录锁,属于单个行记录上的锁。
- 间隙锁(Gap Lock):锁定一个范围,不包括记录本身。
- 临键锁(Next-Key Lock):Record Lock+Gap Lock,锁定一个范围,包含记录本身,主要目的是为了解决幻读问题(MySQL 事务部分提到过)。记录锁只能锁住已经存在的记录,为了避免插入新记录,需要依赖间隙锁。
在 InnoDB 默认的隔离级别 REPEATABLE-READ 下,行锁默认使用的是 Next-Key Lock。
意向锁是表级锁,共有两种:
- 意向共享锁(Intention Shared Lock,IS 锁):事务有意向对表中的某些记录加共享锁(S 锁),加共享锁前必须先取得该表的 IS 锁。
- 意向排他锁(Intention Exclusive Lock,IX 锁):事务有意向对表中的某些记录加排他锁(X 锁),加排他锁之前必须先取得该表的 IX 锁。
意向锁是由数据引擎自己维护的,用户无法手动操作意向锁,在为数据行加共享/排他锁之前,InooDB 会先获取该数据行所在在数据表的对应意向锁。
意向锁之间是互相兼容的。
意向锁和共享锁和排它锁互斥(这里指的是表级别的共享锁和排他锁,意向锁不会与行级的共享锁和排他锁互斥)。
IS 锁 | IX 锁 | |
---|---|---|
S 锁 | 兼容 | 互斥 |
X 锁 | 互斥 | 互斥 |
MVCC机制
MVCC (Multiversion Concurrency Control),多版本并发控制。顾名思义,MVCC 是通过数据行的多个版 本管理来实现数据库的 并发控制 。这项技术使得在InnoDB的事务隔离级别下执行 一致性读 操作有了保 证。换言之,就是为了查询一些正在被另一个事务更新的行,并且可以看到它们被更新之前的值,这样 在做查询的时候就不用等待另一个事务释放锁。
MVCC是快照读,是一种乐观锁的体现
MVCC的实现依赖于:隐藏字段、 Undo Log版本链、 Read View
隐藏字段
在内部,InnoDB
存储引擎为每行数据添加了三个 隐藏字段:
DB_TRX_ID(6字节)
:表示最后一次插入或更新该行的事务 id。此外,delete
操作在内部被视为更新,只不过会在记录头Record header
中的deleted_flag
字段将其标记为已删除DB_ROLL_PTR(7字节)
回滚指针,指向该行的undo log
。如果该行未被更新,则为空DB_ROW_ID(6字节)
:如果没有设置主键且该表没有唯一非空索引时,InnoDB
会使用该 id 来生成聚簇索引
隐藏字段,trx_id, 最近被哪个事务修改过,记录其Id。 roll_pointer 是undo log 版本链的指针。
undo log 版本链
ReadView
核心问 题就是需要判断一下版本链中的哪个版本是当前事务可见的,这是ReadView要解决的主要问题。
Read View就是事务A在使用MVCC机制进行快照读操作时产生的读视图。
Read View主要是用来做可见性判断,里面保存了 “当前对本事务不可见的其他活跃事务”
class ReadView {
/* ... */
private:
trx_id_t m_low_limit_id; /* 大于等于这个 ID 的事务均不可见 */
trx_id_t m_up_limit_id; /* 小于这个 ID 的事务均可见 */
trx_id_t m_creator_trx_id; /* 创建该 Read View 的事务ID */
trx_id_t m_low_limit_no; /* 事务 Number, 小于该 Number 的 Undo Logs 均可以被 Purge */
ids_t m_ids; /* 创建 Read View 时的活跃事务列表 */
m_closed; /* 标记 Read View 是否 close */
}
ReadView里面的内容:
- 生成ReadView时,当前系统中活跃的事务id的列表
- 活跃的事务中最小的事务Id
- 生成ReadView时系统中应该分配给下一个事务的id. (所有事务id中最大的id+1)
- 创建这个ReadView的事务Id
查找规则:按照ReadView里面的事务id去Undo Log里面去查,找一样的版本。或者小于活跃的最小值,或者在活跃的最大值和最小值之间,但是不在ReadView里(即不活跃,已提交的事务)
操作流程:
- 首先获取事务自己的版本号,也就是事务 ID;
- 获取 ReadView;
- 查询得到的数据,然后与 ReadView 中的事务版本号进行比较;
- 如果不符合 ReadView 规则,就需要从 Undo Log 中获取历史快照;
- 最后返回符合规则的数据
因为读未提交可以读到最新数据,串行化读也是。所以只考虑读已提交RC和可重复读RR的MVCC
READ COMMITTED :每次读取数据前都生成一个ReadView。
使用 REPEATABLE READ 隔离级别的事务来说,只会在第一次执行查询语句时生成一个 ReadView ,之 后的查询就不会重复生成了。
可重复读(RR)如何解决幻读的?因为只生成了最开始的一个ReadView,生成ReadView的时机。
性能优化
性能分析工具: 首先要开启慢查询,指定一下阈值(默认是10s),慢查询日志分析工具:mysqldumpslow
EXPLAIN加在查询语句前面。
聚簇索引:所有的用户记录都存在了叶⼦节点,数据即索引,索引即数据。以主键值大小进行排序。
非聚簇索引(二级索引):以别的键为搜索条件时使用,回表。
联合索引:可以先按C2列排序,再按C3列排序。
索引优化
最佳左前缀法则
计算,函数,类型转换导致索引失效。
范围条件右边的列索引失效
不等于(!= 或者 <>)索引失效
is null 可以使用索引, is not null 无法使用索引
like 以通配符%开头索引失效
OR前后存在非索引的列,索引失效
覆盖索引:简单说就是, 索引列+主键 包含 SELECT 到 FROM之间查询的列
索引下推:本来有100条需要回表,但是where里面有 name like %张%, 所以也可以用一下这种索引去查张,把回表数降为10.
month会导致索引失效。所以下推的这个,只是个普通的查询条件?
SELECT * FROM user WHERE zipcode = '431200' AND MONTH(birthdate) = 3;
排序优化:
以下三种情况不走索引:
- 无过滤,不索引
- 顺序错,不索引
- 方向反,不索引
create index idx_age_deptid_name on emp (age,deptid,name)
# 下面没有where或limit,属于无过滤,则不会走索引。
explain select SQL_NO_CACHE * from emp order by age,deptid;
# 下面这种会走索引
explain select SQL_NO_CACHE * from emp order by age,deptid limit 10;
explain select * from emp where age=45 order by deptid;
# 顺序错,不索引
explain select * from emp where age=45 order by deptid desc, name desc ;
# 方向反 不索引
explain select * from emp where age=45 order by deptid asc, name desc ;
关联查询
驱动表的数据一定要全查。
对于下面3个SQL
EXPLAIN SELECT * FROM class LEFT JOIN book ON class.card = book.card;
EXPLAIN SELECT * FROM class RIGHT JOIN book ON class.card = book.card;
EXPLAIN SELECT * FROM class INNER JOIN book ON class.card = book.card;
如果我对Book创建了索引。
第一条。class作为驱动表要全查,book有索引,查的很快。 16*1
第二条。book作为驱动表要全查,没有被优化. 20*16
第三条。inner join会自动选择记录少的作为驱动表。 class为驱动表, 16*1
综上,肯定是把20行变为1行好,所以应该是大表创建索引,然后将其作为被驱动表(小表驱动大表)。
关于exits和In
filesort算法
如果不在索引列上,filesort有两种算法:mysql就要启动双路排序和单路排序
MySQL在4.1版本之前是双路排序。从磁盘取排序字段,再buffer进行排序,再从磁盘取其他字段。
在MySQL4.1之后,出现了第二种改进的算法,就是单路排序。
从磁盘读取查询需要的所有列,按照order by列在buffer对它们进行排序,然后扫描排序后的列表进行输出,它的效率更快一些,避免了第二次读取数据。并且把随机IO变成了顺序IO,但是它会使用更多的空间,因为它把每一行都保存在内存中了。
其他
尽量控制单表数据量的大小,建议控制在 500 万以内。
常见索引列建议
- 出现在 SELECT、UPDATE、DELETE 语句的 WHERE 从句中的列
- 包含在 ORDER BY、GROUP BY、DISTINCT 中的字段
- 并不要将符合 1 和 2 中的字段的列都建立一个索引, 通常将 1、2 中的字段建立联合索引效果更好
- 多表 join 的关联列
索引列的顺序:区分度高的,字段长度小的,最频繁的列放在联合索引的左侧。
对于频繁的查询优先考虑覆盖索引。覆盖索引:就是包含了所有查询字段 (where,select,order by,group by 包含的字段) 的索引。
这样可以,避免 InnoDB 表进行索引的二次查询,也就是回表操作。把随机IO变为顺序IO加快查询效率。
在定义联合索引时,如果 a 列要用到范围查找的话,就要把 a 列放到联合索引的右侧,使用 left join 或 not exists 来优化 not in 操作,因为 not in 也通常会使用索引失效。
将子查询优化为left join 操作。子查询性能差的原因: 子查询的结果集无法使用索引,通常子查询的结果集会被存储到临时表中,不论是内存临时表还是磁盘临时表都不会存在索引,所以查询性能会受到一定的影响。
在明显不会有重复值时使用 UNION ALL 而不是 UNION
- UNION 会把两个结果集的所有数据放到临时表中后再进行去重操作
- UNION ALL 不会再对结果集进行去重操作
5.Redis
mysql的并发是1000/s, redis的并发是10W/s
常见数据结构
详细请参考这篇文章
五种基本数据类型:字符串(string),散列表(hash), 列表(list),集合(set),有序集合(sortedset)
三种特殊数据类型:地理位置(Geospatial),基数统计(Hyperloglog), 位图(Bitmap)
应用场景:
string: 需要存储常规数据的场景,比如缓存 Session、Token、图片地址、序列化后的对象(相比较于 Hash 存储更节省内存);需要计数的场景,用户单位时间的请求数(简单限流可以用到)、页面单位时间的访问数。
list:信息流展示,文章列表,动态列表。
hash:对象数据存储,
set:需要存放的数据不能重复的场景,网站 UV 统计(数据量巨大的场景还是 HyperLogLog
更适合一些)、文章点赞、动态点赞等场景。需要获取多个数据源交集、并集和差集的场景,共同好友(交集)、共同粉丝(交集)、共同关注(交集)、好友推荐(差集)、音乐推荐(差集)、订阅号推荐(差集+交集) 等场景。需要随机获取数据源中的元素的场景,抽奖系统、随机点名等场景。
sortedset:需要随机获取数据源中的元素根据某个权重进行排序的场景,各种排行榜比如直播间送礼物的排行榜、朋友圈的微信步数排行榜、王者荣耀中的段位排行榜、话题热度排行榜等等。需要存储的数据有优先级或者重要程度的场景 比如优先级任务队列。
bitmap:需要保存状态信息(0/1 即可表示)的场景,用户签到情况、活跃用户情况、用户行为统计(比如是否点赞过某个视频)。
Hyperloglog:数量量巨大(百万、千万级别以上)的计数场景,热门网站每日/每周/每月访问 ip 数统计、热门帖子 uv 统计
Geospatial:需要管理使用地理空间数据的场景,附近的人。
对于这些数据结构更底层的实现
动态字符串-SDS
C语言字符串存在很多问题:获取字符串长度的需要通过运算,非二进制安全,不可修改
SDS是一个结构体:当flag=1时,为uint8_t的类型
假如我们要给SDS追加一段字符串“,Amy”,这里首先会申请新内存空间:
如果新字符串小于1M,则新空间为扩展后字符串长度的两倍+1;
如果新字符串大于1M,则新空间为扩展后字符串长度+1M+1。称为内存预分配
SDS的优点:
①获取字符串长度的时间复杂度为O(1)
②支持动态扩容
③减少内存分配次数
④二进制安全
IntSet
先把20放到合适的位置,再放10,再放5.
Intset的一些特点:
①Redis会确保Intset中的元素唯一、有序
②具备类型升级机制,可以节省内存空间
③底层采用二分查找方式来查询
Dict
uDict采用渐进式rehash,每次访问Dict时执行一次rehash
urehash时ht[0]只减不增,新增操作只在ht[1]执行,其它操作在两个哈希表
ZipList
ZipList特性:
①压缩列表的可以看做一种连续内存空间的”双向链表”
②列表的节点之间不是通过指针连接,而是记录上一节点和本节点长度来寻址,内存占用较低
③如果列表数据过多,导致链表过长,可能影响查询性能
④增或删较大数据时有可能发生连续更新问题
QuickList
每个ZipList的内存占用不能超过8kb(可配置),控制ziplist占用内存的大小。
zipList的中间节点可以压缩,进一步节省了内存
SkipList
跳表,比AVL树好在不用旋转保持平衡的耗时,红黑树太复杂,Redis作为内存数据库不用存储大量数据,也就不用B+树
Redis五种数据结构
String
其基本编码方式是RAW,基于简单动态字符串(SDS)实现,存储上限为512mb。
如果存储的SDS长度小于44字节,则会采用EMBSTR编码,此时object head与SDS是一段连续空间。申请内存时只需要调用一次内存分配函数,效率更高。
如果存储的字符串是整数值,并且大小在LONG_MAX范围内,则会采用INT编码:直接将数据保存在RedisObject的ptr指针位置(刚好8字节),不再需要SDS了。
List
在3.2版本之前,Redis采用ZipList和LinkedList来实现List,当元素数量小于512并且元素大小小于64字节时采用ZipList编码,超过则采用LinkedList编码。
在3.2版本之后,采用QuickList
set
少量整数IntSet,正常采用Dict,元素存在key中,value为null
当添加一个字符串后,会从上面变成下面。
zset
zset底层数据结构必须满足键值存储、键必须唯一、可排序这几个需求。
SkipList:可以排序,并且可以同时存储score和ele值(member)
HashTable:可以键值存储,并且可以根据key找value
数据存了两份,太占用内存了。
当元素数量不多时,zset还会采用ZipList结构来节省内存,不过需要同时满足两个条件:
①元素数量小于zset_max_ziplist_entries,默认值128
②每个元素都小于zset_max_ziplist_value字节,默认值64
ziplist本身没有排序功能,而且没有键值对的概念,因此需要有zset通过编码实现:
- ZipList是连续内存,因此score和element是紧挨在一起的两个entry, element在前,score在后
- score越小越接近队首,score越大越接近队尾,按照score值升序排列
hash
少量数据ziplist,大量数据dict
Hash结构默认采用ZipList编码,用以节省内存。 ZipList中相邻的两个entry 分别保存field和value
当数据量较大时,Hash结构会转为HT编码,也就是Dict,触发条件有两个:
- ZipList中的元素数量超过了hash-max-ziplist-entries(默认512)
- ZipList中的任意entry大小超过了hash-max-ziplist-value(默认64字节)
缓存
写入/写出redis,要序列化JSONUtil(⭐⭐⭐)
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result querygetById(Long id) {
//1.从Redis内查询商品缓存
String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
if(StrUtil.isNotBlank(shopJson)){
//手动反序列化
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//2.不存在就根据id查询数据库
Shop shop = getById(id);
if(shop==null){
return Result.fail("商户不存在!");
}
//3.数据库数据写入Redis
//手动序列化
String shopStr = JSONUtil.toJsonStr(shop);
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,shopStr,CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.ok(shop);
}
}
查很多东西,并且写。设置有效时间。
@Service
public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result queryList() {
//1.从Redis中查询
String key = CACHE_SHOPTYPE_KEY;
List<String> list = stringRedisTemplate.opsForList().range(key, 0, -1);
if(!list.isEmpty()){
//手动反序列化
List<ShopType> typeList = new ArrayList<>();
for (String s : list) {
ShopType shopType = JSONUtil.toBean(s, ShopType.class);
typeList.add(shopType);
}
return Result.ok(typeList);
}
//2.从数据库内查询
List<ShopType> typeList = query().orderByAsc("sort").list();
if(typeList.isEmpty()){
return Result.fail("不存在该分类!");
}
//序列化
for (ShopType shopType : typeList) {
String s = JSONUtil.toJsonStr(shopType);
list.add(s);
}
//3.存入缓存
stringRedisTemplate.opsForList().rightPushAll(key,list);
stringRedisTemplate.expire(key,CACHE_SHOPTYPE_TTL,TimeUnit.MINUTES);
return Result.ok(list);
}
}
更新数据库的时候要删除缓存,以实现缓存和数据库双写一致
缓存穿透
定义概念 :
缓存穿透是指查询一个一定不存在的数据,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到 DB 去查询,可能导致 DB 挂掉。这种情况大概率是遭到了攻击。
比如http://localhost:15080/vat-admin-gateway/rc/rcModel/selectById/id=0
显然id不能等于0,从1开始
解决方案 :
(1)业务层校验,先对参数进行校验,id小于等于0的直接返回错误请求。
(2)缓存空对象:在没有的数据中存一个null,而这些空的对象会设置一个有效期)
(3) 布隆过滤器。 先访问布隆过滤器,再访问Redis,再访问数据库
缓存击穿
定义概念:
缓存击穿的意思是对于设置了过期时间的key,缓存在某个时间点过期的时候,恰好这时间点对这个Key有大量的并发请求过来,这些请求发现缓存过期一般都会从后端 DB 加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把 DB 压垮。
解决方案:
1.粗暴的设置永不过期 2.定时更新过期时间 3.设置互斥锁 4.逻辑过期
①、设置热点数据永不过期
对于某个需要频繁获取的信息,缓存在Redis中,并设置其永不过期。当然这种方式比较粗暴,对于某些业务场景是不适合的。
②、定时更新/提前预热
比如这个热点数据的过期时间是1h,那么每到59minutes时,通过定时任务去更新这个热点key,并重新设置其过期时间。
③、互斥锁
这是解决缓存穿透比较常用的方法。这样只会有一个请求去查询数据库并更新缓存
互斥锁简单来说就是在Redis中根据key获得的value值为空时,先锁上,然后从数据库加载,加载完毕,释放锁。若其他线程也在请求该key时,发现获取锁失败,则睡眠一段时间(比如100ms)后重试。
④、逻辑过期
1):在设置key的时候,设置一个过期时间字段一块存入缓存中,不给当前key设置过期时间
2):当查询的时候,从redis取出数据后判断时间是否过期
3):如果过期则开通另外一个线程进行数据同步,当前线程正常返回数据,这个数据不是最新
缓存雪崩:
定义概念:
缓存雪崩意思是设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB 瞬时压力过重雪崩。与缓存击穿的区别:雪崩是很多key,击穿是某一个key缓存。
解决方案:
主要是可以将缓存失效时间分散开,比如可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
提前预热,设置合理的过期时间。
当然也可以利用Redis的集群提高服务的可用性。(哨兵模式、集群模式);给缓存业务添加降级限流策略(nginx或spring cloud gateway);给业务添加多级缓存
双写一致性
redis做为缓存,mysql的数据如何与redis进行同步呢? 要根据业务场景来回答
强一致性
我们采用的是redisson实现的读写锁,在读的时候添加共享锁,可以保证读读不互斥,读写互斥。当我们更新数据的时候,添加排他锁,它是读写,读读都互斥,这样就能保证在写数据的同时是不会让其他线程读数据的,避免了脏数据。这里面需要注意的是读方法和写方法上需要使用同一把锁才行。
排他锁是如何保证读写、读读互斥的呢?
排他锁底层使用也是setnx,保证了同时只能有一个线程操作锁住的方法
延时双删(被抛弃)
面试官:你听说过延时双删吗?为什么不用它呢?
候选人:延迟双删,如果是写操作,我们先把缓存中的数据删除,然后更新数据库,最后再延时删除缓存中的数据,其中这个延时多久不太好确定,在延时的过程中可能会出现脏数据,并不能保证强一致性,所以没有采用它。
数据工作的大致流程:
- 服务节点删除 redis 主库数据。
- 服务节点修改 mysql 主库数据。
- 服务节点使得当前业务处理
等待一段时间
,等 redis 和 mysql 主从节点数据同步成功。 - 服务节点从 redis 主库删除数据。
- 当前或其它服务节点读取 redis 从库数据,发现 redis 从库没有数据,从 mysql 从库读取数据,并写入 redis 主库。
数据同步允许延时
我们当时采用的阿里的canal组件实现数据同步:不需要更改业务代码,部署一个canal服务。canal服务把自己伪装成mysql的一个从节点,当mysql数据更新以后,canal会读取binlog数据,然后在通过canal的客户端获取到数据,更新缓存即可。
分布式锁
一人一单的秒杀Redisson
@Autowired
private RedissonClient redissonClient;
业务方法{
//···前面流程
Long userId = UserHolder.getUser().getId();
//创建锁对象
//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId,stringRedisTemplate);
RLock lock = redissonClient.getLock("lock:order:" + userId);
//获取锁
boolean hasLock = lock.tryLock( );
if(!hasLock){
//获取锁失败: return fail 或者 retry 这里业务要求是返回失败
return Result.fail("请勿重复下单!");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();//获得代理对象
return proxy.createVoucherOrder(voucherId);//默认是this,我们要实现事务需要proxy
} catch (IllegalStateException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
看门狗watchdog
tryLock(waitTime,leaseTime,TimeUnit)
waitTime:获取锁的等待时长,获取锁失败后等待waitTime再去获取锁
leaseTime: 锁自动失效时间,这里测试锁重试不需要用到
WatchDog—–超时释放
对抢锁过程进行监听,抢锁完毕后,scheduleExpirationRenewal(threadId) 方法会被调用来对锁的过期时间进行续约,在后台开启一个线程,进行续约逻辑,也就是看门狗线程。
// 续约逻辑
commandExecutor.getConnectionManager().newTimeout(new TimerTask() {… }, 锁失效时间 / 3, TimeUnit.MILLISECONDS);Method(new TimerTask(){}, 参数2, 参数3)
通过参数2、参数3 去描述,什么时候做参数1 的事情。
锁的失效时间为 30s,10s 后这个 TimerTask 就会被触发,于是进行续约,将其续约为 30s;
若操作成功,则递归调用自己,重新设置一个 TimerTask 并且在 10s 后触发;循环往复,不停的续约。
tryLock的第一个参数是尝试时间,第二个参数是预计运行业务时间。如果有第二个参数,则不会触发看门狗机制。如果没有第二个参数,默认续30s,每10s续一次。
lock的参数是预计运行业务时间(也就是叫自动释放时间),有此参数则不会触发看门狗机制,没有则会触发。还是直接用tryLock吧。
lock() 一直等锁释放;tryLock() 获取到锁返回true,获取不到锁并直接返回false。
private void redissonDoc() throws InterruptedException {
//1. 普通的可重入锁
RLock lock = redissonClient.getLock("generalLock");
// 拿锁失败时会不停的重试
// 具有Watch Dog 自动延期机制 默认续30s 每隔30/3=10 秒续到30s
lock.lock();
// 尝试拿锁10s后停止重试,返回false
// 具有Watch Dog 自动延期机制 默认续30s
boolean res1 = lock.tryLock(10, TimeUnit.SECONDS);
// 拿锁失败时会不停的重试
// 没有Watch Dog ,10s后自动释放
lock.lock(10, TimeUnit.SECONDS);
// 尝试拿锁100s后停止重试,返回false
// 没有Watch Dog ,10s后自动释放
boolean res2 = lock.tryLock(100, 10, TimeUnit.SECONDS);
//2. 公平锁 保证 Redisson 客户端线程将以其请求的顺序获得锁
RLock fairLock = redissonClient.getFairLock("fairLock");
//3. 读写锁 没错与JDK中ReentrantLock的读写锁效果一样
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readWriteLock");
readWriteLock.readLock().lock();
readWriteLock.writeLock().lock();
}
Redisson实现的分布式锁。可重入。这样做是为了避免死锁的产生,判断是否是当前线程持有的锁。如果是当前线程,就会计数。释放锁就会减一。在存储此数据的时候用的hash.大key是业务,小key是当前线程的唯一标识(比如线程Id),value是当前重入的次数。
redisson实现的分布式锁不能解决主从一致问题。用redisson提供的红锁(锁一半以上从节点)效率太低了。如果要保证强一致性,建议使用zookeeper实现的分布式锁。
消息队列
其他功能
点赞-用set。 每一篇博客都多少用户点赞(显然一个用户只能点赞一次)
stringRedisTemplate.opsForSet().add(key,userId.toString()); //key是博客
排行榜用zset-因为要排序,但是真实的redis命令是zadd key score value。Java封装的时候颠倒了一下
// zadd key value score
stringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());
//zrange key 0 4
Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
共同关注,用set,直接取交集
@Override
public Result followCommons(Long followUserId) {
//1.先获取当前用户
Long userId = UserHolder.getUser().getId();
String followKey1 = "follows:" + userId;
String followKey2 = "follows:" + followUserId;
//2.求交集
Set<String> intersect = stringRedisTemplate.opsForSet().intersect(followKey1, followKey2);
if(intersect==null||intersect.isEmpty()){
return Result.ok(Collections.emptyList());
}
//3.解析出id数组
List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
//4.根据ids查询用户数组 List<User> ---> List<UserDTO>
List<UserDTO> userDTOS = userService.listByIds(ids)
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
return Result.ok(userDTOS);
}
滚动分页,基本也是根据时间或热度,所以有排序,zset
下面正常的只有两个参数,就是range,分数范围在x和xx之间的
//返回所有符合条件 1 < score <= 5 的成员
ZRANGEBYSCORE zset (1 5
//返回所有符合条件 5 < score < 10 的成员
ZRANGEBYSCORE zset (5 (10
下面这种,withscores是带上分数的,3就是分数。后面是从索引0开始最多一个数字,和sql一样。
"+inf"
或者 "-inf"
来表示记录中最大值和最小值。
ZREVRANGEBYSCORE key max min WITHSCORES LIMIT offset count //从大到小
ZRANGEBYSCORE就是从小到大了,min在前面,max在后面
redis> ZREVRANGEBYSCORE myzset +inf (2 WITHSCORES LIMIT 0 1
- “three”
- “3”
Redis最佳实践
一、BigKey
Redis的Key虽然可以自定义,但最好遵循下面的几个最佳实践约定:
- 遵循基本格式:[业务名称]:[数据名]:[id]
- 长度不超过44字节
- 不包含特殊字符
例如:我们的登录业务,保存用户信息,其key可以设计成如下格式:
local : user :10
这样设计的好处:
- 可读性强
- 避免key冲突
- 方便管理
- 更节省内存: key是string类型,底层编码包含int、embstr和raw三种。embstr在小于44字节使用,采用连续内存空间,内存占用更小。当字节数大于44字节时,会转为raw模式存储,在raw模式下,内存空间不是连续的,而是采用一个指针指向了另外一段内存空间,在这段空间里存储SDS内容,这样空间不连续,访问的时候性能也就会收到影响,还有可能产生内存碎片
BigKey通常以Key的大小和Key中成员的数量来综合判定,例如:
- Key本身的数据量过大:一个String类型的Key,它的值为5 MB
- Key中的成员数过多:一个ZSET类型的Key,它的成员数量为10,000个
- Key中成员的数据量过大:一个Hash类型的Key,它的成员数量虽然只有1,000个但这些成员的Value(值)总大小为100 MB
那么如何判断元素的大小呢?redis也给我们提供了命令
memory usage name 不过这种太耗费CPU了
所以我们用长度或列表大小来估算 strlen name llen list2
推荐值:
- 单个key的value小于10KB
- 对于集合类型的key,建议元素数量小于1000
bigKey一般是怎么产生的?
- 程序设计不当,比如直接使用 String 类型存储较大的文件对应的二进制数据。
- 对于业务的数据规模考虑不周到,比如使用集合类型的时候没有考虑到数据量的快速增长。
- 未及时清理垃圾数据,比如哈希中冗余了大量的无用键值对。
BigKey的危害
- 网络阻塞
- 对BigKey执行读请求时,少量的QPS就可能导致带宽使用率被占满,导致Redis实例,乃至所在物理机变慢
- 数据倾斜
- BigKey所在的Redis实例内存使用率远超其他实例,无法使数据分片的内存资源达到均衡
- Redis阻塞
- 对元素较多的hash、list、zset等做运算会耗时较旧,使主线程被阻塞
- CPU压力
- 对BigKey的数据序列化和反序列化会导致CPU的使用率飙升,影响Redis实例和本机其它应用
如何发现BigKey
①redis-cli –bigkeys
利用redis-cli提供的–bigkeys参数,可以遍历分析所有key,并返回Key的整体统计信息与每个数据的Top1的big key
命令:redis-cli -a 密码 --bigkeys
②scan扫描
自己编程,利用scan扫描Redis中的所有key,利用strlen、hlen等命令判断key的长度(此处不建议使用MEMORY USAGE)
③第三方工具
- 利用第三方工具,如 Redis-Rdb-Tools 分析RDB快照文件,全面分析内存使用情况
- https://github.com/sripathikrishnan/redis-rdb-tools
④网络监控
- 自定义工具,监控进出Redis的网络数据,超出预警值时主动告警
- 一般阿里云搭建的云服务器就有相关监控页面
如何删除BigKey
BigKey内存占用较多,即便时删除这样的key也需要耗费很长时间,导致Redis主线程阻塞,引发一系列问题。
redis 3.0 及以下版本
- 如果是集合类型,则遍历BigKey的元素,先逐个删除子元素,最后删除BigKey
Redis 4.0以后
- Redis在4.0后提供了异步删除的命令:unlink
除了删除以外,也有其他的处理及优化方法:
分割 bigkey:将一个 bigkey 分割为多个小 key。例如,将一个含有上万字段数量的 Hash 按照一定策略(比如二次哈希)拆分为多个 Hash。
采用合适的数据结构:例如,文件二进制数据不使用 String 保存、使用 HyperLogLog 统计页面 UV、Bitmap 保存状态信息(0/1)。
开启 lazy-free(惰性删除/延迟释放) :lazy-free 特性是 Redis 4.0 开始引入的,指的是让 Redis 采用异步方式延迟释放 key 使用的内存,将该操作交给单独的子线程处理,避免阻塞主线程。
二、恰当的数据类型
例1:比如存储一个User对象,我们有三种存储方式:
①方式一:json字符串
user:1 | {“name”: “Jack”, “age”: 21} |
---|
优点:实现简单粗暴
缺点:数据耦合,不够灵活
②方式二:字段打散
user:1:name | Jack |
---|---|
user:1:age | 21 |
优点:可以灵活访问对象任意字段
缺点:占用空间大、没办法做统一控制
③方式三:hash(推荐)
user:1 | name | jack |
age | 21 |
优点:底层使用ziplist,空间占用小,可以灵活访问对象的任意字段
缺点:代码相对复杂
例2:假如有hash类型的key,其中有100万对field和value,field是自增id,这个key存在什么问题?如何优化?
key | field | value |
someKey | id:0 | value0 |
..... | ..... | |
id:999999 | value999999 |
存在的问题:
- hash的entry数量超过500时,会使用哈希表而不是ZipList,内存占用较多
- 62.23M
- 可以通过hash-max-ziplist-entries配置entry上限。但是如果entry过多就会导致BigKey问题
方案一–77.54M
拆分为string类型
key | value |
id:0 | value0 |
..... | ..... |
id:999999 | value999999 |
存在的问题:
string结构底层没有太多内存优化,内存占用较多
想要批量获取这些数据比较麻烦
方案二-24.46M
拆分为小的hash,将 id / 100 作为key, 将id % 100 作为field,这样每100个元素为一个Hash
key | field | value |
key:0 | id:00 | value0 |
..... | ..... | |
id:99 | value99 | |
key:1 | id:00 | value100 |
..... | ..... | |
id:99 | value199 | |
.... | ||
key:9999 | id:00 | value999900 |
..... | ..... | |
id:99 | value999999 |
三、批处理优化
Redis提供了一些原生的批处理指令,不过每条指令只能处理特定的数据结构,例如mset,hmset
如果有对复杂数据类型的批处理需要,建议使用Pipeline
@Test
void testPipeline() {
// 创建管道
Pipeline pipeline = jedis.pipelined();
long b = System.currentTimeMillis();
for (int i = 1; i <= 100000; i++) {
// 放入命令到管道
pipeline.set("test:key_" + i, "value_" + i);
if (i % 1000 == 0) {
// 每放入1000条命令,批量执行
pipeline.sync();
}
}
long e = System.currentTimeMillis();
System.out.println("time: " + (e - b));
}
针对于集群下的批处理
Spring集群环境下批处理代码,实现的是性能最好的并行slot
@Test
void testMSetInCluster() {
Map<String, String> map = new HashMap<>(3);
map.put("name", "Rose");
map.put("age", "21");
map.put("sex", "Female");
stringRedisTemplate.opsForValue().multiSet(map);
List<String> strings = stringRedisTemplate.opsForValue().multiGet(Arrays.asList("name", "age", "sex"));
strings.forEach(System.out::println);
}
Redis为什么那么快
- Redis是纯内存操作,执行速度非常快
- 采用单线程,避免不必要的上下文切换可竞争条件,多线程还要考虑线程安全问题
- 使用I/O多路复用模型,非阻塞IO
- 渐进式ReHash
- 缓存时间戳
- Redis内部的数据类型/结构实现都是经过优化的,性能非常高。
其中,渐进式Rehash用到了两张全局哈希表,缓存时间戳就是,不用System.currentTimeInMillis(因为要调用系统级别的IO,很耗时,单线程的Redis承受不起),而是对时间进行缓存,由一个定时任务,每毫秒更新一次时间缓存,获取时间都是从缓存中直接拿。
能解释一下I/O多路复用模型?
Redis是纯内存操作,执行速度非常快,它的性能瓶颈是网络延迟而不是执行速度, I/O多路复用模型主要就是实现了高效的网络请求
IO多路复用监听的是socket集合,而普通的阻塞IO和非阻塞IO都是只监听一个socket.阻塞IO就是在第一步等待,非阻塞就是不停重试,非阻塞IO感觉好笨。
候选人:嗯~~,I/O多路复用是指利用单个线程来同时监听多个Socket,并在某个Socket可读、可写时得到通知,从而避免无效的等待,充分利用CPU资源。目前的I/O多路复用都是采用的epoll模式实现,它会在通知用户进程Socket就绪的同时,把已就绪的Socket写入用户空间,不需要挨个遍历Socket来判断是否就绪,提升了性能。
其中Redis的网络模型就是使用I/O多路复用结合事件的处理器来应对多个Socket请求,比如,提供了连接应答处理器、命令回复处理器,命令请求处理器;
在Redis6.0之后,为了提升更好的性能,在命令回复处理器使用了多线程来处理回复事件,在命令请求处理器中,将命令的转换使用了多线程,增加命令转换速度,在命令执行的时候,依然是单线程
StringRedisTemplate 与 RedisTemplate的区别
前者继承后者,一般用前者。
最大不同在于序列化器不同。所以不能混用。如在RDM客户端中,redisTemplate存储的东西为乱码,而stringRedisTemplate存储的东西为可见的字符串。
StringRedisTemplate的常用方法
https://blog.csdn.net/weixin_43835717/article/details/92802040
持久化
RDB和AOF
Redis 提供了两个命令来生成 RDB 快照文件:
save
: 同步保存操作,会阻塞 Redis 主线程;bgsave
: fork 出一个子进程,子进程执行,不会阻塞 Redis 主线程,默认选项。
6.JUC
线程基础知识
多个线程共享进程的堆和方法区,但每个线程有自己的程序计数器、虚拟机栈和本地方法栈。
程序计数器私有主要是为了线程切换后能恢复到正确的执行位置
为了保证线程中的局部变量不被别的线程访问到,虚拟机栈和本地方法栈是线程私有的
产生死锁的四个必要条件
互斥条件:该资源任意一个时刻只由一个线程占用。
请求与保持条件:一个线程因请求资源而阻塞时,对已获得的资源保持不放。
不剥夺条件:线程已获得的资源在未使用完之前不能被其他线程强行剥夺,只有自己使用完毕后才释放资源。
循环等待条件:若干线程之间形成一种头尾相接的循环等待资源关系。
synchronized等关键字
如何使用synchronized
synchronized
关键字的使用方式主要有下面 3 种:
- 修饰实例方法
- 修饰静态方法
- 修饰代码块
1、修饰实例方法 (锁当前对象实例)
给当前对象实例加锁,进入同步代码前要获得 当前对象实例的锁 .
synchronized void method() {
//业务代码
}
2、修饰静态方法 (锁当前类)
给当前类加锁,会作用于类的所有对象实例 ,进入同步代码前要获得 当前 class 的锁。
这是因为静态成员不属于任何一个实例对象,归整个类所有,不依赖于类的特定实例,被类的所有实例共享。
synchronized static void method() {
//业务代码
}
静态 synchronized
方法和非静态 synchronized
方法之间的调用互斥么?不互斥!如果一个线程 A 调用一个实例对象的非静态 synchronized
方法,而线程 B 需要调用这个实例对象所属类的静态 synchronized
方法,是允许的,不会发生互斥现象,因为访问静态 synchronized
方法占用的锁是当前类的锁,而访问非静态 synchronized
方法占用的锁是当前实例对象锁。
3、修饰代码块 (锁指定对象/类)
对括号里指定的对象/类加锁:
synchronized(object)
表示进入同步代码库前要获得 给定对象的锁。synchronized(类.class)
表示进入同步代码前要获得 给定 Class 的锁
synchronized(this) {
//业务代码
}
总结:
synchronized
关键字加到static
静态方法和synchronized(class)
代码块上都是是给 Class 类上锁;synchronized
关键字加到实例方法上是给对象实例上锁;- 尽量不要使用
synchronized(String a)
因为 JVM 中,字符串常量池具有缓存功能。
本质都是对对象监视器 monitor 的获取,monitor
另一个名字叫做管程
在执行monitorenter
时,会尝试获取对象的锁,如果锁的计数器为 0 则表示锁可以被获取,获取后将锁计数器设为 1 也就是加 1。
对象锁的的拥有者线程才可以执行 monitorexit
指令来释放锁。在执行 monitorexit
指令后,将锁计数器设为 0,表明锁被释放,其他线程可以尝试获取锁。
如果获取对象锁失败,那当前线程就要阻塞等待,直到锁被另外一个线程释放为止。
JDK6优化后synchronized锁的分类
级别从低到高依次是:
- 无锁状态
- 偏向锁状态
- 轻量级锁状态
- 重量级锁状态
锁可以升级,但不能降级。即:无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁是单向的。
Java中的synchronized有偏向锁、轻量级锁、重量级锁三种形式,分别对应了锁只被一个线程持有、不同线程交替持有锁、多线程竞争锁三种情况。
一旦锁发生了竞争,都会升级为重量级锁。 锁只能升级,不能降级。
wait vs sleep
共同点
- wait() ,wait(long) 和 sleep(long) 的效果都是让当前线程暂时放弃 CPU 的使用权,进入阻塞状态
不同点
方法归属不同
- sleep(long) 是 Thread 的静态方法
- 而 wait(),wait(long) 都是 Object 的成员方法,每个对象都有
醒来时机不同
- 执行 sleep(long) 和 wait(long) 的线程都会在等待相应毫秒后醒来
- wait(long) 和 wait() 还可以被 notify 唤醒,wait() 如果不唤醒就一直等下去
- 它们都可以被打断唤醒
锁特性不同(重点)
- wait 方法的调用必须先获取 wait 对象的锁,而 sleep 则无此限制
- wait 方法执行后会释放对象锁,允许其它线程获得该对象锁(我放弃 cpu,但你们还可以用)
- 而 sleep 如果在 synchronized 代码块中执行,并不会释放对象锁(我放弃 cpu,你们也用不了)
备注:
没有获得对象锁,是不能调用wait()的。 Lock.wait()是不对的。要synchronized(Lock){Lock.wait();}
lock vs synchronized
相同点
两者都是可重入锁。
可重入锁 也叫递归锁,指的是线程可以再次获取自己的内部锁。
JDK 提供的所有现成的 Lock
实现类,包括 synchronized
关键字锁都是可重入的。
在下面的代码中,method1()
和 method2()
都被 synchronized
关键字修饰,method1()
调用了method2()
。
public class SynchronizedDemo {
public synchronized void method1() {
System.out.println("方法1");
method2();
}
public synchronized void method2() {
System.out.println("方法2");
}
}
由于 synchronized
锁是可重入的,同一个线程在调用method1()
时可以直接获得当前对象的锁,执行 method2()
的时候可以再次获取这个对象的锁,不会产生死锁问题。假如synchronized
是不可重入锁的话,由于该对象的锁已被当前线程所持有且无法释放,这就导致线程在执行 method2()
时获取锁失败,会出现死锁问题。
不同点
- 语法层面
- synchronized 是关键字,源码在 jvm 中,用 c++ 语言实现
- Lock 是接口,源码由 jdk 提供,用 java 语言实现
- 使用 synchronized 时,退出同步代码块锁会自动释放,而使用 Lock 时,需要手动调用 unlock 方法释放锁
- 功能层面
- 二者均属于悲观锁、都具备基本的互斥、同步、锁重入功能
- Lock 提供了许多 synchronized 不具备的功能,例如获取等待状态、公平锁、可打断、可超时、多条件变量
- Lock 有适合不同场景的实现,如 ReentrantLock, ReentrantReadWriteLock
- 性能层面
- 在没有竞争时,synchronized 做了很多优化,如偏向锁、轻量级锁,性能不赖
- 在竞争激烈时,Lock 的实现通常会提供更好的性能
JMM(Java内存模型)
Java内存模型(Java Memory Model)描述了Java程序中各种变量(线程共享变量)的访问规则,以及在JVM中将变量存储到内存和从内存中读取变量这样的底层细节。
Java内存模型,就是为了屏蔽系统和硬件的差异,让一套代码在不同平台下能到达相同的访问结果。
特点:
所有的共享变量都存储于主内存(计算机的RAM)这里所说的变量指的是实例变量和类变量。不包含局部变量,因为局部变量是线程私有的,因此不存在竞争问题。
每一个线程还存在自己的工作内存,线程的工作内存,保留了被线程使用的变量的工作副本。
线程对变量的所有的操作(读,写)都必须在工作内存中完成,而不能直接读写主内存中的变量,不同线程之间也不能直接访问对方工作内存中的变量,线程间变量的值的传递需要通过主内存完成。
volatile+原子性,可见性,有序性
要求
- 掌握线程安全要考虑的三个问题-可见性,有序性,原子性
- 掌握 volatile 能解决哪些问题-可以解决可见性和有序性,不能解决原子性
可见性:一个线程对共享变量修改,另一个线程能看到最新的结果
有序性:一个线程内代码按编写顺序执行
原子性:一个线程内多行代码以一个整体运行,期间不能有其他线程的代码插队
原子性
- 起因:多线程下,不同线程的指令发生了交错导致的共享变量的读写混乱
- 解决:用悲观锁或乐观锁解决,volatile 并不能解决原子性
可见性
- 起因:由于编译器优化、或缓存优化、或 CPU 指令重排序优化导致的对共享变量所做的修改另外的线程看不到
- 解决:用 volatile 修饰共享变量,能够防止编译器等优化发生,让一个线程对共享变量的修改对另一个线程可见
跳过本地内存,直接去主内存读取。
有序性
起因:由于编译器优化、或缓存优化、或 CPU 指令重排序优化导致指令的实际执行顺序与编写顺序不一致
解决:用 volatile 修饰共享变量会在读、写共享变量时加入不同的屏障,阻止其他读写操作越过屏障,从而达到阻止重排序的效果
注意:
内存屏障
- volatile 变量写加的屏障是阻止上方其它写操作越过屏障排到 volatile 变量写之下
- volatile 变量读加的屏障是阻止下方其它读操作越过屏障排到 volatile 变量读之上
- volatile 读写加入的屏障只能防止同一线程内的指令重排
CAS
乐观锁的代表是 AtomicInteger,使用 CAS(compare and swap)来保证原子性。AtomicInteger的底层是Unsafe。
- 其核心思想是【无需加锁,每次只有一个线程能成功修改共享变量,其它失败的线程不需要停止,不断重试直至成功】
- 由于线程一直运行,不需要阻塞,因此不涉及线程上下文切换
- 它需要多核 cpu 支持,且线程数不应超过 cpu 核数
LongAdder,分成多个值,最后再加在一起。
LongAddr中的cell数组,Cell就用了@Contended注解
这个注解是为了解决伪共享问题,解决缓存行同步带来的性能问题。
CPU缓存L1,是以缓存行为单位存储数据的,一般默认64字节。
@Contended注解,就是将一个缓存行后面填充7个没有意义的数据。
CAS避免了像悲观锁那种用户态和内核态的切换。
在JUC( java.util.concurrent )包下实现的很多类都用到了CAS操作
- AbstractQueuedSynchronizer(AQS框架)
- AtomicXXX类
一个当前内存值V、旧的预期值A、即将更新的值B,当且仅当旧的预期值A和内存值V相同时,将内存值修改为B并返回true,否则什么都不做,并返回false。如果CAS操作失败,通过自旋的方式等待并再次尝试,直到成功
缺点:
开销大:在并发量比较高的情况下,如果反复尝试更新某个变量,却又一直更新不成功,会给CPU带来较大的压力
ABA问题:当变量从A修改为B再修改回A时,变量值等于期望值A,但是无法判断是否修改,CAS操作在ABA修改后依然成功。
不能保证代码块的原子性:CAS机制所保证的只是一个变量的原子性操作,而不能保证整个代码块的原子性。
ABA问题的解决:
AtomicStampedReference:版本号原子引用
AtomicStampedReference在构建的时候需要一个类似于版本号的int类型变量stamped,每一次针对共享数据的变化都会导致该 stamped 的变化(stamped 需要应用程序自身去负责,AtomicStampedReference并不提供,一般使用时间戳作为版本号)
public static void main(String[] args) {
AtomicStampedReference<String> r = new AtomicStampedReference<String>("a",1);
System.out.println("修改前版本号:"+r.getStamp()+" ,值: "+ r.getReference());
r.compareAndSet("a","b",1,2);
System.out.println("第一次修改后版本号:"+r.getStamp()+" ,值 "+ r.getReference());
r.compareAndSet("b","a",1,3);
System.out.println("第二次修改后版本号:"+r.getStamp()+" ,值 "+ r.getReference());
}
结果:
修改前版本号:1 ,值: a
第一次修改后版本号:2 ,值 b
第二次修改后版本号:2 ,值 b
AQS
概述
全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架,它是构建锁或者其他同步组件的基础框架
AQS与Synchronized的区别
synchronized | AQS |
---|---|
关键字,c++ 语言实现 | java 语言实现 |
悲观锁,自动释放锁 | 悲观锁,手动开启和关闭 |
锁竞争激烈都是重量级锁,性能差 | 锁竞争激烈的情况下,提供了多种解决方案 |
AQS常见的实现类
- ReentrantLock 阻塞式锁
- Semaphore 信号量
- CountDownLatch 倒计时锁
- CyclicBarrier 循环栅栏
工作机制
- 在AQS中维护了一个使用了volatile修饰的state属性来表示资源的状态,0表示无锁,1表示有锁
- 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
- 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
- 线程0来了以后,去尝试修改state属性,如果发现state属性是0,就修改state状态为1,表示线程0抢锁成功
- 线程1和线程2也会先尝试修改state属性,发现state的值已经是1了,有其他线程持有锁,它们都会到FIFO队列中进行等待,
- FIFO是一个双向队列,head属性表示头结点,tail表示尾结点
如果多个线程共同去抢这个资源是如何保证原子性的呢?
在去修改state状态的时候,使用的cas自旋锁来保证原子性,确保只能有一个线程修改成功,修改失败的线程将会进入FIFO队列中等待
AQS是公平锁吗,还是非公平锁?
新的线程与队列中的线程共同来抢资源,是非公平锁
新的线程到队列中等待,只让队列中的head线程获取锁,是公平锁
比较典型的AQS实现类ReentrantLock,它默认就是非公平锁,新的线程与队列中的线程共同来抢资源
ReentrantLock
ReentrantLock lock=new ReentrantLock();
lock.lock();
try{
//任务
}finally {
lock.unlock();
}
ReentrantLock的lock方法
ReentrantLock
里面有一个内部类 Sync
,Sync
继承 AQS(AbstractQueuedSynchronizer
),添加锁和释放锁的大部分操作实际上都是在 Sync
中实现的。Sync
有公平锁 FairSync
和非公平锁 NonfairSync
两个子类。
ReentrantLock
默认使用非公平锁,也可以通过构造器来显式的指定使用公平锁。
// 传入一个 boolean 值,true 时为公平锁,false 时为非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
从上面的内容可以看出, ReentrantLock
的底层就是由 AQS 来实现的。
AQS(AbstractQueuedSynchronizer)内部维护着一个队列(或者说是双向链表)
AQS内部三个变量,head,tail,state. 线程用CAS的方法让state从0变为1,这样就是获得锁了。
没有拿到锁的线程就进入AQS的队列了。
对于NonfairSync来说
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
对于FairSync来说
final void lock() {
acquire(1);
}
tryAcquire方法:tryAcquire方法是AQS提供的,内部没有任何实现,需要继承AQS的类自己去实现逻辑代码。
查看到tryAcquire在ReentrantLock中提供了两种实现:公平锁、非公平锁。
addWaiter方法:在线程没有通过tryAcquire拿到锁资源时,需要将当前线程封装为Node对象,去AQS内部排队。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquiredQueued方法:查看当前线程是否排在队伍前面,如果是,则取获取锁;如果长时间没拿到锁,则需要将当前线程挂起。
public void unlock() {
sync.release(1);
}
unlock释放锁操作不分为公平和非公平,都是执行sync的release方法
释放锁的核心,就是将state从大于0的数更改为0即为释放锁成功
并且unlock方法应该会涉及到将AQS队列中阻塞的线程进行唤醒,阻塞用的方法是park方法,唤醒必然是unpark方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
ReentrantReadWriteLock
ReentrantReadWriteLock 实现了 ReadWriteLock
,是一个可重入的读写锁,既可以保证多个线程同时读的效率,同时又可以保证有写入操作时的线程安全。适用于读多写少的情况。
ReentrantReadWriteLock
其实是两把锁,一把是 WriteLock
(写锁),一把是 ReadLock
(读锁) 。读锁是共享锁,写锁是独占锁。读锁可以被同时读,可以同时被多个线程持有,而写锁最多只能同时被一个线程持有。
- 共享锁:一把锁可以被多个线程同时获得。
- 独占锁:一把锁只能被一个线程获得。
实现原理:state的高16位是读锁,低16位是写锁。可重入锁。写锁对state加一就好,读锁要用ThreadLocal来记录一下每个线程锁重入的次数。
StampedLock
StampedLock
不是直接实现 Lock
或 ReadWriteLock
接口,而是基于 CLH 锁 实现的(AQS 也是基于这玩意),CLH 锁是对自旋锁的一种改良,是一种隐式的链表队列。StampedLock
通过 CLH 队列进行线程的管理,通过同步状态值 state
来表示锁的状态和类型。
StampedLock
的原理和 AQS 原理比较类似。
StampedLock
提供了三种模式的读写控制模式:读锁、写锁和乐观读。
- 写锁:独占锁,一把锁只能被一个线程获得。当一个线程获取写锁后,其他请求读锁和写锁的线程必须等待。类似于
ReentrantReadWriteLock
的写锁,不过这里的写锁是不可重入的。 - 读锁 (悲观读):共享锁,没有线程获取写锁的情况下,多个线程可以同时持有读锁。如果己经有线程持有写锁,则其他线程请求获取该读锁会被阻塞。类似于
ReentrantReadWriteLock
的读锁,不过这里的读锁是不可重入的。 - 乐观读:允许多个线程获取乐观读以及读锁。同时允许一个写线程获取写锁。
StampedLock
在获取锁的时候会返回一个 long 型的数据戳,该数据戳用于稍后的锁释放参数,如果返回的数据戳为 0 则表示锁获取失败。当前线程持有了锁再次获取锁还是会返回一个新的数据戳,这也是StampedLock
不可重入的原因。
相比于传统读写锁多出来的乐观读是StampedLock
比 ReadWriteLock
性能更好的关键原因。StampedLock
的乐观读允许一个写线程获取写锁,所以不会导致所有写线程阻塞,也就是当读多写少的时候,写线程有机会获取写锁,减少了线程饥饿的问题,吞吐量大大提高。
和 ReentrantReadWriteLock
一样,StampedLock
同样适合读多写少的业务场景,可以作为 ReentrantReadWriteLock
的替代品,性能更好。
不过,需要注意的是StampedLock
不可重入,不支持条件变量 Conditon
,对中断操作支持也不友好(使用不当容易导致 CPU 飙升)。如果你需要用到 ReentrantLock
的一些高级性能,就不太建议使用 StampedLock
了。
另外,StampedLock
性能虽好,但使用起来相对比较麻烦,一旦使用不当,就会出现生产问题。
AtomicInteger原子类
其原子性操作的实现是基于CAS.
多线程下原子的加减
import java.util.concurrent.atomic.AtomicInteger;
AtomicInteger counter = new AtomicInteger(0);
int result = counter.incrementAndGet(); // 原子递增
result = counter.addAndGet(5); // 原子加5
result = counter.updateAndGet(x -> x * 2); // 使用自定义函数原子更新
LongAdder
为什么 LongAdder 在高并发环境下更具优势?
减少竞争: 在高并发情况下,使用传统的AtomicInteger或AtomicLong时,多个线程可能会争夺同一个原子变量的更新,导致性能瓶颈。而LongAdder采用了一种分段的思想,将一个变量分成多个小的段,每个线程只更新其中一个段,最后再将这些段的值相加。这样可以大大减少线程之间的竞争,提高了性能。
懒惰初始化: LongAdder在初始化时不会分配一个连续的数组,而是在需要的时候再进行懒惰初始化。这减少了初始开销,提高了并发更新时的性能。
分段累加: LongAdder内部使用Cell数组,每个Cell代表一个独立的段,线程更新时通过hash定位到不同的Cell,实现分段累加。这种方式在高并发情况下避免了对同一个变量的竞争,降低了锁的粒度,提高了吞吐量
使用 LongAdder 解决传统 Atomic 类型的性能瓶颈问题:
import java.util.concurrent.atomic.LongAdder;
public class HighConcurrencyCounter {
private LongAdder counter = new LongAdder();
public void increment() {
counter.increment();
}
public long getCount() {
return counter.sum();
}
public static void main(String[] args) {
HighConcurrencyCounter highConcurrencyCounter = new HighConcurrencyCounter();
// 多线程同时递增计数器
Runnable incrementTask = () -> {
for (int i = 0; i < 1000; i++) {
highConcurrencyCounter.increment();
}
};
// 创建多个线程执行递增任务
Thread thread1 = new Thread(incrementTask);
Thread thread2 = new Thread(incrementTask);
// 启动线程
thread1.start();
thread2.start();
// 等待线程执行完成
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 输出计数器的最终值
System.out.println("Final Count: " + highConcurrencyCounter.getCount());
}
}
ConcurrentHashMap
采用 CAS + Synchronized来保证并发安全进行实现
- CAS控制数组节点的添加 (在判断数组中当前位置为null的时候,使用CAS来把这个新的Node写入数组中对应的位置)
- synchronized只锁定当前链表或红黑二叉树的首节点,只要hash不冲突,就不会产生并发的问题 , 效率得到提升
concurrentHashMap在JDK1.8做的优化
引入红黑树。数组+链表。链表足够长会变为红黑树。
在没有hash冲突时,CAS锁头节点去put。
在出现hash冲突时,synchronized。
扩容的时候会有协助扩容。
计数器用addCount,与LongAdder极其相似。线程A对1位置++,线程B对2位置++,最后汇总。
弱一致性。A写B查,但是B确实不一定能查到,不保证这种复合情况下的强一致性。
ThreadLocal
要求
- 掌握 ThreadLocal 的作用与原理
- 掌握 ThreadLocal 的内存释放时机
作用
- ThreadLocal 可以实现【资源对象】的线程隔离,让每个线程各用各的【资源对象】,避免争用引发的线程安全问题
- ThreadLocal 同时实现了线程内的资源共享
原理
每个线程内有一个 ThreadLocalMap 类型的成员变量,用来存储资源对象
- 调用 set 方法,就是以 ThreadLocal 自己作为 key,资源对象作为 value,放入当前线程的 ThreadLocalMap 集合中
- 调用 get 方法,就是以 ThreadLocal 自己作为 key,到当前线程中查找关联的资源值
- 调用 remove 方法,就是以 ThreadLocal 自己作为 key,移除当前线程关联的资源值
ThreadLocalMap 的一些特点
- key 的 hash 值统一分配
- 初始容量 16,扩容因子 2/3,扩容容量翻倍
- key 索引冲突后用开放寻址法解决冲突
弱引用 key
ThreadLocalMap 中的 key 被设计为弱引用,原因如下
- Thread 可能需要长时间运行(如线程池中的线程),如果 key 不再使用,需要在内存不足(GC)时释放其占用的内存
内存释放时机
- 被动 GC 释放 key
- 仅是让 key 的内存释放,关联 value 的内存并不会释放
- 懒惰被动释放 value
- get key 时,发现是 null key,则释放其 value 内存
- set key 时,会使用启发式扫描,清除临近的 null key 的 value 内存,启发次数与元素个数,是否发现 null key 有关
- 主动 remove 释放 key,value
- 会同时释放 key,value 的内存,也会清除临近的 null key 的 value 内存
- 推荐使用它,因为一般使用 ThreadLocal 时都把它作为静态变量(即强引用),因此无法被动依靠 GC 回收
线程池
线程状态:
线程池的核心参数:
keepAliveTime是没有任务以后救急线程会生存多久。
当核心线程和阻塞队列满了之后,才会创造救急线程。
比如1和2任务在被核心线程被执行,3和4任务在阻塞队列中,5任务来了。生成救急线程,执行5任务,然后再执行3任务和4任务。
代码实现:
这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
CopyOnWrite容器
ArrayList不是并发安全的,因为他的add方法没有加上synchronized也没有Lock。
CopyOnWrite容器(简称COW容器)即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
CopyOnWrite并发容器用于读多写少的并发场景。比如:白名单,黑名单。假如我们有一个搜索网站,用户在这个网站的搜索框中,输入关键字搜索内容,但是某些关键字不允许被搜索。这些不能被搜索的关键字会被放在一个黑名单当中,黑名单一定周期才会更新一次。
缺点:
- 内存占用问题。写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存。通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是10进制的数字,可以考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器,而使用其他的并发容器,如ConcurrentHashMap。
- 数据一致性问题。CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。
同步容器可以简单地理解为通过synchronized来实现同步的容器。同步容器会导致多个线程中对容器方法调用的串行执行,降低并发性,因为它们都是以容器自身对象为锁。在并发下进行迭代的读和写时并不是线程安全的。如:Vector、Stack、HashTable、Collections类的静态工厂方法创建的类(如Collections.synchronizedList)
并发容器是针对多个线程并发访问而设计的,在jdk5.0引入了concurrent包,其中提供了很多并发容器,如ConcurrentHashMap、CopyOnWriteArrayList等。
分布式事务解决方案
分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,特别是在微服务架构中,几乎可以说是无法避免。
操作数据在不同的数据库里面,保证不同数据库里面数据一致性
主流的解决方案如下:一般用方法3和方法5
1.XA两段提交(低效率)-分布式事务解决方案
2.TCC三段提交(2段,高效率[不推荐(补偿代码)])
3.本地消息(MQ+Table)
4.消息事务+最终一致性(RocketMQ[alibaba])
5.Seata(alibaba)
基于XA协议的两阶段提交(2PC)
X/Open 组织(即现在的 Open Group )定义了分布式事务处理模型
XA协议:XA是一个分布式事务协议。XA中大致分为两部分:事务管理器和本地资源管理器。其中本地资源管理器往往由数据库实现,比如Oracle、DB2这些商业数据库都实现了XA接口(MySQL从5.5版本开始支持,SQL Server 2005 开始支持,Oracle 7 开始支持。),而事务管理器作为全局的调度者,负责各个本地资源的提交和回滚。
概念
二阶段提交2PC(Two phase Commit)是指,在分布式系统里,为了保证所有节点在进行事务提交时保持一致性的一种算法。
背景
在分布式系统里,每个节点都可以知晓自己操作的成功或者失败,却无法知道其他节点操作的成功或失败。
当一个事务跨多个节点时,为了保持事务的原子性与一致性,需要引入一个协调者(Coordinator)来统一掌控所有参与者(Participant)的操作结果,并指示它们是否要把操作结果进行真正的提交(commit)或者回滚(rollback)。
思路
2PC顾名思义分为两个阶段,其实施思路可概括为:
(1)投票阶段(voting phase):参与者将操作结果通知协调者;
(2)提交阶段(commit phase):收到参与者的通知后,协调者再向参与者发出通知,根据反馈情况决定各参与者是否要提交还是回滚;
举例
甲乙丙丁四人要组织一个会议,需要确定会议时间,不妨设甲是协调者,乙丙丁是参与者。
投票阶段:
(1)甲发邮件给乙丙丁,周二十点开会是否有时间;
(2)丁回复有时间;
(3)乙回复有时间;
(4)丙迟迟不回复,此时对于这个活动,甲乙丁均处于阻塞状态,算法无法继续进行;
(5)丙回复有时间(或者没有时间);
提交阶段:
(1)协调者甲将收集到的结果反馈给乙丙丁(什么时候反馈,以及反馈结果如何,在此例中取决与丙的时间与决定);
(2)乙收到;
(3)丙收到;
(4)丁收到;
实际应用交互流程
1、2PC两阶段提交的正向流程
第一阶段:
2PC中包含着两个角色:事务协调者和事务参与者。让我们来看一看他们之间的交互流程:
在分布式事务的第一阶段,作为事务协调者的节点会首先向所有的参与者节点发送Prepare请求。
在接到Prepare请求之后,每一个参与者节点会各自执行与事务有关的数据更新,写入Undo Log和Redo Log。如果参与者执行成功,暂时不提交事务,而是向事务协调节点返回”完成”消息。
当事务协调者接到了所有参与者的返回消息,整个分布式事务将会进入第二阶段。
第二阶段:
在2PC分布式事务的第二阶段,如果事务协调节点在之前所收到都是正向返回,那么它将会向所有事务参与者发出Commit请求。
接到Commit请求之后,事务参与者节点会各自进行本地的事务提交,并释放锁资源。当本地事务完成提交后,将会向事务协调者返回”完成”消息。
当事务协调者接收到所有事务参与者的”完成”反馈,整个分布式事务完成。
2、失败情况的处理流程
第一阶段:
第二阶段:
在2PC的第一阶段,如果某个事务参与者反馈失败消息,说明该节点的本地事务执行不成功,必须回滚。
于是在第二阶段,事务协调节点向所有的事务参与者发送Abort(中止)请求。接收到Abort请求之后,各个事务参与者节点需要在本地进行事务的回滚操作,回滚操作依照Undo Log来进行。
以上就是2PC两阶段提交协议的详细过程。
总结
2PC两阶段提交究竟有哪些不足呢?
- 性能问题
2PC遵循强一致性。在事务执行过程中,各个节点占用着数据库资源,只有当所有节点准备完毕,事务协调者才会通知提交,参与者提交后释放资源。这样的过程有着非常明显的性能问题。
- 协调者单点故障问题
2PC模型的核心,一旦事务协调者节点挂掉,参与者收不到提交或是回滚通知,参与者会一直处于中间状态无法完成事务。
- 丢失消息导致的不一致问题。
第二个阶段,如果发生局部网络问题,一部分事务参与者收到了提交消息,另一部分事务参与者没收到提交消息,那么就导致了节点之间数据的不一致。
总的来说,XA协议比较简单,而且一旦商业数据库实现了XA协议,使用分布式事务的成本也比较低。但是,XA也有致命的缺点,那就是性能不理想,特别是在交易下单链路,往往并发量很高,XA无法满足高并发场景。
两阶段提交涉及多次节点间的网络通信,通信时间太长!
事务时间相对于变长了,锁定的资源的时间也变长了,造成资源等待时间也增加好多。
另外:XA目前在商业数据库支持的比较理想,在mysql数据库中支持的不太理想,mysql的XA实现,没有记录prepare阶段日志,主备切换会导致主库与备库数据不一致。许多nosql也没有支持XA,这让XA的应用场景变得非常狭隘。
缺陷:算法执行过程中,所有节点都处于阻塞状态,所有节点所持有的资源(例如数据库数据,本地文件等)都处于封锁状态。存在事务管理器的单点故障问题。
对应的开源框架:atomikos
总的来说,2PC是一种比较保守的算法,效率很低。
低并发强一致性
代码补偿事务(TCC)
TCC的作用主要是解决跨服务调用场景下的分布式事务问题
是一种编程式分布式事务解决方案。
TCC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。TCC模式要求从服务提供三个接口:Try、Confirm、Cancel。
- Try:主要是对业务系统做检测及资源预留
- Confirm:真正执行业务,不作任何业务检查;只使用Try阶段预留的业务资源;Confirm操作满足幂等性。
- Cancel:释放Try阶段预留的业务资源;Cancel操作满足幂等性。
场景案例
以航班预定的案例,来介绍TCC要解决的事务场景。这里虚构一个场景,把自己当做航班预定的主人公,来介绍这个案例。从合肥 –> 昆明 –> 大理。
准备从合肥出发,到云南大理去游玩,然后使用美团App(机票代理商)来订机票。发现没有从合肥直达大理的航班,需要到昆明进行中转。如下图:
从图中我们可以看出来,从合肥到昆明乘坐的是四川航空,从昆明到大理乘坐的是东方航空。
由于使用的是美团App预定,当我选择了这种航班预定方案后,美团App要去四川航空和东方航空各帮我购买一张票。如下图:
考虑最简单的情况:美团先去川航帮我买票,如果买不到,那么东航也没必要买了。如果川航购买成功,再去东航购买另一张票。
现在问题来了:假设美团先从川航成功买到了票,然后去东航买票的时候,因为天气问题,东航航班被取消了。那么此时,美团必须取消川航的票,因为只有一张票是没用的,不取消就是浪费我的钱。那么如果取消会怎样呢?如果读者有取消机票经历的话,非正常退票,肯定要扣手续费的。在这里,川航本来已经购买成功,现在因为东航的原因要退川航的票,川航应该是要扣代理商的钱的。
那么美团就要保证,如果任一航班购买失败,都不能扣钱,怎么做呢?
两个航空公司都为美团提供以下3个接口:机票预留接口、确认接口、取消接口。美团App分2个阶段进行调用,如下所示:
在第1阶段:主业务服务分别调用所有从业务的try操作,并在活动管理器中登记所有从业务服务。当所有从业务服务的try操作都调用成功或者某个从业务服务的try操作失败,进入第二阶段。
例如:美团分别请求两个航空公司预留机票,两个航空公司分别告诉美团预留成功还是失败。航空公司需要保证,机票预留成功的话,之后一定能购买到。
在第2阶段:活动管理器根据第一阶段的执行结果来执行confirm或cancel操作。如果第一阶段所有try操作都成功,则活动管理器调用所有从业务活动的confirm操作。否则调用所有从业务服务的cancel操作。
例如:
如果两个航空公司都预留成功,则分别向两个公司发送确认购买请求。
如果两个航空公司任意一个预留失败,则对于预留成功的航空公司也要取消预留。这种情况下,对于之前预留成功机票的航班取消,也不会扣用户的钱,因为购买并没实际发生,之前只是请求预留机票而已。
通过这种方案,可以保证两个航空公司购买机票的一致性,要不都成功,要不都失败,即使失败也不会扣用户的钱。如果在两个航班都已经已经确认购买后,再退票,那肯定还是要扣钱的。
当然,实际情况肯定这里提到的肯定要复杂,通常航空公司在第一阶段,对于预留的机票,会要求在指定的时间必须确认购买(支付成功),如果没有及时确认购买,会自动取消。假设川航要求10分钟内支付成功,东航要求30分钟内支付成功。以较短的时间算,如果用户在10分钟内支付成功的话,那么美团会向两个航空公司都发送确认购买的请求,如果超过10分钟(以较短的时间为准),那么就不能进行支付。
这个方案提供给我们一种跨服务保证事务一致性的一种解决思路,可以把这种方案当做TCC的雏形。
具体流程:
TCC是Try ( 尝试 ) — Confirm(确认) — Cancel ( 取消 ) 的简称:
操作方法 | 含义 |
---|---|
Try | 完成所有业务检查(一致性),预留业务资源(准隔离性) 回顾上面航班预定案例的阶段1,机票就是业务资源,所有的资源提供者(航空公司)预留都成功,try阶段才算成功 |
Confirm | 确认执行业务操作,不做任何业务检查, 只使用Try阶段预留的业务资源。回顾上面航班预定案例的阶段2,美团APP确认两个航空公司机票都预留成功,因此向两个航空公司分别发送确认购买的请求。 |
Cancel | 取消Try阶段预留的业务资源。回顾上面航班预定案例的阶段2,如果某个业务方的业务资源没有预留成功,则取消所有业务资源预留请求。 |
缺点:
- Confirm和Cancel的幂等性很难保证。
- 这种方式缺点比较多,通常在复杂场景下是不推荐使用的,除非是非常简单的场景,非常容易提供回滚Cancel,而且依赖的服务也非常少的情况。
- 这种实现方式会造成代码量庞大,耦合性高。而且非常有局限性,因为有很多的业务是无法很简单的实现回滚的,如果串行的服务很多,回滚的成本实在太高。
不少大公司里,其实都是自己研发 TCC 分布式事务框架的,专门在公司内部使用。国内开源出去的:ByteTCC,TCC-transaction,Himly。
TCC两阶段提交与XA两阶段提交的区别
XA是资源层面的分布式事务,d遨游性,在两阶段提交的整个过程中,一直会持有资源的锁。
TCC是业务层面的分布式事务,最终一致性,不会一直持有资源的锁。
其核心在于将业务分为两个操作步骤完成。不依赖 RM 对分布式事务的支持,而是通过对业务逻辑的分解来实现分布式事务。
本地消息表(异步确保)- 事务最终一致性(:star:)
这种实现方式的思路,其实是源于 ebay,后来通过支付宝等公司的布道,在业内广泛使用。其基本的设计思想是将远程分布式事务拆分成一系列的本地事务。如果不考虑性能及设计优雅,借助关系型数据库中的表即可实现。
- 订单系统新增一条消息表,将新增订单和新增消息放到一个事务里完成,然后通过轮询的方式去查询消息表,将消息推送到 MQ,库存系统去消费 MQ。
执行流程:
订单系统,添加一条订单和一条消息,在一个事务里提交。
订单系统,使用定时任务轮询查询状态为未同步的消息表,发送到 MQ,如果发送失败,就重试发送。
库存系统,接收 MQ 消息,修改库存表,需要保证幂等操作。
如果修改成功,调用 RPC 接口修改订单系统消息表的状态为已完成或者直接删除这条消息。
如果修改失败,可以不做处理,等待重试。
订单系统中的消息有可能由于业务问题会一直重复发送,所以为了避免这种情况可以记录一下发送次数,当达到次数限制之后报警,人工接入处理;库存系统需要保证幂等,避免同一条消息被多次消费造成数据一致。
本地消息表这种方案实现了最终一致性,需要在业务系统里增加消息表,业务逻辑中多一次插入的 DB 操作,所以性能会有损耗,而且最终一致性的间隔主要由定时任务的间隔时间决定。
优点: 一种非常经典的实现,避免了分布式事务,实现了最终一致性。
缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。
适用于高并发最终一致
低并发基本一致:二阶段提交
高并发强一致:没有解决方案
RocetMQ 事务消息
有一些第三方的MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如 RabbitMQ 和 Kafka 都不支持。
以阿里的 RocketMQ 中间件为例,其思路大致为:
RocketMQ提供了类似X/Open XA的分布事务功能,通过MQ的事务消息能达到分布式事务的最终一致。
发送方在业务执行开始会先向消息服务器中投递 “ 半消息 “ ,半消息即暂时不会真正投递的消息,当发送方(即生产者)将消息成功发送给了MQ服务端且并未将该消息的二次确认结果返回,此时消息状态是” 暂时不可投递 “ 状态(可以认为是状态未知)。该状态下的消息即半消息。
如果出现网络闪断、生产者应用重启等原因导致事务消息二次确认丢失,MQ服务端会通过扫描发现某条消息长期处于 “ 半消息 “ 状态,MQ服务端会主动向生产者查询该消息的最终状态是处于Commit(消息提交)还是Rollback(消息回滚)。这个过程称为消息回查。
在业务方法内要想消息队列提交两次请求,一次发送消息和一次确认消息。如果确认消息发送失败了RocketMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以生产方需要实现一个check接口,RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
总体而言RocketMQ事务消息分为两条主线
定时任务发送流程:发送half message(半消息),执行本地事务,发送事务执行结果
定时任务回查流程:MQ服务器回查本地事务,发送事务执行结果
具体流程如下
1、Producer 向 MQ 服务器 发送消息 , MQ Server 将消息状态标记为 Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。
2、MQ 服务器收到消息并持久化成功之后,会向Producer 确认首次消息发送成功,此时消息处于 half message(半消息) 状态,并未发送给对应的 Consumer 。
3、Producer 开始执行本地事务逻辑 , 通过本地数据库事务控制。
4、根据事务执行结果,Producer 向 MQ 服务器提交二次确认 ( commit 或者 rollback) 。MQ Server 收到 Commit 状态则将半消息标记为可投递,Consumer 最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,Consumer 将不会接受该消息。
5、在断网或者应用重启的情况下,二次确认未成功的发给 MQ Server,MQ Server 会主动向 Producer 启动消息回查
6、Producer 根据事务执行结果,对消息回查返回对应的结果。
7、Mq Server根据返回结果,决定继续投递消息或者丢弃消息(重复第4步操作)。
注意 1-4 为事务消息的发送过程
优点: 实现了最终一致性,不需要依赖本地数据库事务。
缺点: 目前主流MQ中只有RocketMQ支持事务消息。
Seata(:star:)
介绍
seata:Simple Extensible Autonomous Transaction Architecture
简单可扩展自治事务框架。
具体使用的话就是在具体业务方法上面加上@GlobalTransactional注解即可
2019 年 1 月,阿里巴巴中间件团队发起了开源项目 Fescar(Fast & EaSy Commit And Rollback),和社区一起共建开源分布式事务解决方案。Fescar 的愿景是让分布式事务的使用像本地事务的使用一样,简单和高效,并逐步解决开发者们遇到的分布式事务方面的所有难题。
Fescar 开源后,蚂蚁金服加入 Fescar 社区参与共建,并在 Fescar 0.4.0 版本中贡献了 TCC 模式。
为了打造更中立、更开放、生态更加丰富的分布式事务开源社区,经过社区核心成员的投票,大家决定对 Fescar 进行品牌升级,并更名为 Seata,意为:Simple Extensible Autonomous Transaction Architecture,是一套一站式分布式事务解决方案。
Seata 融合了阿里巴巴和蚂蚁金服在分布式事务技术上的积累,并沉淀了新零售、云计算和新金融等场景下丰富的实践经验,但要实现适用于所有的分布式事务场景的愿景,仍有很长的路要走。因此,我们决定建立一个完全中立的分布式事务组织,希望更多的企业、开发者能够加入我们,一起打造 Seata。
现在使用seata的企业:
历史:
Ant Financial
XTS:Extended Transaction Service,可扩展事务服务。蚂蚁金服中间件团队自2007年以来开发了分布式事务中间件,广泛应用于Ant Financial,解决了跨数据库和服务的数据一致性问题。
DTX:Distributed Transaction Extended。自2013年以来,XTS已在Ant Financial Cloud上发布,名称为DTX。
阿里巴巴
TXC:Taobao Transaction Constructor。阿里巴巴中间件团队自2014年起启动该项目,以解决因应用程序架构从单片机改为微服务而导致的分布式事务问题。
GTS:Global Transaction Service。 TXC作为Aliyun中间件产品,新名称GTS自2016年起发布。
Fescar:我们从2019年开始基于TXC / GTS开源开源项目Fescar,以便在未来与社区密切合作。
Seata社区
Seata:简单的可扩展自治交易架构。 Ant Financial加入Fescar,使其成为一个更加中立和开放的分布式服务社区,并将Fescar更名为Seata。
Seata是一个分布式事务框架,设计思路是将一个分布式事务可以理解成一个全局事务,下面挂了若干个分支事务,而一个分支事务是一个满足 ACID 的本地事务,因此我们可以操作分布式事务像操作本地事务一样。一个注解搞定分布式事务。Seata中有两种分布式事务实现方案,AT及TCC
AT模式
Automatic (Branch) Transaction Mode
基于支持本地ACID事务的关系型数据库,对业务无侵入
Transaction Coordinator (TC):事务协调器,维护全局事务的运行状态,负责协调并决定全局事务的提交或回滚。
Transaction Manager(TM): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。
Resource Manager (RM):资源管理器,负责本地事务的注册,本地事务状态的汇报(投票),并且负责本地事务的提交和回滚。
XID:一个全局事务的唯一标识
其中,TM是一个分布式事务的发起者和终结者,TC负责维护分布式事务的运行状态,而RM则负责本地事务的运行。
如下图所示:
下面是一个分布式事务在Seata中的执行流程:
TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID
XID 在微服务调用链路的上下文中传播。
RM 向 TC 注册分支事务,接着执行这个分支事务并提交(重点:RM在第一阶段就已经执行了本地事务的提交/回滚),最后将执行结果汇报给TC
TM 根据 TC 中所有的分支事务的执行情况,发起全局提交或回滚决议。
TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。
Seata 中有三大模块,分别是 TM、RM 和 TC。 其中 TM 和 RM 是作为 Seata 的客户端与业务系统集成在一起,TC 作为 Seata 的服务端独立部署。
至此,seata的协议机制总体上看与 XA 是一致的。但是是有一些差别的:
XA 方案的 RM 实际上是在数据库层,RM 本质上就是数据库自身(通过提供支持 XA 的驱动程序来供应用使用)。
而 Fescar 的 RM 是以二方包的形式作为中间件层部署在应用程序这一侧的,不依赖于数据库本身对协议的支持,当然也不需要数据库支持 XA 协议。这点对于微服务化的架构来说是非常重要的:应用层不需要为本地事务和分布式事务两类不同场景来适配两套不同的数据库驱动。
这个设计,剥离了分布式事务方案对数据库在 协议支持 上的要求。
MT模式(TCC)
Manual (Branch) Transaction Mode
Seata还支持MT模式。MT模式本质上是一种TCC方案,业务逻辑需要被拆分为 Prepare/Commit/Rollback 3 部分,形成一个 MT 分支,加入全局事务。
不依赖于底层数据资源的事务支持,需自定义prepare/commit/rollback
操作,对业务有侵入
如图所示:
MT 模式一方面是 AT 模式的补充。另外,更重要的价值在于,通过 MT 模式可以把众多非事务性资源纳入全局事务的管理中。
7.JVM
内存区域详解
JDK1.7线程共享区域有个方法区,JDK8变为了本地内存里的元空间。
在多线程的情况下,程序计数器用于记录当前线程执行的位置,从而当线程被切换回来的时候能够知道该线程上次运行到哪儿了。
栈由一个个栈帧组成,而每个栈帧中都拥有:局部变量表、操作数栈、动态链接、方法返回地址。
堆内存区域的唯一目的就是存放对象实例,几乎所有的对象实例以及数组都在这里分配内存
Java 堆还可以细分为:新生代和老年代;再细致一点新生代有:Eden、S0、S1 等空间
JDK 8 版本之后 PermGen(永久代) 已被 Metaspace(元空间) 取代,元空间使用的是本地内存。
新生代->老年代->永久代
永久代和元空间是 HotSpot 虚拟机对虚拟机规范中方法区的两种实现方式。
Java对象的创建过程:1.类加载检查 2.分配内存 3.初始化零值 4.设置对象头 5.执行init方法
GC算法
GC 是 Garbage Collection 的简称,中文称为“垃圾回收”。GC ,是指程序把不用的内存空间视为垃圾并回收掉的整套动作。
Java 自动内存管理最核心的功能是 堆 内存中对象的分配与回收。
对象优先在Eden区分配,当 Eden 区没有足够空间进行分配时,虚拟机将发起一次 Minor GC。
- 在这次垃圾回收过程中,存活的对象会被复制到Survivor区(通常是S0或S1),而不存活的对象则会被清除。
- 如果Survivor区的空间不足,则存活的对象会被直接晋升到老年代。
- 在下一次minor GC过程中,会将Eden区和上次的Survivor区(例如S0)中的存活对象复制到另一个Survivor区(例如S1)。同时,上一轮minor GC中存活下来的对象年龄会增加1,并一起复制到新的Survivor区。
大对象就是需要大量连续内存空间的对象,直接进入老年代。
针对 HotSpot VM 的实现,它里面的 GC 其实准确分类只有两大种:
部分收集 (Partial GC):
- 新生代收集(Minor GC / Young GC):只对新生代进行垃圾收集;
- 老年代收集(Major GC / Old GC):只对老年代进行垃圾收集。需要注意的是 Major GC 在有的语境中也用于指代整堆收集;目前只有CMS会有单独收集老年代的行为
- 混合收集(Mixed GC):对整个新生代和部分老年代进行垃圾收集。
整堆收集 (Full GC):收集整个 Java 堆和方法区。
空间分配担保:进行Minor GC之前,先看看老年代有没有足够的空间。如果没有,则进行Full GC
垃圾收集算法:标记-清除算法。标记-复制算法。标记-整理算法。分代收集算法。
1.标记-清除算法
标记清除算法,是将垃圾回收分为2个阶段,分别是标记和清除。
1.根据可达性分析算法得出的垃圾进行标记
2.对这些标记为可回收的内容进行垃圾回收
可以看到,标记清除算法解决了引用计数算法中的循环引用的问题,没有从root节点引用的对象都会被回收。
同样,标记清除算法也是有缺点的:
- 效率较低,标记和清除两个动作都需要遍历所有的对象,并且在GC时,需要停止应用程序,对于交互性要求比较高的应用而言这个体验是非常差的。
- (重要)通过标记清除算法清理出来的内存,碎片化较为严重,因为被回收的对象可能存在于内存的各个角落,所以清理出来的内存是不连贯的。
2.标记-复制算法
复制算法的核心就是,将原有的内存空间一分为二,每次只用其中的一块,在垃圾回收时,将正在使用的对象复制到另一个内存空间中,然后将该内存空间清空,交换两个内存的角色,完成垃圾的回收。
如果内存中的垃圾对象较多,需要复制的对象就较少,这种情况下适合使用该方式并且效率比较高,反之,则不适合。
1)将内存区域分成两部分,每次操作其中一个。
2)当进行垃圾回收时,将正在使用的内存区域中的存活对象移动到未使用的内存区域。当移动完对这部分内存区域一次性清除。
3)周而复始。
优点:
- 在垃圾对象多的情况下,效率较高
- 清理后,内存无碎片
缺点:
- 分配的2块内存空间,在同一个时刻,只能使用一半,内存使用率较低
3.标记-整理算法
标记压缩算法是在标记清除算法的基础之上,做了优化改进的算法。和标记清除算法一样,也是从根节点开始,对对象的引用进行标记,在清理阶段,并不是简单的直接清理可回收对象,而是将存活对象都向内存另一端移动,然后清理边界以外的垃圾,从而解决了碎片化的问题。
1)标记垃圾。
2)需要清除向右边走,不需要清除的向左边走。
3)清除边界以外的垃圾。
优缺点同标记清除算法,解决了标记清除算法的碎片化的问题,同时,标记整理算法多了一步,对象移动内存位置的步骤,其效率也有有一定的影响。
与复制算法对比:复制算法标记完就复制,但标记整理算法得等把所有存活对象都标记完毕,再进行整理
分代收集算法
比如在新生代中,每次收集都会有大量对象死去,所以可以选择”标记-复制“算法,只需要付出少量对象的复制成本就可以完成每次垃圾收集。而老年代的对象存活几率是比较高的,而且没有额外的空间对它进行分配担保,所以我们必须选择“标记-清除”或“标记-整理”算法进行垃圾收集。
在java8时,堆被分为了两份:新生代和老年代【1:2】
对于新生代,内部又被分为了三个区域。Eden区,S0区,S1区【8:1:1】
当对新生代产生GC:MinorGC【young GC】
当对老年代代产生GC:Major GC 【old GC】
当对新生代和老年代产生FullGC: 新生代 + 老年代完整垃圾回收,暂停时间长,应尽力避免
MinorGC【young GC】发生在新生代的垃圾回收,暂停时间短(STW)
Mixed GC 新生代 + 老年代部分区域的垃圾回收,G1 收集器特有
FullGC: 新生代 + 老年代完整垃圾回收,暂停时间长(STW),应尽力避免?
名词解释:
STW(Stop-The-World):暂停所有应用程序线程,等待垃圾回收的完成
工作机制
- 新创建的对象,都会先分配到eden区
当伊甸园内存不足,标记伊甸园与 from(现阶段没有)的存活对象
将存活对象采用复制算法复制到 to 中,复制完毕后,伊甸园和 from 内存都得到释放
- 经过一段时间后伊甸园的内存又出现不足,标记eden区域to区存活的对象,将存活的对象复制到from区
- 当幸存区对象熬过几次回收(最多15次),晋升到老年代(幸存区内存不足或大对象会导致提前晋升)
垃圾收集器
JDK默认的垃圾收集器:
- JDK 8:Parallel Scavenge(新生代)+ Parallel Old(老年代),即Parallel GC
- JDK 9 ~ JDK20: G1
如果两个收集器之间存在连线,则说明它们可以搭配使用。虚拟机所处的区域则表示它是属于新生代还是老年代收集器。
整堆收集器: G1
垃圾回收器选择策略 :
客户端程序 : Serial + Serial Old;
吞吐率优先的服务端程序(比如:计算密集型) : Parallel Scavenge + Parallel Old;
响应时间优先的服务端程序 :ParNew + CMS。
G1收集器是基于标记整理算法实现的,不会产生空间碎片,可以精确地控制停顿,将堆划分为多个大小固定的独立区域,并跟踪这些区域的垃圾堆积程度,在后台维护一个优先列表,每次根据允许的收集时间,优先回收垃圾最多的区域(Garbage First)。
Serial收集器
Serial收集器:串行,单线程,在进行垃圾收集工作的时候必须暂停其他所有的工作线程( “Stop The World” ),直到它收集结束。采用标记-复制算法.
Serial Old 收集器
Serial Old 收集器:串行,单线程,Serial收集器的老年代版本(标记-整理算法),与Parallel Scavenge配合使用。
ParNew收集器
ParNew 收集器:其实就是 Serial 收集器的多线程版本,除了使用多线程进行垃圾收集外,其余行为(控制参数、收集算法、回收策略等等)和 Serial 收集器完全一样。
Parallel Scavenge收集器
Parallel Scavenge 收集器:和ParNew收集器基本一样。Parallel Scavenge 收集器关注点是吞吐量(高效率的利用 CPU)。CMS 等垃圾收集器的关注点更多的是用户线程的停顿时间(提高用户体验)。
探究ParNew与Parallel Scavenge的区别:
ParNew收集器(新生代)通常与CMS收集器(老年代)配合使用,它们共同为低延迟场景提供服务。
ParNew注重的是降低暂停时间,因此更适合需要低延迟的应用,如Web服务器、交互式应用等。而Parallel Scavenge注重高吞吐量,更适合后台运算为主的场景,如大型计算任务、批处理等。
ParNew为了保证低延迟,可能会牺牲部分吞吐量。而Parallel Scavenge则相反,它会牺牲部分延迟来保证最大的吞吐量。Parallel Scavenge有一些参数可以进行调整。
Parallel Scavenge具有自适应调节策略(-XX:+UseAdaptiveSizePolicy),能够根据系统的实际运行情况调整各个区域的大小及目标暂停时间。ParNew没有这种自适应机制。
Parallel Old收集器
Parallel Old 收集器:多线程,Parallel Scavenge收集器的老年代版本,标记-整理算法
CMS收集器
CMS收集器:CMS(Concurrent Mark Sweep)收集器是一种以获取最短回收停顿时间为目标的收集器。它非常符合在注重用户体验的应用上使用
CMS(Concurrent Mark Sweep)收集器是 HotSpot 虚拟机第一款真正意义上的并发收集器,它第一次实现了让垃圾收集线程与用户线程(基本上)同时工作。
CMS 收集器是一种 “标记-清除”算法实现的。
整个过程分为四个步骤:
- 初始标记: 暂停所有的其他线程,并记录下直接与 root 相连的对象,速度很快 ;
- 并发标记: 同时开启 GC 和用户线程,用一个闭包结构去记录可达对象。但在这个阶段结束,这个闭包结构并不能保证包含当前所有的可达对象。因为用户线程可能会不断的更新引用域,所以 GC 线程无法保证可达性分析的实时性。所以这个算法里会跟踪记录这些发生引用更新的地方。
- 重新标记: 重新标记阶段就是为了修正并发标记期间因为用户程序继续运行而导致标记产生变动的那一部分对象的标记记录,这个阶段的停顿时间一般会比初始标记阶段的时间稍长,远远比并发标记阶段时间短
- 并发清除: 开启用户线程,同时 GC 线程开始对未标记的区域做清扫。
主要优点:并发收集、低停顿。但是它有下面三个明显的缺点:
- 对 CPU 资源敏感;
- 无法处理浮动垃圾;
- 它使用的回收算法-“标记-清除”算法会导致收集结束时会有大量空间碎片产生
G1收集器
G1 (Garbage-First) 是一款面向服务器的垃圾收集器,主要针对配备多颗处理器及大容量内存的机器. 以极高概率满足 GC 停顿时间要求的同时,还具备高吞吐量性能特征.
它具备以下特点:
- 并行与并发:G1 能充分利用 CPU、多核环境下的硬件优势,使用多个 CPU(CPU 或者 CPU 核心)来缩短 Stop-The-World 停顿时间。部分其他收集器原本需要停顿 Java 线程执行的 GC 动作,G1 收集器仍然可以通过并发的方式让 java 程序继续执行。
- 分代收集:虽然 G1 可以不需要其他收集器配合就能独立管理整个 GC 堆,但是还是保留了分代的概念。但是在读书的时候发现,是将堆内存分为好多Region区域,然后每个区域都有8:1:1的新生代和老年代,然后回收的衡量标准是哪块区域内存中存放的垃圾最多。
- 空间整合:与 CMS 的“标记-清除”算法不同,G1 从整体来看是基于“标记-整理”算法实现的收集器;从局部上来看是基于“标记-复制”算法实现的。
- 可预测的停顿:这是 G1 相对于 CMS 的另一个大优势,降低停顿时间是 G1 和 CMS 共同的关注点,但 G1 除了追求低停顿外,还能建立可预测的停顿时间模型,能让使用者明确指定在一个长度为 M 毫秒的时间片段内,消耗在垃圾收集上的时间不得超过 N 毫秒。
G1 收集器的运作大致分为以下几个步骤:
- 初始标记
- 并发标记
- 最终标记
- 筛选回收
G1 收集器在后台维护了一个优先列表,每次根据允许的收集时间,优先选择回收价值最大的 Region(这也就是它的名字 Garbage-First 的由来) 。这种使用 Region 划分内存空间以及有优先级的区域回收方式,保证了 G1 收集器在有限时间内可以尽可能高的收集效率(把内存化整为零)。
从 JDK9 开始,G1 垃圾收集器成为了默认的垃圾收集器
CMS新生代的Young GC、G1和ZGC都基于标记-复制算法,但算法具体实现的不同就导致了巨大的性能差异。
G1停顿时间的瓶颈主要是标记-复制中的转移阶段STW。为什么转移阶段不能和标记阶段一样并发执行呢?主要是G1未能解决转移过程中准确定位对象地址的问题。
分成三个阶段:新生代回收、并发标记、混合收集
如果并发失败(即回收速度赶不上创建新对象速度),会触发 Full GC
Young Collection(年轻代垃圾回收)
初始时,所有区域都处于空闲状态
创建了一些对象,挑出一些空闲区域作为伊甸园区存储这些对象
当伊甸园需要垃圾回收时,挑出一个空闲区域作为幸存区,用复制算法复制存活对象,需要暂停用户线程
随着时间流逝,伊甸园的内存又有不足
将伊甸园以及之前幸存区中的存活对象,采用复制算法,复制到新的幸存区,其中较老对象晋升至老年代
Young Collection + Concurrent Mark (年轻代垃圾回收+并发标记)
当老年代占用内存超过阈值(默认是45%)后,触发并发标记,这时无需暂停用户线程
并发标记之后,会有重新标记阶段解决漏标问题,此时需要暂停用户线程。
这些都完成后就知道了老年代有哪些存活对象,随后进入混合收集阶段。此时不会对所有老年代区域进行回收,而是根据暂停时间目标优先回收价值高(存活对象少)的区域(这也是 Gabage First 名称的由来)。
Mixed Collection (混合垃圾回收)
复制完成,内存得到释放。进入下一轮的新生代回收、并发标记、混合收集
其中H叫做巨型对象,如果对象非常大,会开辟一块连续的空间存储巨型对象
ZGC收集器
ZGC是一款JDK 11中新加入的具有实验性质的低延迟垃圾收集器,ZGC是Azul System公司开发的 C4(Concurrent Continuously Compacting Collector)收集器。
ZGC收集器是一款基于Region内存布局的,(暂时)不设分代的,使用了读屏障、染色指针和内存多重映射等技术来实现可并发的标记-整理算法的,以低延迟为首要目标的一款垃圾收集器。
与 CMS 中的 ParNew 和 G1 类似,ZGC 也采用标记-复制算法,不过 ZGC 对该算法做了重大改进。
在 ZGC 中出现 Stop The World 的情况会更少!
它的设计目标包括:
- 停顿时间不超过10ms;
- 停顿时间不会随着堆的大小,或者活跃对象的大小而增加;
- 支持8MB~4TB级别的堆(未来支持16TB)。
与CMS中的ParNew和G1类似,ZGC也采用标记-复制算法,不过ZGC对该算法做了重大改进:ZGC在标记、转移和重定位阶段几乎都是并发的,这是ZGC实现停顿时间小于10ms目标的最关键原因。
ZGC运作过程
ZGC的运作过程大致可划分为以下四个大的阶段:
①并发标记(Concurrent Mark):与G1一样,并发标记是遍历对象图做可达性分析的阶段,它的初始标记 (Mark Start)和最终标记(Mark End)也会出现短暂的停顿,与G1不同的是, ZGC的标记是在指针上而不是在对象 上进行的, 标记阶段会更新染色指针中的Marked 0、 Marked 1标志位。
②并发预备重分配(Concurrent Prepare for Relocate):这个阶段需要根据特定的查询条件统计得出本次收 集过程要清理哪些Region,将这些Region组成重分配集(Relocation Set)。ZGC每次回收都会扫描所有的 Region,用范围更大的扫描成本换取省去G1中记忆集的维护成本。
③并发重分配(Concurrent Relocate):重分配是ZGC执行过程中的核心阶段,这个过程要把重分配集中的存 活对象复制到新的Region上,并为重分配集中的每个Region维护一个转发表(Forward Table),记录从旧对象 到新对象的转向关系。ZGC收集器能仅从引用上就明确得知一个对象是否处于重分配集之中,如果用户线程此时并 发访问了位于重分配集中的对象,这次访问将会被预置的内存屏障所截获,然后立即根据Region上的转发表记录将访问转发到新复制的对象上,并同时修正更新该引用的值,使其直接指向新对象,ZGC将这种行为称为指 针的“自愈”(Self-Healing)能力。
④并发重映射(Concurrent Remap):重映射所做的就是修正整个堆中指向重分配集中旧对象的所有引用,但是ZGC中对象引用存在“自愈”功能,所以这个重映射操作并不是很迫切。ZGC很巧妙地把并发重映射阶段要做的工作,合并到了下一次垃圾收集循环中的并发标记阶段里去完成,反正它们都是要遍历所有对象的,这样合并就节 省了一次遍历对象图的开销。一旦所有指针都被修正之后, 原来记录新旧对象关系的转发表就可以释放掉了。
ZGC通过着色指针和读屏障技术,解决了转移过程中准确访问对象的问题,实现了并发转移。大致原理描述如下:并发转移中“并发”意味着GC线程在转移对象的过程中,应用线程也在不停地访问对象。假设对象发生转移,但对象地址未及时更新,那么应用线程可能访问到旧地址,从而造成错误。而在ZGC中,应用线程访问对象将触发“读屏障”,如果发现对象被移动了,那么“读屏障”会把读出来的指针更新到对象的新地址上,这样应用线程始终访问的都是对象的新地址。那么,JVM是如何判断对象被移动过呢?就是利用对象引用的地址,即着色指针。
双亲委派模型
双亲委派模型:当一个类需要被加载的时候,一步步往上问看是否已经被加载;都没有加载的话再从上到下一步步尝试去加载。
JVM 不仅要看类的全名是否相同,还要看加载此类的类加载器是否一样。只有两者都相同的情况,才认为两个类是相同的。
双亲委派模型保证了 Java 程序的稳定运行,可以避免类的重复加载(JVM 区分不同类的方式不仅仅根据类名,相同的类文件被不同的类加载器加载产生的是两个不同的类),也保证了 Java 的核心 API 不被篡改。
实际调优
堆内存区域的唯一目的就是存放对象实例,几乎所有的对象实例以及数组都在这里分配内存
显式指定堆内存最小值和最大值–Xms
和-Xmx
我们还有很多JVM参数可以去调:
https://javaguide.cn/java/jvm/jvm-parameters-intro.html#%E6%96%87%E7%AB%A0%E6%8E%A8%E8%8D%90
案例:
https://javaguide.cn/java/jvm/jvm-in-action.html
jps
:查看所有 Java 进程
jstat
: 监视虚拟机各种运行状态信息
jinfo
: 实时地查看和调整虚拟机各项参数
jmap
(Memory Map for Java)命令用于生成堆转储快照
jhat
用于分析 heapdump 文件,它会建立一个 HTTP/HTML 服务器,让用户可以在浏览器上查看分析结果。
jstack
(Stack Trace for Java)命令用于生成虚拟机当前时刻的线程快照。线程快照就是当前虚拟机内每一条线程正在执行的方法堆栈的集合.
JDK可视化分析工具:
JConsole:Java 监视与管理控制台;可以查看Java程序概况,内存监控,线程监控,
Visual VM:多合一故障处理工具;
8.MQ:star:
阻塞队列-BlockingQueue
被阻塞的情况主要有如下两种:
当队列满了的时候进行入队列操作
当队列空了的时候进行出队列操作
因此,当一个线程试图对一个已经满了的队列进行入队列操作时,它将会被阻塞,除非有另一个线程做了出队列操作;同样,当一个线程试图对一个空队列进行出队列操作时,它将会被阻塞,除非有另一个线程进行了入队列操作。
阻塞队列主要用在生产者/消费者的场景,下面这幅图展示了一个线程生产、一个线程消费的场景:
为什么需要BlockingQueue
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
java.util.concurrent 包里的 BlockingQueue是一个接口,继承Queue接口,Queue接口继承 Collection。
BlockingQueue接口主要有以下7个实现类:
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列。
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
- SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。
- LinkedTransferQueue:由链表组成的无界阻塞队列。
- LinkedBlockingDeque:由链表组成的双向阻塞队列。
BlockingQueue接口有以下几个方法:
它的方法可以分成以下4类:
抛出异常 | 特殊值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
检查 | element() | peek() | 不可用 | 不可用 |
抛出异常
add正常执行返回true,element(不删除)和remove返回阻塞队列中的第一个元素
当阻塞队列满时,再往队列里add插入元素会抛IllegalStateException:Queue full
当阻塞队列空时,再往队列里remove移除元素会抛NoSuchElementException
当阻塞队列空时,再调用element检查元素会抛出NoSuchElementException
特定值
插入方法,成功ture失败false
移除方法,成功返回出队列的元素,队列里没有就返回null
检查方法,成功返回队列中的元素,没有返回null
一直阻塞
如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
当阻塞队列满时,再往队列里put元素,队列会一直阻塞生产者线程直到put数据or响应中断退出
当阻塞队列空时,再从队列里take元素,队列会一直阻塞消费者线程直到队列可用
超时退出
如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。
返回一个特定值以告知该操作是否成功(典型的是 true / false)。
自己实现一个简单的MQ
项目里原本是使用Kafka当消息队列的,但是组内大师觉得Kafka太重了,只要确保订单信息都持久化到数据库里,每次重启的时候重新加载任务,那么其实是可以自己写一个消息队列来替代Kafka的
- 下面是我自己写的一个任务队列执行器,使用了
ArrayBlockingQueue
来存储任务,并创建一个单独的线程来处理队列中的任务。并且允许指定处理任务的处理器(Processor),可以根据需要执行不同的任务。 - 关于TaskQueueExecutor的构造函数,我这里自定义了一个函数式接口,但是其实也可以直接用Java内置的函数式接口:Function,写法类似于
public TaskQueueExecutor(String threadName, int queueSize, Function<T, R> processor) {}
@Slf4j
public class TaskQueueExecutor<T> {
// 为记录消息和错误信息定义一个日志记录器
// 用于存储类型为T的任务的阻塞队列
private final ArrayBlockingQueue<T> queue;
// 用于处理任务的独立线程
private final Thread msgLooper;
// 用于处理任务的处理器接口
private final Processor<T> processor;
// 用于跟踪队列中任务的数量的AtomicInteger是原子整数类型,可以避免并发环境下的线程安全问题
private final AtomicInteger taskNum;
// 定义一个处理任务的接口
public interface Processor<T> {
void process(T task);
}
// 构造函数用于初始化TaskQueueExecutor
public TaskQueueExecutor(String threadName, int queueSize, Processor<T> processor) {
// 使用给定的大小初始化阻塞队列
this.queue = new ArrayBlockingQueue<T>(queueSize);
// 存储用于处理任务的处理器
this.processor = processor;
// 创建一个新线程用于消息处理
this.msgLooper = new Thread(this::looper);
// 设置线程名称,我这里是根据业务名称来设置的,这样排查问题会比较方便
this.msgLooper.setName(threadName);
// 启动消息处理线程
this.msgLooper.start();
// 初始化taskNum
taskNum = new AtomicInteger(0);
}
// 方法用于将消息(任务)发送到队列
public void sendMessage(T task) {
try {
// 将任务放入队列
this.queue.put(task);
// 增加任务计数
taskNum.incrementAndGet();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 方法用于获取队列中的任务数量
public int getTaskNum() {
return taskNum.get();
}
// 主要的消息处理循环
private void looper() {
while (true) {
try {
// 从队列中获取任务(如果队列为空,则阻塞)
T task = queue.take();
// 使用提供的处理器处理任务
processor.process(task);
} catch (InterruptedException e) {
// 如果线程被中断,则中断循环
break;
} catch (Exception e) {
// 处理任务处理过程中出现的任何异常
log.error(String.format("TaskQueueExecutor %s run task exception",
this.msgLooper.getName()), e);
} finally {
// 即使发生异常,也会在finally块中减少任务计数
taskNum.decrementAndGet();
}
}
}
}
业务逻辑
QueueService
:是用于管理不同类型的任务队列。(我这里目前就俩任务队列)@PostConstruct init()
:使用注解来标记初始化方法,该方法在类实例化后自动调用,主要是初始化了两个ArrayList
,分别用于存储不同类型的任务队列执行器,同时可以通过配置项来判断是否需要从数据库加载未完成任务。sendMessage()
:用于将订单添加到适当的队列执行器,根据订单的类型选择合适的执行器列表,计算任务最少的分区并添加订单。getQueueSize()
: 用于获取特定订单的队列大小,根据订单类型选择相应的队列执行器列表并返回队列大小。getTaskQueueExecutors()
:根据订单类型返回相应的任务队列执行器。
@Service
@Slf4j
public class QueueService {
// 引入依赖的服务和组件
@Resource
private ThirdManager thirdManager;
@Resource
private QueueConfig queueConfig;
@Resource
private OrderMapper orderMapper;
@Resource
private OrderConsumerService orderConsumerService;
// 定义两个ArrayList,用于存储不同类型的任务队列执行器,这里可以根据自身业务需求来定义多个。
private ArrayList<TaskQueueExecutor<Torder>> paperTaskExecutors;
private ArrayList<TaskQueueExecutor<Torder>> textTaskExecutors;
// 初始化方法,在类实例化后自动调用,创建信息从配置文件中读取。
@PostConstruct
public void init() {
// 加载配置,设置队列大小和处理器
for (int idx = 0; idx < queueConfig.getPaperTopicPartitionsNum(); idx++) {
int partition = idx;
paperTaskExecutors.add(new TaskQueueExecutor<>(
String.format("%s-%d", queueConfig.getPaperTopic(), partition),
1000,
torder -> {
orderConsumerService.paperConsumeMessage(torder, partition);
}
));
}
// 加载配置,设置队列大小和处理器
for (int idx = 0; idx < queueConfig.getTextTopicPartitionsNum(); idx++) {
int partition = idx;
textTaskExecutors.add(new TaskQueueExecutor<>(
String.format("%s-%d", queueConfig.getTextTopic(), partition),
1000,
torder -> {
orderConsumerService.textConsumeMessage(torder, partition);
}
));
}
// 每次重启的时候,从数据库加载未完成订单,可以通过配置项配置,生产环境加载未完成任务,开发环境不加载
if (queueConfig.isNeedLoadFromDB()) {
// 查询数据库中未处理的订单。
List<Torder> orders = orderMapper.selectList(Wrappers.lambdaQuery(Torder.class).eq(Torder::getStatus, 1));
for (Torder torder : orders) {
log.info("添加未处理订单:{}", torder);
// 将这些订单添加到适当的消息队列,以便后续处理。
sendMessage(torder);
}
}
}
// 用于将订单消息发送到适当的队列执行器。
public void sendMessage(Torder torder) {
var executorOptional = getTaskQueueExecutors(torder);
executorOptional.ifPresent(executors -> {
// stream操作,用于寻找任务数最少的队列,默认值为0
int partition = IntStream.range(0, executors.size())
.reduce((i, j) -> executors.get(i).getTaskNum() > executors.get(j).getTaskNum() ? j : i)
.orElse(0);
int taskNum = executors.get(partition).getTaskNum();
// 这里是我的业务逻辑
torder.setZone(partition);
orderMapper.updateById(torder);
log.info("当前消息入队列,所在分区{}, 当前订单号为{}, 前面还有{}人在排队",
partition, torder.getOrderId(), executors.get(partition).getTaskNum());
if (taskNum > 100) {
thirdManager.sendExceptionToFeiShu("报告降重,分区:【" + partition + "】当前排队人数已达:【" + taskNum + "】人次");
}
// 添加订单到选定的队列执行器。
executors.get(partition).sendMessage(torder);
});
}
// 方法用于获取特定订单的队列大小(即队列中等待处理的订单数量)。
public int getQueueSize(Torder torder) {
var executorOptional = getTaskQueueExecutors(torder);
return executorOptional.map(taskQueueExecutors ->
taskQueueExecutors.get(torder.getZone()).getTaskNum())
.orElse(0);
}
// 方法用于根据订单类型获取相应的任务队列执行器列表。
private Optional<ArrayList<TaskQueueExecutor<Torder>>> getTaskQueueExecutors(Torder torder) {
ArrayList<TaskQueueExecutor<Torder>> executors;
if (torder.getPolishType().equals(Common.POLISH_TYPE_WITH_PAPER)) {
executors = paperTaskExecutors;
} else if (torder.getPolishType().equals(Common.POLISH_TYPE_WITH_TEXT)) {
executors = textTaskExecutors;
} else if (torder.getPolishType().equals(Common.POLISH_TYPE_WITH_EXPAND)) {
executors = textTaskExecutors;
} else if (torder.getPolishType().equals(Common.POLISH_TYPE_WITH_RELINE)) {
executors = textTaskExecutors;
} else {
log.error("Error PolishType: {}", torder.getPolishType());
return Optional.empty();
}
return Optional.of(executors);
}
}
MQ基础知识
MQ的作用:异步,削峰,解耦。实现分布式事务、顺序保证、延时/定时处理,即时通讯数据流处理。
RocketMQ、 Kafka 都提供了事务相关的功能。事务允许事件流应用将消费,处理,生产消息整个过程定义为一个原子操作。
大部分消息队列,例如 RocketMQ、RabbitMQ、Kafka,都支持顺序消息,延时/定时处理。
RabbitMQ 内置了 MQTT 插件用于实现 MQTT 功能(默认不启用,需要手动开启),支持即时消息传递,即使在网络条件较差的情况下也能保持通信的稳定性。
RocketMQ中性能比较高的原因:
1.内部使用Netty这个高效的NIO通信框架
2.大量使用多线程和异步
3.采用零拷贝技术优化(MMAP) 性能提升50%
4.采用文件存储,顺序读写。接近内存的速度
5.锁优化(CAS机制无锁化)
6.存储设计:读写分离。
如何设计一个消息队列
存储:高可用-磁盘存储,顺序读写,零拷贝技术
可伸缩:分布式,参考Kafka, broker \topic\partition
消息的丢失:多主多从, 多副本,raft协议,一台主服务器宕机,选举机制。
消息重复设计
网络框架:Netty 高效的NIO框架。
RabbitMQ
2007开源
RabbitMQ 是一个在 AMQP(Advanced Message Queuing Protocol )基础上实现的,使用 Erlang 编写的一个开源的消息队列。
RabbitMQ 中通过 Binding(绑定) 将 Exchange(交换器) 与 Queue(消息队列) 关联起来,在绑定的时候一般会指定一个 BindingKey(绑定建) ,这样 RabbitMQ 就知道如何正确将消息路由到队列了,如下图所示。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和 Queue 的绑定可以是多对多的关系。
RabbitMQ 中消息只能存储在 队列 中,这一点和 Kafka 这种消息中间件相反。Kafka 将消息存储在 topic(主题) 这个逻辑层面,而相对应的队列逻辑只是 topic 实际存储文件中的位移标识。 RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。
多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免消息被重复消费。
RabbitMQ 不支持队列层面的广播消费,如果有广播消费的需求,需要在其上进行二次开发,这样会很麻烦,不建议这样做。
交换机类型有fanout,direct,topic,headers四种,最后一种性能很差,第一种太简单,第二种如上图,第三种就是将Routing Key改为*.rabbitmq.*这种偏正则的
AMQP
RabbitMQ 就是 AMQP 协议的 Erlang
的实现
AMQP 协议的三层:
- Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
- Session Layer:中间层,主要负责客户端命令发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
- TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。
AMQP 模型的三大组件:
- **交换器 (Exchange)**:消息代理服务器中用于把消息路由到队列的组件。
- **队列 (Queue)**:用来存储消息的数据结构,位于硬盘或内存中。
- **绑定 (Binding)**:一套规则,告知交换器消息应该将消息投递给哪个队列。
死信队列
DLX,全称为 Dead-Letter-Exchange
,死信交换器,死信邮箱。当消息在一个队列中变成死信 (dead message
) 之后,它能被重新发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列(Dead Letter Queue,简称 DLQ)。
导致的死信的几种原因:
- 消息被拒(
Basic.Reject /Basic.Nack
) 且requeue = false
。 - 消息 TTL 过期。
- 队列满了,无法再添加。
延迟队列
RabbitMQ 本身是没有延迟队列的,要实现延迟消息,一般有两种方式:
- 通过 RabbitMQ 本身队列的特性来实现,需要使用 RabbitMQ 的死信交换机(Exchange)和消息的存活时间 TTL(Time To Live)。
- 在 RabbitMQ 3.5.7 及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时,插件依赖 Erlang/OPT 18.0 及以上。
也就是说,AMQP 协议以及 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过 TTL 和 DLX 模拟出延迟队列的功能。
Kafka
2011年开源,2015年成为Apache顶级项目
一些参数:
broker.id 每一个broker在集群中的唯一表示。
log.dirs 消息保存的路径。Kafka把所有消息都保存在磁盘上。
auto.create.topics.enable 是否允许自动创建主题
一些适合的场景:
限时订单不可以。没有对每条消息做限时的发送。RocketMQ可以
日志收集。很适合。并发量比较大。天生持久化,默认写磁盘。
消息系统。作为一款生产-消费的消息系统。
流式处理。Stream的流的组件。 Flink+Kakfa 做一个异步。
RocketMQ 的消息模型和 Kafka 基本是完全一样的。唯一的区别是 Kafka 中没有队列这个概念,与之对应的是 Partition(分区)
Kafka 将生产者发布的消息发送到 Topic(主题) 中,需要这些消息的消费者可以订阅这些 Topic(主题),如下图所示:
Topic(主题) : Producer 将消息发送到特定的主题,Consumer 通过订阅特定的 Topic(主题) 来消费消息。
Partition(分区) : Partition 属于 Topic 的一部分。一个 Topic 可以有多个 Partition ,并且同一 Topic 下的 Partition 可以分布在不同的 Broker 上
消费顺序
每次添加消息到 Partition(分区) 的时候都会采用尾加法,如上图所示。 Kafka 只能为我们保证 Partition(分区) 中的消息有序。
我们就有一种很简单的保证消息消费顺序的方法:1 个 Topic 只对应一个 Partition。
Kafka 中发送 1 条消息的时候,可以指定 topic, partition, key,data(数据) 4 个参数。如果你发送消息的时候指定了 Partition 的话,所有消息都会被发送到指定的 Partition。并且,同一个 key 的消息可以保证只发送到同一个 partition,这个我们可以采用表/对象的 id 来作为 key 。
总结一下,对于如何保证 Kafka 中消息消费的顺序,有了下面两种方法:
- 1 个 Topic 只对应一个 Partition。
- (推荐)发送消息的时候指定 key/Partition。
消息不丢失
生产者send消息,如果用get方法获取调用结果判断是否消息丢失,就变成同步的了。可以添加回调函数
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));
Producer 的retries
(重试次数)设置一个比较合理的值,一般是 3 ,但是为了保证消息不丢失的话一般会设置比较大一点。
消费者这边主要是聚焦于(真正消费,提交offset)这俩的顺序谁先谁后。禁止自动提交偏移量,改为手动。
Kafka本身丢失信息:设置acks=all,去同步所有的副本;设置 replication.factor >= 3,保证每个分区至少有3个副本;设置 min.insync.replicas > 1,消息至少要被写入到 2 个副本才算是被成功发送。
一般推荐设置成 replication.factor = min.insync.replicas + 1。
不重复消费
kafka 出现消息重复消费的原因:
- 服务端侧已经消费的数据没有成功提交 offset(根本原因)。
- Kafka 侧 由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。
解决方案:
- 消费消息服务做幂等校验,比如 Redis 的 set、MySQL 的主键等天然的幂等功能。这种方法最有效。
- 将
enable.auto.commit
参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。那么这里会有个问题:什么时候提交 offset 合适?- 处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样
- 拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底。
重试机制
Kafka 消费者在默认配置下会进行最多 10 次 的重试,每次重试的时间间隔为 0,即立即进行重试。如果在 10 次重试后仍然无法成功消费消息,则不再进行重试,消息将被视为消费失败。
RocketMQ
2012年开源,2017年成为Apache顶级项目
Kafka的分区对应RocketMQ的队列
一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。所以一般来讲要控制 消费者组中的消费者个数和主题中队列个数相同 。当然也可以消费者个数小于队列个数,只不过不太建议。
每个消费组在每个队列上维护一个消费位置.仅仅是为每个消费者组维护一个 消费位移(offset) .
一个Topic可以分布在多个Broker上。一个Broker也可以配置多个Topic.
在 RocketMQ
中使用的是 事务消息加上事务反查机制 来解决分布式事务问题的。
在第一步发送的 half 消息 ,它的意思是 在事务提交之前,对于消费者来说,这个消息是不可见的 。
那么,如何做到写入消息但是对用户不可见呢?RocketMQ 事务消息的做法是:如果消息是 half 消息,将备份原消息的主题与消息消费队列,然后 改变主题 为 RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费 half 类型的消息,然后 RocketMQ 会开启一个定时任务,从 Topic 为 RMQ_SYS_TRANS_HALF_TOPIC 中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。
你可以试想一下,如果没有从第 5 步开始的 事务反查机制 ,如果出现网路波动第 4 步没有发送成功,这样就会产生 MQ 不知道是不是需要给消费者消费的问题,他就像一个无头苍蝇一样。在 RocketMQ
中就是使用的上述的事务反查来解决的,而在 Kafka
中通常是直接抛出一个异常让用户来自行解决。
你还需要注意的是,在 MQ Server
指向系统 B 的操作已经和系统 A 不相关了,也就是说在消息队列中的分布式事务是——本地事务和存储消息到消息队列才是同一个事务。这样也就产生了事务的最终一致性,因为整个过程是异步的,每个系统只要保证它自己那一部分的事务就行了。
最快速解决消息堆积问题的方法还是增加消费者实例,不过 同时你还需要增加每个主题的队列数量 。因为一个队列只会被一个消费者消费。
零拷贝技术
传统的IO:
- 用户调用 read()方法,开始读取数据,此时发生一次上下文从用户态到内核态的切换,也就是图示的切换 1
- 将磁盘数据通过 DMA 拷贝到内核缓存区
- 将内核缓存区的数据拷贝到用户缓冲区,这样用户,也就是我们写的代码就能拿到文件的数据
- read()方法返回,此时就会从内核态切换到用户态,也就是图示的切换 2
- 当我们拿到数据之后,就可以调用 write()方法,此时上下文会从用户态切换到内核态,即图示切换 3
- CPU 将用户缓冲区的数据拷贝到 Socket 缓冲区
- 将 Socket 缓冲区数据拷贝至网卡
- write()方法返回,上下文重新从内核态切换到用户态,即图示切换 4
整个过程发生了 4 次上下文切换和 4 次数据的拷贝,这在高并发场景下肯定会严重影响读写性能故引入了零拷贝技术
mmap
mmap(memory map)是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。
简单地说就是内核缓冲区和应用缓冲区共享,从而减少了从读缓冲区到用户缓冲区的一次 CPU 拷贝。基于此上述架构图可变为:
基于 mmap IO 读写其实就变成 mmap + write 的操作,也就是用 mmap 替代传统 IO 中的 read 操作。
当用户发起 mmap 调用的时候会发生上下文切换 1,进行内存映射,然后数据被拷贝到内核缓冲区,mmap 返回,发生上下文切换 2;随后用户调用 write,发生上下文切换 3,将内核缓冲区的数据拷贝到 Socket 缓冲区,write 返回,发生上下文切换 4。
发生 4 次上下文切换和 3 次 IO 拷贝操作
sendfile
sendfile()跟 mmap()一样,也会减少一次 CPU 拷贝,但是它同时也会减少两次上下文切换。
如图,用户在发起 sendfile()调用时会发生切换 1,之后数据通过 DMA 拷贝到内核缓冲区,之后再将内核缓冲区的数据 CPU 拷贝到 Socket 缓冲区,最后拷贝到网卡,sendfile()返回,发生切换 2。发生了 3 次拷贝和两次切换。
sendfile 是无法知道文件的具体的数据的;但是 mmap 不一样,他是可以修改内核缓冲区的数据的。假设如果需要对文件的内容进行修改之后再传输,只有 mmap 可以满足。Kafka用sendfile,RocketMQ因为功能很多,需要一些数据的返回值,所以采用mmp.
RocketMQ中的分布式事务及其实现
银行转账,A->B。
A账户 -100 SQL事务,B账户 +100 SQL事务
两阶段提交,半消息。MQ的消费者本身就会不断重试。
超时订单自动取消
使用延迟队列消息实现,有很多种方式:但基本都是死信和延时
第一种 使用RabbitMQ里面TTL和死信队列实现。如果死信队列里有消息,则执行订单取消逻辑。
第二种 在RabbitMQ安装延迟队列实现
第三种 使用Redisson实现,也是用的延迟队列。
第四种 使用RocketMQ的延时消息实现。
尚硅谷的视频利用redissonClient 发送延迟消息。
redissonClient.getBlockingDeque(): 创建一个阻塞队列
redissonClient.getDelayedQueue(): 获取延迟队列
delayedQueue.offer(): 向队列中存储数据
blockingDeque.take(): 从队列中获取数据
但是我感觉延迟队列不太合适,你把消息放到延迟队列里了,但是消息订单被正常处理了咋办。还会15min后继续从延迟队列里出来吗。—-应该也没有问题,做好业务校验,我消费这个消息,发现订单已经被接单了,就不用再进行取消订单的逻辑了。
如果是Redis过期事件监听的做法
Redis 中有很多默认的 channel,这些 channel 是由 Redis 本身向它们发送消息的,而不是我们自己编写的代码。其中,__keyevent@0__:expired
就是一个默认的 channel,负责监听 key 的过期事件。也就是说,当一个 key 过期之后,Redis 会发布一个 key 过期的事件到__keyevent@<db>__:expired
这个 channel 中。(和死信队列功能一样)
我们只需要监听这个 channel,就可以拿到过期的 key 的消息,进而实现了延时任务功能。
这个功能被 Redis 官方称为 keyspace notifications ,作用是实时监控实时监控 Redis 键和值的变化。
Redis实现的延迟队列,时效性差一些:过期事件消息是在 Redis 服务器删除 key 时发布的,而不是一个 key 过期之后就会就会直接发布。
而删除采用的是定期删除+惰性删除 。这样就会存在我设置了 key 的过期时间,但到了指定时间 key 还未被删除,进而没有发布过期事件的情况。
Redis 的 pub/sub 模式中的消息并不支持持久化,这与消息队列不同。当没有订阅者时,消息会被直接丢弃,在 Redis 中不会存储该消息。
Redis 的 pub/sub 模式目前只有广播模式,这意味着当生产者向特定频道发布一条消息时,所有订阅相关频道的消费者都能够收到该消息。这会造成多服务实例下消息重复消费。
如果是基于Redisson的延迟队列
Redisson 的延迟队列 RDelayedQueue 是基于 Redis 的 SortedSet 来实现的。设置相应的过期时间作为分数。
Redisson 使用 zrangebyscore
命令扫描 SortedSet 中过期的元素,然后将这些过期元素从 SortedSet 中移除,并将它们加入到就绪消息列表中。就绪消息列表是一个阻塞队列,有消息进入就会被监听到。这样做可以避免对整个 SortedSet 进行轮询,提高了执行效率。
DelayedQueue 中的消息会被持久化,不会丢消息。每个客户端都是从同一个目标队列中获取任务的,不存在重复消费的问题。
跟 Redisson 内置的延时队列相比,消息队列可以通过保障消息消费的可靠性、控制消息生产者和消费者的数量等手段来实现更高的吞吐量和更强的可靠性,实际项目中首选使用消息队列的延时消息这种方案。
在订单生成时,我们将订单关闭消息发送到 RocketMQ,并设置消息的延迟时间为十分钟。RocketMQ 支持设置消息的延迟时间,可以通过设置消息的 delayLevel 来指定延迟级别,每个级别对应一种延迟时间。这样,订单关闭消息将在十分钟后自动被消费者接收到。
public class ProducerExample {
public static void main(String[] args) throws Exception {
// 创建生产者实例,并设置生产者组名
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置 Name Server 地址,此处为示例,实际使用时请替换为真实的 Name Server 地址
producer.setNamesrvAddr("192.168.179.130:9876");
producer.start();
try {
// 创建消息实例,指定 topic、Tag和消息体
Message msg = new Message("TestTopic", "TagA", ("Hello RocketMQ").getBytes());
msg.setDelayTimeLevel(14);
// 发送消息并获取发送结果
SendResult sendResult = producer.send(msg);
System.out.println("Message sent: " + new String(msg.getBody()));
System.out.println("Send result: " + sendResult);
} catch (Exception e) {
e.printStackTrace();
System.out.println("Message sending failed.");
} finally {
// 关闭生产者
producer.shutdown();
}
}
}
设置对应的延迟等级。一共18个等级
投递等级(delay level) | 延迟时间 | 投递等级(delay level) | 延迟时间 |
---|---|---|---|
1 | 1s | 10 | 6min |
2 | 5s | 11 | 7min |
3 | 10s | 12 | 8min |
4 | 30s | 13 | 9min |
5 | 1min | 14 | 10min |
6 | 2min | 15 | 20min |
7 | 3min | 16 | 30min |
8 | 4min | 17 | 1h |
9 | 5min | 18 | 2h |
public class ConsumerExample {
public static void main(String[] args) throws Exception {
// 创建消费者实例,并设置消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 设置 Name Server 地址,此处为示例,实际使用时请替换为真实的 Name Server 地址
consumer.setNamesrvAddr("192.168.179.130:9876");
// 订阅指定的主题和标签(* 表示所有标签)
consumer.subscribe("TestTopic", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("Received message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("Consumer started.");
}
}