闲鱼是如何利用RxJava提升异步编程能力的

简介: 使用RxJava,姿势正确很重要

作者——闲鱼技术鲲鸣

RxJava是Java对于反应式编程的一个实现框架,是一个基于事件的、提供实现强大且优雅的异步调用程序的代码库。18年以来,由淘宝技术部发起的应用架构升级项目,希望通过反应式架构、全异步化的改造,提升系统整体性能和机器资源利用率,减少网络延时,资源的重复使用,并为业务快速创新提供敏捷的架构支撑。在闲鱼的基础链路诸如商品批量更新、订单批量查询等,都利用了RxJava的异步编程能力。

不过,RxJava是入门容易精通难,一不小心遍地坑。今天来一起看下RxJava的使用方式、基本原理、注意事项。

1.开始之前


让我们先看下,使用RxJava之前,我们曾经写过的回调代码存在的痛点。


当我们的应用需要处理用户事件、异步调用时,随着流式事件的复杂性和处理逻辑的复杂性的增加,代码的实现难度将爆炸式增长。比如我们有时需要处理多个事件流的组合、处理事件流的异常或超时、在事件流结束后做清理工作等,如果需要我们从零实现,势必要小心翼翼地处理回调、监听、并发等很多棘手问题。


还有一个被称作“回调地狱”的问题,描述的是代码的不可读性。
Code 1.1

// 示例引自callbackhell.com
fs.readdir(source, function (err, files) {
  if (err) {
    console.log('Error finding files: ' + err)
  } else {
    files.forEach(function (filename, fileIndex) {
      console.log(filename)
      gm(source + filename).size(function (err, values) {
        if (err) {
          console.log('Error identifying file size: ' + err)
        } else {
          console.log(filename + ' : ' + values)
          aspect = (values.width / values.height)
          widths.forEach(function (width, widthIndex) {
            height = Math.round(width / aspect)
            console.log('resizing ' + filename + 'to ' + height + 'x' + height)
            this.resize(width, height).write(dest + 'w' + width + '_' + filename, function(err) {
              if (err) console.log('Error writing file: ' + err)
            })
          }.bind(this))
        }
      })
    })
  }
})

以上js代码有两个明显槽点: 1.由于传入的层层回调方法,代码结尾出现一大堆的 }) ; 2. 代码书写的顺序与代码执行的顺序相反:后面出现回调函数会先于之前行的代码先执行。


而如果使用了RxJava,我们处理回调、异常等将得心应手。

2.引入RxJava

假设现在要异步地获得一个用户列表,然后将结果进行处理,比如展示到ui或者写到缓存,我们使用RxJava后代码如下:
Code 2.1

Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(@NotNull ObservableEmitter<Object> emitter) throws Exception {
        System.out.println(Thread.currentThread().getName() + "----TestRx.subscribe");
        List<UserDo> result = userService.getAllUser();
        for (UserDo st : result) {emitter.onNext(st);}
    }
});
Observable<String> map = observable.map(s -> s.toString());
// 创建订阅关系
map.subscribe(o -> System.out.println(Thread.currentThread().getName() + "----sub1 = " + o)/*更新到ui*/);
map.subscribe(o -> System.out.println(Thread.currentThread().getName() + "----sub2 = " + o)/*写缓存*/,
                     e-> System.out.println("e = " + e)),
                     ()->System.out.println("finish")));

userService.getAllUser()是一个普通的同步方法,但是我们把它包到了一个Observable中,当有结果返回时,将user逐个发送至监听者。第一个监听者更新ui,第二个监听者写到缓存。并且当上游发生异常时,进行打印;在事件流结束时,打印finish。
另外还可以很方便的配置上游超时时间、调用线程池、fallback结果等,是不是非常强大。

