Rx学习

简介:   RXjava学习资料:    https://www.gitbook.com/book/yuxingxin/rxjava-essentials-cn/details    如下只是学习笔记而已,后面添加实战案例,现在只是理论总结:    Rxjava语言特点:    1,易于并发从而更好的利用服务器的能力;    2,易于有条件的异步执行;    3,一种更好的方式来避免回调地狱;    4,一种响应式方法。

  RXjava学习资料:
  
  https://www.gitbook.com/book/yuxingxin/rxjava-essentials-cn/details
  
  如下只是学习笔记而已,后面添加实战案例,现在只是理论总结:
  
  Rxjava语言特点:
  
  1,易于并发从而更好的利用服务器的能力;
  
  2,易于有条件的异步执行;
  
  3,一种更好的方式来避免回调地狱;
  
  4,一种响应式方法。
  
  RXjava源于观察者模式:
  
  添加了如下三个缺少的功能:
  
  1,生产者在没有更多数据可用时能够发出信号通知:oncompleted()事件。
  
  2,生产者在发生错误的时候能够发出信号通知:onError()事件。
  
  3,RxJavaObservables能够组合而不是嵌套,从而避免开发者陷入回调地狱。
  
  RXjava中的四个角色:
  
  1,Observable:观察得到的,看的见得
  
  2,Observer:观察者
  
  3,Subscriber:订阅者
  
  4,Subjects:服从。
  
  Observables和Subjects是两个“生产”实体
  
  Observers和Subscribers是两个“消费”实体。
  
  Rxjava之Observable
  
  onNext(T)检索数据
  
  onError(Throwable)发现错误
  
  onCompleted()完成
  
  RXjava之热Observables和冷Observables
  
  热Observables:只要创建完就发射数据,订阅他的观察者从序列中某位置接受数据。
  
  冷Observables:一直等待,知道观察者订阅他才发射数据,可确保能收到数据序列。

 

Observale创建的两种方式:

1:create

Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {

    }
});

 

2:from:从列表,数组来创建Observable,并一个个发射数据

List<Integer> items = new ArrayList<Integer>();
items.add(1);
Observable<Integer> observableString = Observable.from(items);
Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
        System.out.println("Observable completed");
    }
    @Override
    public void onError(Throwable e) {
        System.out.println("Oh,no! Something wrong happened!");
    }
    @Override
    public void onNext(Integer item) {
        System.out.println("Item is " + item);
    }
}); 

  3,just

  just()方法可以传入一到九个参数,它们会按照传入的参数的顺序来发射它

  们。

  无理由不发射数据并结束操作:

  Observable.empty(),Observable.never(),和

  Observable.throw()

  RxJava提供四种不同的Subject:

  PublishSubject

  BehaviorSubject:向订阅者发送截止订阅前最新的一个数据对象,然后发送订阅后数据流;

  ReplaySubject:缓存所订阅的所有数据,向任意一个订阅他的观察者重发数据流;

  AsyncSubject:当Observable完成AnsySubject只会发布最后一个数据给已定订阅的每一个观察者。

  as中开发使用RXjava,RXandroid:

  gradle依赖:

 

compile 'io.reactivex:rxandroid:1.1.0'
compile 'io.reactivex:rxjava:1.1.0'

  其他推荐的as插件:

  1,lombok:getsettostringequal等注解代码

  2,butterknife:findviewbyidonclick代码等的注解代码

  3,retrolambda:java8lambda相关函数

  具体作用情百度或者谷歌进行搜索

  just主要是得到原始的observable版本,在一个新的响应式架构的基础上迁移已存在的代码,这个方法可

  能是一个有用的开始点。

