我的Redis笔记🍥🍥


Redis


Redis实现 [短信登陆功能]

先来一个最简单的实现

pom.xml 依赖文件

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.3</version>
</dependency>

<!--hutool-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.5</version>
</dependency>
</dependencies>

图解

image-20221008110140875

image-20221008110331140

image-20221008110903511

UserService 实现

package com.ayaka.service.impl;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.RandomUtil;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ayaka.dto.LoginFormDTO;
import com.ayaka.dto.Result;
import com.ayaka.dto.UserDTO;
import com.ayaka.entity.User;
import com.ayaka.mapper.UserMapper;
import com.ayaka.service.IUserService;
import com.ayaka.utils.RedisConstants;
import com.ayaka.utils.RegexUtils;
import com.ayaka.utils.SystemConstants;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.servlet.http.HttpSession;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;


@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {


/**
* 注入RedisTemplate
*/
@Resource //Resource 注入时 对象名不能瞎起
//@Autowired
private StringRedisTemplate stringRedisTemplate;

/**
* 发送手机验证码 redis
*/
@Override
public Result sendCode(String phone, HttpSession session) {
//验证手机号格式是否正确
boolean phoneInvalid = RegexUtils.isPhoneInvalid(phone);
if (RegexUtils.isPhoneInvalid(phone)) {
return Result.fail("手机号格式错误!");
}

//生成验证码
String code = RandomUtil.randomNumbers(6);

//将验证码发送在Redis当中
stringRedisTemplate.opsForValue().set(
RedisConstants.LOGIN_CODE_KEY + phone,
code,
RedisConstants.LOGIN_CODE_TTL, // 5分钟
TimeUnit.MINUTES);

//发送验证码 模拟
log.debug("[模拟]发送验证码:" + code);
//返回结果 ok
return Result.ok();
}

/**
* 登录功能 redis
*/
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
//验证手机号是否正确
String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)) {
return Result.fail("手机号格式错误!");
}
//用户输入的验证码
String code = loginForm.getCode();

//取出redis中的验证码
String codeRedis = stringRedisTemplate.opsForValue().get(RedisConstants.LOGIN_CODE_KEY + phone);

//与redis存储的验证码进行比对
if (!Objects.equals(codeRedis, code)) {
return Result.fail("验证码错误!");
}
//当前手机号是否存在用户
User user = query().eq("phone", phone).one();
//不存在:通过手机号创建新用户 登录用户 保持到session当中
if (ObjectUtils.isEmpty(user)) {
user = createUserWithPhone(phone);
}
//存在: 登录用户 保持到redis当中
//不管用户是否存在都要 登录用户 保持到reids当中

//转成 UserDTO 对象 【增加安全】
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);

//将 UserDTO 对象 保存在redis 使用hash结构
// 键:token
String token = RedisConstants.LOGIN_USER_KEY + UUID.randomUUID().toString(true);
// 值:用户信息
Map<String, String> userMap = new HashMap<>();
userMap.put(UserDTO.USER_DTO_ID, userDTO.getId().toString());
userMap.put(UserDTO.USER_DTO_NICKNAME, userDTO.getNickName());
userMap.put(UserDTO.USER_DTO_ICON, userDTO.getIcon());
//存储
stringRedisTemplate.opsForHash().putAll(token, userMap);
//设置有效时间
stringRedisTemplate.expire(token, RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES);

//返回给前端用户数据 token
return Result.ok(token);
}

/**
* 通过手机号创建用户
*/
public User createUserWithPhone(String phone) {
//生成一个随机用户名
String userName = SystemConstants.USER_NICK_NAME_PREFIX + RandomUtil.randomString(8);
User user = new User();
user.setPhone(phone);
user.setNickName(userName);
save(user);
return user;
}
}

拦截器的实现

package com.ayaka.config;

import com.ayaka.interceptor.LoginInterceptor;
import com.ayaka.interceptor.RefreshTokenInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

import javax.annotation.Resource;

@Configuration
public class MvcConfig implements WebMvcConfigurer {

@Resource
private StringRedisTemplate stringRedisTemplate;

/**
* 关于拦截器的顺序问题:
* 当添加多个拦截器的时候,第一个添加的默认的拦截器顺序是0,后面添加的依次递增
* 顺序的两种解决方案:
* 1. 先过滤的过滤器 先添加 即:按过滤的顺序依次 registry.addInterceptor()
* 2. 使用 registry.addInterceptor().order() order方法手动设置过滤顺序: 0 > 1 > 2 > ...
* @param registry
*/
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/shop/**",
"/voucher/**",
"/shop-type/**",
"/upload/**",
"/blog/hot",
"/user/code",
"/user/login"
).order(1);
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).order(0);
}


}

Redis解决 [缓存] [TODO]

什么是缓存

image-20221008111515914

缓存作用和成本

image-20221008111603068

缓存更新策略

image-20221008112021404

主动更新策略

image-20221008112131756

Cache Adide Pattern

image-20221008112148352

image-20221008112331473

缓存更新策略最佳方案

image-20221008112431914

缓存穿透

什么是缓存穿透

image-20221008112509532

缓存空对象解决方案

image-20221008112632643

缓存穿透产生的原因是什么?

  • 用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请 求,给数据库带来巨大压力 缓存穿透的解决方案有哪些?

  • 被动防御

    • 缓存null值
    • 布隆过滤
  • 主动防御

    • 增强id的复杂度,避免被猜测id规律

    • 做好数据的基础格式校验

    • 加强用户权限校验

    • 做好热点参数的限流


缓存雪崩

image-20221008112934964

缓存击穿

什么是缓存击穿

image-20221008113030208

缓存击穿解决方案

image-20221008113325878

两种方案的优缺点

image-20221008113352925

**一致还是性能,这是个值得思考的问题~ **

案例解决方案

image-20221008113428273

image-20221008113440575

缓存案例

@Override
public Result queryById(Long id) {

//使用缓存
//Result result = queryByIdCache(id);

//使用缓存解决 [缓存穿透] 问题
//Result result = queryByIdCachePenetration(id);

//使用缓存解决 [缓存击穿] 问题 方案一: [互斥锁方案]
//Result result = queryByIdCacheMutex(id);

//使用缓存解决 [缓存击穿] 问题 方案一: [逻辑过期方案]
//Result result = queryByIdCacheLogicalExpire(id);

//不存在,返回404
return result;
}

缓存穿透 —缓存空对象

/**
* 解决缓存穿透问题
* 使用 缓存空对象 解决方案
*
* @param id
* @return
*/
public Result queryByIdCachePenetration(Long id) {

//1. 获取商品id的key
String key = RedisConstants.CACHE_SHOP_KEY + id;
//2. 通过key向redis缓存中查询数据
String shopStr = stringRedisTemplate.opsForValue().get(key);
//3. 如果命中,返回查询结果
if (ObjectUtil.isNotEmpty(shopStr)) {
Shop shop = JSONUtil.toBean(shopStr, Shop.class); //转成对象
return Result.ok(shop);
}

//3.1 判断是否命中的是空值
if (shopStr != null) {
return null;
}

//4. 如果未命中,查询数据库
//5. 通过id查询数据库
Shop shopById = this.getById(id);
//6. 存在,写入redis缓存,并返回查询结果
if (ObjectUtil.isNotEmpty(shopById)) {
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shopById));
return Result.ok(shopById);
}

//7. 不存在,向redis写入空对象
stringRedisTemplate.opsForValue().set(
RedisConstants.CACHE_SHOP_KEY + id, "", RedisConstants.CACHE_SHOP_NOLL_TTL, TimeUnit.SECONDS);

//8. 向前端返回信息
return Result.fail("查询的商品不存在");
}

缓存击穿 —互斥锁方案

/**
* 解决缓存击穿问题
* 使用 互斥锁解决方案
*
* @param id
* @return
*/
public Result queryByIdCacheMutex(Long id) {

//1. 获取商品id的key
String key = RedisConstants.CACHE_SHOP_KEY + id;
//2. 通过key向redis缓存中查询数据
String shopStr = stringRedisTemplate.opsForValue().get(key);
//3. 如果命中,返回查询结果
if (ObjectUtil.isNotEmpty(shopStr)) {
Shop shop = JSONUtil.toBean(shopStr, Shop.class); //转成对象
return Result.ok(shop);
}

//3.1 判断是否命中的是空值
if (shopStr != null) {
return null;
}

//4. 如果未命中,进行缓存重建
try {

//4.1 获取互斥锁
boolean lock = setShopMutex(id);

//4.2 判断互斥锁是否获取成功 不成功,等待 + 递归
if (!lock) {
Thread.sleep(500); //等到500毫秒
queryByIdCacheMutex(id); //递归
}

//4.3 获取互斥锁成功 通过id查询数据库
Shop shopById = this.getById(id);

//4.4 存在,写入redis缓存,并返回查询结果
if (ObjectUtil.isNotEmpty(shopById)) {
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shopById));
return Result.ok(shopById);
}

//4.5 不存在,向redis写入空对象
stringRedisTemplate.opsForValue()
.set
(RedisConstants.CACHE_SHOP_KEY + id, "", RedisConstants.CACHE_SHOP_NOLL_TTL, TimeUnit.SECONDS);

} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {

//4.6 释放互斥锁
delShopMutex(id);
}

//5. 向前端返回信息
return Result.fail("查询的商品不存在");
}

互斥锁的简单构建

//获取互斥锁
public boolean setShopMutex(Long id) {
//使用 .setIfAbsent()方法
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(RedisConstants.LOCK_SHOP_KEY + id, "lock", RedisConstants.LOCK_SHOP_TTL, TimeUnit.SECONDS);
return BooleanUtil.isTrue(lock);
}

//删除互斥锁
public void delShopMutex(Long id) {
stringRedisTemplate.delete(RedisConstants.LOCK_SHOP_KEY + id);
}

缓存击穿 —逻辑过期方案

数据的变动

import lombok.Data;
import java.time.LocalDateTime;

/**
* 存储 数据 + 逻辑时间
*/
@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}

缓存重建方法

//缓存重建
public void saveShopRedis(Long id, Long expireSeconds) {

//模拟重建时间
System.out.println("休眠中");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("休眠结束");

//查询数据库数据
Shop shop = this.getById(id);
//封装 数据+逻辑时间
RedisData redisData = new RedisData();
redisData.setData(shop); //数据
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds)); //时间
//序列化
String json = JSONUtil.toJsonStr(redisData);
//写入redis缓存当中
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, json);
}

业务逻辑

