【我的Android进阶之旅】 RxJava 理解Backpressure并解决异常 rx.exceptions.MissingBackpressureException

简介: 今天测试人员在测试应用APP的时候应用crash了,查看了下crash log如下所示:java.lang.IllegalStateException: Exception thrown on Scheduler.

今天测试人员在测试应用APP的时候应用crash了,查看了下crash log如下所示:

java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
    at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:112)
    at android.os.Handler.handleCallback(Handler.java:751)
    at android.os.Handler.dispatchMessage(Handler.java:95)
    at android.os.Looper.loop(Looper.java:154)
    at android.app.ActivityThread.main(ActivityThread.java:6119)
    at java.lang.reflect.Method.invoke(Native Method)
    at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776)
Caused by: rx.exceptions.OnErrorNotImplementedException
    at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
    at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
    at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
    at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:157)
    at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219)
    at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107)
    ... 7 more
Caused by: rx.exceptions.MissingBackpressureException
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:162)
    at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:52)
    at rx.Scheduler$Worker$1.call(Scheduler.java:134)
    at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:187)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428)
    at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
    at java.lang.Thread.run(Thread.java:761)
rx.exceptions.OnErrorNotImplementedException
    at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
    at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
    at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
    at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:157)
    at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219)
    at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107)
    at android.os.Handler.handleCallback(Handler.java:751)
    at android.os.Handler.dispatchMessage(Handler.java:95)
    at android.os.Looper.loop(Looper.java:154)
    at android.app.ActivityThread.main(ActivityThread.java:6119)
    at java.lang.reflect.Method.invoke(Native Method)
    at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776)
Caused by: rx.exceptions.MissingBackpressureException
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:162)
    at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:52)
    at rx.Scheduler$Worker$1.call(Scheduler.java:134)
    at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:187)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428)
    at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
    at java.lang.Thread.run(Thread.java:761)
rx.exceptions.MissingBackpressureException
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:162)
    at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:52)
    at rx.Scheduler$Worker$1.call(Scheduler.java:134)
    at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:187)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428)
    at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
    at java.lang.Thread.run(Thread.java:761)

查看MissingBackpressureException异常类的源代码如下描述:

/**
 * Copyright 2014 Netflix, Inc.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package rx.exceptions;

/**
 * Represents an exception that indicates that a Subscriber or operator attempted to apply reactive pull
 * backpressure to an Observable that does not implement it.
 * <p>
 * If an Observable has not been written to support reactive pull backpressure (such support is not a
 * requirement for Observables), you can apply one of the following operators to it, each of which forces a
 * simple form of backpressure behavior:
 * <dl>
 *  <dt><code>onBackpressureBuffer</code></dt>
 *   <dd>maintains a buffer of all emissions from the source Observable and emits them to downstream Subscribers
 *       according to the requests they generate</dd>
 *  <dt><code>onBackpressureDrop</code></dt>
 *   <dd>drops emissions from the source Observable unless there is a pending request from a downstream
 *       Subscriber, in which case it will emit enough items to fulfill the request</dd>
 * </dl>
 * If you do not apply either of these operators to an Observable that does not support backpressure, and if
 * either you as the Subscriber or some operator between you and the Observable attempts to apply reactive pull
 * backpressure, you will encounter a {@code MissingBackpressureException} which you will be notified of via
 * your {@code onError} callback.
 * <p>
 * There are, however, other options. You can throttle an over-producing Observable with operators like
 * {@code sample}/{@code throttleLast}, {@code throttleFirst}, or {@code throttleWithTimeout}/{@code debounce}.
 * You can also take the large number of items emitted by an over-producing Observable and package them into
 * a smaller set of emissions by using operators like {@code buffer} and {@code window}.
 * <p>
 * For a more complete discussion of the options available to you for dealing with issues related to
 * backpressure and flow control in RxJava, see
 * <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>.
 */
public class MissingBackpressureException extends Exception {

    private static final long serialVersionUID = 7250870679677032194L;

    /**
     * Constructs the exception without any custom message.
     */
    public MissingBackpressureException() {

    }

    /**
     * Constructs the exception with the given customized message.
     * @param message the customized message
     */
    public MissingBackpressureException(String message) {
        super(message);
    }

}