private void loadApps(AppInfo appOne, AppInfo appTwo, AppInfo appThree)
mRecyclerView.setVisibility(View.VISIBLE);
Observable.just(appOne, appTwo, appThree)
.subscribe(new Observer<AppInfo>() {
    @Override
    public void onCompleted() {
        mSwipeRefreshLayout.setRefreshing(false);
        Toast.makeText(getActivity(), "Here is the list!"
    }
    @Override
    public void onError(Throwable e) {
        Toast.makeText(getActivity(), "Something went wrong!"
                       mSwipeRefreshLayout.setRefreshing(false);
    }
    @Override
    public void onNext(AppInfo appInfo) {
        mAddedApps.add(appInfo);
        mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
    }
});
}

 

  repeat()重复发数据:

  repeat(3):重复发送三次数据;

private void loadApps(AppInfo appOne, AppInfo appTwo, AppInfo appThree)
mRecyclerView.setVisibility(View.VISIBLE);
Observable.just(appOne, appTwo, appThree)
.repeat(3)
.subscribe(new Observer<AppInfo>() {
    @Override
    public void onCompleted() {
        mSwipeRefreshLayout.setRefreshing(false);
        Toast.makeText(getActivity(), "Here is the list!"
    }
    @Override
    public void onError(Throwable e) {
        Toast.makeText(getActivity(), "Something went wrong!"
                       mSwipeRefreshLayout.setRefreshing(false);
    }
    @Override
    public void onNext(AppInfo appInfo) {
        mAddedApps.add(appInfo);
        mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
    }
});
}

 

defer()声明一个Observable但是你又想推迟这个Observable的创建直到观察者订阅

1
private Observable<Integer> getInt() {
    return Observable.create(subscriber -> {
        if(subscriber.isUnsubscribed()) {
            return;
        }
        App.L.debug("GETINT");
        subscriber.onNext(42);
        subscriber.onCompleted();
    });
}

2
Observable<Integer> deferred = Observable.defer(this::getInt);

3
deferred.subscribe(number -> {
    App.L.debug(String.valueOf(number));
});

 

range()
你需要从一个指定的数字X开始发射N个数字吗?你可以用 range,下面是从10开始,发送3个数字:

Observable.range(10, 3)
.subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
        Toast.makeText(getActivity(), "Yeaaah!", Toast.LENGTH_LONG).show();
    }
    @Override
    public void onError(Throwable e) {
        Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
    }
    @Override
    public void onNext(Integer number) {
        Toast.makeText(getActivity(), "I say " + number, Toast.LENGTH_SHORT).show();
    }
});

 

interval()
interval() 函数在你需要创建一个轮询程序时非常好用。

如下每隔3秒toast下数据:

Subscription stopMePlease = Observable.interval(3, TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
        Toast.makeText(getActivity(), "Yeaaah!", Toast.LENGTH_LONG).show();
    }
    @Override
    public void onError(Throwable e) {
        Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
    }
    @Override
    public void onNext(Integer number) {
        Toast.makeText(getActivity(), "I say " + number, Toast.LENGTH_SHORT).show();
    }
});

 

  timer()

  如果你需要一个一段时间之后才发射的Observable,你可以像下面的例子使用timer():

  如下3秒后发送数据:

Observable.timer(3, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(Long number) {
        Log.d("RXJAVA", "I say " + number);
    }
});

 

  如何过滤数据 Observables?
  
  RxJava让我们使用 filter() 方法来过滤我们观测序列中不想要的值,filter((appInfo) ->appInfo.getName().startsWith("C"))是只想展示c开头的数据。

1,过滤序列:

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable.from(apps)
    .filter((appInfo) ->
            appInfo.getName().startsWith("C"))
    .subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            mSwipeRefreshLayout.setRefreshing(false);
        }
        @Override
        public void onError(Throwable e) {
            Toast.makeText(getActivity(), "Something went wrong!"
                           mSwipeRefreshLayout.setRefreshing(false);
        }
        @Override
        public void onNext(AppInfo appInfo) {
            mAddedApps.add(appInfo);
            mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
        }
    });
}

  过滤空数据:

.filter(new Func1<AppInfo, Boolean>() {
    @Override
    public Boolean call(AppInfo appInfo) {
        return appInfo != null;
    }
})

  获取开头或者结尾的几个数据:take() 或 takeLast()。
  
  take前几个数据;
  
  takelast后几个数据。
  
  如下是获取前三个数据:

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable.from(apps)
    .take(3)
    .subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            mSwipeRefreshLayout.setRefreshing(false);
        }
        @Override
        public void onError(Throwable e) {
            Toast.makeText(getActivity(), "Something went wrong!"
                           mSwipeRefreshLayout.setRefreshing(false);
        }
        @Override
        public void onNext(AppInfo appInfo) {
            mAddedApps.add(appInfo);
            mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
        }
    });
}

  获取后三个数据:

Observable.from(apps)
.takeLast(3)
.subscribe(...);

  数据去重Distinct:

  制造重复数据:

