Flink - State状态详解5(KeyedState样例4:带有状态的算子)
四、KeyedState 样例 4:使用带有状态的算子
1,使用 Keyed State 的 3 种形式
(1)通过重写 RichXXXFunction,在里面创建和操作状态。
- 例如针对 map 算子可以使用 RichMapFunction,针对 flatmap 算子可以使用 RichFlatMapFunction 等。
- 在这里使用对应的 RichFunction 主要是因为它里面提供了 getRuntimeContext 这个上下文,通过 getRuntimeContext 可以获取 Keyed State。
该方式使用样例可以参考我之前写的文章:
(2)通过 process() 这种低级 API。
- 主要是因为 process 中也可以获取到 getRuntimeContext 这个上下文。
该方式使用样例可以参考我之前写的文章:
- 这种算子里面针对 Keyed State 直接进行了封装,使用起来更加方便,但是它有一定的局限性。
- 首先是这些带有状态的算子只能应用在 keyBy 算子之后。
- 还有就是这些带有状态的算子里面封装的其实都是 ValueState 这种状态,不能自己控制使用哪种状态。
提示:这种方式了解即可,以后看到了这种写法知道是什么意思就行,个人不建议在工作中使用方式,因为这种方式不好理解,不属于通俗易懂的代码。
2,样例代码
(1)下面是 mapWithState 方法定义:
- T:当前数据流中的数据类型
- R:返回数据类型
- S:State 中存储的数据类型
- 注意:在这里 State 默认是 ValueState,ValueState 中可以存储多种数据类型。
def mapWithState[R: TypeInformation, S: TypeInformation](fun: (T, Option[S]) => (R, Option[S])) : DataStream[R] = {
(2)这里我们使用 MapWithState 实现一个带有状态的单词计数案例,下面是 Scala 语言代码:
注意:mapWithState()、flatMapWithState() 是 Scala 中的特有方法,Java 中没有该方法。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object KeyedState_MapWithStateDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val text = env.socketTextStream("192.168.121.128", 9999) import org.apache.flink.api.scala._ val keyedStream = text.flatMap(_.split(" ")) .map((_, 1)) .keyBy(_._1) keyedStream.mapWithState[(String,Int),Int]((in: (String,Int),count: Option[Int])=>{ count match { //(t1,t2) //t1: 表示这个map操作需要返回的数据 //t2: 表示State中目前的数据 case Some(c) => ((in._1,in._2+c),Some(in._2+c))//第2及以上次数,返回累加后的数据,更新状态 case None => ((in._1,in._2),Some(in._2))//第1次接收到数据,直接返回数据,初始化状态 } }).print() env.execute("KeyedState_MapWithStateDemo") } }
3,运行测试
(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(2)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容:
a b a
