OkHttp、Retrofit、RxJava:一文讲清楚
一、okHttp的同步和异步请求
Call 是 OkHttp 的核心接口,代表一个已准备好执行的 HTTP 请求。它支持 同步 和 异步 两种模式:
enqueue——>okHttp异步
OkHttpClient client = new OkHttpClient();Request request = new Request.Builder().url("https://example.com").build();client.newCall(request).enqueue(new Callback() {@Overridepublic void onResponse(Call call, Response response) throws IOException {System.out.println("Response: " + response.body().string());}@Overridepublic void onFailure(Call call, IOException e) {System.err.println("Request failed: " + e.getMessage());}
});
execute——>同步请求
特点
-
阻塞线程
请求发送后,当前线程会阻塞,直到服务器返回响应或超时。 -
直接返回结果
通过execute()
方法直接返回Response
对象,无需回调。 -
线程管理
不能在主线程中执行,否则会触发NetworkOnMainThreadException
异常。同步请求必须在后台线程中执行,可以使用Thread
、ExecutorService
或 RxJava 等工具管理线程。 -
资源释放
Response
对象实现了Closeable
接口,使用try-with-resources
语法确保资源释放。 -
超时设置
默认超时为 10 秒,可通过OkHttpClient.Builder
自定义:
OkHttpClient client = new OkHttpClient();Request request = new Request.Builder().url("https://example.com").build();
try (Response response = client.newCall(request).execute()) {if (response.isSuccessful()) {System.out.println("Response: " + response.body().string());} else {System.err.println("Request failed: " + response.code());}
} catch (IOException e) {System.err.println("Request error: " + e.getMessage());
}
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(() -> {try (Response response = client.newCall(request).execute()) {if (response.isSuccessful()) {System.out.println
特性 | 同步请求(execute ) | 异步请求(enqueue ) |
---|---|---|
线程阻塞 | 阻塞当前线程,直到请求完成 | 不阻塞线程,后台执行 |
结果获取 | 直接返回 Response 对象 | 通过 Callback 回调处理结果 |
线程管理 | 需手动管理线程,避免主线程阻塞 | 自动在后台线程执行,主线程无影响 |
适用场景 | 需立即获取结果的场景(如单元测试) | 需异步处理的场景(如网络请求) |
二、okHttp+Rxjava
Observable<Response> observable = Observable.create(emitter -> {Call call = client.newCall(request);call.enqueue(new Callback() {@Overridepublic void onResponse(Call call, Response response) {if (!emitter.isDisposed()) {emitter.onNext(response);emitter.onComplete();}}@Overridepublic void onFailure(Call call, IOException e) {if (!emitter.isDisposed()) {emitter.onError(e);}}});emitter.setCancellable(call::cancel);
});observable.subscribeOn(Schedulers.io()) // 在 IO 线程执行.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理结果.subscribe(new Observer<Response>() {@Overridepublic void onSubscribe(Disposable d) {// 订阅时调用}@Overridepublic void onNext(Response response) {System.out.println("Response: " + response.body().string());}@Overridepublic void onError(Throwable e) {System.err.println("Error: " + e.getMessage());}@Overridepublic void onComplete() {// 请求完成时调用}});
二、Retrofit 的同步与异步用法,用Call发起请求
Retrofit 内部使用 OkHttp 作为 HTTP 客户端,这意味着在 Retrofit 的配置中,你可以定制 OkHttp 的各种参数(如超时设置、拦截器等)。
为了更好地支持异步编程,Retrofit 提供了 RxJava 的 CallAdapter。通过这个适配器,你可以让 API 接口直接返回 Observable、Single 等 RxJava 类型,从而使用 RxJava 的强大操作符来处理请求结果和错误。
public interface ApiService {@GET("users")Call<List<User>> getUsers();
}
Retrofit retrofit = new Retrofit.Builder().baseUrl("https://api.example.com/").addConverterFactory(GsonConverterFactory.create()) // 使用 Gson 进行 JSON 转换.build();ApiService apiService = retrofit.create(ApiService.class);
Retrofit同步请求
需在子线程中调用,否则会触发 NetworkOnMainThreadException
new Thread(new Runnable() {@Overridepublic void run() {try {Call<List<User>> call = apiService.getUsers();Response<List<User>> response = call.execute();if (response.isSuccessful() && response.body() != null) {List<User> users = response.body();// 处理返回的数据} else {Log.e("Retrofit", "Response error: " + response.code());}} catch (IOException e) {e.printStackTrace();}}
}).start();
Retrofit异步请求
Call<List<User>> call = apiService.getUsers();
call.enqueue(new Callback<List<User>>() {@Overridepublic void onResponse(Call<List<User>> call, Response<List<User>> response) {if (response.isSuccessful() && response.body() != null) {List<User> users = response.body();// 处理返回的数据,比如更新 UI} else {// 请求成功,但是服务器返回错误码,比如 404、500 等Log.e("Retrofit", "Response error: " + response.code());}}@Overridepublic void onFailure(Call<List<User>> call, Throwable t) {// 请求失败,比如网络错误或解析错误Log.e("Retrofit", "Request failed", t);}
});
三、Retrofit + RxJava 的同步/异步与线程统一处理,用Observable发起请求
Observable 是 RxJava 的响应式编程模型,用于处理异步数据流。其优势包括:
- 链式调用:通过操作符(如
map
、flatMap
)简化复杂逻辑。 - 线程调度:通过
subscribeOn
和observeOn
控制线程切换。 - 错误处理:通过
onErrorReturn
或retry
实现容错。
同步请求
public interface ApiService {@GET("users/{id}")Observable<User> getUserRx(@Path("id") int id); // 返回 Observable
}apiService.getUserRx(1).subscribeOn(Schedulers.io()) // 指定请求线程.observeOn(Schedulers.io()) // 指定响应处理线程.blockingSubscribe(user -> {// 同步阻塞获取结果});
异步请求
// 1. 定义 API 接口,使用 RxJava 的 Observable 作为返回类型
public interface ApiService {@GET("users")Observable<List<User>> getUsers();
}// 2. 配置 OkHttp 客户端(可添加拦截器、日志打印等)
OkHttpClient okHttpClient = new OkHttpClient.Builder()// 例如:添加日志拦截器// .addInterceptor(new HttpLoggingInterceptor().setLevel(HttpLoggingInterceptor.Level.BODY)).build();// 3. 配置 Retrofit,同时添加 Gson 转换器和 RxJava 适配器
Retrofit retrofit = new Retrofit.Builder().baseUrl("https://api.example.com/").client(okHttpClient).addConverterFactory(GsonConverterFactory.create()) // JSON 数据解析.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // RxJava 适配器.build();// 4. 创建 API 服务实例
ApiService apiService = retrofit.create(ApiService.class);// 5. 使用 RxJava 进行网络请求
apiService.getUsers().subscribeOn(Schedulers.io()) // 在 IO 线程执行网络请求.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理返回结果.subscribe(users -> {// 成功回调,处理用户数据}, throwable -> {// 错误回调,处理异常情况});
方案 | 线程管理 | 代码复杂度 | 适用场景 |
---|---|---|---|
OkHttp 原生 | 手动切换 | 低 | 简单请求、低耦合场景 |
Retrofit 原生 | 手动切换 | 中 | 接口化请求、中等复杂度 |
Retrofit+RxJava | 自动统一管理 | 高 | 复杂异步流、高可维护性 |
四、具体使用Retrofit+RxJava
// 1. 定义服务接口
public interface UserService {@POST("/user/{userId}/profile")Observable<BaseResponse<UserProfile>> updateProfile(@Path("userId") String userId,@Body ProfileParams params);
}// 2. 发起请求
NetworkApi.createService(UserService.class, ServiceType.User.name()).updateProfile("123", new ProfileParams("Kimi", "avatar.jpg")).compose(NetworkApi.applySchedulers()).subscribe(new BaseObserver<UserProfile>() {@Overridepublic void onSuccess(UserProfile profile) {// 更新UI}@Overridepublic void onBusinessError(int code, String msg) {// 显示错误提示}});
设计优势:
- 责任链模式:每个方法专注单一职责(创建→配置→调度→响应)
- 类型安全:通过泛型确保数据模型一致性
- 异常隔离:BaseObserver集中处理网络/业务异常
- 线程透明:applySchedulers()隐藏线程切换细节
该模式常见于需要支持多环境、多服务类型的现代移动应用架构,特别适合企业级应用开发中需要统一管理API请求的场景。
这是典型的Retrofit+RxJava组合的网络请求链式调用写法,结合了工厂模式、建造者模式和响应式编程思想。其核心结构可分为四个关键部分:
1.服务实例创建
NetworkApi.createService(ApiService.class, ServiceType.License.name())
NetworkApi
是自定义的工厂类,封装了Retrofit实例的创建过程createService()
方法通过动态代理生成ApiService接口的实现类ServiceType.License.name()
指定服务类型,通常用于动态配置baseUrl(如区分认证服务、业务服务等)
2.具体API操作
.action(id, params)
action()
对应ApiService接口中定义的端点方法:
@POST("/api/{serviceType}/actions")
Observable<BaseResponse<T>> action(@Path("serviceType") String serviceType,@Body ActionParams params
);
id
参数可能用于路径替换(@Path注解),params
作为请求体(@Body注解)
3.线程调度组合
.compose(NetworkApi.applySchedulers(...))
compose()
是RxJava的操作符,统一应用线程切换规则applySchedulers()
典型实现:
4.观察者封装
new BaseObserver<BaseResponse<AccessConsentRecordDetailBean>>()
- 自定义的BaseObserver处理通用逻辑:
public abstract class BaseObserver<T> implements Observer<T> {@Overridepublic void onError(Throwable e) {// 统一错误处理(网络异常、业务异常等)}@Overridepublic void onNext(T response) {if (response.isSuccess()) {onSuccess(response.getData());} else {onBusinessError(response.getCode(), response.getMessage());}}
}
五、Flowable与Observable
Flowable与Observable是RxJava中两种核心的响应式数据流类型,结合Retrofit使用时需要根据具体场景进行选型。以下是两者的核心区别及在Retrofit中的实践建议:
类型 | 特性 | Retrofit集成场景 |
---|---|---|
Observable | 无背压机制的异步数据流,支持同步/异步操作,适合轻量级请求 | 简单API调用(如获取用户信息、配置数据) |
Flowable | 支持背压控制的响应式流,内置5种策略处理生产消费速度差异 | 大文件传输、实时数据推送等高并发场景 |
核心区别
1. 背压处理机制
- Flowable
通过BackpressureStrategy
配置策略(ERROR/BUFFER/DROP/LATEST/MISSING),在Retrofit中处理大数据流时需显式指定策略:
@GET("api/sensor-data")
Flowable<Data> getSensorData(@Query("deviceId") String id); // 默认使用ERROR策略[5](@ref)
- Observable
无背压控制,Retrofit接口直接返回Observable时需确保数据量可控,否则可能引发OOM
2. 线程模型
Flowable
强制异步订阅,Retrofit需配合subscribeOn(Schedulers.io())
使用:
api.getSensorData("DEV001").subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
Observable
支持同步调用,适合快速响应的本地缓存查询:
@GET("api/profile")
Observable<User> getUserProfile(); // 同步获取用户数据[9](@ref)
3. 性能表现
- Flowable
因背压检测机制,吞吐量比Observable低约15-30%,但内存更安全 - Observable
无额外性能损耗,适合高频低数据量请求(如按钮点击事件统计)
4.集成实现
- BUFFER:文件下载(需注意内存监控)
Flowable.create(emitter -> {...}, BackpressureStrategy.BUFFER)
- DROP:实时股票行情推送(丢弃过时数据)
- LATEST:即时聊天消息(保留最新消息)
5.错误处理对比
- Observable需手动捕获异常:
.subscribe(data -> {...},error -> { // 需处理所有异常 }
)
- Flowable通过
onBackpressureXXX
操作符自动处理:
.onBackpressureDrop(dropped -> log("丢弃数据:" + dropped))
性能优化建议
-
混合使用策略
对核心业务接口使用Flowable+BUFFER策略,非核心功能使用Observable -
动态缓存控制
通过rx2.buffer-size
参数调整Flowable缓存池:System.setProperty("rx2.buffer-size", "256"); // 默认128[7](@ref)
-
生命周期管理
使用CompositeDisposable
统一释放资源:CompositeDisposable disposables = new CompositeDisposable();disposables.add(api.getDataStream().subscribe(data -> {...}));
通过合理选择Flowable与Observable,可使Retrofit网络层在保证稳定性的同时获得最佳性能。可以在金融交易、物联网等高频场景优先采用Flowable,而在常规业务API中使用Observable以降低复杂度。