/**
* 解决缓存击穿问题
* 使用 逻辑过期解决方案
*
* @param id
* @return
*/
public Result queryByIdCacheLogicalExpire(Long id) {

//1. 查询缓存是否命中
String jsonData = stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
//2. 未命中直接返回前端信息
if (ObjectUtil.isEmpty(jsonData)) {
return Result.fail("查询的商品不存在!");
}
//3. 命中
//4. 反序列化数据
RedisData redisData = JSONUtil.toBean(jsonData, RedisData.class);
JSONObject data = (JSONObject) redisData.getData(); //转成 JSONObject对象
Shop shop = JSONUtil.toBean(data, Shop.class); //转成 Shop 对象
LocalDateTime expireTime = redisData.getExpireTime(); //获取逻辑时间
//5. 判断逻辑时间是否过去
if (expireTime.isAfter(LocalDateTime.now())) { //expireTime.isAfter(LocalDateTime.now()) --> true:未过期 false:过期
//6. 逻辑时间未过期 直接向前端返回数据
return Result.ok(shop);
}
//7. 逻辑时间过期 尝试获取互斥锁
boolean lock = setShopMutex(id);
//8. 获取互斥成功
if (lock) {
//9. 开启新的线程
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
//10. 重建缓存
saveShopRedis(id, 10L);
} finally {
//11. 释放锁
delShopMutex(id);
}
});
}

//12. 返回旧数据
return Result.ok(shop);
}

缓存工具封装

**U基于StringRedisTemplate封装一个缓存工具类,满足下列需求: **

✓ 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间

✓ 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存 击穿问题

✓ 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题

✓ 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题

package com.ayaka.utils;


import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/**
* 封装缓存工具类
*/
@Slf4j
@Component
public class CacheClient {

//注入RedisTemplate
@Resource
private StringRedisTemplate stringRedisTemplate;


//自定义线程池
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);



//✓ 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
public <ID> void setCache(String prefix, ID id, Object value, Long time, TimeUnit timeUnit){
//序列化 并写入redis缓存
String key = prefix + id;
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,timeUnit);
}


//✓ 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存 击穿问题
public <ID> void setWithLogicalExpire(String prefix, ID id, Object value, Long timeSeconds){
String key = prefix + id;
//封装成 RedisData类型
RedisData redisData = new RedisData();
redisData.setExpireTime(LocalDateTime.now().plusSeconds(timeSeconds));
redisData.setData(value);
//序列化
String json = JSONUtil.toJsonStr(redisData);
//写入redis缓存
stringRedisTemplate.opsForValue().set(key, json);
}
// ✓ 方法2的另一种写法,+缓存重建
public <R,ID> R newCache(String key, ID id, Long time, TimeUnit timeUnit, Function<ID, R> sqlFunction){
//获取数据库数据
R rs = sqlFunction.apply(id);
//是否存在数据 不存在
if (ObjectUtil.isEmpty(rs)){
return null;
}
//序列化 并 封装
RedisData redisData = new RedisData();
redisData.setExpireTime(LocalDateTime.now().plusSeconds(timeUnit.toSeconds(time)));
redisData.setData(rs);
String value = JSONUtil.toJsonStr(redisData);
//写入缓存
stringRedisTemplate.opsForValue().set(key,value);

//将数据库中的数据返回
return rs;
}


//======================================================================================


// 缓存击穿
//✓ 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
public <R,ID> R queryWithPassThrough(
String keyPrefix,
ID id, Class<R> type,
Function<ID,R> sqlFunction,
Long time, TimeUnit timeUnit,
Long timeNull, TimeUnit timeUnitNull){

String key = keyPrefix + id;
//从数据库中查询 key 数据
String jsonStr = stringRedisTemplate.opsForValue().get(key);
//判断是否命中 命中
if (ObjectUtil.isNotEmpty(jsonStr)){
//反序列化对象 并返回调用者
return JSONUtil.toBean(jsonStr, type);
}

//未命中 先判断是否为空值
if (jsonStr != null){
//返回错误信息
return null;
}

//数据库调用 日志
log.info("数据库被调用...");

//未命中 并且 没对象对象 重建缓存
R sqlData = sqlFunction.apply(id);
//判断是否存在
if (ObjectUtil.isNotEmpty(sqlData)){
//存在 序列化 并且写入缓存
this.setCache(keyPrefix,id,sqlData,time,timeUnit);
//返回调用者数据
return sqlData;
}
//不存在 缓存空对象
stringRedisTemplate.opsForValue().set(keyPrefix+id,"",timeNull,timeUnitNull);

return null;
}


//缓存击穿

//互斥锁方案
//✓ 方法4.1:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
public <R,ID> R queryWithMutex(
String keyPrefix, String lockPrefix,
Long keyTime, TimeUnit keyTimeUnit,
Long nullTime, TimeUnit nullTimeUnit,
Long lockTime, TimeUnit lockTimeUnit,
ID id,
Class<R> type, //返回值类型
//函数式 sql具体实现
Function<ID,R> sqlFunction){

String key = keyPrefix + id;
//通过缓存进行查询
String jsonStr = stringRedisTemplate.opsForValue().get(key);
//命中 反序列化并返回调用者
if(ObjectUtil.isNotEmpty(jsonStr)){
return JSONUtil.toBean(jsonStr,type);
}
//未命中 判断是否为空值
if (jsonStr != null){
//命中空对象 返回null
return null;
}

//未命中 且 不为空对象 --> 缓存中无key-value
//重建缓存
//获取互斥锁
try {
boolean isLock = getCacheLock(lockPrefix, id, lockTime, lockTimeUnit);
if (!isLock){

//失败 线程等待
Thread.sleep(600);
//回调 递归
this.queryWithMutex(
keyPrefix,lockPrefix,
keyTime,keyTimeUnit,
nullTime,nullTimeUnit,
lockTime,lockTimeUnit,
id,type,sqlFunction);
}


String str = stringRedisTemplate.opsForValue().get(key);
if (ObjectUtil.isNotEmpty(str)){
return JSONUtil.toBean(str,type);
}

//获取互斥锁成功 查询数据库数据
R rd = sqlFunction.apply(id);
if (ObjectUtil.isNotEmpty(rd)){
//重建缓存
//存在: 调用 this.setCache
this.setCache(keyPrefix,id,rd,keyTime,keyTimeUnit);
return rd;
}

//不存在: 写入缓存空对象
stringRedisTemplate.opsForValue().set(key,"",nullTime,nullTimeUnit);

} catch (Exception e) {
throw new RuntimeException(e);
} finally {
//释放锁
this.delCacheLock(lockPrefix,id);
}

return null;
}


//✓ 方法4.2:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
public <R,ID> R queryWithLogicalExpire(
String keyPrefix, String lockPrefix,
Long time, TimeUnit timeUnit,
Long lockTime, TimeUnit lockTimeUnit,
ID id, R r, Class<R> type,
Function<ID,R> sqlFunction){


String key = keyPrefix + id;
//通过缓存查询
String jsonStr = stringRedisTemplate.opsForValue().get(key);
//判断是否命中 未命中
if (ObjectUtil.isEmpty(jsonStr)){
//直接向前端返回null
}
//命中
//反序列化 并 拆分 RedisData
RedisData redisData = JSONUtil.toBean(jsonStr, RedisData.class);
LocalDateTime expireTime = redisData.getExpireTime();
JSONObject jsonObject = (JSONObject) redisData.getData();
R rd = JSONUtil.toBean(jsonObject, type);

//判断逻辑时间是否过期
if (expireTime.isAfter(LocalDateTime.now())){
//未过期 直接返回数据
return rd;
}
//逻辑时间过期 获取互斥锁
boolean cacheLock = this.getCacheLock(lockPrefix, id, lockTime, lockTimeUnit);
//获取互斥锁成功
if (cacheLock){
//获取新线程
CACHE_REBUILD_EXECUTOR.submit(()->{
try {
//重建缓存
this.newCache(key,id,time,timeUnit,sqlFunction);
} finally {
//释放锁
this.delCacheLock(lockPrefix,id);
}
});
}

//先将旧数据返回给调用者 失去的一致性 但提高了性能!
return rd;
}


//另外的 : 互斥锁 Redis简单的实现

//获取互斥锁
public <ID> boolean getCacheLock(String prefix, ID id, Long time, TimeUnit timeUnit){
String key = prefix + id;
//创建Mutex
Boolean ifs = stringRedisTemplate.opsForValue().setIfAbsent(key, RedisConstants.CACHE_LOCK_VALUE, time, timeUnit );
//log.info("互斥锁已 [创建] ...");
return BooleanUtil.isTrue(ifs); //为了防止自动拆箱 使用了工具类
}

//释放锁
public <ID> void delCacheLock(String prefix, ID id){
String key = prefix + id;
// 删除Mutex
stringRedisTemplate.delete(key);
//log.info("互斥锁已 [删除] ...");
}



}

Redis解决 [生成全局ID方案]

解决方案:

package com.ayaka.utils;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

@Component
public class RedisIdWorker {

/**
* 开始时间戳
*/
public static final long BEGIN_TIMESTAMP = 1640995200L; //2022,1,1,0,0

/**
* 时间戳移动的位数
*/
public static final int COUNT_BITS = 32;

/**
* StringRedisTemplate对象
*/
public StringRedisTemplate stringRedisTemplate;
public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

/**
* 生成全局id
* @param keyPrefix
* @return
*/
public long nextId(String keyPrefix) {

//当前时间戳
LocalDateTime now = LocalDateTime.now();
long nowTimestamp = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowTimestamp - BEGIN_TIMESTAMP;

//获取自增
//1.获取当前日期
String format = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
//2.获取自增
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + format);

//计算并返回
return timestamp << COUNT_BITS | count;
}


/*
public static void main(String[] args) {
LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0);
long times = time.toEpochSecond(ZoneOffset.UTC);
System.out.println(times);
}
*/

}

测试代码

package com.ayaka;

import com.ayaka.utils.RedisIdWorker;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@SpringBootTest
@ContextConfiguration(classes = RedisIdWorkerApp.class)
class RedisIdWorkerTests {

@Resource
private RedisIdWorker redisIdWorker;

public ExecutorService es= Executors.newFixedThreadPool(500);

@Test
void gitIdTest() throws InterruptedException {

CountDownLatch countDownLatch = new CountDownLatch(300);

//开始时间戳
long startTimestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);

Runnable task = ()->{
//一个任务生成100个id
for (int i = 0; i < 100; i++) {
long id = redisIdWorker.nextId("order");
System.out.println(id);
}
countDownLatch.countDown();
};

//300个任务
for (int i = 0; i < 300; i++) {
es.submit(task);
}

countDownLatch.await();
long stopTimestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
System.out.println(stopTimestamp - startTimestamp);
}


}

Redis解决 [秒杀] [秒杀优惠卷]

秒杀卷库表设计

image-20221028150115925

tb_voucher 优惠卷的基本信息

image-20221028150847270

tb_seckill_voucher 特价/秒杀 优惠卷的拓展信息

image-20221028152907291

SQL

