大数据应用开发——实时数据采集
前言
hadoop,zookeeper要开启
目录
题目
准备一个Flume agent的配置文件
创建、查看Kafka的Topic中(Topic名称为order,分区数为4)
题目
在主节点使用Flume采集实时数据生成器10050端口的socket数据,将数据存入到Kafka的Topic中(Topic名称为order,分区数为4),使用Kafka自带的消费者消费order(Topic)中的数据,将前2条数据的结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下;
准备一个Flume agent的配置文件
vi /usr/flume/conf/flume-kafka.conf
# 定义了agent a1 的一个source,名称为s1
a1.sources=s1
# 定义了agent a1 的一个sink,名称为k1
a1.sinks=k1
# 定义了agent a1 的一个channel,名称为c1
a1.channels=c1# 指定了source s1 的类型为netcat
a1.sources.s1.type=netcat
# 指定了netcat source绑定的IP地址
a1.sources.s1.bind=localhost
# 指定了netcat source监听的端口号为10050
a1.sources.s1.port=10050# 指定了sink k1 的类型为Kafka sink,用于将数据发送到Kafka集群
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
# 指定了Kafka sink发送数据的目标主题为order
a1.sinks.k1.kafka.topic=order
# 指定了Kafka集群的bootstrap服务器列表
a1.sinks.k1.kafka.bootstrap.servers=master:9092,slave1:9092,slave2:9092# 指定了channel c1 的类型
a1.channels.c1.type=memory
# 设置了内存channel的容量为1000个事件,这是channel可以存储的最大事件数
a1.channels.c1.capacity=1000
# 设置了内存channel的事务容量为100个事件,这是channel在一次事务中可以处理的最大事件数
a1.channels.c1.transactionCapacity=100# 将source s1 与channel c1 连接起来
a1.sources.s1.channels=c1
# 将sink k1 与channel c1 连接起来
a1.sinks.k1.channel=c1
创建、查看Kafka的Topic中(Topic名称为order,分区数为4)
# 可以用jps查看Kafka有没有启动,没有启动可以运行此命令
/usr/kafka/bin/kafka-server-start.sh -daemon /usr/kafka/config/server.properties# 查看Kafka中所有已创建的Topics
/usr/kafka/bin/kafka-topics.sh --zookeeper master:2181 --list# 在Kafka中创建一个新的Topic为order,分区数为4
/usr/kafka/bin/kafka-topics.sh --create --topic order --partitions 4 --replication-factor 1 --zookeeper master:2181# 查看Kafka中所有已创建的Topics
/usr/kafka/bin/kafka-topics.sh --zookeeper master:2181 --list# 使用flume-kafka.conf配置文件,在/usr/flume/conf/配置目录下,启动一个名为a1的Flume agent,并将日志级别设置为INFO,输出到控制台
/usr/flume/bin/flume-ng agent -n a1 -c conf/ -f /usr/flume/conf/flume-kafka.conf -Dflume.root.logger=INFO,console# 启动Kafka的消费者,它允许你从指定的Topic中读取消息并显示在控制台上,最大消息数2条
/usr/kafka/bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic order --from-beginning --max-messages 2