当前位置: > > > Flink - 双十一总金额、TopN商品统计案例实操(Kafka数据实时计算、输出至Redis)

Flink - 双十一总金额、TopN商品统计案例实操(Kafka数据实时计算、输出至Redis)

一、功能说明

1,整体架构介绍

(1)下面是一个某电商网站数据大屏整体架构:
  • 当用户在 PCApp 上提交一个订单后,业务系统会通过日志的方式记录这条订单的相关数据。
  • 使用 Filebeat 这个日志采集工具采集前端业务机器上的日志数据。这里采集的日志数据其实就是用户的订单数据。
  • 通过 Filebeat 将订单数据采集到 Kafka 中进行缓冲,避免数据峰值给后端的计算系统造成太大的压力。
  • 使用 Flink 开发实时计算程序对 Kafka 中的数据进行处理,Flink 内部涉及对接 Kafka 数据源、数据解析、数据过滤及数据聚合之类的功能,在计算出需要的结果数据后将结果数据输出到 Redis 中。
  • 通过 DataV 查询 Redis 中的数据并进行展示。在对接时会用到数据接口,需要提前封装好数据接口,在对接时直接在 DataV 中调用即可。

(2)本文样例主要实现 Flink 实时数据计算部分的核心代码,即对 Kafka 中的数据进行处理,并将计算出的结果数据输出到 Redis 中。前面的日志数据采集和后面的数据展示模块暂不实现。

2,需求描述

(1)“双十一”数据大屏中展现的指标和图表有近 20 个,这里主要计算两个核心指标。
  • 1 个指标:统计全站“双十一”当天的实时 GMV(总的成交金额)。
  • 2 个指标:统计实时销量 Top N 的商品品类。
(2)在此项目中需要用到的订单数据的格式如下:
  • 这是一个 JSON 格式的数据,表示是用户的一个订单数据信息,其中包含订单编号、订单总金额和订单明细。
  • 订单明细中存储了订单中商品的基本信息(包括商品编号、商品数量、商品单价、商品名称和商品品类)。
{
  "detal": [
    {
      "goodsCount": 1,
      "goodsName": "美的 606 升",
      "goodsNo": "6002",
      "goodsPrice": 100,
      "goodsType": "冰箱"
    },
    {
      "goodsCount": 2,
      "goodsName": "荣耀 30Pro",
      "goodsNo": "1005",
      "goodsPrice": 200,
      "goodsType": "手机"
    }
  ],
  "orderNo": "9fc3ffe3-3255-414e-8df8-1580cfbd2ff9",
  "totalPrice": 500
}

二、程序开发(Java 版)

1,添加依赖

我们创建一个 Maven 项目,然后在 pom.xml 文件中添加 FlinkRedisJSON 依赖。
<!-- Flink 依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.16.0</version>
</dependency>
<!-- Redis 依赖 -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>
<!-- JSON 依赖 -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.68</version>
</dependency>

2,自定义 Sink 代码

提示:由于 Flink 内置的 RedisSink 组件无法满足需求,所以这里自定义了 RedisSink 组件。
(1)下面是用户输出实时总成交金额的自定义 Sink 代码:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

public class GmvRedisSink extends RichSinkFunction<Long> {
    private String host;
    private int port;
    private String password;
    private String key;
    private Jedis jedis = null;

    public GmvRedisSink(String host, int port, String password, String key) {
        this.host = host;
        this.port = port;
        this.password = password;
        this.key = key;
    }

