当前位置: 首页 > news >正文

pyfink1.20版本下实现消费kafka中数据并实时计算

1、环境

JDK版本:1.8.0_412python版本:3.10.6apache-flink版本:1.20.0flink版本:1.20kafka版本:kafka_2.12-3.1.1flink-sql-connector-kafka版本:3.3.0-1.20

2、执行python-flink脚本

从kafka的demo获取消息,并将其中的a字段存入kafka的test_kafka_topic内,并打印sum(b)的值

from pyflink.table import TableEnvironment, EnvironmentSettingsdef log_processing():# 创建流处理环境env_settings = EnvironmentSettings.in_streaming_mode()t_env = TableEnvironment.create(env_settings)# 设置 Kafka 连接器 JAR 文件的路径# 确保 JAR 文件确实存在于指定路径,并且与 Flink 版本兼容t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///home/data/flink/flink-1.20.0/lib/flink-sql-connector-kafka-3.3.0-1.20.jar")# 定义源表 DDLsource_ddl = """CREATE TABLE source_table(a VARCHAR,b INT  -- 如果 b 字段不重要,可以考虑从源表中移除它) WITH ('connector' = 'kafka','topic' = 'demo','properties.bootstrap.servers' = '192.168.15.130:9092','properties.group.id' = 'test_3','scan.startup.mode' = 'latest-offset','format' = 'json')"""# 定义目标表 DDLsink_ddl = """CREATE TABLE sink_table(a VARCHAR) WITH ('connector' = 'kafka','topic' = 'test_kafka_topic','properties.bootstrap.servers' = '192.168.15.130:9092','format' = 'json')"""# 执行 DDL 语句创建表t_env.execute_sql(source_ddl)#table = t_env.from_path("sql_source")#table.execute().print()table_result  = t_env.execute_sql("select sum(b) sb from source_table")table_result.print()t_env.execute_sql(sink_ddl)# 执行 SQL 查询并将结果插入到目标表# 注意:wait() 方法会阻塞,直到插入操作完成(在流处理中通常是无限的)t_env.sql_query("SELECT a FROM source_table") \.execute_insert("sink_table").wait()  # 考虑是否真的需要 wait()if __name__ == '__main__':log_processing()
python3 KafkaSource.py

3、启动kafka生产者

/usr/local/kafka_2.12-3.1.1/bin/kafka-console-producer.sh --broker-list 192.168.15.130:9092 --topic demo
输入模拟数据进行测试
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}

可以看到sum(b)值已输出
在这里插入图片描述

4、启动kafka消费者

查看往test_kafka_topic插入的a字段数据已被消费

/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.130:9092  --from-beginning --topic test_kafka_topic

在这里插入图片描述


http://www.mrgr.cn/news/80003.html

相关文章:

  • 【开源】基于SpringBoot框架的在线视频教育平台 (计算机毕业设计)+万字毕业论文 T027
  • 亚信安全DeepSecurity完成与超云超融合软件兼容性互认
  • 活动预告 |【Part1】Microsoft Azure 在线技术公开课:使用 Azure DevOps 和 GitHub 加速开发
  • 【Flux.jl】 卷积神经网络
  • JPG 转 PDF:免费好用的在线图片转 PDF 工具
  • .NET6 WebApi第1讲:VSCode开发.NET项目、区别.NET5框架【两个框架启动流程详解】
  • 【架构】从 Socket 的角度认识非阻塞模型
  • xshell连接虚拟机,更换网络模式:NAT->桥接模式
  • 网络基础 - TCP/IP 五层模型
  • 爬虫基础知识点
  • 设计模式——Singleton(单例)设计模式
  • 12.12 深度学习-注意力机制
  • Java从入门到工作3 - 框架/工具
  • 如何在项目中使用人大金仓替换mysql
  • 单目深度估计模型 lite-mono 测试
  • 如何使用程序查询域名whois信息?(带PHP/C#示例)
  • all/any函数可以对“条件”打包(Python)
  • 解决VSCode无法识别相对路径的问题
  • 使用 mkcert 工具自签发 https 证书并进行本地受信
  • es 3期 第15节-词项查询与跨度查询实战运用
  • chattts生成的音频与字幕修改完善,每段字幕对应不同颜色的视频,准备下一步插入视频。
  • ArcGIS MultiPatch数据转换Obj数据
  • 优秀前端文章笔记----持续更新(20241211-至今)
  • YashanDB 23.2 YAC 共享集群部署和使用自带YMP迁移工具进行数据迁移,效果很city
  • 麒麟系统+达梦数据库+MybatisPlus+Redis+SpringBoot
  • 使用Windbg排查C++软件安装包安装时被安全软件拦截导致安装堵塞(线程卡住)的问题