当前位置: 首页 > news >正文

flink SQL实现mysql source sink

接上文:一文说清flink从编码到部署上线
环境说明:MySQL:源库 5.7.20、目标库 8.0.27;flink:1.14.0;hadoop:3.0.0;操作系统:CentOS 7.6;JDK:1.8.0_401。

1.代码实现

1.1 EnvUtil实现

EnvUtil用于创建flink的运行环境。

package com.zl.utils;import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;/*** EnvUtil* @description:*/
public class EnvUtil {/*** 设置flink执行环境* @param parallelism 并行度*/public static StreamExecutionEnvironment setFlinkEnv(int parallelism) {// System.setProperty("HADOOP_USER_NAME", "用户名") 对应的是 hdfs文件系统目录下的路径:/user/用户名的文件夹名,本文为rootSystem.setProperty("HADOOP_USER_NAME", "root");Configuration conf = new Configuration();conf.setInteger("rest.port", 1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);if (parallelism >0 ){//设置并行度env.setParallelism(parallelism);} else {env.setParallelism(1);// 默认1}// 添加重启机制env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, Time.minutes(6)));// 启动checkpoint,设置模式为精确一次 (这是默认值),10*60*1000=60000env.enableCheckpointing(600000, CheckpointingMode.EXACTLY_ONCE);//rocksdb状态后端,启用增量checkpointenv.setStateBackend(new EmbeddedRocksDBStateBackend(true));//设置checkpoint路径CheckpointConfig checkpointConfig = env.getCheckpointConfig();// 同一时间只允许一个 checkpoint 进行(默认)checkpointConfig.setMaxConcurrentCheckpoints(1);//最小间隔,10*60*1000=60000checkpointConfig.setMinPauseBetweenCheckpoints(60000);// 取消任务后,checkpoint仍然保存checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//checkpoint容忍失败的次数checkpointConfig.setTolerableCheckpointFailureNumber(5);//checkpoint超时时间 默认10分钟checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));//禁用operator chain(方便排查反压)env.disableOperatorChaining();return env;}public static StreamTableEnvironment getFlinkTenv(StreamExecutionEnvironment env) {StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//设置时区 东八tenv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));Configuration configuration = tenv.getConfig().getConfiguration();// 开启miniBatchconfiguration.setString("table.exec.mini-batch.enabled", "true");// 批量输出的间隔时间configuration.setString("table.exec.mini-batch.allow-latency", "5 s");// 防止OOM设置每个批次最多缓存数据的条数,可以设为2万条configuration.setString("table.exec.mini-batch.size", "20000");// 
开启LocalGlobalconfiguration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");//设置TTL API指定tenv.getConfig().setIdleStateRetention(Duration.ofHours(25));return tenv;}}

1.2 核心代码

package com.zl;import com.zl.utils.EnvUtil;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.util.concurrent.TimeUnit;public class MysqlExampleSQL {public static void main(String[] args) throws Exception {// 配置运行环境,并行度1StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);// 程序间隔离,每个程序单独设置env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/MysqlExampleSQL");EnvironmentSettings settings = EnvironmentSettings.newInstance().build();StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);/*** 数据库版本:8.0.27* mysql sink*/tenv.executeSql(" CREATE TABLE `sink_products` (" +"id INT," +"name STRING," +"description STRING," +"PRIMARY KEY (`id`) NOT ENFORCED" +")with (" +"'connector' = 'jdbc'," +"'url' = 'jdbc:mysql://10.86.45.12:30105/flinktest2'," +"'driver' = 'com.mysql.cj.jdbc.Driver'," +//com.mysql.jdbc.Driver,com.mysql.cj.jdbc.Driver"'username' = '" + "root" + "'," +"'password' = '" + "pwd" + "'," +// 记得修改为实际密码"'table-name' = 'products'" +")");/*** 数据库版本:5.7.20* cdc方式同步业务库数据*/tenv.executeSql("CREATE TABLE `src_products`( " +"id INT," +"name STRING," +"description STRING," +"PRIMARY KEY (`id`)  NOT ENFORCED" +") with (" +"'connector' = 'mysql-cdc', " +"'hostname' = '" + "10.86.37.169" + "', " +"'port' = '" + "3306" + "', " +"'username' = '" + "root" + "', " +"'password' = '" + "pwd" + "', " +// 记得修改为实际密码"'database-name' = '" + "flinktest1" + "', " +"'table-name' = 'products'," +"'debezium.snapshot.mode' = 'initial'" +")");/*** 数据同步*/TableResult tableResult1 = tenv.executeSql("INSERT INTO sink_products " +"SELECT " +"id, " +"name, " +"description " +"FROM src_products");// 通过 TableResult 来获取作业状态tableResult1.print();}
}

1.3 pom.xml

注意修改此处:
在这里插入图片描述

2.web UI

在这里插入图片描述
在这里插入图片描述

3.数据库

flinktest1.products
在这里插入图片描述
flinktest2.products
在这里插入图片描述

4.部署

相关构建、部署,参考:一文说清flink从编码到部署上线
部署脚本:

flink run-application -t yarn-application -Dparallelism.default=1 -Denv.java.opts=" -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" -Dtaskmanager.memory.process.size=1g -Dyarn.application.name="FlinkCdcMysql"  -Dtaskmanager.numberOfTaskSlots=1 -c com.zl.MysqlExampleSQL /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

部署日志:
在这里插入图片描述
yarn:
在这里插入图片描述

5.常见问题

5.1 错误1

开发环境,错误日志:
“Caused by: java.lang.NoSuchMethodError: com.mysql.cj.CharsetMapping.getJavaEncodingForMysqlCharset(Ljava/lang/String;)Ljava/lang/String;”

解决:去掉pom.xml中“mysql-connector-java”相关依赖。

5.2 错误2

部署后,错误日志:
①“Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: ‘connector’=‘mysql-cdc’”。
②“Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier ‘jdbc’ that implements ‘org.apache.flink.table.factories.DynamicTableFactory’ in the classpath”。

解决:
“flink-connector-jdbc_2.11-1.14.0.jar”、“flink-connector-mysql-cdc-2.4.0.jar”放到服务器flink的lib目录,如下图所示:

在这里插入图片描述

5.3 错误3

部署后,错误日志:
“ Exception java.lang.NoClassDefFoundError: com/mysql/cj/jdbc/Driver”。

解决:
“mysql-connector-java-8.0.27.jar”放到服务器flink的lib目录,如下图所示:
在这里插入图片描述

6.代码

完整代码见:https://gitee.com/core815/flink-cdc-mysql


http://www.mrgr.cn/news/80577.html

相关文章:

  • Web网络通信 --- 后端消息推送
  • HarmonyOS学习 --- Mac电脑获取手机UDID
  • 实操给自助触摸一体机接入大模型语音交互
  • 〔 MySQL 〕 C接口使用
  • C语言 内联函数 + 递归函数
  • Qt-Advanced-Docking-System配置及使用、心得
  • 面试题整理2---Nginx 性能优化全方案
  • next.js 存在缓存中毒漏洞(CVE-2024-46982)
  • Qt之修改窗口标题、图标以及自定义标题栏(九)
  • 登陆harbor发现证书是错误的, 那么如何更新harbor的证书呢
  • day4:tomcat—maven-jdk
  • SQL server学习07-查询数据表中的数据(下)
  • 24-12 空间转录组数据分析之分子niche与细胞niche
  • 【信息系统项目管理师-论文真题】2015下半年论文详解(包括解题思路和写作要点)
  • window.getSelection() 获取划线内容并实现 dom 追随功能
  • 实战 | 某院校小程序记录
  • vue3 v-model实例之二,tab标签页的实现
  • 奇怪的知识又增加了,ESP32下的Lisp编程:ULisp--Lisp for microcontrollers
  • RK3588, FFmpeg 拉流 RTSP, mpp 硬解码转RGB
  • 电源的分类
  • 深度学习——现代卷积神经网络(七)
  • VSCode编辑+GCC for ARM交叉编译工具链+CMake构建+OpenOCD调试(基于STM32的标准库/HAL库)
  • FreeBSD安装教程
  • HanLP 2.x 的安装与使用
  • CTFHub ssrf
  • python 配置 oracle instant client