数据结构
5大基本数据类型,6大底层结构
Zset
其他都很好理解,但是这个接触的少,他结合了set的唯一性和list的排序功能
他的底层数据结构是跳表,这个接触的也少
跳表
在支持排序的基础上还兼顾了查询啊,插入啊,删除
只有他能做到
是以空间换时间的方式优化了查询效率
跳表是一种多层次的链表结构
我们要查询一个数时,先在最上层进行查找,在通过down指针逐层向下,其查询的平均时间复杂度为O(logn)
各个数据结构适用的场景
string是最常用的数据结构,一般key-value都会使用他
list主要可以用来做消息队列,比如说你需要进行一个数据量很大的查询,你可以每500的查,查一次放入list,再从list里poll出来
hash因为他O(1)的查询和插入,适合需要对字段频繁查询或者改动的情况,比如说购物车
zset他因为有个一权重,所以他适合排序的情况嘛,比如说按时间排序
set是一个无序唯一集合,但是他的方法给了他很多遍历,比如说UV(unique vistors),以及需要对多个数据取交并差集(今日新增人数,今日留存人数)
缓存读写一致性
3中常见的缓存读写
只读:旁路缓存
读写:同步写回,异步写回
旁路缓存
读 :从 cache 中读取数据,读取到就直接返回,cache 中读取不到的话,就从 db 中读取数据返回,再把数据放到 cache 中。
写:先更新 db然后直接删除 cache 。
在写数据的过程中,可以先删除 cache ,后更新 db 么?
no!,在这两个操作之间,如果有查询的话
在写数据的过程中,先更新 db,后删除 cache 就没有问题了么?
也会,在这两个之间的话,但是!
缺陷:
第一次读的时候数据一定不再缓存中。
写操作比较频繁的话导致 cache 中的数据会被频繁被删除,这样会影响缓存命中率 。
同步写回
读是一样的。
写请求发给缓存的同时,也会发给后端数据库进行处理,等到缓存和数据库都写完数据,才给客户端返回。
这样保证了数据一致性,
但是同步直写会降低缓存的访问性能。
异步写回
读是一样的
此时,所有写请求都先在缓存中处理。等到这些增改的数据要被从缓存中淘汰出来时,缓存将它们写回后端数据库。
这样一来,处理这些数据的操作是在缓存中进行的,很快就能完成。只不过,如果服务器宕机就会有数据丢失的风险了。
如何保证双写一致性
只针对只读缓存中的修改操作
是啥:
如果缓存中有数据,要保证数据库和缓存中值相同
如果缓存中没数据,那数据库中的值必须是最新的
同步写回的话,是已经保证了双写一致性的
异步写回的话,这由于他的设计就是允许暂时不同步,就不考虑双写一致性问题把
所以我们针对只读缓存来讲:
对于新增操作,是直接操作数据库的,缓存中没有值,所以是读写一致的
针对修改操作:这里可以出现双写一致性问题,因为修改数据库和删缓存不是原子性操作,所以……
旁路缓存就是一个非常好的操作
延迟双删:即在删缓存,更新数据库之后休眠一段时间,再删除缓存
生产问题
缓存穿透
造成原因:?
正则表达式进行校验 缓存穿透的一个原因是有大量的恶意请求访问不存在的数据,所以我们用正则表达式进行合法性检测,把恶意的请求(例如请求参数不合理、请求参数是非法值、请求字段不存在)直接过滤掉,不让它们访问后端缓存和数据库。这样一来,也就不会出现缓存穿透问题了。
布隆过滤器来解决
缓存空值(默认值,比如说0)
布隆过滤器的核心思想是使用多个哈希函数来将元素映射到位数组中的多个位置上。当一个元素被加入到布隆过滤器中时,它会被多次哈希,并将对应的位数组位置设置为1
。当需要判断一个元素是否在布隆过滤器中时,我们只需将该元素进行多次哈希,并检查对应的位数组位置是否都为1,如果其中有任意一位为0,则说明该元素不在集合中
;如果所有位都为1,则说明该元素可能
在集合中(因为有可能存在哈希冲突)
redission里已经实现了布隆过滤器
bloomFilter = bloomFilterUtil.create("idWhiteList", size, fpp);
idWhiteList
是布隆过滤器的名称
size
是布隆过滤器的长度
fpp
是误判率,想要误判率尽可能低,那么数组要越长,哈希函数也要越多
缓存雪崩
造成原因:
大量过期
大量过期:我们可以在设置过期时间时,加一个较小的随机数
reids无法处理请求(宕机了,不用讲,这个接触非常少,属于线上问题)
宕机:比如说来通过主从节点,当主节点宕机后,从节点切换为主节点继续提供服务
发生后通用方案
服务熔断:即关闭服务,但是对业务影响很大
(已经发送了雪崩)服务降级:当访问非核心数据时,直接返回预定信息(空值,错误,预定义信息) 访问核心数据:仍然按正常流程去走,先查询缓存,没有就查询数据库
服务限流,当每秒请求有1万次,我们启动限流机制,使其每秒只允许进入1000个,这样可以避免大量请求压力到数据库
缓存击穿
造成原因
这里采取的方法比较直接,就是对于访问特别频繁的热点数据,我们就不设置过期时间了。这样一来,对热点数据的访问请求,都可以在缓存中进行处理,而 Redis 数万级别的高吞吐量可以很好地应对大量的并发请求访问。
逻辑过期
需求:修改根据id查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题
思路分析:当用户开始查询redis时,判断是否命中,如果没有命中则直接返回空数据,不查询数据库,而一旦命中后,将value取出,判断value中的过期时间是否满足,如果没有过期,则直接返回redis中的数据,如果过期,则在开启独立线程后直接返回之前的数据,独立线程去重构数据,重构完成后释放互斥锁。
如果封装数据:因为现在redis中存储的数据的value需要带上过期时间,此时要么你去修改原来的实体类,要么你
新建一个实体类,我们采用第二个方案,这个方案,对原来代码没有侵入性。
@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}
步骤二、
在ShopServiceImpl 新增此方法,利用单元测试进行缓存预热
步骤三:正式代码
ShopServiceImpl
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public Shop queryWithLogicalExpire( Long id ) {
String key = CACHE_SHOP_KEY + id;
// 1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isBlank(json)) {
// 3.存在,直接返回
return null;
}
// 4.命中,需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
// 5.判断是否过期
if(expireTime.isAfter(LocalDateTime.now())) {
// 5.1.未过期,直接返回店铺信息
return shop;
}
// 5.2.已过期,需要缓存重建
// 6.缓存重建
// 6.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
// 6.2.判断是否获取锁成功
if (isLock){
CACHE_REBUILD_EXECUTOR.submit( ()->{
try{
//重建缓存
this.saveShop2Redis(id,20L);
}catch (Exception e){
throw new RuntimeException(e);
}finally {
unlock(lockKey);
}
});
}
// 6.4.返回过期的商铺信息
return shop;
}
互斥锁
缓存击穿中,请求的 key 对应的是 热点数据 ,该数据 存在于数据库中,但不存在于缓存中(通常是因为缓存中的那份数据已经过期) 。这就可能会导致瞬时大量的请求直接打到了数据库上,对数据库造成了巨大的压力,可能直接就被这么多请求弄宕机了。
查询缓存没有命中时,会尝试获取一个锁,
如果获取失败,让他休眠一段时间之后再去尝试重新查询,
如果获取锁成功,那么就用旁路缓存模式先读数据库,再写入缓存,最后释放锁
这样久避免了大量请求直接打在了数据库上
假设现在线程1过来访问,他查询缓存没有命中,但是此时他获得到了锁的资源,那么线程1就会一个人去执行逻辑,假设现在线程2过来,线程2在执行过程中,并没有获得到锁,那么线程2就可以进行到休眠,直到线程1把锁释放后,线程2获得到锁,然后再来执行逻辑,此时就能够从缓存中拿到数据了
public Shop queryWithMutex(Long id) {
String key = CACHE_SHOP_KEY + id;
// 1、从redis中查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get("key");
// 2、判断是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 存在,直接返回
return JSONUtil.toBean(shopJson, Shop.class);
}
//判断命中的值是否是空值
if (shopJson != null) {
//返回一个错误信息
return null;
}
//不存在缓存中
String lockKey = "lock:shop:" + id;
Shop shop = null;
try {
boolean isLock = tryLock(lockKey);
// 4.2 判断否获取成功
if(!isLock){
//4.3 失败,则休眠重试
Thread.sleep(50);
return queryWithMutex(id);
}
//4.4 成功,根据id查询数据库
shop = getById(id);
// 5.不存在,返回错误
if(shop == null){
//将空值写入redis
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
//返回错误信息
return null;
}
//6.写入redis
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_NULL_TTL,TimeUnit.MINUTES);
}catch (Exception e){
throw new RuntimeException(e);
}
finally {
//7.释放互斥锁
unlock(lockKey);
}
return shop;
}
private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
private void unlock(String key) {
stringRedisTemplate.delete(key);
}
内存管理(过期淘汰)
redis默认是不进行数据淘汰的。我们需要进入redis.conf文件中,加入这两项
maxmenory 256mn
maxmenory-policy volatile-lru
进行数据淘汰主要可以分为两大类,一种是在设置了过期时间的数据中进行淘汰(volatile),一种是在所有数据中进行淘汰(allkeys)。这两类又细分为random,lfu,lru,ttl。
用的最多的就是allkeys-lru。还是依据不同业务不同需求来确定
持久化
RDB快照机制
rdb时一个快照文件,记录的是某一时刻的内存数据
RDB 快照可以通过手动命令(如 SAVE 或 BGSAVE)或配置文件中的时间策略(一定是bgsave)来生成。
//默认配置,触发采用的是异步!!!
save 900 1 //900秒内有一个修改就触发
save 300 10
save 60 10000
AOF追加机制
Redis 就会将该命令写入到 AOF 文件中
appendonly yes
appendfilename "appendonly.aof"
appendfsync always # 每条写命令都同步到磁盘,最安全,但性能最低
appendfsync everysec # 每秒同步一次(默认),性能和安全平衡
appendfsync no # 不主动同步,由操作系统决定,性能最高,但数据丢失风险最大
实操
yml文件,才发现不和mysql一样,不需要账号和密码,离谱呀。说是上线之后
spring:
redis:
host: 47.108.159.244
port: 6379
timeout: 3000
3个操作对象,StringRedisTemplate,RedisTemplate,Redission。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
Redission需要单独在来个依赖
RedisTemplate一般需要配置文件,但StringRedisTemplate就不用,因为他key和value规定了只能用字符串
配置好两个就直接用
//string数据结构
stringRedisTemplate.opsForValue().set("key","value");
stringRedisTemplate.opsForValue().set("key","value",Duration.ofMinutes(5));
String val = stringRedisTemplate.opsForValue().get("key");
//list数据结构
stringRedisTemplate.opsForList().leftPush("listKey","item1");
stringRedisTemplate.opsForList().rightPop("listKey");
List<String> listKey = stringRedisTemplate.opsForList().range("listKey", 0, 1);
//set数据结构
stringRedisTemplate.opsForSet().add("setKey","a","b","c");
Set<String> setKey = stringRedisTemplate.opsForSet().members("setKey");
stringRedisTemplate.opsForSet().isMember("setKey","a");
stringRedisTemplate.opsForSet().remove("setKey","a");
//Zset数据结构
stringRedisTemplate.opsForZSet().add("zsetKey", "member1", 10);
Set<String> range = stringRedisTemplate.opsForZSet().range("zsetKey", 0, -1);
Set<String> rangeByScore = stringRedisTemplate.opsForZSet().rangeByScore("zsetKey", 5, 15);
stringRedisTemplate.opsForZSet().remove("zsetKey", "member1");
//Hash数据结构
stringRedisTemplate.opsForHash().put("user:1", "name", "Alice");
String name = (String) stringRedisTemplate.opsForHash().get("user:1", "name");
Map<Object, Object> map = stringRedisTemplate.opsForHash().entries("user:1");
stringRedisTemplate.opsForHash().delete("user:1", "name");
//redis的操作
stringRedisTemplate.delete("key");
boolean hasKey = stringRedisTemplate.hasKey("key");
stringRedisTemplate.expire("key", Duration.ofMinutes(10));
Duration expire = stringRedisTemplate.getExpire("key");
99%的生产环境:优先选择 Redisson
学习/测试环境:可以用 RedisTemplate 理解原理