@[TOC]
将 RxJava 融入到实际项目架构中,解决复杂问题。
1.1 与 Retrofit 结合
Retrofit 官方支持返回 Observable 或 Flowable,是处理网络请求的黄金搭档。
public interface ApiService {
@GET("users/{id}")
Observable<User> getUser(@Path("id") int id);
}
// 使用
apiService.getUser(123)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
user -> { /* 处理成功 */ },
error -> { /* 处理错误 */}
);
1.2 与 MVP/MVVM 架构结合
将 RxJava 与 MVP 或 MVVM 架构结合使用,是现代 Android(以及 Java Swing)开发中的最佳实践之一。这种结合能充分发挥各自的优势:架构模式负责清晰的职责分离和可测试性,而 RxJava 则优雅地处理异步、事件流和复杂的线程切换。
1.2.1 与 MVP (Model-View-Presenter) 结合
MVP 的核心思想是将 Activity/Fragment (View) 从繁重的业务逻辑中解放出来,使其只负责 UI 的展示和用户交互的传递。Presenter 作为中间层,负责处理业务逻辑并与 Model 层交互。
// 1. 定义 View 接口
public interface UserView {
void showLoading();
void hideLoading();
void showUsers(List<User> users);
void showError(String message);
}
// 2. Model 层 (返回 RxJava 数据流)
public class UserModel {
private ApiService apiService; // 假设使用 Retrofit
public Observable<List<User>> getUsers() {
return apiService.getUsers() // 返回 Observable<List<User>>
.subscribeOn(Schedulers.io()); // 在 IO 线程执行网络请求
}
}
// 3. Presenter 层 (核心: 处理 RxJava 链)
public class UserPresenter {
private UserView view;
private UserModel model;
private CompositeDisposable disposables = new CompositeDisposable();
public UserPresenter(UserView view, UserModel model) {
this.view = view;
this.model = model;
}
public void loadUsers() {
view.showLoading();
// 建立 RxJava 订阅
disposables.add(
model.getUsers()
.observeOn(AndroidSchedulers.mainThread()) // 切换到主线程更新 UI
.subscribe(
users -> {
view.hideLoading();
view.showUsers(users); // 成功: 更新 UI
},
error -> {
view.hideLoading();
view.showError("加载失败: " + error.getMessage()); // 失败: 显示错误
}
)
);
}
// 在 Activity/Fragment onDestroy 时调用,防止内存泄漏
public void onDestroy() {
disposables.clear();
}
}
// 4. View 层 (Activity/Fragment)
public class UserActivity extends AppCompatActivity implements UserView {
private UserPresenter presenter;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_user);
// 初始化 Presenter
presenter = new UserPresenter(this, new UserModel());
// 触发加载
presenter.loadUsers();
}
@Override
protected void onDestroy() {
super.onDestroy();
presenter.onDestroy(); // 关键: 取消所有订阅
}
// 实现 View 接口方法
@Override
public void showLoading() {
// 显示进度条
}
@Override
public void hideLoading() {
// 隐藏进度条
}
@Override
public void showUsers(List<User> users) {
// 更新 RecyclerView 或 ListView
}
@Override
public void showError(String message) {
// 弹出 Toast 或 Snackbar
}
}
优势:
职责清晰: View 只管展示,Presenter 处理逻辑和数据流。
易于测试: Presenter 不依赖 Android 组件,可以方便地进行单元测试。
异步处理优雅: RxJava 完美解决了网络请求、数据库操作等异步问题。挑战:
内存泄漏: 必须妥善管理 Disposable,在生命周期结束时取消订阅。
Presenter 膨胀: 如果业务逻辑复杂,Presenter 可能会变得非常庞大。可以通过引入 Interactor (或称 Use Case) 层来分担业务逻辑。1.2.2 RxJava 与 MVVM (Model-View-ViewModel) 结合
MVVM 通过 数据绑定 (Data Binding) 或 LiveData/StateFlow 将 View 与 ViewModel 解耦。View 通过观察 ViewModel 中的数据变化来自动更新 UI,ViewModel 则负责准备和管理这些数据。
RxJava 可以作为 ViewModel 内部处理异步数据流的强大工具,最终将结果暴露给 View。由于 Android 官方推荐在 MVVM 中使用 LiveData,而 LiveData 本身不是响应式流,我们通常使用 LiveDataReactiveStreams 工具类进行桥接。
// 1. Model 层 (同 MVP)
public class UserModel {
public Observable<List<User>> getUsers() {
return apiService.getUsers()
.subscribeOn(Schedulers.io());
}
}
// 2. ViewModel 层 (核心: 使用 RxJava 处理逻辑,输出 LiveData)
public class UserViewModel extends ViewModel {
private UserModel model;
// 暴露给 View 的 LiveData
private MutableLiveData<List<User>> usersLiveData = new MutableLiveData<>();
private MutableLiveData<Boolean> loadingLiveData = new MutableLiveData<>();
private MutableLiveData<String> errorLiveData = new MutableLiveData<>();
// 提供 LiveData 给 View 观察
public LiveData<List<User>> getUsers() {
return usersLiveData; }
public LiveData<Boolean> isLoading() {
return loadingLiveData; }
public LiveData<String> getError() {
return errorLiveData; }
public UserViewModel(UserModel model) {
this.model = model;
}
public void loadUsers() {
loadingLiveData.setValue(true);
// 将 RxJava Observable 转换为 LiveData
LiveData<List<User>> liveData = LiveDataReactiveStreams.fromPublisher(
model.getUsers()
.toFlowable(BackpressureStrategy.LATEST) // 转换为 Flowable 以支持背压
.observeOn(AndroidSchedulers.mainThread()) // 确保在主线程发射
);
// 订阅这个 LiveData
liveData.observeForever(users -> {
loadingLiveData.setValue(false);
usersLiveData.setValue(users);
});
// 你也可以直接订阅 Observable,并手动设置 LiveData
/*
model.getUsers()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
users -> {
loadingLiveData.setValue(false);
usersLiveData.setValue(users);
},
error -> {
loadingLiveData.setValue(false);
errorLiveData.setValue(error.getMessage());
}
);
*/
}
@Override
protected void onCleared() {
super.onCleared();
// ViewModel 被销毁时,LiveDataReactiveStreams 会自动取消订阅
// 如果是手动 subscribe,需要在此处管理 Disposable
}
}
// 3. View 层 (Activity/Fragment)
public class UserActivity extends AppCompatActivity {
private UserViewModel viewModel;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_user);
viewModel = new ViewModelProvider(this).get(UserViewModel.class);
// 观察数据变化
viewModel.getUsers().observe(this, users -> {
// 更新 UI,例如设置 Adapter
});
viewModel.isLoading().observe(this, isLoading -> {
if (isLoading) {
// 显示加载框
} else {
// 隐藏加载框
}
});
viewModel.getError().observe(this, errorMsg -> {
if (errorMsg != null) {
// 显示错误信息
}
});
// 触发加载
viewModel.loadUsers();
}
}
- 优势:
解耦更彻底: View 通过观察数据变化来更新,无需主动调用方法。
生命周期感知: LiveData 和 StateFlow 能自动感知 Activity/Fragment 的生命周期,避免在非活跃状态下更新 UI。
数据驱动 UI: UI 的状态完全由数据决定,逻辑更清晰。 - 挑战:
桥接成本: 需要将 RxJava 流转换为 LiveData 或 StateFlow,增加了复杂性。
学习曲线: 需要同时理解 MVVM、数据绑定和 RxJava。1.3. RxBus (事件总线)
利用 PublishSubject 或 BehaviorSubject 实现组件间的解耦通信。
```Java
public class RxBus {
private final PublishSubject