当前位置: 首页 > news >正文

flink cdc 应用

SQLServer

1. The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.

遇到了一下问题,多次尝试,最终发现是数据库大小写要一致。

Caused by: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:59) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]at io.debezium.schema.HistorizedDatabaseSchema.recover(HistorizedDatabaseSchema.java:38) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]at org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerSourceFetchTaskContext.validateAndLoadDatabaseHistory(SqlServerSourceFetchTaskContext.java:187) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]at org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerSourceFetchTaskContext.configure(SqlServerSourceFetchTaskContext.java:130) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.submitTask(IncrementalSourceStreamFetcher.java:84) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.submitStreamSplit(IncrementalSourceSplitReader.java:261) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:153) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.20.0.jar:1.20.0]at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-1.20.0.jar:1.20.0]at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-files-1.20.0.jar:1.20.0]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
  CREATE TABLE Member_Extend (ID INT, MemberID INT,  PRIMARY KEY (ID) NOT ENFORCED) WITH ('connector' = 'sqlserver-cdc','hostname' = '192.168.1.3','port' = '1433','username' = 'test','password' = 'test','database-name' = 'CrmExtend','table-name' = 'dbo.Member_Extend');

作业安全启停


show jobs
Flink SQL> show jobs;
+----------------------------------+------------------------------------------------------------------+----------+-------------------------+
|                           job id |                                                         job name |   status |              start time |
+----------------------------------+------------------------------------------------------------------+----------+-------------------------+
| ce5a0e938563cf52317c5b9055ad102f| testjob |  RUNNING | 2024-11-15T03:38:47.919 |+----------------------------------+------------------------------------------------------------------+----------+-------------------------+
4 rows in setSET state.checkpoints.dir='s3://flink/cdc-1.20/savepoints';
stop job 'ce5a0e938563cf52317c5b9055ad102f' with savepoint;Flink SQL> stop job 'ce5a0e938563cf52317c5b9055ad102f' with savepoint;
+--------------------------------------------------------------+
|                                               savepoint path |
+--------------------------------------------------------------+
| s3://flink/cdc-1.20/savepoints/savepoint-ce5a0e-2935055bb307 |
+--------------------------------------------------------------+
1 row in setSET execution.savepoint.path='s3://flink/cdc-1.20/savepoints/savepoiznt-ce5a0e-2935055bb307';  
set 'execution.savepoint.ignore-unclaimed-state' = 'true'; 重新执行原有sqlinsert into flink_user select * from user ;

http://www.mrgr.cn/news/76363.html

相关文章:

  • 3.flask蓝图使用
  • 三维卷积( 3D CNN)
  • windows系统如何将基座大模型私有化部署
  • C语言gdb调试
  • JavaEE之线程池
  • 后端Java开发:第十二天
  • 重建大师跑空三,出现进度条回退是什么原因?
  • 城市轨道交通数据可视化的应用与优势
  • Intelligent Transportation Scheduling
  • IT框架与库:理解它们的不同与共同点
  • 用友YonBIP-R5旗舰版 yonbiplogin 任意文件读取漏洞复现
  • Rust 语言学习笔记(一)
  • uniapp luch-request 使用教程+响应对象创建
  • C++ 20的条件判断语句的增强
  • Docker 容器常见故障排查及处理
  • JavaWeb后端开发知识储备1
  • 大型网站架构演进过程
  • Istio分布式链路监控搭建:Jaeger与Zipkin
  • MT4 编程—DLL编程的整个流程以及案例分析
  • STM32外设应用:深入探索STM32微控制器的强大功能
  • 【AlphaFold3】开源本地的安装及使用
  • 国际专线网络:助力企业全球化转型的关键技术
  • Java的栈与队列以及代码实现
  • Linux下使用miniconda构建python运行环境
  • 随机数
  • 【NOIP提高组】潜伏者