Flink - State状态详解2(KeyedState样例1:ValueState实现温度告警)
一、KeyedState 样例 1:使用 ValueState 实现温度告警
1,需求说明
大致需求是这样的,某机房内的多个设备会实时上报温度信息,在 Flink 任务内部需要对设备最近两次的温度进行对比,如果温差超过了 20 度,则需要发送告警信息,说明设备出问题了。
2,实现逻辑
在这里我们可以把设备的唯一标识 ID 字段作为 keyby 分组的 key,这样的话在 Flink 内部就只需要维护设备的温度即可,温度值是一个数字,数字属于一个普通的单值,所以可以考虑使用 ValueState。
3,样例代码
(1)下面是 Scala 语言代码:
import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.util.Collector object KeyedState_AlarmDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //设置任务全局并行度为8 env.setParallelism(8) //数据格式为 设备ID,温度 val text = env.socketTextStream("192.168.121.128", 9999) import org.apache.flink.api.scala._ text.map(line=>{ val tup = line.split(",") (tup(0),tup(1).toInt) }).keyBy(_._1) .flatMap(new RichFlatMapFunction[(String,Int),String] { //声明一个ValueState类型的状态变量,存储设备上一次收到的温度数据 private var lastDataState: ValueState[Int] = _ /** * 任务初始化的时候这个方法执行一次 * @param parameters */ override def open(parameters: Configuration): Unit = { //注册状态 val valueStateDesc = new ValueStateDescriptor[Int]( "lastDataState",//指定状态名称 classOf[Int]//指定状态中存储的数据类型 ) lastDataState = getRuntimeContext.getState(valueStateDesc) } override def flatMap(value: (String, Int), out: Collector[String]): Unit = { println("线程ID:"+Thread.currentThread().getId+",接收到数据:"+value) //打印当前的线程ID和接收到的数据 //初始化 if(lastDataState.value() == null){ lastDataState.update(value._2) println("lastDataState is null")//打印初始的状态为null } println("lastDataState is "+ lastDataState.value())//打印上次的状态值 //获取上次温度 val tmpLastData = lastDataState.value() //如果某个设备的最近两次温差超过20度,则告警 if(Math.abs(value._2 - tmpLastData) >= 20){ out.collect(value._1+"_温度异常") } //更新状态 lastDataState.update(value._2) } }).print() env.execute("KeyedState_AlarmDemo") } }
(2)下面是使用 Java 语言实现同样功能:
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.util.Collector; public class KeyedStateAlarmDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置任务全局并行度为8 env.setParallelism(8); // 数据格式为 设备ID,温度 DataStream<String> text = env.socketTextStream("192.168.121.128", 9999); SingleOutputStreamOperator<String> result = text .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String s) throws Exception { String[] tup = s.split(","); return new Tuple2<>(tup[0], Integer.parseInt(tup[1])); } }) .keyBy(value -> value.f0) .flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, String>() { // 声明一个 ValueState 类型的状态变量,存储设备上一次收到的温度数据 private transient ValueState<Integer> lastDataState; /** * 任务初始化的时候这个方法执行一次 */ @Override public void open(Configuration parameters) throws Exception { // 注册状态 ValueStateDescriptor<Integer> valueStateDesc = new ValueStateDescriptor<>( "lastDataState", // 指定状态名称 Integer.class // 指定状态中存储的数据类型 ); lastDataState = getRuntimeContext().getState(valueStateDesc); } @Override public void flatMap(Tuple2<String, Integer> value, Collector<String> out) throws Exception { System.out.println("线程ID:" + Thread.currentThread().getId() + ",接收到数据:" + value); // 初始化 if (lastDataState.value() == null) { lastDataState.update(value.f1); System.out.println("lastDataState is null"); } System.out.println("lastDataState is " + lastDataState.value()); // 获取上次温度 int tmpLastData = lastDataState.value(); // 如果某个设备的最近两次温差超过20度,则告警 if (Math.abs(value.f1 - tmpLastData) >= 20) { out.collect(value.f0 + "_温度异常"); } // 更新状态 lastDataState.update(value.f1); } }); result.print(); env.execute("KeyedState_AlarmDemo"); } }
4,运行测试
(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(2)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容:
s1,50
- 控制台打印的数据如下,说明这条数据被线程 ID 为 117 的线程处理了,s1 这个 key 是第一次过来,对应的状态为 null,所以这里打印出来了 null,当状态为 null 时,程序会把当前的数据赋值给状态,所以下面打印的上次状态值就是 50 了。

s2,10
- 控制台打印的数据如下,此时说明这条数据被线程 ID 为 103 的线程处理了,s2 这个 key 也是第一次过来的,所以初始也为 null,后面给他赋值之后是 10。

(4)接着在 socket 中再模拟产生一条数据:
s3,10
- 控制台打印的数据如下,此时显示的线程 ID 也是 117,说明 s3 这条数据和 s1 那条数据都是由同一个线程处理的,但是注意此时 s3 对应的状态默认依然是 null,因为 s3 也是第一次过来,虽然 ts3 和 s1 都是由同一个线程处理的,但是状态是和 key 绑定的,所以 s3 对应的状态依然是 null。
注意:如果我们把 ValueState 换成一个普通的 int 变量,就不是这样的效果了,当 s3 这条数据过来的时候就可以获取到之前 s1 存储的数据 50 了,因为普通的变量在同一个线程里面是共享的。

(5)最后,我们在 socket 中再模拟产生一条数据:
s1,10
- 控制台打印的数据如下,可以看到触发了 s1 的温度异常的逻辑。