需要注意的是,RxJava代码就像上面例子中看起来很容易上手,可读性也很强,但是如果理解不充分,很容易出现意想不到的bug:初学者可能会认为,上面的代码中,一个user列表返回后,每个元素会被异步地发送给两个下游的观察者,这两个观察者在各自的线程内打印结果。但事实却不是这样:userService.getAllUser()会被调用两次(每当建立订阅关系时方法getAllUser()都会被重新调用),而user列表被查询出后,会同步的发送给两个观察者,观察者也是同步地打印出每个元素。即sub1 = user1,sub1 = user2,sub1 = user3,sub2 = user1,sub2 = user2,sub2 = user3。


可见,如果没有其他配置,RxJava默认是同步阻塞的!!!那么,我们如何使用它的异步非阻塞能力呢,我们接着往下看。
Code 2.2

Observable
    .fromCallable(() -> {
         System.out.println(Thread.currentThread().getName() + "----observable fromCallable");
         Thread.sleep(1000); //  imitate expensive computation
         return "event";
     })
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.single())
    .map(i->{
        System.out.println(Thread.currentThread().getName() + "----observable map");
        return i;
    })
    .observeOn(Schedulers.newThread())
    .subscribe(str -> System.out.println(Thread.currentThread().getName() + "----inputStr=" + str));

System.out.println(Thread.currentThread().getName() + "----end");

Thread.sleep(2000); // <--- wait for the flow to finish. In RxJava the default Schedulers run on daemon threads

我们用Observable.fromCallable()代替code2.1中最底层的Observable.create方法,来创建了一个Observable(即被观察者)。fromCallable方法创建的是一个lazy的Observable,只有当有人监听它时,传入的代码才被执行。(关于这一点,我们后面会讲,这里只是为了展示有很多种创建Observable的方式)。
然后通过subscribeOn(Schedulers.io())指定了被观察者执行的线程池。observeOn(Schedulers.single())指定了下游观察者(map方法实际也是一个观察者)执行的线程池。map方法如同很多流式编程api一样,将上游的每个元素转化成另一个元素。最后又通过observeOn(Schedulers.newThread())制定了当前下游的观察者,即最后的subscribe中传入的观察者(lambda方式)执行的线程池。
上面的代码执行后,通过打印的线程名可以看出,被观察者、map、观察者均是不同的线程,并且,主线程最后的"end"会先执行,也就是实现了异步非阻塞。

3. 使用方式

本文不是RxJava的接口文档,不会详细介绍每个api,只是简单讲下一些常见或者特殊api,进一步阐述RxJava的能力。

3.1 基本组件

RxJava的核心原理其实非常简单。可类比观察者模式。Observable是被观察者,作为数据源产生数据。Observer是观察者,消费上游的数据源。
每个Observable可注册多个Observer。但是默认情况下,每当有注册发生时,Observable的生产方法subscribe都会被调用。如果想只生产一次,可以调用Observable.cached方法。


被观察者Observable还有多个变体,如Single、Flowable。Single代表只产生一个元素的数据源。Flowable是支持背压的数据源。通过背压设计,下游监听者可以向上游反馈信息,可以达到控制发送速率的功能。

Observable和Observer是通过装饰器模式层层包装达到从而串联起来。转换API如map等,会创建一个新的ObservableMap(基层自Observable),包装原始的Observable作为source,而在真正执行时,先做转换操作,再发给下游的观察者。


Scheduler是RxJava为多线程执行提供的支持类,它将可以将生产者或者消费者的执行逻辑包装成一个Worker,提交到框架提供的公共线程池中,如Schedulers.io()、Schedulers.newThread()等。便于理解,可以将Schedulers类比做线程池,Worker类比做线程池中的线程。可以通过Observable.subscribeOn和Observable.observeOn分别制定被观察者和观察者执行的线程,来达到异步非阻塞。

RxJava核心架构图如下:

image.png

3.2 转换API

  • map: 见Code 2.2,一对一转换,如同很多流式编程api一样,将上游的每个元素转化成另一个元素
  • flatMap: 一对多转换,将上游的每个元素转化成0到多个元素。类比Java8:Stream.flatMap内返回的是stream,Observerable.flatMap内返回的是Observerable。注意,本方法非常强大,很多api底层都是基于此方法。并且由于flatMap返回的多个Observerable是相互独立的,可以基于这个特点,实现并发。

