
Flink CDC Pipeline: MySQL to Doris

Version compatibility

Flink and Flink CDC version compatibility
[Image: Flink / Flink CDC version compatibility table]

Flink and Doris version compatibility
[Image: Flink / Doris version compatibility table]

Running the sync job

The test finally ran successfully with flink-1.20.1 and flink-cdc-3.1.1.

Configure the YAML file

[root@chb1 flink-cdc-3.1.1]# cat mysql2doris.yaml 
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
  type: mysql
  hostname: chb1
  port: 3306
  username: root
  password: 123456
  tables: test.\.*
  server-id: 5400-5404

sink:
  type: doris
  fenodes: chb1:8030
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2
[root@chb1 flink-cdc-3.1.1]# 
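
Because the pipeline definition depends on YAML indentation and line breaks, a quick sanity check before submitting can catch a file that was pasted as a single line. The sketch below is only an illustration: check_pipeline_yaml is a hypothetical helper name, and it assumes the three top-level blocks used in this post (source, sink, pipeline) each start on their own line. It does not validate the options inside each block.

```shell
#!/bin/sh
# Hypothetical helper (not part of Flink CDC): verify that a pipeline YAML
# file has "source:", "sink:" and "pipeline:" as top-level keys, each alone
# on its own line. This only catches gross formatting damage.
check_pipeline_yaml() {
  file="$1"
  for key in source sink pipeline; do
    # A top-level key starts at column 0 and has nothing after the colon.
    if ! grep -q "^${key}:[[:space:]]*$" "$file"; then
      echo "missing or malformed top-level key: ${key}" >&2
      return 1
    fi
  done
  echo "ok"
}

# Example usage (assumes the file from this post is in the current directory):
#   check_pipeline_yaml mysql2doris.yaml
```

A file flattened into one line fails on the first key, since "source:" is then followed by other text on the same line.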

Submit the job

[root@chb1 flink-cdc-3.1.1]# ./bin/flink-cdc.sh mysql2doris.yaml 
Pipeline has been submitted to cluster.
Job ID: 4a71588006d5b5cf25f10101d613cb8b
Job Description: Sync MySQL Database to Doris
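
If the job ID is needed later (for example to look the job up in the Flink web UI or CLI), it can be captured from the submission output shown above. This is a minimal sketch that assumes the output keeps the "Job ID: ..." line format seen in this run; extract_job_id is a hypothetical helper name.

```shell
#!/bin/sh
# Hypothetical helper: read the submission output on stdin and print the
# value of the first "Job ID:" line. Assumes the line format shown above.
extract_job_id() {
  sed -n 's/^Job ID: *//p' | head -n 1
}

# Example usage (run from the flink-cdc-3.1.1 directory):
#   job_id=$(./bin/flink-cdc.sh mysql2doris.yaml | extract_job_id)
#   echo "submitted job: $job_id"
```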

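The problems below were all caused by version incompatibility. None of the fixes I tried had any effect; in the end, switching from flink-1.19.1 to flink-1.20.1 resolved them.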

1. Error: NoSuchMethodError: io.debezium.config.Field.withType(Lorg/apache/kafka/common/config/ConfigDef$Type;)Lio/debezium/config/Field;
Exception in thread "main" java.lang.NoSuchMethodError: io.debezium.config.Field.withType(Lorg/apache/kafka/common/config/ConfigDef$Type;)Lio/debezium/config/Field;
    at io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig.<clinit>(HistorizedRelationalDatabaseConnectorConfig.java:48)
    at org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig.<init>(MySqlSourceConfig.java:120)
    at org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory.createConfig(MySqlSourceConfigFactory.java:356)
    at org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory.createConfig(MySqlSourceConfigFactory.java:296)
    at org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory.createDataSource(MySqlDataSourceFactory.java:170)
    at org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator.createDataSource(DataSourceTranslator.java:91)
    at org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator.translate(DataSourceTranslator.java:48)
    at org.apache.flink.cdc.composer.flink.FlinkPipelineComposer.compose(FlinkPipelineComposer.java:103)
    at org.apache.flink.cdc.cli.CliExecutor.run(CliExecutor.java:89)
    at org.apache.flink.cdc.cli.CliFrontend.main(CliFrontend.java:69)

Attempted fix: add the following jars to the Flink lib directory:

debezium-connector-mysql-2.5.0.Final.jar
debezium-core-2.5.0.Final.jar

2. Error: Exception in thread "main" java.lang.NoClassDefFoundError: io/debezium/spi/topic/TopicNamingStrategy

Attempted fix: add debezium-api-2.5.0.Final.jar to the Flink lib directory.

3. Error: Exception in thread "main" java.lang.NoSuchFieldError: DATABASE_HISTORY
Exception in thread "main" java.lang.NoSuchFieldError: DATABASE_HISTORY
    at io.debezium.connector.mysql.MySqlConnection$MySqlConnectionConfiguration.lambda$new$0(MySqlConnection.java:590)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:176)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
    at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1603)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at io.debezium.config.Configuration$6.keys(Configuration.java:1659)
    at io.debezium.config.Configuration.asProperties(Configuration.java:1824)
    at io.debezium.config.Configuration.asProperties(Configuration.java:1812)
    at io.debezium.config.Configuration.copy(Configuration.java:755)
    at io.debezium.config.Configuration.edit(Configuration.java:988)
    at io.debezium.connector.mysql.MySqlConnection$MySqlConnectionConfiguration.<init>(MySqlConnection.java:596)
    at org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection(DebeziumUtils.java:91)
    at org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection(DebeziumUtils.java:84)
    at org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils.listTables(MySqlSchemaUtils.java:60)
    at org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory.getTableList(MySqlDataSourceFactory.java:276)
    at org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory.createDataSource(MySqlDataSourceFactory.java:170)
    at org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator.createDataSource(DataSourceTranslator.java:91)
    at org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator.translate(DataSourceTranslator.java:48)
    at org.apache.flink.cdc.composer.flink.FlinkPipelineComposer.compose(FlinkPipelineComposer.java:103)
    at org.apache.flink.cdc.cli.CliExecutor.run(CliExecutor.java:89)
    at org.apache.flink.cdc.cli.CliFrontend.main(CliFrontend.java:69)
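
NoSuchMethodError, NoClassDefFoundError and NoSuchFieldError are all linkage errors: the class found at runtime is not the version the calling code was compiled against. One thing that may be worth checking in this situation is whether the Flink lib directory holds the same artifact in more than one version. The sketch below is a filename-based heuristic only, under the assumption that jars are named artifact-version.jar; find_duplicate_artifacts is a hypothetical helper name, and it cannot see classes that are bundled inside another jar.

```shell
#!/bin/sh
# Hypothetical helper: print artifact names that occur more than once
# (i.e. with different versions) among the *.jar files of a directory.
# Heuristic: the version is taken to start at the first "-<digit>".
find_duplicate_artifacts() {
  lib_dir="$1"
  for jar in "$lib_dir"/*.jar; do
    [ -e "$jar" ] || continue            # directory has no jars
    name=$(basename "$jar" .jar)
    # Strip the trailing "-<version>" part, e.g. "-2.5.0.Final".
    printf '%s\n' "$name" | sed -E 's/-[0-9][0-9A-Za-z._-]*$//'
  done | sort | uniq -d
}

# Example usage (assumes FLINK_HOME points at the Flink installation):
#   find_duplicate_artifacts "$FLINK_HOME/lib"
```

An empty result does not prove the classpath is consistent; it only rules out the simplest case of two differently versioned copies of one jar.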

Reference: mysql-cdc sql-client

