
How to Sink File in Apache Flink Correctly


Hey, Tea Lovers! Today we will take a look at how you can resolve Flink’s StreamingFileSink incomplete or .inprogress part file issue in Flink Batch Streaming; in other words, how you can sink files in Flink Batch without any issue. This post is a continuation of the series I am writing on Flink Batch as a Special Case of Streaming. (Flink version 1.12)

Note: This will work not only for the Batch pipeline but also for the Streaming pipeline.

If you are not familiar with Flink Batch as a Special Case of Streaming, do read my previous post, How to Run Flink Batch as Streaming, where I discussed what Flink Batch Streaming is and how it differs from normal event streaming.

Without further ado, let’s jump to the main topic. You can clone the project Flink-Tutorial-CodersTea; the code can be found on GitHub.




What is Sink File in Apache Flink?

The actual term is File Sink, but I don’t want to mix up File Sink with Streaming File Sink. A sink in Apache Flink is, simply put, an output: whenever we say sink, it means the results of the Flink pipeline are being written somewhere, in this case to files. (Files are not the only option; Flink also ships sink connectors for systems such as Kafka and JDBC databases, but this post is about files.)

Flink provides StreamingFileSink and FileSink to sink into files. As of Flink version 1.14, StreamingFileSink is deprecated in favor of FileSink. You can read more about this in the Flink documentation.

The Issue with StreamingFileSink when Sinking Files

Streaming File Sink works best for Streaming and requires checkpointing: the lifecycle of its output files is driven entirely by checkpoints. A part file moves through three states: in-progress while it is being written, pending once it has been closed but not yet committed, and finished once a checkpoint commits it. Only finished files get the clean part-x-y name; in-progress and pending files are hidden files carrying an .inprogress suffix, such as .part-0-0.inprogress.<uid>.

Since batch code doesn’t do checkpointing, it always leaves .inprogress files behind, which may or may not contain the complete output and are prone to inconsistent data. Let me show you with an example.
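For contrast, in a real streaming job you would enable checkpointing so the sink can finalize its part files. Here is a minimal sketch; the 10-second interval is just an illustrative value:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// with checkpointing enabled, StreamingFileSink commits pending part files
// (renaming them to finished part-x-y files) on each successful checkpoint
env.enableCheckpointing(10_000); // checkpoint every 10 seconds (illustrative)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);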

I will be using the example from our previous post, How to Run Flink Batch as Streaming, where I read a CSV file, aggregated the values, and simply printed the result. This time, let’s save the results in an output file. First, we will try the Streaming File Sink.

StreamingFileSink Object Creation

StreamingFileSink<String> streamingFileSink = StreamingFileSink.forRowFormat(
        new Path("src/main/resources/output"), new SimpleStringEncoder<String>()
    )
    .withBucketAssigner(new BasePathBucketAssigner<>())
    .build();

In forRowFormat, I provided an output path and an encoder. The encoder depends on how you want to encode your output; for simplicity, I will use SimpleStringEncoder, meaning each record is written out as a plain string. Next is withBucketAssigner, where you can provide a custom path (bucket) for each element in the pipeline. I want to stick to the path I provided, so I used BasePathBucketAssigner.
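If you did want time-based subfolders instead of a single directory, Flink also ships a DateTimeBucketAssigner that you could swap in. A small sketch; the date pattern here is just an example:

import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

StreamingFileSink<String> bucketedSink = StreamingFileSink.forRowFormat(
        new Path("src/main/resources/output"), new SimpleStringEncoder<String>()
    )
    // puts records into subfolders like output/2021-01-01--12, based on processing time
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
    .build();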

And to tell the Flink pipeline to use this sink, we attach it with the addSink function.

stream.addSink(streamingFileSink);

Here is the full code.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

StreamExecutionEnvironment exeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
exeEnv.setRuntimeMode(RuntimeExecutionMode.BATCH); // use batch mode, !important
exeEnv.setParallelism(1);
// there is no need for a checkpointing configuration

StreamingFileSink<String> streamingFileSink = StreamingFileSink.forRowFormat(
        new Path("src/main/resources/output"), new SimpleStringEncoder<String>()
    )
    .withBucketAssigner(new BasePathBucketAssigner<>())
    .build();

String filePath = FlinkBatchAsStreaming.class.getClassLoader().getResource("expense-report.csv").getPath();
TextInputFormat txt = new TextInputFormat(new Path(filePath));

DataStream<String> stream = exeEnv.createInput(txt)
    .filter(x -> !x.isEmpty())
    // split the row on the comma and create a tuple
    .map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String row) throws Exception {
            // split into category and amount
            String[] categoryAndAmount = row.split(",");
            return new Tuple2<>(categoryAndAmount[0], Integer.parseInt(categoryAndAmount[1]));
        }
    })
    .keyBy(t -> t.f0) // group by category, which is the 0th field
    // can be written as keyBy(0), but that is deprecated
    .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
    // a window is required; you can adjust it based on your data
    .sum(1)
    .map(x -> x.f0 + "," + x.f1);

stream.addSink(streamingFileSink);
// output: .part-0-0.inprogress.052e111b-79b1-4d2c-ac3c-6a6d63549652

exeEnv.execute();

Since we haven’t provided a checkpoint, because Batch doesn’t require it, the output you will see is an .inprogress part file such as .part-0-0.inprogress.052e111b-79b1-4d2c-ac3c-6a6d63549652. Yours may differ, but it will contain .inprogress. The content may happen to be correct here because the input is small, but that is not guaranteed; the sink is designed for Streaming anyway.

Sink File with File Sink in Flink

What a tongue twister it is: Sink File with File Sink in Flink. Now say it faster 20 times. Just kidding 😜. Back to the topic. Long story short, FileSink is the solution you were searching for. And it’s recommended by Flink itself; as I said earlier, the StreamingFileSink is being phased out.

There is hardly any difference between creating a FileSink object and a StreamingFileSink one. The main difference is that we attach it to the pipeline with the sinkTo function rather than addSink.

Let’s see the example.

Dependency for File Sink

To use FileSink in your project, you need to add the following dependency. The Flink version I am using for this post series is 1.12. You can see this dependency on Maven Central.

Maven Flink FileSink

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>

Gradle Flink FileSink

implementation group: 'org.apache.flink', name: 'flink-connector-files', version: '1.12.0' // match the Flink version you use

FileSink Code for Flink Batch

As I said, there is nothing different about creating the FileSink object compared to the streaming one. We just need to use the FileSink class from org.apache.flink.connector.file.sink.FileSink.

FileSink<String> fileSink = FileSink.forRowFormat(
        new Path("src/main/resources/output1"), new SimpleStringEncoder<String>()
    )
    .withBucketAssigner(new BasePathBucketAssigner<>())
    .build();

We will use sinkTo instead of addSink.

stream.sinkTo(fileSink);
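As a side note, in streaming mode you can also control when FileSink rolls from one part file to the next by attaching a rolling policy. A minimal sketch using DefaultRollingPolicy; the intervals and size are illustrative, and note that in Flink 1.12 the builder takes plain millisecond and byte values:

import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

FileSink<String> rollingFileSink = FileSink.forRowFormat(
        new Path("src/main/resources/output1"), new SimpleStringEncoder<String>()
    )
    .withBucketAssigner(new BasePathBucketAssigner<>())
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) // roll at least every 15 minutes
        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) // or after 5 idle minutes
        .withMaxPartSize(128 * 1024 * 1024)                   // or once a part reaches 128 MB
        .build())
    .build();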

Full Code

// imports are the same as in the StreamingFileSink example, except that
// StreamingFileSink is replaced by:
import org.apache.flink.connector.file.sink.FileSink;

StreamExecutionEnvironment exeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
exeEnv.setRuntimeMode(RuntimeExecutionMode.BATCH); // use batch mode, !important
exeEnv.setParallelism(1);
// there is no need for a checkpointing configuration

FileSink<String> fileSink = FileSink.forRowFormat(
        new Path("src/main/resources/output1"), new SimpleStringEncoder<String>()
    )
    .withBucketAssigner(new BasePathBucketAssigner<>())
    .build();

String filePath = FlinkBatchAsStreaming.class.getClassLoader().getResource("expense-report.csv").getPath();
TextInputFormat txt = new TextInputFormat(new Path(filePath));

DataStream<String> stream = exeEnv.createInput(txt)
    .filter(x -> !x.isEmpty())
    // split the row on the comma and create a tuple
    .map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String row) throws Exception {
            // split into category and amount
            String[] categoryAndAmount = row.split(",");
            return new Tuple2<>(categoryAndAmount[0], Integer.parseInt(categoryAndAmount[1]));
        }
    })
    .keyBy(t -> t.f0) // group by category, which is the 0th field
    // can be written as keyBy(0), but that is deprecated
    .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
    // a window is required; you can adjust it based on your data
    .sum(1)
    .map(x -> x.f0 + "," + x.f1);

stream.sinkTo(fileSink);
// output: part-13d796db-4956-489f-a0c3-c30baaf30b98-0

exeEnv.execute();

The generated output will be a part-* file, with no .inprogress files left behind. Now you can happily sink the output to a file.
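One more optional touch: if you want friendlier file names than the UUID-based part-* ones, FileSink accepts an OutputFileConfig with a custom prefix and suffix. A small sketch; the prefix and suffix here are just examples:

import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

FileSink<String> namedFileSink = FileSink.forRowFormat(
        new Path("src/main/resources/output1"), new SimpleStringEncoder<String>()
    )
    .withBucketAssigner(new BasePathBucketAssigner<>())
    // produces files like expense-<uid>-0.csv instead of part-<uid>-0
    .withOutputFileConfig(OutputFileConfig.builder()
        .withPartPrefix("expense")
        .withPartSuffix(".csv")
        .build())
    .build();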

Conclusion and The Flink Table API in Batch

That’s it for this post. I hope this helps you achieve correct results from your batch pipeline. This works not only for the Batch pipeline but also for the Streaming pipeline, so you can use the two interchangeably without any issues. You can clone the project Flink-Tutorial-CodersTea; the code can be found on GitHub.

Next in the series, I will be writing about the Table API. So do connect with us via the following.

See you in the next post. HAKUNA MATATA!!!


I would be happy to connect with you guys on social media. It’s @coderstea on Twitter, LinkedIn, Facebook, Instagram, and YouTube.

Please Subscribe to the newsletter to know about the latest posts from CodersTea.