3.3 组合API

  • merge:将两个事件流合并成一个时间流,合并后的事件流的顺序,与上流两个流中元素到来的时间顺序一致。

image.png

  • zip: 逐个接收上游多个流的每个元素,并且一对一的组合起来,转换后发送给下游。示例见code3.1

code 3.1

//第一个流每1秒输出一个偶数
Observable<Long> even = Observable.interval(1000, TimeUnit.MILLISECONDS).map(i -> i * 2L);
//第二个流每3秒输出一个奇数
Observable<Long> odd = Observable.interval(3000, TimeUnit.MILLISECONDS).map(i -> i * 2L + 1);
//zip也可以传入多个流,这里只传入了两个
Observable.zip(even, odd, (e, o) -> e + "," + o).forEach(x -> {
    System.out.println("observer = " + x);
});

/* 输出如下,可以看到,当某个流有元素到来时,会等待其他所有流都有元素到达时,才会合并处理然后发给下游
observer = 0,1
observer = 2,3
observer = 4,5
observer = 6,7
...
*/

代码code 3.1看起来没什么问题,两个流并发执行,最后用zip等待他们的结果。但是却隐藏了一个很重要的问题:RxJava默认是同步、阻塞的!!当我们想去仿照上面的方式并发发送多个请求,最后用zip监听所有结果时,很容易发先一个诡异的现象, code 3.2的代码中,ob2的代码总是在ob1执行之后才会执行,并不是我们预期的两个请求并发执行。而打印出来的线程名也可以看到,两个Single是在同一个线程中顺序执行的!

code 3.2

// Single是只返回一个元素的Observable的实现类
Single<String> ob1 = Single.fromCallable(() -> {
        System.out.println(Thread.currentThread().getName() + "----observable 1");
        TimeUnit.SECONDS.sleep(3);
        return userService.queryById(1).getName();
    });

Single<String> ob2 = Single.fromCallable(() -> {
        System.out.println(Thread.currentThread().getName() + "----observable 2");
        TimeUnit.SECONDS.sleep(1);
        return userService.queryById(1).getName();
    });

String s =  Single.zip(ob1, ob2, 
                       (e, o) -> {System.out.println(e + "++++" + o);

那为什么code 3.1的两个流能够并发执行呢?阅读源码可以发现zip的实现其实就是先订阅第一个流,再订阅第二个流,那么默认当然是顺序执行。但是通过Observable.interval创建的流,默认会被提交到 Schedulers.computation()提供的线程池中。关于线程池,本文后面会讲解。

3.4 创建API

  • create :最原始的create和subscribe,其他创建方法都基于此

code 3.3

// 返回的子类是ObservableCreate
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("event");
        emitter.onNext("event2");
        emitter.onComplete();
    }
});
// 订阅observable
observable.subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        System.out.println(Thread.currentThread().getName() + " ,TestRx.onSubscribe");
    }
    @Override
    public void onNext(String s) {
        System.out.println(Thread.currentThread().getName() + " ,s = " + s);
    }
    @Override
    public void onError(Throwable e) {}
    @Override
    public void onComplete() {
        System.out.println(Thread.currentThread().getName() + " ,TestRx.onComplete");
    }
});
  • just : Observable.just("e1","e2"); 简单的创建一个Observable,发出指定的n个元素。
  • interval:code 3.1已给出示例,创建一个按一定间隔不断产生元素的Observable,默认执行在Schedulers.comutation()提供的线程池中
  • defer:产生一个延迟创建的Observable。 有点绕:Observable.create等创建出来的被观察者虽然是延迟执行的,只有有人订阅的时候才会真正开始生成数据。但是创建Observable的方法却是立即执行的。而 Observable.defer方法会在有人订阅的时候才开始创建Observable。如代码Code3.4
public String myFun() {
    String now = new Date().toString();
    System.out.println("myFun = " + now);
    return now;
}

