RxJava流程分析
基本使用
1  | Observable.create(new ObservableOnSubscribe<String>() {  | 
create
无脑跟进法
1  | public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {  | 
通过Observable.create()返回一个Observable子类ObservableCreate的实例。
subscribe
跟进subscribe,ObservableCreate没有重写该方法,在父类Observable中查找:
1  | public final void subscribe(Observer<? super T> observer) {  | 
可以再来看看RxJavaPlugins.onSubscribe搞什么鬼
1  | public static <T> Observer<? super T> onSubscribe( Observable<T> source, Observer<? super T> observer) {  | 
可以发现包括上面的onAssembly,RxJavaPlugins做的都是一些额外包装的工作,类似于hook的功能,如果分析朱脉络的可以先不管这个东西。
在看看subscribeActual(observer);,在Observable中是抽象方法,说明针对不同的创建操作符,如create、just等,都继承于Observable,并重写了subscribeActual来实现具体的逻辑。
1  | protected abstract void subscribeActual(Observer<? super T> observer);  | 
跟进ObservableCreate.subscribeActual
1  | 
  | 
先讲observer包装为CreateEmitter
1  | static final class CreateEmitter<T> extends AtomicReference<Disposable>  | 
再通过observer.onSubscribe(parent);回调我们传入的observer的void onSubscribe(@NonNull Disposable d)方法,通常我们就是在这里拿到disposable对象来中断流的发送。
然后通过source.subscribe(parent);回调我们传入的Observable的void subscribe(@NonNull ObservableEmitter<T> e) throws Exception方法,通常我们是在这里拿到ObservableEmitter对象,即上述的parent对象,来调用他的onNext等方法。
再回到基本使用
1  | Observable.create(new ObservableOnSubscribe<String>() {  | 
在create传入的ObservableOnSubscribe就是ObservableCreate中的source,subscribe方法中的参数ObservableEmitter<String> e就是ObservableCreate的方法subscribeActual中source.subscribe(parent);的参数parent,即new CreateEmitter<T>(observer);。
e = parent = new CreateEmitter
所以来看看parent即CreateEmitter的onNext方法
1  | 
  | 
就是这么回调到observer.onNext(t);中滴,流程很清楚。
线程调度
1  | Observable.create(new ObservableOnSubscribe<String>() {  | 
subscribeOn
“订阅”所在的线程,即Observable中subscribe方法所在线程,即上游所在线程。
无脑跟进法,启动。。
1  | public final Observable<T> subscribeOn(Scheduler scheduler) {  | 
跟上面分析的create其实差不多,在没有Function的情况下,onAssembly的返回值就是他的参数。再回顾一下create方法,返回的是包装了source的ObservableCreate。
1  | public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {  | 
那么在subscribeOn方法中,onAssembly参数中构造的ObservableSubscribeOn的参数this就是这个ObservableCreate,同时还有一个scheduler。
1  | public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {  | 
先不考虑observerOn,直接按照上面分析的订阅流程来,会调用到ObservableSubscribeOn的subscribeActual方法
1  | 
  | 
比ObservableCreate的subscribeActual多了一个parent.setDisposable…,跟进scheduler.scheduleDirect之前先看一下scheduler具体是哪个子类
1  | 
  | 
返回的是传入的NEW_THREAD,那么在看看这个NEW_THREAD又是个啥
1  | NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());  | 
看上面代码就知道返回的是new NewThreadTask().call()
1  | static final class NewThreadTask implements Callable<Scheduler> {  | 
oj8k,上面线程调度传入的Scheduler.newThread()即为NewThreadScheduler实例。
再回到上面ObservableSubscribeOn的subscribeActual方法中跟进scheduler.scheduleDirect(new SubscribeTask(parent)
1  | public Disposable scheduleDirect(@NonNull Runnable run) {  | 
createWorker由NewThreadScheduler重写
。。就写到这吧,不想分析了,后面就是封装runnable submit到线程池执行,溜了溜了,准备上班
runnable就是
1  | 
  | 
其实就是把source.subscribe(parent);放到县城里执行达到调度的效果嘛。
