RxJava之subscribeOn解惑

阅读:493 2019-03-19 14:42:28 来源:新网

有一天,我在使用rxjava和retrofit实现android上面的网络请求。突然,遇到了一个坑,在过了这些坑之后得到一些经验,觉得需要和大家分享一二。

用retrofit搭配rxjava的朋友应该都知道,一般实现代码最终大都长得一幅下面的样子。

publicinterfacebeanapi{@get("bean/{id}")observablegetbean(@path("id")intbeanid);}beanapiapi=···通过retrofit的create实例化···api.getbean(1).subscribeon(schedulers.io()).observeon(androidschedulers.mainthread()).subscribe(bean->{//dosomethingonmainthread.},throwable->{//dosomethingonmainthread.});

上面的代码形式相信各位都写得很熟了,但是我实在烦透了每个api调用的地方都写一次subscribon,observeon。然后,我找到一篇不错的文章——don'tbreakthechain:userxjava'scompose()operator,里面提到了一个方法避免这种重复代码(其实作者本意是要体现“不要打破链式操作”,而非避免重复代码)。最后改进到的代码就长下面的样子了。

//thisisacommonmethod.transformerapplyschedulers(){returnobservable->observable.subscribeon(schedulers.io()).observeon(androidschedulers.mainthread());}api.getbean(1).compose(applyschedulers()).subscribe(bean->{//dosomethingonmainthread.},throwable->{//dosomethingonmainthread.});

改进后的代码比原来的代码少了一行。但是写多几次之后,我还是烦透了这个applyschedulers()。于是我疯了,就自己实现一个retrofit的calladapter.factory,让retrotfit在每次调用api的时候自动就给我封装好subscribeon和observeon这些重复的代码,具体实现可以参考我的另外一篇文章——通过委派模式包装一个rxjavacalladapterfactory。最后,我的代码就是长下面这个样子了。

api.getbean(1).subscribe(bean->{//dosomethingonmainthread.},throwable->{//dosomethingonmainthread.});

所有的subscribeon和observeon不用再写了。因为每次调用api.getbean(1),retrotfit就调用我自定义的calladapter.factory把结果封装成observable对象的时候就已经把subscribeon和observeon添加上去了。

好,用得很爽。但是问题问题比办法多,所以问题来了。有几个特殊的地方我需要网络加载和结果监听都在当前线程。相信理解了上面代码的朋友都已经看出来了,现在我通过api.getbean(1)获取到的observable最后被订阅都会是在io线程获取网络数据,然后在mainthread线程进行结果处理。所以,我要想个办法出来,覆盖原来的schedulers,包括subscribeon的scheduler和observeon的scheduler。于是,我写了下面的代码。

//isasyncisabooleanvariableindicatewhethertherequestisaasynchronousornot.api.getbean(1).subscribeon(isasync?schedulers.io():schedulers.immediate()).observeon(isasync?androidschedulers.mainthread():schedulers.immediate()).subscribe(bean->{//dosomethingonmainthread.},throwable->{//dosomethingonmainthread.});

上面的代码再结合我之前写的calladapter.factory,其实就是相当于没有自定义calladapter.factory之前显式调用两次subscribeon和两次observeon,就像下面的样子。

api.getbean(1).subscribeon(schedulers.io()).observeon(androidschedulers.mainthread()).subscribeon(isasync?schedulers.io():schedulers.immediate()).observeon(isasync?androidschedulers.mainthread():schedulers.immediate()).subscribe(bean->{//dosomethingonmainthread.},throwable->{//dosomethingonmainthread.});前凑

作为一名rxjava的标准菜鸟,我被验证了自己的确很菜,我天真的认为后面的subscribeon和observeon会覆盖之前的scheduler,我理所当然的认为,当isasync为true的时候,这次api的调用就会在当前线程执行网络访问和结果处理。于是,我被搞疯了。我就看了subscribeon的源码,如下。

publicfinalobservablesubscribeon(schedulerscheduler){···省略一些于逻辑阅读不重要的代码returnnest().lift(newoperatorsubscribeon(scheduler));}

nest的代码如下,

publicfinalobservable>nest(){returnjust(this);}

意思就是新建一个observable,并且只会向订阅者发送一个元素——原来api.getbean(1)获得的observale对象。所以nest操作获得的结果是observable>对象。好,这里记着这个东东。下面我先分析一下lift操作,然后我们再回头把他们串在一起。

在看lift操作之前,我们稍微回顾一下observable的创建方法,

finalonsubscribeonsubscribe;publicfinalstaticobservablecreate(onsubscribef){returnnewobservable(hook.oncreate(f));}protectedobservable(onsubscribef){this.onsubscribe=f;}

其他的什么from、just等创建方法最后都是把数据转化为一个onsubscribe对象再通过上面的create方法创建。所以我们只关注这个create方法。上面代码的意思很简单,就是new一个observable对象,并且设置onsubscribe。所以这里的关键是onsubscribe这个对象。这里我管它做数据源,即observable对象会用它来产生数据,并且发布给订阅者。

看到这里,我们可以发现,observable其实没有什么,只有两个关键点:1、装载着一个onsubscribe对象,2、有订阅者注册时,就调用用这个onsubscribe的call(subscriber)方法。

这里我们要看一下这个call(subscriber)方法。该方法接受一个参数subscriber,即订阅者。当有订阅者注册到observable对象时,observable对象就调用onsubscribe的这个call方法,并且把当前当前注册的订阅者作为参数传递过去。所以在call方法的实现中就可以调用订阅者的onnext方法来发布数据或者做其他事(不一定是发布数据)。

先把lift操作的代码贴出来。

publicfinalobservablelift(finaloperatoroperator){returnnewobservable(newonsubscribe(){@overridepublicvoidcall(subscribero){try{subscriberst=hook.onlift(operator).call(o);try{//newsubscribercreatedandbeingsubscribedwithso'onstart'itst.onstart();onsubscribe.call(st);}catch(throwablee){···省略一些于逻辑阅读不重要的代码st.onerror(e);}}catch(throwablee){···省略一些于逻辑阅读不重要的代码//aswedon'thavetheoperatoravailabletouso.onerror(e);}}});}

从上面代码第2行的newobservable可以看出,lift操作其实是新建一个observable对象然后返回。这里加点高级的标识方便下面的阅读,被new出来的observable对象我们叫它做派生observable,而当前observable就叫父级observable。

在上面observable结构一节中,我们知道每个observable都持有一个onsubscribe对象作为数据源。通过lift方法派生所得的observable也不例外,也有一个,就是上面代码第二行newonsubscribe实例化的匿名对象。这个onsubscribe虽然也是数据源,但是自己却从来不产生数据,也不直接向订阅者直接发布数据。它做的事就只是把自己的订阅者包装成为一个父级observable认可的订阅者,然后委派给父级的数据源(上面代码第十行)。父级observable的数据源向自己的订阅者发布数据,就是发送到被包装出来的这个订阅者中来。

小结:到这里为止,要记住很重要的一点,通过lift操作产生的派生observable对象的数据源(onsubscribe)是不实际产生数据的,它做的事就只是把自己的订阅者包装成为一个父级observable认可的订阅者,然后委派给父级的数据源。

那么被包装出来的这个订阅者是怎么处理父级数据源发布的数据呢?这里就要回到上面代码的第6行。那里通过一个我们调用lift操作时传进去的operator把派生observable认可的subscriber包装成一个父级observable认可的subscriber。

下面我看一个lift操作的例子,用lift模拟了两次map操作。

代码视图1:

classbean{intvalue;bean(intv){this.value=v;}}observable.just(newbean(1),newbean(2),newbean(3),newbean(4)).lift(newobservable.operator(){@overridepublicsubscribercall(subscribersubscriber){returnnewsubscriber(subscriber){@overridepublicvoidoncompleted(){subscriber.oncompleted();}@overridepublicvoidonerror(throwablee){subscriber.onerror(e);}@overridepublicvoidonnext(beanbean){subscriber.onnext(bean.value);}};}}).lift(newobservable.operator(){@overridepublicsubscribercall(subscribersubscriber){returnnewsubscriber(subscriber){@overridepublicvoidoncompleted(){subscriber.oncompleted();}@overridepublicvoidonerror(throwablee){subscriber.onerror(e);}@overridepublicvoidonnext(integeri){subscriber.onnext(string.valueof(i));}};}}).subscribe(system.out::println);

上面的代码中observable的链式操作其实是等价于下面代码形式的,

代码视图2:

observableo1=observable.just(newbean(1),newbean(2),newbean(3),newbean(4));observableo2=o1.lift(···);observableo3=o2.lift(···);subscribersubscriber3=newsubscriber(){@overridepublicvoidoncompleted(){}@overridepublicvoidonerror(throwablee){}@overridepublicvoidonnext(strings){system.out.println(s);}};o3.subscribe(subscriber3);

从代码视图2中,我们可以发现这一连串的操作下来,一共产生了3个observable对象:o1、o2、o3。之前我们说过每个observable对象都会持有一个onsubscribe对象作为数据源,用来向订阅者发布数据。我们以不同的标识来区分一下上面三个observable对象对应的onsubscribe对象:o1=>onsubscribe1,o2=>onsubscribe2,o3=>onsubscribe3。

从上面lift操作说明一节,我们知道lift操作产生的observable对象的数据源是不产生数据的,它做的事就只是把自己的订阅者包装成为一个父级observable认可的订阅者,然后委派给父级的数据源。o3是一个通过lift操作产生的派生observable,当订阅者subscriber3注册到o3时,o3的数据源onsubscribe3就会把subscriber3包装成一个父级(o2)可以的订阅者(这里命名为subscriber2),然后委派给父级数据源(onsubscribe2)。现在请回头看代码视图1的第34-49行。这一段代码就显示了把subscriber3包装成为subscriber2的过程。可以发现,被包装出来的subscriber2只做一件事,就是把onsubscribe2发布给自己的数据转化为subscriber3可以消化的数据,然后就交给subscriber3,相当于充当了一个subscriber3和onsubscribe2之间的桥梁。

接着分析,onsubscribe2虽然说,通过subscriber2间接把数据发布到了subscriber3。但是实际上,作为数据源,它的持有者o2,也是通过lift操作产生的派生observable,所以这个onsubscribe2也是不直接产生数据的。它也是把自己的订阅者(subscriber2)包装一个父级(o1)认可的订阅者(这里命名为subscriber1),然后委派给父级数据源(onsubscribe1)。现在请再回头看代码视图1的第13-28行。这段代码显示了如何把subscriber2包装成为subscriber1的过程。同样,subscriber1也只做一件事,就是把onsubscribe1发布给自己的数据转化为subscriber2可以消化的数据,然后就交给subscriber2,相当于充当了一个subscriber2和onsubscribe1之间的桥梁。

最后,整个过程可以描述为下面的一个示意图,

rxjava中,lift操作几乎贯穿了整个observable,因为差不多所有所有的操作符都是通过lift操作来实现的。比如map操作符,其实就是通过lift操作产生的派生observable而已。所以这个派生observable的数据源也就如上面我所述,自己不产生数据,而是把自己的订阅者包装成一个父级认可的订阅者。怎么包装呢?上面的讲述中,这个包装过程其实是通过我们调用lift操作时传递的参数operator来完成的。

我们再回顾subscribeon操作符的源码。首先,通过nest操作产生一个observable>(我们原来操作的是observable),然后它上面调用lift操作,那么observable>就是父级了,lift操作最终产生的派生observable就是整个subscribeon操作产生的结果。看subscribeon操作的源码我们可以发现,它是通过operatorsubscribeon这么一个operator来实现subscriber的包装。那么我们来看一下operatorsubscribeon的源码,分析一下具体的包装过程。

publicclassoperatorsubscribeonimplementsoperator>{privatefinalschedulerscheduler;publicoperatorsubscribeon(schedulerscheduler){this.scheduler=scheduler;}@overridepublicsubscriber>call(finalsubscribersubscriber){finalworkerinner=scheduler.createworker();subscriber.add(inner);returnnewsubscriber>(subscriber){···这里省略了一些于逻辑阅读无关的代码。@overridepublicvoidonnext(finalobservableo){inner.schedule(newaction0(){@overridepublicvoidcall(){finalthreadt=thread.currentthread();o.unsafesubscribe(newsubscriber(subscriber){@overridepublicvoidoncompleted(){subscriber.oncompleted();}@overridepublicvoidonerror(throwablee){subscriber.onerror(e);}@overridepublicvoidonnext(tt){subscriber.onnext(t);}···这里省略了一些于逻辑阅读无关的代码。});}});}};}}

从上面的分析中,我们知道subscribeon操作其实是先通过nest操作产生一个observable>对象,再通过lift操作产生派生observable(即(observable)对象的,所以observable>对象就是父级。所以operatorsubscribeon的职责就是为包装一个observable>认可的订阅者。被包装出来的订立者接受到父级发布的数据(即一个observable对象)时,它这里没有把数据转换成下级subscriber(即上面代码第10行传递给call方法的参数)可消化的数据,而是通过inner对象来安装一次订阅。

小结,经过subscribeon操作产生了一个派生observable对象,这个对象的数据源(onsubscribe)的工作是为自己的订阅者在某个线程上安排订阅。

代码视图3

observable.just(newbean(1),newbean(2),newbean(3),newbean(4)).subscribeon(schedulers.io()).subscribeon(schedulers.immediate()).subscribe(bean->system.out.println(bean.value));

看过subscribeon的源码之后,我们应该知道上面的代码几乎等价于下面的写法,

代码视图4

observableo1=observable.just(newbean(1),newbean(2),newbean(3),newbean(4));observable>o2=o1.nest();observableo3=o2.lift(newoperatorsubscribeon(schedulers.io()));observable>o4=o3.nest();observableo5=o4.lift(newoperatorsubscribeon(schedulers.immediate()));subscribersubscriber4=newsubscriber(){@overridepublicvoidoncompleted(){}@overridepublicvoidonerror(throwablee){}@overridepublicvoidonnext(beans){system.out.println(s.value);}};o5.subscribe(subscriber5);

上面代码的执行过程,可以表示成如下示意图,

通过上面示意图可以看出,最后整个整个订阅过程的运行线程是currentthread->immediatethread->iothread。

by啪嗒科技atanl(atanl@padakeji.com)

相关文章
{{ v.title }}
{{ v.description||(cleanHtml(v.content)).substr(0,100)+'···' }}
你可能感兴趣
推荐阅读 更多>
推荐商标

{{ v.name }}

{{ v.cls }}类

立即购买 联系客服