public void testDefer(){
    // 该代码会立即执行myFun()
    Observable<String> ob1 = Observable.just(myFun());
    // 该代码会在产生订阅时,才会调用myFun(), 可类比Java8的Supplier接口
    Observable<String> ob2 = Observable.defer(() -> Observable.just(myFun()) ); 
}
  • fromCallable :产生一个延迟创建的Observable,简化的defer方法。Observable.fromCallable(() -> myFun()) 等同于Observable.defer(() -> Observable.just(myFun()) );

4.基本原理

RxJava的代码,就是观察者模式+装饰器模式的体现。

4.1 Observable.create

见代码code 3.3,create方法接收一个ObserverableOnSubscribe接口对象,我们定义了了发送元素的代码,create方法返回一个ObserverableCreate类型对象(继承自Observerable抽象类)。跟进create方法原码,直接返回new出来的ObserverableCreate,它包装了一个source对象,即传入的ObserverableOnSubscribe。
code4.1

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        //onAssembly默认直接返回ObservableCreate
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); 
    }

Create方法就这么简单,只需要记住它返回了一个包装了source的Observerble。
4.2 Observerable.subscribe(observer)
看下code3.3中创建订阅关系时(observalbe.subscribe)发生了什么:
code4.2

 public final void subscribe(Observer<? super T> observer) {
     ObjectHelper.requireNonNull(observer, "observer is null");
     try {
         observer = RxJavaPlugins.onSubscribe(this, observer);
         ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
         subscribeActual(observer);
     } catch (NullPointerException e) {... } catch (Throwable e) {... }
 }

Observable是一个抽象类,定义了subscribe这个final方法,最终会调用subscribeActual(observer);而subscribeActual是由子类实现的方法,自然我们需要看ObserverableCreate实现的该方法。
code4.3

//ObserverableCreate实现的subscribeActual方法
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent); //source是ObservableOnSubscribe,即我们写的生产元素的代码
    } catch (Throwable ex) {...}
}
  1. 将观察者observer包装到一个CreateEmitter里。
  2. 调用observer的onSubscribe方法,传入这个emitter。
  3. 调用source(即生产代码接口)的subscribe方法,传入这个emitter。


第二步中,直接调用了我们写的消费者的onSubscribe方法,很好理解,即创建订阅关系的回调方法。


重点在第三步,source.subscribe(parent); 这个parent是包装了observer的emitter。还记得source就是我们写的发送事件的代码。其中手动调用了emitter.onNext()来发送数据。那么我们CreateEmitter.onNext()做了什么
code4.4

public void onNext(T t) {
    if (t == null) {...}
    if (!isDisposed()) { observer.onNext(t); }
}

!isDisposed()判断若订阅关系还没取消,则调用observer.onNext(t);这个observer就是我们写的消费者,code 3.3中我们重写了它的onNext方法来print接收到的元素。


以上就是RxJava最基本的原理,其实逻辑很简单,就是在创建订阅关系的时候,直接调用生产逻辑代码,然后再生产逻辑的onNext中,调用了观察者observer.onNext。时序图如下。


image.png

显然,最基本的原理,完全解耦了和异步回调、多线程的关系。

4.2 Observable.map

通过最简答的map方法,看下转换api做了什么。
如Code2.1中,调用map方法,传入一个转换函数,可以一对一地将上游的元素转换成另一种类型的元素。
code4.5

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

code4.5是Observable定义的final的map方法,可见map方法将this(即原始的observer)和转换函数mapper包装到一个ObservableMap中(ObservableMap也继承Observable),然后返回这个ObservableMap(onAssembly默认什么都不做)。
由于ObservableMap也是一个Observable,所以他的subscribe方法会在创建订阅者时被层层调用到,subscribe是Observable定义的final方法,最终会调用到他实现的subscribeAcutal方法。
code4.6

//ObservableMap的subscribeActual
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}

可以看到ObservableMap的subscribeActual中,将原始的观察者t和变换函数function包装到了一个新的观察者MapObserver中,并将它订阅到被观察者source上。
我们知道,发送数据的时候,观察者的onNext会被调用,所以看下MapObserver的onNext方法
code4.7

