当前位置: 首页 > news >正文

Flume实战--Flume中的选择器、自动容灾(故障转移)、负载均衡的详解与操作

        本文详细介绍了Apache Flume的关键特性,包括选择器、拦截器、故障转移和负载均衡。选择器负责将数据分发到多个Channel,拦截器用于修改或丢弃Event。故障转移机制能够在Sink故障时自动切换,而负载均衡则在多个Sink间分配负载。文章还提供了自定义拦截器的示例,展示了Flume在复杂数据处理中的灵活性和稳定性。

选择器

        当一个Source连接到多个Channel时,选择器决定了数据如何分发到这些Channel。Flume提供了复制选择器和多路复用选择器,允许我们根据需要选择数据分发策略。

一个Source对应多个channel的情况下,多个Channel中的数据是否相同,取决于我们使用了什么选择器,默认是复制选择器。也可以手动的使用多路选择器。

复制选择器

        复制选择器会将数据复制到所有配置的Channel,适用于需要在多个地方处理相同数据的场景。

 配置示例

编写flume脚本,需要一个source,两个channel,以及两个sink

a1.sources = r1  
a1.channels = c1 c2
a1.sinks = s1 s2
#  avro http  syslogtcp
# avro  avro-client
# http  curl
# syslogtcp  nc 
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = bigdata01
a1.sources.r1.port = 7777#执行选择器类型为复制选择器
a1.sources.r1.selector.type=replicatinga1.channels.c1.type=memory
a1.channels.c2.type=memorya1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=/flume/%Y-%m-%d/rep
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.useLocalTimeStamp=truea1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=/flume/%Y-%m-%d/rep
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.useLocalTimeStamp=truea1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2

启动这个flume脚本:

flume-ng agent -c ./ -f syslogtcp-memory-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

向bigdata01 中的 7777 端口发送消息:

echo "hello world" | nc bigdata01 7777如果nc命令无法识别,需要安装一下   yum install -y nc

 查看里面的数据发现都一样,说明使用的是复制选择器。

多路复用选择器

        多路复用选择器可以根据数据内容选择性地将数据发送到特定的Channel,适用于根据数据特性进行分流处理的场景。

就是每次发送消息的时候,可以指定发送消息走哪条channel,只有这条channel对应的sink才有数据,其他sink没数据。

 配置示例

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state        #以每个Event的header中的state这个属性的值作为选择channel的依据
a1.sources.r1.selector.mapping.CZ = c1       #如果state=CZ,则选择c1这个channel
a1.sources.r1.selector.mapping.US = c2 c3    #如果state=US,则选择c2 和 c3 这两个channel
a1.sources.r1.selector.default = c4          #默认使用c4这个channel

说明一个小区别:

avro
syslogtcp
http

可以指定一个hostname和端口号

不同的source,我们使用的发送数据的方式是不一样的:
avro-client
nc
curl

curl  是可以模拟发送get 或者 post 请求的。
比如: curl www.baidu.com

 编写脚本:mul.conf

a1.sources = r1  
a1.channels = c1 c2
a1.sinks = s1 s2a1.sources.r1.type= http
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 8888a1.sources.r1.selector.type=multiplexing
# header 跟  mapping 结合在一起,用于发送消息时,指定发送的方向
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.USER = c1
a1.sources.r1.selector.mapping.ORDER = c2
# 发送的消息找不到具体的channel,就走默认的c1
a1.sources.r1.selector.default = c1a1.channels.c1.type=memory
a1.channels.c2.type=memorya1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=/flume/%Y-%m-%d/mul
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.useLocalTimeStamp=truea1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=/flume/%Y-%m-%d/mul
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.useLocalTimeStamp=truea1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2

 启动该脚本:

flume-ng agent -c ./ -f mul.conf -n a1 -Dflume.root.logger=INFO,console

 模拟http请求:

curl -X POST -d '[{"headers":{"state":"USER"},"body":"this my multiplex to c1"}]' http://bigdata01:8888
curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this my multiplex to c2"}]' http://bigdata01:8888

