Redis

下面两个章节(重要)

Redis的线程模型🪜

对于读写命令来说,Redis 一直是单线程模型。不过,在 Redis 4.0 版本之后引入了多线程来执行一些大键值对的异步删除操作, Redis 6.0 版本之后引入了多线程来处理网络请求(提高网络 IO 读写性能)。

Redis线程模型

Redis 基于 Reactor 模式设计开发了一套高效的事件处理模型 (Netty 的线程模型也基于 Reactor 模式,Reactor 模式不愧是高性能 IO 的基石),这套事件处理模型对应的是 Redis 中的文件事件处理器(file event handler)。由于文件事件处理器(file event handler)是单线程方式运行的,所以我们一般都说 Redis 是单线程模型。

《Redis 设计与实现》有一段话是如是介绍文件事件处理器的,我觉得写得挺不错。

Redis 基于 Reactor 模式开发了自己的网络事件处理器:这个处理器被称为文件事件处理器(file event handler)。

  • 文件事件处理器使用 I/O 多路复用(multiplexing)程序来同时监听多个套接字,并根据套接字目前执行的任务来为套接字关联不同的事件处理器。
  • 当被监听的套接字准备好执行连接应答(accept)、读取(read)、写入(write)、关 闭(close)等操作时,与操作相对应的文件事件就会产生,这时文件事件处理器就会调用套接字之前关联好的事件处理器来处理这些事件。

虽然文件事件处理器以单线程方式运行,但通过使用 I/O 多路复用程序来监听多个套接字,文件事件处理器既实现了高性能的网络通信模型,又可以很好地与 Redis 服务器中其他同样以单线程方式运行的模块进行对接,这保持了 Redis 内部单线程设计的简单性。

既然是单线程,那怎么监听大量的客户端连接呢?

Redis 通过 IO 多路复用程序 来监听来自客户端的大量连接(或者说是监听多个 socket),它会将感兴趣的事件及类型(读、写)注册到内核中并监听每个事件是否发生。

这样的好处非常明显:I/O 多路复用技术的使用让 Redis 不需要额外创建多余的线程来监听客户端的大量连接,降低了资源的消耗(和 NIO 中的 Selector 组件很像)。

文件事件处理器(file event handler)主要是包含 4 个部分:

  • 多个 socket(客户端连接)
  • IO 多路复用程序(支持多个客户端连接的关键)
  • 文件事件分派器(将 socket 关联到相应的事件处理器)
  • 事件处理器(连接应答处理器、命令请求处理器、命令回复处理器)

以上部分摘抄自 JavaGuide

NIO模型

了解多路复用前先了解一下nio吧。

当然他还有aio、bio、nio。在此处笔者就提出对nio的解析:

