Checkpoint & Savepoint
Checkpoints and savepoints store Flink job state, and they are indispensable if you want to run Flink jobs in production. There are two scenarios in which you will use them:
On failover, Flink reads the state from a checkpoint and recovers the job from that state.
When your job has finished or been killed, and you want to resume it from a previous state (a checkpoint or savepoint).
Enable Checkpoint
By default, checkpointing is not enabled. There are two ways to enable it in Zeppelin:
Scala API
%flink.conf
Both approaches are described below. For more checkpoint settings, refer to the Flink documentation on checkpointing.
Enable checkpoint via scala api
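A minimal sketch of a Scala paragraph that enables checkpointing. It assumes a Flink 1.x release in which `enableExternalizedCheckpoints` is still available (newer releases deprecate it in favour of other setters), and the 10-second interval is only an illustrative value.

```scala
%flink

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup

// senv is the StreamExecutionEnvironment that Zeppelin exposes in %flink paragraphs.
// Take a checkpoint every 10 seconds (illustrative interval, in milliseconds).
senv.enableCheckpointing(10000)

// Keep the checkpoint data when the job is cancelled.
senv.getCheckpointConfig.enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
```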
Enable checkpoint via %flink.conf
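A minimal sketch of a `%flink.conf` paragraph that enables checkpointing. The interval and the checkpoint directory are assumptions chosen for illustration; run the paragraph before the Flink interpreter is started so that the settings take effect.

```text
%flink.conf

# Take a checkpoint every 10 seconds (illustrative value, in milliseconds).
execution.checkpointing.interval 10000
# Keep checkpoint data when the job is cancelled.
execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION
# Hypothetical checkpoint directory; use a location suitable for your cluster.
state.checkpoints.dir file:///tmp/flink/checkpoints
```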
Please set execution.checkpointing.externalized-checkpoint-retention to RETAIN_ON_CANCELLATION; otherwise your checkpoints will be lost when your Flink cluster is terminated. They are retained only when this property is set to RETAIN_ON_CANCELLATION.
Resume job from checkpoint
There are two ways to resume a job from a checkpoint in Zeppelin:
Resume the job manually from a checkpoint
Resume the job automatically
Manually resume job
If you have enabled checkpointing, you can see the checkpoint information in the Flink web UI.
You can choose one of these checkpoints to restore from. By default, you can only choose the latest checkpoint, because only the latest one is retained. If you want to retain multiple checkpoints, set state.checkpoints.num-retained.
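As a sketch, retaining several checkpoints could look like the following `%flink.conf` fragment. The value 3 is an arbitrary example.

```text
%flink.conf

# Keep the three most recent completed checkpoints (illustrative value).
state.checkpoints.num-retained 3
```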
You can resume a job by setting execution.savepoint.path in %flink.conf.
Alternatively, you can set execution.savepoint.path explicitly in the paragraph.
The difference is that with %flink.conf you can resume only one Flink job from a single savepoint, while with the second approach you can resume multiple Flink jobs from different savepoints.
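The two variants might look like this. The checkpoint path and the table names `source_table` and `sink_table` are hypothetical placeholders; substitute the path shown in the Flink web UI and your own SQL.

Setting the path once in `%flink.conf`:

```text
%flink.conf

# Hypothetical checkpoint path copied from the Flink web UI.
execution.savepoint.path file:///tmp/flink/checkpoints/0123456789abcdef0123456789abcdef/chk-12
```

Setting the path per paragraph as a local property:

```text
%flink.ssql(execution.savepoint.path=file:///tmp/flink/checkpoints/0123456789abcdef0123456789abcdef/chk-12)

INSERT INTO sink_table SELECT * FROM source_table;
```

Because the second form is scoped to a paragraph, each paragraph can point at its own checkpoint or savepoint.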
Automatically resume flink job
The manual approach above is tedious and error-prone, because you need to specify the savepoint path explicitly. Zeppelin provides another approach that resumes the job automatically, with no need to specify the savepoint path. When checkpointing is enabled, Zeppelin fetches the latest checkpoint information through the Flink REST API and stores it in the note file. In the note JSON, latest_checkpoint_path points to the latest checkpoint path.
You can then resume the job automatically by setting the local property resumeFromLatestCheckpoint to true.
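A sketch of a paragraph that resumes from the latest recorded checkpoint. The SQL statement and the table names are hypothetical stand-ins for your own job.

```text
%flink.ssql(resumeFromLatestCheckpoint=true)

INSERT INTO sink_table SELECT * FROM source_table;
```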
Resume job from savepoint
A savepoint is another way to store Flink job state. However, a savepoint is taken only when the user cancels the Flink job.
Enabling savepoints is easy: just set savepointDir on the paragraph.
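For instance, a paragraph with savepoints enabled might look like the sketch below. The directory `/tmp/flink_savepoint` and the table names are assumptions for illustration only.

```text
%flink.ssql(savepointDir=/tmp/flink_savepoint)

INSERT INTO sink_table SELECT * FROM source_table;
```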
Then, when you cancel this paragraph, Flink takes a savepoint and stores the savepoint information in the note file.
To resume from it, set resumeFromSavepoint to true the next time you restart the job.
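A sketch of the restarted paragraph, assuming the same hypothetical savepoint directory and tables as before. Keeping savepointDir set means the next cancellation takes a new savepoint as well.

```text
%flink.ssql(savepointDir=/tmp/flink_savepoint, resumeFromSavepoint=true)

INSERT INTO sink_table SELECT * FROM source_table;
```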
How to handle missing checkpoint/savepoint data?
Sometimes your checkpoint/savepoint data is missing, and resuming the job fails with an error.
There are two ways to handle this situation:
If you have other checkpoint or savepoint data, you can specify it directly in the paragraph via execution.savepoint.path.
If you don't have any other checkpoint or savepoint data available, you have to restart the job from the very beginning. In this case, set resumeFromLatestCheckpoint and resumeFromSavepoint to false.
Community
Join the Zeppelin community to discuss with others.