@Override
public void onNext(T t) {
    if (done) {return; }
    if (sourceMode != NONE) { actual.onNext(null);return;}
    U v;
    try {
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {...}
    actual.onNext(v);
}

code4.7中可以看到mapper.apply(t)将变换函数mapper施加到每个元素t上,变换后得到v,最后调用actual.onNext(v)将v发送给下游观察者actual(actual为code4.6中创建MapObserver时传入的t)。

总结一下例如map之类的变换api的原理:

  1. map方法返回一个ObservableMap,包装了原始的观察者t和变换函数function
  2. ObservableMap继承自AbstractObservableWithUpstream(它继承自Observable)
  3. 订阅发生时,observable的final方法subscribe()会调用实现类的subscribeActual
  4. ObservableMap.subscribeActual中创建MapObserver(包装了原observer),订阅到原Observable
  5. 发送数据onNext被调用时,先apply变换操作,再调用原observer的onNext,即传给下游观察者



4.3 线程调度


代码Code 2.2中给出了线程调度的示例。subscribeOn(Schedulers.io())指定了被观察者执行的线程池。observeOn(Schedulers.single())指定了下游观察者执行的线程池。经过了上面的学习,很自然的能够明白,原理还是通过装饰器模式,将Observable和Observer层层包装,丢到线程池里执行。我们以observeOn()为例,见code4.8。

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    //observeOn(Scheduler) 返回ObservableObserveOn(继承自Observable)
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

// Observable的subscribe方法最终会调用到ObservableObserveOn.subscribeActual方法
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        //创建一个ObserveOnObserver包装了原观察者、worker,把它订阅到source(原observable)
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}
  1. observeOn(Scheduler) 返回ObservableObserveOn
  2. ObservableObserveOn继承自Observable
  3. 所以subscribe方法最终会调用到ObservableObserveOn重写的subscribeActual方法
  4. subscribeActual返回一个ObserveOnObserver(是一个Observer)包装了真实的observer和worker

根据Observer的逻辑,发送数据时onNext方法会被调用,所以要看下ObserveOnObserver的onNext方法:
code4.9

public void onNext(T t) {
    if (done) { return; }
    if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t);}
    schedule();
}

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this); //this是ObserveOnObserver,他同样实现了Runable
    }
}

public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal(); //最终会调用actual.onNext(v) , 即调用被封装的下游观察者,v是emmiter
    }
}
  1. 最终生产者代码中调用onNext时,会调用schedule方法
  2. schedule方法中,会提交自身(ObserveOnObserver)到线程池
  3. 而run方法会调用onNext(emmiter)


可见,RxJava线程调度的机制就是通过observeOn(Scheduler)将发送元素的代码onNext(emmiter)提交到线程池里执行。


5.使用注意

最后,给出几个我们在开发中总结的注意事项,避免大家踩坑。

5.1 适用场景

并不是所有的IO操作、异步回调都需要使用RxJava来解决,比如如果我们只是一两个RPC服务的调用组合,或者每个请求都是独立的处理逻辑,那么引入RxJava并不会带来多大的收益。下面给出几个最佳的适用场景。

  • 处理UI事件
  • 异步响应和处理IO结果
  • 事件或数据 是由无法控制的生产者推送过来的
  • 组合接收到的事件


下面给一个闲鱼商品批量补数据的使用场景:
背景:算法推荐了用户的一些商品,目前只有基础信息,需要调用多个业务接口,补充用户和商品的附加业务信息,如用户头像、商品视频连接、商品首图等。并且根据商品的类型不同,填充不同的垂直业务信息。
难点:1. 多个接口存在前后依赖甚至交叉依赖;2. 每个接口都有可能超时或者报错,继而影响后续逻辑;3.根据不同的依赖接口特点,需要单独控制超时和fallback。整个接口也需要设置整体的超时和fallback。
方案:如果只是多个接口独立的异步查询,那么完全可以使用CompletableFuture。但基于它对组合、超时、fallback支持不友好,并不适用于此场景。我们最终采用RxJava来实现。下面是大致的代码逻辑。代码中的HsfInvoker是阿里内部将普通HSF接口转为Rx接口的工具类,默认运行到单独的线程池中,所以能实现并发调用。

