A Flink savepoint is a consistent image of the execution state of a streaming job. Users can take savepoints of a running job and restart the job from them later. This document introduces how the Flink Operator can help you manage savepoints.
First, you can start a job from a savepoint by specifying the `fromSavepoint` property in the job spec, for example:
```yaml
apiVersion: flinkoperator.k8s.io/v1alpha1
kind: FlinkCluster
metadata:
  name: flinkjobcluster-sample
spec:
  ...
  job:
    fromSavepoint: gs://my-bucket/savepoints/savepoint-123
    allowNonRestoredState: false
    ...
```
The `allowNonRestoredState` property controls whether to allow skipping savepoint state that cannot be restored to the new program; see the Flink CLI doc for more info about the property.
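For comparison, submitting a job directly with the Flink CLI (outside the operator) uses equivalent flags; a sketch, where `./my-job.jar` is a placeholder for your job artifact:

```shell
# -s / --fromSavepoint:         restore the job from this savepoint
# -n / --allowNonRestoredState: skip savepoint state that cannot be
#                               mapped to the new program (counterpart
#                               of allowNonRestoredState in the spec)
flink run -n -s gs://my-bucket/savepoints/savepoint-123 ./my-job.jar
```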
There are two ways the operator can help take savepoints for your job.
You can let the operator take savepoints for you automatically by specifying the `autoSavepointSeconds` and `savepointsDir` properties. In the following example, the operator takes a savepoint into the `gs://my-bucket/savepoints/` GCS folder every 300 seconds.
```yaml
apiVersion: flinkoperator.k8s.io/v1alpha1
kind: FlinkCluster
metadata:
  name: flinkjobcluster-sample
spec:
  ...
  job:
    autoSavepointSeconds: 300
    savepointsDir: gs://my-bucket/savepoints/
    ...
```
You can check the savepoint status in the job status, for example:
```shell
kubectl describe flinkclusters flinkjobcluster-sample
```

```
Name:         flinkjobcluster-sample
Namespace:    default
API Version:  flinkoperator.k8s.io/v1alpha1
Kind:         FlinkCluster
Spec:
  ...
Status:
  Components:
    ...
    Job:
      Name:                  flinkjobcluster-sample-job
      Id:                    c0c55ce62eba6ab41b6bb9288ef79c12
      Savepoint Generation:  2
      Savepoint Location:    gs://my-bucket/savepoints/savepoint-c0c55c-63ed75ba89c1
      Last Savepoint Time:   2019-11-20T01:50:39Z
      State:                 Running
  ...
```
For each successful savepoint, the savepoint generation in the job status will increase by 1. The latest savepoint location is also recorded in the job status.
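If you only need a single field rather than the full describe output, a JSONPath query can extract it; a sketch, assuming the camelCase field paths `status.components.job.savepointLocation` and `status.components.job.savepointGeneration` implied by the describe output above (verify them with `kubectl get flinkcluster flinkjobcluster-sample -o yaml`):

```shell
# Print the latest savepoint location recorded in the job status.
kubectl get flinkcluster flinkjobcluster-sample \
  -o jsonpath='{.status.components.job.savepointLocation}'

# Print the current savepoint generation.
kubectl get flinkcluster flinkjobcluster-sample \
  -o jsonpath='{.status.components.job.savepointGeneration}'
```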
You can also manually take a savepoint for a running job by setting `savepointGeneration` in the job spec to `jobStatus.savepointGeneration + 1`, then applying the updated manifest YAML to the cluster. The operator will detect the update and trigger a savepoint to `savepointsDir`.
For example, if the current savepoint generation in the job status is 2, you can manually trigger a savepoint by setting `savepointGeneration` in the job spec to 3, as below:
```yaml
apiVersion: flinkoperator.k8s.io/v1alpha1
kind: FlinkCluster
metadata:
  name: flinkjobcluster-sample
spec:
  ...
  job:
    savepointsDir: gs://my-bucket/savepoints/
    savepointGeneration: 3
    ...
```
and apply it to the cluster:

```shell
kubectl apply -f flinkjobcluster_sample.yaml
```

After a while, you should see the 3rd generation of savepoint in the job status:
```shell
kubectl describe flinkclusters flinkjobcluster-sample
```

```
Name:         flinkjobcluster-sample
Namespace:    default
API Version:  flinkoperator.k8s.io/v1alpha1
Kind:         FlinkCluster
Spec:
  ...
Status:
  Components:
    ...
    Job:
      Name:                  flinkjobcluster-sample-job
      Id:                    c0c55ce62eba6ab41b6bb9288ef79c12
      Savepoint Generation:  3
      Savepoint Location:    gs://my-bucket/savepoints/savepoint-c0c55c-75ed63ba63b2
      Last Savepoint Time:   2019-11-20T02:10:19Z
      State:                 Running
  ...
```
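Instead of editing and re-applying the full manifest, the same update can be made in place with `kubectl patch`; a sketch, assuming a merge patch works for this custom resource and that the generation lives at `status.components.job.savepointGeneration` as suggested by the describe output above:

```shell
# Read the current savepoint generation from the job status,
# then bump spec.job.savepointGeneration by 1 to trigger a savepoint.
GEN=$(kubectl get flinkcluster flinkjobcluster-sample \
  -o jsonpath='{.status.components.job.savepointGeneration}')
kubectl patch flinkcluster flinkjobcluster-sample --type merge \
  -p "{\"spec\": {\"job\": {\"savepointGeneration\": $((GEN + 1))}}}"
```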
In some situations, e.g., when you didn't specify `savepointsDir` in the FlinkCluster custom resource, you might want to bypass the operator and take savepoints by running the Flink CLI or calling the Flink REST API from your local machine.

In this case, first create a port forward from your local machine to the JobManager service UI port (8081 by default):

```shell
kubectl port-forward svc/[FLINK_CLUSTER_NAME]-jobmanager 8081:8081
```

Then get the Flink job ID, either from the job status with

```shell
kubectl get flinkclusters [FLINK_CLUSTER_NAME]
```

or by running the Flink CLI command

```shell
flink list -m localhost:8081 -a
```

or by calling the Flink jobs API:

```shell
curl http://localhost:8081/jobs
```

Then take a savepoint with the Flink CLI:

```shell
flink savepoint -m localhost:8081 [JOB_ID] [SAVEPOINT_DIR]
```

or call the Flink REST API to trigger an async savepoint operation for the job:

```shell
curl -X POST -d '{"target-directory": "[SAVEPOINT_DIR]", "cancel-job": false}' http://localhost:8081/jobs/[JOB_ID]/savepoints
```

If the request is accepted, it returns a trigger request ID which can be used to query the operation status:

```shell
curl http://localhost:8081/jobs/[JOB_ID]/savepoints/[TRIGGER_ID]
```
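The two REST calls above can be combined into a small polling loop; a sketch, assuming the Flink REST API response fields `request-id`, `status.id`, and `operation.location`, with `[JOB_ID]` and `[SAVEPOINT_DIR]` as placeholders (requires `jq`):

```shell
# Trigger an async savepoint and capture the trigger request ID.
TRIGGER_ID=$(curl -s -X POST \
  -d '{"target-directory": "[SAVEPOINT_DIR]", "cancel-job": false}' \
  "http://localhost:8081/jobs/[JOB_ID]/savepoints" | jq -r '."request-id"')

# Poll the operation status until it reports COMPLETED,
# then print the final savepoint location.
while true; do
  STATUS=$(curl -s "http://localhost:8081/jobs/[JOB_ID]/savepoints/$TRIGGER_ID")
  if [ "$(echo "$STATUS" | jq -r '.status.id')" = "COMPLETED" ]; then
    echo "$STATUS" | jq -r '.operation.location'
    break
  fi
  sleep 2
done
```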
Long-running jobs may fail for various reasons. In such cases, if you have enabled auto savepoints or manually took savepoints, you might want to check the latest savepoint location in the job status, then use it as `fromSavepoint` to create a new job cluster. But this is a tedious process; fortunately, the operator can help you automate it. All you need to do is set the `restartPolicy` property to `FromSavepointOnFailure` in the job spec, for example:
```yaml
apiVersion: flinkoperator.k8s.io/v1alpha1
kind: FlinkCluster
metadata:
  name: flinkjobcluster-sample
spec:
  ...
  job:
    autoSavepointSeconds: 300
    savepointsDir: gs://my-bucket/savepoints/
    restartPolicy: FromSavepointOnFailure
    ...
```
Note that the `fromSavepoint` property in the job status records the actual savepoint from which the job started or restarted. In case of a restart, it could be different from the one you specified in the job spec.

Usually you want to store savepoints in remote storage; see this doc on how you can store savepoints in GCS.