• Hadoop FileSystem 连接器
    • 分桶文件 Sink

    Hadoop FileSystem 连接器

    这个连接器可以向所有 Hadoop FileSystem 支持的文件系统写入分区文件。使用前,需要在工程里添加下面的依赖:

    1. <dependency>
    2. <groupId>org.apache.flink</groupId>
    3. <artifactId>flink-connector-filesystem_2.11</artifactId>
    4. <version>1.9.0</version>
    5. </dependency>

    注意连接器目前还不是二进制发行版的一部分,添加依赖、打包配置以及集群运行信息请参考 这里。

    分桶文件 Sink

    关于分桶的配置我们后面会有讲述,这里先创建一个分桶 sink,默认情况下这个 sink 会将数据写入到按照时间切分的滚动文件中:

    1. DataStream<String> input = ...;
    2. input.addSink(new BucketingSink<String>("/base/path"));
    1. val input: DataStream[String] = ...
    2. input.addSink(new BucketingSink[String]("/base/path"))

    初始化时只需要一个参数,这个参数表示分桶文件存储的路径。分桶 sink 可以通过指定自定义的 bucketer、 writer 和 batch 值进一步配置。

    默认情况下,当数据到来时,分桶 sink 会按照系统时间对数据进行切分,并以 "yyyy-MM-dd—HH" 的时间格式给每个桶命名。然后 DateTimeFormatter 按照这个时间格式将当前系统时间以 JVM 默认时区转换成分桶的路径。用户可以自定义时区来生成分桶的路径。每遇到一个新的日期都会产生一个新的桶。例如,如果时间的格式以分钟为粒度,那么每分钟都会产生一个桶。每个桶都是一个目录,目录下包含了几个部分文件(part files):每个 sink 的并发实例都会创建一个属于自己的部分文件,当这些文件太大的时候,sink 会产生新的部分文件。当一个桶不再活跃时,打开的部分文件会刷盘并且关闭。如果一个桶最近一段时间都没有写入,那么这个桶被认为是不活跃的。sink 默认会每分钟检查不活跃的桶、关闭那些超过一分钟没有写入的桶。这些行为可以通过 BucketingSinksetInactiveBucketCheckInterval()setInactiveBucketThreshold() 进行设置。

    可以调用BucketingSinksetBucketer() 方法指定自定义的 bucketer,如果需要的话,也可以使用一个元素或者元组属性来决定桶的路径。

    默认的 writer 是 StringWriter。数据到达时,通过 toString() 方法得到内容,内容以换行符分隔,StringWriter 将数据内容写入部分文件。可以通过 BucketingSinksetWriter() 指定自定义的 writer。SequenceFileWriter 支持写入 HadoopSequenceFiles,并且可以配置是否开启压缩。

    关闭部分文件和打开新部分文件的时机可以通过两个配置来确定:

    • 设置文件大小(默认文件大小是384MB)
    • 设置文件滚动周期,单位是毫秒(默认滚动周期是 Long.MAX_VALUE

    当上述两个条件中的任意一个被满足,都会生成一个新的部分文件。

    示例:

    1. DataStream<Tuple2<IntWritable,Text>> input = ...;
    2. BucketingSink<Tuple2<IntWritable,Text>> sink = new BucketingSink<Tuple2<IntWritable,Text>>("/base/path");
    3. sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));
    4. sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
    5. sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
    6. sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
    7. input.addSink(sink);
    1. // the SequenceFileWriter only works with Flink Tuples
    2. import org.apache.flink.api.java.tuple.Tuple2
    3. val input: DataStream[Tuple2[A, B]] = ...
    4. val sink = new BucketingSink[Tuple2[IntWritable, Text]]("/base/path")
    5. sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
    6. sink.setWriter(new SequenceFileWriter[IntWritable, Text])
    7. sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
    8. sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
    9. input.addSink(sink)

    上述代码会创建一个 sink,这个 sink 按下面的模式写入桶文件:

    1. /base/path/{date-time}/part-{parallel-task}-{count}

    date-time 是我们从日期/时间格式获得的字符串,parallel-task 是 sink 并发实例的索引,count 是因文件大小或者滚动周期而产生的文件的编号。

    更多信息,请参考 BucketingSink。