黑马Redis实战篇


黑马Redis实战篇

参考文章链接:https://blog.csdn.net/weixin_51515308/article/details/128010464?spm=1001.2014.3001.5502

仅记录一些之前没见过的操作

一些mybatisplus操作也会记录

验证码

多台Tomcat不共享session存储空间,当请求切换到不同的tomcat服务时导致数据丢失的问题

所以我们把数据存入Redis,集群的Redis可以替代session

生成验证码,保存到session

//2.生成验证码:导入hutool依赖,内有RandomUtil
String code = RandomUtil.randomNumbers(6);
//3.保存验证码到session
session.setAttribute("code",code);

根据手机号查询用户

 //4.一致,根据手机号查询用户(需要写对应的单表查询方法:select * from tb_user where phone = #{phone})
    User user = query().eq("phone", phone).one();

发送验证码和用户登录

发送验证码:

//3.保存验证码到Redis
        stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY +phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);//有效期2mins

用户登录,获取验证码:

 //2.从Redis中获取验证码
        String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY+phone);

保存用户到Redis,而且只保存部分信息

//6.保存用户到Redis
//(1)生成token
String token = UUID.randomUUID().toString(true);//hutools
//(2)User转为HashMap存储
UserDTO userDTO = BeanUtil.copyProperties(user,UserDTO.class);
HashMap<Object, Object> userMap = new HashMap<>();
userMap.put("id", userDTO.getId().toString());
userMap.put("nickName", userDTO.getNickName());
userMap.put("icon", userDTO.getIcon());
//(3)存储到Redis
String tokenKey = LOGIN_USER_KEY + token;
stringRedisTemplate.opsForHash().putAll(tokenKey,userMap);
//(4) 设置有效期
stringRedisTemplate.expire(tokenKey,LOGIN_USER_TTL,TimeUnit.MINUTES);

MvcConfig注入stringRedisTemplate,然后传给LoginInterceptor,因为LoginInterceptor不是bean不能用spring注入其他bean

拦截器进行的预处理-保存ThreadLocal和刷新有效期

  @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 1.获取请求头中的token
        String token = request.getHeader("authorization");
        if (StrUtil.isBlank(token)) {
            return true;
        }
        // 2.基于TOKEN获取redis中的用户
        String key  = LOGIN_USER_KEY + token;
        Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
        // 3.判断用户是否存在
        if (userMap.isEmpty()) {
            return false;
        }
        // 5.将查询到的hash数据转为UserDTO
        UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
        // 6.存在,保存用户信息到 ThreadLocal
        UserHolder.saveUser(userDTO);
        // 7.刷新token有效期
        stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES);
        // 8.放行
        return true;
    }

而保存进ThreadLocal的东西已经被封装好了

public class UserHolder {
    private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();

    public static void saveUser(UserDTO user){
        tl.set(user);
    }

    public static UserDTO getUser(){
        return tl.get();
    }

    public static void removeUser(){
        tl.remove();
    }
}

缓存

写入/写出redis,要序列化JSONUtil(⭐⭐⭐)

@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public Result querygetById(Long id) {
        //1.从Redis内查询商品缓存
        String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
        if(StrUtil.isNotBlank(shopJson)){
            //手动反序列化
            Shop shop = JSONUtil.toBean(shopJson, Shop.class);
            return Result.ok(shop);
        }
        //2.不存在就根据id查询数据库
        Shop shop = getById(id);
        if(shop==null){
            return Result.fail("商户不存在!");
        }
        //3.数据库数据写入Redis
        //手动序列化
        String shopStr = JSONUtil.toJsonStr(shop);
        stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,shopStr,CACHE_SHOP_TTL, TimeUnit.MINUTES);
        return Result.ok(shop);
    }
}

查很多东西,并且写。设置有效时间。