如上面文字描述,抛出MissingBackpressureException往往就是因为,被观察者发送事件的速度太快,而观察者处理太慢,而且你还没有做相应措施,所以报异常。

  • onBackpressurebuffer:
    把observable发送出来的事件做缓存,当request方法被调用的时候,给下层流发送一个item(如果给这个缓存区设置了大小,那么超过了这个大小就会抛出异常)。
    这里写图片描述

  • onBackpressureDrop:
    将observable发送的事件抛弃掉,直到subscriber再次调用request(n)方法的时候,就发送给它这之后的n个事件。

这里写图片描述

下面参考了几篇博客之后,理解了Backpressure的概念,并解决了rx.exceptions.MissingBackpressureException异常,读者也可以参考以下几篇博客之后,自己根据自己实际情况来选择不同的解决方式来解决该异常。

参考链接如下:

  1. 关于RxJava最友好的文章——背压(Backpressure)
    https://zhuanlan.zhihu.com/p/24473022?refer=dreawer

  2. RxJava 并发之数据流发射太快如何办(背压(Backpressure))
    http://blog.csdn.net/jdsjlzx/article/details/51868640

  3. RxJava 2.0中backpressure(背压)概念的理解
    http://blog.csdn.net/jdsjlzx/article/details/52717636
  4. RxJava 驯服数据流之 hot & cold Observable
    http://blog.csdn.net/jdsjlzx/article/details/51839090
  5. RxJava 教程第四部分:并发 之数据流发射太快如何办
    http://blog.chengyunfeng.com/?p=981&utm_source=tuicool&utm_medium=referral
  6. http://reactivex.io/RxJava/javadoc/rx/exceptions/MissingBackpressureException.html
  7. Backpressure
    https://github.com/ReactiveX/RxJava/wiki/Backpressure

  8. Java Code Examples for rx.exceptions.MissingBackpressureException
    http://www.programcreek.com/java-api-examples/index.php?api=rx.exceptions.MissingBackpressureException


作者:欧阳鹏 欢迎转载,与人分享是进步的源泉!
转载请保留原文地址: http://blog.csdn.net/ouyang_peng/article/details/66978253

这里写图片描述

相关文章
|
5月前
|
Java 调度 Android开发
深入解析Android应用开发中的响应式编程与RxJava应用
在现代Android应用开发中,响应式编程及其核心框架RxJava正逐渐成为开发者的首选。本文将深入探讨响应式编程的基本概念、RxJava的核心特性以及如何在Android应用中利用RxJava提升代码的可读性和性能。 【7月更文挑战第7天】
45 1
|
2月前
|
缓存 数据处理 Android开发
在 Android 中使用 RxJava 更新 View
【10月更文挑战第20天】使用 RxJava 来更新 View 可以提供更优雅、更高效的解决方案。通过合理地运用操作符和订阅机制,我们能够轻松地处理异步数据并在主线程中进行 View 的更新。在实际应用中,需要根据具体情况进行灵活运用,并注意相关的注意事项和性能优化,以确保应用的稳定性和流畅性。可以通过不断的实践和探索,进一步掌握在 Android 中使用 RxJava 更新 View 的技巧和方法,为开发高质量的 Android 应用提供有力支持。
|
7月前
|
API Android开发
Android高手进阶教程(十五)之---通过Location获取Address的使用!
Android高手进阶教程(十五)之---通过Location获取Address的使用!
65 1
|
6月前
|
开发工具 Android开发
android studio build异常
android studio build异常
40 3
|
6月前
|
存储 算法 Java
Android 进阶——代码插桩必知必会&ASM7字节码操作
Android 进阶——代码插桩必知必会&ASM7字节码操作
248 0
|
7月前
|
Android开发
jack-server导致 Android 编译 出现异常
jack-server导致 Android 编译 出现异常
188 6
|
7月前
|
设计模式 缓存 Java
补齐Android技能树——从AGP构建过程到APK打包过程,安卓rxjava面试
补齐Android技能树——从AGP构建过程到APK打包过程,安卓rxjava面试
|
7月前
|
缓存 网络协议 Java
挑战全网,史上最全Android开发进阶,跳槽复习指南(1),掌握这6大技能体系
挑战全网,史上最全Android开发进阶,跳槽复习指南(1),掌握这6大技能体系
|
7月前
|
Android开发
android捕获全局异常,并对异常做出处理
android捕获全局异常,并对异常做出处理
76 4
|
7月前
|
算法 安全 Java
2024年Android最新知识体系最强总结(全方面覆盖Android知识结构,BAT面试&学习进阶)
2024年Android最新知识体系最强总结(全方面覆盖Android知识结构,BAT面试&学习进阶)