首先我们得知道他的三大件(SelectorChannelBuffer

其中他的结构如下

selector

对于一个服务来说,他的当前线程(Thread)通过访问Selector,然后Selector对于注册到自己这的Channel事件进行轮询。

那么也就是说:selector不断对注册的channel事件进行轮询,其中事件是一个非常重要的概念,然后selector会根据不同的channel进行不同的处理操作。

这样做的好处就是:在之前的bio模型,他操作之前有一个read()函数,需要等待输入输出,如果没有的话,他就会阻塞在这里。那么我们使用nio的话,我们只需要轮询注册的事件,如果事件有需要待处理的操作,那么则通知一下selector即可,没有则继续轮询。

channel

用于存储与通信的通道,对于处理(接收或者发送)的数据放入内部buffer缓冲区。

buffer(缓存区通道)

缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个数组,该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的 状态变化情况。Channel 提供从网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer.

IO多路复用

流程见图

img

他有三种模型 select、poll、epoll。

对于这个模型(epoll):

  • 红黑树:存储注册的每一个fd(socket)
  • 链表:存储每一个就绪的fd

如果是select是没有这个就绪链表的,因为select就是轮询注册的红黑树,然后查看是否有数据。

对于链表来说,它是怎么构建的呢,他会在内核态进行事件的标记,然后对于处理的数据通过DMA操作拷贝回去。

select

看到这里,select是不是和nio的selector很像?其实selector里面也是有一个函数为select( )的,那么在当前的io多路复用中:

select选择的是socket,图如下

1
2
3
	↗  Socket
select -> Socket
↘ Socket

这个其实就是我们select的简述图,对于select其实就是selector。

对于敏感一点的读者来说,看到Socket可以知道,他肯定是位于内核态的操作,对于我们的代码运行操作,他肯定是位于用户态。

如果是内核态,我们通过代码肯定是不可以直接通过代码去获取内核态Socket的数据的。

然后对于select操作,我们首先得知道你当前管理的socket文件标识(fd),在linux下,万事万物都是文件,其实对于任务的操作,你ps一下可以看到那些pid也是处于文件下的,笔者之前在处理docker容器中的fastdfs发现其实运行起来就是会生成一个标识文件用来跟踪当前进程的。

所以,我们调用 select 会把所有要管理的 socket 的 fd (文件描述符,Linux下皆为文件,简单理解就是通过 fd 能找到这个 socket)传到内核中。

此时,要遍历所有 socket,看看是否有感兴趣的事件发生。如果没有一个 socket 有事件发生,那么 select 的线程就需要让出 cpu 阻塞等待,这个等待可以是不设置超时时间的死等,也可以是设置 timeout 的有超时时间的等待。

假设此时客户端发送了数据,网卡接收到的数据塞到对应的 socket 的接收队列中,此时 socket 知道来数据了,那如何唤醒 select 呢?

其实每个 socket 有个属于自己的睡眠队列,select 会安排一个内应,即在被管理的 socket 的睡眠队列里面塞入一个 entry。

img

当 socket 接收到网卡的数据后,就会去它的睡眠队列里遍历 entry,调用 entry 设置的 callback 方法,这个 callback 方法里就能唤醒 select 。

但是,select 的实现不太好,因为唤醒的 select 此时只知道他需要处理数据了,但是并不知道具体是哪个 socket 来数据了,所以只能遍历所有 socket ,看看到底是哪个 scoket 来活了,然后把所有需要处理的 socket 封装成事件返回。

这样用户程序就能获得发生的事件,然后进行 I/O 和业务处理了。

小结:对于select来说,他管理了一些socket,然后socket有数据需要处理的时候,select就会去遍历所有注册的socket(因而造成全部轮询一遍,从而耗费大量资源进行定位)定位事件,然后进行io操作与业务处理。

poll

poll和select很像。

epoll

在了解了select的模式之后,我们其实可以发现有可以优化的点,我们可以把select所管理监控的fd列表存入内核态。

每次select可不可以不轮询所有呢?

其实epoll就是做了以上的优化:首先,搞了个叫 epoll_ctl 的方法,这方法就是用来管理维护 epoll 所监控的哪些 socket。

如果你的 epoll 要新加一个 socket 来管理,那就调用 epoll_ctl,要删除一个 socket 也调用 epoll_ctl,通过不同的入参来控制增删改。

这样,在内核里面就维护了此 epoll 管理的 socket 集合,这样就不用每次调用的时候都得把所有管理的 fds 拷贝到内核了。

对了,这个 socket 集合是用红黑树实现的。

然后和 select 类似,每个 socket 的睡眠队列里都会加个 entry,当每个 socket 来数据之后,同样也会调用 entry 对应的 callback。

与 select 不同的是,引入了一个 ready_list 双向链表,callback 里面会把当前的 socket 加入到 ready_list 然后唤醒 epoll。

这样被唤醒的 epoll 只需要遍历 ready_list 即可,这个链表里一定是有数据可读的 socket,相比于 select 就不会做无用的遍历了。

同时收集到的可读的 fd 按理是要拷贝到用户空间的,这里又做了个优化,利用了 mmp,让用户空间和内核空间映射到同一块内存中,这样就避免了拷贝。

完美啊~

这就是 epoll 基于 select 所作的优化,还有一些差别没细说,比如 epoll 是阻塞睡眠在一个 single_epoll_wait_list 而不是 socket 的睡眠队列等等,我就不提了,理解上面的这些已经够了。

ET模式和LT模式

ET(边缘触发模式),LT(水平触发模式)。

读的时候

如果设置为LT,只要读缓冲区不为空,就会一直触发EPOLLIN事件。

如果设置为ET,仅会触发一次EPOLLIN事件,因而要一次读完。

写的时候

如果设置为LT,只要写缓冲区不满,就会一直触发EPOLLOUT事件。

如果设置为ET,注册了EPOLLOUT事件,才会触发一次EPOLLOUT。

ET模式:

ET比LT效率高,对于使用ET模式的文件描述符,在边缘触发模式下,只有在文件描述符状态变化(对于EPOLLIN事件,只有在状态从未就绪变化为就绪,才叫做变化)发生时才会触发事件,所以调用epoll_wait检测到其上有事件发生,并通知应用程序,应用程序必须立即处理完毕该事件,否则会造成数据丢失,因为后续的epoll_wait调用不再重复向应用程序通知此事件

应用场景:读取大型文件,用while循环一次全部读取完毕

LT模式:

LT是默认的工作模式,对于采用LT的文件描述符,在水平触发模式下,如果某个文件描述符上有数据可读,内核会持续通知应用程序,直到应用程序处理完数据或者缓冲区不再有数据可读为止。当调用epoll_wait检测到其上有事件发生并通知应用程序时,应用程序可以不立即处理完毕该事件。这样,当程序下一次调用epoll_wait时,epoll_wait还会向应用程序通知此事件,直到事件被处理完毕

应用场景:解析http报文,分次解析http报文

Redis持久化之RDB🪜

Redis官网

Redis 提供了2个不同形式的持久化方式。

(1) RDB(Redis DataBase)

(2) AOF(Append Of File)

RDB(Redis DataBase)

是什么?

在指定的时间间隔内将内存中的数据集快照写入磁盘, 也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里, 快照记录某一刻的数据

备份是如何执行的?

Redis会单独创建(fork)一个子进程来进行持久化,会将数据写入到 一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件(dump.rdb)。 整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能 如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失 .

Fork

  • Fork的作用是复制一个与当前进程一样的进程。新进程的所有数据(变量、环境变量、程序计数器等) 数值都和原进程一致,但是是一个全新的进程,并作为原进程的子进程
  • 在Linux程序中,fork()会产生一个和父进程完全相同的子进程,但子进程在此后多会exec系统调用,出于效率考虑,Linux中引入了写时复制技术
  • 一般情况父进程和子进程会共用同一段物理内存,只有进程空间的各段的内容要发生变化时,才会将父进程的内容复制一份给子进程。

RDB持久流程

dump.rdb文件

在redis.conf中配置文件名称, 默认为dump.rdb

1
2
3
4
5
6
7
8
# RDB files created with checksum disabled have a checksum of zero that will
# tell the loading code to skip the check.
rdbchecksum yes

# The filename where to dump the DB
dbfilename dump.rdb # 此处为修改持久保存的文件名

# The working directory.

配置位置

rdb文件的保存位置, 也可以修改. 默认为Redis启动时命令行所在的目录下

dir "/myredis"

1
2
3
4
5
6
7
8
9
# The working directory.
#
# The DB will be written inside this directory, with the filename specified
# above using the 'dbfilename' configuration directive.
#
# The Append Only File will also be created inside this directory.
#
# Note that you must specify a directory here, not a file name.
dir ./ # 此处为修改文件保存位置路径

如何触发RDB快照;保持策略

配置文件中默认的快照配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
################################ SNAPSHOTTING  ################################
#
# Save the DB on disk:
#
# save <seconds> <changes>
#
# Will save the DB if both the given number of seconds and the given
# number of write operations against the DB occurred.
#
# In the example below the behaviour will be to save:
# after 900 sec (15 min) if at least 1 key changed
# after 300 sec (5 min) if at least 10 keys changed
# after 60 sec if at least 10000 keys changed
#
# Note: you can disable saving completely by commenting out all "save" lines.
#
# It is also possible to remove all the previously configured save
# points by adding a save directive` with a single empty string argument
# like in the following example:
#
# save ""

#如果至少有1个key变化在900秒后保存
save 900 1
#如果至少有10个key变化在300秒后保存
save 300 10
#如果至少有10000个key变化在60秒后保存
save 60 10000
save VS bgsave

save :save时只管保存,其它不管,全部阻塞。手动保存。不建议。

bgsaveRedis会在后台异步进行快照操作,快照同时还可以响应客户端请求。

可以通过lastsave 命令获取最后一次成功执行快照的时间

flushall命令

AOF(Append of File)

Redis的发布和订阅

什么是发布和订阅

Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。

Redis 客户端可以订阅任意数量的频道

发布和订阅

  1. 客户端可以订阅频道

  2. 当给这个频道发布消息后,消息就会发送给订阅的客户端

发布订阅命令行实现

  1. 打开一个客户端订阅 channel1
1
SUBSCRIBE channel1

其中可以在控制台看到如下

1
2
3
4
5
127.0.0.1:6379> SUBSCRIBE channel1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel1"
3) (integer) 1
  1. 打开另外一个客户端, 给channel1发布消息 hello
