当前位置: 首页 > news >正文

Structured-Streaming集成Kafka

一、上下文

《Structured-Streaming初识》博客中已经初步认识了Structured-Streaming,Kafka作为目前最流行的一个分布式的实时流消息系统,是众多实时流处理框架的最优数据源之一。下面我们就跟着官方例子来看看Structured-Streaming是如何集成Kafka的?

二、官方例子

这里我们先把官方例子贴出来,所属包路径为:org.apache.spark.examples.sql.streaming

该示例使用Kafka中一个或多个Topic的消息并进行字数统计。

object StructuredKafkaWordCount {def main(args: Array[String]): Unit = {if (args.length < 3) {System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +"<subscribe-type> <topics> [<checkpoint-location>]")System.exit(1)}val Array(bootstrapServers, subscribeType, topics, _*) = argsval checkpointLocation =if (args.length > 3) args(3) else "/tmp/temporary-" + UUID.randomUUID.toStringval spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCreate()import spark.implicits._// 创建表示来自kafka的输入行流的DataSetval lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option(subscribeType, topics).load().selectExpr("CAST(value AS STRING)").as[String]// 运行 word countval wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()// 开始运行将运行计数打印到控制台的查询val query = wordCounts.writeStream.outputMode("complete").format("console").option("checkpointLocation", checkpointLocation).start()query.awaitTermination()}}

三、分析

1、参数解释

运行该官方示例需要3或4个参数,分别是

  • Kafka的bootstrap-servers
  • 订阅Kafka TopicPartition 的类型
  • 订阅Kafka的Topic
  • checkpointLocation(不是必须的)

bootstrap-servers用于连接Kafka集群。

订阅类型有3种,且只能选择1种:

  1. assign:手动指定分区消费,需要自己管理分区的分配和再平衡。需要指定一个Json字符串,例如:{"topicA":[0,1],"topicB":[2,4]}
  2. subscribe:订阅一个或多个topic进行消费(逗号分割),Kafka会自动处理分区的分配和再平衡。
  3. subscribePattern:基于正则的topic订阅方式,但可能增加一些复杂性和性能开销。

Topic的指定根据订阅类型的变化而变化。

checkpointLocation如果不指定默认会在/tmp下存放。

2、将从Kafka订阅的数据做成一个DataSet

1、构建DataStreamReader

用于从外部存储系统(如文件系统、键值存储等)加载流式“数据集”的接口。使用`SparkSession.readStream`访问此内容。

2、指定输入源格式

默认的输入源格式是parquet,这里指定的是 kafka,输入源格式是DataStreamReader中的一个属性。

private var source: String = sparkSession.sessionState.conf.defaultDataSourceName
  val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default").doc("The default data source to use in input/output.").version("1.3.0").stringConf.createWithDefault("parquet")

3、用输入的3个参数对DataStreamReader添加选项

DataStreamReader中维护了一个Map来接收这些选项,比如:

kafka.bootstrap.servers -> cdh1:9092

assign -> {"topicA":[0,1],"topicB":[2,4]}

subscribe -> topicA,topicB

subscribePattern -> topicP*

private var extraOptions = CaseInsensitiveMap[String](Map.empty)

4、加载输入流数据为DataFrame

final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {def load(): DataFrame = loadInternal(None)private def loadInternal(path: Option[String]): DataFrame = {//*******//根据输入源格式获取相应的输入源提供者//这里的 source 为 kafka ,因此会返回KafkaSourceProvider//它是 所有Kafka readers 和 writers 的提供者类//此外还有ConsoleSinkProvider、JdbcRelationProvider、TextSocketSourceProvider等等val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).getConstructor().newInstance()// 我们需要生成V1数据源,以便将其作为匀场传递给V2关系。目前我们无法确定是否真的要使用V2,因为我们不知道编写者,也不知道查询是否是连续的。val v1DataSource = DataSource(sparkSession,userSpecifiedSchema = userSpecifiedSchema,className = source,options = optionsWithPath.originalMap)val v1Relation = ds match {case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))case _ => None}ds match {//******Dataset.ofRows(sparkSession,StreamingRelationV2( //用于将[[表]]链接到流式[[LogicalPlan]]。Some(provider), source, table, dsOptions,table.schema.toAttributes, None, None, v1Relation))//******}}}

并将表中的数据设置成STRING

3、WordCount统计

在第2步的基础上进行数据处理:

val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

4、开始运行并将结果打印到控制台

val query = wordCounts.writeStream.outputMode("complete").format("console").option("checkpointLocation", checkpointLocation).start()

writeStream是用于将流式数据集的内容保存到外部存储的接口。将返回一个DataStreamWriter

outputMode是指定如何将流式DataFrame/Dataset的数据写入流式接收器。

  1. append:只有流式DataFrame/Dataset中的新行才会写入接收器
  2. complete:每次有更新时,流式DataFrame/Dataset中的所有行都将写入接收器
  3. update:每次有更新时,只有流式DataFrame/Dataset中更新的行才会写入接收器。如果查询不包含聚合,则相当于“append”模式

format是指定外部存储,这里的取值有6种:memory、foreach、foreachBatch、console、table、noop。

四、运行

1、创建Topic

kafka-topics --create --topic structured-streaming-wc --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2

2、启动程序

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/

bin/run-example sql.streaming.StructuredKafkaWordCount cdh1:9092,cdh2:9092 subscribe structured-streaming-wc

3、向topic推送数据

kafka-console-producer --topic structured-streaming-wc --broker-list cdh1:9092,cdh2:9092,cdh3:9092

4、控制台查看结果

 他和sparksql一样默认的分区为200个,如果数据量很小,速度非常慢。需要根据数据量来设置自己的分区数。


大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:

第四届大数据、信息与计算机网络国际学术会议(BDICN 2025)

  • 广州
  • https://ais.cn/u/fi2yym

第四届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2025)

  • 青岛
  • https://ais.cn/u/nuQr6f

第六届大数据与信息化教育国际学术会议(ICBDIE 2025)

  • 苏州
  • https://ais.cn/u/eYnmQr

第三届通信网络与机器学习国际学术会议(CNML 2025)

  • 南京
  • https://ais.cn/u/vUNva2

http://www.mrgr.cn/news/82320.html

相关文章:

  • 深入刨析数据结构之排序(下)
  • Postgresql 命令还原数据库
  • 《计算机组成及汇编语言原理》阅读笔记:p160-p176
  • 慧集通iPaaS集成平台低代码培训-基础篇
  • lec5-传输层原理与技术
  • OpenCV相机标定与3D重建(41)从 3D 物点和它们对应的 2D 图像点估算初始相机内参矩阵函数initCameraMatrix2D()的使用
  • LinuxC高级day5
  • CTFshow—远程命令执行
  • Kettle迁移至Oracle的空字符串和NULL的问题处理,大坑!
  • 国产编辑器EverEdit - 常用资源汇总
  • ubuntu开启root用户
  • ruoyi开发学习
  • 【计组不挂科】计算机组成综合习题库(选择题207道&判断题93道&填空题143道)(含答案与解析)
  • 数据挖掘——聚类
  • 【生活】冬天如何选口罩(医用口罩,N95, KN95还是KP95?带不带呼吸阀门?带不带活性炭?)
  • 嵌入式Linux驱动开发的基本知识(驱动程序的本质、常见的设备类型、设备号的本质理解、设备实例的注册过程)
  • Geotrust SSL证书
  • PHP入门笔记汇总
  • CG顶会论文阅读|《科技论文写作》硕士课程报告
  • 51c自动驾驶~合集44
  • Javascript算法——回溯算法(组合问题)
  • 25年1月更新。Windows 上搭建 Python 开发环境:Python + PyCharm 安装全攻略(文中有安装包不用官网下载)
  • 用Tkinter制作一个用于合并PDF文件的小程序
  • linux安装git
  • 【Rust自学】9.4. 什么时候该使用panic!
  • 【Rust自学】9.3. Result枚举与可恢复的错误 Pt.2:传播错误、?运算符与链式调用