
Flink - State in Detail, Part 3 (KeyedState Example 2: Live-Room Statistics with MapState)

II. KeyedState Example 2: Live-Room Statistics with MapState

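1. Requirements

The requirement is roughly this: for every streamer on the platform, we need to aggregate the gifts, likes, follows, and similar metrics received in a live room, with the live room as the unit of aggregation.

2. Implementation logic

(1) Every time a user starts a broadcast, a new live-room vid is generated, and the metrics must be aggregated per vid. The metrics cover several dimensions: the gift total, the like count, and the follow count. Data of this shape is a good fit for MapState: the metrics of each live room are stored in that room's own MapState. The input records look like this:
Gift record: {"type":"gift","uid":"1001","vid":"29901","value":"100"}
Follow record: {"type":"follow","uid":"1001","vid":"29901"}
Like record: {"type":"like","uid":"1001","vid":"29901"}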
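
Only gift records carry a value field; follow and like records each count as one. As a minimal sketch of that rule, assuming the JSON has already been parsed into a field map, the increment contributed by a single record could be computed as follows. The class name LiveEventIncrement and the method incrementOf are hypothetical and exist only for illustration; the sample code later does the same thing with fastjson.

```java
import java.util.Map;

public class LiveEventIncrement {
    // Returns how much one record adds to its metric: the gift value for
    // "gift" records, and 1 for every other type (follow, like).
    static int incrementOf(Map<String, String> event) {
        if ("gift".equals(event.get("type"))) {
            return Integer.parseInt(event.get("value"));
        }
        return 1;
    }

    public static void main(String[] args) {
        Map<String, String> gift =
                Map.of("type", "gift", "uid", "1001", "vid", "29901", "value", "100");
        Map<String, String> like =
                Map.of("type", "like", "uid", "1001", "vid", "29901");
        System.out.println(incrementOf(gift)); // 100
        System.out.println(incrementOf(like)); // 1
    }
}
```

Normalizing every record to a (vid, type, increment) triple up front keeps the stateful step simple: it only has to add a number to the running total stored under the record's type.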

(2) Because the records are in JSON format, we also need to add the fastjson dependency:
<!-- JSON dependency -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.68</version>
</dependency>

3. Sample code

(1) Here is the Scala version:
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

object KeyedState_VideoDataDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.socketTextStream("192.168.121.128", 9999)

    import org.apache.flink.api.scala._
    text.map(line=>{
        val videoJsonData = JSON.parseObject(line)
        val vid = videoJsonData.getString("vid")
        val videoType = videoJsonData.getString("type")
        //An if statement would work here as well
        videoType match {
          case "gift" => {
            val value = videoJsonData.getIntValue("value")
            (vid,videoType,value)
          }
          case _ => (vid,videoType,1)
        }
      }).keyBy(_._1)//Note: a flatMap operator would also work after keyBy; this example uses the lower-level process API instead
      .process(new KeyedProcessFunction[String,(String,String,Int),(String,String,Int)] {
        //Declare a MapState variable that stores the metrics of the user's live room
        //MapState keys are gift/follow/like; each value is the accumulated total
        private var videoDataState: MapState[String,Int] = _

        override def open(parameters: Configuration): Unit = {
          //Register the state
          val mapStateDesc = new MapStateDescriptor[String,Int](
            "videoDataState",//state name
            classOf[String],//key type
            classOf[Int]//value type
          )
          videoDataState = getRuntimeContext.getMapState(mapStateDesc)
        }

        override def processElement(value: (String, String, Int),
                                    ctx: KeyedProcessFunction[String, (String, String, Int),
                                      (String, String, Int)]#Context,
                                    out: Collector[(String, String, Int)]): Unit = {
          val videoType = value._2
          var num = value._3
          //Check whether the state already holds a total for this type
          if(videoDataState.contains(videoType)){
            num += videoDataState.get(videoType)
          }
          //Update the state
          videoDataState.put(videoType,num)
          out.collect((value._1,videoType,num))
        }
      }).print()

    env.execute("KeyedState_VideoDataDemo")
  }
}

(2) Here is the same functionality implemented in Java:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class KeyedStateVideoDataDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the socket data stream
        DataStream<String> text = env.socketTextStream("192.168.121.128", 9999);

        text.map(new MapFunction<String, Tuple3<String, String, Integer>>() {
                    @Override
                    public Tuple3<String, String, Integer> map(String s) throws Exception {
                        JSONObject videoJsonData = JSON.parseObject(s);
                        String vid = videoJsonData.getString("vid");
                        String videoType = videoJsonData.getString("type");

                        // Gift records contribute their value; all other types count as 1
                        if ("gift".equals(videoType)) {
                            int value = videoJsonData.getIntValue("value");
                            return new Tuple3<>(vid, videoType, value);
                        } else {
                            return new Tuple3<>(vid, videoType, 1);
                        }
                    }
                })
                .keyBy(tuple -> tuple.f0) // key the stream by vid
                .process(new KeyedProcessFunction<String, Tuple3<String, String, Integer>,
                        Tuple3<String, String, Integer>>() {
                    private transient MapState<String, Integer> videoDataState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // Register the state
                        MapStateDescriptor<String, Integer> mapStateDesc =
                                new MapStateDescriptor<>(
                                        "videoDataState", // state name
                                        String.class,     // key type
                                        Integer.class     // value type
                                );
                        videoDataState = getRuntimeContext().getMapState(mapStateDesc);
                    }

                    @Override
                    public void processElement(Tuple3<String, String, Integer> value,
                                               Context ctx,
                                               Collector<Tuple3<String, String, Integer>> out)
                            throws Exception {
                        String videoType = value.f1;
                        int num = value.f2;

                        // Check whether the state already holds a total for this type
                        if (videoDataState.contains(videoType)) {
                            num += videoDataState.get(videoType);
                        }
                        // Update the state
                        videoDataState.put(videoType, num);
                        out.collect(new Tuple3<>(value.f0, videoType, num));
                    }
                }).print();

        env.execute("KeyedState_VideoDataDemo");
    }
}

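4. Running the test

(1) First, run the following command in a terminal to start a TCP socket listening on port 9999. Run it on the host that the program connects to (192.168.121.128 in the sample code):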
nc -lk 9999

(2) Then run the Flink program. Once it has started, enter the following lines in that terminal:
{"type":"gift","uid":"1001","vid":"29901","value":"100"}
{"type":"follow","uid":"1001","vid":"29901"}
{"type":"like","uid":"1001","vid":"29902"}
{"type":"like","uid":"1001","vid":"29901"}
{"type":"gift","uid":"1001","vid":"29901","value":"100"}
{"type":"like","uid":"1001","vid":"29904"}

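(3) The console then prints one running total per input record. For example, the second gift record for room 29901 raises that room's gift total from 100 to 200, while rooms 29902 and 29904 each report a like total of 1.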
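
The totals for the six test records can be worked out without a cluster. The following is a minimal sketch in plain Java, not Flink code: it models the keyed MapState as a nested map (outer key vid, inner key metric type) and replays the records after they have been normalized to (vid, type, increment). The class RunningTotalsReplay and its methods are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningTotalsReplay {
    // Outer key: vid (the key the stream is partitioned by).
    // Inner map: metric type -> running total, standing in for MapState.
    private final Map<String, Map<String, Integer>> stateByVid = new HashMap<>();

    // Mirrors processElement: add the increment to the stored total,
    // write it back, and return the new total.
    int update(String vid, String type, int increment) {
        Map<String, Integer> roomState =
                stateByVid.computeIfAbsent(vid, k -> new HashMap<>());
        int total = roomState.getOrDefault(type, 0) + increment;
        roomState.put(type, total);
        return total;
    }

    // Replays {vid, type, increment} records in order and formats each result.
    static List<String> replay(String[][] records) {
        RunningTotalsReplay state = new RunningTotalsReplay();
        List<String> lines = new ArrayList<>();
        for (String[] r : records) {
            int total = state.update(r[0], r[1], Integer.parseInt(r[2]));
            lines.add("(" + r[0] + "," + r[1] + "," + total + ")");
        }
        return lines;
    }

    public static void main(String[] args) {
        // The six test records, already normalized by the map step.
        String[][] records = {
            {"29901", "gift", "100"}, {"29901", "follow", "1"},
            {"29902", "like", "1"},   {"29901", "like", "1"},
            {"29901", "gift", "100"}, {"29904", "like", "1"}
        };
        replay(records).forEach(System.out::println);
    }
}
```

The sketch prints (29901,gift,100), (29901,follow,1), (29902,like,1), (29901,like,1), (29901,gift,200), and (29904,like,1), one per line. In a real Flink run with parallelism greater than 1, print() typically prefixes each line with a subtask index, and records belonging to different vids may appear in a different relative order.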