# tb_voucher
CREATE TABLE `tb_voucher` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`shop_id` bigint(20) unsigned DEFAULT NULL COMMENT '商铺id',
`title` varchar(255) NOT NULL COMMENT '代金券标题',
`sub_title` varchar(255) DEFAULT NULL COMMENT '副标题',
`rules` varchar(1024) DEFAULT NULL COMMENT '使用规则',
`pay_value` bigint(10) unsigned NOT NULL COMMENT '支付金额,单位是分。例如200代表2元',
`actual_value` bigint(10) NOT NULL COMMENT '抵扣金额,单位是分。例如200代表2元',
`type` tinyint(1) unsigned NOT NULL DEFAULT '0' COMMENT '0,普通券;1,秒杀券',
`status` tinyint(1) unsigned NOT NULL DEFAULT '1' COMMENT '1,上架; 2,下架; 3,过期',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT;

# tb_seckill_voucher
CREATE TABLE `tb_seckill_voucher` (
`voucher_id` bigint(20) unsigned NOT NULL COMMENT '关联的优惠券的id',
`stock` int(8) NOT NULL COMMENT '库存',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`begin_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '生效时间',
`end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '失效时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`voucher_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT COMMENT='秒杀优惠券表,与优惠券是一对一关系';

添加秒杀卷

VoucherController

/**
* 新增秒杀券
* @param voucher 优惠券信息,包含秒杀信息
* @return 优惠券id
*/
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {
voucherService.addSeckillVoucher(voucher);
return Result.ok(voucher.getId());
}

VoucherServiceImpl

//也就是说 秒杀卷 也包含的普通卷的创建 
//这里调用的seckillVoucherService业务
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
}

也就是说 秒杀卷 也包含的普通卷的创建

POST	http://localhost:8081/voucher/seckill

{
"shopId":1,
"title":"100元代金券",
"subTitle":"周一至周五均可使用",
"rules":"全场通用\\n无需预约\n可无限叠加\\不兑现、不找零\\n仅限堂食",
"payValue":8000,
"actualValue":10000,
"type":1,
"stock":100,
"beginTime":"2022-10-26T10:01:00",
"endTime":"2022-10-31T23:01:00"
}

秒杀卷下单

库表设计:tb_voucher_order

image-20221028151740027

# tb_vouche_order
CREATE TABLE `tb_voucher_order` (
`id` bigint(20) NOT NULL COMMENT '主键',
`user_id` bigint(20) unsigned NOT NULL COMMENT '下单的用户id',
`voucher_id` bigint(20) unsigned NOT NULL COMMENT '购买的代金券id',
`pay_type` tinyint(1) unsigned NOT NULL DEFAULT '1' COMMENT '支付方式 1:余额支付;2:支付宝;3:微信',
`status` tinyint(1) unsigned NOT NULL DEFAULT '1' COMMENT '订单状态,1:未支付;2:已支付;3:已核销;4:已取消;5:退款中;6:已退款',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '下单时间',
`pay_time` timestamp NULL DEFAULT NULL COMMENT '支付时间',
`use_time` timestamp NULL DEFAULT NULL COMMENT '核销时间',
`refund_time` timestamp NULL DEFAULT NULL COMMENT '退款时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT;

实体类:Voucher

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_voucher")
public class Voucher implements Serializable {

private static final long serialVersionUID = 1L;

/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;

/**
* 商铺id
*/
private Long shopId;

/**
* 代金券标题
*/
private String title;

/**
* 副标题
*/
private String subTitle;

/**
* 使用规则
*/
private String rules;

/**
* 支付金额
*/
private Long payValue;

/**
* 抵扣金额
*/
private Long actualValue;

/**
* 优惠券类型
*/
private Integer type;

/**
* 优惠券类型
*/
private Integer status;
/**
* 库存
*/
@TableField(exist = false)
private Integer stock;

/**
* 生效时间
*/
@TableField(exist = false)
private LocalDateTime beginTime;

/**
* 失效时间
*/
@TableField(exist = false)
private LocalDateTime endTime;

/**
* 创建时间
*/
private LocalDateTime createTime;


/**
* 更新时间
*/
private LocalDateTime updateTime;


}

下单流程:

image-20221029165002258

业务代码:

package com.ayaka.service.impl;

import com.ayaka.dto.Result;
import com.ayaka.entity.SeckillVoucher;
import com.ayaka.entity.Voucher;
import com.ayaka.entity.VoucherOrder;
import com.ayaka.mapper.VoucherOrderMapper;
import com.ayaka.service.ISeckillVoucherService;
import com.ayaka.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ayaka.service.IVoucherService;
import com.ayaka.utils.RedisIdWorker;
import com.ayaka.utils.UserHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.time.LocalDateTime;


/**
* <p>
* 服务实现类
* </p>
*
*/
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

@Resource
private ISeckillVoucherService iSeckillVoucherService;

@Resource
private RedisIdWorker redisIdWorker;

/**
* 实现订单秒杀业务
* @param voucherId
* @return
*/
@Override
@Transactional //添加上事务
public Result seckillVoucher(Long voucherId) {
//1.查询优惠卷信息
SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);
Integer stock = voucher.getStock();
System.out.println(voucher);
//2.判断优惠卷是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())){
//尚未开始,返回异常给前端
return Result.fail("秒杀尚未开始!");
}
//3.判断优惠卷是否过期
if (voucher.getEndTime().isBefore(LocalDateTime.now())){
//秒杀已结束,返回前端异常信息
return Result.fail("秒杀已结束!");
}
//4.判断优惠卷库存是否充足
if (stock < 1){
//秒杀卷库存不足,返回给前端异常信息
return Result.fail("库存不足!");
}
//5.扣减库存
boolean isOK = iSeckillVoucherService
.update()
.setSql("stock =stock - 1")
.eq("voucher_id", voucherId)
.update();
if (!isOK){
//秒杀失败,返回给前端异常信息
return Result.fail("库存不足!");
}
//6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1.生成订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2.设置用户id
voucherOrder.setUserId(UserHolder.getUser().getId());
//6.3.设置代金卷id
voucherOrder.setVoucherId(voucherId);
//6.4.当生产的订单id写入数据库
this.save(voucherOrder);

//7.返回订单ID
return Result.ok(orderId);
}

}

存在问题:

  1. 【超卖问题】 并发情况下,会出现线程安全问题 【超卖问题】
  2. 【一人一单】 一人可以下多单,应该是一人只能抢一个秒杀卷 【一人一单】

问题–超卖问题

image-20221029170512498

解决方案 : 加锁

锁的选择:

image-20221029170556067

乐观锁:

版本号法

image-20221029170746672

CAS法

image-20221029170826616

超卖这样的线程安全问题,解决方案有哪些?

  1. 悲观锁:添加同步锁,让线程串行执行

    • 优点:简单粗暴

    • 缺点:性能一般

  2. 乐观锁:不加锁,在更新时判断是否有其它线程在修改

    • 优点:性能好

    • 缺点:存在成功率低的问题

解决问题:

image-20221029173628445

package com.ayaka.service.impl;

import com.ayaka.dto.Result;
import com.ayaka.entity.SeckillVoucher;
import com.ayaka.entity.Voucher;
import com.ayaka.entity.VoucherOrder;
import com.ayaka.mapper.VoucherOrderMapper;
import com.ayaka.service.ISeckillVoucherService;
import com.ayaka.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ayaka.service.IVoucherService;
import com.ayaka.utils.RedisIdWorker;
import com.ayaka.utils.UserHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.time.LocalDateTime;


/**
* <p>
* 服务实现类
* </p>
*
*/
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

@Resource
private ISeckillVoucherService iSeckillVoucherService;

@Resource
private RedisIdWorker redisIdWorker;

/**
* 实现订单秒杀业务
* @param voucherId
* @return
*/
@Override
@Transactional //添加上事务
public Result seckillVoucher(Long voucherId) {
//1.查询优惠卷信息
SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);
Integer stock = voucher.getStock();
System.out.println(voucher);
//2.判断优惠卷是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())){
//尚未开始,返回异常给前端
return Result.fail("秒杀尚未开始!");
}
//3.判断优惠卷是否过期
if (voucher.getEndTime().isBefore(LocalDateTime.now())){
//秒杀已结束,返回前端异常信息
return Result.fail("秒杀已结束!");
}
//4.判断优惠卷库存是否充足
if (stock < 1){
//秒杀卷库存不足,返回给前端异常信息
return Result.fail("库存不足!");
}
//5.扣减库存
boolean isOK = iSeckillVoucherService
.update()
.setSql("stock =stock - 1")
.eq("voucher_id", voucherId)
//.eq("stock",stock) // CAS乐观锁
.gt("stock",0)
// CAS乐观锁改进 stock > 0 就可以执行下单业务
.update();
if (!isOK){
//秒杀失败,返回给前端异常信息
return Result.fail("库存不足!");
}
//6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1.生成订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2.设置用户id
voucherOrder.setUserId(UserHolder.getUser().getId());
//6.3.设置代金卷id
voucherOrder.setVoucherId(voucherId);
//6.4.当生产的订单id写入数据库
this.save(voucherOrder);

//7.返回订单ID
return Result.ok(orderId);
}

}

问题–一人一单

image-20221029233136089

重点!!!!!!!

/**
几个重点:
1. 一人一单问题使用 悲观锁 还是 乐观锁?
使用悲观锁
2. 使用悲观锁 synchronized 加载 方法上 还是 内部?
内部,如果加载方法上,那整个订单业务都是串行,那刚刚解决的 超卖问题[乐观锁]也没意义了
3. synchronized 的锁对象是什么
userId.toString().intern()
而不是 userId.toString()
Long的 toString() 底层是:
return new String(buf, UTF16);
.intern()方法是:返回字符串对象的规范表示。
4. 对于事务的添加 是 锁释放完了再提交 还是 提交完了再释放锁
提交完了再释放锁
具体操作:
1.在方法上加上@Transactional
2.在调用者 调用语句外加上 synchronized
synchronized (userId.toString().intern()) {
return queryOrderVoucherSave(voucherId);
}
5. 事务失效问题
spring的事务是 AOP动态代理的
this.queryOrderVoucherSave(voucherId) //并非是代理对象
解决方法
这里使用 获取动态代理的方式 :
//获取动态代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
//通过代理对象调用
proxy.queryOrderVoucherSave(voucherId);
注意: 使用AopContext.currentProxy()
导入aspectjweaver依赖
开启 @EnableAspectJAutoProxy(exposeProxy = true) 暴露代理对象

*/

业务的实现

@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠卷信息
SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);
Integer stock = voucher.getStock();
System.out.println(voucher);
//2.判断优惠卷是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
//尚未开始,返回异常给前端
return Result.fail("秒杀尚未开始!");
}
//3.判断优惠卷是否过期
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
//秒杀已结束,返回前端异常信息
return Result.fail("秒杀已结束!");
}
//4.判断优惠卷库存是否充足
if (stock < 1) {
//秒杀卷库存不足,返回给前端异常信息
return Result.fail("库存不足!");
}


//一人一单问题
//获取用户id
Long userId = UserHolder.getUser().getId();

//缩小悲观锁范围
synchronized (userId.toString().intern()) {
//获取当前代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
//通过代理对象调用 保证事务的正常
return proxy.queryOrderVoucherSave(voucherId);
}

}


