当前位置: 首页 > news >正文

响应式编程-reactor

什么是响应式编程

正如我们所知道的过程式编程、命令式编程、面向对象编程、事件驱动编程范式,响应式编程也是编程范式的一种。

基础介绍

reactor作为一种响应式编程具体实现,能够跟踪异步场景下的数据流和数据状态变更。在Java9中已经集成了响应式编程的接口,类名为Flow。

现在的硬件已经非常厉害了,但是随着并发量的增加,仍然会达到瓶颈。现在主要有两个方式解决程序的系统:
(1)并行:单机维度增加线程,集群维度扩机器;
(2)高效的资源利用:寻找更有效的方式利用当前已有的资源;

现在我们传统的做法是写阻塞式的代码,如果出现性能瓶颈,可能会想办法提升并发度,但是这种方式可能会导致竞争和并发等问题。更糟糕的是,可能由于等待IO资源,导致线程一直处于空闲等待,导致资源浪费。

目前在jvm中听过了两种异步编程的模式:Callbacks、Futures。
Callbacks模式很难实现组合,从而导致读和理解都较困难,而Futures模式比Callbacks模式好一些,但是也存在多种问题:不支持懒计算、缺失多值及错误处理等机制。

多种异步写法的比较

Callbacks写法

// 定义回调接口
interface Callback {void callback(String res);
}// 异步回调模拟调用
public void asyncWork(Callback callback) {new Thread(()->{// 先做某些事情String s = doSomething();// 等事情做完之后,执行回调函数callback.callback(s);}).start();
}// 模拟做的事情
private String doSomething() {try{Thread.sleep(5000);}catch (Exception ignored){}return "test";
}

Futures写法

// 异步执行
CompletableFuture.runAsync(() ->{doSomething();
});// 模拟做的事情
private String doSomething() {try{Thread.sleep(5000);}catch (Exception ignored){}return "test";
}

Reactor写法

@Test
public void test10() {// 创建一个流式数据Flux<String> flux = Flux.just("a", "b", "c");// 在一个异步调度器中执行数据打印flux.subscribeOn(Schedulers.parallel()).subscribe(System.out::println);
}

优缺点

优点

  • 非阻塞式;
  • 更好的资源利用;
  • 可组合性;
  • 错误处理;

缺点

  • 学习曲线可能复杂些;
  • 调试麻烦些;
  • 过度使用可能导致性能问题;

使用方式

引入依赖包

<dependencyManagement> <dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2023.0.11</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId> </dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId> <scope>test</scope></dependency>
</dependencies>

进行Flux或者Mono封装

@Test
public void test3() {Mono<String> future1 = Mono.fromCallable(() -> {Thread.sleep(5000);return "Result 1";}).subscribeOn(Schedulers.elastic());Mono<String> future2 = Mono.fromCallable(() -> {Thread.sleep(5000);return "Result 2";}).subscribeOn(Schedulers.elastic());long start = System.currentTimeMillis();Mono<List<String>> mono = Flux.merge(future1, future2).collectList().timeout(Duration.ofSeconds(9));List<String> res = mono.block();System.out.println(res);System.out.println("cost:" + (System.currentTimeMillis() - start));
}@Test
public void test9() {Mono<String> mono = Mono.fromSupplier(()-> {try {System.out.println("name:" + Thread.currentThread().getName());Thread.sleep(5000);}catch (Exception e){}return "hello";});long start = System.currentTimeMillis();mono.subscribeOn(Schedulers.parallel()).block();System.out.println("cost:" + (System.currentTimeMillis() - start));
}

Q&A

Q:publishOn和subscribeOn的区别
A:参考下面文档:
https://blog.csdn.net/qq_33797928/article/details/105005629
https://juejin.cn/post/7147655449695748126
https://juejin.cn/post/7251894360722292792

个人思考

1、计算机涉及多种资源,比如:CPU资源、IO资源(网络设备、存储设备)等,如果很容易出现资源瓶颈,会导致计算机资源没法充分利用。比如当CPU资源使用率为30%时,IO资源使用率已达到100%,这时候会导致CPU资源出现严重的浪费。如果能通过合理的系统调度,保证CPU、IO等资源都能进行合理的利用,可能会导致整体的性能有很大的提升。

参考文档

1、reactor问题社区
https://stackoverflow.com/questions/tagged/reactor
2、官方文档
https://projectreactor.io/docs/core/release/reference/#_blocking_can_be_wasteful


http://www.mrgr.cn/news/65803.html

相关文章:

  • 基于SSM+微信小程序的社团登录管理系统(社团1)
  • python中应该使用while 1吗?按位运算符可以代替逻辑运算符使用吗?
  • 力扣题目解析--最长公共前缀
  • Rust的enum枚举的强大用法
  • T31开发笔记:简单的Log日记输出
  • 向量数据库|第2期|pgvectorscale
  • 动态内存分配
  • 使用 pytorch 运行预训练模型的框架
  • FFmpeg 4.3 音视频-多路H265监控录放C++开发十二:在屏幕上显示多路视频播放,可以有不同的分辨率,格式和帧率。
  • HTB:Shocker[WriteUP]
  • 如何在BSV区块链上实现可验证AI
  • 隆盛策略股票杠杆交易市场罕见,26只“牛股”提示风险
  • VSCode 1.82之后的vscode server离线安装
  • Centos使用yum获取离线安装包
  • springboot 单元测试-各个模块举例
  • 爱奇艺大数据多AZ统一调度架构:打破数据孤岛,提升效率
  • windows——病毒的编写
  • Fish Agent:集成 ASR 和 TTS 的端到端语音处理模型,支持多语言转换
  • 单体架构的 IM 系统设计
  • 【教学类-12-10】20241104《连连看竖版6*6 (3套题目空心图案)中2班
  • 泛微开发修炼之旅--53ecology表单转pdf源码修改相关(表单转pdf时可以修改最后生成的pdf的内容)
  • mysql5安装
  • 数字证书的简单记录
  • 基于SpringBoot司机信用评价的货运管理系统【附源码】
  • Windows无法访问\\192.168.1.156,错误代码0x800704cf
  • 11.4OpenCV_图像预处理习题02