当前位置: > > > Flink - State状态详解4(KeyedState样例3:ListState实现订单数据补全)

Flink - State状态详解4(KeyedState样例3:ListState实现订单数据补全)

三、KeyedState 样例 3:使用 ListState 实现订单数据补全(双流 Join)

1,需求说明

大致需求是这样的,某外卖平台需要开发一个实时订单消息推送功能,当用户下单,并且成功支付后向商家推送一条消息。

2,实现逻辑

(1)由于下单数据是一个数据流,支付数据是另外一个数据流。
订单数据流:{"pid":"1001","pname":"n1"}
支付数据流:{"pid":"1001","pstatus":"success"}

(2)这个时候就需要对两个数据流进行关联了,当同一个订单相关的数据都到齐之后向外推送消息。针对这个需求,我们计划使用 ListState 实现。

(3)由于数据格式是 JSON 格式的,所以我们还需要引入 fastjson 依赖:
<!-- JSON 依赖 -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.68</version>
</dependency>

3,样例代码

(1)下面是 Scala 语言代码:
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

object KeyedState_OrderDataDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val orderText = env.socketTextStream("192.168.121.128", 9998)
    val payText = env.socketTextStream("192.168.121.128", 9999)

    import org.apache.flink.api.scala._
    //解析订单数据流
    val orderTupleData = orderText.map(line => {
      val orderJsonObj = JSON.parseObject(line)
      val pid = orderJsonObj.getString("pid")
      val pname = orderJsonObj.getString("pname")
      (pid, pname)
    })

    //解析支付数据流
    val payTupleData = payText.map(line => {
      val payJsonObj = JSON.parseObject(line)
      val pid = payJsonObj.getString("pid")
      val pstatus = payJsonObj.getString("pstatus")
      (pid, pstatus)
    })

    //针对两个流进行分组+connect连接(也可以先对两个流分别调用keyBy,再调用connect,效果一样)
    orderTupleData.connect(payTupleData)
      .keyBy("_1","_1")//field1表示第1个流里面的分组字段,field2表示第2个流里面的分组字段
      .process(new KeyedCoProcessFunction[String,(String,String),(String,String)
        ,(String,String,String)] {
        //声明两个ListState类型的状态变量,分别存储订单数据流和支付数据流
        /**
         * 注意:针对这个业务需求,pid在一个数据流中是不会重复的,其实使用ValueState也是可以的,
         * 因为在这里已经通过keyBy基于pid对数据分组了,所以只需要在状态中存储pname或者pstatus即可。
         * 但如果pid数据在一个数据流里会重复,那么就必须要使用ListState,这样才能存储指定pid多条数据
         */
        private var orderDataState: ListState[(String,String)] = _
        private var payDataState: ListState[(String,String)] = _

        override def open(parameters: Configuration): Unit = {
          //注册状态
          val orderListStateDesc = new ListStateDescriptor[(String,String)](
            "orderDataState",
            classOf[(String, String)]
          )
          val payListStateDesc = new ListStateDescriptor[(String,String)](
            "payDataState",
            classOf[(String, String)]
          )
          orderDataState = getRuntimeContext.getListState(orderListStateDesc)
          payDataState = getRuntimeContext.getListState(payListStateDesc)
        }

        //处理订单数据流
        override def processElement1(orderTup: (String, String),
                                     ctx: KeyedCoProcessFunction[String, (String, String),
                                       (String, String), (String, String, String)]#Context,
                                     out: Collector[(String, String, String)]): Unit = {
          //获取当前pid对应的支付数据流,关联之后输出数据,(可能是支付数据先到)
          payDataState.get().forEach(payTup=>{
            out.collect((orderTup._1,orderTup._2,payTup._2))
          })
          //将本次接收到的订单数据添加到状态中,便于和支付数据流中的数据关联
          orderDataState.add(orderTup)
        }
        //处理支付数据流
        override def processElement2(payTup: (String, String),
                                     ctx: KeyedCoProcessFunction[String, (String, String),
                                       (String, String), (String, String, String)]#Context,
                                     out: Collector[(String, String, String)]): Unit = {
          //获取当前pid对应的订单数据流,关联之后输出数据,(可能是订单数据先到)
          orderDataState.get().forEach(orderTup=>{
            out.collect((orderTup._1,orderTup._2,payTup._2))
          })
          //将本次接收到的订单数据添加到状态中,便于和订单数据流中的数据关联
          payDataState.add(payTup)
        }
      }).print()

    env.execute("KeyedState_OrderDataDemo")
  }
}