// 查找当前用户的所有商品
Single<List<IdleItemDO>> userItemsFlow =
    HSFInvoker.invoke(() -> idleItemReadService.queryUserItems(userId, userItemsQueryParameter))
    .timeout(300, TimeUnit.MILLISECONDS)
    .onErrorReturnItem(errorRes)
    .map(res -> {
        if (!res.isSuccess()) {
            return emptyList;
        }
        return res.getResult();
    })
    .singleOrError();

//补充商品,依赖userItemsFlow
Single<List<FilledItemInfo>> fillInfoFlow =
    userItemsFlow.flatMap(userItems -> {

        if (userItems.isEmpty()) {
            return Single.just(emptyList);
        }

        Single<List<FilledItemInfo>> extraInfo =
            Flowable.fromIterable(userItems)
            .flatMap(item -> {

                //查找商品extendsDo
                Flowable<Optional<ItemExtendsDO>> itemFlow =
                    HSFInvoker.invoke(() -> newItemReadService.query(item.getItemId(), new ItemQueryParameter()))
                    .timeout(300, TimeUnit.MILLISECONDS)
                    .onErrorReturnItem(errorRes)
                    .map(res -> Optional.ofNullable(res.getData()));

                //视频url
                Single<String> injectFillVideoFlow = 
                    HSFInvoker.invoke(() -> videoFillManager.getVideoUrl(item))
                              .timeout(100, TimeUnit.MILLISECONDS)
                              .onErrorReturnItem(fallbackUrl);

                //填充首图
                Single<Map<Long, FrontCoverPageDO>> frontPageFlow =
                    itemFlow.flatMap(item -> {
                        ...
                        return frontCoverPageManager.rxGetFrontCoverPageWithTpp(item.id);
                    })
                    .timeout(200, TimeUnit.MILLISECONDS)
                    .onErrorReturnItem(fallbackPage);

                return Single.zip(itemFlow, injectFillVideoFlow, frontPageFlow, (a, b, c) -> fillInfo(item, a, b, c));
            })
            .toList();  //转成商品List

        return extraInfo;
    });

//头像信息
Single<Avater> userAvaterFlow =
    userAvaterFlow = userInfoManager.rxGetUserAvaters(userId).timeout(150, TimeUnit.MILLISECONDS).singleOrError().onErrorReturnItem(fallbackAvater);

//组合用户头像和商品信息,一并返回
return Single.zip(fillInfoFlow, userAvaterFlow,(info,avater) -> fillResult(info,avater))
             .timeout(300, TimeUnit.MILLISECONDS)
             .onErrorReturn(t -> errorResult)
             .blockingGet(); //最后阻塞式的返回

可以看到,通过引入RxJava,对于超时控制、兜底策略、请求回调、结果组合都能更方便的支持。

5.2 Scheduler线程池

RxJava2 内置多个 Scheduler 的实现,但是我们建议使用Schedulers.from(executor)指定线程池,这样可以避免使用框架提供的默认公共线程池,防止单个长尾任务block其他线程执行,或者创建了过多的线程导致OOM。

5.3 CompletableFuture

当我们的逻辑比较简单,只想异步调用一两个RPC服务的时,完全可以考虑使用Java8提供的CompletableFuture实现,它相较于Future是异步执行的,也可以实现简单的组合逻辑。

5.4 并发

单个Observable始终是顺序执行的,不允许并发地调用onNext()。
code5.1

Observable.create(emitter->{
    new Thread(()->emitter.onNext("a1")).start();
    new Thread(()->emitter.onNext("a2")).start();
})

但是,每个Observable可以独立的并发执行。
code5.2

Observable ob1 = Observable.create(e->new Thread(()->e.onNext("a1")).start());
Observable ob2 = Observable.create(e->new Thread(()->e.onNext("a2")).start());
Observable ob3 = Observable.merge(ob1,ob2);

