06.RxJava初探

简介: 基本使用在build.gradle中加入配置,注意,rxJava和rxAndroid版本一定要相互兼容,不然可能会报错More than one file was found with OS independent path 'META-INF/rxjava.

基本使用

在build.gradle中加入配置,注意,rxJava和rxAndroid版本一定要相互兼容,不然可能会报错More than one file was found with OS independent path 'META-INF/rxjava.properties'

    compile 'io.reactivex.rxjava2:rxjava:2.0.2'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.2'

被观察者订阅观察者,当被观察者状态改变,可以通知观察者进行操作

第一种写法:
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                 try {
                    //这三个方法分别对应着observer中的三个方
                    //法,调用哪个就执行observer中相应的方法
                    observableEmitter.onNext("星期一");
                    observableEmitter.onComplete();
                } catch (Exception e) {
                    e.printStackTrace();
                    observableEmitter.onError(new NullPointerException("发生异常"));
                }
            }
        });

        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable disposable) {
                LogUtils.i(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String s) {
                LogUtils.i(TAG, "onNext");
            }

            @Override
            public void onError(Throwable throwable) {
                LogUtils.i(TAG, "onError");
            }

            @Override
            public void onComplete() {
                LogUtils.i(TAG, "onComplete");
            }
        };
        observable.subscribe(observer);
第二种写法
Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> observableEmitter) throws Exception {
                LogUtils.i(TAG, "subscribe");
                //这三个方法同样和下边三个是对应的
                observableEmitter.onNext("onNext");
                observableEmitter.onComplete();
                observableEmitter.onError(new IllegalAccessException("发送错误"));
            }
        }).subscribe(new Consumer() {
            @Override
            public void accept(Object o) throws Exception {
                LogUtils.i(TAG, "subscribe(new Consumer() " + o.toString());
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                LogUtils.i(TAG, "new Consumer<Throwable>");
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                LogUtils.i(TAG, "new Action()");
            }
        });

线程控制

RxJava如何切换线程?

Schedulers.immerdiate():

直接在当前线程运行,相当于不指定线程,这是默认的Scheduler

Schedulers.newThread():

总是启用新线程,并在新线程执行操作

Schedulers.io():

I/O操作(读写文件,读写数据库,网络信息交互)所使用的Scheduler,行为模式和newThread()差不多,区别在于io的内部实现使用一个无数量上线的线程池,可以重用空闲线程,因此多数情况下io()比newThread更有效率,不要把计算工作放在io()中,可以避免创建不必要的线程

Schedulers.computation():

计算所使用的Scheduler,这个计算指的是CPU密集型计算,即不会被I/O等操作限制性能的操作,例如图形计算,这个Scheduler使用的固定的线程池,大小为CPU核数,不要把IO操作放在这里,否则等待时间会浪费cpu

AndroidSchedulers.mainThread():(rxAndroid中的类)

它指定的操作将在Android主线程运行

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull final ObservableEmitter<String> observableEmitter) throws Exception {
                String url1 = "http://is.snssdk.com/2/essay/discovery/v3/?iid=6152551759&aid=7";
                URL url = new URL(url1);
                //得到connection对象。
                HttpURLConnection connection = (HttpURLConnection) url.openConnection();
                //设置请求方式
                connection.setRequestMethod("GET");
                //连接
                connection.connect();
                //得到响应码
                int responseCode = connection.getResponseCode();
                if (responseCode == HttpURLConnection.HTTP_OK) {
                    //得到响应流
                    InputStream inputStream = connection.getInputStream();
                    //将响应流转换成字符串
                    String result = stream2String(inputStream);//将流转换为字符串。
                    LogUtils.i(TAG, "onSuccess");
                    observableEmitter.onNext(result);
                } else {
                    observableEmitter.onError(new Exception("失败"));
                }
            }
        })
           //指定网络请求在io线程,界面更新在主线程
           .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable disposable) {
                LogUtils.i(TAG, "onSubscribe");
            }

            @Override
            public void onNext(@NonNull String s) {
                LogUtils.i(TAG, "onNext");
                mTextView.setText(s);

            }

            @Override
            public void onError(@NonNull Throwable throwable) {
                LogUtils.i(TAG, "onError");
                throwable.printStackTrace();
            }

            @Override
            public void onComplete() {
                LogUtils.i(TAG, "onComplete");
            }
        });

常用操作符

map

map的作用简单来说就是我输入一个数据类型的对象,转换得到另一个我想要的数据类型的对象,例如下边,传入integer,得到String。

map操作符对原始的Observable发送的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable

Observable.just(1).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return 1 + "哈哈,下雨了";
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                LogUtils.i(TAG, s);
            }
        });
flatMap

flatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后flatMap合并这些Observables发射的数据,最后将合并后的结果当作它自己的数据序列发射