1
publish channel1 hello

当前客户端会反馈

1
2
127.0.0.1:6379> publish channel1 hello
(integer) 1 # 返回的1是订阅者数量
  1. 订阅客户端可以看到发送的消息
1
2
3
4
5
6
7
8
127.0.0.1:6379> SUBSCRIBE channel1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel1"
3) (integer) 1
1) "message"
2) "channel1"
3) "hello" # 刚刚发布的信息

注:发布的消息没有持久化,如果在订阅的客户端收不到hello,只能收到订阅后发布的消息

Redis的新数据类型

Bitmaps位图

setbit

setbit设置Bitmaps中某个偏移量的值(0或1)

*offset:偏移量从0开始

实例: 每个独立用户是否访问过网站存放在Bitmaps中, 将访问的用户记做1, 没有访问的用户记做0, 用偏移量作为用户的id。

设置键的第offset个位的值(从0算起) , 假设现在有6个用户,userid=1, 3, 5 的用户对网站进行了访问, 那么当前Bitmaps初始化结果如图

Bitmaps 0 1 0 1 0 1
用户 0 1 2 3 4 5

注: 很多应用的用户id以一个指定数字(例如10000) 开头, 直接将用户id和Bitmaps的偏移量对应势必会造成一定的浪费, 通常的做法是每次做setbit操作时将用户id减去这个指定数字。