@Transactional
public Result queryOrderVoucherSave(Long voucherId) {

//获取用户id
Long userId = UserHolder.getUser().getId();

//判断订单表中是否已经存在
int count = query()
.eq("user_id", userId)
.eq("voucher_id", voucherId)
.count();
if (count > 0) {
//存在,返回前端信息
return Result.fail("你已领取,每人只能领取一份!");
}

//以上都满足 施行扣减下单业务
//5.扣减库存
boolean isOK = iSeckillVoucherService
.update()
.setSql("stock =stock - 1")
.eq("voucher_id", voucherId)
//.eq("stock",stock) // CAS乐观锁
// CAS乐观锁改进 stock > 0 就可以执行下单业务
.gt("stock", 0)
.update();
if (!isOK) {
//秒杀失败,返回给前端异常信息
return Result.fail("库存不足!");
}
//6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1.生成订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2.设置用户id
voucherOrder.setUserId(userId);
//6.3.设置代金卷id
voucherOrder.setVoucherId(voucherId);
//6.4.当生产的订单id写入数据库
this.save(voucherOrder);

//7.返回订单ID
return Result.ok(orderId);


}

暴露代理对象

<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.9.9.1</version>
</dependency>
//暴露动态代理
@EnableAspectJAutoProxy(exposeProxy = true)
@MapperScan("com.ayaka.mapper")
@SpringBootApplication
public class SeckillVouchersApp {
public static void main(String[] args) {
SpringApplication.run(SeckillVouchersApp.class, args);
}
}

以上依然存在问题!

image-20221030200606796

在集群 或 分布式系统下 , 每个JVM的锁监视器是独立的,就会出现并发安全问题

解决方案:使用 分布式锁

下面👇


Redis解决 [分布式锁]

分布式锁

什么是分布式锁

image-20221030200931213

分布式锁的实现

image-20221030201024610

基于Redis的分布式锁

image-20221030201149540

一个简单的实现:

/**
* 基于Redis的分布式锁
*/
public interface ILock {

/**
* 尝试获取锁
* @param timeoutSec 兜底过期时间
* @return 获取是否成功 true成功
*/
boolean tryLock(long timeoutSec);

/**
* 释放锁
*/
void unLock();

}

impl

package com.ayaka.utils;

import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.BooleanUtil;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.TimeUnit;

public class LockImpl implements ILock{

/**
* redis
*/
private StringRedisTemplate stringRedisTemplate;
/**
* 锁名称
*/
private String name;
public LockImpl(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}

/**
* 锁前缀
*/
private static final String KEY_PREFIX = "lock:";

/**
* 锁的唯一标识
*/
private String ID_PREFIX = UUID.randomUUID().toString(true);

/**
* 尝试获取锁
* @param timeoutSec 兜底过期时间
* @return 获取是否成功 true成功
*/
@Override
public boolean tryLock(long timeoutSec) {
// 锁的唯一标识:这里用 UUID + 线程id
String value = ID_PREFIX + Thread.currentThread().getId();
// 获取锁的 key
String key = KEY_PREFIX + name;

//尝试获取锁
Boolean isLock = stringRedisTemplate
.opsForValue()
.setIfAbsent(key, value, timeoutSec, TimeUnit.SECONDS);

//返回结果
//return Boolean.TRUE.equals(isLock); //或者 👇
return BooleanUtil.isTrue(isLock);
}


/**
* 释放锁
*/
@Override
public void unLock() {
//释放锁
Boolean delete = stringRedisTemplate.delete(KEY_PREFIX + name);
}

}

业务:

package com.ayaka.service.impl;

import com.ayaka.dto.Result;
import com.ayaka.entity.SeckillVoucher;
import com.ayaka.entity.VoucherOrder;
import com.ayaka.mapper.VoucherOrderMapper;
import com.ayaka.service.ISeckillVoucherService;
import com.ayaka.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ayaka.utils.LockImpl;
import com.ayaka.utils.RedisIdWorker;
import com.ayaka.utils.UserHolder;
import org.springframework.aop.framework.AopContext;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.time.LocalDateTime;


/**
* <p>
* 服务实现类
* </p>
*/
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

@Resource
private ISeckillVoucherService iSeckillVoucherService;

@Resource
private RedisIdWorker redisIdWorker;

@Resource
private StringRedisTemplate stringRedisTemplate;

/**
* 实现订单秒杀业务
*
* @param voucherId
* @return
*/
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠卷信息
SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);
Integer stock = voucher.getStock();
System.out.println(voucher);
//2.判断优惠卷是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
//尚未开始,返回异常给前端
return Result.fail("秒杀尚未开始!");
}
//3.判断优惠卷是否过期
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
//秒杀已结束,返回前端异常信息
return Result.fail("秒杀已结束!");
}
//4.判断优惠卷库存是否充足
if (stock < 1) {
//秒杀卷库存不足,返回给前端异常信息
return Result.fail("库存不足!");
}


//一人一单问题
//获取用户id
Long userId = UserHolder.getUser().getId();

//缩小悲观锁范围
/*synchronized (userId.toString().intern()) {
//获取当前代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
//通过代理对象调用 保证事务的正常
return proxy.queryOrderVoucherSave(voucherId);
}*/

//获取分布式锁对象
LockImpl lock = new LockImpl(stringRedisTemplate,"order:" + userId);
//尝试获取锁
boolean isLock = lock.tryLock(1200);
//判断是否获取成功
if (!isLock){
//获取锁失败
return Result.fail("不能重复下单!");
}
//成功 执行业务
try{
//获取当前代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
//通过代理对象调用 保证事务的正常
return proxy.queryOrderVoucherSave(voucherId);
}finally{
//确保锁的释放
lock.unLock();
}


}


/**
*<br> 几个重点:
*<br> 1. 一人一单问题使用 悲观锁 还是 乐观锁?
*<br> 使用悲观锁
*<br> 2. 使用悲观锁 synchronized 加载 方法上 还是 内部?
*<br> 内部,如果加载方法上,那整个订单业务都是串行,那刚刚解决的 超卖问题[乐观锁]也没意义了
*<br> 3. synchronized 的锁对象是什么
*<br> userId.toString().intern()
*<br> 而不是 userId.toString()
*<br> Long的 toString() 底层是:
*<br> return new String(buf, UTF16);
*<br> .intern()方法是:返回字符串对象的规范表示。
*<br> 4. 对于事务的添加 是 锁释放完了再提交 还是 提交完了再释放锁
*<br> 提交完了再释放锁
*<br> 具体操作:
*<br> 1.在方法上加上@Transactional
*<br> 2.在调用者 调用语句外加上 synchronized
*<br> synchronized (userId.toString().intern()) {
*<br> return queryOrderVoucherSave(voucherId);
*<br> }
*<br> 5. 事务失效问题
*<br> spring的事务是 AOP动态代理的
*<br> this.queryOrderVoucherSave(voucherId) //并非是代理对象
*<br> 解决方法
*<br> 这里使用 获取动态代理的方式 :
*<br> //获取动态代理对象
*<br> IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
*<br> //通过代理对象调用
*<br> proxy.queryOrderVoucherSave(voucherId);
*<br> 注意: 使用AopContext.currentProxy()
*<br> 导入aspectjweaver依赖
*<br> 开启 @EnableAspectJAutoProxy(exposeProxy = true) 暴露代理对象
*<br>
*<br> @param voucherId
*<br> @return
*/
@Transactional
public Result queryOrderVoucherSave(Long voucherId) {

//获取用户id
Long userId = UserHolder.getUser().getId();

//判断订单表中是否已经存在
int count = query()
.eq("user_id", userId)
.eq("voucher_id", voucherId)
.count();
if (count > 0) {
//存在,返回前端信息
return Result.fail("你已领取,每人只能领取一份!");
}

//以上都满足 施行扣减下单业务
//5.扣减库存
boolean isOK = iSeckillVoucherService
.update()
.setSql("stock =stock - 1")
.eq("voucher_id", voucherId)
//.eq("stock",stock) // CAS乐观锁
// CAS乐观锁改进 stock > 0 就可以执行下单业务
.gt("stock", 0)
.update();
if (!isOK) {
//秒杀失败,返回给前端异常信息
return Result.fail("库存不足!");
}
//6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1.生成订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2.设置用户id
voucherOrder.setUserId(userId);
//6.3.设置代金卷id
voucherOrder.setVoucherId(voucherId);
//6.4.当生产的订单id写入数据库
this.save(voucherOrder);

//7.返回订单ID
return Result.ok(orderId);

}


}

存在的问题:误删问题

上面的简单实现

在正常情况下:

image-20221030213559181

极端情况下:

image-20221030214121368

解决方案:

image-20221030214149274

对 上面代码优化:

impl

package com.ayaka.utils;

import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.BooleanUtil;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.TimeUnit;

public class LockImpl implements ILock{

/**
* redis
*/
private StringRedisTemplate stringRedisTemplate;
/**
* 锁名称
*/
private String name;
public LockImpl(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}

/**
* 锁前缀
*/
private static final String KEY_PREFIX = "lock:";

/**
* 锁的唯一标识
*/
private String ID_PREFIX = UUID.randomUUID().toString(true);

/**
* 尝试获取锁
* @param timeoutSec 兜底过期时间
* @return 获取是否成功 true成功
*/
@Override
public boolean tryLock(long timeoutSec) {
// 锁的唯一标识:这里用 UUID + 线程id
String value = ID_PREFIX + Thread.currentThread().getId();
// 获取锁的 key
String key = KEY_PREFIX + name;

//尝试获取锁
Boolean isLock = stringRedisTemplate
.opsForValue()
.setIfAbsent(key, value, timeoutSec, TimeUnit.SECONDS);

//返回结果
//return Boolean.TRUE.equals(isLock); //或者 👇
return BooleanUtil.isTrue(isLock);
}


/**
* 释放锁
*/
@Override
public void unLock() {

//判断将要释放的锁 的 线程表示是否一致 解决分布式锁误删问题

//锁的唯一标识:这里用 UUID + 线程id
String value = ID_PREFIX + Thread.currentThread().getId();
//获取锁的 key
String key = KEY_PREFIX + name;
//获取锁的标识
String value2 = stringRedisTemplate.opsForValue().get(key);

//判断将要释放的锁 的 线程表示是否一致 解决分布式锁误删问题
if (value.equals(value2)){
//释放锁
stringRedisTemplate.delete(key);
}
//否则 不释放锁
}

}

依然存在问题:原子性

删除锁时 判断锁的标识 和 释放锁 并发问题

极端情况下:

判断锁的标识 后 发生阻塞,超时释放了锁,此时其它线程获取锁,那么这个线程释放的锁 就是 其他线程的锁了

image-20221030221249309

改进方案:

  1. Redis的事务功能:麻烦不用
  2. Redis的Lua脚本

Redis的Lua脚本

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种 编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html 这里重点介绍Redis提供的调用函数,语法如下:

# 执行redis命令
redis.call('命令名称', 'key', '其它参数', ...)

例如,我们要执行set name jack,则脚本是这样:

