Kubernetes Operator for Apache Flink is built on top of the Kubernetes controller-runtime library. The project structure and boilerplate files are generated with Kubebuilder. Knowledge of controller-runtime and Kubebuilder is required to understand this project.
The Flink custom resource is defined in the Go struct FlinkCluster; Kubebuilder then generates the related Go files and YAML files, e.g., flinkclusters.yaml. The custom logic for reconciling a Flink custom resource lives in the controllers directory, e.g., flinkcluster_controller.go.
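When you change the FlinkCluster struct, the generated code and CRD manifests need to be refreshed. In a Kubebuilder-generated Makefile this is typically done with the targets below; the target names are Kubebuilder defaults and an assumption about this project's Makefile:
make generate   # regenerate deepcopy code from the FlinkCluster types
make manifests  # regenerate CRD YAML files such as flinkclusters.yaml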
The Dockerfile defines the steps for building the Flink Operator image.
The Makefile includes various targets for generating code, building the Flink Operator binary, running unit tests, building and pushing the Docker image, and deploying the Flink Operator to a Kubernetes cluster.
The following dependencies are required to build the Flink Operator binary and run unit tests:
But you don’t have to install them on your local machine, because this project includes a builder Docker image with the dependencies preinstalled. Builds and unit tests can run inside the builder container; this is the recommended way for local development.
However, to build the Flink Operator Docker image and deploy it to a Kubernetes cluster, the following dependencies are required on your local machine:
To build the Flink Operator binary and run unit tests inside the builder container (recommended), run
make test-in-docker
or, to build and test with locally installed dependencies, run
make test
Build a Docker image for the Flink Operator and then push it to an image registry with
make operator-image push-operator-image IMG=<tag>
Depending on which image registry you want to use, choose the tag accordingly; e.g., if you are using Google Container Registry, use a tag like gcr.io/<project>/flink-operator:latest.
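For example, with Google Container Registry the full build-and-push invocation might look like this, where my-project is a placeholder for your GCP project ID:
make operator-image push-operator-image IMG=gcr.io/my-project/flink-operator:latest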
After building the image, the image tag is automatically saved in config/default/manager_image_patch.yaml, so that when you deploy the Flink Operator later, it knows which image to use.
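You can inspect the saved tag directly; the file is a Kustomize patch that points the operator Deployment at your image (a description of the file's role, not its verbatim contents):
cat config/default/manager_image_patch.yaml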
Assuming you have built and pushed the Flink Operator image, you then need a running Kubernetes cluster. Verify the cluster info with
kubectl cluster-info
Deploy the Flink Custom Resource Definitions and the Flink Operator to the cluster with
make deploy
After that, you can verify that the CRD flinkclusters.flinkoperator.k8s.io has been created with
kubectl get crds | grep flinkclusters.flinkoperator.k8s.io
You can also view the details of the CRD with
kubectl describe crds/flinkclusters.flinkoperator.k8s.io
The operator runs as a Kubernetes Deployment; you can find the deployment with
kubectl get deployments -n flink-operator-system
or verify that the operator pod is up and running with
kubectl get pods -n flink-operator-system
You can also check the operator logs with
kubectl logs -n flink-operator-system -l app=flink-operator --all-containers
After deploying the Flink CRDs and the Flink Operator to a Kubernetes cluster, the operator serves as a control plane for Flink. In other words, previously the cluster only understood the language of Kubernetes; now it also understands the language of Flink. You can then create custom resources representing Flink session clusters or job clusters; the operator detects the custom resources automatically, creates the actual clusters, optionally runs jobs, and updates status in the custom resources.
Create a sample Flink session cluster custom resource with
kubectl apply -f config/samples/flinkoperator_v1alpha1_flinksessioncluster.yaml
and a sample Flink job cluster custom resource with
kubectl apply -f config/samples/flinkoperator_v1alpha1_flinkjobcluster.yaml
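For illustration, a minimal session cluster manifest might look like the sketch below. The apiVersion and kind follow from the CRD above, but the spec fields shown are assumptions; consult the files under config/samples for the authoritative schema:
kubectl apply -f - <<EOF
apiVersion: flinkoperator.k8s.io/v1alpha1
kind: FlinkCluster
metadata:
  name: my-session-cluster
spec:
  image:
    name: flink:1.8.1      # assumed field layout; see config/samples
  jobManager:
    accessScope: Cluster   # assumed field, matching the access scopes described below
  taskManager:
    replicas: 2            # assumed field; see config/samples
EOF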
There are several ways to submit jobs to a session cluster.
1) Flink web UI
You can submit jobs through the Flink web UI. See the instructions in the Monitoring section on how to set up a proxy to the Flink web UI.
2) From within the cluster
You can submit jobs through a client Pod in the same cluster, for example:
kubectl run my-job-submitter --image=flink:1.8.1 --generator=run-pod/v1 -- \
/opt/flink/bin/flink run -m flinksessioncluster-sample-jobmanager:8081 \
/opt/flink/examples/batch/WordCount.jar --input /opt/flink/README.txt
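Note that the --generator flag was removed in kubectl v1.18; on newer versions kubectl run creates a Pod by default, so the equivalent command is:
kubectl run my-job-submitter --image=flink:1.8.1 --restart=Never -- \
  /opt/flink/bin/flink run -m flinksessioncluster-sample-jobmanager:8081 \
  /opt/flink/examples/batch/WordCount.jar --input /opt/flink/README.txt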
3) From outside the cluster
If you have configured the access scope of the JobManager as External or VPC, you can submit jobs from a machine within that scope, for example:
flink run -m <jobmanager-service-ip>:8081 \
examples/batch/WordCount.jar --input /opt/flink/README.txt
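To find the JobManager service IP, you can query the Service object; the name follows the <cluster-name>-jobmanager pattern used in the samples above:
kubectl get svc flinksessioncluster-sample-jobmanager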
Or if the access scope is Cluster, which is the default, you can first use port forwarding to establish a tunnel from a machine which has access to the Kubernetes API service (typically your local machine) to the JobManager service, for example:
kubectl port-forward service/flinksessioncluster-sample-jobmanager 8081:8081
then submit jobs through the tunnel, for example:
flink run -m localhost:8081 \
examples/batch/WordCount.jar --input ./README.txt
You can check the operator logs with
kubectl logs -n flink-operator-system -l app=flink-operator --all-containers -f --tail=1000
After deploying a Flink cluster with the operator, you can find the cluster custom resource with
kubectl get flinkclusters
and check the cluster status with
kubectl describe flinkclusters <CLUSTER-NAME>
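You can also dump the full resource, including the status block the operator writes back, with plain kubectl output:
kubectl get flinkclusters <CLUSTER-NAME> -o yaml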
In a job cluster, the job is automatically submitted by the operator. You can check the Flink job status and logs with
kubectl describe jobs <CLUSTER-NAME>-job
kubectl logs jobs/<CLUSTER-NAME>-job -f --tail=1000
In a session cluster, depending on how you submit the job, you can check the job status and logs accordingly.
You can also access the Flink web UI, REST API and CLI by first creating a port forward from your local machine to the JobManager service UI port (8081 by default):
kubectl port-forward svc/[FLINK_CLUSTER_NAME]-jobmanager 8081:8081
then access the web UI with your browser through the following URL:
http://localhost:8081
call the Flink REST API, e.g., to list jobs:
curl http://localhost:8081/jobs
or run the Flink CLI, e.g., to list jobs:
flink list -m localhost:8081
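Other standard Flink CLI commands work through the same tunnel, e.g., cancelling a job with a job ID obtained from flink list:
flink cancel -m localhost:8081 <JOB-ID>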
Undeploy the operator and CRDs from the Kubernetes cluster with
make undeploy
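Note that undeploying removes the CRDs, which in turn deletes any remaining FlinkCluster resources. If you want the operator to tear down the underlying Flink clusters gracefully first, delete them before undeploying; this is a suggested extra step, not part of the Makefile target:
kubectl delete flinkclusters --all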