在第一次初始化Bitmaps时, 假如偏移量非常大, 那么整个初始化过程执行会比较慢, 可能会造成Redis的阻塞。

getbit

获取Bitmaps中某个偏移量的值 : getbit<key><offset>

例如(假如刚刚的案例为k1): getbit k1 1 , 那么则返回的是 1

bitcount

统计字符串被设置为1的bit数。一般情况下,给定的整个字符串都会被进行计数,通过指定额外的 start 或 end 参数,可以让计数只在特定的位上进行。start 和 end 参数的设置,都可以使用负数值:比如 -1 表示最后一个位,而 -2 表示倒数第二个位,start、end 是指bit组的字节的下标数,二者皆包含.

统计字符串从start字节到end字节比特值为1的数量 : bitcount<key>[start end]

举例: K1 【01000001 01000000 00000000 00100001】,对应【0,1,2,3】

bitcount K1 1 2 : 统计下标1、2字节组中bit=1的个数,即01000000 00000000

–>bitcount K1 1 2  –> 1

bitcount K1 1 3 : 统计下标1、3字节组中bit=1的个数,即01000000 00000000 00100001

–> bitcount K1 1 3  –> 3

bitcount K1 0 -2 : 统计下标0到下标倒数第2,字节组中bit=1的个数,即01000001 01000000 00000000

–> bitcount K1 0 -2  –> 3

注意:redis的setbit设置或清除的是bit位置,而bitcount计算的是byte位置。

bitop

bitop and(or/not/xor) <destkey> [key…]

bitop是一个复合操作, 它可以做多个Bitmaps的and(交集) 、 or(并集) 、 not(非) 、 xor(异或) 操作并将结果保存在destkey中。

实例 2020-11-04 日访问网站的userid=1,2,5,9。

setbit users:20201104 1 1

setbit users:20201104 2 1

setbit users:20201104 5 1

setbit users:20201104 9 1

2020-11-03 日访问网站的userid=0,1,4,9。

setbit users:20201103 0 1

setbit users:20201103 1 1

setbit users:20201103 4 1

setbit users:20201103 9 1

计算出两天都访问过网站的用户数量

bitop and users:and:20201104_03 users:20201103 users:20201104

