RxJava流程分析
基本使用
1 | Observable.create(new ObservableOnSubscribe<String>() { |
create
无脑跟进法
1 | public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { |
通过Observable.create()
返回一个Observable子类ObservableCreate的实例。
subscribe
跟进subscribe
,ObservableCreate没有重写该方法,在父类Observable中查找:
1 | public final void subscribe(Observer<? super T> observer) { |
可以再来看看RxJavaPlugins.onSubscribe
搞什么鬼
1 | public static <T> Observer<? super T> onSubscribe( Observable<T> source, Observer<? super T> observer) { |
可以发现包括上面的onAssembly
,RxJavaPlugins做的都是一些额外包装的工作,类似于hook的功能,如果分析朱脉络的可以先不管这个东西。
在看看subscribeActual(observer);
,在Observable中是抽象方法,说明针对不同的创建操作符,如create、just等,都继承于Observable,并重写了subscribeActual来实现具体的逻辑。
1 | protected abstract void subscribeActual(Observer<? super T> observer); |
跟进ObservableCreate.subscribeActual
1 |
|
先讲observer包装为CreateEmitter
1 | static final class CreateEmitter<T> extends AtomicReference<Disposable> |
再通过observer.onSubscribe(parent);
回调我们传入的observer的void onSubscribe(@NonNull Disposable d)
方法,通常我们就是在这里拿到disposable对象来中断流的发送。
然后通过source.subscribe(parent);
回调我们传入的Observable的void subscribe(@NonNull ObservableEmitter<T> e) throws Exception
方法,通常我们是在这里拿到ObservableEmitter对象,即上述的parent对象,来调用他的onNext等方法。
再回到基本使用
1 | Observable.create(new ObservableOnSubscribe<String>() { |
在create传入的ObservableOnSubscribe就是ObservableCreate中的source,subscribe方法中的参数ObservableEmitter<String> e
就是ObservableCreate的方法subscribeActual中source.subscribe(parent);
的参数parent
,即new CreateEmitter<T>(observer);
。
e = parent = new CreateEmitter
所以来看看parent即CreateEmitter的onNext方法
1 |
|
就是这么回调到observer.onNext(t);
中滴,流程很清楚。
线程调度
1 | Observable.create(new ObservableOnSubscribe<String>() { |
subscribeOn
“订阅”所在的线程,即Observable中subscribe方法所在线程,即上游所在线程。
无脑跟进法,启动。。
1 | public final Observable<T> subscribeOn(Scheduler scheduler) { |
跟上面分析的create其实差不多,在没有Function的情况下,onAssembly的返回值就是他的参数。再回顾一下create方法,返回的是包装了source
的ObservableCreate。
1 | public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { |
那么在subscribeOn
方法中,onAssembly参数中构造的ObservableSubscribeOn的参数this就是这个ObservableCreate,同时还有一个scheduler。
1 | public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { |
先不考虑observerOn,直接按照上面分析的订阅流程来,会调用到ObservableSubscribeOn的subscribeActual
方法
1 |
|
比ObservableCreate的subscribeActual多了一个parent.setDisposable…
,跟进scheduler.scheduleDirect之前先看一下scheduler具体是哪个子类
1 |
|
返回的是传入的NEW_THREAD,那么在看看这个NEW_THREAD又是个啥
1 | NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask()); |
看上面代码就知道返回的是new NewThreadTask().call()
1 | static final class NewThreadTask implements Callable<Scheduler> { |
oj8k,上面线程调度传入的Scheduler.newThread()即为NewThreadScheduler实例。
再回到上面ObservableSubscribeOn的subscribeActual方法中跟进scheduler.scheduleDirect(new SubscribeTask(parent)
1 | public Disposable scheduleDirect(@NonNull Runnable run) { |
createWorker由NewThreadScheduler重写
。。就写到这吧,不想分析了,后面就是封装runnable submit到线程池执行,溜了溜了,准备上班
runnable就是
1 |
|
其实就是把source.subscribe(parent);放到县城里执行达到调度的效果嘛。