@Service
public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public Result queryList() {
        //1.从Redis中查询
        String key = CACHE_SHOPTYPE_KEY;
        List<String> list = stringRedisTemplate.opsForList().range(key, 0, -1);
        if(!list.isEmpty()){
            //手动反序列化
            List<ShopType> typeList = new ArrayList<>();
            for (String s : list) {
                ShopType shopType = JSONUtil.toBean(s, ShopType.class);
                typeList.add(shopType);
            }
            return Result.ok(typeList);
        }

        //2.从数据库内查询
        List<ShopType> typeList = query().orderByAsc("sort").list();
        if(typeList.isEmpty()){
            return Result.fail("不存在该分类!");
        }
          //序列化
        for (ShopType shopType : typeList) {
            String s = JSONUtil.toJsonStr(shopType);
            list.add(s);
        }

        //3.存入缓存
        stringRedisTemplate.opsForList().rightPushAll(key,list);
        stringRedisTemplate.expire(key,CACHE_SHOPTYPE_TTL,TimeUnit.MINUTES);
        return Result.ok(list);
    }
}

更新数据库的时候要删除缓存,以实现缓存和数据库双写一致

基于互斥锁解决缓存击穿问题

/**
 * 获取互斥锁
 */
private boolean tryLock(String key) {
    Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, "1", TTL_TEN, TimeUnit.SECONDS);
    return BooleanUtil.isTrue(flag);
}

/**
 * 释放互斥锁
 */
private void unLock(String key) {
    redisTemplate.delete(key);
}
/**互斥锁实现解决缓存击穿**/
public Shop queryWithMutex(Long id){
    //1.从Redis内查询商品缓存
    String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
    if(StrUtil.isNotBlank(shopJson)){
        //手动反序列化
        return JSONUtil.toBean(shopJson, Shop.class);
    }
    //如果上面的判断不对,那么就是我们设置的""(有缓存"",证明数据库内肯定是没有的)或者null(没有缓存)
    //判断命中的是否时空值
    if(shopJson!=null){//
        return null;
    }

    //a.实现缓存重建
    //a.1 获取互斥锁
    String lockKey = LOCK_SHOP_KEY + id;
    Shop shop = null;
    try {
        boolean hasLock = tryLock(lockKey);
        //a.2 判断是否获取到,获取到:根据id查数据库 获取不到:休眠
        if(!hasLock){
            Thread.sleep(50);
            return queryWithMutex(id);
        }

        //2.不存在就根据id查询数据库
        shop = getById(id);
        //模拟重建的延时
        Thread.sleep(200);
        if(shop==null){
            stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
            return null;
        }
        //3.数据库数据写入Redis
        //手动序列化
        String shopStr = JSONUtil.toJsonStr(shop);
        stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,shopStr,CACHE_SHOP_TTL, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        //释放互斥锁
        unlock(lockKey);
    }

    return shop;
}
@Override
public Result querygetById(Long id) {
    //缓存穿透
    //Shop shop = queryWithPassThrough(id);

    //互斥锁解决缓存击穿
    Shop shop = queryWithMutex(id);
    if(shop==null) return Result.fail("店铺不存在!");

    return Result.ok(shop);
}

基于逻辑过期解决缓存击穿问题

private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);//开启10个线程

/**逻辑过期实现解决缓存击穿**/
public Shop queryWithLogical(Long id){
    //1.从Redis内查询商品缓存
    String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
    //2.判断是否存在
    if(StrUtil.isBlank(shopJson)){
        return null;
    }
    //3.命中,需要先把json反序列化为对象
    RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
    JSONObject data = (JSONObject) redisData.getData();
    Shop shop = JSONUtil.toBean(data, Shop.class);

    //4.判断是否过期
    LocalDateTime expireTime = redisData.getExpireTime();
    if(expireTime.isAfter(LocalDateTime.now())){
        //未过期直接返回
        return shop;
    }
        //5.过期的话需要缓存重建
    //5.1 获取互斥锁
    String lockKey = LOCK_SHOP_KEY + id;
    boolean hasLock = tryLock(lockKey);
    //5.2判断是否获取到,获取到:根据id查数据库 获取不到:休眠
   if(hasLock){
       //成功就开启独立线程,实现缓存重建, 这里的话用线程池
       CACHE_REBUILD_EXECUTOR.submit(()->{
           try {
               //重建缓存
               this.saveShop2Redis(id,20L);
           } catch (Exception e) {
               throw new RuntimeException(e);
           }finally {
               //释放锁
               unlock(lockKey);
           }

       });

   }

    return shop;
}
/**缓存重建方法**/
public void saveShop2Redis(Long id,Long expireSeconds) throws InterruptedException {
    //1.查询店铺信息
    Shop shop = getById(id);
    Thread.sleep(200);
    //2.封装逻辑过期时间
    RedisData redisData = new RedisData();
    redisData.setData(shop);
    redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
    //3.写入Redis
    stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
}
@Test
void testSaveShop() throws InterruptedException {
    shopService.saveShop2Redis(1L,10L);
}

