
Autoscaling with the Flink Kubernetes Operator

Official documentation:

1.Autoscaler | Apache Flink Kubernetes Operator

2.Configuration | Apache Flink Kubernetes Operator

1. Deploy a Kubernetes cluster

See my earlier article on setting up a k8s cluster.

2. Install the Flink Operator with Helm

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/
helm repo update

# --create-namespace creates the namespace if it does not exist yet
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
--namespace=flink-operator \
--create-namespace \
--set webhook.create=false \
--version 1.10.0

3. Install Prometheus

The operator drives autoscaling off the metrics scraped by Prometheus. (I plan to switch this step over to a Helm install shortly.)

You can install Prometheus either with Helm or with plain YAML; since the Helm install did not work for me, I went with YAML.

# prometheus-basic.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: monitoring
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
  namespace: monitoring
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s
      evaluation_interval: 15s
    scrape_configs:
      - job_name: 'flink'
        static_configs:
          - targets: ['flink-metrics.flink-apps.svc.cluster.local:9249']
        metrics_path: /metrics
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prometheus
  namespace: monitoring
spec:
  selector:
    matchLabels:
      app: prometheus
  replicas: 1
  template:
    metadata:
      labels:
        app: prometheus
    spec:
      containers:
        - name: prometheus
          image: prom/prometheus:v2.30.3
          args:
            - "--config.file=/etc/prometheus/prometheus.yml"
            - "--storage.tsdb.path=/prometheus"
            - "--web.enable-lifecycle"
          ports:
            - containerPort: 9090
          volumeMounts:
            - name: config-volume
              mountPath: /etc/prometheus/
            - name: storage-volume
              mountPath: /prometheus
      volumes:
        - name: config-volume
          configMap:
            name: prometheus-config
        - name: storage-volume
          emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
  name: prometheus
  namespace: monitoring
spec:
  type: NodePort
  ports:
    - port: 9090
      targetPort: 9090
      nodePort: 30090
  selector:
    app: prometheus

4. Build the image

Dockerfile contents (flink-test-1.0-SNAPSHOT.jar is the test job):

ARG FLINK_VERSION=1.18.1
FROM flink:${FLINK_VERSION}-scala_2.12
RUN mkdir -p /opt/flink/usrlib
COPY flink-test-1.0-SNAPSHOT.jar /opt/flink/usrlib/
COPY flink-metrics-prometheus-1.18.1.jar /opt/flink/lib/
COPY flink-statebackend-rocksdb-1.18.1.jar /opt/flink/lib/
COPY flink-connector-files-1.18.1.jar /opt/flink/lib/
WORKDIR /opt/flink

# 1. Build the Docker image
#    -t: image name and tag
#    .: use the Dockerfile in the current directory
#    --no-cache: build from scratch without the cache
docker build -t zht-flink:1.18.1 . --no-cache

# 2. Tag the local image for the remote registry
#    Format: registry-address/namespace/image:tag
docker tag zht-flink:1.18.1 registry.cn-hangzhou.aliyuncs.com/dinkyhub/zht-flink:1.18.1

# 3. Push the tagged image to the Aliyun registry
docker push registry.cn-hangzhou.aliyuncs.com/dinkyhub/zht-flink:1.18.1

5. Create the namespace, ServiceAccount, and image-pull secret

kubectl create namespace flink-apps

kubectl -n flink-apps create serviceaccount flink-serviceaccount

kubectl -n flink-apps create clusterrolebinding flink-role-binding \
  --clusterrole=edit \
  --serviceaccount=flink-apps:flink-serviceaccount

kubectl create secret docker-registry flink-apps-secret \
  --docker-server=registry.cn-hangzhou.aliyuncs.com \
  --docker-username=xx \
  --docker-password=xxxx \
  -n flink-apps

kubectl patch serviceaccount flink-serviceaccount \
  -p '{"imagePullSecrets": [{"name": "flink-apps-secret"}]}' \
  -n flink-apps
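If the patch applied cleanly, the ServiceAccount should end up shaped roughly like the fragment below (this is an illustrative expected result, not something to apply; verify the real object with kubectl -n flink-apps get serviceaccount flink-serviceaccount -o yaml):

```yaml
# Expected shape of the patched ServiceAccount (illustrative)
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-serviceaccount
  namespace: flink-apps
imagePullSecrets:
  - name: flink-apps-secret
```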

6. Job and autoscaler configuration

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-autoscaling-sum-job
  namespace: flink-apps
spec:
  image: registry.cn-hangzhou.aliyuncs.com/dinkyhub/zht-flink:1.18.1
  flinkVersion: v1_18
  mode: native
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    parallelism.default: "2"
    state.backend: rocksdb
    state.checkpoints.dir: file:///flink-data/checkpoints
    state.savepoints.dir: file:///flink-data/savepoints
    metrics.reporters: prometheus
    metrics.reporter.prometheus.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prometheus.port: "9249"
    execution.checkpointing.interval: "10000"
    execution.checkpointing.mode: "EXACTLY_ONCE"
    execution.checkpointing.timeout: "600000"
    execution.checkpointing.min.pause: "10000"
    execution.checkpointing.max.concurrent.checkpoints: "1"
    # Enable source metrics
    metrics.source.enable: "true"
    metrics.source.records.in.enable: "true"
    metrics.source.records.out.enable: "true"
    metrics.source.records.lag.enable: "true"
    # Enable all operator metrics
    metrics.operator.enable: "true"
    metrics.operator.records.in.enable: "true"
    metrics.operator.records.out.enable: "true"
    # Enable task metrics
    metrics.task.enable: "true"
    metrics.task.records.in.enable: "true"
    metrics.task.records.out.enable: "true"
    # Metric collection intervals
    metrics.fetcher.update-interval: "1000"
    metrics.latency.interval: "1000"
    # Enable IO metrics
    metrics.io.enable: "true"
    jobmanager.scheduler: "adaptive"
    # Autoscaler configuration
    job.autoscaler.enabled: "true"
    job.autoscaler.metrics.window: "20s"
    job.autoscaler.target.utilization: "0.30"
    job.autoscaler.scale.up.threshold: "0.05"
    job.autoscaler.scale.down.threshold: "0.1"
    job.autoscaler.metrics.memory.average: "1.0"
    job.autoscaler.metrics.memory.window: "5s"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.cooldown.period: "5s"
    job.autoscaler.scale.up.max.factor: "1.5"
    job.autoscaler.scale.down.max.factor: "0.5"
    # Metrics-related settings
    job.autoscaler.backpressure.enabled: "true"
    metrics.latency.granularity: "operator"
    web.backpressure.refresh-interval: "1000"
    metrics.backpressure.enabled: "true"
    metrics.backpressure.interval: "1000"
    metrics.backpressure.timeout: "60000"
    # Job status metrics
    metrics.job.status.enable: "STATE"
    # CPU metrics
    metrics.system.cpu: "true"
    metrics.system.cpu.load: "true"
    metrics.system.resource: "true"
  serviceAccount: flink-serviceaccount
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
    replicas: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/usrlib/flink-test-1.0-SNAPSHOT.jar
    entryClass: com.zht.sumJob
    args: []
    parallelism: 1
    upgradeMode: stateless
  podTemplate:
    spec:
      volumes:
        - name: checkpoint-data
          hostPath:
            path: /data/flink-checkpoints
            type: DirectoryOrCreate
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: checkpoint-data
              mountPath: /flink-data
    metadata:
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9249"
---
apiVersion: batch/v1
kind: Job
metadata:
  name: init-checkpoint-dir
  namespace: flink-apps
spec:
  template:
    spec:
      serviceAccountName: flink-serviceaccount
      containers:
        - name: init-dir
          image: busybox
          command: ["/bin/sh", "-c"]
          args:
            - |
              mkdir -p /data/flink-checkpoints/checkpoints
              mkdir -p /data/flink-checkpoints/savepoints
              chmod -R 777 /data/flink-checkpoints
          volumeMounts:
            - name: checkpoint-data
              mountPath: /data/flink-checkpoints
          resources:
            limits:
              cpu: "0.1"
              memory: "64Mi"
            requests:
              cpu: "0.1"
              memory: "64Mi"
      volumes:
        - name: checkpoint-data
          hostPath:
            path: /data/flink-checkpoints
            type: DirectoryOrCreate
      restartPolicy: Never
  backoffLimit: 4
---
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-ui
  namespace: flink-apps
spec:
  type: NodePort
  ports:
    - name: webui
      port: 8081
      targetPort: 8081
      nodePort: 30081
  selector:
    component: jobmanager
    app: flink-autoscaling-sum-job
---
apiVersion: v1
kind: Service
metadata:
  name: flink-metrics
  namespace: flink-apps
spec:
  type: NodePort
  ports:
    - name: metrics
      port: 9249
      targetPort: 9249
      nodePort: 30249
  selector:
    component: taskmanager
    app: flink-autoscaling-sum-job
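To build intuition for the autoscaler settings above: the core decision is to resize a vertex so that its observed busy-time ratio lands on job.autoscaler.target.utilization, with the scale.up/down max factors capping how far a single decision may move. A simplified Python sketch of that idea (this is my own model, not the operator's exact algorithm, and the function and variable names are illustrative):

```python
import math

def rescale(current_parallelism: int,
            busy_time_ratio: float,              # observed fraction of time busy, 0..1
            target_utilization: float = 0.30,    # job.autoscaler.target.utilization
            scale_up_max_factor: float = 1.5,    # job.autoscaler.scale.up.max.factor
            scale_down_max_factor: float = 0.5   # job.autoscaler.scale.down.max.factor
            ) -> int:
    """Size the vertex so its busy-time ratio would land on the target."""
    desired = current_parallelism * busy_time_ratio / target_utilization
    # Cap how far one scaling decision may move the parallelism.
    upper = current_parallelism * scale_up_max_factor
    lower = current_parallelism * (1 - scale_down_max_factor)
    return max(1, math.ceil(min(max(desired, lower), upper)))
```

With the values configured here (target 0.30, up factor 1.5, down factor 0.5), a vertex at parallelism 4 that is 90% busy "wants" parallelism 12 but is capped at 6 in a single step, and a nearly idle vertex can drop at most to 2.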
Notes:

1. Add flink-metrics-prometheus-1.18.1.jar to the image, otherwise the metrics reporter will not start.
2. First verify that metrics are actually exposed: curl http://localhost:9249/metrics and confirm that values come back.
3. Then check the Prometheus Targets page to confirm the Flink metrics endpoint is being scraped.
4. Make sure the monitoring settings are enabled in the YAML or in the Flink job configuration.
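For note 2, curl http://localhost:9249/metrics returns Prometheus' plain-text exposition format. If you want to script that sanity check instead of eyeballing it, here is a minimal parser sketch (the metric names in the sample are illustrative, and labels are deliberately ignored):

```python
def parse_prometheus_text(body: str) -> dict:
    """Parse Prometheus plain-text exposition output into {metric_name: value}."""
    metrics = {}
    for line in body.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip HELP/TYPE comments and blank lines
        name_part, _, value = line.rpartition(" ")
        name = name_part.split("{", 1)[0]  # drop the {label="..."} block
        try:
            metrics[name] = float(value)
        except ValueError:
            pass  # ignore lines that do not end in a numeric sample
    return metrics

# Illustrative sample of what the reporter on :9249 might return
sample = """\
# HELP flink_jobmanager_numRunningJobs Number of running jobs
# TYPE flink_jobmanager_numRunningJobs gauge
flink_jobmanager_numRunningJobs 1.0
flink_taskmanager_job_task_numRecordsIn{task_name="source"} 42.0
"""
parsed = parse_prometheus_text(sample)
# An empty result would mean the reporter jar is missing or the port is wrong.
```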