# 执行 set name jack
redis.call('set', 'name', 'jack')

例如,我们要先执行set name Rose,再执行get name,则脚本如下:

先执行 set name jack
redis.call('set', 'name', 'jack')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name

Redis的Lua脚本的执行

image-20221030222016825

释放锁的业务流程是这样的:

  1. 获取锁中的线程标示

  2. 判断是否与指定的标示(当前线程标示)一致

  3. 如果一致则释放锁(删除)

  4. 如果不一致则什么都不做 如果用Lua脚本来表示则是这样的:

    -- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
    -- 获取锁中的标示,判断是否与当前线程标示一致
    if (redis.call('GET', KEYS[1]) == ARGV[1]) then
    -- 一致,则删除锁
    return redis.call('DEL', KEYS[1])
    end
    -- 不一致,则直接返回
    return 0

对之前的impl进行优化:

resources/unlock.lua

-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致

if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end

-- 不一致,则直接返回
return 0

impl

package com.ayaka.utils;

import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.BooleanUtil;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class LockImpl implements ILock{

/**
* redis
*/
private StringRedisTemplate stringRedisTemplate;
/**
* 锁名称
*/
private String name;
public LockImpl(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}

/**
* 锁前缀
*/
private static final String KEY_PREFIX = "lock:";

/**
* 锁的唯一标识
*/
private String ID_PREFIX = UUID.randomUUID().toString(true);

/**
* 初始化Lua脚本对象 RedisScript的实现类
*/
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
//创建 RedisScript的实现类 DefaultRedisScript
UNLOCK_SCRIPT = new DefaultRedisScript<Long>();
//设置Lua脚本位置
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
//设置脚本执行后的返回值类型
UNLOCK_SCRIPT.setResultType(Long.class);
}


/**
* 尝试获取锁
* @param timeoutSec 兜底过期时间
* @return 获取是否成功 true成功
*/
@Override
public boolean tryLock(long timeoutSec) {
// 锁的唯一标识:这里用 UUID + 线程id
String value = ID_PREFIX + Thread.currentThread().getId();
// 获取锁的 key
String key = KEY_PREFIX + name;

//尝试获取锁
Boolean isLock = stringRedisTemplate
.opsForValue()
.setIfAbsent(key, value, timeoutSec, TimeUnit.SECONDS);

//返回结果
//return Boolean.TRUE.equals(isLock); //或者 👇
return BooleanUtil.isTrue(isLock);
}


/**
* 释放锁
*/
@Override
public void unLock() {

//判断将要释放的锁 的 线程表示是否一致 解决分布式锁误删问题

//锁的唯一标识:这里用 UUID + 线程id
String value = ID_PREFIX + Thread.currentThread().getId();
//获取锁的 key
String key = KEY_PREFIX + name;

//判断将要释放的锁 的 线程表示是否一致 解决分布式锁误删问题
//使用Lua脚本 确保 [判断标识] 和 [释放锁] 的 原子性
stringRedisTemplate
.execute(UNLOCK_SCRIPT, //Lua脚本对象
Collections.singletonList(key), //KEYS[1] list
value); //ARGV[1] object

//否则 不释放锁
}


}

到此 实现了一个较为完善的 基于Redis的分布式锁

但是…….在某些场景下 依然需要优化…….


基于Redis的分布式锁优化


还有些问题可以进一步优化:

image-20221030231315439

这些实现起来比较繁琐

可以使用开源框架去解决:

**使用 Redisson **👇


Redisson解决Redis分布式锁

Redisson介绍


Redisson Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。

它不仅提供了一系列的分布 式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

image-20221031105827347

官网地址: https://redisson.org

GitHub地址: https://github.com/redisson/redisson

简单的使用

package com.ayaka.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
* Redisson 初始化配置
*/
@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient1() {

Config config = new Config();
//链接Redis
config.useSingleServer()
.setAddress("redis://ayaka520:6379")
.setPassword("gangajiang521");
//解耦合 可以使用yaml的方式 解耦合
//通过Redisson.create(config) 指定配置文件 创建RedissonClient
return Redisson.create(config);
}

//@Bean
public RedissonClient redissonClient2() {
Config config = new Config();
config.useSingleServer().setAddress("redis://ayaka521:6380");
return Redisson.create(config);
}

//@Bean
public RedissonClient redissonClient3() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://ayaka521:6381");
return Redisson.create(config);
}

}

业务改造

package com.ayaka.service.impl;

import com.ayaka.dto.Result;
import com.ayaka.entity.SeckillVoucher;
import com.ayaka.entity.VoucherOrder;
import com.ayaka.mapper.VoucherOrderMapper;
import com.ayaka.service.ISeckillVoucherService;
import com.ayaka.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ayaka.utils.RedisIdWorker;
import com.ayaka.utils.UserHolder;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.aop.framework.AopContext;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.time.LocalDateTime;


/**
* <p>
* 服务实现类
* </p>
*/
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

@Resource
private ISeckillVoucherService iSeckillVoucherService;

@Resource
private RedisIdWorker redisIdWorker;

//@Resource
//private StringRedisTemplate stringRedisTemplate;

/**
* Redisson
*/
@Resource
private RedissonClient redissonClient;

/**
* 实现订单秒杀业务
*
* @param voucherId
* @return
*/
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠卷信息
SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);
Integer stock = voucher.getStock();
//2.判断优惠卷是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
//尚未开始,返回异常给前端
return Result.fail("秒杀尚未开始!");
}
//3.判断优惠卷是否过期
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
//秒杀已结束,返回前端异常信息
return Result.fail("秒杀已结束!");
}
//4.判断优惠卷库存是否充足
if (stock < 1) {
//秒杀卷库存不足,返回给前端异常信息
return Result.fail("库存不足!");
}


//一人一单问题
//获取用户id
Long userId = UserHolder.getUser().getId();

//缩小悲观锁范围
/*synchronized (userId.toString().intern()) {
//获取当前代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
//通过代理对象调用 保证事务的正常
return proxy.queryOrderVoucherSave(voucherId);
}*/

//获取分布式锁对象
//LockImpl lock = new LockImpl(stringRedisTemplate,"order:" + userId);

//从Redisson中获取锁
RLock lock = redissonClient.getLock("lock:order:" + userId);

//尝试获取锁
boolean isLock = lock.tryLock();

//判断是否获取成功
if (!isLock){
//获取锁失败
return Result.fail("不能重复下单!");
}
//成功 执行业务
try{
//获取当前代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
//通过代理对象调用 保证事务的正常
return proxy.queryOrderVoucherSave(voucherId);
}finally{
//确保锁的释放
lock.unlock();
}
}

@Transactional
public Result queryOrderVoucherSave(Long voucherId) {

//获取用户id
Long userId = UserHolder.getUser().getId();

//判断订单表中是否已经存在
int count = query()
.eq("user_id", userId)
.eq("voucher_id", voucherId)
.count();
if (count > 0) {
//存在,返回前端信息
return Result.fail("你已领取,每人只能领取一份!");
}

//以上都满足 施行扣减下单业务
//5.扣减库存
boolean isOK = iSeckillVoucherService
.update()
.setSql("stock =stock - 1")
.eq("voucher_id", voucherId)
//.eq("stock",stock) // CAS乐观锁
// CAS乐观锁改进 stock > 0 就可以执行下单业务
.gt("stock", 0)
.update();
if (!isOK) {
//秒杀失败,返回给前端异常信息
return Result.fail("库存不足!");
}
//6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1.生成订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2.设置用户id
voucherOrder.setUserId(userId);
//6.3.设置代金卷id
voucherOrder.setVoucherId(voucherId);
//6.4.当生产的订单id写入数据库
this.save(voucherOrder);

//7.返回订单ID
return Result.ok(orderId);

}


}

image-20221030231315439

Redisson可重入锁问题

原理:

image-20221031105235063


使用Lua脚本实现 — 获取锁

image-20221031110822288

源码:

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

使用Lua脚本实现 — 释放锁

image-20221031111022292

源码:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

可重试–WatchDog机制


源码jiji看 jiji分析

超时释放–发布订阅/信号量


源码jiji看 jiji分析

Redisson分布式锁的原理

image-20221102084703840

Redisson分布式锁原理:

  • 可重入:利用hash结构记录线程id和重入次数
  • 可重试:利用信号量和PubSub功能实现等待、唤醒,获取 锁失败的重试机制
  • 超时续约:利用watchDog,每隔一段时间(releaseTime / 3),重置超时时间


Redisson主从一致性问题


image-20221102090850998

image-20221102091109610

使用:

RedissonConfig

package com.ayaka.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

/**
* Redisson 初始化配置
*/
@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient1() throws IOException {

Config config = new Config();
//链接Redis
config.useSingleServer()
.setAddress("redis://ayaka520:6379")
.setPassword("gangajiang521");

//通过Redisson.create(config) 指定配置文件 创建RedissonClient
return Redisson.create(config);
}

@Bean
public RedissonClient redissonClient2() throws IOException {
Config config = new Config();
config.useSingleServer().setAddress("redis://ayaka521:6380");
return Redisson.create(config);
}

@Bean
public RedissonClient redissonClient3() throws IOException {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://ayaka521:6381");
return Redisson.create(config);
}


}

RedissonTests

package com.ayaka;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

@Slf4j
@SpringBootTest
class RedissonTest {

@Resource
private RedissonClient redissonClient1;

@Resource
private RedissonClient redissonClient2;

@Resource
private RedissonClient redissonClient3;


private RLock lock;
//创建连锁
@BeforeEach
void setUp() {
//获取 RLock对象
RLock lock1 = redissonClient1.getLock("lock:test");
RLock lock2 = redissonClient2.getLock("lock:test");
RLock lock3 = redissonClient3.getLock("lock:test");
//创建连锁
lock = redissonClient1.getMultiLock(lock1,lock2,lock3);
}


@Test
void method1() throws InterruptedException {
// 尝试获取锁
boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
if (!isLock) {
log.error("获取锁失败 .... 1");
return;
}
try {
log.info("获取锁成功 .... 1");
method2();
log.info("开始执行业务 ... 1");
} finally {
log.warn("准备释放锁 .... 1");
lock.unlock();
}
}
void method2() {
// 尝试获取锁
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败 .... 2");
return;
}
try {
log.info("获取锁成功 .... 2");
log.info("开始执行业务 ... 2");
} finally {
log.warn("准备释放锁 .... 2");
lock.unlock();
}
}
}

总结


总结

1)不可重入Redis分布式锁:

  • 原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判 断线程标示

  • 缺陷:不可重入、无法重试、锁超时失效

2)可重入的Redis分布式锁:

  • 原理:利用hash结构,记录线程标示和重入次数;利用 watchDog延续锁时间;利用>信号量控制锁重试等待

  • 缺陷:redis宕机引起锁失效问题

image-20221102084703840

3)Redisson的multiLock:

  • 原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
  • 缺陷:运维成本高、实现复杂

image-20221102091109610

这样就就觉了分布式锁的问题

但是,还可以继续优化:


Redis解决 [MQ] [消息队列]


秒杀业务的优化

image-20221104151321789

改进方案:

image-20221104151803752

image-20221104151933960

image-20221104152114257

Redis 解决消息队列效果并不好 这里就不实现了 还dai是带MQ的技术去解决 RabbitMQ…



Redis解决 [点赞] [排行榜]

分析 与 问题

Redis类型的选择:

  • 一个用户不能重复点赞 – 集合元素不可重复
  • 点赞榜 Top5 需要排序 – 集合需要排序 最终选出前5个数据 || Top5(最先点赞的前5人)

image-20221123193728144

所以 选择了 SortedSet 数据类型

  • key : blog:liked: + 博客id
  • member:用户id
  • score:时间戳

SorteddSet 要用的的命令

添加成员 — 新增点赞

image-20221123194309452

  • reids命令:zadd blog:liked:博客id 用户id 时间戳
  • java命令: stringRedisTemplate.opsForZSet().add(key,userId,System.currentTimeMillis());

判断是否为该成员 — 判断该用户是否为该博客点赞过

image-20221123194947893

  • reids命令:zscore blog:liked:博客id 时间戳 返回部位 nil 为该成员
  • java命令: Double isMemberScore = stringRedisTemplate.opsForZSet().score(key, userId); 不为 null 为该成员

按分数顺序查询成员 — 实现点赞 Top5 功能

image-20221123195204150

  • reids命令:zscore blog:liked:博客id 时间戳 返回部位 nil 为该成员
  • java命令: Set<String> range = stringRedisTemplate.opsForZSet().range(key, 0, 4);

几个坑

数据库问题:

/**
根据博客id 获取点赞排行榜前 5 名 <br>
*/
@Override
public Result likesBlogTop(Long id) {
String key = BLOG_LIKED_KEY + id.toString();
//根据博客id查询点赞前 5 名
Set<String> range = stringRedisTemplate.opsForZSet().range(key, 0, 4);// top5
if (range == null){
return Result.ok();
}
List<Long> ids = range.stream().map(Long::valueOf).collect(Collectors.toList());
if (ObjectUtil.isEmpty(ids)){
return Result.ok();
}
String idStr = StrUtil.join("," , ids);
System.out.println(idStr);
//根据这些用户 id 获取用户信息
List<UserDTO> userDTOS = userService
//.listByIds(ids)
.query()
.in("id",ids)
.last("order by field(id,"+ idStr + ")")
.list()
.stream()//封装成 UserDTO 防止敏感信息泄露
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());

return Result.ok(userDTOS);
}

根据博客id 获取点赞排行榜前 5 名

有一个坑:

select * from tb_user where id in(3,2,1); 的查询结果顺序 是 1 2 3

select * from tb_user where id in(3,2,1) order by field(id,3,2,1)

这样才能保证按给定的顺序查询

这里依然有个坑!!

当未有用户点赞时 :SQL语句:SELECT * FROM tb_user WHERE (id IN ()) order by field(id,)语法错误

解决方法 :提前判断

if (ObjectUtil.isEmpty(ids)){
return Result.ok();
}

空指针异常

//判断当前用户是否为当前博客点赞
private void isLiked(Blog blog){
UserDTO userDTO = UserHolder.getUser();
if (userDTO == null){
//用户未登录,不需要查询是否点赞过
return;
}
String userId = userDTO.getId().toString();
String key = BLOG_LIKED_KEY + blog.getId().toString();
//Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
Double isMember = stringRedisTemplate.opsForZSet().score(key, userId.toString());
//封装blog
blog.setIsLike(isMember != null);
}

用户未登录 会报空指针异常 提前判断一下

同一用户多地登录 并发安全问题 TODO: T.T

更多异常 TODO: T.T

库表设置

image-20221123112636816