(2)下面是使用 Java 语言实现同样功能:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class KeyedStateOrderDataDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 读取订单和支付数据流
        DataStream<String> orderText = env.socketTextStream("192.168.121.128", 9998);
        DataStream<String> payText = env.socketTextStream("192.168.121.128", 9999);

        // 解析订单数据流
        DataStream<Tuple2<String, String>> orderTupleData = orderText
                .map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String s) throws Exception {
                JSONObject orderJsonObj = JSON.parseObject(s);
                String pid = orderJsonObj.getString("pid");
                String pname = orderJsonObj.getString("pname");
                return new Tuple2<>(pid, pname);
            }
        });

        // 解析支付数据流
        DataStream<Tuple2<String, String>> payTupleData = payText
                .map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String s) throws Exception {
                JSONObject payJsonObj = JSON.parseObject(s);
                String pid = payJsonObj.getString("pid");
                String pstatus = payJsonObj.getString("pstatus");
                return new Tuple2<>(pid, pstatus);
            }
        });

        // 连接两个流并处理
        orderTupleData.connect(payTupleData)
                .keyBy(order -> order.f0, pay -> pay.f0) // 按 pid 分组
                .process(new KeyedCoProcessFunction<String, Tuple2<String, String>,
                        Tuple2<String, String>, Tuple3<String, String, String>>() {

                    private transient ListState<Tuple2<String, String>> orderDataState;
                    private transient ListState<Tuple2<String, String>> payDataState;

                    @Override
                    public void open(Configuration parameters) {
                        // 注册状态
                        ListStateDescriptor<Tuple2<String, String>> orderListStateDesc =
                                new ListStateDescriptor<>("orderDataState",
                                        Types.TUPLE(Types.STRING, Types.STRING));
                        ListStateDescriptor<Tuple2<String, String>> payListStateDesc =
                                new ListStateDescriptor<>("payDataState",
                                        Types.TUPLE(Types.STRING, Types.STRING));

                        orderDataState = getRuntimeContext().getListState(orderListStateDesc);
                        payDataState = getRuntimeContext().getListState(payListStateDesc);
                    }

                    @Override
                    public void processElement1(Tuple2<String, String> orderTup,
                                                Context ctx,
                                                Collector<Tuple3<String, String, String>> out)
                            throws Exception {
                        // 处理订单数据流
                        for (Tuple2<String, String> payTup : payDataState.get()) {
                            out.collect(new Tuple3<>(orderTup.f0, orderTup.f1, payTup.f1));
                        }
                        orderDataState.add(orderTup);
                    }

                    @Override
                    public void processElement2(Tuple2<String, String> payTup,
                                                Context ctx,
                                                Collector<Tuple3<String, String, String>> out)
                            throws Exception {
                        // 处理支付数据流
                        for (Tuple2<String, String> orderTup : orderDataState.get()) {
                            out.collect(new Tuple3<>(orderTup.f0, orderTup.f1, payTup.f1));
                        }
                        payDataState.add(payTup);
                    }
                }).print();

        env.execute("KeyedState_OrderDataDemo");
    }
}

4,运行测试

(1)我们首先打开两个终端,分别运行如下命令来启动两个 TCP socket
nc -lk 9998
nc -lk 9999

(2)然后运行 Flink 程序。待程序启动后:
  • socket 9998 中输入如下内容模拟产生订单数据:
{"pid":"1001","pname":"n1"}
  • socket 9999 中输入如下内容模拟产生支付数据:
{"pid":"1003","pstatus":"success"}
  • 由于此时两条数据不是同一个订单的,所以程序没有任何输出。

(3)接下来继续在 socket 9998 中模拟产生订单数据,实现了订单数据流和支付数据流的数据关联。
{"pid":"1003","pname":"n3"}
  • 此时可以看到控制台输出一条数据:

(4)接下来继续在 socket 99999 中模拟产生支付数据,实现了订单数据流和支付数据流的数据关联。
{"pid":"1001","pstatus":"success"}
  • 此时可以看到控制台输出一条数据:

附:样例存在问题和解决办法

1,存在的问题

(1)这个案例中两个数据流的数据会一直存储在状态中,随着时间的增长,状态会越来越多,状态如果是存储在内存中的话,内存可能就扛不住了,最终导致内存溢出。

(2)例如上面测试结束后,我们继续不断输入相同的支付数据:
{"pid":"1001","pstatus":"success"}
{"pid":"1001","pstatus":"success"}
{"pid":"1001","pstatus":"success"}
{"pid":"1001","pstatus":"success"}
  • 可以看到控制台输出内容如下,说明两个数据流的数据是一直存在状态中,无论是否关联上。

2,两种解决方案

(1)一种方案是我们自己基于业务层面从状态中清理掉不用的数据,例如两份数据 join 到一起之后,就删除状态中的数据,这样可以保证状态中的数据不会一直无限递增。

(2)另一种方案是状态设置一个失效机制,官方提供的有一个 TTL 机制,可以给状态设置一个生存时间 ,过期自动删除,这个 TTL 机制我们后面再具体分析。
评论0