Observable<AppInfo> fullOfDuplicates = Observable.from(apps)
                                       .take(3)
                                       .repeat(3);

  去重:

  

fullOfDuplicates.distinct()
.subscribe(new Observer<AppInfo>() {
    @Override
    public void onCompleted() {
        mSwipeRefreshLayout.setRefreshing(false);
    }
    @Override
    public void onError(Throwable e) {
        Toast.makeText(getActivity(), "Something went wrong!"
                       mSwipeRefreshLayout.setRefreshing(false);
    }
    @Override
    public void onNext(AppInfo appInfo) {
        mAddedApps.add(appInfo);
        mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
    }
});
}

 忽略掉重复的值并且在温度确实改变的时候才得到通知:ditinctuntilchanged()

也就是说有重复数据的时候,利用ditinctuntilchanged可以去重并展示重复数据中的一个数据。

 

发射第一个数据 first()

发射最后一个数据last()

观测序列完成,不在发射任何值得时候,firstOrDefault(),lastOrDefault()

 

跳过前两个数据 skip(2)

跳过后两个数据 skipLast(2),也就是跳过后两个的其他数据发射。

 

发射指定位置的元素:elementAt(2),从零开始,第三个元素发射

 

指定时间间隔由Observable发射最近一次的数值:

如下每隔三秒,updateDisplay(currentTemperature) 更新当前温度信息

Observable<Integer> sensor = [...]
                             sensor.sample(30, TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(Integer currentTemperature) {
        updateDisplay(currentTemperature)
    }
});

 timeout:  timeout() 为一个Observable的限时的副本,如果在指定的时间间隔内Observable不发射值的话,它监听的原始的Observable时就会触发 onError() 函
数,也就是限定时间的字段:

下面代码意思就是每隔两秒刷新当前温度,如果不发射数据,就会打印:You should go check the sensor, dude。

Subscription subscription = getCurrentTemperature()
                            .timeout(2, TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
        Log.d("RXJAVA", "You should go check the sensor, dude");
    }
    @Override
    public void onNext(Integer currentTemperature) {
        updateDisplay(currentTemperature)
    }
});

Debounce

过滤掉由Observable发射的速率过快的数据;如果在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个。

 

 2016年4月6日23:02:38

转换Observables

Map() :将函数作用于每一个发出去的元素上面去,以此创建一个新的Observable来发射转换的数据。

如下就是将原来的apps的名称小写,并发出去。

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable.from(apps)
    .map(new Func1<AppInfo, AppInfo>() {
        @Override
        public Appinfo call(AppInfo appInfo) {
            String currentName = appInfo.getName();
            String lowerCaseName = currentName.toLowerCase();
            appInfo.setName(lowerCaseName);
            return appInfo;
        }
    })
    .subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            mSwipeRefreshLayout.setRefreshing(false);
        }
        @Override
        public void onError(Throwable e) {
            Toast.makeText(getActivity(), "Something went wrong!"
                           mSwipeRefreshLayout.setRefreshing(false);
        }
        @Override
        public void onNext(AppInfo appInfo) {
            mAddedApps.add(appInfo);
            mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
        }
    });
}

FlatMap RxJava的 flatMap() 函数提供一种铺平序列的方式,然后合并这些Observables发射的数据,最后将合并后的结果作为最终的Observable。

 

ConcatMap:解决了 flatMap() 的交叉问题,提供了一种能够把发射的值连续在一起的铺平函数,而不是合并它们。

 

FlatMapIterable:flatMapInterable() 和 flatMap() 很像。仅有的本质不同是它将源数据两两结成对并生成Iterable,而不是原始数据项和生成的Observables。

 

SwitchMap:switchMap() 和 flatMap() 很像,除了一点:每当源Observable发射一个新的数据项(Observable)时,它将取消订阅并停止监视之前那个数据项

产生的Observable,并开始监视当前发射的这一个。

 

Scan:scan() 函数对原始

Observable发射的每一项数据都应用一个函数,计算出函数的结果值。

Observable.just(1, 2, 3, 4, 5)
.scan((sum, item) -> sum + item)
.subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
        Log.d("RXJAVA", "Sequence completed.");
    }
    @Override
    public void onError(Throwable e) {
        Log.e("RXJAVA", "Something went south!");
    }
    @Override
    public void onNext(Integer item) {
        Log.d("RXJAVA", "item is: " + item);
    }
});

