Redis

Redis的发布和订阅

什么是发布和订阅

Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。

Redis 客户端可以订阅任意数量的频道

发布和订阅

  1. 客户端可以订阅频道如下图

  1. 当给这个频道发布消息后,消息就会发送给订阅的客户端

发布订阅命令行实现

  1. 打开一个客户端订阅 channel1
1
SUBSCRIBE channel1

其中可以在控制台看到如下

1
2
3
4
5
127.0.0.1:6379> SUBSCRIBE channel1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel1"
3) (integer) 1
  1. 打开另外一个客户端, 给channel1发布消息 hello
1
publish channel1 hello

当前客户端会反馈

1
2
127.0.0.1:6379> publish channel1 hello
(integer) 1 # 返回的1是订阅者数量
  1. 订阅客户端可以看到发送的消息
1
2
3
4
5
6
7
8
127.0.0.1:6379> SUBSCRIBE channel1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel1"
3) (integer) 1
1) "message"
2) "channel1"
3) "hello" # 刚刚发布的信息

注:发布的消息没有持久化,如果在订阅的客户端收不到hello,只能收到订阅后发布的消息

Redis的新数据类型

Bitmaps位图

setbit

setbit设置Bitmaps中某个偏移量的值(0或1)

*offset:偏移量从0开始

实例: 每个独立用户是否访问过网站存放在Bitmaps中, 将访问的用户记做1, 没有访问的用户记做0, 用偏移量作为用户的id。

设置键的第offset个位的值(从0算起) , 假设现在有6个用户,userid=1, 3, 5 的用户对网站进行了访问, 那么当前Bitmaps初始化结果如图

Bitmaps 0 1 0 1 0 1
用户 0 1 2 3 4 5

注: 很多应用的用户id以一个指定数字(例如10000) 开头, 直接将用户id和Bitmaps的偏移量对应势必会造成一定的浪费, 通常的做法是每次做setbit操作时将用户id减去这个指定数字。

在第一次初始化Bitmaps时, 假如偏移量非常大, 那么整个初始化过程执行会比较慢, 可能会造成Redis的阻塞。

getbit

获取Bitmaps中某个偏移量的值 : getbit<key><offset>

例如(假如刚刚的案例为k1): getbit k1 1 , 那么则返回的是 1

bitcount

统计字符串被设置为1的bit数。一般情况下,给定的整个字符串都会被进行计数,通过指定额外的 start 或 end 参数,可以让计数只在特定的位上进行。start 和 end 参数的设置,都可以使用负数值:比如 -1 表示最后一个位,而 -2 表示倒数第二个位,start、end 是指bit组的字节的下标数,二者皆包含.

统计字符串从start字节到end字节比特值为1的数量 : bitcount<key>[start end]

举例: K1 【01000001 01000000 00000000 00100001】,对应【0,1,2,3】

bitcount K1 1 2 : 统计下标1、2字节组中bit=1的个数,即01000000 00000000

–>bitcount K1 1 2  –> 1

bitcount K1 1 3 : 统计下标1、3字节组中bit=1的个数,即01000000 00000000 00100001

–> bitcount K1 1 3  –> 3

bitcount K1 0 -2 : 统计下标0到下标倒数第2,字节组中bit=1的个数,即01000001 01000000 00000000

–> bitcount K1 0 -2  –> 3

注意:redis的setbit设置或清除的是bit位置,而bitcount计算的是byte位置。

bitop

bitop and(or/not/xor) <destkey> [key…]

bitop是一个复合操作, 它可以做多个Bitmaps的and(交集) 、 or(并集) 、 not(非) 、 xor(异或) 操作并将结果保存在destkey中。

实例 2020-11-04 日访问网站的userid=1,2,5,9。

setbit users:20201104 1 1

setbit users:20201104 2 1

setbit users:20201104 5 1

setbit users:20201104 9 1

2020-11-03 日访问网站的userid=0,1,4,9。

setbit users:20201103 0 1

setbit users:20201103 1 1

setbit users:20201103 4 1

