Commit 0ddda1b

[FLINK-22495][docs] Add Reactive Mode section to K8s

1 parent 26da1bf

File tree

2 files changed: +160 -2 lines

  • docs
    • content.zh/docs/deployment/resource-providers/standalone/kubernetes.md
    • content/docs/deployment/resource-providers/standalone/kubernetes.md

docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md

Lines changed: 80 additions & 1 deletion
@@ -95,7 +95,7 @@ You can tear down the cluster using the following commands:
 
 ### Deploy Application Cluster
 
-A *Flink Application cluster* is a dedicated cluster which runs a single application.
+A *Flink Application cluster* is a dedicated cluster which runs a single application that needs to be available at deployment time.
 
 A basic *Flink Application cluster* deployment in Kubernetes has three components:
 
@@ -233,6 +233,15 @@ You can access the queryable state of TaskManager if you create a `NodePort` ser
 1. Run `kubectl create -f taskmanager-query-state-service.yaml` to create the `NodePort` service for the `taskmanager` pod. The example of `taskmanager-query-state-service.yaml` can be found in the [appendix](#common-cluster-resource-definitions).
 2. Run `kubectl get svc flink-taskmanager-query-state` to get the `<node-port>` of this service. Then you can create the [QueryableStateClient(<public-node-ip>, <node-port>)]({{< ref "docs/dev/datastream/fault-tolerance/queryable_state" >}}#querying-state) to submit state queries.
 
+### Using Standalone Kubernetes with Reactive Mode
+
+[Reactive Mode]({{< ref "docs/deployment/elastic_scaling" >}}#reactive-mode) allows you to run Flink in a mode where the *Application Cluster* continuously adjusts the job parallelism to the available resources. In combination with Kubernetes, the replica count of the TaskManager deployment determines the available resources. Increasing the replica count will scale the job up, and reducing it will trigger a scale-down. This can also be done automatically by using a [Horizontal Pod Autoscaler](https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/).
+
+To use Reactive Mode on Kubernetes, follow the same steps as for [deploying a job using an Application Cluster](#deploy-application-cluster), but instead of `flink-configuration-configmap.yaml`, use the `flink-reactive-mode-configuration-configmap.yaml` config map. It contains the `scheduler-mode: reactive` setting for Flink.
+
+Once you have deployed the *Application Cluster*, you can scale your job up or down by changing the replica count in the `flink-taskmanager` deployment.
+
 {{< top >}}
 
 ## Appendix
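
The scaling flow added in the hunk above can be sketched with plain kubectl commands. This is illustrative only: the `flink-taskmanager` deployment name comes from these docs, while the replica counts and autoscaler bounds are arbitrary examples.

```sh
# Scale the job up: with scheduler-mode: reactive, Flink adjusts the job
# parallelism to the new number of TaskManager pods.
kubectl scale deployment/flink-taskmanager --replicas=4

# Scale the job down again; Reactive Mode rescales to the remaining resources.
kubectl scale deployment/flink-taskmanager --replicas=2

# Alternatively, let a Horizontal Pod Autoscaler manage the replica count
# (CPU-based target; the min/max bounds here are illustrative).
kubectl autoscale deployment flink-taskmanager --min=1 --max=10 --cpu-percent=80
```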
@@ -305,6 +314,76 @@ data:
     logger.netty.level = OFF
 ```
 
+
+`flink-reactive-mode-configuration-configmap.yaml`
+
+```yaml
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: flink-config
+  labels:
+    app: flink
+data:
+  flink-conf.yaml: |+
+    jobmanager.rpc.address: flink-jobmanager
+    taskmanager.numberOfTaskSlots: 2
+    blob.server.port: 6124
+    jobmanager.rpc.port: 6123
+    taskmanager.rpc.port: 6122
+    queryable-state.proxy.ports: 6125
+    jobmanager.memory.process.size: 1600m
+    taskmanager.memory.process.size: 1728m
+    parallelism.default: 2
+    scheduler-mode: reactive
+    execution.checkpointing.interval: 10s
+  log4j-console.properties: |+
+    # This affects logging for both user code and Flink
+    rootLogger.level = INFO
+    rootLogger.appenderRef.console.ref = ConsoleAppender
+    rootLogger.appenderRef.rolling.ref = RollingFileAppender
+
+    # Uncomment this if you want to _only_ change Flink's logging
+    #logger.flink.name = org.apache.flink
+    #logger.flink.level = INFO
+
+    # The following lines keep the log level of common libraries/connectors on
+    # log level INFO. The root logger does not override this. You have to manually
+    # change the log levels here.
+    logger.akka.name = akka
+    logger.akka.level = INFO
+    logger.kafka.name = org.apache.kafka
+    logger.kafka.level = INFO
+    logger.hadoop.name = org.apache.hadoop
+    logger.hadoop.level = INFO
+    logger.zookeeper.name = org.apache.zookeeper
+    logger.zookeeper.level = INFO
+
+    # Log all infos to the console
+    appender.console.name = ConsoleAppender
+    appender.console.type = CONSOLE
+    appender.console.layout.type = PatternLayout
+    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+    # Log all infos in the given rolling file
+    appender.rolling.name = RollingFileAppender
+    appender.rolling.type = RollingFile
+    appender.rolling.append = false
+    appender.rolling.fileName = ${sys:log.file}
+    appender.rolling.filePattern = ${sys:log.file}.%i
+    appender.rolling.layout.type = PatternLayout
+    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+    appender.rolling.policies.type = Policies
+    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
+    appender.rolling.policies.size.size = 100MB
+    appender.rolling.strategy.type = DefaultRolloverStrategy
+    appender.rolling.strategy.max = 10
+
+    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
+    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
+    logger.netty.level = OFF
+```
+
 `jobmanager-service.yaml` Optional service, which is only necessary for non-HA mode.
 ```yaml
 apiVersion: v1
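
As a rough end-to-end sketch of the deployment steps this config map plugs into: the two config map names and `jobmanager-service.yaml` appear on this page, while the jobmanager/taskmanager manifest names below are hypothetical placeholders for the Application Cluster resources in this appendix.

```sh
# Use the reactive-mode config map instead of flink-configuration-configmap.yaml.
kubectl create -f flink-reactive-mode-configuration-configmap.yaml

# Then create the remaining Application Cluster resources as usual;
# these manifest names are placeholders, not confirmed file names.
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-application.yaml
kubectl create -f taskmanager-job-deployment.yaml
```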

docs/content/docs/deployment/resource-providers/standalone/kubernetes.md

Lines changed: 80 additions & 1 deletion
(The changes to this file are identical to those shown above for docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md.)
