Flink CDC Pipeline: MySQL to Doris
Version compatibility
Flink and Flink CDC version compatibility
Flink and Doris version compatibility
Running the sync job
The test finally ran successfully with flink-1.20.1 and flink-cdc-3.1.1.
Configuring the YAML file
[root@chb1 flink-cdc-3.1.1]# cat mysql2doris.yaml
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
  type: mysql
  hostname: chb1
  port: 3306
  username: root
  password: 123456
  tables: test.\.*
  server-id: 5400-5404

sink:
  type: doris
  fenodes: chb1:8030
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2
[root@chb1 flink-cdc-3.1.1]#
Submitting the job
[root@chb1 flink-cdc-3.1.1]# ./bin/flink-cdc.sh mysql2doris.yaml
Pipeline has been submitted to cluster.
Job ID: 4a71588006d5b5cf25f10101d613cb8b
Job Description: Sync MySQL Database to Doris
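The problems below were all caused by version incompatibility. None of the fixes I tried had any effect; in the end, switching from flink-1.19.1 to flink-1.20.1 resolved them.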
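NoSuchMethodError and NoSuchFieldError often appear when two versions of the same library end up on the classpath. As a rough first check, the sketch below lists artifact names that occur more than once in a lib directory. The helper name `find_duplicate_artifacts` is hypothetical, the version-stripping rule is only a heuristic, and the path in the usage comment is an assumption about where Flink is installed.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print artifact names that appear in a lib directory
# under more than one version (for example two debezium-core jars).
find_duplicate_artifacts() {
  local lib_dir="$1"
  local jar
  for jar in "$lib_dir"/*.jar; do
    # Skip the unexpanded glob when the directory holds no jars.
    [ -e "$jar" ] || continue
    # Drop the ".jar" suffix, then strip the trailing "-<version>" part
    # (heuristic: everything from the first "-<digit>" to the end).
    basename "$jar" .jar | sed -E 's/-[0-9][0-9A-Za-z.]*(-[0-9A-Za-z.]+)*$//'
  done | sort | uniq -d
}

# Usage (the path is an assumption, adjust it to the local install):
#   find_duplicate_artifacts /opt/flink-1.20.1/lib
```

An empty result only means that no artifact name is repeated in that directory; classes bundled inside a fat connector jar can still conflict with standalone jars.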
1. Error: NoSuchMethodError: io.debezium.config.Field.withType(Lorg/apache/kafka/common/config/ConfigDef$Type;)Lio/debezium/config/Field;
Exception in thread "main" java.lang.NoSuchMethodError: io.debezium.config.Field.withType(Lorg/apache/kafka/common/config/ConfigDef$Type;)Lio/debezium/config/Field;
    at io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig.<clinit>(HistorizedRelationalDatabaseConnectorConfig.java:48)
    at org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig.<init>(MySqlSourceConfig.java:120)
    at org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory.createConfig(MySqlSourceConfigFactory.java:356)
    at org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory.createConfig(MySqlSourceConfigFactory.java:296)
    at org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory.createDataSource(MySqlDataSourceFactory.java:170)
    at org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator.createDataSource(DataSourceTranslator.java:91)
    at org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator.translate(DataSourceTranslator.java:48)
    at org.apache.flink.cdc.composer.flink.FlinkPipelineComposer.compose(FlinkPipelineComposer.java:103)
    at org.apache.flink.cdc.cli.CliExecutor.run(CliExecutor.java:89)
    at org.apache.flink.cdc.cli.CliFrontend.main(CliFrontend.java:69)
Add the following jars to Flink's lib directory:
debezium-connector-mysql-2.5.0.Final.jar
debezium-core-2.5.0.Final.jar
2. Error: Exception in thread "main" java.lang.NoClassDefFoundError: io/debezium/spi/topic/TopicNamingStrategy
Add debezium-api-2.5.0.Final.jar to Flink's lib directory.
3. Error: Exception in thread "main" java.lang.NoSuchFieldError: DATABASE_HISTORY
Exception in thread "main" java.lang.NoSuchFieldError: DATABASE_HISTORY
    at io.debezium.connector.mysql.MySqlConnection$MySqlConnectionConfiguration.lambda$new$0(MySqlConnection.java:590)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:176)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
    at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1603)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at io.debezium.config.Configuration$6.keys(Configuration.java:1659)
    at io.debezium.config.Configuration.asProperties(Configuration.java:1824)
    at io.debezium.config.Configuration.asProperties(Configuration.java:1812)
    at io.debezium.config.Configuration.copy(Configuration.java:755)
    at io.debezium.config.Configuration.edit(Configuration.java:988)
    at io.debezium.connector.mysql.MySqlConnection$MySqlConnectionConfiguration.<init>(MySqlConnection.java:596)
    at org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection(DebeziumUtils.java:91)
    at org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection(DebeziumUtils.java:84)
    at org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils.listTables(MySqlSchemaUtils.java:60)
    at org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory.getTableList(MySqlDataSourceFactory.java:276)
    at org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory.createDataSource(MySqlDataSourceFactory.java:170)
    at org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator.createDataSource(DataSourceTranslator.java:91)
    at org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator.translate(DataSourceTranslator.java:48)
    at org.apache.flink.cdc.composer.flink.FlinkPipelineComposer.compose(FlinkPipelineComposer.java:103)
    at org.apache.flink.cdc.cli.CliExecutor.run(CliExecutor.java:89)
    at org.apache.flink.cdc.cli.CliFrontend.main(CliFrontend.java:69)
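When chasing errors like the three above, it can help to see which jars actually ship the class named in the message. The sketch below is one way to do that, assuming `unzip` is installed. Both helper names (`class_to_entry`, `jars_containing`) are hypothetical and the lib path in the usage comment is an assumption.

```shell
#!/usr/bin/env bash
# Hypothetical helper: turn a class name in dotted or slashed form into the
# entry path used inside a jar, e.g. io.debezium.config.Field
# becomes io/debezium/config/Field.class.
class_to_entry() {
  printf '%s.class\n' "$(printf '%s' "$1" | tr '.' '/')"
}

# Hypothetical helper: print every jar in a directory whose listing
# contains the given class. Relies on `unzip -l` to read the jar index.
jars_containing() {
  local entry lib_dir="$2" jar
  entry="$(class_to_entry "$1")"
  for jar in "$lib_dir"/*.jar; do
    # Skip the unexpanded glob when the directory holds no jars.
    [ -e "$jar" ] || continue
    if unzip -l "$jar" 2>/dev/null | grep -qF "$entry"; then
      echo "$jar"
    fi
  done
}

# Usage (the path is an assumption, adjust it to the local install):
#   jars_containing io/debezium/spi/topic/TopicNamingStrategy /opt/flink-1.20.1/lib
#   jars_containing io.debezium.config.Field /opt/flink-1.20.1/lib
```

If a class shows up in more than one jar, the copies may come from different library versions, which is one common source of NoSuchMethodError and NoSuchFieldError.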
Reference: mysql-cdc sql-client