效果就是,当我发送一条指令的时候,走state=USER的路径,只生成一个文件,走另一条路才会生成另一个不同的文件。

 

自动容灾(故障转移)

        Flume的故障转移机制允许在一个Sink组中的多个Sink之间自动切换,确保数据的持续处理,即使某个Sink出现故障。

多个sink组成一个组,这个组内的sink只有一台工作,假如这一台坏了,另一台自动的工作。

为了演示这个效果,我使用了三个Agent.模型如下:

在bigdata02和bigdata03上安装flume

在集群中可以使用脚本
xsync.sh /opt/installs/flume/
xsync.sh /etc/profile
xcall.sh source /etc/profile也可以使用长拷贝命令,例如:
scp -r /opt/installs/flume1.9.0/ root@hadoop11:/opt/installs/
# 因为 /etc/hosts 文件中没有配置映射,所以使用ip代替了
scp -r /opt/installs/flume1.9.0/ root@192.168.52.12:/opt/installs/scp -r /etc/profile root@hadoop11:/etc
scp -r /etc/profile root@192.168.52.12:/etc两个虚拟机需要刷新配置文件
source /etc/profile

在bigdata01上,编写flume脚本:

failover.conf

#list names
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1# source
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = bigdata01
a1.sources.r1.port = 10086# channel
a1.channels.c1.type = memory# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata02
a1.sinks.k1.port = 10087a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata03
a1.sinks.k2.port = 10088#设置sink组
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
#  此处是设置的权重,权重越多,就工作
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 10000

启动该脚本:

flume-ng agent -c ../conf -f ./failover.conf -n a1 -Dflume.root.logger=INFO,console

在bigdata02上,编写 failover2.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1# source
a1.sources.r1.type = avro
a1.sources.r1.bind = bigdata02
a1.sources.r1.port = 10087# channel
a1.channels.c1.type = memory# sink
a1.sinks.k1.type = logger

启动flume脚本:

flume-ng agent  -f ./failover2.conf -n a1 -Dflume.root.logger=INFO,console

在bigdata03上,编写 failover3.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1# source
a1.sources.r1.type = avro
a1.sources.r1.bind = bigdata03
a1.sources.r1.port = 10088# channel
a1.channels.c1.type = memory# sink
a1.sinks.k1.type = logger

启动flume脚本:

flume-ng agent  -f ./failover3.conf -n a1 -Dflume.root.logger=INFO,console

bigdata02和03启动无异常,再启动01上的脚本:

flume-ng agent -f ./failover.conf -n a1 -Dflume.root.logger=INFO,console

在bigdata01上,发送消息:

echo "wei,wei,wei" | nc hadoop10 10086

发现bigdata02有反应,出现了消息,而bigdata03无反应。因为bigdata02权重大,需要工作

测试故障转移,将bigdata02停掉,再在bigdata01上发消息,就发现bigdata03收到消息了,故障转移了。

负载均衡

        负载均衡机制允许Flume在多个Sink之间平衡数据负载,提高数据处理的效率和可靠性。

演示一下:

bigdata01中创建balance.conf

#list names
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1# source
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = bigdata01
a1.sources.r1.port = 10086# channel
a1.channels.c1.type = memory# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata02
a1.sinks.k1.port = 10087a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata03
a1.sinks.k2.port = 10088#设置sink组
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = random

bigdata02,bigdata03 不变,服务必须是启动的。

使用我们的nc命令发送请求,发现bigdata02和bigdata03随机的处理请求的数据。

  flume轮训是每隔一段时间轮训,而不是每秒轮训一次。所以可能多条在同一时间间隔的events都被一个输出到一个sink端。

练习

需求

编写一个Flume配置,实现以下需求:

 抽取data.json文件中的数据,只保留Warn以及Error级别的日期,并且timestamp只保留4月1日以后的数据。

数据