    /**
     * 初始化方法,只执行一次
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        this.jedis = new Jedis(host, port);
        if (password!= null &&!password.isEmpty()) {
            jedis.auth(password);
        }
    }

    /**
     * 核心代码,来一条数据此方法会执行一次
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(Long value, Context context) throws Exception {
        //对 GMV 数据进行递增操作
        jedis.incrBy(key, value);
    }

    /**
     * 任务停止时会先调用此方法
     * 适合关闭资源链接
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        //关闭链接
        if (jedis != null) {
            jedis.close();
        }
    }
}

(2)下面是用户输出实时销量 Top N 的商品品类的自定义 Sink 代码:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

public class TopNRedisSink extends RichSinkFunction<Tuple2<String, Long>> {
    private String host;
    private int port;
    private String password;
    private String key;
    private Jedis jedis = null;

    public TopNRedisSink(String host, int port, String password, String key) {
        this.host = host;
        this.port = port;
        this.password = password;
        this.key = key;
    }

    /**
     * 初始化方法,只执行一次
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        this.jedis = new Jedis(host, port);
        if (password != null &&!password.isEmpty()) {
            jedis.auth(password);
        }
    }

    /**
     * 核心代码,来一条数据此方法会执行一次
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
        //给 sortedset 中的指定元素递增添加分值
        jedis.zincrby(key, value.f1, value.f0);
    }

    /**
     * 任务停止时会先调用此方法
     * 适合关闭资源链接
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        //关闭链接
        if (jedis != null) {
            jedis.close();
        }
    }
}

3,Flinl 核心程序代码

    代码具体流程是:
  • Kafka 中读取订单数据,使用 FlatMap 将订单中的商品信息提取并转换为 Tuple3
  • 过滤掉异常数据(商品数量小于等于 0)。
  • 对订单数据进行两项实时统计:GMV 统计和实时销量 Top N 的商品品类统计。
  • GMVTop N 的统计结果写入 Redis 中。
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class StreamDataCalcJava {
    public static void main(String[] args) throws Exception {
        //获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //指定 KafkaSource 相关配置
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.121.128:9092")
                .setTopics("order_detail")
                .setGroupId("con")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        //指定 Kafka 作为 Source
        DataStreamSource<String> text = env.fromSource(kafkaSource,
                WatermarkStrategy.noWatermarks(), "Kafka Source");

        //解析订单数据,只保留需要用到的字段
        //goodsCount、goodsPrice、goodsType
        SingleOutputStreamOperator<Tuple3<Long, Long, String>> orderStream =
                text.flatMap(new FlatMapFunction<String, Tuple3<Long, Long, String>>() {
                    public void flatMap(String line, Collector<Tuple3<Long, Long, String>> out)
                            throws Exception {
                        JSONObject orderJson = JSON.parseObject(line);
                        //获取 JSON 数据中的商品明细
                        JSONArray orderDetail = orderJson.getJSONArray("detal");
                        for (int i = 0; i < orderDetail.size(); i++) {
                            JSONObject orderObj = orderDetail.getJSONObject(i);
                            long goodsCount = orderObj.getLongValue("goodsCount");
                            long goodsPrice = orderObj.getLongValue("goodsPrice");
                            String goodsType = orderObj.getString("goodsType");
                            out.collect(new Tuple3<Long, Long, String>(goodsCount, goodsPrice,
                                    goodsType));
                        }
                    }
                });

        //过滤异常数据
        SingleOutputStreamOperator<Tuple3<Long, Long, String>> filterStream =
                orderStream.filter(new FilterFunction<Tuple3<Long, Long, String>>() {
                    public boolean filter(Tuple3<Long, Long, String> tup) throws Exception {
                        //商品数量大于 0 的数据才是有效数据
                        return tup.f0 > 0;
                    }
                });
        //1.统计全站“双十一”当天的实时 GMV
        SingleOutputStreamOperator<Long> gmvStream = filterStream
                .map(new MapFunction<Tuple3<Long, Long, String>, Long>() {
                    public Long map(Tuple3<Long, Long, String> tup) throws Exception {
                        //计算单个商品的消费金额
                        return tup.f0 * tup.f1;
                    }
                });
        //将 GMV 数据保存到 Redis 中
        gmvStream.addSink(new GmvRedisSink("192.168.121.128", 6379, "123","gmv"));

        //2.统计实时销量 Top N 的商品品类
        SingleOutputStreamOperator<Tuple2<String, Long>> topNStream =
                filterStream.map(new MapFunction<Tuple3<Long, Long, String>, 
                        Tuple2<String, Long>>() {
                    //获取商品品类和购买的商品数量
                    public Tuple2<String, Long> map(Tuple3<Long, Long, String> tup)
                            throws Exception {
                        return new Tuple2<String, Long>(tup.f2, tup.f0);
                    }
                });
        //根据商品品类分组
        KeyedStream<Tuple2<String, Long>, String> keyStream = topNStream.keyBy(tuple -> tuple.f0);
        //设置时间窗口为 1 s
        WindowedStream<Tuple2<String, Long>, String, TimeWindow> windowStream = keyStream
                .window(TumblingProcessingTimeWindows.of(Time.seconds(1)));
        //求和,指定 tuple 中的第 2 列,即商品数量
        SingleOutputStreamOperator<Tuple2<String, Long>> resStream = windowStream.sum(1);
        //将 goods_type 数据保存到 Redis 中
        resStream.addSink(new TopNRedisSink("192.168.121.128", 6379, "123","goods_type"));

        //执行任务
        env.execute("StreamDataCalcJava");
    }
}

三、运行测试

(1)首先我们创建好 Kafkatopic。然后运行我们编写的 Flink 程序。
kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 5 --replication-factor 1 --topic order_detail

(2)接着创建一个该主题的消息生产者:
kafka-console-producer.sh --broker-list localhost:9092 --topic order_detail

(3)然后发送如下两条测试数据:
提示:由于我们使用 shell 命令进行发送,所以下面原始的 json 数据需要先压缩转换成一行提交,json 数据的压缩转换可以借助一些在线工具实现(点击访问
{
  "detal": [
    {
      "goodsCount": 1,
      "goodsName": "美的 606 升",
      "goodsNo": "6002",
      "goodsPrice": 100,
      "goodsType": "冰箱"
    },
    {
      "goodsCount": 2,
      "goodsName": "荣耀 30Pro",
      "goodsNo": "1005",
      "goodsPrice": 200,
      "goodsType": "手机"
    }
  ],
  "orderNo": "9fc3ffe3-3255-414e-8df8-1580cfbd2ff9",
  "totalPrice": 500
}
{
  "detal": [
    {
      "goodsCount": 2,
      "goodsName": "联想 Y50",
      "goodsNo": "8001",
      "goodsPrice": 300,
      "goodsType": "电脑"
    },
    {
      "goodsCount": 1,
      "goodsName": "华为 Mate60",
      "goodsNo": "1006",
      "goodsPrice": 200,
      "goodsType": "手机"
    }
  ],
  "orderNo": "414effe3-3255-0cfb-8df8-1580cfb255f9",
  "totalPrice": 800
}

(4)Flink 程序运行一段时间,并且成功计算了一批数据后,在 Redis 中可以看到相关的结果。
  • 下面是查看实时总成交金额:
get gmv

  • 下面是查看实时销量 Top N 的商品品类
zrevrange goods_type 0 -1 withscores

附一:Scala 版程序代码 

1,自定义 Sink 代码

提示:由于 Flink 内置的 RedisSink 组件无法满足需求,所以这里自定义了 RedisSink 组件。
(1)下面是用户输出实时总成交金额的自定义 Sink 代码:
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import redis.clients.jedis.Jedis

class GmvRedisSink extends RichSinkFunction[Long] {
  var host: String = _
  var port: Int = _
  var key: String = _
  var password: String = _
  var jedis: Jedis = _

  /**
   * 构造函数
   *
   * @param host
   * @param port
   * @param key
   * @param password
   */
  def this(host: String, port: Int, password: String, key: String) {
    this()
    this.host = host
    this.port = port
    this.password = password
    this.key = key
  }