users:and:20201104_03 -> 20201104和20201103的

Bitmaps与set对比

假设网站有1亿用户, 每天独立访问的用户有5千万, 如果每天用集合类型和Bitmaps分别存储活跃用户可以得到表

set和Bitmaps存储一天活跃用户对比
数据 类型 每个用户id占用空间 需要存储的用户量 全部内存量
集合 类型 64位 50000000 64位*50000000 = 400MB
Bitmaps 1位 100000000 1位*100000000 = 12.5MB

很明显, 这种情况下使用Bitmaps能节省很多的内存空间, 尤其是随着时间推移节省的内存还是非常可观的

set和Bitmaps存储独立用户空间对比
数据类型 一天 一个月 一年
集合类型 400MB 12GB 144GB
Bitmaps 12.5MB 375MB 4.5GB

但Bitmaps并不是万金油, 假如该网站每天的独立访问用户很少, 例如只有10万(大量的僵尸用户) , 那么两者的对比如下表所示, 很显然, 这时候使用Bitmaps就不太合适了, 因为基本上大部分位都是0。

set和Bitmaps存储一天活跃用户对比(独立用户比较少)
数据类型 每个userid占用空间 需要存储的用户量 全部内存量
集合类型 64位 100000 64位*100000 = 800KB
Bitmaps 1位 100000000 1位*100000000 = 12.5MB

HyperLogLog统计

在工作当中,我们经常会遇到与统计相关的功能需求,比如统计网站PV(PageView页面访问量),可以使用Redis的incr、incrby轻松实现。

但像UV(UniqueVisitor,独立访客)、独立IP数、搜索记录数等需要去重和计数的问题如何解决?这种求集合中不重复元素个数的问题称为基数问题。

Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定的、并且是很小的。

在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。

但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。

什么是基数?

比如数据集 {1, 3, 5, 7, 5, 7, 8}, 那么这个数据集的基数集为 {1, 3, 5 ,7, 8}, 基数(不重复元素)为5。 基数估计就是在误差可接受的范围内,快速计算基数

pfadd

添加指定元素到 HyperLogLog 中: pfadd <key>< element> [element ...]

例如:

1
2
pfadd program "redis"   # 往program里面添加redis
pfadd program "kafka"

将所有元素添加到指定HyperLogLog数据结构中。如果执行命令后HLL估计的近似基数发生变化,则返回1,否则返回0

在此时例如再往program添加redis 返回的结果是0,因为基数没有发生变化

pfcount

计算HLL的近似基数: pfcount<key> [key ...] ,可以计算多个HLL,比如用HLL存储每天的UV,计算一周的UV可以使用7天的UV合并计算即可, 示例

pfmerge

将一个或多个HLL合并后的结果存储在另一个HLL中: pfmerge<destkey><sourcekey> [sourcekey ...] ,比如每月活跃用户可以使用每天的活跃用户来合并计算可得

Geospatial地理信息

Redis 3.2 中增加了对GEO类型的支持。GEO,Geographic,地理信息的缩写。该类型,就是元素的2维坐标,在地图上就是经纬度。redis基于该类型,提供了经纬度设置,查询,范围查询,距离查询,经纬度Hash等常见操作

geoadd添加地理位置

添加地理位置(经度,纬度,名称): geoadd<key>< longitude><latitude><member> [longitude latitude member...]

实例

1
2
geoadd china:city 121.47 31.23 shanghai
geoadd china:city 106.50 29.53 chongqing 114.05 22.52 shenzhen 116.38 39.90 beijing

两极无法直接添加,一般会下载城市数据,直接通过 Java 程序一次性导入。

有效的经度从 -180 度到 180 度。有效的纬度从 -85.05112878 度到 85.05112878 度。

当坐标位置超出指定范围时,该命令将会返回一个错误。

已经添加的数据,是无法再次往里面添加的。

geopos地区坐标值

获得指定地区的坐标值: geopos <key><member> [member...]

实例

1
2
3
127.0.0.1:6379> geopos china:city shanghai
1) 2) "121.47000163793563843"
2) "31.22999903975783553"

geodist直线距离

获取两个位置之间的直线距离geodist<key><member1><member2> [m|km|ft|mi ]

实例