setbit users:20201103 9 1

计算出两天都访问过网站的用户数量

bitop and users:and:20201104_03 users:20201103 users:20201104

users:and:20201104_03 -> 20201104和20201103的

Bitmaps与set对比

假设网站有1亿用户, 每天独立访问的用户有5千万, 如果每天用集合类型和Bitmaps分别存储活跃用户可以得到表

set和Bitmaps存储一天活跃用户对比
数据 类型 每个用户id占用空间 需要存储的用户量 全部内存量
集合 类型 64位 50000000 64位*50000000 = 400MB
Bitmaps 1位 100000000 1位*100000000 = 12.5MB

很明显, 这种情况下使用Bitmaps能节省很多的内存空间, 尤其是随着时间推移节省的内存还是非常可观的

set和Bitmaps存储独立用户空间对比
数据类型 一天 一个月 一年
集合类型 400MB 12GB 144GB
Bitmaps 12.5MB 375MB 4.5GB

但Bitmaps并不是万金油, 假如该网站每天的独立访问用户很少, 例如只有10万(大量的僵尸用户) , 那么两者的对比如下表所示, 很显然, 这时候使用Bitmaps就不太合适了, 因为基本上大部分位都是0。

set和Bitmaps存储一天活跃用户对比(独立用户比较少)
数据类型 每个userid占用空间 需要存储的用户量 全部内存量
集合类型 64位 100000 64位*100000 = 800KB
Bitmaps 1位 100000000 1位*100000000 = 12.5MB

HyperLogLog统计

在工作当中,我们经常会遇到与统计相关的功能需求,比如统计网站PV(PageView页面访问量),可以使用Redis的incr、incrby轻松实现。

但像UV(UniqueVisitor,独立访客)、独立IP数、搜索记录数等需要去重和计数的问题如何解决?这种求集合中不重复元素个数的问题称为基数问题。

Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定的、并且是很小的。

在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。

但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。

什么是基数?

比如数据集 {1, 3, 5, 7, 5, 7, 8}, 那么这个数据集的基数集为 {1, 3, 5 ,7, 8}, 基数(不重复元素)为5。 基数估计就是在误差可接受的范围内,快速计算基数

pfadd

添加指定元素到 HyperLogLog 中: pfadd <key>< element> [element ...]

例如:

1
2
pfadd program "redis"   # 往program里面添加redis
pfadd program "kafka"

将所有元素添加到指定HyperLogLog数据结构中。如果执行命令后HLL估计的近似基数发生变化,则返回1,否则返回0

在此时例如再往program添加redis 返回的结果是0,因为基数没有发生变化

pfcount

计算HLL的近似基数: pfcount<key> [key ...] ,可以计算多个HLL,比如用HLL存储每天的UV,计算一周的UV可以使用7天的UV合并计算即可, 示例

pfmerge

将一个或多个HLL合并后的结果存储在另一个HLL中: pfmerge<destkey><sourcekey> [sourcekey ...] ,比如每月活跃用户可以使用每天的活跃用户来合并计算可得

Geospatial地理信息

Redis 3.2 中增加了对GEO类型的支持。GEO,Geographic,地理信息的缩写。该类型,就是元素的2维坐标,在地图上就是经纬度。redis基于该类型,提供了经纬度设置,查询,范围查询,距离查询,经纬度Hash等常见操作

geoadd添加地理位置

添加地理位置(经度,纬度,名称): geoadd<key>< longitude><latitude><member> [longitude latitude member...]

实例

1
2
geoadd china:city 121.47 31.23 shanghai
geoadd china:city 106.50 29.53 chongqing 114.05 22.52 shenzhen 116.38 39.90 beijing

两极无法直接添加,一般会下载城市数据,直接通过 Java 程序一次性导入。

有效的经度从 -180 度到 180 度。有效的纬度从 -85.05112878 度到 85.05112878 度。

当坐标位置超出指定范围时,该命令将会返回一个错误。

已经添加的数据,是无法再次往里面添加的。

geopos地区坐标值