groupBy()  来分组元素:

如下按照根据最近更新时间来进行分组。

Observable<GroupedObservable<String, AppInfo>> groupedItems = Observable.from(apps)
.groupBy(new Func1<AppInfo, String>() {
    @Override
    public String call(AppInfo appInfo) {
        SimpleDateFormat formatter = new SimpleDateFormat("MM/yyyy"
                return formatter.format(new Date(appInfo.getLastUpdateTime()));
    }
});
Observable.concat(groupedItems)
.subscribe(new Observer<AppInfo>() {
    @Override
    public void onCompleted() {
        mSwipeRefreshLayout.setRefreshing(false);
    }
    @Override
    public void onError(Throwable e) {
        Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
        mSwipeRefreshLayout.setRefreshing(false);
    }
    @Override
    public void onNext(AppInfo appInfo) {
        mAddedApps.add(appInfo);
        mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
    }
});

  Buffer RxJava中的 buffer() 函数将源Observable变换一个新的Observable,这个新的Observable每次发射一组列表值而不是一个一个发射。
  buffer(count = 3)一组三个的方式进行发射一个列表的obserable。
  buffer(count = 2,skip = 2)一组两个,跳过第三个来进行发射一个列表的obserable。
  buffer(timespan = 2,count = 2)每隔2秒,每组2个发射一个列表的obserable.
  Window:RxJava的 window() 函数和 buffer() 很像,但是它发射的是Observable而不是列表。
  window(count = 3)原来observable的自己,每隔obserable三个元素。
  window(skip = 3,count = 2),每隔第三个,每隔obserable中有2个元素。
  Cast:
  map的特殊版本。将原observale每一项转换为新的类型,把他变成不同class.


 

组合observable

 2016年4月7日22:23:38

 Merge:

从单词角度分析,合并,rx中将帮助你把两个甚至更多的Observables合并到他们发射的数据项里

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    List reversedApps = Lists.reverse(apps);
    Observable<AppInfo> observableApps = Observable.from(apps);
    Observable<AppInfo> observableReversedApps = Observable.from(reversedApps);
//合并操作 Observable<AppInfo> mergedObserbable =
Observable.merge(observableApps, observableReversedApps); mergedObserbable.subscribe(new Observer<AppInfo>() { @Override public void onCompleted() { mSwipeRefreshLayout.setRefreshing(false); Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show(); } @Override public void onError(Throwable e) { Toast.makeText(getActivity(), "One of the two Observable threw an error!" mSwipeRefreshLayout.setRefreshing(false); } @Override public void onNext(AppInfoappInfo) { mAddedApps.add(appInfo); mAdapter.addApplication(mAddedApps.size() - 1, appInfo); } }); }

Zip操作:

zip() 合并两个或者多个Observables发射出的数据项,根据指定的函数 Func* 变换它们,并发射一个新值

 

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable<AppInfo> observableApp = Observable.from(apps);
    Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);
    Observable.zip(observableApp, tictoc,
                   (AppInfo appInfo, Long time) -> updateTitle(appInfo, time))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
        }
        @Override
        public void onError(Throwable e) {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
        }
        @Override
        public void onNext(AppInfoappInfo) {
            if (mSwipeRefreshLayout.isRefreshing()) {
                mSwipeRefreshLayout.setRefreshing(false);
            }
            mAddedApps.add(appInfo);
            int position = mAddedApps.size() - 1;
            mAdapter.addApplication(position, appInfo);
            mRecyclerView.smoothScrollToPosition(position);
        }
    });
}

  Join操作:
  RxJava的 join() 函数基于时间窗口将两个Observables发射的数据结合在一起。
  如下意思:tictoc 这个Observable数据每秒只发射一个新的 Long 型整数。
  为了合并它们,我们需要指定两个 Func1 变量:
  appInfo -> Observable.timer(2, TimeUnit.SECONDS)
  time -> Observable.timer(0, TimeUnit.SECONDS)
  上面描述了两个时间窗口。下面一行描述我们如何使用 Func2 将两个发射的数据
  结合在一起。
  this::updateTitle

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable<AppInfo> appsSequence =
        Observable.interval(1000, TimeUnit.MILLISECONDS)
    .map(position -> {
        return apps.get(position.intValue());
    });
    Observable<Long> tictoc = Observable.interval(1000, TimeUnit.MILLISECONDS);
    appsSequence.join(
        tictoc,
        appInfo -> Observable.timer(2, TimeUnit.SECONDS),
        time -> Observable.timer(0, TimeUnit.SECONDS),
        this::updateTitle)
    .observeOn(AndroidSchedulers.mainThread())
    .take(10)
    .subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            Toast.makeText(getActivity(), "Here is the list!"
        }
        @Override
        public void onError(Throwable e) {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Something went wrong!"
        }
        @Override
        public void onNext(AppInfoappInfo) {
            if (mSwipeRefreshLayout.isRefreshing()) {
                mSwipeRefreshLayout.setRefreshing(false);
            }
            mAddedApps.add(appInfo);
            int position = mAddedApps.size() - 1;
            mAdapter.addApplication(position, appInfo);
            mRecyclerView.smoothScrollToPosition(position);
        }
    });
}

  combineLatest操作:
  zip() 作用于最近未打包的两个Observables。相反, combineLatest() 作用于最近发射的数据项:如果 Observable1 发射了A并且 Observable2 发射了B和C, combineLatest() 将会分组处理AB和AC。
  如下:
  一个是每秒钟从我们已安装的应用列表发射一个App数据,第二个是每隔1.5秒发射一个 Long 型整数。我们将他们结合起来并执行 updateTitle() 函数。

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable<AppInfo> appsSequence = Observable.interval(1000, TimeUnit.MILLISECONDS)
                                       .map(position ->apps.get(position.intValue()));
    Observable<Long> tictoc = Observable.interval(1500, TimeUnit.MILLISECONDS);
    Observable.combineLatest(appsSequence, tictoc,
                             this::updateTitle)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
        }
        @Override
        public void onError(Throwable e) {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
        }
        @Override
        public void onNext(AppInfoappInfo) {
            if (mSwipeRefreshLayout.isRefreshing()) {
                mSwipeRefreshLayout.setRefreshing(false);
            }
            mAddedApps.add(appInfo);
            int position = mAddedApps.size() - 1;
            mAdapter.addApplication(position, appInfo);
            mRecyclerView.smoothScrollToPosition(position);
        }
    });
}

