观察者模式和RxJava的异同
先来快速的写一个观察者模式的实现,大致如下:1
2
3
4
5
6
7
8public interface Observer {
    void update(Event event);
}
public interface Observable{
    void subscribe(Observer observer);
    void unsubscribe(Observer observer);
    void notifyObserver(Event event);
}
这里有观察者Observer和被观察者Observable,Observable有订阅和取消订阅,以及通知Observer的notify方法,以及参数Event。
这里就囊括了RxJava的四个基本要素:
- 观察者Observer
 - 被观察者Observable
 - 订阅subscribe
 - 事件Event
 
而RxJava在此基础上,还定义了2个特殊事件:
- onCompleted
 - onError
 
RxJava基本实现
这里展示一下1.0的基本用法 后续都是基于rxjava2.0来演示
RxJava1.0
观察者Observer/Subscriber
1  | //Observer<String> observer = new Observer<String>()  | 
Subscriber相对于Observer新增了2个方法:
- onStart
 - unsubscribe
 
被观察者Observable
1  | Observable observable = Observable.create(new Observable.OnSubscribe<String>() {  | 
订阅subscrible
1  | observable.subscrible(subsricber)  | 
RxJava2.0
观察者Observer/Subscriber
1  | Observer<Integer> observer = new Observer<String>() {  | 
被观察者Observable
1  | Observable<String> observable =Observable.create(new ObservableOnSubscribe<String>() {  | 
订阅subscrible
1  | observable.subscrible(subsricber)  | 
rxjava链式写法
1  | Observable.create(new ObservableOnSubscribe<String>() {  | 
输出如下:1
2
3
4
5subscribe
Event1
Event2
Event3
complete
1.0 2.0差别
ObservableEmitter
这里2.0在实例化Observable的时候,使用的是ObservableOnSubscribe这个类,并且重写onSubscribe方法,参数是ObservableEmitter(Emitter译为发射器)。通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件,对应Observer的三个方法。
Disposable
在Observer的onSubscribe方法中,参数类型为Disposable,拿到这个参数,Observer可以随时调用dipose方法来结束本次订阅,尽管Observable的事件依然会按顺序发送直到接送,但Observer在调用dipose之后就停止了接收。
Consumer
在2.0中subscribe有多个重载方法1
2
3
4
5
6public final Disposable subscribe() {}
    public final Disposable subscribe(Consumer<? super T> onNext) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
    public final void subscribe(Observer<? super T> observer) {}
拿来替换1.0中的Action0、Action1等,在1.0中通过参数个数来分别对应next、competed、error事件,代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    
    public void call() {
        Log.d(tag, "completed");
    }
};
// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
而在2.0中,使用方法也差不多,也是通过参数判断,只不过整合到一个Consumer类中1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36Consumer<String> nextConsumer = new Consumer<String>() {
    
    public void accept(String string) throws Exception {
        System.out.println( "onNext: " + string);
    }
};
Consumer<Disposable> disposableConsumer = new Consumer<Disposable>() {
    
    public void accept(Disposable disposable) throws Exception {
        System.out.println( "Disposable");
    }
};
Action competedAction = new Action() {
    
    public void run() throws Exception {
        System.out.println( "onComplete" );
    }
};
Consumer<Throwable> errorConsumer = new Consumer<Throwable>() {
    
    public void accept(Throwable e) throws Exception {
        System.out.println( "onError: " + e.getMessage());
    }
};
Observable.create(new ObservableOnSubscribe<String>() {
    
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("Event1");
        e.onComplete();
    }
}).subscribe(nextConsumer,errorConsumer,competedAction,disposableConsumer);
输出:1
2
3Disposable
onNext: Event1
onComplete
参考资料
##