CREATE TABLE `tb_blog` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`shop_id` bigint(20) NOT NULL COMMENT '商户id',
`user_id` bigint(20) unsigned NOT NULL COMMENT '用户id',
`title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '标题',
`images` varchar(2048) NOT NULL COMMENT '探店的照片,最多9张,多张以","隔开',
`content` varchar(2048) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '探店的文字描述',
`liked` int(8) unsigned DEFAULT '0' COMMENT '点赞数量',
`comments` int(8) unsigned DEFAULT NULL COMMENT '评论数量',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT;

实体类

package com.ayaka.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.time.LocalDateTime;


@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_blog")
public class Blog implements Serializable {

private static final long serialVersionUID = 1L;

/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 商户id
*/
private Long shopId;
/**
* 用户id
*/
private Long userId;
/**
* 用户图标
*/
@TableField(exist = false)
private String icon;
/**
* 用户姓名
*/
@TableField(exist = false)
private String name;
/**
* 是否点赞过了
*/
@TableField(exist = false)
private Boolean isLike;

/**
* 标题
*/
private String title;

/**
* 探店的照片,最多9张,多张以","隔开
*/
private String images;

/**
* 探店的文字描述
*/
private String content;

/**
* 点赞数量
*/
private Integer liked;

/**
* 评论数量
*/
private Integer comments;

/**
* 创建时间
*/
private LocalDateTime createTime;

/**
* 更新时间
*/
private LocalDateTime updateTime;


}

BlogController

package com.ayaka.controller;


import com.ayaka.dto.Result;
import com.ayaka.dto.UserDTO;
import com.ayaka.entity.Blog;
import com.ayaka.service.IBlogService;
import com.ayaka.utils.UserHolder;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;

/**
* <p>
* 前端控制器
* </p>
*/
@RestController
@RequestMapping("/blog")
public class BlogController {

@Resource
private IBlogService blogService;

@PostMapping
public Result saveBlog(@RequestBody Blog blog) {

// 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 保存探店博文
blogService.save(blog);
// 返回 id
return Result.ok(blog.getId());
}


@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {
// 修改点赞数量
return blogService.likeBlog(id);
}

@GetMapping("/likes/{id}")
public Result likesBlogTop(@PathVariable("id") Long id){
// 点赞榜单 前 5 位
return blogService.likesBlogTop(id);
}


@GetMapping("/of/me")
public Result queryMyBlog(@RequestParam(value = "current", defaultValue = "1") Integer current) {
return blogService.queryMyBlog(current);
}

@GetMapping("/of/user")
public Result queryOfUserBlog(@RequestParam("id") Long id,
@RequestParam(value = "current", defaultValue = "1") Integer current){
return blogService.queryOfUserBlog(id, current);
}

@GetMapping("/hot")
public Result queryHotBlog(@RequestParam(value = "current", defaultValue = "1") Integer current) {
return blogService.queryHotBlog(current);
}

@GetMapping("/{id}")
public Result queryByIdBlog(@PathVariable Long id){
return blogService.queryByIdBlog(id);
}

}

BlogServer

package com.ayaka.service.impl;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.ayaka.dto.Result;
import com.ayaka.dto.UserDTO;
import com.ayaka.entity.Blog;
import com.ayaka.entity.User;
import com.ayaka.mapper.BlogMapper;
import com.ayaka.service.IBlogService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ayaka.service.IUserService;
import com.ayaka.utils.SystemConstants;
import com.ayaka.utils.UserHolder;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static com.ayaka.utils.RedisConstants.BLOG_LIKED_KEY;

/**
* <p>
* 服务实现类
* </p>
*/
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {

@Resource
private IUserService userService;

@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public Result queryHotBlog(Integer current) {
// 根据用户查询
Page<Blog> page = this.query()
.orderByDesc("liked")
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
// 查询用户
records.forEach(blog -> {
// 封装blog
queryUserBlog(blog);
// 判断用户是否点赞
isLiked(blog);
});


return Result.ok(records);
}


/**
* 根据id查询博客
* @param id
* @return
*/
@Override
public Result queryByIdBlog(Long id) {
//根据id查询博客
Blog blog = this.getById(id);
if (ObjectUtil.isEmpty(blog)){
return Result.fail("博客不存在!");
}
//封装后的 blog 对象
this.queryUserBlog(blog);
// 判断用户是否点赞
isLiked(blog);
return Result.ok(blog);
}

/**
* 更改点赞
* @param id
* @return
*/
@Override
public Result likeBlog(Long id) {

//1.获取用户 id
String userId = UserHolder.getUser().getId().toString();
String key = BLOG_LIKED_KEY + id;

//2.判断当前用户是否点赞
//Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId);
Double isMemberScore = stringRedisTemplate.opsForZSet().score(key, userId);
if (isMemberScore == null){
//3.未点赞
//3.1 数据库点赞数 +1
boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
//3.2 redis中 添加zset 成员
if (isSuccess){
//stringRedisTemplate.opsForSet().add(key,userId);
stringRedisTemplate.opsForZSet().add(key,userId,System.currentTimeMillis()); //key value score
}
}else{
//4.未点赞
//4.1 数据库点赞数 -1
boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
//4.2 redis中 删除zset 成员
if (isSuccess){
//stringRedisTemplate.opsForSet().remove(key,userId);
stringRedisTemplate.opsForZSet().remove(key,userId);
}
}

return Result.ok();
}

/**
* 根据博客id 获取点赞排行榜前 5 名 <br>
* 注意: <br>
* 有一个坑: <br>
* select * from tb_user where id in(3,2,1); 的查询结果顺序 是 1 2 3 <br>
* select * from tb_user where id in(3,2,1) order by field(id,3,2,1) <br>
* 这样才能保证按给定的顺序查询
*
* @param id
* @return
*/
@Override
public Result likesBlogTop(Long id) {
String key = BLOG_LIKED_KEY + id.toString();
//根据博客id查询点赞前 5 名
Set<String> range = stringRedisTemplate.opsForZSet().range(key, 0, 4);// top5
if (range == null){
return Result.ok();
}
List<Long> ids = range.stream().map(Long::valueOf).collect(Collectors.toList());
if (ObjectUtil.isEmpty(ids)){
return Result.ok();
}
String idStr = StrUtil.join("," , ids);
//根据这些用户 id 获取用户信息
List<UserDTO> userDTOS = userService
//.listByIds(ids)
.query()
.in("id",ids)
.last("order by field(id,"+ idStr + ")")
.list()
.stream()//封装成 UserDTO 防止敏感信息泄露
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());

return Result.ok(userDTOS);
}

@Override
public Result queryOfUserBlog(Long id, Integer current) {
//根据用户查询blog
Page<Blog> page = query()
.eq("user_id", id)
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
//获取当前页面数据
List<Blog> records = page.getRecords();
return Result.ok(records);
}

@Override
public Result queryMyBlog(Integer current) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
// 根据用户查询
Page<Blog> page = query()
.eq("user_id", user.getId()).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
return Result.ok(records);
}


//为博客 并设置用户信息
private void queryUserBlog(Blog blog) {
Long userId = blog.getUserId();
User user = userService.getById(userId);
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}

//判断当前用户是否为当前博客点赞
private void isLiked(Blog blog){
UserDTO userDTO = UserHolder.getUser();
if (userDTO == null){
//用户未登录,不需要查询是否点赞过
return;
}
String userId = userDTO.getId().toString();
String key = BLOG_LIKED_KEY + blog.getId().toString();
//Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
Double isMember = stringRedisTemplate.opsForZSet().score(key, userId.toString());
//封装blog
blog.setIsLike(isMember != null);
}

}

Redis的效果

image-20221123193540629

Redis解决 [关注] [共同关注]

分析

单纯的关注 取消关注功能 只用数据库就行

查看共同关注

Redis数据类型选择 Set类型

  • key : follow:user: + 用户id
  • member:关注用户的id

Set类型 有一个命令可以查询 诺干个key的交集

image-20221124104641565

  • reids命令:sinter follow:user:3 follow:user:6 用户id:3 和 用户id:6 的共同关注
  • java命令: stringRedisTemplate.opsForSet().intersect(key1,key2);

库表设计

image-20221124103904419

CREATE TABLE `tb_follow` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
`user_id` bigint(20) unsigned NOT NULL COMMENT '用户id',
`follow_user_id` bigint(20) unsigned NOT NULL COMMENT '关联的用户id',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=22 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT;

实体类

package com.ayaka.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
* <p>
*
* </p>
*
* @author 虎哥
* @since 2021-12-22
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_follow")
public class Follow implements Serializable {

private static final long serialVersionUID = 1L;

/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;

/**
* 用户id
*/
private Long userId;

/**
* 关联的用户id
*/
private Long followUserId;

/**
* 创建时间
*/
private LocalDateTime createTime;


}

FollowController

package com.ayaka.controller;


import com.ayaka.dto.Result;
import com.ayaka.service.IFollowService;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;

/**
* <p>
* 前端控制器
* </p>
*/
@RestController
@RequestMapping("/follow")
public class FollowController {

@Resource
private IFollowService followService;

/**
* 根据用户id 修改关注 取消关注
* @param id
* @param isFollow
* @return
*/
@PutMapping("/{id}/{isFollow}")
public Result followUser(@PathVariable("id") Long id, @PathVariable("isFollow") boolean isFollow){
return followService.followUser(id,isFollow);
}

/**
* 根据 id 判断是否关注
* @param id
* @return
*/
@GetMapping("/or/not/{id}")
public Result notFollow(@PathVariable("id") Long id){
return followService.notFollow(id);
}

/**
* 共同关注查询
* @param id
* @return
*/
@GetMapping("/common/{id}")
public Result followCommon(@PathVariable("id") Long id){
return followService.followCommon(id);
}

}

FollowService

package com.ayaka.service.impl;

import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import com.ayaka.dto.Result;
import com.ayaka.dto.UserDTO;
import com.ayaka.entity.Follow;
import com.ayaka.mapper.FollowMapper;
import com.ayaka.service.IFollowService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ayaka.service.IUserService;
import com.ayaka.utils.UserHolder;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static com.ayaka.utils.RedisConstants.Follow_USER_KEY;

/**
* <p>
* 服务实现类
* </p>
*
* @author 虎哥
* @since 2021-12-22
*/
@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {

@Resource
private IUserService userService;

@Resource
private StringRedisTemplate stringRedisTemplate;

/**
* 根据 id 修改 关注/取消关注
* @param id
* @param isFollow
* @return
*/
@Override
public Result followUser(Long id, boolean isFollow) {
//1.获取该用户 id
Long userId = UserHolder.getUser().getId();
//2.获取 要判断是否关注的用户id
Long followUserId = userService.getById(id).getId();
//3.判断 关注 还是 取消关注
String key = Follow_USER_KEY + userId;
if (isFollow){
//关注逻辑
Follow follow = new Follow();
follow.setUserId(userId);
follow.setFollowUserId(followUserId);
boolean isSuccess = save(follow);
//存入Redis
if (isSuccess){
stringRedisTemplate.opsForSet().add(key, id.toString());
}

}else{
//取消关注的逻辑
//数据库中移除
LambdaQueryWrapper<Follow> queryWrapper = new LambdaQueryWrapper<Follow>();
queryWrapper.eq(Follow::getUserId,userId).eq(Follow::getFollowUserId,followUserId);
boolean isSuccess = remove(queryWrapper);
//Redis中移除
if (isSuccess){
stringRedisTemplate.opsForSet().remove(key, id.toString());
}
}

return Result.ok();
}

/**
* 根据 id 判断是否关注
* @param id
* @return
*/
@Override
public Result notFollow(Long id) {
//获取用户id
Long userId = UserHolder.getUser().getId();
//从数据库中查询
Integer count = query().eq("user_id", userId)
.eq("follow_user_id", id)
.count();
//判断是否已经关注
return Result.ok(count > 0);
}

/**
* 共同关注查询
* @param id
* @return
*/
@Override
public Result followCommon(Long id) {
//1.获取当前用户id key
String key1 = Follow_USER_KEY + UserHolder.getUser().getId().toString();
//2.获取查询用户id key
String key2 = Follow_USER_KEY + id.toString();
//3.从Redis中求交集
Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key1, key2);
//4.转换为集合
if (ObjectUtils.isEmpty(intersect)){
return Result.ok(Collections.emptyList());
}
List<Long> idList = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
//查询用户
List<UserDTO> users =
userService.listByIds(idList)
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
return Result.ok(users);
}
}

Redis效果

image-20221124104624870

Redis解决 [推送] [Feed流]

Feed流

关注推送也叫做Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。

image-20221124231338092

image-20221124231401963

Feed流的模式

Feed流产品有两种常见模式:

  • Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈

    • ➢ 优点:信息全面,不会有缺失。并且实现也相对简单
    • ➢ 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
  • 智能排序[Rank]:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户

    • ➢ 优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
    • ➢ 缺点:如果算法不精准,可能起到反作用

Feed流 Timeline模式

Feed流,Timeline的模式,该模式的实现方案有三种:

  1. 拉模式
  2. 推模式
  3. 推拉结合

拉模式


image-20221124231652089


推模式


image-20221124231726647


推拉结合


image-20221124231742522


Feed流的实现方案


image-20221124231929634


案例分析

案例要求:

点击关注后,会查询出该用户关注的其他用户的博客,按时间降序排列,滚动分页。

使用 Timeline模式 的 推模式

接口:


image-20221124234334506


Redis的类型选择

使用 SortedSet 数据类型

  • key : feed:user: + 用户id
  • member:关注用户的博客id (被关注者发布的博客)
  • score:博客发布的时间戳

当一个用户发布博客时被关注者发布的博客id

根据当前用户,查询所有的关注者,

得到所有关注者的id 即: feed:user: + 用户id 后,

保存到 Redis 当中:

image-20221125001818690

要使用的命令

发布订阅时,向用户推送,被关在的用户会保存该博客id

image-20221125000345829

  • reids命令:zadd feed:user:3 5 521 id为x的用户,在521时刻,发布了id为5的博客,并推送给了id为3用户。
  • java命令: stringRedisTemplate.opsForZSet().add(key,blog.getId().toString(),System.currentTimeMillis());

用户获取订阅的内容,可以进行分页

image-20221125002735581

  • reids命令:ZREMRANGEBYSCORE follow:user:5 最大时间戳 0 withscores limit 0 3 首次

  • reids命令:ZREMRANGEBYSCORE follow:user:5 上次最小时间戳 0 withscores limit 上次最小时间戳次数 3 下次

  • java命令: stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, maxTime, offset, 2);

    • 返回一个 ZSetOperations.TypedTuple<String>>

    • ```java
      //TypedTuple接口

    @Nullable
    V getValue(); //成员

    @Nullable
    Double getScore(); //分数











    ---

    ### ===开始实现===

    ---





    ### 返回给前端实体

    用于分页擦寻后 返回的数据

    ```java
    package com.ayaka.dto;

    import lombok.Data;

    import java.util.List;

    @Data
    public class ScoreResult {

    //分页后的数据
    private List<?> list;
    //最小时间戳 用于下次请求分页
    private Long minTime;
    //最小时间戳出现的次数 用于下次请求分页的偏移量
    private Integer offset;

    }

[发布] 信息的发布

保存博客 并 实现推送功能

image-20221124230414472

/**
* 接口
*
* 保存博客 并 推送
*/
@PostMapping
public Result saveBlog(@RequestBody Blog blog) {
//保存博客 并推送Feed流
return blogService.saveBlog(blog);
}


/**
* 业务
*
* 保存博客 并 推送
* @param blog
* @return
*/
@Override
public Result saveBlog(Blog blog) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 保存探店博文
boolean isSuccess = save(blog);
if (!isSuccess){
return Result.fail("新增笔记失败!");
}
// 推送给关注的 用户 Feet流 推模式
//1.获取粉丝id
List<Follow> follows = followService.query().eq("follow_user_id", UserHolder.getUser().getId()).list();
//2.推送到邮箱当中
for (Follow follow: follows) {
//3.以 SortedSet 类型存储
String key = FEED_USER_KEY + follow.getUserId();
stringRedisTemplate.opsForZSet().add(key,blog.getId().toString(),System.currentTimeMillis());
}

// 返回 id
return Result.ok(blog.getId());
}

[接收] 滚动分页查询的实现

image-20221125032511467

/**
* 前端控制器
*
* 分页查询当前用户 关注用户的博客
* Feed流 取
* @param maxTime
* @param offset
* @return
*/
@GetMapping("/of/follow")
private Result queryOfFollow(@RequestParam("lastId") Long maxTime,
@RequestParam(value = "offset",defaultValue = "0") Integer offset){
return blogService.queryOfFollow(maxTime,offset);
}


/**
* 业务层
*
* 分页查询当前用户 关注用户的博客
* @param maxTime
* @param offset
* @return
*/
@Override
public Result queryOfFollow(Long maxTime, Integer offset) {
//1.获取用户id
Long userId = UserHolder.getUser().getId();
//2.从Redis用户信箱中取出 关注用的发布的博客
String key = FEED_USER_KEY + userId.toString();
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate
.opsForZSet()
.reverseRangeByScoreWithScores(key, 0, maxTime, offset, 2);
//3.判断邮件是否为空
if (ObjectUtil.isEmpty(typedTuples)){
return Result.ok();
}

//4.进行解析 博客id集合 当前页最小时间戳:minTime 最小时间戳出现的次数
ArrayList<Long> blogIds = new ArrayList<>(typedTuples.size());
long minTime = 0;
int minCount = 1;
for (ZSetOperations.TypedTuple<String> tuple : typedTuples) {
//关注的博客id
Long blogId = Long.valueOf(tuple.getValue());
blogIds.add(blogId);
//获取最小分数 和 最小分数个数
long score = tuple.getScore().longValue();
if (score == minTime){
minCount ++;
}else{
minTime = score;
minCount = 1;
}
}

//5.根据 博客id集合 查询博客对象
String idStr = StrUtil.join(",", blogIds);
List<Blog> blogs = query()
.in("id", blogIds)
.last("order by field(id," + idStr + ")")
.list();
//给博客封装非数据库字段
blogs.forEach(blog -> {
//封装后的 blog 对象
queryUserBlog(blog);
// 判断用户是否点赞
isLiked(blog);
});

//6.封装 ScoreResult 对象
ScoreResult scoreResult = new ScoreResult();
scoreResult.setList(blogs);
scoreResult.setMinTime(minTime);
scoreResult.setOffset(minCount);

//7.返回个前端
return Result.ok(scoreResult);
}

Redis解决 [地理] [附近店铺]

分析

附近商户搜索

使用GEO数据类型

GEO就是Geolocation的简写形式,代表地理坐标。

Redis在3.2版本中加入了对GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。

image-20221127194357429

要使用的命令

添加地理坐标

image-20221127191501135

获取相对 位置/距离

image-20221127191942091

接口

image-20221127193942434

/**
* 根据商铺类型分页查询商铺信息
* @param typeId 商铺类型
* @param current 页码
* @param x 经度
* @param y 维度
* @return 商铺列表
*/
@GetMapping("/of/type")
public Result queryShopByType(
@RequestParam("typeId") Integer typeId,
@RequestParam(value = "current", defaultValue = "1") Integer current,
@RequestParam(value = "x",required = false) Double x,
@RequestParam(value = "y",required = false) Double y
) {
return shopService.queryShopByType(typeId, current, x, y);
}

添加地理坐标

image-20221127194817147

package com.ganga;

import com.ganga.entity.Shop;
import com.ganga.service.IShopService;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.geo.Point;
import org.springframework.data.redis.connection.RedisGeoCommands;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.ContextConfiguration;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static com.ganga.utils.RedisConstants.SHOP_GEO_KEY;

@SpringBootTest
@ContextConfiguration(classes = GEOApp.class)
public class RedisGEOShopTest {

@Resource
private IShopService shopService;

@Resource
private StringRedisTemplate stringRedisTemplate;

/**
* 查询所有商铺 将 [店铺类型] [id] [经纬度] 存入Redis当中
* 数据类型: GEO
* key: 店铺类型
* member: 店铺id
* score: 经纬度
*/
@Test
void addEGORedisShop(){
//1.获取所有店铺
List<Shop> shopList = shopService.list();
//2.根据店铺类型进行分组
Map<Long, List<Shop>> collect = shopList.stream().collect(Collectors.groupingBy(Shop::getTypeId));
//3.添加到 Redis 当中
collect.forEach((typeId, value) -> {
//3.1.获取店铺类型id key
String key = SHOP_GEO_KEY + typeId.toString();
//3.2.同种店铺封装到集合当中 [店铺] + [经纬坐标] 封装成 RedisGeoCommands.GeoLocation<String>>
ArrayList<RedisGeoCommands.GeoLocation<String>> location = new ArrayList<>();
value.forEach(shop -> location.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(), new Point(shop.getX(), shop.getY()))));
//3.3.写入 Redis 当中
stringRedisTemplate.opsForGeo().add(key, location);
});
}

}

分页查询

image-20221127195003244

/**
* 根据商铺类型分页查询商铺信息
* @param typeId 商铺类型
* @param current 页码
* @param x 经度
* @param y 维度
* @return 商铺列表
*/
@Override
public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {

//1.判断是否需要通过 [地理坐标查询] 店铺
if (x == null || y == null){
//不通过地理坐标查询 使用数据库查询
Page<Shop> page = query()
.eq("type_id", typeId)
.page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
return Result.ok(page.getRecords());
}
//使用地理坐标查询
//2.分页参数
int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE; //size = 5
int end = current * SystemConstants.DEFAULT_PAGE_SIZE;
//3.从 Redis 获取 GEO 数据
String key = SHOP_GEO_KEY + typeId.toString();
GeoResults<RedisGeoCommands.GeoLocation<String>> search = stringRedisTemplate.opsForGeo()
.search(
key, //shop:geo:店铺类型id
GeoReference.fromCoordinate(x, y),//圆心:用户当前坐标
new Distance(5000),//半径距离 默认是米
RedisGeoCommands //返回结果带上距离
.GeoSearchCommandArgs
.newGeoSearchArgs()
.includeDistance() //返回结果 包含坐标!
.limit(end) //这里只能 0 - end
);
if (search == null){
return Result.ok(Collections.emptyList());
}
//4.解析数据
List<GeoResult<RedisGeoCommands.GeoLocation<String>>> content = search.getContent();

if (content.size() <= from) return Result.ok(Collections.emptyList());
ArrayList<Long> ids = new ArrayList<>(content.size());
HashMap<String, Distance> dis = new HashMap<>();
//5.逻辑分页 + 解析 店铺id、店铺距离
content.stream().skip(from).forEach(data ->{
//店铺id
String shopId = data.getContent().getName();
ids.add(Long.valueOf(data.getContent().getName()));
//店铺距离
dis.put(shopId,data.getDistance());
Distance distance = data.getDistance();
});
//6.根据 店铺id 查询店铺
String join = StrUtil.join(",", ids);
List<Shop> shops = query()
.in("id", ids)
.last("order by field(id," + join + ")")
.list();
//7.封装: shop + 地理位置
shops.forEach(shop -> shop.setDistance(dis.get(shop.getId().toString()).getValue()));
//8.返回给前端
return Result.ok(shops);
}

Redis存储

image-20221127195343308

实现效果

Redis解决 [签到] [BitMap]

案例分析

要求

image-20221128163019925

image-20221128163025985

如果使用数据库记录


image-20221128163437289


CREATE TABLE `tb_sign` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`user_id` bigint(20) unsigned NOT NULL COMMENT '用户id',
`year` year(4) NOT NULL COMMENT '签到的年',
`month` tinyint(2) NOT NULL COMMENT '签到的月',
`date` date NOT NULL COMMENT '签到的日期',
`is_backup` tinyint(1) unsigned DEFAULT NULL COMMENT '是否补签',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT;

假如有1000万用户,平均每人每年签到次数为10次,

则这张表一年的数据量为 1亿条 每签到一次需要使用(8 + 8 + 1 + 1 + 3 + 1)共22 字节的内存,

一个月则最多需要600多字节

使用 BitMap数据结构 –位图

把每一个bit位对应当月的每一天,形成了映射关系。

01 标示业务状态,这种思路就称为位图

image-20221128165031696

image-20221128165558375

要使用的命令

解决方案

签到好解决,是需要使用 **setbit sign:用户ID:年月 当月几号-1**即可。

判断 连续签到 [从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数。]

image-20221128171234233

实现:

int count = 0;
while(true){
if ( (signBit & 1) == 0 ){
//未签到
break;
}else{
//计数器 +1
count ++;
//原数据 右移
signBit >>>= 1;
}
}

前端控制器

image-20221128165908062

/**
* 用户签到
* @return
*/
@PostMapping("/sign")
public Result userSign(){
return userService.userSign();
}

/**
* 用户今天开始 连续签到次数
* @return
*/
@GetMapping("/sign/count")
public Result userSignCount(){
return userService.userSignCount();
}

业务实现

image-20221128165825813

/**
* 签到功能
* @return
*/
@Override
public Result userSign() {
//1.获取用户id
Long id = UserHolder.getUser().getId();
//2.1.获取当前时间
LocalDateTime now = LocalDateTime.now();
String format = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
//2.2.获取偏移量 -> 当月的第几天 几号
//[注意]:这里从 0 开始 28号 偏移量就是 27
int offset = now.getDayOfMonth() - 1;
//3.获取key
String key = USER_SIGN_KEY + id + format;
//4.写入 Redis 实现签到
Boolean is = stringRedisTemplate.opsForValue().getBit(key, offset);
if (BooleanUtil.isTrue(is)){
return Result.ok("今日已经签到过咯~");
}
Boolean isSuccess = stringRedisTemplate.opsForValue().setBit(key, offset, true);
//5.返回前端
if (BooleanUtil.isFalse(isSuccess)){
return Result.ok("签到成功!");
}
return Result.fail("签到失败!");
}

// sign::202211
// 0000 0000 0000 0000 0000 0000 0000 1000 <-- -1
// 4 X 7 + 1 - 1 = 28号

@Override
public Result userSignCount() {
//1.获取用户id
Long id = UserHolder.getUser().getId();
//2.1.获取当前时间
LocalDateTime now = LocalDateTime.now();
String format = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
//2.2.获取偏移量 -> 当月的第几天 几号
//[注意]:这里从 1 开始 28号 偏移量就是 28
int offset = now.getDayOfMonth();
//3.获取key
String key = USER_SIGN_KEY + id + format;
//4.从 Redis 中拿到数据
List<Long> lists = stringRedisTemplate.opsForValue().bitField(
key,
BitFieldSubCommands
.create() // get查询 unsigned()不带符号 -> 参数:偏移量
.get(BitFieldSubCommands.BitFieldType.unsigned(offset))
.valueAt(0)
);
//5.解析 + 健壮性判断
if (ObjectUtils.isEmpty(lists)){
return Result.ok(0);
}
Long signBit = lists.get(0);
if (ObjectUtils.isEmpty(signBit)){
return Result.ok(0);
}
//6.获取连续签到次数
int count = 0;
while(true){
if ( (signBit & 1) == 0 ){
//未签到
break;
}else{
//计数器 +1
count ++;
//原数据 右移
signBit >>>= 1;
}
}
//7.返回前端数据
return Result.ok(count);
}

Redis解决 [统计] [UV统计]

HyperLogLog用法

首先我们搞懂两个概念:

  • UV:全称Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次 访问该网站,只记录1次。
  • PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则 记录多次PV。往往用来衡量网站的流量。

UV统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存。

但是如果每个访 问的用户都保存到Redis中,数据量会非常恐怖

Hyperloglog(HLL)是从Loglog算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。

相关算法 原理大家可以参考:https://juejin.cn/post/6844903785744056333#heading-0 Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb,内存占用低的令人发指!作为代价,其测量结 果是概率性的,有小于0.81%的误差。不过对于UV统计来说,这完全可以忽略。

image-20221128192606733

Redis实现 UV统计

image-20221128192759046

package com.ganga;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.ObjectUtils;

@SpringBootTest(classes = HyperLogLog.class)
public class UVTests {

@Autowired
private StringRedisTemplate stringRedisTemplate;

/**
* Redis 实现 UV 统计
* 生成 10万 条数据写入Redis 并统计
*/
@Test
void uvTest(){

String[] array = new String[1000];
int j = 0;
for (int i = 0; i < 100000; i++) {
j = i % 1000;
array[j] = "user_id:" + i;
if (j == 999){
stringRedisTemplate.opsForHyperLogLog().add("uv:user",array);
}
}

if (!ObjectUtils.isEmpty(array)){
stringRedisTemplate.opsForHyperLogLog().add("uv:user",array);
}

}

@Test
void uvGetTest(){
Long size = stringRedisTemplate.opsForHyperLogLog().size("uv:user");
System.out.println(size);
}

@Test
void uvRmTest(){
stringRedisTemplate.delete("uv:user");
}

}

TODO: