Flink - State状态详解4(KeyedState样例3:ListState实现订单数据补全)
三、KeyedState 样例 3:使用 ListState 实现订单数据补全(双流 Join)
1,需求说明
大致需求是这样的,某外卖平台需要开发一个实时订单消息推送功能,当用户下单,并且成功支付后向商家推送一条消息。
2,实现逻辑
(1)由于下单数据是一个数据流,支付数据是另外一个数据流。
订单数据流:{"pid":"1001","pname":"n1"} 支付数据流:{"pid":"1001","pstatus":"success"}
(2)这个时候就需要对两个数据流进行关联了,当同一个订单相关的数据都到齐之后向外推送消息。针对这个需求,我们计划使用 ListState 实现。
(3)由于数据格式是 JSON 格式的,所以我们还需要引入 fastjson 依赖:
<!-- JSON 依赖 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.68</version> </dependency>
3,样例代码
(1)下面是 Scala 语言代码:
import com.alibaba.fastjson.JSON import org.apache.flink.api.common.RuntimeExecutionMode import org.apache.flink.api.common.state.{ListState, ListStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.util.Collector object KeyedState_OrderDataDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val orderText = env.socketTextStream("192.168.121.128", 9998) val payText = env.socketTextStream("192.168.121.128", 9999) import org.apache.flink.api.scala._ //解析订单数据流 val orderTupleData = orderText.map(line => { val orderJsonObj = JSON.parseObject(line) val pid = orderJsonObj.getString("pid") val pname = orderJsonObj.getString("pname") (pid, pname) }) //解析支付数据流 val payTupleData = payText.map(line => { val payJsonObj = JSON.parseObject(line) val pid = payJsonObj.getString("pid") val pstatus = payJsonObj.getString("pstatus") (pid, pstatus) }) //针对两个流进行分组+connect连接(也可以先对两个流分别调用keyBy,再调用connect,效果一样) orderTupleData.connect(payTupleData) .keyBy("_1","_1")//field1表示第1个流里面的分组字段,field2表示第2个流里面的分组字段 .process(new KeyedCoProcessFunction[String,(String,String),(String,String) ,(String,String,String)] { //声明两个ListState类型的状态变量,分别存储订单数据流和支付数据流 /** * 注意:针对这个业务需求,pid在一个数据流中是不会重复的,其实使用ValueState也是可以的, * 因为在这里已经通过keyBy基于pid对数据分组了,所以只需要在状态中存储pname或者pstatus即可。 * 但如果pid数据在一个数据流里会重复,那么就必须要使用ListState,这样才能存储指定pid多条数据 */ private var orderDataState: ListState[(String,String)] = _ private var payDataState: ListState[(String,String)] = _ override def open(parameters: Configuration): Unit = { //注册状态 val orderListStateDesc = new ListStateDescriptor[(String,String)]( "orderDataState", classOf[(String, String)] ) val payListStateDesc = new ListStateDescriptor[(String,String)]( "payDataState", classOf[(String, String)] ) orderDataState = getRuntimeContext.getListState(orderListStateDesc) payDataState = getRuntimeContext.getListState(payListStateDesc) } //处理订单数据流 override def processElement1(orderTup: (String, String), ctx: KeyedCoProcessFunction[String, (String, String), (String, String), (String, String, String)]#Context, out: Collector[(String, String, String)]): Unit = { //获取当前pid对应的支付数据流,关联之后输出数据,(可能是支付数据先到) payDataState.get().forEach(payTup=>{ out.collect((orderTup._1,orderTup._2,payTup._2)) }) //将本次接收到的订单数据添加到状态中,便于和支付数据流中的数据关联 orderDataState.add(orderTup) } //处理支付数据流 override def processElement2(payTup: (String, String), ctx: KeyedCoProcessFunction[String, (String, String), (String, String), (String, String, String)]#Context, out: Collector[(String, String, String)]): Unit = { //获取当前pid对应的订单数据流,关联之后输出数据,(可能是订单数据先到) orderDataState.get().forEach(orderTup=>{ out.collect((orderTup._1,orderTup._2,payTup._2)) }) //将本次接收到的订单数据添加到状态中,便于和订单数据流中的数据关联 payDataState.add(payTup) } }).print() env.execute("KeyedState_OrderDataDemo") } }
(2)下面是使用 Java 语言实现同样功能:
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; import org.apache.flink.util.Collector; public class KeyedStateOrderDataDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 读取订单和支付数据流 DataStream<String> orderText = env.socketTextStream("192.168.121.128", 9998); DataStream<String> payText = env.socketTextStream("192.168.121.128", 9999); // 解析订单数据流 DataStream<Tuple2<String, String>> orderTupleData = orderText .map(new MapFunction<String, Tuple2<String, String>>() { @Override public Tuple2<String, String> map(String s) throws Exception { JSONObject orderJsonObj = JSON.parseObject(s); String pid = orderJsonObj.getString("pid"); String pname = orderJsonObj.getString("pname"); return new Tuple2<>(pid, pname); } }); // 解析支付数据流 DataStream<Tuple2<String, String>> payTupleData = payText .map(new MapFunction<String, Tuple2<String, String>>() { @Override public Tuple2<String, String> map(String s) throws Exception { JSONObject payJsonObj = JSON.parseObject(s); String pid = payJsonObj.getString("pid"); String pstatus = payJsonObj.getString("pstatus"); return new Tuple2<>(pid, pstatus); } }); // 连接两个流并处理 orderTupleData.connect(payTupleData) .keyBy(order -> order.f0, pay -> pay.f0) // 按 pid 分组 .process(new KeyedCoProcessFunction<String, Tuple2<String, String>, Tuple2<String, String>, Tuple3<String, String, String>>() { private transient ListState<Tuple2<String, String>> orderDataState; private transient ListState<Tuple2<String, String>> payDataState; @Override public void open(Configuration parameters) { // 注册状态 ListStateDescriptor<Tuple2<String, String>> orderListStateDesc = new ListStateDescriptor<>("orderDataState", Types.TUPLE(Types.STRING, Types.STRING)); ListStateDescriptor<Tuple2<String, String>> payListStateDesc = new ListStateDescriptor<>("payDataState", Types.TUPLE(Types.STRING, Types.STRING)); orderDataState = getRuntimeContext().getListState(orderListStateDesc); payDataState = getRuntimeContext().getListState(payListStateDesc); } @Override public void processElement1(Tuple2<String, String> orderTup, Context ctx, Collector<Tuple3<String, String, String>> out) throws Exception { // 处理订单数据流 for (Tuple2<String, String> payTup : payDataState.get()) { out.collect(new Tuple3<>(orderTup.f0, orderTup.f1, payTup.f1)); } orderDataState.add(orderTup); } @Override public void processElement2(Tuple2<String, String> payTup, Context ctx, Collector<Tuple3<String, String, String>> out) throws Exception { // 处理支付数据流 for (Tuple2<String, String> orderTup : orderDataState.get()) { out.collect(new Tuple3<>(orderTup.f0, orderTup.f1, payTup.f1)); } payDataState.add(payTup); } }).print(); env.execute("KeyedState_OrderDataDemo"); } }
4,运行测试
(1)我们首先打开两个终端,分别运行如下命令来启动两个 TCP socket:
nc -lk 9998 nc -lk 9999
(2)然后运行 Flink 程序。待程序启动后:
- 在 socket 9998 中输入如下内容模拟产生订单数据:
{"pid":"1001","pname":"n1"}
- 在 socket 9999 中输入如下内容模拟产生支付数据:
{"pid":"1003","pstatus":"success"}
- 由于此时两条数据不是同一个订单的,所以程序没有任何输出。
{"pid":"1003","pname":"n3"}
- 此时可以看到控制台输出一条数据:

(4)接下来继续在 socket 99999 中模拟产生支付数据,实现了订单数据流和支付数据流的数据关联。
{"pid":"1001","pstatus":"success"}
- 此时可以看到控制台输出一条数据:

附:样例存在问题和解决办法
1,存在的问题
(1)这个案例中两个数据流的数据会一直存储在状态中,随着时间的增长,状态会越来越多,状态如果是存储在内存中的话,内存可能就扛不住了,最终导致内存溢出。
(2)例如上面测试结束后,我们继续不断输入相同的支付数据:
{"pid":"1001","pstatus":"success"} {"pid":"1001","pstatus":"success"} {"pid":"1001","pstatus":"success"} {"pid":"1001","pstatus":"success"}
- 可以看到控制台输出内容如下,说明两个数据流的数据是一直存在状态中,无论是否关联上。

2,两种解决方案
(1)一种方案是我们自己基于业务层面从状态中清理掉不用的数据,例如两份数据 join 到一起之后,就删除状态中的数据,这样可以保证状态中的数据不会一直无限递增。
(2)另一种方案是状态设置一个失效机制,官方提供的有一个 TTL 机制,可以给状态设置一个生存时间 ,过期自动删除,这个 TTL 机制我们后面再具体分析。