Kafka - 消息回查机制的实现方案
消息回查机制依赖于消息队列的支持,目前 RocketMQ 是支持的,但是不幸的是 Kafka 和 RabbitMQ 都不支持。本文将讲解怎么在 Kafka 的基础上实现消息回查。
1,什么是消息回查机制?
(1)所谓的消息回查机制是指消息队列允许你在发送消息的时候,先发一个准备请求,里面带着我们的消息。这个时候消息队列并不会把消息转交给消费者,而是当业务完成之后,我们需要再发一个确认请求,这时候消息中间件才会把消息转交给消费者。
- 显然,这就是两阶段提交的一个简单应用,所以这个也被叫做事务消息。
(2)如果我的业务成功了,但是我的确认请求没发。这时候就有了消息回查,也就是消息在没收到确认请求的时候,会反过来问一下你,这个消息要不要交给消费者。
2,整体流程
Kafka 在其核心设计中不提供原生的消息回查(message replay)机制,但我们可以通过一些手动的方式来实现类似的功能,基本步骤如下:
- 应用代码把准备消息发送到 topic=look_back_msg 上。里面包含业务 topic、消息体、业务类型、业务 ID、准备状态、回查接口。
- 回查中间件消费这个 look_back_msg,把消息内容存储到数据库里。
- 应用代码执行完业务操作之后,再发送一个消息到 look_back_msg 上,带上业务类型、业务 ID 和提交状态这些信息。如果应用代码执行业务出错了,那么就使用回滚状态。
- 回查中间件查询消息内容,转发到业务 topic 上。
3,回查的实现
(1)如果业务方最终没有发送提交消息,那么回查中间件会找出这些长时间没提交的消息,执行回查。回查中间件执行回查的关键点是利用准备消息中带着的回查接口配置来发起调用。可以回查 HTTP 接口,也可以回查 RPC 接口:
- 对于 HTTP 调用来说,业务方需要提供回查 URL。
- 对于 RPC 调用来说,业务方需要实现提供的回查接口,然后提供对应的服务名。
- 我们在回查的时候会带上业务类型和业务 ID,业务方需要告诉我这个消息能不能提交,也就是要不要发给业务 topic。
(2)举个例子,如果是回查一个订单业务的 HTTP 接口,那么业务方需要告诉我们回查 URL,那么回查中间件发出的请求就类似于下面这样。
method: POST URL: https://abc.com:8080/order/lookback Body: { "biz_type": "order", "biz_id": "oid-123" }
- 业务方返回的响应:
Body: { "biz_type": "order", "biz_id": "oid-123", "status": "提交" //如果业务没成功,那么可以是回滚 }
(3)而如果是 RPC 接口,回查中间件就可以定义一个接口,要求所有的业务方都实现这个接口。
type MsgLookBack interface{ LookBack(bizType string, bizID int) Status }
- 然后业务方需要提供服务名字,比如说 hangge.com.order.msg_look_back,回查中间件会利用 RPC 的泛化调用功能,发起调用。
4,数据存储的实现
(1)为了保证这个回查机制的高性能和高可靠,我们可以使用分区表。我按照时间进行分区,并且历史分区是可以快速归档的,毕竟这个回查机制使用的数据库只是临时存储一下消息数据而已。
(2)当然,后续随着业务扩展,也可以考虑使用分库分表的,比如说按照业务 topic 来分库分表。
提示:这部分和在延迟队列里的数据存储设计差不多。在延迟队列中,我们可以考虑使用分区表、交替表或者分库分表来存储延迟消息。这里我们同样可以用分区表、交替表和分库分表来存储回查消息。
5,有序消息的实现
(1)这个方案是要求准备消息和提交消息是有序的,也就是说,同一个业务的准备消息一定要先于提交消息。
(2)解决方案也很简单,在发送的时候要求业务方按照业务 ID 计算一个哈希值,然后除以分区数量的余数,就是目标分区。