一:高并发点
高并发出现在秒杀详情页,主要可能出现高并发问题的地方有:秒杀地址暴露、执行秒杀操作。
二:静态资源访问(页面)优化——CDN
CDN,内容分发网络。我们把静态的资源(html/css/js)放在CDN上,以加快用户获取数据的速度。
用户访问页面时,优先从最近的CDN服务器上获取页面资源,而不是从单一的网站服务器上获取。只有CDN获取不到才会访问后端服务器。
因此,我们可以使用CDN进行网站的加速优化,把静态资源(或某些动态资源)推送到CDN站点上。(大公司自己搭建CDN网络,小公司可以租用)
三:服务器资源访问(地址暴露)优化——Redis缓存
在第一次访问时,从数据库中查找,然后放到redis缓存中,在之后的访问可以直接从缓存中查找。
redis可以支持10W的并发访问,集群化后甚至可以支持百万级的并发访问。
一致性维护低:如果内容发生变化,只需修改一下数据库,改变一下缓存即可。
四:服务器数据库操作(秒杀操作)优化——消息队列
顶层,用redis或NoSQL为每一个数据库记录实现原子计数器,原子计数器记录了数据库中对应商品的库存量,而redis则提供了高并发的访问修改操作;
原子计数器修改成功后,产生一条行为消息——“谁,修改了什么”,然后把这条消息加到消息队列中,消息队列有RabbitMQ,RocketMQ,kafka等。
底层,从消息队列中提取消息进行实际的数据库操作,把数据修改在数据库中实现。
解惑:为何用NoSQL进行并发修改再同步到Mysql,而不是直接在Mysql进行?——因为Mysql低效。
1:Mysql事务管理的控制方案是——行级锁。
因此,一个记录在被操作时,数据库表对应行被加锁,那么此时其他对这行进行操作的事务只能等待。这就造成了效率低。
2:如果是用Java操作mysql修改数据库的话,sql语句传输会有网络延迟,JVM的GC会占去一定时间,commit/rollback耗时。
优化Mysql事务竞争:减少事务锁持有时间
数据库操作是否成功,分为两步:一是数据库update语句的执行,二是执行结果在客户端得到确认。
而延迟就发生在与客户端来回传输之间。
优化就是:使用存储过程,把整个事务(执行与确认)在mysql端完成。
五:地址暴露接口访问优化——Redis编码
1:下载,安装redis。
2:打开项目pom.xml,添加Java访问Redis的依赖包Jedis:
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.7.3</version>
</dependency>
3:redisDao编码
Redis缓存主要作用是,把一下不经常变化的数据从数据库中取出来放在缓存中,以支持高并发的访问而无需高并发地查询数据库。
通常缓存优化的编码逻辑为:
get from cache
if(null)
get from database
put into cache
else
do Logic
首先,在dao包下新建cache包,用于存放对缓存操作的dao,cache包下新建RedisDao类:
package org.myseckill.dao.cache; import org.myseckill.entity.Seckill;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtobufIOUtil;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.runtime.RuntimeSchema; import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool; public class RedisDao { private Logger logger = LoggerFactory.getLogger(this.getClass());
private JedisPool jedisPool; //构造函数,创建一个连接到ip的port端口的redis池
public RedisDao(String ip,int port){
jedisPool=new JedisPool(ip, port);
} //protostuff通过类的schema来进行序列化,所以传入要序列化的类创建一个自定义的schema。
private RuntimeSchema<Seckill> schema=RuntimeSchema.createFrom(Seckill.class);
//get from cache
public Seckill getSeckill(long seckillId){
try {
Jedis jedis=jedisPool.getResource();
try {
String key = "seckill:"+seckillId;
//get byte[]——>反序列化得到 object(seckill)。因此,我们需要把Seckill对象序列化
//实现serializable接口是Java自带的序列化,效果不是很好
//这里采用第三方的自定义序列化工具类protostuff
//通过key来查询对象,对象在cache中以序列化形式存在,所以返回的是字节数组
byte[] bytes=jedis.get(key.getBytes());
if(bytes!=null){//字节数组非空,则查到了对象
//创建一个空对象,用于接收转换后的结果
Seckill seckill=schema.newMessage();
//把字节数组根据schema转换成对象,传到空对象中——反序列化
ProtostuffIOUtil.mergeFrom(bytes, seckill, schema);
return seckill;
} } finally{
jedis.close();
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
return null;
} //put into cache
public String putSeckill(Seckill seckill){
//把对象序列化——>把字节数组写入redis
try {
Jedis jedis=jedisPool.getResource();
try{
String key="seckillId:"+seckill.getSeckillId();
byte[] bytes=ProtostuffIOUtil.toByteArray(seckill, schema, LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE)); //定义超时缓存
int timeout=60*60;
//写入缓存
String result=jedis.setex(key.getBytes(), timeout, bytes);
return result;
}finally{
jedis.close();
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
return null;
}
}
然后,在spring-dao.xml中注册RedisDao
<!-- 5:注册redisDao -->
<bean id="redisDao" class="org.myseckill.dao.cache.RedisDao">
<!-- 构造方法注入 -->
<constructor-arg index="0" value="localhost"/>
<constructor-arg index="1" value="6379"/>
</bean>
4:使用redisDao优化地址暴露
在Service中注入redisDao,使用redisDao改造 exportSeckillUrl():
@Autowired
private RedisDao redisDao;
public Exposer exportSeckillUrl(long seckillId) {//1:先访问redis
Seckill seckill=redisDao.getSeckill(seckillId);
if(seckill==null){//缓存中没有
//2:访问数据库
seckill=seckillDao.queryById(seckillId);
if(seckill==null){//没有这个产品的秒杀记录,不进行暴露
return new Exposer(false, seckillId);
}
}else{
//3:数据库中有,则查出来后放入redis
redisDao.putSeckill(seckill);
} Date now=new Date();
Date start=seckill.getStartTime();
Date end=seckill.getEndTime();
//若时间非法,不秒杀
if(now.getTime()<start.getTime() || now.getTime()>end.getTime()){
return new Exposer(false, seckillId, now.getTime(), start.getTime(), end.getTime());
}
//否则,进行秒杀网址暴露
String md5=getMD5(seckillId);
return new Exposer(true, md5, seckillId);
}
六:秒杀操作并发优化
秒杀操作的并发瓶颈在于行级锁的持有时间以及传输期间的网络延迟,
在update操作时,数据库加行级锁,期间又进行了insert成功秒杀记录的操作,最后事务提交或回滚才释放行级锁。
一种简单优化就是,把insert放前面,update在后面,通过insert是否成功来判断是否执行update。
这样可以减少行级锁的持有时间。
public SeckillExecution executeSeckill(long seckillId, long userPhone, String md5)
throws SeckillException, RepeatKillException, SeckillException {
if(md5==null||!md5.equals(getMD5(seckillId))){
throw new SeckillException("seckill data rewrite");
}
//执行秒杀逻辑:减库存+记录购买行为
try {
//记录购买行为
int insertCount = successKilledDao.insertSuccessKilled(seckillId, userPhone);
if(insertCount <= 0 ){
//重复秒杀
throw new RepeatKillException("seckill repeated");
}else{
//减库存,热点商品竞争(高并发点)
int updateCount = seckillDao.reduceNumber(seckillId, new Date());
if(updateCount<=0){
//没有更新到记录,秒杀结束,rollback
throw new SeckillClosedException("seckill is closed");
}else{
//秒杀成功,commit
SuccessKilled successKilled = successKilledDao.queryByIdWithSeckill(seckillId, userPhone);
return new SeckillExecution(seckillId,SeckillStateEnum.SUCCESS,successKilled);
}
} } catch(SeckillClosedException e1){
throw e1;
} catch(RepeatKillException e2){
throw e2;
}catch (Exception e) {
logger.error(e.getMessage(),e);
//所有异常转化为运行期异常
throw new SeckillException("seckill inner error:"+e.getMessage());
}
}
深度优化,把update和insert都放在mysql端执行,而无需客户端传输指令过来——使用存储过程
在resources下新建一个sql文件——seckill.sql,在其中书写存储过程。
--秒杀执行的存储过程
DELIMITER $$ -- console ; 转换为 $$
--定义存储过程
--参数: in输入参数;out输出参数
--row_count():返回上一条修改类型(delete,insert,update)的影响行数
--row_count: 0:表示未修改数据;>0:表示修改的行数;<0:sql错误/未执行修改sql
CREATE PROCEDURE `seckill`.`execute_seckill`
(in v_seckill_id bigint,in v_phone bigint,
in v_kill_time timestamp, out r_result int)
BEGIN
DECLARE insert_count int DEFAULT 0;
START TRANSACTION;
insert ignore into success_killed
(seckill_id,user_phone,create_time)
value (v_seckill_id,v_phone,v_kill_time);
select row_count() into insert_count;
IF (insert_count=0) THEN
ROLLBACK;
set r_result = -1;
ELSEIF(insert_count<0) THEN
ROLLBACK;
set r_result = -2;
ELSE
update seckill
set number = number-1
where seckill_id=v_seckill_id
and v_kill_time > start_time
and v_kill_time < end_time
and number > 0;
select row_count() into insert_count;
IF (insert_count = 0 ) THEN
ROLLBACK;
set r_result = 0;
ELSEIF (insert_count <0) THEN
ROLLBACK;
set r_result = -2;
ELSE
COMMIT;
set r_result = 1;
END IF;
END IF;
END;
$$
--存储过程定义结束 --调用方式举例:
set @r_result=-3
call execute_seckill(1003,15764210366,now(),@r_result);
select @r_result; --存储过程
--1.存储过程优化:事务行级锁持有的时间
--2.不要过度依赖存储过程
--3.简单的逻辑可以应用存储过程
--4.QPS 一个秒杀单6000/QPS
--QPS:每秒查询率QPS是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准。
在数据库执行该脚本,生成存储过程。
然后在seckillDao类中新增一个接口,调用存储过程执行秒杀操作:
//使用存储过程执行秒杀
void killByProcedure(Map<String,Object> paramMap);
修改seckillDao.xml,书写接口的实现语句:
<!-- mybatis调用存储过程 -->
<select id="killByProcedure" statementType="CALLABLE">
call execute_seckill(
#{seckillId,jdbcType=BIGINT,mode=IN},
#{phone,jdbcType=BIGINT,mode=IN},
#{killTime,jdbcType=TIMESTAMP,mode=IN},
#{result,jdbcType=INTEGER,mode=OUT}
)
</select>
最后修改seckillSerivice,增加一个 executeSeckillByProcedure(long seckillId, long userPhone, String md5) 接口。
修改pom.xml,引入maputils工具类所在依赖包:
<!-- MapUtils -->
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2</version>
</dependency>
在seckillServiceImpl实现类中实现该方法,调用数据库存储过程来进行秒杀操作:
@Override
public SeckillExecution executeSeckillByProcedure(long seckillId,
long userPhone, String md5) {
if(md5==null||!md5.equals(getMD5(seckillId))){
return new SeckillExecution(seckillId,SeckillStateEnum.DATA_REWRITE);
} Date killTime = new Date();
Map<String,Object> map = new HashMap<String,Object>();
map.put("seckillId", seckillId);
map.put("phone",userPhone);
map.put("killTime", killTime);
map.put("result", null); try {
seckillDao.killByProcedure(map);
int result = MapUtils.getInteger(map, "result", -2);//result默认为-2
if(result==1){
//秒杀成功
SuccessKilled sk = successKilledDao.queryByIdWithSeckill(seckillId, userPhone);
return new SeckillExecution(seckillId,SeckillStateEnum.SUCCESS,sk);
}else{
return new SeckillExecution(seckillId,SeckillStateEnum.stateOf(result));
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new SeckillExecution(seckillId,SeckillStateEnum.INNER_ERROR);
}
}
最后,修改controller,改为调用 executeSeckillByProcedure() 方法:
public SeckillResult<SeckillExecution> execute(@PathVariable Long seckillId, @PathVariable String md5,@CookieValue(value = "killPhone", required = false) Long userPhone) {
if(userPhone==null){
return new SeckillResult<SeckillExecution>(false, "未注册");
}
try {
SeckillExecution execution=seckillService.executeSeckillByProcedure(seckillId, userPhone, md5);
return new SeckillResult<SeckillExecution>(true, execution);
} catch (RepeatKillException e) {
SeckillExecution execution = new SeckillExecution(seckillId, SeckillStateEnum.REPEAT_KILL);
return new SeckillResult<SeckillExecution>(true,execution);
} catch (SeckillException e) {
SeckillExecution execution = new SeckillExecution(seckillId, SeckillStateEnum.END);
return new SeckillResult<SeckillExecution>(true,execution);
} catch (Exception e){
SeckillExecution execution = new SeckillExecution(seckillId, SeckillStateEnum.INNER_ERROR);
return new SeckillResult<SeckillExecution>(false,execution);
}
}
七:部署架构分析
CDN加速:内容分发网络,把静态的页面、css、js等资源推送到CDN结点,接口用户访问。
web服务器:用Nginx集群实现反向代理和负载均衡:nginx服务器作为http代理服务器以供用户访问,同时把请求分发给后端真正的服务器Tomcat/Jetty(负载均衡:尽量平均地分发这些请求),然后把后端服务器上的处理结果返回给相应客户端(反向代理)。
Redis缓存:热点数据的快速存取,支持高并发访问这些热点数据。
Mysql存储过程:把一个事务中多条sql语句放在一个存储过程来执行,减少了多条语句在服务器与数据库之间网络传输的时间延迟,优化了行级锁持有时间。
架构工作流程分析:
1:客户请求流量首先访问CDN站点,获取页面等静态资源;
2:调用服务器端的相关请求时,通过DNS访问到Nginx代理服务器;
3:Nginx集群通过分发,把请求转发给稍微空闲的后端服务器Jetty;
4:后端服务器执行逻辑操作,存取数据时,首先访问redis集群;
5:如果redis集群没有,则访问数据库。数据库集群利用分库、分表的方式提高访问效率;
6:最后,可以使用hadoop等对数据库中的数据进行统计分析。
总结:
项目已上传我的github:
项目来源:慕课网zhangyijun老师,十分感谢!真的是受益良多!
附上老师的慕课主页:http://www.imooc.com/u/2145618/courses?sort=publish