当前位置: 首页 > news >正文

数据流风格

数据流风格概述

数据流风格是一种软件架构风格,强调数据从一个计算单元流向下一个计算单元。这种风格特别适用于处理大数据量的任务或具有复杂数据处理链的系统。数据流架构解耦了数据生成者和处理者,使得系统具有较高的内聚性低耦合,非常适合数据密集型应用。

数据流风格可以细分为两种主要子风格:

  1. 批处理序列 (Batch Sequential)
  2. 管道过滤器 (Pipeline-Filter)

子风格一:批处理序列

批处理序列 是数据流风格中的一个具体形式,指的是一组数据在多阶段的处理流程中被按顺序处理。每个阶段的处理任务完成后,才会进入下一个阶段。

特点:
  1. 强时间顺序:每个阶段必须在前一阶段完全处理完后才能启动,存在严格的顺序性。
  2. 强数据完整性:数据在不同处理单元之间通过明确的接口或数据格式传递,确保每一阶段的数据处理独立且完整。
  3. 计划性:批处理通常通过时间调度或任务计划来触发处理,并按阶段执行,常用于日志分析、数据仓库加载等需要全量数据处理的场景。
适用场景:

批处理序列非常适合处理那些需要完整的数据集并按阶段处理的任务。例如:

  • 日志文件处理:将每天的日志文件分批加载到数据库中,首先解析文件,然后提取信息,最后存储到数据库。
示例:
// 批处理阶段一:解析日志文件
class ParseLogs {public List<String> processLogs(String logFile) {// 假设解析日志并返回日志条目列表return List.of("INFO: Server started", "ERROR: Disk full", "INFO: User logged in");}
}// 批处理阶段二:筛选错误信息
class FilterErrors {public List<String> filter(List<String> logs) {List<String> errors = new ArrayList<>();for (String log : logs) {if (log.startsWith("ERROR")) {errors.add(log);}}return errors;}
}// 批处理阶段三:存储错误信息
class StoreErrors {public void store(List<String> errors) {for (String error : errors) {System.out.println("Storing error: " + error);}}
}public class BatchProcessing {public static void main(String[] args) {ParseLogs parser = new ParseLogs();FilterErrors filter = new FilterErrors();StoreErrors store = new StoreErrors();// 批处理步骤:逐步处理日志文件List<String> logs = parser.processLogs("system.log");List<String> errors = filter.filter(logs);store.store(errors);}
}
执行流程:
  1. ParseLogs 解析日志文件。
  2. FilterErrors 从解析的日志中筛选出错误信息。
  3. StoreErrors 将错误信息存储到目标存储系统中。

该流程严格按照步骤执行,适合批量处理日志等场景。


子风格二:管道过滤器

管道过滤器是一种通过多个独立的过滤器处理数据的架构风格,强调数据从一个过滤器流向另一个过滤器,形成一个数据流管道。

特点:
  1. 增量处理:每个过滤器只处理数据的一部分,然后将处理结果传递给下一个过滤器。数据处理是递增式的,没有整体性处理的概念。
  2. 过滤器独立性:过滤器是完全独立的实体,它们只关心自己的输入和输出,不需要知道其他过滤器的状态。
  3. 解耦与复用:每个过滤器是模块化的,容易被替换、重用,且可以灵活组合形成新的数据流。
  4. 高响应性:与批处理不同,管道过滤器架构可以在数据流经每个过滤器时快速响应,支持流式处理(如秒级或分钟级的处理)。
适用场景:

适合实时处理和流式数据的场景,例如:

  • 日志流处理:可以实时监控日志流中的错误信息,触发告警系统。
  • 实时数据清洗:过滤和转换传感器数据或网络流量数据等。
示例:
import java.util.ArrayList;
import java.util.List;// 定义过滤器接口
interface Filter {List<String> process(List<String> data);
}// 输入过滤器,生成输入数据
class InputFilter implements Filter {@Overridepublic List<String> process(List<String> data) {return List.of("INFO: Server started", "ERROR: Disk full", "INFO: User logged in");}
}// 错误过滤器,筛选出错误信息
class ErrorFilter implements Filter {@Overridepublic List<String> process(List<String> data) {List<String> errors = new ArrayList<>();for (String log : data) {if (log.startsWith("ERROR")) {errors.add(log);}}return errors;}
}// 警报过滤器,处理错误信息并触发警报
class AlertFilter implements Filter {@Overridepublic List<String> process(List<String> data) {for (String log : data) {System.out.println("ALERT: " + log);}return data; // 返回原数据以便于后续处理}
}// 管道类,用于将过滤器组合
class Pipeline {private final List<Filter> filters = new ArrayList<>();public void addFilter(Filter filter) {filters.add(filter);}public List<String> execute(List<String> input) {List<String> data = input;for (Filter filter : filters) {data = filter.process(data); // 每个过滤器独立处理}return data;}
}public class PipelineProcessing {public static void main(String[] args) {// 创建管道并添加过滤器Pipeline pipeline = new Pipeline();pipeline.addFilter(new InputFilter());pipeline.addFilter(new ErrorFilter());pipeline.addFilter(new AlertFilter());// 执行管道List<String> logs = pipeline.execute(null); // 由第一个过滤器生成数据}
}
执行流程:
  1. InputFilter 生成日志数据。
  2. ErrorFilter 筛选出其中的错误日志。
  3. AlertFilter 处理错误日志并触发警报。

过滤器是模块化的,顺序可以自由组合,且每个过滤器之间没有强耦合。


总结:批处理序列 vs 管道过滤器

特点批处理序列管道过滤器
处理方式阶段性处理,每一步必须依次执行流式处理,每个过滤器独立工作
数据完整性每步处理后保持数据完整性,直到所有步骤完成增量处理,数据逐步传递和变换
响应时间较长,适合全量数据处理较短,适合实时流式处理
耦合度依赖严格的顺序每个过滤器独立工作,易于扩展
适用场景日志分析、大规模数据处理实时监控、流式数据处理

这两种子风格分别适用于不同的场景。批处理序列适合需要全量数据和阶段性处理的应用,而管道过滤器适合实时处理和流式数据处理。这两者共同的优点是解耦和复用,保证了数据流风格架构在面对复杂数据处理时的高效性。


http://www.mrgr.cn/news/55713.html

相关文章:

  • 使用Matplotlib绘制极轴散点图
  • 【JAVA】第一张_Java基础语法
  • MySQL企业常见架构与调优经验分享
  • DBdoctor推出无Agent轻量级纳管解决方案
  • Java-多线程2
  • T113 内核中 adbd相关配置1
  • 改变函数调用上下文:apply与call方法详解及实例
  • Windows 11开发:全面指南
  • Spring Boot 3 + Vue3 + Element-Plus 后台基础管理系统.zip
  • Lua中的break语句
  • 02,talend
  • C++详细笔记(四)
  • c++算法第4天
  • django5入门【03】新建一个hello界面
  • 速盾高防 CDN 防御效果如何?
  • 深入理解 Kafka
  • 基于微信小程序二手物品调剂系统设计与实现
  • Qt开发——Qt项目打包、整合以及生成安装包保姆级教程(Windows系统)
  • WSL2安装ros,安装anaconda,配置PX4
  • 传智杯 第六届—第二场—D
  • 【前端】如何制作一个自己的网页(13)
  • Redis 集群
  • 01,hana
  • 开源EMO再升级!复旦|百度|南大推出Hallo2,可以生成4K,一小时的音频驱动的视频。
  • AGV电子地图之贝塞尔曲线
  • 每日OJ题_牛客_[NOIP2001]装箱问题_01背包_C++_Java