Flink - State Explained in Detail, Part 3 (KeyedState Example 2: Live-Room Statistics with MapState)
II. KeyedState Example 2: Live-Room Statistics with MapState
1. Requirements
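The requirement, roughly: for each streamer on the platform, compute the gifts, likes, follows, and other metrics received in their live room, aggregated per live room.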
2. Implementation Logic
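(1) Each time a user starts streaming, a new live-room vid is generated, and the metrics must be computed per vid. These metrics cover several dimensions, for example the gift total (the sum of the value field), the number of likes, and the number of follows. Data of this shape is a good fit for MapState: the stream is keyed by vid, and each live room's metrics are stored in its own MapState, with the metric type as the map key and the running total as the map value.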
Sample records:
Gift: {"type":"gift","uid":"1001","vid":"29901","value":"100"}
Follow: {"type":"follow","uid":"1001","vid":"29901"}
Like: {"type":"like","uid":"1001","vid":"29901"}
(2) Since the data is in JSON format, we also need to add the fastjson dependency:
<!-- JSON dependency -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.68</version>
</dependency>
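To make the state layout concrete, here is a plain-Java sketch of what the job keeps per live room. It is an illustration only, not Flink code: an outer HashMap keyed by vid stands in for Flink's per-key partitioning (in the real job, keyBy on vid gives every vid its own MapState), and the inner Map<String, Integer> plays the role of MapState<String, Integer>. The class LiveRoomStateSketch and its methods are hypothetical names introduced for this example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of the job's state layout (no Flink runtime involved).
// Outer map: vid -> that room's state, standing in for Flink's per-key partitioning.
// Inner map: metric type (gift/follow/like) -> running total, standing in for MapState<String, Integer>.
class LiveRoomStateSketch {
    private final Map<String, Map<String, Integer>> stateByVid = new HashMap<>();

    // Same rule as the map operator: a gift contributes its "value", follow/like contribute 1.
    // giftValue is ignored for non-gift types.
    static int increment(String type, int giftValue) {
        return "gift".equals(type) ? giftValue : 1;
    }

    // Mirrors processElement: read the old total (0 if absent), add delta, write it back, return it.
    int update(String vid, String type, int delta) {
        Map<String, Integer> roomState = stateByVid.computeIfAbsent(vid, k -> new HashMap<>());
        int total = roomState.getOrDefault(type, 0) + delta;
        roomState.put(type, total);
        return total;
    }

    // Read-only view of one room's metrics; empty if the vid has not been seen yet.
    Map<String, Integer> stateOf(String vid) {
        return Collections.unmodifiableMap(stateByVid.getOrDefault(vid, Collections.emptyMap()));
    }

    public static void main(String[] args) {
        LiveRoomStateSketch sketch = new LiveRoomStateSketch();
        // The three sample records above, all for vid 29901
        sketch.update("29901", "gift", increment("gift", 100));
        sketch.update("29901", "follow", increment("follow", 0));
        sketch.update("29901", "like", increment("like", 0));
        System.out.println(sketch.stateOf("29901"));
    }
}
```

Running main prints the map for vid 29901 holding gift=100, follow=1 and like=1 (HashMap iteration order is unspecified). A vid that has never been seen simply has an empty map, which matches how Flink starts a new key with empty MapState.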
3. Sample Code
(1) The Scala implementation:
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

object KeyedState_VideoDataDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read JSON lines from the socket
    val text = env.socketTextStream("192.168.121.128", 9999)

    import org.apache.flink.api.scala._

    // Convert each JSON record into (vid, type, value)
    text.map(line => {
      val videoJsonData = JSON.parseObject(line)
      val vid = videoJsonData.getString("vid")
      val videoType = videoJsonData.getString("type")
      // An if statement would work here as well
      videoType match {
        // Gift records carry their value
        case "gift" =>
          val value = videoJsonData.getIntValue("value")
          (vid, videoType, value)
        // follow/like records count as 1 each
        case _ => (vid, videoType, 1)
      }
    })
      .keyBy(_._1) // key by live-room vid
      // Note: a flatMap operator would also work here; this example uses the low-level process API instead
      .process(new KeyedProcessFunction[String, (String, String, Int), (String, String, Int)] {
        // MapState holding the current live room's metrics:
        // key = gift/follow/like, value = accumulated total
        private var videoDataState: MapState[String, Int] = _

        override def open(parameters: Configuration): Unit = {
          // Register the state
          val mapStateDesc = new MapStateDescriptor[String, Int](
            "videoDataState", // state name
            classOf[String],  // key type
            classOf[Int]      // value type
          )
          videoDataState = getRuntimeContext.getMapState(mapStateDesc)
        }

        override def processElement(value: (String, String, Int),
                                    ctx: KeyedProcessFunction[String, (String, String, Int), (String, String, Int)]#Context,
                                    out: Collector[(String, String, Int)]): Unit = {
          val videoType = value._2
          var num = value._3
          // If this metric is already in state, add the previous total
          if (videoDataState.contains(videoType)) {
            num += videoDataState.get(videoType)
          }
          // Update the state
          videoDataState.put(videoType, num)
          out.collect((value._1, videoType, num))
        }
      }).print()

    env.execute("KeyedState_VideoDataDemo")
  }
}
(2) The same functionality implemented in Java:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class KeyedStateVideoDataDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read the socket data stream
        DataStream<String> text = env.socketTextStream("192.168.121.128", 9999);

        text.map(new MapFunction<String, Tuple3<String, String, Integer>>() {
                @Override
                public Tuple3<String, String, Integer> map(String s) throws Exception {
                    JSONObject videoJsonData = JSON.parseObject(s);
                    String vid = videoJsonData.getString("vid");
                    String videoType = videoJsonData.getString("type");
                    // Gift records carry their value; follow/like records count as 1
                    if ("gift".equals(videoType)) {
                        int value = videoJsonData.getIntValue("value");
                        return new Tuple3<>(vid, videoType, value);
                    } else {
                        return new Tuple3<>(vid, videoType, 1);
                    }
                }
            })
            .keyBy(tuple -> tuple.f0) // key by vid
            .process(new KeyedProcessFunction<String, Tuple3<String, String, Integer>, Tuple3<String, String, Integer>>() {
                // Per-vid MapState: key = gift/follow/like, value = accumulated total
                private transient MapState<String, Integer> videoDataState;

                @Override
                public void open(Configuration parameters) throws Exception {
                    // Register the state
                    MapStateDescriptor<String, Integer> mapStateDesc = new MapStateDescriptor<>(
                            "videoDataState", // state name
                            String.class,     // key type
                            Integer.class     // value type
                    );
                    videoDataState = getRuntimeContext().getMapState(mapStateDesc);
                }

                @Override
                public void processElement(Tuple3<String, String, Integer> value,
                                           Context ctx,
                                           Collector<Tuple3<String, String, Integer>> out) throws Exception {
                    String videoType = value.f1;
                    int num = value.f2;
                    // If this metric is already in state, add the previous total
                    if (videoDataState.contains(videoType)) {
                        num += videoDataState.get(videoType);
                    }
                    // Update the state
                    videoDataState.put(videoType, num);
                    out.collect(new Tuple3<>(value.f0, videoType, num));
                }
            }).print();

        env.execute("KeyedState_VideoDataDemo");
    }
}
4. Testing
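(1) First, start a TCP socket listening on port 9999 by running the following command in a terminal on the machine the program connects to (192.168.121.128 in the code above):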
nc -lk 9999
(2) Then run the Flink program. Once it has started, enter the following lines in that terminal:
{"type":"gift","uid":"1001","vid":"29901","value":"100"}
{"type":"follow","uid":"1001","vid":"29901"}
{"type":"like","uid":"1001","vid":"29902"}
{"type":"like","uid":"1001","vid":"29901"}
{"type":"gift","uid":"1001","vid":"29901","value":"100"}
{"type":"like","uid":"1001","vid":"29904"}
(3) The console prints the following:
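Because processElement calls out.collect once per incoming record, the tuples the job should emit for the test input can be worked out by replaying it by hand. The plain-Java sketch below does that, assuming the records are processed in the order they were typed. It does not use Flink or fastjson: the JSON lines are pre-parsed into {type, vid, value} arrays, and the class ReplayTestInput is a hypothetical name used only for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical replay of the six test records through the same per-vid accumulation logic.
class ReplayTestInput {
    // The records typed into nc, pre-parsed by hand into {type, vid, value};
    // "value" is only present on gift records, so it is null elsewhere.
    static final String[][] INPUT = {
        {"gift", "29901", "100"},
        {"follow", "29901", null},
        {"like", "29902", null},
        {"like", "29901", null},
        {"gift", "29901", "100"},
        {"like", "29904", null},
    };

    // Returns one "(vid,type,total)" string per record, formatted like a Scala tuple / Flink Tuple3.
    static List<String> replay(String[][] records) {
        // vid -> (metric type -> running total), standing in for per-key MapState
        Map<String, Map<String, Integer>> stateByVid = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String[] r : records) {
            String type = r[0];
            String vid = r[1];
            int delta = "gift".equals(type) ? Integer.parseInt(r[2]) : 1;
            Map<String, Integer> room = stateByVid.computeIfAbsent(vid, k -> new HashMap<>());
            int total = room.getOrDefault(type, 0) + delta;
            room.put(type, total);
            emitted.add("(" + vid + "," + type + "," + total + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        replay(INPUT).forEach(System.out::println);
    }
}
```

Running it prints (29901,gift,100), (29901,follow,1), (29902,like,1), (29901,like,1), (29901,gift,200) and (29904,like,1), one per line: each vid keeps its own counters, so the second gift for 29901 accumulates to 200 while 29902 and 29904 start from scratch. In an actual Flink run, print() may prefix each line with a subtask index when parallelism is greater than 1, and the lines may appear in a different order.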
