flink-on-k8s-operator

Managing savepoints with the Flink Operator

A Flink savepoint is a consistent image of the execution state of a streaming job. Users can take savepoints of a running job and restart the job from them later. This document introduces how the Flink Operator can help you manage savepoints.

Starting a job from a savepoint

First, you can start a job from a savepoint by specifying the fromSavepoint property in the job spec, for example:

apiVersion: flinkoperator.k8s.io/v1alpha1
kind: FlinkCluster
metadata:
  name: flinkjobcluster-sample
spec:
  ...
  job:
    fromSavePoint: gs://my-bucket/savepoints/savepoint-123
    allowNonRestoredState: false
    ...

The allowNonRestoredState controls whether to allow non-restored state, see more info about the property in the Flink CLI doc.

Taking savepoints for a job

There are two ways the operator can help take savepoints for your job.

1. Automatic savepoints

You can let the operator to take savepoints for you automatically by specifying the autoSavepointSeconds and savepointsDir properties. In the following example, the operator will take a savepoint into the gs://my-bucket/savepoints/ GCS folder every 300 seconds.

apiVersion: flinkoperator.k8s.io/v1alpha1
kind: FlinkCluster
metadata:
  name: flinkjobcluster-sample
spec:
  ...
  job:
    autoSavepointSeconds: 300
    savepointsDir: gs://my-bucket/savepoints/
    ...

You can check the savepoint status in the job status, for example:

kubectl describe flinkclusters flinkjobcluster-sample

Name:         flinkjobcluster-sample
Namespace:    default
API Version:  flinkoperator.k8s.io/v1alpha1
Kind:         FlinkCluster
Spec:
  ...
Status:
  Components:
    ...
    Job:
      Name:                       flinkjobcluster-sample-job
      Id:                         c0c55ce62eba6ab41b6bb9288ef79c12
      Savepoint Generation:       2
      Savepoint Location:         gs://my-bucket/savepoints/savepoint-c0c55c-63ed75ba89c1
      Last Savepoint Time:        2019-11-20T01:50:39Z
      State:                      Running
      ...

For each successful savepoint, the savepoint generation in the job status will increase by 1. The latest savepoint location is also recorded in the job status.

2. Taking savepoints by updating the FlinkCluster custom resource

You can also manually take a savepoint for a running job by editing the savepointGeneration in the job spec to jobStatus.savepointGeneration + 1, then apply the updated manifest YAML to the cluster. The operator will detect the update and trigger a savepoint to savepointsDir.

For example, if the current savepoint generation in the job status is 2, you can manually trigger a savepoint by editing the savepointGeneration in the job spec to 3 as below:

apiVersion: flinkoperator.k8s.io/v1alpha1
kind: FlinkCluster
metadata:
  name: flinkjobcluster-sample
spec:
  ...
  job:
    savepointsDir: gs://my-bucket/savepoints/
    savepointGeneration: 3
    ...

and applying it to the cluster

kubectl apply -f flinkjobcluster_sample.yaml

after a while you should be able to see the 3rd generation of savepoint in the job status:

kubectl describe flinkclusters flinkjobcluster-sample

Name:         flinkjobcluster-sample
Namespace:    default
API Version:  flinkoperator.k8s.io/v1alpha1
Kind:         FlinkCluster
Spec:
  ...
Status:
  Components:
    ...
    Job:
      Name:                       flinkjobcluster-sample-job
      Id:                         c0c55ce62eba6ab41b6bb9288ef79c12
      Savepoint Generation:       3
      Savepoint Location:         gs://my-bucket/savepoints/savepoint-c0c55c-75ed63ba63b2
      Last Savepoint Time:        2019-11-20T02:10:19Z
      State:                      Running
      ...

In some situations, e.g., you didn’t specify savepointsDir in the FlinkCluster custom resource, you might want to bypass the operator and take savepoints by running the Flink CLI or calling the Flink REST API from your local machine.

In this case, you can first create a port forward from your local machine to the JobManager service UI port (8081 by default):

kubectl port-forward svc/[FLINK_CLUSTER_NAME]-jobmanager 8081:8081

then you need to get the Flink job ID from the job status with kubectl get flinkclusters [FLINK_CLUSTER_NAME] or by running the Flink CLI command flink list -m localhost:8081 -a or by calling the Flink jobs API with curl http://localhost:8081/jobs,

then take a savepoint with the Flink CLI:

flink savepoint -m localhost:8081 [JOB_ID] [SAVEPOINT_DIR]

or call the Flink API to trigger an async savepoint operation for the job:

curl -X POST -d '{"target-directory": "[SAVEPOINT_DIR]", "cancel-job": false}' http://localhost:8081/jobs/[JOB_ID]/savepoints

if the request is accept, it will return a trigger request ID which can be used to query the operation status:

curl http://localhost:8081/jobs/[JOB_ID]/savepoints/[TRIGGER_ID]

Automatically restarting job from the lastest savepoint

Long-running jobs may fail for various reasons, in such cases, if you have enabled auto savepoints or manually took savepoints, you might want to check the latest savepoint location in the job status, then use it as fromSavepoint to create a new job cluster. But this is a tedious process, fortunately the operator can help you automate it, all you need to do is set the restartPolicy property to FromSavepointOnFailure in the job spec, for example:

apiVersion: flinkoperator.k8s.io/v1alpha1
kind: FlinkCluster
metadata:
  name: flinkjobcluster-sample
spec:
  ...
  job:
    autoSavePointSeconds: 300
    savepointsDir: gs://my-bucket/savepoints/
    restartPolicy: FromSavepointOnFailure
    ...

Note that

Storing savepoints in remote storages

Usually you want to store savepoints in remote storages, see this doc on how you can store savepoints in GCS.