  /**
   * 初始化方法,只执行一次
   * 适合初始化资源链接
   *
   * @param parameters
   */
  override def open(parameters: Configuration): Unit = {
    this.jedis = new Jedis(host, port)
    // 如果设置了密码,进行认证
    if (this.password != null && this.password.nonEmpty) {
      this.jedis.auth(this.password)
    }
  }

  /**
   * 核心代码,来一条数据此方法会执行一次
   *
   * @param value
   * @param context
   */
  override def invoke(in: Long): Unit = {
    jedis.incrBy(key, in)
  }

  /**
   * 任务停止时会先调用此方法
   * 适合关闭资源链接
   */
  override def close(): Unit = {
    //关闭链接
    if (jedis != null) {
      jedis.close()
    }
  }
}

(2)下面是用户输出实时销量 Top N 的商品品类的自定义 Sink 代码:
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import redis.clients.jedis.Jedis

class TopNRedisSink extends RichSinkFunction[(String, Long)] {
  var host: String = _
  var port: Int = _
  var key: String = _
  var password: String = _
  var jedis: Jedis = _

  /**
   * 构造函数
   *
   * @param host
   * @param port
   * @param key
   * @param password
   */
  def this(host: String, port: Int, password: String, key: String) {
    this()
    this.host = host
    this.port = port
    this.password = password
    this.key = key
  }

  /**
   * 初始化方法,只执行一次
   * 适合初始化资源链接
   *
   * @param parameters
   */
  override def open(parameters: Configuration): Unit = {
    this.jedis = new Jedis(host, port)
    // 如果设置了密码,进行认证
    if (this.password != null && this.password.nonEmpty) {
      this.jedis.auth(password)
    }
  }