{"timestamp": "2023-03-01 12:00:00", "level": "INFO", "message": "This is an info message."}
{"timestamp": "2023-03-31 12:01:00", "level": "ERROR", "message": "Error occurred!"}
{"timestamp": "2023-04-01 12:02:00", "level": "WARN", "message": "Warn occurred!"}
{"timestamp": "2023-04-02 12:02:00", "level": "WARN", "message": "Warn occurred!"}
{"timestamp": "2023-04-03 12:02:00", "level": "WARN", "message": "Warn occurred!"}

代码

package com.bigdata;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;public class ETLInterceptor implements Interceptor {/*** {"timestamp": "2023-03-01 12:00:00", "level": "INFO", "message": "This is an info message."}* {"timestamp": "2023-03-31 12:01:00", "level": "ERROR", "message": "Error occurred!"}* {"timestamp": "2023-04-01 12:02:00", "level": "WARN", "message": "Warn occurred!"}** 抽取data.json文件中的数据,只保留Warn以及Error级别的日期,并且timestamp只保留4月1日以后的数据。*/@Overridepublic void initialize() {}// 这个方法是核心方法,可以处理一条,就可以循环处理多条@Overridepublic Event intercept(Event event) {// byte[] --> StringString json = new String(event.getBody());/*JSONObject jsonObject = JSON.parseObject(json);String timestamp = jsonObject.getString("timestamp");String level = jsonObject.getString("level");String message = jsonObject.getString("message");*/Log log = JSON.parseObject(json, Log.class);System.out.println(log);// 日期比较  after  before   --> Calendar 、Date、DateLocal// String --> DateSimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");try {Date date = dateFormat.parse(log.getTimestamp());Date date2 = dateFormat.parse("2023-03-31");if((log.getLevel().equals("WARN") || log.getLevel().equals("ERROR"))  &&  date.after(date2)){System.out.println("符合条件的数据......");return event;}} catch (ParseException e) {throw new RuntimeException(e);}return null;}@Overridepublic List<Event> intercept(List<Event> list) {ArrayList<Event> events = new ArrayList<>();// 任何一个list ,都不能边循环,边添加或者删除,否则报错!!!// 如何过滤掉不符合条件的数据,先将其置为null,然后在循环中,将null的数据不添加到集合中,就过滤掉了for (Event oldEvent : list) {Event newEvent = intercept(oldEvent);if(newEvent != null){events.add(newEvent);}}return events;}@Overridepublic void close() {}public static class EventBuilder implements Builder {@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}
}

 视频链接

15-flume中的选择器_哔哩哔哩_bilibili

16-flume的故障转移_哔哩哔哩_bilibili

17-flume中的负载均衡_哔哩哔哩_bilibili


http://www.mrgr.cn/news/39515.html

相关文章:

  • nginx的安装和使用
  • 1.8 软件业务测试
  • MySQL_触发器
  • 建筑物变化检测算法baseline工程,开箱即用,23年5月测试准确度超越阿里云aiearth
  • JavaWeb——Vue组件库Element(3/6):常见组件:Dialog对话框、Form表单(介绍、使用、实际效果)
  • 计算机视觉周边技术解析:从基础到前沿
  • Github 2024-09-30 开源项目周报 Top15
  • 初始MYSQL数据库(8)—— JDBC编程
  • jQuery UI 工作原理
  • 实战OpenCV之边缘检测
  • 【C++】多态(上)
  • 什么是区块链?
  • ISA-95制造业中企业和控制系统的集成的国际标准-(2)
  • 使用 Vue3 和 Axios 实现 CRUD 操作
  • spring全家桶使用教程
  • Stable Diffusion绘画 | 来训练属于自己的模型:素材处理与打标篇
  • PIKACHU | PIKACHU 靶场 XSS 后台配置
  • Spring - @Import注解
  • 读数据湖仓03不同类型的数据
  • Mixture-of-Experts (MoE): 条件计算的诞生与崛起【下篇】