当前位置: > > > Flink - State状态详解2(KeyedState样例1:ValueState实现温度告警)

Flink - State状态详解2(KeyedState样例1:ValueState实现温度告警)

一、KeyedState 样例 1:使用 ValueState 实现温度告警

1,需求说明

    大致需求是这样的,某机房内的多个设备会实时上报温度信息,在 Flink 任务内部需要对设备最近两次的温度进行对比,如果温差超过了 20 度,则需要发送告警信息,说明设备出问题了。

2,实现逻辑

    在这里我们可以把设备的唯一标识 ID 字段作为 keyby 分组的 key,这样的话在 Flink 内部就只需要维护设备的温度即可,温度值是一个数字,数字属于一个普通的单值,所以可以考虑使用 ValueState

3,样例代码

(1)下面是 Scala 语言代码:
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

object KeyedState_AlarmDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //设置任务全局并行度为8
    env.setParallelism(8)

    //数据格式为 设备ID,温度
    val text = env.socketTextStream("192.168.121.128", 9999)

    import org.apache.flink.api.scala._
    text.map(line=>{
        val tup = line.split(",")
        (tup(0),tup(1).toInt)
      }).keyBy(_._1)
      .flatMap(new RichFlatMapFunction[(String,Int),String] {
        //声明一个ValueState类型的状态变量,存储设备上一次收到的温度数据
        private var lastDataState: ValueState[Int] = _

        /**
         * 任务初始化的时候这个方法执行一次
         * @param parameters
         */
        override def open(parameters: Configuration): Unit = {
          //注册状态
          val valueStateDesc = new ValueStateDescriptor[Int](
            "lastDataState",//指定状态名称
            classOf[Int]//指定状态中存储的数据类型
          )
          lastDataState = getRuntimeContext.getState(valueStateDesc)
        }

        override def flatMap(value: (String, Int), out: Collector[String]): Unit = {
          println("线程ID:"+Thread.currentThread().getId+",接收到数据:"+value)
          //打印当前的线程ID和接收到的数据
          //初始化
          if(lastDataState.value() == null){
            lastDataState.update(value._2)
            println("lastDataState is null")//打印初始的状态为null
          }
          println("lastDataState is "+ lastDataState.value())//打印上次的状态值
          //获取上次温度
          val tmpLastData = lastDataState.value()
          //如果某个设备的最近两次温差超过20度,则告警
          if(Math.abs(value._2 - tmpLastData) >= 20){
            out.collect(value._1+"_温度异常")
          }
          //更新状态
          lastDataState.update(value._2)
        }
      }).print()

    env.execute("KeyedState_AlarmDemo")
  }
}

(2)下面是使用 Java 语言实现同样功能:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.Collector;

public class KeyedStateAlarmDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置任务全局并行度为8
        env.setParallelism(8);

        // 数据格式为 设备ID,温度
        DataStream<String> text = env.socketTextStream("192.168.121.128", 9999);

        SingleOutputStreamOperator<String> result = text
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String s) throws Exception {
                        String[] tup = s.split(",");
                        return new Tuple2<>(tup[0], Integer.parseInt(tup[1]));
                    }
                })
                .keyBy(value -> value.f0)
                .flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, String>() {
                    // 声明一个 ValueState 类型的状态变量,存储设备上一次收到的温度数据
                    private transient ValueState<Integer> lastDataState;

                    /**
                     * 任务初始化的时候这个方法执行一次
                     */
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // 注册状态
                        ValueStateDescriptor<Integer> valueStateDesc = new ValueStateDescriptor<>(
                                "lastDataState", // 指定状态名称
                                Integer.class    // 指定状态中存储的数据类型
                        );
                        lastDataState = getRuntimeContext().getState(valueStateDesc);
                    }

                    @Override
                    public void flatMap(Tuple2<String, Integer> value, Collector<String> out) 
                            throws Exception {
                        System.out.println("线程ID:" + Thread.currentThread().getId() 
                                + ",接收到数据:" + value);
                        // 初始化
                        if (lastDataState.value() == null) {
                            lastDataState.update(value.f1);
                            System.out.println("lastDataState is null");
                        }
                        System.out.println("lastDataState is " + lastDataState.value());
                        // 获取上次温度
                        int tmpLastData = lastDataState.value();
                        // 如果某个设备的最近两次温差超过20度,则告警
                        if (Math.abs(value.f1 - tmpLastData) >= 20) {
                            out.collect(value.f0 + "_温度异常");
                        }
                        // 更新状态
                        lastDataState.update(value.f1);
                    }
                });

        result.print();

        env.execute("KeyedState_AlarmDemo");
    }
}

4,运行测试

(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(2)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容:
s1,50
  • 控制台打印的数据如下,说明这条数据被线程 ID117 的线程处理了,s1 这个 key 是第一次过来,对应的状态为 null,所以这里打印出来了 null,当状态为 null 时,程序会把当前的数据赋值给状态,所以下面打印的上次状态值就是 50 了。

(3)接着在 socket 中再模拟产生一条数据:
s2,10
  • 控制台打印的数据如下,此时说明这条数据被线程 ID103 的线程处理了,s2 这个 key 也是第一次过来的,所以初始也为 null,后面给他赋值之后是 10

(4)接着在 socket 中再模拟产生一条数据:
s3,10
  • 控制台打印的数据如下,此时显示的线程 ID 也是 117,说明 s3 这条数据和 s1 那条数据都是由同一个线程处理的,但是注意此时 s3 对应的状态默认依然是 null,因为 s3 也是第一次过来,虽然 ts3s1 都是由同一个线程处理的,但是状态是和 key 绑定的,所以 s3 对应的状态依然是 null
注意:如果我们把 ValueState 换成一个普通的 int 变量,就不是这样的效果了,当 s3 这条数据过来的时候就可以获取到之前 s1 存储的数据 50 了,因为普通的变量在同一个线程里面是共享的。

(5)最后,我们在 socket 中再模拟产生一条数据:
s1,10
  • 控制台打印的数据如下,可以看到触发了 s1 的温度异常的逻辑。
评论0