秒杀-分布式锁

一人一单的秒杀(不支持并发安全)

@Override
public Result seckillVoucher(Long voucherId) {
    //1.查询优惠卷
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
    //2.判断秒杀是否开始,是否结束
    if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
        return Result.fail("秒杀尚未开始!");
    }
    if(voucher.getEndTime().isBefore(LocalDateTime.now())){
        return Result.fail("秒杀已结束!");
    }
    //3.判断库存是否充足
    if(voucher.getStock()<=0){
        return Result.fail("优惠券库存不足!");
    }

    Long userId = UserHolder.getUser().getId();
    synchronized (userId.toString().intern()) {//userId一样的持有同一把锁,最好不要放在整个方法上,intern:去字符串常量池找相同字符串
        IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();//获得代理对象
        return proxy.createVoucherOrder(voucherId);//默认是this,我们要实现事务需要proxy
    }//先获取锁,然后再进入方法,确保我的前一个订单会添加上,能先提交事务再释放锁
}

@Transactional
public Result createVoucherOrder(Long voucherId){
    //查询订单看看是否存在
    Long userId = UserHolder.getUser().getId();

    if (query().eq("user_id",userId).eq("voucher_id",voucherId).count()>0) {
        return Result.fail("用户已经购买过一次!");
    }

    //4.扣减库存
    boolean success = seckillVoucherService.update()
            .setSql("stock = stock -1")
            .eq("voucher_id", voucherId).gt("stock",0)//where id = ? and stock >0 添加了乐观锁
            .update();
    //5.创建订单
    if(!success){
        return Result.fail("优惠券库存不足!");
    }

    //6.返回订单id
    VoucherOrder voucherOrder = new VoucherOrder();
    //6.1订单id
    long orderId = redisIdWorker.nextId("order");
    voucherOrder.setId(orderId);
    //6.2用户id
    //Long userId = UserHolder.getUser().getId();
    voucherOrder.setUserId(userId);
    //6.3代金券id
    voucherOrder.setVoucherId(voucherId);

    //7.订单写入数据库
    save(voucherOrder);
    //8.返回订单Id
    return Result.ok(orderId);
}

分布式锁Redisson

1.导入依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
</dependency>

2.写一个配置类

@Configuration
public class RedissionConfig {
    @Bean
    public RedissonClient redissionClient(){
        //配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.200.131:6379").setPassword("1234");
        //创建RedissonClient对象
        return Redisson.create(config);
    }
}

3.在业务实现类中使用RedissonClient

@Autowired
private RedissonClient redissonClient;

