Kafka - 保证消息不会重复消费的实现方案(布隆过滤器、 Redis、 唯一索引)
在高并发场景下要解决消息重复发送或者重复消费问题,思路基本上一致,就是把消费者设计成幂等的。也就是说,同一个消息,不管消费多少次,系统状态都是一样的。本文我将详细讲解如何实现一个强大的高并发幂等方案。
一、基本概念介绍
1,重复消费的原因
(1)重复消费的可能原因有 2 种:
- 生产者重复发送。比如说我们的业务在发送消息的时候,收到了一个超时响应,这个时候我们很难确定这个消息是否真的发送出去了,那么我们就会考虑重试,重试就可能导致同一个消息发送了多次。
- 消费者重复消费。比如说我们在处理消息完毕之后,准备提交了。这个时候突然宕机了,没有提交。等恢复过来,我们会再次消费同一个消息。
(2)避免重复消费的原则就是:一定要把消费逻辑设计成幂等的。我们的微服务也要尽可能设计成幂等的,这样上游就可以利用重试来提高可用性了。
提示:现在大多数消息中间件都声称自己实现了恰好一次(exactly once)语义,都是依赖于重试和幂等来达成的。
2,布隆过滤器
(1)布隆过滤器(Bloom Filter)是一种数据结构,它可以用于检索一个元素是否在一个集合里。它的优点是空间效率和查询时间都远远超过一般的算法。缺点是存在假阳性的问题,还有对于删除操作不是很友好。
(2)布隆过滤器的基本思路是当集合加入某个元素的时候,通过哈希算法把元素映射到比特数组的 N 个点,把对应的比特位设置成 1。
(3)在查找的时候,只需要看对应的比特位是不是 1,就可以粗略判断集合里有没有这个元素。
- 如果查询的元素对应的 N 个点都是 1,那么这个元素可能存在。如果布隆过滤器认为一个元素存在,但是实际上并不存在,也叫做假阳性。
- 任何一个点是 0,那么这个元素必然不存在。
(4)因此布隆过滤器经常和其他手段结合在一起判断某个元素在不在。它在判断某个元素必定不存在的场景下,效果非常好。
3,bit array(bit map)
(1)bit array(也叫做 bit map)和布隆过滤器类似,它也是用一个比特位来代表元素是否存在,1 代表存在,0 代表不存在。
(2)它和布隆过滤器的核心区别是它不需要哈希函数,因为它的值本身就是一个数字。比如说,你的用户 ID 是数字,那么你就可以把 ID 当成 bit array 的下标,对应位置的比特位是 1。
(3)并且,bit array 不存在假阳性的说法,它的判断是精确的。
二、方案1:利用唯一索引
1,基本思路
最简单的幂等方案就是利用唯一索引,比如说:
- 在业务处理的时候,先根据消息内容往业务表里面插入一条数据,而这个业务表上有唯一索引。
- 如果插入成功了就说明这个消息没有被处理过,可以继续处理。
- 如果插入的时候出现了唯一索引错误,那就说明这个消息之前被处理过了。
2,本地事务将数据插入到唯一索引
要使用唯一索引,最好的方式是把唯一索引和业务操作组合在一起,做成一个事务。
- 也就是在收到消息的时候先开启事务,把数据插入到唯一索引,然后执行业务操作,最后提交事务。
- 提交事务之后,就认为业务已经成功了,就算接下来提交消息失败了也没关系,因为后面重复请求还是会被唯一索引拦下来。
3,非本地事务将数据插入到唯一索引
(1)在不能使用本地事务的时候(比如说在分库分表的条件下),实现幂等就更加麻烦了,因为我们不能再把业务操作和将数据插入到唯一索引这两步做成原子操作。无法保证要么都成功,要么都失败。这时候,只能追求最终一致性,依赖于一个第三方来检测了。
(2)基本步骤如下:
- 首先把数据插入到唯一索引里面,避免重复消费,这个时候数据保持在初始化状态。
- 然后执行业务操作。
- 最后在执行业务操作之后,把唯一索引里的数据更新为成功状态。
(3)那么会出问题的地方就是第二步成功了,但是第三步失败了。在这种场景下,需要启动一个用一个异步检测系统,这个异步检测系统会定时扫描唯一索引的表,然后再去扫描业务表。这个时候会有两种情况:
- 如果业务表的数据表明业务操作已经处理成功了,那么这个异步检测系统就会把唯一索引更新为成功状态。
- 如果业务表的数据表明业务操作没有成功,那么异步检测系统可以直接触发重试。
三、方案2:布隆过滤器 + Redis + 唯一索引
1,方案说明
(1)方案综合运用了大数据处理中常见的布隆过滤器、Redis 和唯一索引。从思路上来说,就是四个字层层削流。从目标上来说,就是确保到达数据库的流量最小化。
- 如果依赖于数据库唯一索引来判断幂等,那么数据库撑不住高并发。所以我们就想办法在使用唯一索引之前,尽可能先削减流量。
- 这个场景就非常适合使用布隆过滤器。而为了避免假阳性的问题,进一步降低发送到数据库的流量,在布隆过滤器和数据库之间再引入一个 Redis。
(2)基本流程如下:
- 首先,一个请求过来的时候,我们会利用布隆过滤器来判断它有没有被处理过。如果布隆过滤器说没有处理过,那么就确实没有被处理过,可以直接处理。如果布隆过滤器说处理过(可能是假阳性),那么就要执行下一步。
- 第二步就是利用 Redis 存储近期处理过的 key。如果 Redis 里面有这个 key,说明它的确被处理过了,直接返回,否则进入第三步。这一步的关键就是 key 的过期时间应该是多长。
- 第三步则是利用唯一索引,如果唯一索引冲突了,那么就代表已经处理过了。这个唯一索引一般就是业务的唯一索引,并不需要额外创建一个索引。
2,更新顺序
(1)如果业务方是第一次执行这个请求,它需要把更新数据库的操作放到自己的业务本地事务里面。等业务方提交的时候,一起提交。
(2)在数据库事务提交之后,再去更新布隆过滤器和 Redis。这时候失败了影响也不大,因为最终重复请求被处理的时候,会因为唯一索引冲突而报错,这时候我们就知道这个请求是重复请求了。
提示:可以说这个方案最终都是要依靠唯一索引来兜底的。也就是说不管什么原因导致布隆过滤器或者 Redis 没生效,只要跑到了插入唯一索引这一步,都可以确保你最终不会重复处理消息或者请求。
3,Redis key 的过期时间
(1)Redis 的作用就是在布隆过滤器之后进一步削减流量,而 key 的过期时间就决定了削减流量的效果,所以只需要确保重复请求过来的时候,这个 key 还没过期就可以了。
- 举个例子,假如说预计某个特定业务的重试请求会在 10 分钟内到达,那么可以把过期时间设置成 11 分钟,多出来的一分钟就是余量。
(2)但是如果并发非常高,以至于 key 非常多,也要考虑 Redis 能不能放下这么多 key。另外一个就是有些业务的重试间隔非常长,比如说一小时,这就不太适合引入 Redis 了。可以考虑下面这个简化版本的方案。
三、方案3:“布隆过滤器 + 唯一索引” 或 “Redis + 唯一索引”
1,“布隆过滤器 + 唯一索引”
该简化方案就是只用布隆过滤器和唯一索引。如果 Redis 资源不足,又或者重复请求间隔太长,导致使用 Redis 的效果不好,那么就比较适合用这个简化方案。
2,“Redis + 唯一索引”
该简化方案就是只用 Redis 和唯一索引。Redis 资源多,又或者担心布隆过滤器的假阳性问题,就可以采用这个方案。
附一:使用本地布隆过滤器提高性能
1,基本思路
(1)在性能要求非常苛刻的情况下,可以考虑使用本地布隆过滤器。具体来说,利用一致性哈希等类型的算法执行负载均衡,确保同一个 key 的请求落到同一个实例上,那么我就可以在这台机器上使用基于本地内存的布隆过滤器了。
(2)当然,在消息队列的场景下,这个问题就变成了在发消息的时候要把同一个业务的消息发到同一个分区。这样就可以在消费者身上使用基于本地内存的布隆过滤器了。
2,重建本地布隆过滤器
本地布隆过滤器在服务器重新启动之后,重建起来也很简单,基本上有两种思路:
- 一种就是不用重建,直接处理新请求。在处理新请求的过程中,逐步重建起布隆过滤器。这种思路适合时效性很强的请求,比如说今天就不可能收到昨天的重复请求这种场景。
- 另外一种思路就是利用过去一段时间的数据,比如说我预计我今天收到的重复请求最多来自三天前,那么我就用这三天内处理过的请求来构建布隆过滤器。
附二:使用 bit array 提高性能
1,基本思路
(1)使用本地布隆过滤器就已经很快了,但如果还想进一步提高性能,那可以用更加高效的数据结构:bit array。如果 key 本身就是数字,比如数据库某张表的自增主键,像这种情况下,bit array 性能更好,并且更能节省内存。
(2)使用 bit array 有两种策略:
- 一种策略就是直接在前面介绍方案的基本流程的时候,我们就把布隆过滤器换成 bit array。
- 另一种策略方案就是可以自由切换到 bit array 上。具体来说,我们在布隆过滤器这边做了一个抽象,也就是说,对于一些业务,其实我这里可以把布隆过滤器换成 bit array 的结构。比如说某个业务要求判断幂等,用的就是业务的自增主键,那么他们就可以使用 bit array 这个实现来判断幂等。
2,bit array 小技巧
(1)有些时候为了防止竞争对手猜到自己的业务量,自增主键不是从 0 开始的,又或者在使用分布式 ID 的时候,ID 也不是从 0 开始的。
(2)假如说自增主键的起点是 N,那么在 bit array 的第一位就可以表示为 N,第二位表示为 N + 1,这样就能进一步节省内存。
- 假如说最小的 ID 是 5,000,000。那么 bit array 里的第一个比特位就可以表示为 5,000,000,第二个比特位就表示为 5,000,001。