I get a crash when I send a large number of messages in quick succession through RxBus:
08-26 09:37:13.458 10637-10637/com.dituwuyou E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.dituwuyou, PID: 10637
java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
    at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:112)
    at android.os.Handler.handleCallback(Handler.java:739)
    at android.os.Handler.dispatchMessage(Handler.java:95)
    at android.os.Looper.loop(Looper.java:148)
    at android.app.ActivityThread.main(ActivityThread.java:5417)
    at java.lang.reflect.Method.invoke(Native Method)
    at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)
Caused by: rx.exceptions.OnErrorNotImplementedException: PublishSubject: could not emit value due to lack of requests
    at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
    at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
    at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
    at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:152)
    at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219)
    at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107)
    at android.os.Handler.handleCallback(Handler.java:739)
    at android.os.Handler.dispatchMessage(Handler.java:95)
    at android.os.Looper.loop(Looper.java:148)
    at android.app.ActivityThread.main(ActivityThread.java:5417)
    at java.lang.reflect.Method.invoke(Native Method)
    at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)
Caused by: rx.exceptions.MissingBackpressureException: PublishSubject: could not emit value due to lack of requests
    at rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:308)
    at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:220)
    at rx.subjects.PublishSubject.onNext(PublishSubject.java:73)
    at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
    at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67)
    at com.dituwuyou.widget.rxjava.RxBus.send(RxBus.java:45)
    at com.dituwuyou.joint.CoorSocketService.messageReceived(CoorSocketService.java:51)
    at com.dituwuyou.fayeclient.FayeClient.parseFayeMessage(FayeClient.java:535)
    at com.dituwuyou.fayeclient.FayeClient.onMessage(FayeClient.java:390)
    at com.dituwuyou.fayeclient.HybiParser.emitFrame(HybiParser.java:304)
    at com.dituwuyou.fayeclient.HybiParser.start(HybiParser.java:130)
    at com.dituwuyou.fayeclient.WebSocketClient$1.run(WebSocketClient.java:119)
    at java.lang.Thread.run(Thread.java:818)
My RxBus is defined like this:
import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
 * Created by xg on 2016/6/24.
 * Message passing (replaces Handler and EventBus)
 */
public class RxBus {
    private static volatile RxBus mInstance;
    private final Subject bus;

    public RxBus() {
        bus = new SerializedSubject<>(PublishSubject.create());
    }

    /**
     * Singleton RxBus
     *
     * @return
     */
    public static RxBus getRxBusSingleton() {
        RxBus rxBus2 = mInstance;
        if (mInstance == null) {
            synchronized (RxBus.class) {
                rxBus2 = mInstance;
                if (mInstance == null) {
                    rxBus2 = new RxBus();
                    mInstance = rxBus2;
                }
            }
        }
        return rxBus2;
    }

    /**
     * Send a message
     *
     * @param object
     */
    public void send(Object object) {
        bus.onNext(object);
    }

    /**
     * Receive messages
     *
     * @return
     */
    public Observable toObserverable() {
        return bus;
    }
}
This is line 45 of RxBus.java:
bus.onNext(object);
It looks like a backpressure problem.
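The trace fits a backpressure reading: the MissingBackpressureException is raised inside PublishSubject.onNext, and OperatorObserveOn sits in the subscriber chain, so items reach the main thread through a bounded queue. Below is a minimal sketch in plain Java, not RxJava code, of why a burst fails in that setup. The class `Handoff`, its capacity of 16, and the three strategy names are assumptions invented for illustration. In RxJava 1.x, the operators `onBackpressureBuffer()` and `onBackpressureDrop()`, applied before `observeOn`, play roughly the roles of the BUFFER and DROP strategies here.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class BackpressureSketch {

    /** What to do when the consumer has no free slot left (hypothetical names). */
    enum Strategy { ERROR, DROP, BUFFER }

    /** Hypothetical model of a bounded hand-off between a fast producer and a slow consumer. */
    static final class Handoff {
        private final int capacity;      // slots the consumer has made available
        private final Strategy strategy;
        private final Queue<Object> queue = new ArrayDeque<>();
        private int dropped;

        Handoff(int capacity, Strategy strategy) {
            this.capacity = capacity;
            this.strategy = strategy;
        }

        /** Producer side: called once per message, as fast as the producer likes. */
        void send(Object message) {
            if (queue.size() < capacity || strategy == Strategy.BUFFER) {
                queue.add(message);      // BUFFER ignores the capacity and keeps everything
            } else if (strategy == Strategy.DROP) {
                dropped++;               // silently discard the overflow
            } else {
                throw new IllegalStateException("could not emit value due to lack of requests");
            }
        }

        /** Consumer side: takes one message and frees a slot. */
        Object poll() {
            return queue.poll();
        }

        int pending() {
            return queue.size();
        }

        int dropped() {
            return dropped;
        }
    }

    /** Sends count messages while the consumer makes no progress; returns true if the burst failed. */
    static boolean burstFails(Handoff handoff, int count) {
        try {
            for (int i = 0; i < count; i++) {
                handoff.send(i);
            }
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        for (Strategy strategy : Strategy.values()) {
            Handoff handoff = new Handoff(16, strategy);
            boolean failed = burstFails(handoff, 100);
            System.out.println(strategy + ": failed=" + failed
                    + ", pending=" + handoff.pending()
                    + ", dropped=" + handoff.dropped());
        }
    }
}
```

Which strategy is appropriate depends on the messages: buffering keeps every update at the cost of memory, while dropping is only acceptable if stale messages can be skipped.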
rikiki 2022-09-22 13:57
Which version of RxJava are you using? In version 1.1.6 the class rx.subjects.PublishSubject$PublishSubjectProducer no longer exists.
方益军 2022-09-22 13:59
1. I don't understand why you added rxBus2.
2. You need to handle the onError case.
3. Subscribe first, then send.
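Points 2 and 3 can be illustrated with a small plain-Java sketch. `TinyBus` is a hypothetical stand-in for a bus without replay, not the RxJava API: it delivers only to subscribers registered at the moment of sending, and it routes a handler failure to the subscriber's error callback instead of letting it propagate to the sender. In RxJava 1.x the analogous step is passing an error handler as the second argument to `subscribe`; the OnErrorNotImplementedException in the trace indicates that none was supplied.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SubscribeFirstSketch {

    /** Hypothetical bus: no replay, no buffering, one error callback per subscriber. */
    static final class TinyBus {
        private final List<Consumer<Object>> onNexts = new ArrayList<>();
        private final List<Consumer<Throwable>> onErrors = new ArrayList<>();

        void subscribe(Consumer<Object> onNext, Consumer<Throwable> onError) {
            onNexts.add(onNext);
            onErrors.add(onError);
        }

        /** Delivers to whoever is subscribed right now; earlier messages are not replayed. */
        void send(Object message) {
            for (int i = 0; i < onNexts.size(); i++) {
                try {
                    onNexts.get(i).accept(message);
                } catch (RuntimeException e) {
                    onErrors.get(i).accept(e);   // the failure goes to the subscriber, not the sender
                }
            }
        }
    }

    /** Runs the scenario and returns what the subscriber observed, in order. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        TinyBus bus = new TinyBus();

        bus.send("early");   // nobody is subscribed yet, so this message is lost

        bus.subscribe(
                message -> {
                    if ("bad".equals(message)) {
                        throw new IllegalStateException("cannot handle " + message);
                    }
                    log.add("next:" + message);
                },
                error -> log.add("error:" + error.getMessage()));

        bus.send("hello");   // delivered to onNext
        bus.send("bad");     // the handler throws, so onError is called
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Unlike this sketch, an RxJava subscription ends once onError has been delivered, so the subscriber would have to resubscribe to keep receiving messages.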