[Big Data] Deploy an Apache Flink session cluster natively on Kubernetes (K8s)

Apache Flink — Stateful Computations over Data Streams

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.

The following figure from the Flink documentation illustrates Flink’s architecture:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/fig/flink-application-sources-sinks.png

This article describes how to deploy a Flink session cluster natively on Kubernetes.


Flink’s native Kubernetes integration is still experimental. The configuration and CLI flags may change in later versions.


Prerequisites

  • Java

    To be able to run Flink, the only requirement is to have a working Java 8 or 11 installation. You can check the correct installation of Java by issuing the following command:

    # CentOS: install OpenJDK 11 (java-11-openjdk) if it is not already installed.
    # yum install java-11-openjdk

    java -version
  • Kubernetes (K8S)

    Kubernetes (K8s) is an open-source system for automating deployment, scaling, and management of containerized applications.

    For more information about installing and using Kubernetes (K8s), see the Kubernetes (K8s) Docs.

  • kubectl

    The Kubernetes command-line tool, kubectl, allows you to run commands against Kubernetes clusters. You can use kubectl to deploy applications, inspect and manage cluster resources, and view logs. Install and Set Up kubectl | Kubernetes - https://kubernetes.io/docs/tasks/tools/install-kubectl/.
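
Before moving on, it is worth checking that kubectl can actually reach your cluster. A quick sanity check:

# Show the kubectl client version.
$ kubectl version --client

# Confirm the cluster is reachable and its nodes are Ready.
$ kubectl get nodes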

Installation

Download

To be able to run Flink, the first step is to download Apache Flink: Downloads - https://flink.apache.org/downloads.html.
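
For example, the 1.11.2 binary release used below can be fetched with wget; the archive URL is an assumption, so verify it against the downloads page:

# Download the Flink 1.11.2 release built for Scala 2.11 (URL assumed from the Apache archive layout).
$ wget https://archive.apache.org/dist/flink/flink-1.11.2/flink-1.11.2-bin-scala_2.11.tgz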

$ tar -xzf flink-1.11.2-bin-scala_2.11.tgz
$ cd flink-1.11.2-bin-scala_2.11

You can replace 1.11.2 and 2.11 with your preferred Flink and Scala versions.

Follow these instructions to start a Flink Session within your Kubernetes cluster.

A session will start all required Flink services (JobManager and TaskManagers) so that you can submit programs to the cluster. Note that you can run multiple programs per session.

You can replace cloudolife-example-flink-cluster-id and cloudolife-example-namespace with your preferred values.

Create a namespace.

$ kubectl create namespace cloudolife-example-namespace
namespace/cloudolife-example-namespace created
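
If your cluster enforces RBAC, the JobManager also needs permission to create and delete TaskManager pods in this namespace. A minimal sketch, binding the built-in edit ClusterRole to the namespace's default service account (the binding name is arbitrary):

# Allow the namespace's default service account to manage pods (assumption: RBAC is enabled on your cluster).
$ kubectl create clusterrolebinding flink-role-binding-cloudolife-example \
  --clusterrole=edit \
  --serviceaccount=cloudolife-example-namespace:default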

Start the Flink session on Kubernetes (K8s).

$ ./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=cloudolife-example-flink-cluster-id \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dresourcemanager.taskmanager-timeout=3600000 \
-Dkubernetes.namespace=cloudolife-example-namespace \
-Dkubernetes.config.file=~/.kube/config
2020-12-11 10:41:55,736 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost
2020-12-11 10:41:55,739 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
2020-12-11 10:41:55,739 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2020-12-11 10:41:55,739 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2020-12-11 10:41:55,739 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-12-11 10:41:55,739 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 1
2020-12-11 10:41:55,739 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-12-11 10:41:55,870 INFO org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] - Could not load factory due to missing dependencies.
2020-12-11 10:41:57,330 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2020-12-11 10:41:57,357 INFO org.apache.flink.kubernetes.utils.KubernetesUtils [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
2020-12-11 10:41:57,357 INFO org.apache.flink.kubernetes.utils.KubernetesUtils [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
2020-12-11 10:41:57,430 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2020-12-11 10:41:59,745 INFO org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Create flink session cluster cloudolife-example-flink-cluster-id successfully, JobManager Web Interface: http://192.168.88.188:8081

See Pods

$ kubectl get pods -n cloudolife-example-namespace
NAME                                                   READY   STATUS    RESTARTS   AGE
cloudolife-example-flink-cluster-id-6bf6d56cfb-p5fwk   1/1     Running   0          14s
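
The session start output above printed the JobManager Web Interface address. If that address is not reachable from your machine, one option is to port-forward the REST service locally; this sketch assumes the default REST service name <cluster-id>-rest:

# List the services created by the session (the REST service is usually named <cluster-id>-rest).
$ kubectl get services -n cloudolife-example-namespace

# Forward the Flink Web UI to localhost:8081.
$ kubectl port-forward service/cloudolife-example-flink-cluster-id-rest 8081:8081 -n cloudolife-example-namespace

Then open http://localhost:8081 in a browser to reach the Flink Web UI.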

Submitting jobs to an existing Session

Use the following command to submit a Flink Job to the Kubernetes cluster.

$ ./bin/flink run -d -t kubernetes-session -Dkubernetes.cluster-id=cloudolife-example-flink-cluster-id -Dkubernetes.namespace=cloudolife-example-namespace examples/streaming/WindowJoin.jar

See Pods

$ kubectl get pods -n cloudolife-example-namespace
NAME                                                   READY   STATUS    RESTARTS   AGE
cloudolife-example-flink-cluster-id-6bf6d56cfb-p5fwk   1/1     Running   0          14s
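
Jobs running in the session can also be inspected or cancelled with the flink CLI, assuming list and cancel accept the same session target options as run above:

# List the jobs running in the session cluster.
$ ./bin/flink list -t kubernetes-session -Dkubernetes.cluster-id=cloudolife-example-flink-cluster-id -Dkubernetes.namespace=cloudolife-example-namespace

# Cancel a running job by its job id.
$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=cloudolife-example-flink-cluster-id -Dkubernetes.namespace=cloudolife-example-namespace <jobId>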

Configuration

All the Kubernetes configuration options can be found in Apache Flink 1.11 Documentation: Configuration - Kubernetes - https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#kubernetes.

The following are the most common configuration options.

  • kubernetes.cluster-id (String, default: none)
    The cluster-id, which should be no more than 45 characters, is used for identifying a unique Flink cluster. If not set, the client will automatically generate it with a random ID.

  • kubernetes.config.file (String, default: none)
    The kubernetes config file will be used to create the client. The default is located at ~/.kube/config.

  • kubernetes.container.image (String, default: depends on the actually running version, in general “flink:<FLINK_VERSION>-scala_<SCALA_VERSION>”)
    Image to use for Flink containers. The specified image must be based upon the same Apache Flink and Scala versions as used by the application. Visit https://hub.docker.com/_/flink?tab=tags for the images provided by the Flink project.

  • kubernetes.namespace (String, default: “default”)
    The namespace that will be used for running the jobmanager and taskmanager pods.
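
For example, several of these options can be combined when starting the session. The image tag below is an assumption; pick one from the Docker Hub page above that matches your Flink and Scala versions:

# Start a session with an explicit namespace, kubeconfig and container image (image tag assumed).
$ ./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=cloudolife-example-flink-cluster-id \
-Dkubernetes.namespace=cloudolife-example-namespace \
-Dkubernetes.config.file=~/.kube/config \
-Dkubernetes.container.image=flink:1.11.2-scala_2.11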

Uninstallation

#TODO

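A minimal cleanup sketch, assuming the cluster-id and namespace used in this article: delete the Flink session Deployment, then the namespace.

# Delete the session cluster (the Deployment name matches the cluster-id, as seen in the pod name above).
$ kubectl delete deployment/cloudolife-example-flink-cluster-id -n cloudolife-example-namespace

# Delete the namespace, which also removes the Services and ConfigMaps created inside it.
$ kubectl delete namespace cloudolife-example-namespace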

References

[1] Apache Flink 1.12 Documentation: Native Kubernetes - https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html

[2] Apache Flink: Stateful Computations over Data Streams - https://flink.apache.org/

[3] Apache Flink: Downloads - https://flink.apache.org/downloads.html

[4] Apache Flink 1.12 Documentation: Configuration - https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#kubernetes

[5] Kubernetes - https://kubernetes.io/

[6] Install and Set Up kubectl | Kubernetes - https://kubernetes.io/docs/tasks/tools/install-kubectl/