Redis:
incr key 自增value (业务场景:分库分表可以用来生成表的自增ID(会有浪费资源的问题,数据量变大有压力),文章的阅读数量)
incrby key N数量 自增N个value 可以解决分库分表产生的数据量压力问题
decr key 将 key 中储存的数字值减一。
decr key N数量 所储存的值减去给定的减量值 。
zset
默认内部实现算法是压缩列表"ziplist"
跳表算法:将有序链表改造为支撑"折半查找算法",可以进行快速的插入,删除,查找操作
缓存穿透:
访问一个缓存和数据库都不存在的 key,此时会直接打到数据库上,并且查不到数据,没法写缓存,所以下一次同样会打到数据库上。
解决方案:
1、接口校验。在正常业务流程中可能会存在少量访问不存在 key 的情况,但是一般不会出现大量的情况,所以这种场景最大的可能性是遭受了非法攻击。可以在最外层先做一层校验:用户鉴权、数据合法性校验等,例如商品查询中,商品的ID是正整数,则可以直接对非正整数直接过滤等等。
2、缓存空值。当访问缓存和DB都没有查询到值时,可以将空值写进缓存,但是设置较短的过期时间,该时间需要根据产品业务特性来设置。
缓存击穿:
某一个热点 key,在缓存过期的一瞬间,同时有大量的请求打进来,由于此时缓存过期了,所以请求最终都会走到数据库,造成瞬时数据库请求量大、压力骤增,甚至可能打垮数据库。
解决方案:
1、加互斥锁。在并发的多个请求中,只有第一个请求线程能拿到锁并执行数据库查询操作,其他的线程拿不到锁就阻塞等着,等到第一个线程将数据写入缓存后,直接走缓存。
关于互斥锁的选择,选择 Redis 分布式锁,因为这个可以保证只有一个请求会走到数据库,这是一种思路。
但是其实仔细想想的话,这边其实没有必要保证只有一个请求走到数据库,只要保证走到数据库的请求能大大降低即可,所以还有另一个思路是 JVM 锁。
JVM 锁保证了在单台服务器上只有一个请求走到数据库,通常来说已经足够保证数据库的压力大大降低,同时在性能上比分布式锁更好。
需要注意的是,无论是使用“分布式锁”,还是“JVM 锁”,加锁时要按 key 维度去加锁。
2、热点数据不过期。直接将缓存设置为不过期,然后由定时任务去异步加载数据,更新缓存。
这种方式适用于比较极端的场景,例如流量特别特别大的场景,使用时需要考虑业务能接受数据不一致的时间,还有就是异常情况的处理,不要到时候缓存刷新不上,一直是脏数据,那就凉了。
缓存雪崩:
大量的热点 key 设置了相同的过期时间,导在缓存在同一时刻全部失效,造成瞬时数据库请求量大、压力骤增,引起雪崩,甚至导致数据库被打挂。
缓存雪崩其实有点像“升级版的缓存击穿”,缓存击穿是一个热点 key,缓存雪崩是一组热点 key。
解决方案:
1、过期时间打散。既然是大量缓存集中失效,那最容易想到就是让他们不集中生效。可以给缓存的过期时间时加上一个随机值时间,使得每个 key 的过期时间分布开来,不会集中在同一时刻失效。
2、热点数据不过期。该方式和缓存击穿一样,也是要着重考虑刷新的时间间隔和数据异常如何处理的情况。
3、加互斥锁。该方式和缓存击穿一样,按 key 维度加锁,对于同一个 key,只允许一个线程去计算,其他线程原地阻塞等待第一个线程的计算结果,然后直接走缓存即可。
线程状态
6个状态定义:java.lang.Thread.State
1.New:尚未启动的线程的线程状态。
2.Runnable:可运行线程的线程状态,等待CPU调度。
3.Blocked:线程阻塞等待监视器锁定的线程状态。处于synchronized同步代码块或方法中被阻塞。
4.Waiting:等待线程的线程状态。下列不带超时的方式:
Object.wait、Thread.join、LockSupport.park
5.Timed Waiting:具有指定等待时间的等待线程的线程状态。下列带超时的方式:
Thread.sleep、Object.wait、Thread.join、LockSupport.parkNanos、LockSupport.parkUntil
6.Terminated:终止线程的线程状态。线程正常完成执行或者出现异常。
线程中止 Thread.interrupt()
如果目标线程在调用Object class的wait()、wait(long)或wait(long,int)方法、join()、join(long,int)或sleep(long,int)方法时被阻塞,那么Interrupt会生效,该线程的中断状态将被清除,抛出InterruptedException异常。
如果目标线程是被I/0或者NIO中的Channel所阻塞,同样,I/O操作会被中断或者返回特殊异常值。达到终止线程的目的。
如果以上条件都不满足,则会设置此线程的中断状态。
线程停止/唤醒
wait / notify 必须在synchronized(this)锁块里面执行,会自动释放锁 不然会抛出异常,执行顺序必须先wait再notify 不然会导致一直等待
park / unpark 不能在synchronized(this)锁块里面执行不会自动释放锁
线程伪唤醒
官方建议应该在循环中检查等待条件,原因是处于等待状态的线程可能会收到错误警报和伪唤醒,如果不在循环中检查等待条件,程序就会在没有满足结束条件的情况下退出。
伪唤醒是指线程并非因为notify、notifyall、unpark等api调用而唤醒,是更底层原因导致的。
线程封闭
数据都被封闭在各自的线程之中,就不需要同步,这种通过将数据封闭在线程中而避免使用同步的技术称为线程封闭。
线程封闭具体的体现有:ThreadLocal、局部变量
用法:ThreadLocal-T> var=new ThreadLocal-T>();会自动在每一个线程上创建一个T的副本,副本之间彼此独立,互不影响。
可以用ThreadLocal存储一些参数,以便在线程中多个方法中使用,用来代替方法传参的做法。
线程安全
sun.misc.Unsafe 类: Unsafe类的CAS操作可能是用的最多的,它为Java的锁机制提供了一种新的解决办法,比如AtomicInteger等类都是通过该方法来实现的。compareAndSwap方法是原子的,可以避免繁重的锁机制,提高代码效率。这是一种乐观锁,通常认为在大部分情况下不出现竞态条件,如果操作失败,会不断重试直到成功。
LongAdder 类:分布式计数器,多个线程之间自主保存计数结果,只有在sum方法才计算结果
锁消除=自动优化掉一些没有线程安全的syn锁,锁粗化=多把锁合并为1把锁, 都是jit为了提升性能而自动优化的 JVM优化的偏向锁:偏向标记第一次有用,出现过争用后就没用了。-XX:-UseBiasedLocking禁用使用偏置锁定,偏向锁,本质就是无锁,如果没有发生过任何多线程争抢锁的情况,JVM认为就是单线程,无需做同步(jvm为了少干活:同步在JVM底层是有很多操作来实现的,如果 是没有争用,就不需要去做同步操作)
线程volatile修饰 变量的内存可见性,线程代码重排序
vm参数
被volatile修饰的共享变量,就具有了以下两点特性:
1.保证了不同线程对该变量操作的内存可见性;
2.禁止指令重排序
如线程中出现如下代码
private boolean flag = true;
run(){
while(flag){
xxx
}
}
sleep(2000)
flag = false;
没有加volatile修饰可能会导致死循环,因为CPU重排序会把while(flag)改为while(true)
悲观锁,乐观锁:
悲观锁:假定会发生并发冲突,同步所有对数据的相关操作,从读数据就开始上锁。
乐观锁:假定没有冲突,在修改数据时如果发现数据和之前获取的不一致,则读最新数据,修改后重试修改。
Mybatis源码分析
Mybatis是一个ORM(持久化对象关系映射)框架
操作数据库步骤 --> 解析配置连接数据库源 --> 加载mapping --> 执行语句
有4种加载mappings配置方法 package ->resource->class->url
操作执行器:org.apache.ibatis.session {
// 简单
SIMPLE,
// 复用
REUSE,
// 批量
BATCH
}
Java动态代理
1、JDK动态代理
利用拦截器(拦截器必须实现InvocationHanlder)加上反射机制生成一个实现代理接口的匿名类,
在调用具体方法前调用InvokeHandler来处理。
2、CGLiB动态代理
利用ASM开源包,对代理对象类的class文件加载进来,通过修改其字节码生成子类来处理。
3、何时使用JDK还是CGLiB?
1)如果目标对象实现了接口,默认情况下会采用JDK的动态代理实现AOP。
2)如果目标对象实现了接口,可以强制使用CGLIB实现AOP。
3)如果目标对象没有实现了接口,必须采用CGLIB库,Spring会自动在JDK动态代理和CGLIB之间转换。
4、如何强制使用CGLIB实现AOP?
1)添加CGLIB库(aspectjrt-xxx.jar、aspectjweaver-xxx.jar、cglib-nodep-xxx.jar)
2)在Spring配置文件中加入<aop:aspectj-autoproxy proxy-target-class="true"/>
5、JDK动态代理和CGLIB字节码生成的区别?
1)JDK动态代理只能对实现了接口的类生成代理,而不能针对类。
2)CGLIB是针对类实现代理,主要是对指定的类生成一个子类,覆盖其中的方法,
并覆盖其中方法实现增强,但是因为采用的是继承,所以该类或方法最好不要声明成final,
对于final类或方法,是无法继承的。
6、CGlib比JDK快?
1)使用CGLib实现动态代理,CGLib底层采用ASM字节码生成框架,使用字节码技术生成代理类,在jdk6之前比使用Java反射效率要高。唯一需要注意的是,CGLib不能对声明为final的方法进行代理,因为CGLib原理是动态生成被代理类的子类。
2)在jdk6、jdk7、jdk8逐步对JDK动态代理优化之后,在调用次数较少的情况下,JDK代理效率高于CGLIB代理效率,只有当进行大量调用的时候,jdk6和jdk7比CGLIB代理效率低一点,但是到jdk8的时候,jdk代理效率高于CGLIB代理,总之,每一次jdk版本升级,jdk代理效率都得到提升,而CGLIB代理消息确有点跟不上步伐。
7、Spring如何选择用JDK还是CGLiB?
1)当Bean实现接口时,Spring就会用JDK的动态代理。
2)当Bean没有实现接口时,Spring使用CGlib是实现。
3)可以强制使用CGlib(在spring配置中加入<aop:aspectj-autoproxy proxy-target-class="true"/>)。
AQS抽象队列同步器
提供了对资源占用、释放,线程的等待、唤醒等等接口和具体实现。
可以用在各种需要控制资源争用的场景中。(ReentrantLock/CountDownLatch/Semphore)
Rabbit:
如何保证消息被消费成功
1.定时任务去读取数据库中处理失败的数据,重新提交给消息队列,加上重试次数字段,达到N次处理失败情况后通知人工处理
如何保证消息不会被重复消费(集群架构下)
1.乐观锁,在数据库中标记一个版本属性,在更新时代码将版本号+1 where 条件再加上读取出来的老版本
接口幂等性(一次调用跟N次调用结果是相同的):
非幂等性:UPDATE tab1SET col1=col1+1WHERE col2=2,每次执行的结果都会发生变化,不是幂等的。
解决方案:
1.Token机制(可以随机生成一个Token返回)
步骤 (获取Token 对比Token 删除Token),这整个步骤必须要是原子性的,所以可以用 redis的lua脚本来执行
if redis. call("get', KEYS[1])==ARGV[1] then return redis. call("del", KEYS[1]) else return 0 end
2.乐观锁,在数据库中标记一个版本属性,在更新时代码将版本号+1 where 条件再加上读取出来的老版本
3.redis防重 使用redis的set功能 将处理的数据加密后加入到redis里面,每次处理数据前先判断是不是已经被处理过了
JVM启动参数:
-Xms = 堆内存最小空间
-Xmx = 堆内存最大空间
老年代(old),与新生代(eden) 堆内存分配比是 old:2 eden:1
-Xss = 分配线程内存大小
-Xmn = 年轻代的内存大小(年轻代下面的空间是 eden,S0,S1 分配空间比例是 8:1:1 2G的内存大小分配为:eden=1.6G S0=200M S1=200M)
-XX:MetaspaceSize = 元空间GC初始阈值 MetaspaceSize表示的并非是元空间的大小,它的含义是:主要控制matesaceGC发生的初始阈值,也就是最小阈值。也就是说当使用的matespace空间到达了MetaspaceSize的时候,就会触发Metaspace的GC
-XX:MaxMetaspaceSize = 在jvm启动的时候,并不会分配MaxMetaspaceSize这么大的一块内存出来,metaspace是可以一直扩容的,直到到达MaxMetaspaceSize。
java -Xms3G -Xmx3G -Xmn2G -Xss1M -XX:MetaspaceSize=512M -XX:MaxMetaspaceSize=512M -jar microservice-eureka-server.jar
JDK自带的JVM性能分析工具
jvisualvm
B树与B+树的区别:
索引是:Key:Value
B树:
B树允许一个节点存放多个数据,这样可以使更小的树的深度存放更多的数据。但是,B树的一个节点中到底能存放多少个数据,决定了树的深度
Mysql默认情况下一个节点的大小是16K
通过索引的 Key:Value 占用大小大致计算,B数存储的节点是非常有限的
B+树:
非叶子节点冗余了叶子节点中的索引Key
叶子节点是从小到大,从左到右排列的
叶子节点之间提供了指针,提高了区间访问的性能
只有叶子节点才存放数据,非叶子节点不存放数据只存放索引Key
因为非叶子节点只存放索引Key,所以B+树一个节点可以存放大量的索引Key,3层结构最多可以存放2000w级别的数据
数据结构:树(Tree)
floor:向下取整,去除小数点后面的数字(2.5转换后成为2,2.9999也是2)
ceiling:向上取整,去除小数点后面的数字后+1(3.2转换后变成4)
树的基本概念:
节点、根节点、父节点、子节点、兄弟节点
一棵树可以没有任何节点,称为空树
一棵树可以只有1个节点,也就是只有根节点
子树、左子树、右子树
节点的度(degree):子树的个数
树的度:所有节点度中的最大值
叶子节点(leaf):度为0的节点
非叶子节点:度不为0的节点
二叉树:
每个节点最多拥有2颗子树
左子树和右子树是有顺序的
即使某节点只有一棵子树,也要区分左右子树
二叉树是有序树
真二叉树:
节点的度要么是0要么是2
满二叉树:
所有节点的度要么为0,要么为2,且所有的叶子节点都在最后一层。
在同样高度的二叉树中,满二叉树的叶子节点数量最多、总节点数量最多
满二叉树一定是真二叉树,真二叉树不一定是满二叉树
完全二叉树:
叶子节点只会出现最后2层,且最后1层的叶子结点都靠左对齐
完全二叉树从根结点至倒数第2层是一棵满二叉树
满二叉树一定是完全二叉树,完全二叉树不一定是满二又树
如果i=1,它是根节点
如果i>1,它的父节点编号为floor(i/2)
如果2i <= n,它的左子节点编号为2i
如果2i > n,它无左子节点
如果2i+1 <= n,它的右子节点编号为2i+1
如果2i+1 > n,它无右子节点
面试题:
如果一棵完全二叉树有768个节点,求叶子节点的个数?
假设叶子节点个数为n0,度为1的节点个数为n1,度为2的节点个数为n2
总结点个数n=n0+n1+n2,而且n0=n2+1 n=2n0+n1-1
完全二叉树的n1要么为0,要么为1
n1为1时,n=2n0,n必然是偶数
叶子节点个数n0=n/2,非叶子节点个数n1+n2=n/2
n1为0时,n=2n0-1,n必然是奇数
叶子节点个数n0=(n+1)/2,非叶子节点个数n1+n2=(n-1)/ 2
叶子节点个数n0=floor((n+1)/2)=ceiling(n/2)
非叶子节点个数n1+n2=floor(n/2)=ceiling((n-1)/2)
因此叶子节点个数为384
二叉搜索树(又称:二叉查找树,二叉排序树):
任意一个节点的值都大于其左子树所有节点的值
任意一个节点的值都小于其右子树所有节点的值
它的左右子树也是一棵二叉搜索树
二叉搜索树可以大大提高搜索数据的效率
二叉搜索树存储的元素必须具备可比较性比如int、double等如果是自定义类型,需要指定比较方式
不允许为nul1
二叉树常见遍历方式:
口前序遍历(Preorder Traversal)
从根节点先访问左子树在访问右子树,先左边再右边
遍历应用:树状结构展示
口中序遍历(Inorder Traversal)
中序遍历先访问左子树,在访问根节点,在访问右子树,最后的结果是从小到大排列
遍历应用:从大到小排列或从小到大
口后序遍历(Postorder Traversal)
后序遍历先访问左子树,在访问右子树,在访问根节点
遍历应用:适用于先子后父的操作
口层序遍历(Level Order Traversal)
从根节点向下按层次依次访问左子树和右子树
遍历应用:计算二叉树的高度,判断是否为一棵完全二叉树
前驱节点 predecessor:中序遍历时的前一个节点
后继节点 successor:中序遍历时的后一个节点
平衡二叉搜索树(AVL树):
当树的数据是从小到大添加时,这时二叉树就会变退化成链表
平衡:当节点数量固定时,左右子树的高度越接近,这棵二又树就越平衡(高度越低)
改进方案是:在节点的添加、删除操作之后,想办法让二叉搜索树恢复平衡(减小树的高度)
添加导致的失衡:最坏情况:可能会导致所有祖先节点都失衡,父节点、非祖先节点,都不可能失衡(比如添加的数据在右子树导致失衡,左子树肯定不会失衡)
代码演示:
/**
* 层序遍历
*/
public void levelOrderTraversal(Visitor<E> eVisitor){
if(eVisitor == null) return;
levelOrderTraversal(root,eVisitor);
}
private void levelOrderTraversal(Node<E> node,Visitor<E> eVisitor){
if(node == null) return;
Queue<Node<E>> queue = new LinkedList<>();
queue.offer(node);
while (!queue.isEmpty()){
Node<E> poll = queue.poll();
if(eVisitor.stop) return;
eVisitor.visitor(poll.element);
if(poll.left != null) queue.offer(poll.left);
if(poll.right != null) queue.offer(poll.right);
}
}
/**
* 后序遍历
*/
public void postorderTraversal(Visitor<E> eVisitor){
if(eVisitor == null) return;
postorderTraversal(root , eVisitor);
}
private void postorderTraversal(Node<E> node , Visitor<E> eVisitor){
if(node == null){
return;
}
postorderTraversal(node.left , eVisitor);
postorderTraversal(node.right , eVisitor);
if(eVisitor.stop) return;
eVisitor.visitor(node.element);
}
/**
* 中序遍历
*/
public void inorderTraversal(Visitor<E> eVisitor){
if(eVisitor == null) return;
inorderTraversal(root , eVisitor);
}
private void inorderTraversal(Node<E> node,Visitor<E> eVisitor){
if(node == null){
return;
}
inorderTraversal(node.left , eVisitor);
if(eVisitor.stop) return;
eVisitor.visitor(node.element);
inorderTraversal(node.right , eVisitor);
}
/**
* 前序遍历
*/
public void preorderTraversal(Visitor<E> eVisitor){
if(eVisitor == null) return;
preorderTraversal(root , eVisitor);
}
private void preorderTraversal(Node<E> node,Visitor<E> eVisitor){
if(node == null){
return;
}
if(eVisitor.stop) return;
eVisitor.visitor(node.element);
preorderTraversal(node.left , eVisitor);
preorderTraversal(node.right , eVisitor);
}
/**
* 计算出指定节点的前驱节点
*/
public Node<E> predecessor(Node<E> node){
if(node.left != null) {
node = node.left;
while (node.right != null) node = node.right;
return node;
}else{
while (node.parent != null && node.parent.left == node) node = node.parent;
return node.parent;
}
}
/**
* 计算出指定节点的后继节点
*/
public Node<E> successor(Node<E> node){
if(node.right != null) {
node = node.right;
while (node.left != null) node = node.left;
return node;
}else{
while (node.parent != null && node.parent.right == node) node = node.parent;
return node.parent;
}
}
/**
* 递归获取树的高度
**/
public int height(Node<E> node){
if(node == null) return 0;
return 1 + Math.max(height(node.left) , height(node.right));
}
/**
* 迭代获取树的高度
*/
public int levelHeight(){
if(root == null) return 0;
int height = 0;
int num = 1;
Queue<Node<E>> queue = new LinkedList<>();
queue.offer(root);
while (!queue.isEmpty()){
Node<E> poll = queue.poll();
num--;
if(poll.left != null){
queue.offer(poll.left);
}
if(poll.right != null){
queue.offer(poll.right);
}
if(num == 0){
// num=下一层树的节点数
num = queue.size();
height++;
}
}
return height;
}
/**
* 判断是否为一棵完全二叉树
* 完全二叉树:
* 叶子节点只会出现最后2层,且最后1层的叶子结点都靠左对齐
* 完全二叉树从根结点至倒数第2层是一棵满二叉树
*/
public boolean isComplete(){
if(root == null) return false;
Queue<Node<E>> queue = new LinkedList<>();
queue.offer(root);
boolean leaf = false;
while (!queue.isEmpty()){
Node<E> poll = queue.poll();
if(leaf && !poll.isLeaf()) return false;
if(poll.left != null) queue.offer(poll.left);
else if(poll.right != null) return false; // poll.left==null
if(poll.right != null) queue.offer(poll.right);
else leaf = true; // poll.right==null 有左子树没右子树 那就表示接下来节点都是叶子节点
}
return true;
}
/**
* 删除节点
*/
private void remove(Node<E> node){
if(node == null) return;
// 度为2的节点
if(node.hasTwoChilds()){
// 找出后继节点
Node<E> sNode = successor(node);
// 用后继节点的值覆盖度为2的节点的值
node.element = sNode.element;
// 删除后继节点
node = sNode;
}
// 删除node节点(node的度必然是1或者0)
Node<E> replacement = node.left != null ? node.left : node.right;
if(replacement != null){
// 更改父节点
replacement.parent = node.parent;
// 更改父节点左子树右子树的指向
if(node.parent == null){
// node是度为1的节点 并且是根节点
root = replacement;
}else if(node == node.parent.left){
node.parent.left = replacement;
}else if(node == node.parent.right){
node.parent.right = replacement;
}
}else if(node.parent == null){
// replacement是叶子节点也是根节点
root = null;
}else{
// replacement=null node是叶子节点
if(node == node.parent.left){
node.parent.left = null;
}else{
node.parent.right = null;
}
}
size--;
}
/**
* 算法题:反转一棵二叉树
*/
public void fanzhuang(){
if(root == null) return ;
fanzhuang(root);
}
public void fanzhuang(Node<E> node){
if(node == null) return;
// 递归
/*Node<E> tmp = node.left;
node.left = node.right;
node.right = tmp;
fanzhuang(node.left);
fanzhuang(node.right);*/
// 迭代
Queue<Node<E>> queue = new LinkedList<>();
queue.offer(node);
while (!queue.isEmpty()){
Node<E> poll = queue.poll();
Node<E> tmp = poll.left;
poll.left = poll.right;
poll.right = tmp;
if(poll.left != null){
queue.offer(poll.left);
}
if(poll.right != null){
queue.offer(poll.right);
}
}
}