当前位置: > > > Flink - DataStream API使用详解4(DataSoure和DataSink:socket数据输出至Redis)

Flink - DataStream API使用详解4(DataSoure和DataSink:socket数据输出至Redis)

    DataSink 是输出组件,负责把计算好的数据输出到其它存储介质中。本文演示如何接收 Socket 传输过来的数据,把数据保存到 Redislist 队列中。

一、DataSoure 与 DataSink 介绍

1,基本介绍

DataStream API 主要分为 3 块:DataSourceTransformationDataSink
  • DataSource 是程序的输入数据源。
  • Transformation 是具体的操作,它对一个或多个输入数据源进行计算处理,例如 mapflatMapfilter 等操作。
  • DataSink 是程序的输出,它可以把 Transformation 处理之后的数据输出到指定的存储介质中。

2,DataSoure

(1)DataSource 是程序的输入数据源,Flink 提供了大量内置的 DataSource,也支持自定义 DataSource,不过目前 Flink 提供的这些已经足够我们正常使用了。
  • Flink 内置数据源有:KafkaKinesis StreamsRabbitMQNiFiTwitter Streaming APIGoogle PubSub
  • Apache Bahir 数据源(需要添加这个依赖包之后才能使用的):ActiveMQNetty

(2)当程序出现错误的时候,Flink 的容错机制能恢复并继续运行程序,这种错误包括机器故障、网络故障、程序故障等。针对 Flink 提供的常用数据源接口,如果程序开启了 checkpoint 快照机制,Flink 可以提供这些容错性保证。
DataSource 容错保证 备注
Socket at most once
Collection exactly once
Kafka exactly once 需要使用 0.10 及以上版本

3,DataSink

(1)DataSink 是 输出组件,负责把计算好的数据输出到其它存储介质中
  • Flink 支持把流数据输出到文件中,不过在实际工作中这种场景不多,因为流数据处理之后一般会存储到一些消息队列里面,或者数据库里面,很少会保存到文件中的。
  • 还有就是 print,直接打印,这个其实我们已经用了很多次了,这种用法主要是在测试的时候使用的,方便查看输出的结果信息
  • Flink 提供了一批 Connectors,可以实现输出到第三方目的地。在实际工作中最常用的是 kafkaredis
Flink 内置 Apache Bahir
Kafka ActiveMQ
Cassandra Flume
Kinesis Streams Redis
Elasticsearch Akka
Hadoop FileSysterm
RabbitMQ
NiFi
JDBC

(2)针对 Flink 提供的常用 sink 组件,可以提供这些容错性保证。
DataSink 容错保证 备注
Redis at least once
Kafka at least once / exactly once Kafka0.90.10 提供 at least onceKafka0.11 及以上提供 exactly once

二、演示样例

1,准备工作

我们创建一个 Maven 项目,然后在 pom.xml 文件中,添加 Flink 实时计算相关的依赖。
注意redis sink 是在 Bahir 这个依赖包中,所以在 pom.xml 中需要添加对应的依赖。
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.12</artifactId>
    <version>1.1.0</version>
</dependency>

2,样例代码

(1)下面是使用 Scala 实现的代码:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper

object StreamRedisSinkScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //连接socket获取输入数据
    val text = env.socketTextStream("192.168.121.128", 9999)

    import org.apache.flink.api.scala._
    //组装数据,这里组装的是tuple2类型
    //第一个元素是指list队列的key名称
    //第二个元素是指需要向list队列中添加的元素
    val listData = text.map(word => ("words", word))

    //指定redisSink
    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("192.168.121.128")
      .setPort(6379)
      .setPassword("123")
      .build()
    val redisSink = new RedisSink[Tuple2[String, String]](conf, new MyRedisMapper)
    listData.addSink(redisSink)

    env.execute("StreamRedisSinkScala")
  }

  //自定义RedisMapper
  private class MyRedisMapper extends RedisMapper[(String, String)]{
    //指定具体的操作命令
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.LPUSH)
    }
    //获取key
    override def getKeyFromData(data: (String, String)): String = {
      data._1
    }
    //获取value
    override def getValueFromData(data: (String, String)): String = {
      data._2
    }
  }
}

(2)下面是使用 Java 实现同样功能的代码:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class StreamRedisSinkJava {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //连接socket获取输入数据
        DataStreamSource<String> text = env.socketTextStream("192.168.121.128", 9999);

        //组装数据
        SingleOutputStreamOperator<Tuple2<String, String>> listData = text
                .map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String word) throws Exception {
                return new Tuple2<String, String>("words", word);
            }
        });

        //指定redisSink
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("192.168.121.128")
                .setPort(6379)
                .setPassword("123")
                .build();
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        listData.addSink(redisSink);

        env.execute("StreamRedisSinkJava");
    }

    //自定义redisSink的mapper
    public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>>{

        //指定具体的操作命令
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }

        //获取key
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }
        //获取value
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }
    }
}

3,运行测试

(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(2)然后运行 Flink 程序。待程序启动后,我们在该终端中输入一些文本数据:

(3)最终到 redis 中查看结果,可以发现数据都保存到 Redislist 队列中了。
lrange words 0 -1
评论0