How To Run Flink Batch As Streaming | CodersTea

How to Run Flink Batch as Streaming

by Imran Shaikh

Hey, Tea Lovers! I will be writing a series of posts on Flink Batch. This post covers the basics you need to create a Flink batch job using the streaming API. I will talk about what Flink Batch is and how it differs from Flink Streaming. It will be a quick and simple post to get you started. If you like the post and want to know more about Flink Batch as a whole, please let me know in the comments.

Why Do You Need to Use Flink Batch?

The TL;DR: it is used for bounded data, i.e. data that has an end. Flink Streaming, on the other hand, is used for consuming event streams that are unbounded, such as a Kafka topic. These events keep coming and will (probably) never end. Bounded data, such as files, a CSV, or even a simple ArrayList, is limited. It has an end.

For example, your client sends you files once a day instead of a Kafka stream. You just need to run some operations on them and be done, which takes an hour or so, and then repeat the same task the next day. In this case you have bounded data, a file, that should be processed exactly once.

A continuously running Streaming job is a poor fit for these situations. That's why we need Flink Batch.

Prerequisite for Code

The only thing you need is a basic understanding of the Flink architecture. Don't worry, you don't need to be an expert, as I am not 😜. For simplicity, I will run everything in the IDE and not show the whole deployment process; that's for another post. The Flink version for this example is 1.12.0. Since the runtime mode setting was introduced in 1.12, I expect it to work from 1.12 up to the latest version, 1.14. You can simply clone the project Flink-Tutorial-CodersTea. And yes, that's all you need to follow this post on Flink Batch.

Now let's jump directly into it. The code can be found on GitHub.




Long Gone the Days of Flink DataSet API

Earlier, to process bounded data (batch), Flink provided the DataSet API, which was separate from the DataStream API. In recent releases it has been deprecated.

We couldn't fully replicate a DataStream API pipeline in the DataSet API. The pipeline had to be rewritten for the DataSet API, which, to be honest, didn't provide much flexibility and was very troublesome to work with.

I have one post on DataSet API where I explored one of the ways you can filter the files or folders as input, How to Select Specific Folders or Files As Input in Flink.

Add Execution Mode as BATCH

OK, enough beating around the bush. Let's jump into the new way of doing Flink Batch. Flink likes to call this functionality “Batch as a Special Case of Streaming”, the reason being that you use the same pipeline for both streaming and batch.

You simply set the runtime mode on the execution environment, i.e. setRuntimeMode(RuntimeExecutionMode.BATCH). The rest of the pipeline is constructed exactly as you would in streaming.

StreamExecutionEnvironment exeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
exeEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);

RuntimeExecutionMode has STREAMING, BATCH, and AUTOMATIC as values. The following explanation is based on the Flink docs:

  • STREAMING: The classic DataStream execution mode (default)
  • BATCH: Batch-style execution on the DataStream API
  • AUTOMATIC: Let the system decide based on the boundedness of the sources

If you don't set it, Flink uses the default, STREAMING. With AUTOMATIC, Flink infers the mode from the boundedness of the sources. I think it's better to set it explicitly. You can also provide it via the command line, which is what the Flink docs recommend. This way you don't need to change the code every time you switch from batch to streaming or vice versa.

flink run -Dexecution.runtime-mode=BATCH /path/to/your/application.jar
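
To make the difference between the modes a bit more concrete, here is a small plain-Java simulation. It is not Flink code and does not use any Flink API; the class KeyedSumSimulation and its methods are names I made up for illustration. It only mimics what the Flink docs describe for a rolling keyed sum without a window: in STREAMING mode an updated total is emitted for every incoming record, while in BATCH mode only the final total per key is emitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation, NOT Flink code: it only mimics what a keyed sum emits.
class KeyedSumSimulation {

    // STREAMING-like: emit an updated total after every incoming record.
    static List<String> streamingStyle(List<String[]> records) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String[] r : records) {
            int updated = totals.merge(r[0], Integer.parseInt(r[1]), Integer::sum);
            emitted.add("(" + r[0] + "," + updated + ")");
        }
        return emitted;
    }

    // BATCH-like: consume all input first, then emit one final total per key.
    static List<String> batchStyle(List<String[]> records) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (String[] r : records) {
            totals.merge(r[0], Integer.parseInt(r[1]), Integer::sum);
        }
        List<String> emitted = new ArrayList<>();
        totals.forEach((k, v) -> emitted.add("(" + k + "," + v + ")"));
        return emitted;
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
                new String[] {"Food", "1200"},
                new String[] {"Bill", "2300"},
                new String[] {"Food", "1600"});
        System.out.println(streamingStyle(records)); // [(Food,1200), (Bill,2300), (Food,2800)]
        System.out.println(batchStyle(records));     // [(Food,2800), (Bill,2300)]
    }
}
```

The intermediate (Food,1200) only shows up in the streaming-style output. With bounded input, the batch-style result is usually what you want.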

Flink Batch as a Special Case of Streaming Code

“Show me the code”, you said. “Here it is”, I replied.

Let's create a CSV file for my expense report. For simplicity, it will contain two columns: category and expense amount. Flink will read the file, aggregate the expense amount by category, and print the result. Simple. I will try to explain the main pieces of the code.

Input File for Flink Batch

As I said, it will be a simple CSV file. I won't be using any CSV parser and will simply split each line on the comma. Here is the input file I will be passing to the Flink Batch code.

Food,1200
Bill,2300
Movies,500
Groceries,2000
Food,1600
Bill,2700
Movies,800
Groceries,3000
Movies,500
Netflix,200
Rent,600
candy,3
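
Since each line is split on the comma without a CSV parser, a malformed row would blow up with an unhelpful exception. Below is a minimal sketch of the same split with a basic check added. The class ExpenseRowParser and its parse method are hypothetical helpers for illustration and are not part of the project on GitHub. It is plain Java, so you could call it from inside the map step if you wanted to.

```java
import java.util.Map;

// Hypothetical helper (not from the original project): parses one "category,amount" row.
class ExpenseRowParser {

    static Map.Entry<String, Integer> parse(String row) {
        String[] parts = row.split(",");
        if (parts.length != 2) {
            // fail with a message that names the offending row
            throw new IllegalArgumentException("Expected 'category,amount' but got: " + row);
        }
        // NumberFormatException is thrown if the amount is not an integer
        return Map.entry(parts[0].trim(), Integer.parseInt(parts[1].trim()));
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> entry = parse("Food,1200");
        System.out.println(entry.getKey() + " -> " + entry.getValue()); // Food -> 1200
    }
}
```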

Code to Aggregate Expense Amount by Category

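import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// the following goes inside the main method of FlinkBatchAsStreaming
StreamExecutionEnvironment exeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
exeEnv.setRuntimeMode(RuntimeExecutionMode.BATCH); // use batch mode, important!
// no checkpointing configuration is needed here

String filePath = FlinkBatchAsStreaming.class.getClassLoader().getResource("expense-report.csv").getPath();
TextInputFormat txt = new TextInputFormat(new Path(filePath));

exeEnv.createInput(txt)
    .filter(x -> !x.isEmpty()) // skip empty lines
    // split the row on the comma and create a tuple
    .map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String row) throws Exception {
            // split category and amount
            String[] categoryAndAmount = row.split(",");
            return new Tuple2<>(categoryAndAmount[0], Integer.parseInt(categoryAndAmount[1]));
        }
    })
    .keyBy(t -> t.f0) // group by category, which is field 0
    // could be written as keyBy(0), but that is deprecated
    .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
    // window before the aggregation; adjust its size based on your data
    .sum(1) // sum the amount, which is the field at index 1
    .print();
exeEnv.execute();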

One thing that differs from the Flink DataSet API is that there is no groupBy in the DataStream API (at least not in Flink 1.12); we use keyBy instead. I also provided a window before the aggregation. I passed a tumbling window, and its size is up to you. The full code is on GitHub.

The pipeline resembles the Java Stream API, which you can read more about in Stream API: The Hero Without a Cape.

Output

1> (Food,2800)
5> (candy,3)
10> (Rent,600)
2> (Bill,5000)
4> (Netflix,200)
14> (Movies,1800)
9> (Groceries,5000)

As you can see, it has grouped and summed the amounts by category. And you must have noticed I spent too much on Movies 😜. The number before the > sign (1>, 5>, 14>) is the index of the parallel subtask that printed the record; you can ignore it.
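
If you want to double-check the totals without starting Flink, here is a quick plain-Java sketch using the Stream API that does the same split-and-sum on the rows of the input file. The class ExpenseTotalsCheck and the method totalsByCategory are names I picked for this example; they are not part of the Flink job.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java cross-check of the expected totals; no Flink involved.
class ExpenseTotalsCheck {

    static Map<String, Integer> totalsByCategory(List<String> rows) {
        return rows.stream()
                .filter(row -> !row.isEmpty())   // skip empty lines, as the Flink job does
                .map(row -> row.split(","))      // [category, amount]
                .collect(Collectors.groupingBy(
                        parts -> parts[0],       // group by category
                        TreeMap::new,            // sorted keys, for stable printing
                        Collectors.summingInt(parts -> Integer.parseInt(parts[1]))));
    }

    public static void main(String[] args) {
        List<String> rows = List.of(
                "Food,1200", "Bill,2300", "Movies,500", "Groceries,2000",
                "Food,1600", "Bill,2700", "Movies,800", "Groceries,3000",
                "Movies,500", "Netflix,200", "Rent,600", "candy,3");
        System.out.println(totalsByCategory(rows));
        // {Bill=5000, Food=2800, Groceries=5000, Movies=1800, Netflix=200, Rent=600, candy=3}
    }
}
```

The totals match the Flink output above; only the order differs, because Flink prints from parallel subtasks while this sketch sorts by key.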

Conclusion and Further Posts on Flink Batch

That's it for this post. I will write the next post in the Flink Batch series soon. It will be How to Sink Flink Batch Streaming, which resolves the incomplete output issue, a pretty common one I guess. So please subscribe to our newsletter for updates or follow me on social media. The code for this post can be found on GitHub, or you can see the full project at Flink-Tutorial-CodersTea.

See you in the next post. HAKUNA MATATA!!!
