当前位置: > > > Flink - State状态详解5(KeyedState样例4:带有状态的算子)

Flink - State状态详解5(KeyedState样例4:带有状态的算子)

四、KeyedState 样例 4:使用带有状态的算子

1,使用 Keyed State 的 3 种形式

(1)通过重写 RichXXXFunction,在里面创建和操作状态。
  • 例如针对 map 算子可以使用 RichMapFunction,针对 flatmap 算子可以使用 RichFlatMapFunction 等。
  • 在这里使用对应的 RichFunction 主要是因为它里面提供了 getRuntimeContext 这个上下文,通过 getRuntimeContext 可以获取 Keyed State
该方式使用样例可以参考我之前写的文章:

(2)通过 process() 这种低级 API
  • 主要是因为 process 中也可以获取到 getRuntimeContext 这个上下文。
该方式使用样例可以参考我之前写的文章:

(3)通过 mapWithState()flatMapWithState() 等直接带有状态的算子。本文将通过样例进行演示。
  • 这种算子里面针对 Keyed State 直接进行了封装,使用起来更加方便,但是它有一定的局限性。
  • 首先是这些带有状态的算子只能应用在 keyBy 算子之后。
  • 还有就是这些带有状态的算子里面封装的其实都是 ValueState 这种状态,不能自己控制使用哪种状态。
提示:这种方式了解即可,以后看到了这种写法知道是什么意思就行,个人不建议在工作中使用方式,因为这种方式不好理解,不属于通俗易懂的代码。

2,样例代码

(1)下面是 mapWithState 方法定义:
  • T:当前数据流中的数据类型
  • R:返回数据类型
  • SState 中存储的数据类型
  • 注意:在这里 State 默认是 ValueStateValueState 中可以存储多种数据类型。
def mapWithState[R: TypeInformation, S: TypeInformation](fun: (T, Option[S]) => (R, Option[S]))
     : DataStream[R] = {

(2)这里我们使用 MapWithState 实现一个带有状态的单词计数案例,下面是 Scala 语言代码:
注意mapWithState()flatMapWithState() Scala 中的特有方法,Java 中没有该方法。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object KeyedState_MapWithStateDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("192.168.121.128", 9999)

    import org.apache.flink.api.scala._
    val keyedStream = text.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    keyedStream.mapWithState[(String,Int),Int]((in: (String,Int),count: Option[Int])=>{
      count match {
        //(t1,t2)
        //t1: 表示这个map操作需要返回的数据
        //t2: 表示State中目前的数据
        case Some(c) => ((in._1,in._2+c),Some(in._2+c))//第2及以上次数,返回累加后的数据,更新状态
        case None => ((in._1,in._2),Some(in._2))//第1次接收到数据,直接返回数据,初始化状态
      }
    }).print()

    env.execute("KeyedState_MapWithStateDemo")
  }
}

3,运行测试

(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(2)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容:
a b
a

(3)程序的控制台输出内容如下:
评论0