And,Then和When:

  我们有两个发射的序列, observableApp ,发射我们安装的应用列表数据, tictoc 每秒发射一个 Long 型整数。现在我们用 and() 连接源
  Observable和第二个Observable。

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable<AppInfo> observableApp = Observable.from(apps);
    Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);
    Pattern2<AppInfo, Long> pattern = JoinObservable.from(observableApp).and(tictoc);
    Plan0<AppInfo> plan = pattern.then(this::updateTitle);
    JoinObservable
    .when(plan)
    .toObservable()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
        }
        @Override
        public void onError(Throwable e) {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Something went wrong!"
        }
        @Override
        public void onNext(AppInfoappInfo) {
            if (mSwipeRefreshLayout.isRefreshing()) {
                mSwipeRefreshLayout.setRefreshing(false);
            }
            mAddedApps.add(appInfo);
            int position = mAddedApps.size() - 1;
            mAdapter.addApplication(position, appInfo);
            mRecyclerView.smoothScrollToPosition(position);
        }
    });
}

Switch:

  subscribe-unsubscribe 的序列里我们能够从一个Observable自动取消订阅来订阅一个新的Observable。

  将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项。

StartWith:

  startWith() 是 concat() 的对应部分。正如 concat() 向发射数据的Observable追加数据那样,在Observable开始发射他们的数据之前,
  startWith() 通过传递一个参数来先发射一个数据序列。

 