Observable.just(getUserParams()).flatMap(new Function<UserParams, ObservableSource<LoginResult>>() {
            @Override
            public ObservableSource<LoginResult> apply(UserParams userParams) throws Exception {
                //do something 模拟登录传入登录参数后获取到服务器返回值
                LoginResult result = new LoginResult(userParams);
                return Observable.just(result);
            }
        }).flatMap(new Function<LoginResult, ObservableSource<User>>() {
            @Override
            public ObservableSource<User> apply(LoginResult loginResult) throws Exception {
                //do something 模拟从登录返回值中根据userid获取到用户信息,返回User对象
                return Observable.just(new User());
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<User>() {
            @Override
            public void accept(User user) throws Exception {
                //显示用户姓名
                LogUtils.i(TAG, user.toString());
            }
        });
switchMap

将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据

Observable.just(s).filter(new Predicate<String>() {
            @Override
            public boolean test(String s) throws Exception {
                if (!TextUtils.isEmpty(s) && s.contains("a")) {
                    return true;
                }
                return false;
            }
        })//switchMap和flatMap只有一点区别,在这个场景下,由于每次输入的文字变化都会进行搜索,而且搜索
                //结果不一定是先请求的先返回,有可能是本来我要搜索abc,当我输入ab的时候进行了一次搜索,然后输入
                //完abc又请求了一次,但是由于各种原因,第一次请求的结果返回晚于第二次,那么第一次搜索的不是我想要
                //的结果,但是由于它返回的晚,反而把理想的搜索结果覆盖了,用switchMap可以解决这个问题,它会
                //返回最近一次请求的结果,即便是由于上边的原因导致的问题
        .switchMap(new Function<String, ObservableSource<List<String>>>() {
            @Override
            public ObservableSource<List<String>> apply(String s) throws Exception {
                List<String> list = new ArrayList<String>();
                list.add("搜索结果a");
                list.add("搜索结果B");
                return Observable.just(list);
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<List<String>>() {
                    @Override
                    public void accept(List<String> strings) throws Exception {
                        //将搜索结果展示在列表中
                        for (String string : strings) {
                            LogUtils.i(TAG, string);
                        }
                    }
                });

今天忙了很久,也只是在使用上了解了一些rxJava的东西,下一次会从源码层面分析

相关文章
|
12天前
|
存储 关系型数据库 分布式数据库
PostgreSQL 18 发布,快来 PolarDB 尝鲜!
PostgreSQL 18 发布,PolarDB for PostgreSQL 全面兼容。新版本支持异步I/O、UUIDv7、虚拟生成列、逻辑复制增强及OAuth认证,显著提升性能与安全。PolarDB-PG 18 支持存算分离架构,融合海量弹性存储与极致计算性能,搭配丰富插件生态,为企业提供高效、稳定、灵活的云数据库解决方案,助力企业数字化转型如虎添翼!
|
10天前
|
存储 人工智能 搜索推荐
终身学习型智能体
当前人工智能前沿研究的一个重要方向:构建能够自主学习、调用工具、积累经验的小型智能体(Agent)。 我们可以称这种系统为“终身学习型智能体”或“自适应认知代理”。它的设计理念就是: 不靠庞大的内置知识取胜,而是依靠高效的推理能力 + 动态获取知识的能力 + 经验积累机制。
374 133
|
10天前
|
存储 人工智能 Java
AI 超级智能体全栈项目阶段二:Prompt 优化技巧与学术分析 AI 应用开发实现上下文联系多轮对话
本文讲解 Prompt 基本概念与 10 个优化技巧,结合学术分析 AI 应用的需求分析、设计方案,介绍 Spring AI 中 ChatClient 及 Advisors 的使用。
459 131
AI 超级智能体全栈项目阶段二:Prompt 优化技巧与学术分析 AI 应用开发实现上下文联系多轮对话
|
4天前
|
存储 安全 前端开发
如何将加密和解密函数应用到实际项目中?
如何将加密和解密函数应用到实际项目中?
210 138
|
10天前
|
人工智能 Java API
AI 超级智能体全栈项目阶段一:AI大模型概述、选型、项目初始化以及基于阿里云灵积模型 Qwen-Plus实现模型接入四种方式(SDK/HTTP/SpringAI/langchain4j)
本文介绍AI大模型的核心概念、分类及开发者学习路径,重点讲解如何选择与接入大模型。项目基于Spring Boot,使用阿里云灵积模型(Qwen-Plus),对比SDK、HTTP、Spring AI和LangChain4j四种接入方式,助力开发者高效构建AI应用。
423 122
AI 超级智能体全栈项目阶段一:AI大模型概述、选型、项目初始化以及基于阿里云灵积模型 Qwen-Plus实现模型接入四种方式(SDK/HTTP/SpringAI/langchain4j)
|
4天前
|
存储 JSON 安全
加密和解密函数的具体实现代码
加密和解密函数的具体实现代码
219 136
|
22天前
|
机器学习/深度学习 人工智能 前端开发
通义DeepResearch全面开源!同步分享可落地的高阶Agent构建方法论
通义研究团队开源发布通义 DeepResearch —— 首个在性能上可与 OpenAI DeepResearch 相媲美、并在多项权威基准测试中取得领先表现的全开源 Web Agent。
1533 87
|
23天前
|
弹性计算 关系型数据库 微服务
基于 Docker 与 Kubernetes(K3s)的微服务:阿里云生产环境扩容实践
在微服务架构中,如何实现“稳定扩容”与“成本可控”是企业面临的核心挑战。本文结合 Python FastAPI 微服务实战,详解如何基于阿里云基础设施,利用 Docker 封装服务、K3s 实现容器编排,构建生产级微服务架构。内容涵盖容器构建、集群部署、自动扩缩容、可观测性等关键环节,适配阿里云资源特性与服务生态,助力企业打造低成本、高可靠、易扩展的微服务解决方案。
1365 8