获取两个位置之间的直线距离

1
2
127.0.0.1:6379> geopos china:city beijin shanghai km
"1087.4816"

单位:

m 表示单位为米[默认值]。

km 表示单位为千米。

mi 表示单位为英里。

ft 表示单位为英尺。

如果用户没有显式地指定单位参数, 那么 GEODIST 默认使用米作为单位

georadius找某一半径内的元素

以给定的经纬度为中心,找出某一半径内的元素 georadius<key>< longitude><latitude>radius m|km|ft|mi经度 纬度 距离 单位

实例

1
2
3
127.0.0.1:6379> geopos china:city 110 30 1000 km
1) "chongqing"
2) "shengzhen"

Redis在SpringBoot的配置

导入依赖

SpringBoot2.x

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- spring2.X集成redis所需common-pool2-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>

RedisConfig设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {

@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setConnectionFactory(factory);
//key序列化方式
template.setKeySerializer(redisSerializer);
//value序列化
template.setValueSerializer(jackson2JsonRedisSerializer);
//value hashmap序列化
template.setHashValueSerializer(jackson2JsonRedisSerializer);
return template;
}

@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//解决查询缓存转换异常的问题
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解决乱码的问题),过期时间600秒
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(600))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
return cacheManager;
}
}

Redis事务、锁机制、秒杀★

Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

Redis事务的主要作用就是串联多个命令防止别的命令插队

Multi、Exec、discard

Multi:类似于开启事务 Transational(value=true)

从输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行

组队的过程中可以通过discard来放弃组队

案例: 当前我有两个命令 设置k1为1,k2为2 , 控制台如下:

1
2
3
4
5
6
7
8
9
127.0.0.1:6379> multi    # 进入事务模式
OK
127.0.0.1:6379> set k1 1 # 设置k1为1,并且进入队列
QUEUED
127.0.0.1:6379> set k2 2 # 设置k2为2,并且进入队列
QUEUED
127.0.0.1:6379> exec # 提交队列,依次执行
1) OK
2) OK

组队阶段报错,提交失败

1
2
3
4
5
6
7
8
9
10
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k3 3
QUEUED
127.0.0.1:6379> set k4
(error) ERR wrong number of arguments for 'set' command
127.0.0.1:6379> set k5 5
QUEUED
127.0.0.1:6379> exec
(error) EXECABORT Transaction discarded because of previous errors.

组队成功,提交有成功有失败情况

1
2
3
4
5
6
7
8
9
10
11
12
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k1 k
QUEUED
127.0.0.1:6379> incr k1
QUEUED
127.0.0.1:6379> set k2 k
QUEUED
127.0.0.1:6379> exec
1) OK
2) (error) ERR value is not an integer or out of range
3) OK

事务的错误处理

组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消(组队时是类似于原子性的)

如果执行阶段某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚

事务冲突的问题

例子:

在一个账户中,一个请求想给金额减8000

一个请求想给金额减5000

一个请求想给金额减1000

开始金额 逻辑判断 操作结果 最后结果
if 10000>8000 then -8000 -8000
10000 if 10000>8000 then -5000 -5000 -4000
if 10000>8000 then -1000 -1000

悲观锁

悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁表锁等,读锁写锁等,都是在做操作之前先上锁

乐观锁

乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis就是利用这种check-and-set机制实现事务的。

WATCH key [key . . .]

监视key: 在执行multi之前,先执行watch key1 [key2],可以监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。乐观锁版本号控制

unwatch

取消 WATCH 命令对所有 key 的监视。

如果在执行 WATCH 命令之后,EXEC 命令或DISCARD 命令先被执行了的话,那么就不需要再执行UNWATCH 了。

EXEC — Redis 命令参考 (redisfans.com)

Redis事务三特性

单独的隔离操作

事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

没有隔离级别的概念

队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行

不保证原子性

事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚

Redis事务秒杀案例

解决计数器和人员记录的事务操作

商品库存: ( 减个数 )

key string
sk:prodid:qt 剩余个数

秒杀成功者清单: ( 加个数, set集合去重 )

key set
成功者的user_id
sk:prod-id:user 成功者的user_id
成功者的user_id