业务方法{
//···前面流程
 Long userId = UserHolder.getUser().getId();
        //创建锁对象
        //SimpleRedisLock lock = new SimpleRedisLock("order:" + userId,stringRedisTemplate);
        RLock lock = redissonClient.getLock("lock:order:" + userId);
        //获取锁
        boolean hasLock = lock.tryLock( );
        if(!hasLock){
            //获取锁失败: return fail 或者 retry 这里业务要求是返回失败
            return Result.fail("请勿重复下单!");
        }

        try {
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();//获得代理对象
            return proxy.createVoucherOrder(voucherId);//默认是this,我们要实现事务需要proxy
        } catch (IllegalStateException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
}

看门狗watchdog

tryLock(waitTime,leaseTime,TimeUnit)

waitTime:获取锁的等待时长,获取锁失败后等待waitTime再去获取锁

leaseTime: 锁自动失效时间,这里测试锁重试不需要用到

WatchDog—–超时释放

对抢锁过程进行监听,抢锁完毕后,scheduleExpirationRenewal(threadId) 方法会被调用来对锁的过期时间进行续约,在后台开启一个线程,进行续约逻辑,也就是看门狗线程。

// 续约逻辑
commandExecutor.getConnectionManager().newTimeout(new TimerTask() {… }, 锁失效时间 / 3, TimeUnit.MILLISECONDS);

Method(new TimerTask(){}, 参数2, 参数3)

通过参数2、参数3 去描述,什么时候做参数1 的事情。

锁的失效时间为 30s,10s 后这个 TimerTask 就会被触发,于是进行续约,将其续约为 30s;
若操作成功,则递归调用自己,重新设置一个 TimerTask 并且在 10s 后触发;循环往复,不停的续约。

因为不想写lua脚本,也不想看redis自己实现的消息队列。所以这一章就结束了

达人探店

点赞功能

@Override
public Result likeBlog(Long id) {
    //1.判断当前用户是否已点赞
    Long userId = UserHolder.getUser().getId();
    String key = BLOG_LIKED_KEY + id;
    Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
    if(BooleanUtil.isFalse(isMember)){
        //2.未点赞:数据库赞+1
        boolean isSuccess = update().setSql("liked = liked +  1").eq("id", id).update();
        //3.用户信息保存到Redis的点赞set
        if(isSuccess){
            stringRedisTemplate.opsForSet().add(key,userId.toString());
        }
    }
   else{
        //4.已点赞:数据库-1
        boolean isSuccess = update().setSql("liked = liked -  1").eq("id", id).update();
        //5.把用户信息从Redis的点赞set移除
        if(isSuccess){
            stringRedisTemplate.opsForSet().remove(key,userId.toString());
        }
    }
   return Result.ok();
}

当我们点开一篇blog的时候就需要被看到是否点赞过,这就要求我们改一下queryBlogById(id)咯,当然isLikeBlog(blog)也是需要

@Override
public Result queryBlogById(Long id) {
    //1.查询blog
    Blog blog = getById(id);
    if(blog==null){
        return Result.fail("笔记不存在!");
    }
    //2.查询blog相关用户
    queryBlogUser(blog);
    //3.查询用户是否点过赞,其实就是给blog的isLike添加值
    isLikeBlog(blog);

    return Result.ok(blog);
}

private void isLikeBlog(Blog blog) {
    Long userId = UserHolder.getUser().getId();
    String key = BLOG_LIKED_KEY + id;
    Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
    blog.setIsLike(BooleanUtil.isTrue(isMember));
}

 @Override
    public Result queryHotBlog(Integer current) {
        // 根据用户查询
        Page<Blog> page = query()
                .orderByDesc("liked")
                .page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
        // 获取当前页数据
        List<Blog> records = page.getRecords();
        // 查询用户
        records.forEach(blog->{
            this.queryBlogUser(blog);
            this.isLikeBlog(blog);
        });//就是用blog遍历的
        return Result.ok(records);
    }

排行榜

用zset

    private void isBlogLiked(Blog blog) {
        // 1.获取登录用户
        UserDTO user = UserHolder.getUser();
        if (user == null) {
            // 用户未登录,无需查询是否点赞
            return;
        }
        Long userId = user.getId();
        // 2.判断当前登录用户是否已经点赞
        String key = "blog:liked:" + blog.getId();
        Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
        blog.setIsLike(score != null);
    }
   @Override
    public Result likeBlog(Long id) {
        // 1.获取登录用户
        Long userId = UserHolder.getUser().getId();
        // 2.判断当前登录用户是否已经点赞
        String key = BLOG_LIKED_KEY + id;
        Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
        if (score == null) {
            // 3.如果未点赞,可以点赞
            // 3.1.数据库点赞数 + 1
            boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
            // 3.2.保存用户到Redis的set集合  zadd key value score
            if (isSuccess) {
                stringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());
            }
        } else {
            // 4.如果已点赞,取消点赞
            // 4.1.数据库点赞数 -1
            boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
            // 4.2.把用户从Redis的set集合移除
            if (isSuccess) {
                stringRedisTemplate.opsForZSet().remove(key, userId.toString());
            }
        }
        return Result.ok();
    }

    @Override
    public Result queryBlogLikes(Long id) {
        String key = BLOG_LIKED_KEY + id;
        // 1.查询top5的点赞用户 zrange key 0 4
        Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
        if (top5 == null || top5.isEmpty()) {
            return Result.ok(Collections.emptyList());
        }
        // 2.解析出其中的用户id
        List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
        String idStr = StrUtil.join(",", ids);
        // 3.根据用户id查询用户 WHERE id IN ( 5 , 1 ) ORDER BY FIELD(id, 5, 1)
        List<UserDTO> userDTOS = userService.query()
                .in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list()
                .stream()
                .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
                .collect(Collectors.toList());
        // 4.返回
        return Result.ok(userDTOS);
    }

