Hey, Tea Lovers! This post is about how to run tasks after a Flink job is submitted or completed, using the Flink JobListener. There are many scenarios where you want some additional processing once a Flink job finishes: pushing a notification, updating a database, or writing a log entry. It depends on the requirement. And it's not only about job completion; sometimes we also need to do something right after the job is submitted to the Flink cluster.
Since a Flink job runs on a different thread, ordinary sequential code cannot easily tell when the job has completed. For this, Flink provides a listener, JobListener, which gets called after job submission and after job completion. It's an interface, so we need to implement it and register it with the Flink environment, either ExecutionEnvironment or StreamExecutionEnvironment.
I would be happy to connect with you guys on social media. It’s @coderstea on Twitter, Linkedin, Facebook, Instagram, and YouTube.
Please Subscribe to the newsletter to know about the latest posts from CodersTea.
Prerequisites to Understand Flink Job Listener
You don't need to be an expert in Flink, but you should know the basics of Flink and how it works internally. Knowing how a Flink job runs will help clear up any doubts about the post-job tasks and the job listener covered here. To get started with Flink you can see how to install Flink on Mac, Windows, or Linux.
For this example, I will be using Flink 1.12, but it should work with any Flink version that ships the JobListener interface (it was introduced in Flink 1.10). I will show you how to register a listener and how it works. The example is pretty straightforward, but as I said earlier, the listener can do many things: a database call, a simple log, an API call, or a push notification. Anything you can do in normal code. Flink simply calls the function and your code gets executed.
Flink Job Listener Interface
All you need is an implementation of this interface, with your logic in the overridden methods. The Flink JobListener interface has two methods: one for submission and one for completion. The definition of the interface is as follows.
interface JobListener {
    void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable);
    void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable);
}
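Both methods take two parameters, and both share the Throwable parameter: it carries the error thrown by the process, if any. According to the interface's Javadoc, exactly one of the two arguments is null in each call, depending on success or failure. Let's look at both methods individually.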
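Before going through them, here is a rough plain-Java sketch of the "check the Throwable first" pattern that both callbacks follow. ListenerSketch and RecordingListener are made-up stand-ins that only mirror the shape of the callbacks; they are not Flink classes, and the payloads are plain Strings so the sketch runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring the two-argument shape of the listener callbacks.
// NOT Flink's JobListener: payloads are plain Strings so no Flink dependency is needed.
interface ListenerSketch {
    void onSubmitted(String jobId, Throwable error);

    void onExecuted(String result, Throwable error);
}

// Records what happened so the outcome of each callback can be inspected later.
class RecordingListener implements ListenerSketch {
    final List<String> events = new ArrayList<>();

    @Override
    public void onSubmitted(String jobId, Throwable error) {
        // Look at the error first; the payload is only meaningful when error is null.
        if (error != null) {
            events.add("submit failed: " + error.getMessage());
            return;
        }
        events.add("submitted: " + jobId);
    }

    @Override
    public void onExecuted(String result, Throwable error) {
        if (error != null) {
            events.add("job failed: " + error.getMessage());
            return;
        }
        events.add("finished: " + result);
    }
}
```

Calling onSubmitted("job-1", null) followed by onExecuted(null, new RuntimeException("boom")) on a RecordingListener leaves two entries in events: one for the successful submission and one for the failed run.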
Flink JobListener interface: onJobSubmitted
As the name suggests, onJobSubmitted is called whenever a job is submitted for execution. Suppose you need to be notified via email each time a new job is submitted. You can do so in this function: simply write the email-sending code, or call an API that does it, and you will be notified. As simple as that.
@Override
public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
    if (throwable != null) {
        log.error("Job failed to submit.", throwable);
        // do something or notify about failed job submission
        return;
    }
    log.info("Job submitted successfully");
    // do something
    // push notification
    // or Call an API
    // or Insert something in DB
}
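To make the "call an API" option a bit more concrete, here is a minimal sketch of building the notification request with the JDK's own HTTP client (Java 11 or later). The SubmissionNotifier class, the endpoint URL, and the JSON shape are all assumptions made up for illustration; adapt them to whatever service you actually notify.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: builds (but does not send) a notification request.
final class SubmissionNotifier {
    // Made-up endpoint, used only for illustration.
    static final URI NOTIFY_URI = URI.create("https://example.invalid/flink/notify");

    // Builds a tiny JSON payload; jobId may be null when the submission itself failed.
    static String toJson(String jobId, Throwable error) {
        String id = (jobId == null) ? "unknown" : jobId;
        String status = (error == null) ? "SUBMITTED" : "SUBMIT_FAILED";
        String message = (error == null || error.getMessage() == null) ? "" : error.getMessage();
        return "{\"jobId\":\"" + escape(id) + "\",\"status\":\"" + status
                + "\",\"message\":\"" + escape(message) + "\"}";
    }

    // Wraps the payload in a POST request aimed at the hypothetical endpoint.
    static HttpRequest buildRequest(String jobId, Throwable error) {
        return HttpRequest.newBuilder(NOTIFY_URI)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(toJson(jobId, error), StandardCharsets.UTF_8))
                .build();
    }

    // Escapes backslashes and double quotes so the message cannot break the JSON string.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

Inside onJobSubmitted, the job ID could come from jobClient.getJobID().toString() when the client is not null, and the request could be sent with HttpClient.newHttpClient().sendAsync(...) so that the callback does not block on the network.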
Flink Job Listener: onJobExecuted
onJobExecuted is the function that gets called when the job has executed, that is, when it finishes, either successfully or with an error. Sometimes we may want to get an email (similar to the submission example), or we need to log details about the job into the DB, such as start time, end time, the input taken, and more. We can do so in this function. Whenever a job completes, Flink calls the onJobExecuted method of the registered JobListener. Note that, per the interface's Javadoc, this callback fires only for jobs started with execute(), not executeAsync(). Similar to onJobSubmitted, it has a Throwable parameter, which is set whenever the job throws an error and completes or terminates abnormally.
@Override
public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
    if (throwable != null) {
        log.error("Job failed to finish", throwable);
        return;
    }
    log.info("Job completed successfully");
    try (
            // get DB connection
            Connection connection = DriverManager.getConnection("url-to-db")
    ) {
        // do something
        // select or insert query
    } catch (SQLException e) {
        log.error("Error communicating with the DB", e);
    }
}
Note: You may notice that the completion task triggers before the Flink job completely shuts down. That's because your code runs on a different thread and is triggered as soon as the job (the pipeline) completes, while Flink still has work to do, such as shutting down the local cluster.
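For the "log start time and end time into the DB" idea, one possible approach is to derive the timestamps from the job's runtime. The sketch below is plain Java with made-up names: JobRunSummary and the job_runs table are hypothetical, and netRuntimeMillis stands in for the value that JobExecutionResult.getNetRuntime() reports. The start time is only an approximation, since it is computed backwards from the moment the callback runs.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical value class holding what we might want to store about one job run.
final class JobRunSummary {
    // Insert statement for a made-up table: job_runs(job_name, started_at, finished_at).
    static final String INSERT_SQL =
            "INSERT INTO job_runs (job_name, started_at, finished_at) VALUES (?, ?, ?)";

    final String jobName;
    final Instant startedAt;
    final Instant finishedAt;

    private JobRunSummary(String jobName, Instant startedAt, Instant finishedAt) {
        this.jobName = jobName;
        this.startedAt = startedAt;
        this.finishedAt = finishedAt;
    }

    // Approximates the start time as "finish time minus net runtime".
    static JobRunSummary fromRuntime(String jobName, long netRuntimeMillis, Instant finishedAt) {
        if (netRuntimeMillis < 0) {
            throw new IllegalArgumentException("runtime must not be negative: " + netRuntimeMillis);
        }
        return new JobRunSummary(jobName, finishedAt.minusMillis(netRuntimeMillis), finishedAt);
    }

    // Elapsed time between the approximated start and the recorded finish.
    Duration duration() {
        return Duration.between(startedAt, finishedAt);
    }
}
```

In onJobExecuted this could be created with JobRunSummary.fromRuntime("my-job", jobExecutionResult.getNetRuntime(), Instant.now()), and its fields bound to a PreparedStatement built from INSERT_SQL inside the try-with-resources block.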
Complete Example of Flink Post Job Tasks
The example below shows a simple Flink batch process. In it, we use ExecutionEnvironment and register a JobListener. This JobListener simply logs whether the submission or completion was successful. As the comments note, you can do much more, but for simplicity let's stick to logging.
The main job, or pipeline, takes a list of integers, multiplies each by 2, and stores the results in a temporary file. That's it. The output is generated in the system's temporary folder with flink-job-listener- as a prefix.
You can download the code via GitHub. The full project is Flink Tutorial, or you can go straight to the Main class of this post.
log.info("Starting the JobListener Example Code");
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// use the system temp folder; File.createTempFile("1", "2") would fail because the prefix needs at least 3 characters
String outPut = System.getProperty("java.io.tmpdir");
log.info("Output will be stored at folder {}", outPut);
List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
env.fromCollection(integers)
        .map(x -> x * 2) // multiply numbers by 2
        .writeAsText(outPut + "/flink-job-listener-" + UUID.randomUUID());
log.info("Registering the JobListener");
env.registerJobListener(new JobListener() {
    @Override
    public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
        if (throwable != null) {
            log.error("Job failed to submit", throwable);
            return;
        }
        log.info("Job submitted successfully");
        // do something
        // push notification
        // or Call an API
        // or Insert something in DB
    }

    @Override
    public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
        if (throwable != null) {
            log.error("Job failed to finish", throwable);
            return;
        }
        log.info("Job completed successfully");
        // do something
        // push notification
        // or Call an API
        // or Insert something in DB
    }
});
env.execute();
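If you want to check what the job wrote, a small helper like the one below can read the results back. It is only a sketch under a few assumptions: OutputReader is a made-up name, the output contains one integer per line, and the path may be either a single file or, as typically happens when the job runs with a parallelism above 1, a directory of part files. Because the order of lines across part files is not guaranteed, the helper sorts the values.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper for inspecting the job's text output.
final class OutputReader {
    // Reads every regular file at or under the given path and returns the integers found, sorted.
    static List<Integer> readSortedIntegers(Path output) throws IOException {
        List<Path> files;
        // Files.walk also works when "output" is a single file: the stream then holds just that file.
        try (Stream<Path> paths = Files.walk(output)) {
            files = paths.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        List<Integer> values = new ArrayList<>();
        for (Path file : files) {
            for (String line : Files.readAllLines(file)) {
                String trimmed = line.trim();
                if (!trimmed.isEmpty()) { // skip blank lines
                    values.add(Integer.parseInt(trimmed));
                }
            }
        }
        Collections.sort(values);
        return values;
    }
}
```

For the pipeline above, calling OutputReader.readSortedIntegers on the flink-job-listener- output path should give back the even numbers from 2 to 24.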
Conclusion
That's it for this post. I hope it has helped you shape the flow of your Flink project, whether you need to do something after your Flink job is completed or right after it is submitted. You can read my previous Flink post, How to Select Specific Folders or Files As Input in Flink, or the other posts under apache flink.
The full project is on GitHub. See you in the next post.
HAKUNA MATATA!!!