获得指定地区的坐标值: geopos <key><member> [member...]

实例

1
2
3
127.0.0.1:6379> geopos china:city shanghai
1) 2) "121.47000163793563843"
2) "31.22999903975783553"

geodist直线距离

获取两个位置之间的直线距离geodist<key><member1><member2> [m|km|ft|mi ]

实例

获取两个位置之间的直线距离

1
2
127.0.0.1:6379> geopos china:city beijin shanghai km
"1087.4816"

单位:

m 表示单位为米[默认值]。

km 表示单位为千米。

mi 表示单位为英里。

ft 表示单位为英尺。

如果用户没有显式地指定单位参数, 那么 GEODIST 默认使用米作为单位

georadius找某一半径内的元素

以给定的经纬度为中心,找出某一半径内的元素 georadius<key>< longitude><latitude>radius m|km|ft|mi经度 纬度 距离 单位

实例

1
2
3
127.0.0.1:6379> geopos china:city 110 30 1000 km
1) "chongqing"
2) "shengzhen"

Redis在SpringBoot的配置

导入依赖

SpringBoot2.x

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- spring2.X集成redis所需common-pool2-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>

RedisConfig设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {

@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setConnectionFactory(factory);
//key序列化方式
template.setKeySerializer(redisSerializer);
//value序列化
template.setValueSerializer(jackson2JsonRedisSerializer);
//value hashmap序列化
template.setHashValueSerializer(jackson2JsonRedisSerializer);
return template;
}

@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//解决查询缓存转换异常的问题
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解决乱码的问题),过期时间600秒
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(600))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
return cacheManager;
}
}

Redis事务、锁机制、秒杀★

Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

Redis事务的主要作用就是串联多个命令防止别的命令插队

Multi、Exec、discard

Multi:类似于开启事务 Transational(value=true)

从输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行

组队的过程中可以通过discard来放弃组队

案例: 当前我有两个命令 设置k1为1,k2为2 , 控制台如下:

1
2
3
4
5
6
7
8
9
127.0.0.1:6379> multi    # 进入事务模式
OK
127.0.0.1:6379> set k1 1 # 设置k1为1,并且进入队列
QUEUED
127.0.0.1:6379> set k2 2 # 设置k2为2,并且进入队列
QUEUED
127.0.0.1:6379> exec # 提交队列,依次执行
1) OK
2) OK

组队阶段报错,提交失败

1
2
3
4
5
6
7
8
9
10
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k3 3
QUEUED
127.0.0.1:6379> set k4
(error) ERR wrong number of arguments for 'set' command
127.0.0.1:6379> set k5 5
QUEUED
127.0.0.1:6379> exec
(error) EXECABORT Transaction discarded because of previous errors.

组队成功,提交有成功有失败情况

1
2
3
4
5
6
7
8
9
10
11
12
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k1 k
QUEUED
127.0.0.1:6379> incr k1
QUEUED
127.0.0.1:6379> set k2 k
QUEUED
127.0.0.1:6379> exec
1) OK
2) (error) ERR value is not an integer or out of range
3) OK

事务的错误处理

组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消(组队时是类似于原子性的)

如果执行阶段某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚

事务冲突的问题

例子:

在一个账户中,一个请求想给金额减8000

一个请求想给金额减5000

一个请求想给金额减1000

开始金额 逻辑判断 操作结果 最后结果
if 10000>8000 then -8000 -8000
10000 if 10000>8000 then -5000 -5000 -4000
if 10000>8000 then -1000 -1000

悲观锁

悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁表锁等,读锁写锁等,都是在做操作之前先上锁

乐观锁

乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis就是利用这种check-and-set机制实现事务的。

WATCH key [key . . .]

监视key: 在执行multi之前,先执行watch key1 [key2],可以监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。乐观锁版本号控制

unwatch

取消 WATCH 命令对所有 key 的监视。

如果在执行 WATCH 命令之后,EXEC 命令或DISCARD 命令先被执行了的话,那么就不需要再执行UNWATCH 了。