共同关注

普通的关注和取消关注也是写到了redis里

if(isSave){
            //把被关注用户id放入Redis sadd follows:userId(key) followerId(value)
            stringRedisTemplate.opsForSet().add(followKey,followUserId.toString());
        }
if(isRemove) {
                //把被关注用户id从Redis移除
                stringRedisTemplate.opsForSet().remove(followKey, followUserId.toString());
            }

共同关注

@Override
public Result followCommons(Long followUserId) {
    //1.先获取当前用户
    Long userId = UserHolder.getUser().getId();
    String followKey1 = "follows:" + userId;
    String followKey2 = "follows:" + followUserId;

    //2.求交集
    Set<String> intersect = stringRedisTemplate.opsForSet().intersect(followKey1, followKey2);
    if(intersect==null||intersect.isEmpty()){
        return Result.ok(Collections.emptyList());
    }
    //3.解析出id数组
    List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());

    //4.根据ids查询用户数组 List<User> ---> List<UserDTO>
    List<UserDTO> userDTOS = userService.listByIds(ids)
            .stream()
            .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
            .collect(Collectors.toList());

    return Result.ok(userDTOS);
}

滚动分页

通俗来说reverseRangeWithScore函数就是先将指定key下的zset类型的有序集合按照score由大到小排序

public Result queryBloyOfFollow(Long max, Integer offset) {
    //1.获取当前用户
    Long userId = UserHolder.getUser().getId();
    //2.查询当前用户收件箱 zrevrangebyscore key max min limit offset count
    String feedKey = FEED_KEY + userId;
    Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(feedKey, 0, max, offset, 2);
    if(typedTuples==null||typedTuples.isEmpty()){
        return Result.ok();
    }
    //3.解析出收件箱中的blogId,score(时间戳),offset
    List<Long> ids = new ArrayList<>(typedTuples.size());
    long minTime = 0;
    int count = 1;//最小时间的相同个数
    for (ZSetOperations.TypedTuple<String> typedTuple : typedTuples) {
        //3.1 获取id
        ids.add(Long.valueOf(typedTuple.getValue()));//blog的id
        //3.2 获取分数(时间戳)
        long time = typedTuple.getScore().longValue();
        if(time == minTime){
            count++;
        }else{
            minTime = time;
            count=1;
        }
    }
    //4.根据blogId查找blog
    String idStr = StrUtil.join(",",ids);
    List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id, " + idStr + ")").list();

    for (Blog blog : blogs) {
        //4.1 查询blog有关的用户
        queryBlogUser(blog);
        //4.2 查询blog是否被点过赞
        isLikeBlog(blog);
    }

    //5.封装并返回
    ScrollResult r = new ScrollResult();
    r.setList(blogs);
    r.setOffset(count);
    r.setMinTime(minTime);
    return Result.ok(r);
}

还有bitmap、geohash、hyperloglog等数据结构,不常用就先不写了。


文章作者: 爱敲代码の鱼儿
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 爱敲代码の鱼儿 !
  目录