Flink - 双十一总金额、TopN商品统计案例实操(Kafka数据实时计算、输出至Redis)
一、功能说明
1,整体架构介绍
(1)下面是一个某电商网站数据大屏整体架构:
- 当用户在 PC 或 App 上提交一个订单后,业务系统会通过日志的方式记录这条订单的相关数据。
- 使用 Filebeat 这个日志采集工具采集前端业务机器上的日志数据。这里采集的日志数据其实就是用户的订单数据。
- 通过 Filebeat 将订单数据采集到 Kafka 中进行缓冲,避免数据峰值给后端的计算系统造成太大的压力。
- 使用 Flink 开发实时计算程序对 Kafka 中的数据进行处理,Flink 内部涉及对接 Kafka 数据源、数据解析、数据过滤及数据聚合之类的功能,在计算出需要的结果数据后将结果数据输出到 Redis 中。
- 通过 DataV 查询 Redis 中的数据并进行展示。在对接时会用到数据接口,需要提前封装好数据接口,在对接时直接在 DataV 中调用即可。

(2)本文样例主要实现 Flink 实时数据计算部分的核心代码,即对 Kafka 中的数据进行处理,并将计算出的结果数据输出到 Redis 中。前面的日志数据采集和后面的数据展示模块暂不实现。
2,需求描述
(1)“双十一”数据大屏中展现的指标和图表有近 20 个,这里主要计算两个核心指标。
- 第 1 个指标:统计全站“双十一”当天的实时 GMV(总的成交金额)。
- 第 2 个指标:统计实时销量 Top N 的商品品类。
(2)在此项目中需要用到的订单数据的格式如下:
- 这是一个 JSON 格式的数据,表示是用户的一个订单数据信息,其中包含订单编号、订单总金额和订单明细。
- 订单明细中存储了订单中商品的基本信息(包括商品编号、商品数量、商品单价、商品名称和商品品类)。
{ "detal": [ { "goodsCount": 1, "goodsName": "美的 606 升", "goodsNo": "6002", "goodsPrice": 100, "goodsType": "冰箱" }, { "goodsCount": 2, "goodsName": "荣耀 30Pro", "goodsNo": "1005", "goodsPrice": 200, "goodsType": "手机" } ], "orderNo": "9fc3ffe3-3255-414e-8df8-1580cfbd2ff9", "totalPrice": 500 }
二、程序开发(Java 版)
1,添加依赖
我们创建一个 Maven 项目,然后在 pom.xml 文件中添加 Flink、Redis、JSON 依赖。
<!-- Flink 依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>1.16.0</version> </dependency> <!-- Redis 依赖 --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> <!-- JSON 依赖 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.68</version> </dependency>
2,自定义 Sink 代码
提示:由于 Flink 内置的 RedisSink 组件无法满足需求,所以这里自定义了 RedisSink 组件。
(1)下面是用户输出实时总成交金额的自定义 Sink 代码:
import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import redis.clients.jedis.Jedis; public class GmvRedisSink extends RichSinkFunction<Long> { private String host; private int port; private String password; private String key; private Jedis jedis = null; public GmvRedisSink(String host, int port, String password, String key) { this.host = host; this.port = port; this.password = password; this.key = key; } /** * 初始化方法,只执行一次 * * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { this.jedis = new Jedis(host, port); if (password!= null &&!password.isEmpty()) { jedis.auth(password); } } /** * 核心代码,来一条数据此方法会执行一次 * * @param value * @param context * @throws Exception */ @Override public void invoke(Long value, Context context) throws Exception { //对 GMV 数据进行递增操作 jedis.incrBy(key, value); } /** * 任务停止时会先调用此方法 * 适合关闭资源链接 * * @throws Exception */ @Override public void close() throws Exception { //关闭链接 if (jedis != null) { jedis.close(); } } }
(2)下面是用户输出实时销量 Top N 的商品品类的自定义 Sink 代码:
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import redis.clients.jedis.Jedis; public class TopNRedisSink extends RichSinkFunction<Tuple2<String, Long>> { private String host; private int port; private String password; private String key; private Jedis jedis = null; public TopNRedisSink(String host, int port, String password, String key) { this.host = host; this.port = port; this.password = password; this.key = key; } /** * 初始化方法,只执行一次 * * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { this.jedis = new Jedis(host, port); if (password != null &&!password.isEmpty()) { jedis.auth(password); } } /** * 核心代码,来一条数据此方法会执行一次 * * @param value * @param context * @throws Exception */ @Override public void invoke(Tuple2<String, Long> value, Context context) throws Exception { //给 sortedset 中的指定元素递增添加分值 jedis.zincrby(key, value.f1, value.f0); } /** * 任务停止时会先调用此方法 * 适合关闭资源链接 * * @throws Exception */ @Override public void close() throws Exception { //关闭链接 if (jedis != null) { jedis.close(); } } }
3,Flinl 核心程序代码
代码具体流程是:
- 从 Kafka 中读取订单数据,使用 FlatMap 将订单中的商品信息提取并转换为 Tuple3。
- 过滤掉异常数据(商品数量小于等于 0)。
- 对订单数据进行两项实时统计:GMV 统计和实时销量 Top N 的商品品类统计。
- 将 GMV 和 Top N 的统计结果写入 Redis 中。
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; public class StreamDataCalcJava { public static void main(String[] args) throws Exception { //获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //指定 KafkaSource 相关配置 KafkaSource<String> kafkaSource = KafkaSource.<String>builder() .setBootstrapServers("192.168.121.128:9092") .setTopics("order_detail") .setGroupId("con") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); //指定 Kafka 作为 Source DataStreamSource<String> text = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source"); //解析订单数据,只保留需要用到的字段 //goodsCount、goodsPrice、goodsType SingleOutputStreamOperator<Tuple3<Long, Long, String>> orderStream = text.flatMap(new FlatMapFunction<String, Tuple3<Long, Long, String>>() { public void flatMap(String line, Collector<Tuple3<Long, Long, String>> out) throws Exception { JSONObject orderJson = JSON.parseObject(line); //获取 JSON 数据中的商品明细 JSONArray orderDetail = orderJson.getJSONArray("detal"); for (int i = 0; i < orderDetail.size(); i++) { JSONObject orderObj = orderDetail.getJSONObject(i); long goodsCount = orderObj.getLongValue("goodsCount"); long goodsPrice = orderObj.getLongValue("goodsPrice"); String goodsType = orderObj.getString("goodsType"); out.collect(new Tuple3<Long, Long, String>(goodsCount, goodsPrice, goodsType)); } } }); //过滤异常数据 SingleOutputStreamOperator<Tuple3<Long, Long, String>> filterStream = orderStream.filter(new FilterFunction<Tuple3<Long, Long, String>>() { public boolean filter(Tuple3<Long, Long, String> tup) throws Exception { //商品数量大于 0 的数据才是有效数据 return tup.f0 > 0; } }); //1.统计全站“双十一”当天的实时 GMV SingleOutputStreamOperator<Long> gmvStream = filterStream .map(new MapFunction<Tuple3<Long, Long, String>, Long>() { public Long map(Tuple3<Long, Long, String> tup) throws Exception { //计算单个商品的消费金额 return tup.f0 * tup.f1; } }); //将 GMV 数据保存到 Redis 中 gmvStream.addSink(new GmvRedisSink("192.168.121.128", 6379, "123","gmv")); //2.统计实时销量 Top N 的商品品类 SingleOutputStreamOperator<Tuple2<String, Long>> topNStream = filterStream.map(new MapFunction<Tuple3<Long, Long, String>, Tuple2<String, Long>>() { //获取商品品类和购买的商品数量 public Tuple2<String, Long> map(Tuple3<Long, Long, String> tup) throws Exception { return new Tuple2<String, Long>(tup.f2, tup.f0); } }); //根据商品品类分组 KeyedStream<Tuple2<String, Long>, String> keyStream = topNStream.keyBy(tuple -> tuple.f0); //设置时间窗口为 1 s WindowedStream<Tuple2<String, Long>, String, TimeWindow> windowStream = keyStream .window(TumblingProcessingTimeWindows.of(Time.seconds(1))); //求和,指定 tuple 中的第 2 列,即商品数量 SingleOutputStreamOperator<Tuple2<String, Long>> resStream = windowStream.sum(1); //将 goods_type 数据保存到 Redis 中 resStream.addSink(new TopNRedisSink("192.168.121.128", 6379, "123","goods_type")); //执行任务 env.execute("StreamDataCalcJava"); } }
三、运行测试
(1)首先我们创建好 Kafka 的 topic。然后运行我们编写的 Flink 程序。
kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 5 --replication-factor 1 --topic order_detail
(2)接着创建一个该主题的消息生产者:
kafka-console-producer.sh --broker-list localhost:9092 --topic order_detail
(3)然后发送如下两条测试数据:
(4)Flink 程序运行一段时间,并且成功计算了一批数据后,在 Redis 中可以看到相关的结果。
(2)下面是用户输出实时销量 Top N 的商品品类的自定义 Sink 代码:
{ "detal": [ { "goodsCount": 1, "goodsName": "美的 606 升", "goodsNo": "6002", "goodsPrice": 100, "goodsType": "冰箱" }, { "goodsCount": 2, "goodsName": "荣耀 30Pro", "goodsNo": "1005", "goodsPrice": 200, "goodsType": "手机" } ], "orderNo": "9fc3ffe3-3255-414e-8df8-1580cfbd2ff9", "totalPrice": 500 }
{ "detal": [ { "goodsCount": 2, "goodsName": "联想 Y50", "goodsNo": "8001", "goodsPrice": 300, "goodsType": "电脑" }, { "goodsCount": 1, "goodsName": "华为 Mate60", "goodsNo": "1006", "goodsPrice": 200, "goodsType": "手机" } ], "orderNo": "414effe3-3255-0cfb-8df8-1580cfb255f9", "totalPrice": 800 }
(4)Flink 程序运行一段时间,并且成功计算了一批数据后,在 Redis 中可以看到相关的结果。
- 下面是查看实时总成交金额:
get gmv

- 下面是查看实时销量 Top N 的商品品类
zrevrange goods_type 0 -1 withscores

附一:Scala 版程序代码
1,自定义 Sink 代码
提示:由于 Flink 内置的 RedisSink 组件无法满足需求,所以这里自定义了 RedisSink 组件。
(1)下面是用户输出实时总成交金额的自定义 Sink 代码:
import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import redis.clients.jedis.Jedis class GmvRedisSink extends RichSinkFunction[Long] { var host: String = _ var port: Int = _ var key: String = _ var password: String = _ var jedis: Jedis = _ /** * 构造函数 * * @param host * @param port * @param key * @param password */ def this(host: String, port: Int, password: String, key: String) { this() this.host = host this.port = port this.password = password this.key = key } /** * 初始化方法,只执行一次 * 适合初始化资源链接 * * @param parameters */ override def open(parameters: Configuration): Unit = { this.jedis = new Jedis(host, port) // 如果设置了密码,进行认证 if (this.password != null && this.password.nonEmpty) { this.jedis.auth(this.password) } } /** * 核心代码,来一条数据此方法会执行一次 * * @param value * @param context */ override def invoke(in: Long): Unit = { jedis.incrBy(key, in) } /** * 任务停止时会先调用此方法 * 适合关闭资源链接 */ override def close(): Unit = { //关闭链接 if (jedis != null) { jedis.close() } } }
(2)下面是用户输出实时销量 Top N 的商品品类的自定义 Sink 代码:
import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import redis.clients.jedis.Jedis class TopNRedisSink extends RichSinkFunction[(String, Long)] { var host: String = _ var port: Int = _ var key: String = _ var password: String = _ var jedis: Jedis = _ /** * 构造函数 * * @param host * @param port * @param key * @param password */ def this(host: String, port: Int, password: String, key: String) { this() this.host = host this.port = port this.password = password this.key = key } /** * 初始化方法,只执行一次 * 适合初始化资源链接 * * @param parameters */ override def open(parameters: Configuration): Unit = { this.jedis = new Jedis(host, port) // 如果设置了密码,进行认证 if (this.password != null && this.password.nonEmpty) { this.jedis.auth(password) } } /** * 核心代码,来一条数据此方法会执行一次 * * @param value * @param context */ override def invoke(in: (String, Long)): Unit = { jedis.zincrby(key, in._2, in._1) } /** * 任务停止时会先调用此方法 * 适合关闭资源链接 */ override def close(): Unit = { //关闭链接 if (jedis != null) { jedis.close() } } }
2,Flinl 核心程序代码
代码具体流程是:
- 从 Kafka 中读取订单数据,使用 FlatMap 将订单中的商品信息提取并转换为 Tuple3。
- 过滤掉异常数据(商品数量小于等于 0)。
- 对订单数据进行两项实时统计:GMV 统计和实时销量 Top N 的商品品类统计。
- 将 GMV 和 Top N 的统计结果写入 Redis 中。
import com.alibaba.fastjson.JSON import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.connector.kafka.source.KafkaSource import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows import org.apache.flink.streaming.api.windowing.time.Time object StreamDataCalcScala { def main(args: Array[String]): Unit = { //获取执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ // 指定 KafkaSource 相关配置 val kafkaSource = KafkaSource.builder[String] .setBootstrapServers("192.168.121.128:9092") .setTopics("order_detail") .setGroupId("con") .setStartingOffsets(OffsetsInitializer.earliest) .setValueOnlyDeserializer(new SimpleStringSchema()) .build //指定 Kafka 作为 Source val text = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source") //解析订单数据,将数据打平,只保留需要用到的字段 //goodsCount、goodsPrice、goodsType val orderStream = text.flatMap(line=>{ val orderJson = JSON.parseObject(line) val orderDetal = orderJson.getJSONArray("detal") val res = new Array[(Long,Long,String)](orderDetal.size()) for(i <- 0 until orderDetal.size()){ val orderObj = orderDetal.getJSONObject(i) val goodsCount = orderObj.getLongValue("goodsCount") val goodsPrice = orderObj.getLongValue("goodsPrice") val goodsType = orderObj.getString("goodsType") res(i) = (goodsCount,goodsPrice,goodsType) } res }) //过滤异常数据 val filterStreram = orderStream.filter(_._1 > 0) //1. 统计全站“双十一”当天的实时 GMV val gmvStream = filterStreram.map(tup=>tup._1 * tup._2) //计算单个商品的消费金额 //将 GMV 数据保存到 Redis 中 gmvStream.addSink(new GmvRedisSink("192.168.121.128",6379,"123","gmv")) //2.统计实时销量 Top N 的商品品类 val topNStream = filterStreram.map(tup=>(tup._3,tup._1)) //获取商品品类和购买的商品数量 .keyBy(tup=>tup._1) //根据商品品类分组 .window(TumblingProcessingTimeWindows.of(Time.seconds(1))) //设置时间窗口为 1 s .sum(1) //指定 tuple 中的第 2 列,即商品数量 //将 goods_type 数据保存到 Redis 中 topNStream.addSink(new TopNRedisSink("192.168.121.128",6379,"123","goods_type")) //执行任务 env.execute("StreamDataCalcScala") } }
附二:自动提交 offset,防止 Kafka 消息重复消费
1,问题描述
上面样例程序我们运行一段时间后,重新启动时会发现程序又会从 Kafka 的第一条消息开始重新消费一遍,这样就造成 Redis 里面数据重复累计。
2,解决办法
要让程序重启后从上一次位置继续消费,我们将 KafkaSource 配置部分做如下修改,启用自动提交消费位移(offset)的功能,并将消费者的起始位移被设置为已提交的最早位移。
注意:OffsetResetStrategy.EARLIEST 表示如果没有先前的消费位移(例如,新的消费者组),则从最早的可用消息开始消费。这确保了即使是新的消费者组,也能从消息队列的开头开始消费。
// 指定 KafkaSource 相关配置 val kafkaSource = KafkaSource.builder[String] .setBootstrapServers("192.168.121.128:9092") .setTopics("order_detail") .setGroupId("con") .setProperty("enable.auto.commit","true") // 启用自动提交消费位移(offset)的功能 .setProperty("auto.commit.interval.ms","1000") // 设置自动提交消费位移的时间间隔为1秒 // 消费者的起始位移被设置为已提交的最早位移 .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) //.setStartingOffsets(OffsetsInitializer.earliest) .setValueOnlyDeserializer(new SimpleStringSchema()) .build //指定 Kafka 作为 Source val text = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")