ob3中组合了ob1和ob2两个流,每个流是独立的。(这里需要注意,这两个流能并发执行,还有一个条件是他们的发送代码运行在不同线程,就如果code3.1和code3.2中的示例一样,虽然两个流是独立的,但是如果不提交到不同的线程中,还是顺序执行的)。


5.5 背压

在 RxJava 2.x 中,只有 Flowable 类型支持背压。当然,Observable 能解决的问题,对于 Flowable 也都能解决。但是,其为了支持背压而新增的额外逻辑导致 Flowable 运行性能要比 Observable 慢得多,因此,只有在需要处理背压场景时,才建议使用 Flowable。如果能够确定上下游在同一个线程中工作,或者上下游工作在不同的线程中,而下游处理数据的速度高于上游发射数据的速度,则不会产生背压问题,就没有必要使用Flowable。关于Flowable的使用,由于篇幅原因,就不在本文阐述。


5.6 超时

强烈建议设置异步调用的超时时间,用timeout和onErrorReturn方法设置超时的兜底逻辑,否则这个请求将一直占用一个Observable线程,当大量请求到来时,也会导致OOM。


6.结语

目前,闲鱼的多个业务场景都采用RxJava做异步化,大大降低了开发同学的异步开发成本。同时在多请求响应组合、并发处理都有很好的性能表现。自带的超时逻辑和兜底策略,在批量业务数据处理中能保证可靠性,是用户流畅体验的强力支撑。