  /**
   * 核心代码,来一条数据此方法会执行一次
   *
   * @param value
   * @param context
   */
  override def invoke(in: (String, Long)): Unit = {
    jedis.zincrby(key, in._2, in._1)
  }

  /**
   * 任务停止时会先调用此方法
   * 适合关闭资源链接
   */
  override def close(): Unit = {
    //关闭链接
    if (jedis != null) {
      jedis.close()
    }
  }
}

2,Flinl 核心程序代码

    代码具体流程是:
  • Kafka 中读取订单数据,使用 FlatMap 将订单中的商品信息提取并转换为 Tuple3
  • 过滤掉异常数据(商品数量小于等于 0)。
  • 对订单数据进行两项实时统计:GMV 统计和实时销量 Top N 的商品品类统计。
  • GMVTop N 的统计结果写入 Redis 中。
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object StreamDataCalcScala {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // 指定 KafkaSource 相关配置
    val kafkaSource = KafkaSource.builder[String]
      .setBootstrapServers("192.168.121.128:9092")
      .setTopics("order_detail")
      .setGroupId("con")
      .setStartingOffsets(OffsetsInitializer.earliest)
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build
    //指定 Kafka 作为 Source
    val text = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")

    //解析订单数据,将数据打平,只保留需要用到的字段
    //goodsCount、goodsPrice、goodsType
    val orderStream = text.flatMap(line=>{
      val orderJson = JSON.parseObject(line)
      val orderDetal = orderJson.getJSONArray("detal")
      val res = new Array[(Long,Long,String)](orderDetal.size())
      for(i <- 0 until orderDetal.size()){
        val orderObj = orderDetal.getJSONObject(i)
        val goodsCount = orderObj.getLongValue("goodsCount")
        val goodsPrice = orderObj.getLongValue("goodsPrice")
        val goodsType = orderObj.getString("goodsType")
        res(i) = (goodsCount,goodsPrice,goodsType)
      }
      res
    })
    //过滤异常数据
    val filterStreram = orderStream.filter(_._1 > 0)

    //1. 统计全站“双十一”当天的实时 GMV
    val gmvStream = filterStreram.map(tup=>tup._1 * tup._2) //计算单个商品的消费金额
    //将 GMV 数据保存到 Redis 中
    gmvStream.addSink(new GmvRedisSink("192.168.121.128",6379,"123","gmv"))

    //2.统计实时销量 Top N 的商品品类
    val topNStream = filterStreram.map(tup=>(tup._3,tup._1)) //获取商品品类和购买的商品数量
      .keyBy(tup=>tup._1) //根据商品品类分组
      .window(TumblingProcessingTimeWindows.of(Time.seconds(1))) //设置时间窗口为 1 s
      .sum(1) //指定 tuple 中的第 2 列,即商品数量
    //将 goods_type 数据保存到 Redis 中
    topNStream.addSink(new TopNRedisSink("192.168.121.128",6379,"123","goods_type"))

    //执行任务
    env.execute("StreamDataCalcScala")
  }
}

附二:自动提交 offset,防止 Kafka 消息重复消费

1,问题描述

    上面样例程序我们运行一段时间后,重新启动时会发现程序又会从 Kafka 的第一条消息开始重新消费一遍,这样就造成 Redis 里面数据重复累计。

2,解决办法

    要让程序重启后从上一次位置继续消费,我们将 KafkaSource 配置部分做如下修改,启用自动提交消费位移(offset)的功能,并将消费者的起始位移被设置为已提交的最早位移。
注意OffsetResetStrategy.EARLIEST 表示如果没有先前的消费位移(例如,新的消费者组),则从最早的可用消息开始消费。这确保了即使是新的消费者组,也能从消息队列的开头开始消费。
// 指定 KafkaSource 相关配置
val kafkaSource = KafkaSource.builder[String]
  .setBootstrapServers("192.168.121.128:9092")
  .setTopics("order_detail")
  .setGroupId("con")
  .setProperty("enable.auto.commit","true") // 启用自动提交消费位移(offset)的功能
  .setProperty("auto.commit.interval.ms","1000") // 设置自动提交消费位移的时间间隔为1秒
  // 消费者的起始位移被设置为已提交的最早位移
  .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
  //.setStartingOffsets(OffsetsInitializer.earliest)
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .build
//指定 Kafka 作为 Source
val text = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
评论0