EXEC — Redis 命令参考 (redisfans.com)

Redis事务三特性

单独的隔离操作

事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

没有隔离级别的概念

队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行

不保证原子性

事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚

Redis事务秒杀案例

解决计数器和人员记录的事务操作

商品库存: ( 减个数 )

key string
sk:prodid:qt 剩余个数

秒杀成功者清单: ( 加个数, set集合去重 )

key set
成功者的user_id
sk:prod-id:user 成功者的user_id
成功者的user_id

使用简单的代码模拟秒杀流程代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
//秒杀过程 模拟
public static boolean doSecKill(String uid,String prodid) throws IOException {
//1 uid和prodid非空判断
if(uid == null || prodid == null) {
return false;
}

//2 连接redis
// Jedis jedis = new Jedis("192.168.44.168",6379);
//====(连接池)====(解决超时问题)
//通过连接池得到jedis对象
JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance();
Jedis jedis = jedisPoolInstance.getResource();

//3 拼接key
// 3.1 库存key
String kcKey = "sk:"+prodid+":qt";
// 3.2 秒杀成功用户key
String userKey = "sk:"+prodid+":user";
//====(乐观锁)====
//监视库存
jedis.watch(kcKey);

//4 获取库存,如果库存null,秒杀还没有开始
String kc = jedis.get(kcKey);
if(kc == null) {
System.out.println("秒杀还没有开始,请等待");
jedis.close();
return false;
}

// 5 判断用户是否重复秒杀操作
if(jedis.sismember(userKey, uid)) {
System.out.println("已经秒杀成功了,不能重复秒杀");
jedis.close();
return false;
}

//6 判断如果商品数量,库存数量小于1,秒杀结束
if(Integer.parseInt(kc)<=0) {
System.out.println("秒杀已经结束了");
jedis.close();
return false;
}

//7 秒杀过程
//使用事务
Transaction multi = jedis.multi();

//组队操作
multi.decr(kcKey); // 减少库存
multi.sadd(userKey,uid);// 加入用户

//执行
List<Object> results = multi.exec();
if(results == null || results.size()==0) {
System.out.println("秒杀失败了....");
jedis.close();
return false;
}
// 没有添加事务,会超卖
//7.1 库存-1
//jedis.decr(kcKey);
//7.2 把秒杀成功用户添加清单里面
//jedis.sadd(userKey,uid);

System.out.println("秒杀成功了..");
jedis.close();
return true;
}

总结: 当前案例代码模拟的秒杀, 先获取秒杀用户key和秒杀商品的key, 使用Redis的Watch监视库存, 判断库存情况, 如果存在库存, 开始秒杀, 在秒杀过程中判断用户是否重复秒杀, 以及判断库存是否不足, 开启事物, 进行组队(队列)操作, 然后执行exec(), 根据结果(List results)判断结果如何, 流程结束 (上述代码为完整的流程,加了连接池,加了乐观锁)

Redis事务秒杀并发模拟

使用工具ab模拟测试 (CentOS6 默认安装 CentOS7需要手动安装)

联网:yum install httpd-tools

无网络:(1) 进入cd /run/media/root/CentOS 7 x86_64/Packages(路径跟centos6不同)

顺序安装 :

apr-1.4.8-3.el7.x86_64.rpm

apr-util-1.5.2-6.el7.x86_64.rpm

httpd-tools-2.4.6-67.el7.centos.x86_64.rpm

通过ab测试

vim postfile 模拟表单提交参数,以&符号结尾;存放当前目录。

内容:prodid=0101&

ab -n 2000 -c 200 -k -p ~/postfile -T application/x-www-form-urlencoded http://127.0.0.1:8081/Seckill/doseckill

2000 个 请求中有200个是并发执行

-n 请求数, -c 请求中的并发数量, -p 提交参数, -t 参数类型

127.0.0.1:8081: 实际上要加上当前主机的ip地址 ipconfig

还可以通过Apipost等工具进行压测并发请求

超卖

Redis中商品数