使用简单的代码模拟秒杀流程代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
//秒杀过程 模拟
public static boolean doSecKill(String uid,String prodid) throws IOException {
//1 uid和prodid非空判断
if(uid == null || prodid == null) {
return false;
}

//2 连接redis
// Jedis jedis = new Jedis("192.168.44.168",6379);
//====(连接池)====(解决超时问题)
//通过连接池得到jedis对象
JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance();
Jedis jedis = jedisPoolInstance.getResource();

//3 拼接key
// 3.1 库存key
String kcKey = "sk:"+prodid+":qt";
// 3.2 秒杀成功用户key
String userKey = "sk:"+prodid+":user";
//====(乐观锁)====
//监视库存
jedis.watch(kcKey);

//4 获取库存,如果库存null,秒杀还没有开始
String kc = jedis.get(kcKey);
if(kc == null) {
System.out.println("秒杀还没有开始,请等待");
jedis.close();
return false;
}

// 5 判断用户是否重复秒杀操作
if(jedis.sismember(userKey, uid)) {
System.out.println("已经秒杀成功了,不能重复秒杀");
jedis.close();
return false;
}

//6 判断如果商品数量,库存数量小于1,秒杀结束
if(Integer.parseInt(kc)<=0) {
System.out.println("秒杀已经结束了");
jedis.close();
return false;
}

//7 秒杀过程
//使用事务
Transaction multi = jedis.multi();

//组队操作
multi.decr(kcKey); // 减少库存
multi.sadd(userKey,uid);// 加入用户

//执行
List<Object> results = multi.exec();
if(results == null || results.size()==0) {
System.out.println("秒杀失败了....");
jedis.close();
return false;
}
// 没有添加事务,会超卖
//7.1 库存-1
//jedis.decr(kcKey);
//7.2 把秒杀成功用户添加清单里面
//jedis.sadd(userKey,uid);

System.out.println("秒杀成功了..");
jedis.close();
return true;
}

总结: 当前案例代码模拟的秒杀, 先获取秒杀用户key和秒杀商品的key, 使用Redis的Watch监视库存, 判断库存情况, 如果存在库存, 开始秒杀, 在秒杀过程中判断用户是否重复秒杀, 以及判断库存是否不足, 开启事物, 进行组队(队列)操作, 然后执行exec(), 根据结果(List results)判断结果如何, 流程结束 (上述代码为完整的流程,加了连接池,加了乐观锁)

Redis事务秒杀并发模拟

使用工具ab模拟测试 (CentOS6 默认安装 CentOS7需要手动安装)

联网:yum install httpd-tools

无网络:(1) 进入cd /run/media/root/CentOS 7 x86_64/Packages(路径跟centos6不同)

顺序安装 :

apr-1.4.8-3.el7.x86_64.rpm

apr-util-1.5.2-6.el7.x86_64.rpm

httpd-tools-2.4.6-67.el7.centos.x86_64.rpm

通过ab测试

vim postfile 模拟表单提交参数,以&符号结尾;存放当前目录。

内容:prodid=0101&

ab -n 2000 -c 200 -k -p ~/postfile -T application/x-www-form-urlencoded http://127.0.0.1:8081/Seckill/doseckill

2000 个 请求中有200个是并发执行

-n 请求数, -c 请求中的并发数量, -p 提交参数, -t 参数类型

127.0.0.1:8081: 实际上要加上当前主机的ip地址 ipconfig

还可以通过Apipost等工具进行压测并发请求

超卖

Redis中商品数

1
2
3
4
127.0.0.1:6379> set sk:0101:qt 10
OK
127.0.0.1:6379> get sk:0101:qt
"-2" # 在并发的时候库存变成了-2

在后台

1
2
3
4
5
6
7
8
9
秒杀成功!
秒杀成功!
秒杀成功!
秒杀成功!
已秒光!
秒杀成功!
已秒光!
已秒光!
已秒光!
超卖问题
总库存 用户请求 操作 库存
A :1.检查是否还有库存 2. 有则-1 -1
10 (总) B: 1.检查是否还有库存 2. 有则-1 -1 -N (总)
C: 1.检查是否还有库存 2. 有则-1 -1
…. …
利用乐观锁淘汰用户,解决超卖问题

对库存的key不断监视

1
jedis.watch(kcKey); //★