相关文章
|
6月前
|
数据库 Android开发 开发者
构建高性能微服务架构:从理论到实践构建高效Android应用:探究Kotlin协程的优势
【2月更文挑战第16天】 在当今快速迭代和竞争激烈的软件市场中,微服务架构以其灵活性、可扩展性和独立部署能力而受到企业的青睐。本文将深入探讨如何构建一个高性能的微服务系统,涵盖从理论基础到具体实现的各个方面。我们将重点讨论服务拆分策略、通信机制、数据一致性以及性能优化等关键主题,为读者提供一个清晰、实用的指南,以便在复杂多变的业务环境中构建和维护健壮的微服务体系结构。 【2月更文挑战第16天】 在移动开发领域,性能优化和流畅的用户体验是至关重要的。随着技术的不断进步,Kotlin作为一种现代编程语言,在Android开发中被广泛采用,尤其是其协程特性为异步编程带来了革命性的改进。本文旨在深入
|
3月前
|
Java
探索Java新境界!异步+事件驱动,打造响应式编程热潮,未来已来!
【8月更文挑战第30天】在现代软件开发中,系统响应性和可扩展性至关重要。Java作为主流编程语言,提供了多种机制如Future、CompletableFuture及事件驱动编程,有效提升应用性能。本文探讨Java异步编程模型与事件驱动编程,并介绍响应式模式,助您构建高效、灵活的应用程序。
59 3
|
2月前
|
数据处理 开发者 C++
Kotlin协程与RxJava:谁将称雄现代应用开发?揭秘背后的技术博弈与选择之道!
【9月更文挑战第13天】本文对比了现代应用开发中备受欢迎的两种并发编程方案:Kotlin协程与RxJava。Kotlin协程以轻量级线程和挂起函数简化异步编程,尤其适合I/O密集型任务;RxJava基于观察者模式,擅长处理复杂异步数据流。文中还提供了示例代码,帮助开发者根据项目需求和偏好做出合适的选择。
64 1
|
2月前
|
API 数据处理 数据库
掌握 Kotlin Flow 的艺术:让无限数据流处理变得优雅且高效 —— 实战教程揭秘如何在数据洪流中保持代码的健壮与灵活
Kotlin Flow 是一个强大的协程 API,专为处理异步数据流设计。它适合处理网络请求数据、监听数据库变化等场景。本文通过示例代码展示如何使用 Kotlin Flow 管理无限流,如实时数据流。首先定义了一个生成无限整数的流 `infiniteNumbers()`,然后结合多种操作符(如 `buffer`、`onEach`、`scan`、`filter`、`takeWhile` 和 `collectLatest`),实现对无限流的优雅处理,例如计算随机数的平均值并在超过阈值时停止接收新数据。这展示了 Flow 在资源管理和逻辑清晰性方面的优势。
60 0
|
6月前
|
API 调度 Android开发
打造高效Android应用:探究Kotlin协程的优势与实践
【5月更文挑战第27天】在移动开发领域,性能优化和响应速度是衡量应用质量的关键因素。随着Kotlin语言的普及,协程作为其核心特性之一,为Android开发者提供了一种全新的并发处理方式。本文深入探讨了Kotlin协程在Android应用开发中的优势,并通过实例演示如何在实际项目中有效利用协程提升应用性能和用户体验。
|
6月前
|
物联网 区块链 vr&ar
构建高效Android应用:Kotlin协程的实践指南未来交织:新兴技术趋势与跨领域应用探索
【5月更文挑战第28天】随着移动应用开发的不断进步,开发者寻求更高效、更简洁的方式来处理异步任务和提升用户体验。在Android平台上,Kotlin协程作为一种轻量级的线程管理方案,提供了强大的工具来简化并发和异步编程。本文将深入探讨Kotlin协程的核心概念,并通过实例演示如何在Android应用中利用协程优化性能和响应性。通过本文,你将学会如何运用协程来编写更加流畅和高效的代码,同时减少内存消耗和提高应用的稳定性。 【5月更文挑战第28天】 随着科技的迅猛发展,一系列创新技术如区块链、物联网(IoT)、虚拟现实(VR)等正在逐渐从概念验证走向实际应用。这些技术的融合与交叉不仅预示着信息时
|
6月前
|
调度 数据库 Android开发
构建高效安卓应用:探究Kotlin协程的优势
【5月更文挑战第27天】在移动开发领域,性能优化和流畅的用户体验始终是开发者追求的核心目标。随着Kotlin语言在Android平台的广泛采用,其提供的协程功能已经成为实现异步编程和提升应用响应性的重要工具。本文将深入探讨Kotlin协程在Android开发中的应用优势,通过与传统线程和回调机制的对比,揭示协程如何简化代码结构、提高执行效率,并最终增强应用性能。我们将从协程的基本概念出发,逐步解析其在网络请求、数据库操作和UI线程中的具体实践,以期为Android开发者提供性能优化的新思路。
|
6月前
|
移动开发 数据库 Android开发
构建高效Android应用:探究Kotlin的协程优势
【5月更文挑战第22天】随着移动开发技术的不断进步,Android平台的性能优化已经成为开发者关注的焦点。在众多提升应用性能的手段中,Kotlin语言提供的协程概念因其轻量级线程管理和异步编程能力而受到广泛关注。本文将深入探讨Kotlin协程在Android开发中的应用,以及它如何帮助开发者构建出更高效、响应更快的应用,同时保持代码的简洁性和可读性。
|
6月前
|
移动开发 数据库 Android开发
构建高效Android应用:探究Kotlin协程的优势与实践
【5月更文挑战第29天】 随着移动开发技术的不断进步,开发者寻求更高效、更简洁的方式来编写代码。在Android平台上,Kotlin语言凭借其现代化的特性和对协程的原生支持,成为提高开发效率的关键。本文将深入分析Kotlin协程的核心优势,并通过实例展示如何在Android应用开发中有效地利用协程来处理异步任务,优化性能,以及提升用户体验。通过对比传统线程和回调机制,我们将揭示协程如何简化异步编程模型,并减少内存消耗,使应用更加健壮和可维护。
|
6月前
|
移动开发 调度 Android开发
构建高效Android应用:探究Kotlin协程的优势
【5月更文挑战第28天】在移动开发领域,性能优化和资源管理是持续的挑战。为了应对这些挑战,Android开发者转向了多种解决方案,其中Kotlin协程作为一种新兴的异步编程范式,正逐渐受到关注。本文将深入探讨Kotlin协程如何改善Android应用的性能、提高代码可读性以及简化异步逻辑,同时通过实例演示其在实际应用中的实现方式。