1
2
3
4
127.0.0.1:6379> set sk:0101:qt 10
OK
127.0.0.1:6379> get sk:0101:qt
"-2" # 在并发的时候库存变成了-2

在后台

1
2
3
4
5
6
7
8
9
秒杀成功!
秒杀成功!
秒杀成功!
秒杀成功!
已秒光!
秒杀成功!
已秒光!
已秒光!
已秒光!
超卖问题
总库存 用户请求 操作 库存
A :1.检查是否还有库存 2. 有则-1 -1
10 (总) B: 1.检查是否还有库存 2. 有则-1 -1 -N (总)
C: 1.检查是否还有库存 2. 有则-1 -1
…. …
利用乐观锁淘汰用户,解决超卖问题

对库存的key不断监视

1
jedis.watch(kcKey); //★

对秒杀的过程使用事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//★使用事务
Transaction multi = jedis.multi();

//★组队操作
multi.decr(kcKey); // 减少库存
multi.sadd(userKey,uid);// 加入用户,

// 执行
List<Object> results = multi.exec();
if(results == null || results.size() == 0) { // 结果为空或者有其他的情况->失败 否则成功
log.info("秒杀失败了....");
jedis.close();
return false;
}

总结: 先Watch监视key, 然后加入事务组队, 最后执行, 判断返回的list集合

走到这里了, 解决了超卖问题(加乐观锁), 解决了连接超时问题(使用JedisPool连接池), 但是发现还有库存遗留问题. 乐观锁造成库存遗留问题

解决库存遗留问题
Lua脚本

Lua 是一个小巧的 脚本语言,Lua脚本可以很容易的被C/C++ 代码调用,也可以反过来调用C/C++的函数,Lua并没有提供强大的库,一个完整的Lua解释器不过200k,所以Lua不适合作为开发独立应用程序的语言,而是作为嵌入式脚本语言。

很多应用程序、游戏使用LUA作为自己的嵌入式脚本语言,以此来实现可配置性、可扩展性。

LUA脚本在Redis中的优势

将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数。提升性能

LUA脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。

但是注意redis的lua脚本功能,只有在Redis 2.6以上的版本才可以使用。

利用lua脚本淘汰用户,解决超卖问题。

redis 2.6版本以后,通过lua脚本解决争抢问题,实际上是redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题

Lua 教程_w3cschool

解决库存依赖问题,lua脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
local userid=KEYS[1]; -- 获取用户id
local prodid=KEYS[2]; -- 获取商品id
-- key拼接
local qtkey="sk:"..prodid..":qt";
local usersKey="sk:"..prodid.":usr';
-- 判断用户key是否在当前清单中存在
local userExists=redis.call("sismember",usersKey,userid);
-- 用户已经秒杀过了,约定返回2
if tonumber(userExists)==1 then
return 2;
end
local num= redis.call("get" ,qtkey);
-- 秒杀结束
if tonumber(num)<=0 then
return 0;
-- 否则添加清单,减少库存
else
redis.call("decr",qtkey);
redis.call("sadd",usersKey,userid);
end
-- 秒杀成功
return 1;

对于这个脚本如何使用?我也不知道,不是, 例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class RedisByScript {
// ===Lua脚本===
static String secKillScript ="local userid=KEYS[1];\r\n" +
"local prodid=KEYS[2];\r\n" +
"local qtkey='sk:'..prodid..\":qt\";\r\n" +
"local usersKey='sk:'..prodid..\":usr\";\r\n" +
"local userExists=redis.call(\"sismember\",usersKey,userid);\r\n" +
"if tonumber(userExists)==1 then \r\n" +
" return 2;\r\n" +
"end\r\n" +
"local num= redis.call(\"get\" ,qtkey);\r\n" +
"if tonumber(num)<=0 then \r\n" +
" return 0;\r\n" +
"else \r\n" +
" redis.call(\"decr\",qtkey);\r\n" +
" redis.call(\"sadd\",usersKey,userid);\r\n" +
"end\r\n" +
"return 1" ;

static String secKillScript2 =
"local userExists=redis.call(\"sismember\",\"{sk}:0101:usr\",userid);\r\n" +
" return 1";
// 秒杀流程封装, 结合Lua
public static boolean doSecKill(String uid,String prodid) throws IOException {

JedisPool jedispool = JedisPoolUtil.getJedisPoolInstance();
Jedis jedis=jedispool.getResource();

//String sha1= .secKillScript;
// 调用Jedis加载脚本方法,加载刚刚描写的lua脚本
String sha1= jedis.scriptLoad(secKillScript);
Object result= jedis.evalsha(sha1, 2, uid,prodid);

String reString=String.valueOf(result);
if ("0".equals( reString ) ) {
System.err.println("已抢空!!");
}else if("1".equals( reString ) ) {
System.out.println("抢购成功!!!!");
}else if("2".equals( reString ) ) {
System.err.println("该用户已抢过!!");
}else{
System.err.println("抢购异常!!");
}
jedis.close();
return true;
}
}

