Checkpoint & Savepoint

Checkpoint and savepoint is used for store flink job state, they are indispensable for flink job if you want to use flink in production. There're 2 scenarios that you will use checkpoint and savepoint.
  • In fail over, flink will read the state from checkpoint and recover flink job from that state.
  • When your job is finished or killed, and you want to resume your job from some previous state (checkpoint or savepoint)

Enable Checkpoint

By default, checkpoint is not enabled. There're 2 ways to enable checkpoint in Zeppelin
  • Scala API
  • %flink.conf
Now I will show you how to enable checkpoint via these 2 approaches. For more checkpoint setting, you can refer these 2 links

Enable checkpoint via scala api

%flink
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.runtime.state.filesystem.FsStateBackend
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
senv.enableCheckpointing(10 * 1000)
senv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
val chkConfig = senv.getCheckpointConfig
chkConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

Enable checkpoint via %flink.conf

%flink.conf
pipeline.time-characteristic EventTime
execution.checkpointing.interval 10000
execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION
state.backend filesystem
state.checkpoints.dir file:///tmp/flink/checkpoints
Please set execution.checkpointing.externalized-checkpoint-retention as RETAIN_ON_CANCELLATION, otherwise when your flink cluster is terminated, your checkpoint will be lost. It is only retained when you set it as RETAIN_ON_CANCELLATION.

Resume job from checkpoint

There're 2 ways to resume job from checkpoint in Zeppelin:
  • manually resume job from checkpoint
  • resume job automatically

Manually resume job

If you enabled checkpoint, then you can see the checkpoint info in flink web ui.
You can choose one of these checkpoint to be restored from. By default, you can only choose the latest checkpoint, because only the latest one is retained. If you want to retained multiple checkpoints, you can set state.checkpoints.num-retained.
You can resume job by set execution.savepoint.path in %flink.conf
Or you can set execution.savepoint.path explicitly in paragraph.
The difference is that you can only resume one flink job from one single savepoint if using %flink.conf, while you can resume multiple flink jobs from different savepoint via the second approach.
The above approach is not easy, and error-prone, because you need to specify savepoint path explicitly. Zeppelin provides another approach to resume job automatically (no need to specify savepoint path). When you enable checkpoint, Zeppelin would get the latest checkpoint info via flink rest api and store that in note file. The following is a piece of note json where latest_checkpoint_path point to the latest checkpoint path.
"config": {
"template": "\u003ch1\u003eTotal {0} {1}\u003c/h1\u003e",
"jobName": "hello world",
"colWidth": 12.0,
"editorMode": "ace/mode/sql",
"type": "single",
"{1} \u003c/h1\u003e": "{1} \u003c/h1\u003e",
"enabled": true,
"latest_checkpoint_path": "file:/tmp/flink/checkpoints/6d4c697bc3b99b314d5342f2f6bed800/chk-4",
"editorSetting": {
"language": "sql",
"editOnDblClick": false,
"completionKey": "TAB",
"completionSupport": true
},
User can resume its job automatically by setting local property resumeFromLatestCheckpoint to be true

Resume job from savepoint

Savepoint is another way to store flink job state. But savepoint is only done when user cancel flink job.
Enable savepoint is pretty easy, just by setting savepointDir , such as following
Then when you cancel this paragraph, flink will do savepoint, and then put that savepoint info into note file. such as following
"config": {
"template": "\u003ch1\u003eTotal {0} {1}\u003c/h1\u003e",
"jobName": "hello world",
"colWidth": 12.0,
"editorMode": "ace/mode/sql",
"savepoint_path": "file:/tmp/flink_savepoint/savepoint-eff863-4610b069bf39",
"type": "single",
"{1} \u003c/h1\u003e": "{1} \u003c/h1\u003e",
"enabled": true,
"editorSetting": {
"language": "sql",
"editOnDblClick": false,
"completionKey": "TAB",
"completionSupport": true
},
You can just set resumeFromSavepoint to be true when you restart job next time.

How to handle checkpoint/savepont data missing ?

Sometimes when your checkpoint/savepoint data is missing, you will hit this kind of error
So how to handle this kind of situation ? 2 approaches:
  • If you have other checkpoint or savepoint data, then you can specify them in paragraph directly via execution.savepoint.path
  • If don't have other available checkpoint or savepoint data, then you have to restart job from the very beginning, in this case you need to set resumeFromLatestCheckpoint and resumeFromSavepoint to be false.

Community

Join Zeppelin community to discuss with others
Last modified 1yr ago