对秒杀的过程使用事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//★使用事务
Transaction multi = jedis.multi();

//★组队操作
multi.decr(kcKey); // 减少库存
multi.sadd(userKey,uid);// 加入用户,

// 执行
List<Object> results = multi.exec();
if(results == null || results.size() == 0) { // 结果为空或者有其他的情况->失败 否则成功
log.info("秒杀失败了....");
jedis.close();
return false;
}

总结: 先Watch监视key, 然后加入事务组队, 最后执行, 判断返回的list集合

走到这里了, 解决了超卖问题(加乐观锁), 解决了连接超时问题(使用JedisPool连接池), 但是发现还有库存遗留问题. 乐观锁造成库存遗留问题

解决库存遗留问题
Lua脚本

Lua 是一个小巧的 脚本语言,Lua脚本可以很容易的被C/C++ 代码调用,也可以反过来调用C/C++的函数,Lua并没有提供强大的库,一个完整的Lua解释器不过200k,所以Lua不适合作为开发独立应用程序的语言,而是作为嵌入式脚本语言。

很多应用程序、游戏使用LUA作为自己的嵌入式脚本语言,以此来实现可配置性、可扩展性。

LUA脚本在Redis中的优势

将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数。提升性能

LUA脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。

但是注意redis的lua脚本功能,只有在Redis 2.6以上的版本才可以使用。

利用lua脚本淘汰用户,解决超卖问题。

redis 2.6版本以后,通过lua脚本解决争抢问题,实际上是redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题

Lua 教程_w3cschool

解决库存依赖问题,lua脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
local userid=KEYS[1]; -- 获取用户id
local prodid=KEYS[2]; -- 获取商品id
-- key拼接
local qtkey="sk:"..prodid..":qt";
local usersKey="sk:"..prodid.":usr';
-- 判断用户key是否在当前清单中存在
local userExists=redis.call("sismember",usersKey,userid);
-- 用户已经秒杀过了,约定返回2
if tonumber(userExists)==1 then
return 2;
end
local num= redis.call("get" ,qtkey);
-- 秒杀结束
if tonumber(num)<=0 then
return 0;
-- 否则添加清单,减少库存
else
redis.call("decr",qtkey);
redis.call("sadd",usersKey,userid);
end
-- 秒杀成功
return 1;

对于这个脚本如何使用?其实就是Lua脚本, 例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class RedisByScript {
// ===Lua脚本===
static String secKillScript ="local userid=KEYS[1];\r\n" +
"local prodid=KEYS[2];\r\n" +
"local qtkey='sk:'..prodid..\":qt\";\r\n" +
"local usersKey='sk:'..prodid..\":usr\";\r\n" +
"local userExists=redis.call(\"sismember\",usersKey,userid);\r\n" +
"if tonumber(userExists)==1 then \r\n" +
" return 2;\r\n" +
"end\r\n" +
"local num= redis.call(\"get\" ,qtkey);\r\n" +
"if tonumber(num)<=0 then \r\n" +
" return 0;\r\n" +
"else \r\n" +
" redis.call(\"decr\",qtkey);\r\n" +
" redis.call(\"sadd\",usersKey,userid);\r\n" +
"end\r\n" +
"return 1" ;

static String secKillScript2 =
"local userExists=redis.call(\"sismember\",\"{sk}:0101:usr\",userid);\r\n" +
" return 1";
// 秒杀流程封装, 结合Lua
public static boolean doSecKill(String uid,String prodid) throws IOException {

JedisPool jedispool = JedisPoolUtil.getJedisPoolInstance();
Jedis jedis=jedispool.getResource();

//String sha1= .secKillScript;
// 调用Jedis加载脚本方法,加载刚刚描写的lua脚本
String sha1= jedis.scriptLoad(secKillScript);
Object result= jedis.evalsha(sha1, 2, uid,prodid);

String reString=String.valueOf(result);
if ("0".equals( reString ) ) {
System.err.println("已抢空!!");
}else if("1".equals( reString ) ) {
System.out.println("抢购成功!!!!");
}else if("2".equals( reString ) ) {
System.err.println("该用户已抢过!!");
}else{
System.err.println("抢购异常!!");
}
jedis.close();
return true;
}
}