Redis持久化之RDB

Redis官网

Redis 提供了2个不同形式的持久化方式。

(1) RDB(Redis DataBase)

(2) AOF(Append Of File)

RDB(Redis DataBase)

是什么?

在指定的时间间隔内将内存中的数据集快照写入磁盘, 也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里, 快照记录某一刻的数据

备份是如何执行的?

Redis会单独创建(fork)一个子进程来进行持久化,会将数据写入到 一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件(dump.rdb)。 整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能 如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失 .

Fork

  • Fork的作用是复制一个与当前进程一样的进程。新进程的所有数据(变量、环境变量、程序计数器等) 数值都和原进程一致,但是是一个全新的进程,并作为原进程的子进程
  • 在Linux程序中,fork()会产生一个和父进程完全相同的子进程,但子进程在此后多会exec系统调用,出于效率考虑,Linux中引入了写时复制技术
  • 一般情况父进程和子进程会共用同一段物理内存,只有进程空间的各段的内容要发生变化时,才会将父进程的内容复制一份给子进程。

RDB持久流程

dump.rdb文件

在redis.conf中配置文件名称, 默认为dump.rdb

1
2
3
4
5
6
7
8
# RDB files created with checksum disabled have a checksum of zero that will
# tell the loading code to skip the check.
rdbchecksum yes

# The filename where to dump the DB
dbfilename dump.rdb # 此处为修改持久保存的文件名

# The working directory.

配置位置

rdb文件的保存位置, 也可以修改. 默认为Redis启动时命令行所在的目录下

dir "/myredis"

1
2
3
4
5
6
7
8
9
# The working directory.
#
# The DB will be written inside this directory, with the filename specified
# above using the 'dbfilename' configuration directive.
#
# The Append Only File will also be created inside this directory.
#
# Note that you must specify a directory here, not a file name.
dir ./ # 此处为修改文件保存位置路径

如何触发RDB快照;保持策略

配置文件中默认的快照配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
################################ SNAPSHOTTING  ################################
#
# Save the DB on disk:
#
# save <seconds> <changes>
#
# Will save the DB if both the given number of seconds and the given
# number of write operations against the DB occurred.
#
# In the example below the behaviour will be to save:
# after 900 sec (15 min) if at least 1 key changed
# after 300 sec (5 min) if at least 10 keys changed
# after 60 sec if at least 10000 keys changed
#
# Note: you can disable saving completely by commenting out all "save" lines.
#
# It is also possible to remove all the previously configured save
# points by adding a save directive` with a single empty string argument
# like in the following example:
#
# save ""

#如果至少有1个key变化在900秒后保存
save 900 1
#如果至少有10个key变化在300秒后保存
save 300 10
#如果至少有10000个key变化在60秒后保存
save 60 10000
save VS bgsave

save :save时只管保存,其它不管,全部阻塞。手动保存。不建议。

bgsaveRedis会在后台异步进行快照操作,快照同时还可以响应客户端请求。

可以通过lastsave 命令获取最后一次成功执行快照的时间

flushall命令

AOF(Append of File)