黑马Redis实战篇
参考文章链接:https://blog.csdn.net/weixin_51515308/article/details/128010464?spm=1001.2014.3001.5502
仅记录一些之前没见过的操作
一些mybatisplus操作也会记录
验证码
多台Tomcat不共享session存储空间,当请求切换到不同的tomcat服务时导致数据丢失的问题
所以我们把数据存入Redis,集群的Redis可以替代session
生成验证码,保存到session
//2.生成验证码:导入hutool依赖,内有RandomUtil
String code = RandomUtil.randomNumbers(6);
//3.保存验证码到session
session.setAttribute("code",code);
根据手机号查询用户
//4.一致,根据手机号查询用户(需要写对应的单表查询方法:select * from tb_user where phone = #{phone})
User user = query().eq("phone", phone).one();
发送验证码和用户登录
发送验证码:
//3.保存验证码到Redis
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY +phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);//有效期2mins
用户登录,获取验证码:
//2.从Redis中获取验证码
String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY+phone);
保存用户到Redis,而且只保存部分信息
//6.保存用户到Redis
//(1)生成token
String token = UUID.randomUUID().toString(true);//hutools
//(2)User转为HashMap存储
UserDTO userDTO = BeanUtil.copyProperties(user,UserDTO.class);
HashMap<Object, Object> userMap = new HashMap<>();
userMap.put("id", userDTO.getId().toString());
userMap.put("nickName", userDTO.getNickName());
userMap.put("icon", userDTO.getIcon());
//(3)存储到Redis
String tokenKey = LOGIN_USER_KEY + token;
stringRedisTemplate.opsForHash().putAll(tokenKey,userMap);
//(4) 设置有效期
stringRedisTemplate.expire(tokenKey,LOGIN_USER_TTL,TimeUnit.MINUTES);
MvcConfig注入stringRedisTemplate,然后传给LoginInterceptor,因为LoginInterceptor不是bean不能用spring注入其他bean
拦截器进行的预处理-保存ThreadLocal和刷新有效期
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1.获取请求头中的token
String token = request.getHeader("authorization");
if (StrUtil.isBlank(token)) {
return true;
}
// 2.基于TOKEN获取redis中的用户
String key = LOGIN_USER_KEY + token;
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
// 3.判断用户是否存在
if (userMap.isEmpty()) {
return false;
}
// 5.将查询到的hash数据转为UserDTO
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
// 6.存在,保存用户信息到 ThreadLocal
UserHolder.saveUser(userDTO);
// 7.刷新token有效期
stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES);
// 8.放行
return true;
}
而保存进ThreadLocal的东西已经被封装好了
public class UserHolder {
private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();
public static void saveUser(UserDTO user){
tl.set(user);
}
public static UserDTO getUser(){
return tl.get();
}
public static void removeUser(){
tl.remove();
}
}
缓存
写入/写出redis,要序列化JSONUtil(⭐⭐⭐)
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result querygetById(Long id) {
//1.从Redis内查询商品缓存
String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
if(StrUtil.isNotBlank(shopJson)){
//手动反序列化
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//2.不存在就根据id查询数据库
Shop shop = getById(id);
if(shop==null){
return Result.fail("商户不存在!");
}
//3.数据库数据写入Redis
//手动序列化
String shopStr = JSONUtil.toJsonStr(shop);
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,shopStr,CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.ok(shop);
}
}
查很多东西,并且写。设置有效时间。
@Service
public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result queryList() {
//1.从Redis中查询
String key = CACHE_SHOPTYPE_KEY;
List<String> list = stringRedisTemplate.opsForList().range(key, 0, -1);
if(!list.isEmpty()){
//手动反序列化
List<ShopType> typeList = new ArrayList<>();
for (String s : list) {
ShopType shopType = JSONUtil.toBean(s, ShopType.class);
typeList.add(shopType);
}
return Result.ok(typeList);
}
//2.从数据库内查询
List<ShopType> typeList = query().orderByAsc("sort").list();
if(typeList.isEmpty()){
return Result.fail("不存在该分类!");
}
//序列化
for (ShopType shopType : typeList) {
String s = JSONUtil.toJsonStr(shopType);
list.add(s);
}
//3.存入缓存
stringRedisTemplate.opsForList().rightPushAll(key,list);
stringRedisTemplate.expire(key,CACHE_SHOPTYPE_TTL,TimeUnit.MINUTES);
return Result.ok(list);
}
}
更新数据库的时候要删除缓存,以实现缓存和数据库双写一致
基于互斥锁解决缓存击穿问题
/**
* 获取互斥锁
*/
private boolean tryLock(String key) {
Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, "1", TTL_TEN, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
/**
* 释放互斥锁
*/
private void unLock(String key) {
redisTemplate.delete(key);
}
/**互斥锁实现解决缓存击穿**/
public Shop queryWithMutex(Long id){
//1.从Redis内查询商品缓存
String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
if(StrUtil.isNotBlank(shopJson)){
//手动反序列化
return JSONUtil.toBean(shopJson, Shop.class);
}
//如果上面的判断不对,那么就是我们设置的""(有缓存"",证明数据库内肯定是没有的)或者null(没有缓存)
//判断命中的是否时空值
if(shopJson!=null){//
return null;
}
//a.实现缓存重建
//a.1 获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
Shop shop = null;
try {
boolean hasLock = tryLock(lockKey);
//a.2 判断是否获取到,获取到:根据id查数据库 获取不到:休眠
if(!hasLock){
Thread.sleep(50);
return queryWithMutex(id);
}
//2.不存在就根据id查询数据库
shop = getById(id);
//模拟重建的延时
Thread.sleep(200);
if(shop==null){
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
return null;
}
//3.数据库数据写入Redis
//手动序列化
String shopStr = JSONUtil.toJsonStr(shop);
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,shopStr,CACHE_SHOP_TTL, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
//释放互斥锁
unlock(lockKey);
}
return shop;
}
@Override
public Result querygetById(Long id) {
//缓存穿透
//Shop shop = queryWithPassThrough(id);
//互斥锁解决缓存击穿
Shop shop = queryWithMutex(id);
if(shop==null) return Result.fail("店铺不存在!");
return Result.ok(shop);
}
基于逻辑过期解决缓存击穿问题
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);//开启10个线程
/**逻辑过期实现解决缓存击穿**/
public Shop queryWithLogical(Long id){
//1.从Redis内查询商品缓存
String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
//2.判断是否存在
if(StrUtil.isBlank(shopJson)){
return null;
}
//3.命中,需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
JSONObject data = (JSONObject) redisData.getData();
Shop shop = JSONUtil.toBean(data, Shop.class);
//4.判断是否过期
LocalDateTime expireTime = redisData.getExpireTime();
if(expireTime.isAfter(LocalDateTime.now())){
//未过期直接返回
return shop;
}
//5.过期的话需要缓存重建
//5.1 获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean hasLock = tryLock(lockKey);
//5.2判断是否获取到,获取到:根据id查数据库 获取不到:休眠
if(hasLock){
//成功就开启独立线程,实现缓存重建, 这里的话用线程池
CACHE_REBUILD_EXECUTOR.submit(()->{
try {
//重建缓存
this.saveShop2Redis(id,20L);
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
//释放锁
unlock(lockKey);
}
});
}
return shop;
}
/**缓存重建方法**/
public void saveShop2Redis(Long id,Long expireSeconds) throws InterruptedException {
//1.查询店铺信息
Shop shop = getById(id);
Thread.sleep(200);
//2.封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
//3.写入Redis
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
}
@Test
void testSaveShop() throws InterruptedException {
shopService.saveShop2Redis(1L,10L);
}
秒杀-分布式锁
一人一单的秒杀(不支持并发安全)
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠卷
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始,是否结束
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始!");
}
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已结束!");
}
//3.判断库存是否充足
if(voucher.getStock()<=0){
return Result.fail("优惠券库存不足!");
}
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {//userId一样的持有同一把锁,最好不要放在整个方法上,intern:去字符串常量池找相同字符串
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();//获得代理对象
return proxy.createVoucherOrder(voucherId);//默认是this,我们要实现事务需要proxy
}//先获取锁,然后再进入方法,确保我的前一个订单会添加上,能先提交事务再释放锁
}
@Transactional
public Result createVoucherOrder(Long voucherId){
//查询订单看看是否存在
Long userId = UserHolder.getUser().getId();
if (query().eq("user_id",userId).eq("voucher_id",voucherId).count()>0) {
return Result.fail("用户已经购买过一次!");
}
//4.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId).gt("stock",0)//where id = ? and stock >0 添加了乐观锁
.update();
//5.创建订单
if(!success){
return Result.fail("优惠券库存不足!");
}
//6.返回订单id
VoucherOrder voucherOrder = new VoucherOrder();
//6.1订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2用户id
//Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
//6.3代金券id
voucherOrder.setVoucherId(voucherId);
//7.订单写入数据库
save(voucherOrder);
//8.返回订单Id
return Result.ok(orderId);
}
分布式锁Redisson
1.导入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
2.写一个配置类
@Configuration
public class RedissionConfig {
@Bean
public RedissonClient redissionClient(){
//配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.200.131:6379").setPassword("1234");
//创建RedissonClient对象
return Redisson.create(config);
}
}
3.在业务实现类中使用RedissonClient
@Autowired
private RedissonClient redissonClient;
业务方法{
//···前面流程
Long userId = UserHolder.getUser().getId();
//创建锁对象
//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId,stringRedisTemplate);
RLock lock = redissonClient.getLock("lock:order:" + userId);
//获取锁
boolean hasLock = lock.tryLock( );
if(!hasLock){
//获取锁失败: return fail 或者 retry 这里业务要求是返回失败
return Result.fail("请勿重复下单!");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();//获得代理对象
return proxy.createVoucherOrder(voucherId);//默认是this,我们要实现事务需要proxy
} catch (IllegalStateException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
看门狗watchdog
tryLock(waitTime,leaseTime,TimeUnit)
waitTime:获取锁的等待时长,获取锁失败后等待waitTime再去获取锁
leaseTime: 锁自动失效时间,这里测试锁重试不需要用到
WatchDog—–超时释放
对抢锁过程进行监听,抢锁完毕后,scheduleExpirationRenewal(threadId) 方法会被调用来对锁的过期时间进行续约,在后台开启一个线程,进行续约逻辑,也就是看门狗线程。
// 续约逻辑
commandExecutor.getConnectionManager().newTimeout(new TimerTask() {… }, 锁失效时间 / 3, TimeUnit.MILLISECONDS);Method(new TimerTask(){}, 参数2, 参数3)
通过参数2、参数3 去描述,什么时候做参数1 的事情。
锁的失效时间为 30s,10s 后这个 TimerTask 就会被触发,于是进行续约,将其续约为 30s;
若操作成功,则递归调用自己,重新设置一个 TimerTask 并且在 10s 后触发;循环往复,不停的续约。
因为不想写lua脚本,也不想看redis自己实现的消息队列。所以这一章就结束了
达人探店
点赞功能
@Override
public Result likeBlog(Long id) {
//1.判断当前用户是否已点赞
Long userId = UserHolder.getUser().getId();
String key = BLOG_LIKED_KEY + id;
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
if(BooleanUtil.isFalse(isMember)){
//2.未点赞:数据库赞+1
boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
//3.用户信息保存到Redis的点赞set
if(isSuccess){
stringRedisTemplate.opsForSet().add(key,userId.toString());
}
}
else{
//4.已点赞:数据库-1
boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
//5.把用户信息从Redis的点赞set移除
if(isSuccess){
stringRedisTemplate.opsForSet().remove(key,userId.toString());
}
}
return Result.ok();
}
当我们点开一篇blog的时候就需要被看到是否点赞过,这就要求我们改一下queryBlogById(id)咯,当然isLikeBlog(blog)也是需要
@Override
public Result queryBlogById(Long id) {
//1.查询blog
Blog blog = getById(id);
if(blog==null){
return Result.fail("笔记不存在!");
}
//2.查询blog相关用户
queryBlogUser(blog);
//3.查询用户是否点过赞,其实就是给blog的isLike添加值
isLikeBlog(blog);
return Result.ok(blog);
}
private void isLikeBlog(Blog blog) {
Long userId = UserHolder.getUser().getId();
String key = BLOG_LIKED_KEY + id;
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
blog.setIsLike(BooleanUtil.isTrue(isMember));
}
@Override
public Result queryHotBlog(Integer current) {
// 根据用户查询
Page<Blog> page = query()
.orderByDesc("liked")
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
// 查询用户
records.forEach(blog->{
this.queryBlogUser(blog);
this.isLikeBlog(blog);
});//就是用blog遍历的
return Result.ok(records);
}
排行榜
用zset
private void isBlogLiked(Blog blog) {
// 1.获取登录用户
UserDTO user = UserHolder.getUser();
if (user == null) {
// 用户未登录,无需查询是否点赞
return;
}
Long userId = user.getId();
// 2.判断当前登录用户是否已经点赞
String key = "blog:liked:" + blog.getId();
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
blog.setIsLike(score != null);
}
@Override
public Result likeBlog(Long id) {
// 1.获取登录用户
Long userId = UserHolder.getUser().getId();
// 2.判断当前登录用户是否已经点赞
String key = BLOG_LIKED_KEY + id;
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
if (score == null) {
// 3.如果未点赞,可以点赞
// 3.1.数据库点赞数 + 1
boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
// 3.2.保存用户到Redis的set集合 zadd key value score
if (isSuccess) {
stringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());
}
} else {
// 4.如果已点赞,取消点赞
// 4.1.数据库点赞数 -1
boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
// 4.2.把用户从Redis的set集合移除
if (isSuccess) {
stringRedisTemplate.opsForZSet().remove(key, userId.toString());
}
}
return Result.ok();
}
@Override
public Result queryBlogLikes(Long id) {
String key = BLOG_LIKED_KEY + id;
// 1.查询top5的点赞用户 zrange key 0 4
Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
if (top5 == null || top5.isEmpty()) {
return Result.ok(Collections.emptyList());
}
// 2.解析出其中的用户id
List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
String idStr = StrUtil.join(",", ids);
// 3.根据用户id查询用户 WHERE id IN ( 5 , 1 ) ORDER BY FIELD(id, 5, 1)
List<UserDTO> userDTOS = userService.query()
.in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list()
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
// 4.返回
return Result.ok(userDTOS);
}
共同关注
普通的关注和取消关注也是写到了redis里
if(isSave){
//把被关注用户id放入Redis sadd follows:userId(key) followerId(value)
stringRedisTemplate.opsForSet().add(followKey,followUserId.toString());
}
if(isRemove) {
//把被关注用户id从Redis移除
stringRedisTemplate.opsForSet().remove(followKey, followUserId.toString());
}
共同关注
@Override
public Result followCommons(Long followUserId) {
//1.先获取当前用户
Long userId = UserHolder.getUser().getId();
String followKey1 = "follows:" + userId;
String followKey2 = "follows:" + followUserId;
//2.求交集
Set<String> intersect = stringRedisTemplate.opsForSet().intersect(followKey1, followKey2);
if(intersect==null||intersect.isEmpty()){
return Result.ok(Collections.emptyList());
}
//3.解析出id数组
List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
//4.根据ids查询用户数组 List<User> ---> List<UserDTO>
List<UserDTO> userDTOS = userService.listByIds(ids)
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
return Result.ok(userDTOS);
}
滚动分页
通俗来说reverseRangeWithScore
函数就是先将指定key下的zset类型的有序集合按照score由大到小排序
public Result queryBloyOfFollow(Long max, Integer offset) {
//1.获取当前用户
Long userId = UserHolder.getUser().getId();
//2.查询当前用户收件箱 zrevrangebyscore key max min limit offset count
String feedKey = FEED_KEY + userId;
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(feedKey, 0, max, offset, 2);
if(typedTuples==null||typedTuples.isEmpty()){
return Result.ok();
}
//3.解析出收件箱中的blogId,score(时间戳),offset
List<Long> ids = new ArrayList<>(typedTuples.size());
long minTime = 0;
int count = 1;//最小时间的相同个数
for (ZSetOperations.TypedTuple<String> typedTuple : typedTuples) {
//3.1 获取id
ids.add(Long.valueOf(typedTuple.getValue()));//blog的id
//3.2 获取分数(时间戳)
long time = typedTuple.getScore().longValue();
if(time == minTime){
count++;
}else{
minTime = time;
count=1;
}
}
//4.根据blogId查找blog
String idStr = StrUtil.join(",",ids);
List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id, " + idStr + ")").list();
for (Blog blog : blogs) {
//4.1 查询blog有关的用户
queryBlogUser(blog);
//4.2 查询blog是否被点过赞
isLikeBlog(blog);
}
//5.封装并返回
ScrollResult r = new ScrollResult();
r.setList(blogs);
r.setOffset(count);
r.setMinTime(minTime);
return Result.ok(r);
}
还有bitmap、geohash、hyperloglog等数据结构,不常用就先不写了。