Schedulers调度器

 处理多线程异步操作:

  RxJava提供了5种调度器:
  .io() :IO操作,增长缩减自适应线程池(线程池无限制
  .computation:计算工作默认的调度器,调度器:buffer(),debounce(),delay(),interval(),sample,skip()。
  .immediate():允许在当前线程执行你指定的工作,是timeout(),timeInterval(),timestamp()默认调度器。
  .newThread():为指定任务启动一个线程。
  .trampoline():任务入队列,按需处理队列中的任务,是repeat()和retry()默认调度器。

 

如下就是简单将对图片文件的IO操作通过RX的io调度器来线程池中进行处理。

public static void storeBitmap(Context context, Bitmap bitmap, String filename)
Schedulers.io().createWorker().schedule(() -> {
    blockingStoreBitmap(context, bitmap, filename);
});
}
private static void blockingStoreBitmap(Context context, Bitmap bitmap, String filename)
FileOutputStream fOut = null;
try {
    fOut = context.openFileOutput(filename, Context.MODE_PRIVATE);
    bitmap.compress(Bitmap.CompressFormat.PNG, 100, fOut);
    fOut.flush();
    fOut.close();
} catch (Exception e) {
    throw new RuntimeException(e);
} finally {
    try {
        if (fOut != null) {
            fOut.close();
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
}

 

在主线程中返回结果。

onBackpressureBuffer()  方法将告诉Observable发射的数据如果比观察者消费的数据要更快的话,它必须把它们存储在缓存中并提供一个合适的时间给它们。

getApps()
.onBackpressureBuffer()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<AppInfo>() {
    [...]

 

处理耗时的任务:

getObservableApps(apps)
.onBackpressureBuffer()
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<AppInfo>() {
    [...]

 

 

相关文章
|
4月前
|
监控
stm32f407探索者开发板(十八)——串口通信实验讲解(USART_RX_STA流程图详解)
stm32f407探索者开发板(十八)——串口通信实验讲解(USART_RX_STA流程图详解)
224 0
|
6月前
|
传感器 存储 运维
161216-01 | Bently Nevada |以太网10BASE-T/100BASE-TX I/O模块
Bentley Nevada的161216-01是款以太网I/O模块,支持10BASE-T/100BASE-TX,用于机械保护监测。具备多通道输入,兼容多种信号类型,保证高可靠性和灵活性。模块带有前板LED指示运行状态和通信情况,具有可编程性及故障保护功能,适应-30°C至+65°C的工作温度。适用于复杂工业环境,确保系统安全运行。如需详细信息,可进一步咨询。
161216-01 | Bently Nevada |以太网10BASE-T/100BASE-TX  I/O模块
STM32的USART发送数据时如何使用TXE和TC标志
STM32的USART发送数据时如何使用TXE和TC标志
202 0
|
机器学习/深度学习 传感器 编解码
基于matlab实现空时分组编码 2 Tx 1 Rx
基于matlab实现空时分组编码 2 Tx 1 Rx
|
芯片 数据格式
ARM架构与编程(基于I.MX6ULL): 串口UART编程(七)(下)
ARM架构与编程(基于I.MX6ULL): 串口UART编程(七)
312 1
ARM架构与编程(基于I.MX6ULL): 串口UART编程(七)(下)
|
定位技术 芯片
ARM架构与编程(基于I.MX6ULL): 串口UART编程(七)(上)
ARM架构与编程(基于I.MX6ULL): 串口UART编程(七)
269 1
ARM架构与编程(基于I.MX6ULL): 串口UART编程(七)(上)
|
网络协议 测试技术 网络架构
以太网PHY的基础知识
以太网是一种计算机网络技术,它定义了开放系统互连 (OSI) 模型的物理层和数据链路层,IEEE 802.3 标准以一种结构化方式描述这些功能,强调系统的逻辑划分以及其如何组合在一起。由媒体访问控制器 (MAC) 组成的数据链路层可创建以太网数据帧,并使用底层以太网物理层通过介质传输数据帧。以太网物理层(简称PHY)是一个抽象层,负责传输和接收数据。PHY对传输的数据帧进行编码,并按照特定的操作调制速度、传输媒体类型和支持的链路长度对接收的帧进行解码。
990 0
以太网PHY的基础知识
|
物联网 开发者
Uart 和 asart 的介绍 | 学习笔记
快速学习 Uart 和 asart 的介绍
Uart 和 asart 的介绍 | 学习笔记
|
安全 C语言 芯片
CB5654串口入门必看——程序设计逻辑与用法
本文介绍 程序设计逻辑与用法
696 1
CB5654串口入门必看——程序设计逻辑与用法
|
缓存 自然语言处理 监控
PHY的基本知识
写DM9000网卡芯片驱动的预备知识 from:http://blog.sina.com.cn/s/blog_522a41b201009ha7.html ETHERNET的接口实质是MAC通过MII总线控制PHY的过程。
820 0