Hey, tea lovers! Today I will show you how to select specific folders or files programmatically in a Flink batch job. We will discuss how, given a root path, you can decide which files or folders your Flink batch job reads.

And why did I suddenly pick this topic, you ask? Well, I was working on such a task myself, where I needed to read S3 files that sit under date-named folders. Those date folders roll like 2021_01_13_1, 2021_01_13_2, 2021_01_13_3, and so on. Each of these folders contains multiple files, so for a given date I needed to read all the files in the folders whose names contain that date.

There may be other ways of doing it, but I couldn’t find any. So I had to create it on my own. If there is a better way to do it, please let me know; it will help not only me but countless other programmers as well.

So let’s get started. But before that, prepare your cup of tea to sip and code.


I have written a new post on running this batch code in the streaming environment. Please do check out How to Run Flink Batch as Streaming.

Prerequisites for this Code

First things first, you should know at least the basics of Flink. The code I am showing is from Flink 1.7, but I suppose it should work for other versions as well. If not, please let me know in the comments.

For the Installation of Apache Flink on Mac OS check out the post “How to Install Apache Flink On Mac OS“.

And for Windows, check out the post “How to Install Apache Flink On Local Windows“.

Have Ubuntu? Then refer to “How to Install Apache Flink On Ubuntu“.

Knowing Java 8 or above is a huge plus, as I will be using the Stream API to make the code more readable and efficient. If you are not familiar with the Stream API, you can check my post on it: “Stream API: The Hero Without a Cape“. Also, for simplicity, I will be using the local file system and not S3.

You can follow me on social media via @coderstea on Twitter, LinkedIn, Facebook, or Instagram. We also share high-quality videos about programming on our YouTube channel. You can also publish your post on CodersTea; just share your thought on Contact Us or let us know in the comments.

Create a New Flink Project

The best way to create a Flink project is to use the Maven archetype. As I stated, I will be using Flink 1.7, so here is the command to generate the project via mvn. You can change the version to your needs.

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.7.1

You can also use this command instead.

curl https://flink.apache.org/q/quickstart.sh | bash -s 1.7.1

This will ask you for the group id, artifact id, and other details. Once you are done, open pom.xml. There you will see the Flink dependencies with their scope set to provided. You may need to remove that scope if you are planning to run the job locally. Alternatively, you can run it locally with some modifications to your project in your IDE, but that’s a different topic altogether, so you can look it up on the internet.
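To make the scope change concrete, here is a sketch of what one of the generated dependency entries looks like (the exact artifact id and version property come from the archetype, so yours may differ slightly); commenting out or removing the provided scope line is enough to run locally:

```xml
<!-- from the generated pom.xml; drop the provided scope to run in the IDE -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>${flink.version}</version>
  <!-- <scope>provided</scope> -->
</dependency>
```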

The easiest and best option: I have created the project repo on GitHub. Pull it and start using it.

Setup Specific Folders for Flink Job

Now that we have our project, let’s quickly set up the input environment. For this demo, I will be creating folders that have dates in YYYY_MM_DD format with a number appended to the end. For example, 2021_01_01_01, 2021_01_01_02, 2021_01_02_01, and so on. These folders will contain files with random numbers, which we will simply sum up.

This is what my Input data File structure looks like.

Directory Structure

And the data is in a simple format: each file just contains random numbers, one per line.


I have put the input-data folders inside the resources directory. You can copy them and put them wherever you like. I will be putting them in the D:\projects\temp\flink-specific folder.
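If you don’t want to copy the folders by hand, here is a small sketch that generates the same kind of structure with plain Java (the folder and file names mirror the demo; the class name and the root path argument are my own, so adjust them to your setup):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GenerateDemoData {
    public static void main(String[] args) throws IOException {
        // root folder for the demo input; defaults to "input-data"
        Path root = Paths.get(args.length > 0 ? args[0] : "input-data");
        Random random = new Random();
        // date folders in YYYY_MM_DD format with a number appended
        for (String folderName : new String[]{"2021_01_01_01", "2021_01_01_02", "2021_01_02_01"}) {
            Path folder = root.resolve(folderName);
            Files.createDirectories(folder);
            // each folder gets a couple of files with random numbers, one per line
            for (int f = 0; f < 2; f++) {
                List<String> numbers = Stream.generate(() -> String.valueOf(random.nextInt(100)))
                        .limit(10)
                        .collect(Collectors.toList());
                Files.write(folder.resolve(String.format("random-numbers-%02d.txt", f)), numbers);
            }
        }
        System.out.println("Created demo data under " + root.toAbsolutePath());
    }
}
```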

Program to Read Specific Folders in Flink

Now that we have set up our environment, let’s jump into the code. As I said, my path will be D:\projects\temp\flink-specific\.

Take Input for the Flink Specific Folders Program

As you may already know, we can parse the command line arguments with the ParameterTool class. I will be using it to get the inputs. For the demo, I passed default values in the program.

ParameterTool param = ParameterTool.fromArgs(args);
// get the inputFolder path from arguments otherwise pass the default path
String inputFolder = param.get("inputFolder", INPUT_FOLDER);
log.info("The input folder path is {}", inputFolder);
String date = param.get("date", "2021_01_01");

Select Specific Folders in Flink

We will be using the Path class of the Flink lib. First, we will pass the root folder path to it and get a list of all its subdirectories. Next, we will filter the folders by checking if their names contain the date, then collect all the matching folders.

The information about the folder is fetched by the getFileSystem() method, which returns a FileSystem object. Now, to get all the subdirectories or folders, we need to call the listStatus(path) method, where path is the same Path we created earlier.

Path inputFolderPath = new Path(inputFolder);
FileSystem fileSystem = inputFolderPath.getFileSystem();
FileStatus[] fileStatuses = fileSystem.listStatus(inputFolderPath);

Select Specific folders: For Loop

Now that we have all the files and folders within it, let’s filter out the specific folders whose names contain the date. This version of the code uses a normal for loop over the fileStatuses.

List<Path> foldersWithGivenDate = new ArrayList<>();
for (FileStatus fileStatus : fileStatuses) {
  // it should be a directory
  if (fileStatus.isDir()) {
    // get the path of the folder
    Path folderWithGivenDate = fileStatus.getPath();
    // check if the directory name contains the date
    if (folderWithGivenDate.getName().contains(date)) {
      // add it to the list
      foldersWithGivenDate.add(folderWithGivenDate);
    }
  }
}

Select Specific folders: Stream API

Now, the same thing using the Stream API is shown below. It is clearer and more readable, and the Stream API code is much less redundant.

// the same thing using Stream API
List<Path> foldersWithGivenDateWithStreamApi = Stream.of(fileStatuses)
    // it should be a directory
    .filter(FileStatus::isDir)
    // get the path of the folder
    .map(FileStatus::getPath)
    // check if the directory name contains the date
    .filter(path -> path.getName().contains(date))
    // collect all the matching folders
    .collect(Collectors.toList());

Both are the same, but which one do you prefer? Please let me know in the comments.

Create DataSet from multiple Paths in Flink

We got all the paths, the locations of the files we needed. One thing to notice is that I am using folders or directories as input here, since I need to read all the files within each folder. But in case you need to filter based on the files themselves, you can play with the FileStatus object. Think of this object as the File object in Java.
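To illustrate that file-level filtering, here is the same idea sketched with plain java.io.File instead of Flink’s classes, so it stands alone (the class and method names are my own); with FileStatus you would check that it is not a directory via isDir() and match on getPath().getName():

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class FilterFilesByDate {
    // collect regular files (not directories) whose name contains the given date
    static List<File> filesWithDate(File root, String date) {
        List<File> matches = new ArrayList<>();
        File[] entries = root.listFiles();
        if (entries == null) {
            return matches; // root does not exist or is not a directory
        }
        for (File entry : entries) {
            if (entry.isFile() && entry.getName().contains(date)) {
                matches.add(entry);
            }
        }
        return matches;
    }
}
```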

Ok, back to the folders. For the Flink batch job, I will be using the createInput method of the ExecutionEnvironment. It takes a TextInputFormat object, and in this TextInputFormat I will be putting the paths we collected. Here is how to do it.

// convert the list to an array
Path[] foldersWithGivenDateArr = foldersWithGivenDate.toArray(new Path[0]);
TextInputFormat textInputFormat = new TextInputFormat(null);
// pass the array of paths to the text input format
textInputFormat.setFilePaths(foldersWithGivenDateArr);

The Flink Batch Job with Specific Folder Data

Up until now, we were just setting up the input we needed for the program. The main Flink execution starts now. We will be using ExecutionEnvironment, as opposed to StreamExecutionEnvironment, since this is a batch job with bounded data input. First, we will create a DataSet using createInput and pass it the created TextInputFormat object. Next, we will parse the input to Integer. Then, we will sum all the numbers using reduce and collect the result.

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
List<Integer> result = env.createInput(textInputFormat)
  // convert each line from String to Integer
  .map(Integer::parseInt)
  // sum the numbers
  .reduce(Integer::sum)
  // will only have one element though
  .collect();
log.info("The sum of all the data is {}", result);


That is it for this post. We mainly talked about how to list and filter the folders under a given root path in Flink, then select the specific ones and create an input from them via TextInputFormat. Using this, we can simply create a DataSet with the desired input files.

The whole code can be found on GitHub. I hope this will help you. I will be creating multiple posts on the same topic; the next post will be on integration testing in Flink. I will update you about the latest posts on our social media platforms. You can follow us @coderstea on Twitter, Instagram, LinkedIn, and Facebook.

See you in the next post. HAKUNA MATATA!!!



Samir June 27, 2021 - 11:12 am

Hi Mahesh/Imran,
Hope you are doing well.
I am a newbie to Flink, so I was going through articles on Google.
I came across your website coderstea.in https://coderstea.in/post/big-data/how-to-select-specific-folders-or-files-as-input-in-flink/ and found it very useful for my problem statement.
With this I thought I should discuss my problem statement with you.

I have a problem statement:

I will have a folder (a fixed folder) where some third party will put a CSV file every day at some time.

Example :

I have a folder path D:\input, and at this path every day a file will be placed.

So suppose today is 27-Jun-2021. One file, say my_instrument_27_june-2021_10_59AM.csv, comes and is placed at this location.

Tomorrow, i.e. on 28-June, another file, say 28_june_2021_10_59AM.csv, comes to this location.

So my input folder will have multiple files.

If I execute the Flink job today, then it will read 27_june-2021_10_59AM.csv. After reading completes, this file will reside at the same location (we will think of file-moving logic in the future, but if you have an idea you can tell us), and if I execute the Flink job tomorrow, then it will have 2 files, i.e. 27_june-2021_10_59AM.csv and 28_june-2021_10_59AM.csv.

I have 2 questions here

1) How can I read these files (files with different names) from a directory?

2) How can I make Flink read only the latest files and not the already-read files?

I am badly stuck here, so any help would really be appreciated.

Thanks in advance

Imran Shaikh June 27, 2021 - 3:02 pm

Hi Samir, glad the post was helpful for you.
For your query regarding new files each day, the solution depends on the scenario.
If you continuously want to monitor the folder you can use FileProcessingMode.PROCESS_CONTINUOUSLY with the interval, in your case a day.

If you only run it once a day for some time to read and then shut down the system as there is no need to run it the whole day, you can simply filter out the files by matching the current date with the file name. You can use code inside the heading ‘Select Specific folders’ of this post (just remove the isDir condition) and change the date format to match with the filename’s date format.

Hope this helps you.

Samir June 27, 2021 - 4:37 pm

indeed helpful

Samir June 27, 2021 - 4:58 pm

Is there any API to get the file names under these date (2021_01_01) folders?
Example: the 2021_01_01 folder has files random-numbers-00.txt and random-numbers-01.txt.
So how can I get these file names?
Thanks in advance

Imran Shaikh June 27, 2021 - 8:28 pm

The same object, FileStatus. It works for both directories and files. Use Path.getName() to get the name of the file/directory.


Samir June 27, 2021 - 5:29 pm

This program will get executed on a standalone machine as it uses ExecutionEnvironment, but what changes are required if I want to use StreamExecutionEnvironment?

Imran Shaikh June 27, 2021 - 8:30 pm

Since you are reading files, which are a finite data source, the stream env will not provide the desired result.

Samir June 28, 2021 - 11:06 am

Thanks for your reply.
But how won’t it give the desired result?

Samir June 28, 2021 - 11:21 am

Thanks for your reply.
But how won’t it work in StreamExecutionEnvironment?

Imran Shaikh June 28, 2021 - 7:11 pm

The Stream is for continuous, (kind of) infinite data.
And the DataSet is for finite data such as files.
I tried the stream for file reading, but it didn’t read all the data or sometimes didn’t stop at all.

Samir June 29, 2021 - 1:54 pm

Can you send the program if you have it on Git?
I just found some working solution around this, so maybe that could help solve the problem you are facing.

Imran Shaikh July 3, 2021 - 11:26 am

In the newer versions, we can do batch in the streaming environment with RuntimeMode set to BATCH.

Samir June 28, 2021 - 11:22 am

Will the above program work on a Flink cluster?


Samir July 19, 2021 - 8:00 pm

There is a use case where I want to read a CSV file using a Flink pipeline.
This Flink job will read the file data row by row and will put each single row on a Kafka topic.
I have a query here:
1) With the above requirement, after reading all rows from the CSV, I want to move the file to another folder, say history, so that the Flink job will read data only from the new file and not from the already-processed file.
So how can I achieve this?





