RxJava是一個神奇的框架,用法很簡單,但內(nèi)部實現(xiàn)有點復雜,代碼邏輯有點繞。我讀源碼時,確實有點似懂非懂的感覺。網(wǎng)上關于RxJava源碼分析的文章,源碼貼了一大堆,代碼邏輯繞來繞去的,讓人看得云里霧里的。既然用拆輪子的方式來分析源碼比較難啃,不如換種方式,以造輪子的方式,將源碼中與性能、兼容性、擴展性有關的代碼剔除,留下核心代碼帶大家揭秘 RxJava 的實現(xiàn)原理。
成都創(chuàng)新互聯(lián)-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設、高性價比古冶網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式古冶網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設找我們,業(yè)務覆蓋古冶地區(qū)。費用合理售后完善,10年實體公司更值得信賴。什么是RxJava
將上面的例子進行代碼抽象,步驟如下:
上面示例的演示代碼如下:
//1.創(chuàng)建被觀察者
Observable<String> observable =
Observable.create(new Observable.OnSubscribe<String>() {@Override
br/>@Override
//4.開始發(fā)送事件
//事件有3個類型 分別是onNext() onCompleted() onError()
//onCompleted() onError() 一般都是用來通知觀察者 事件發(fā)送完畢了,兩者只取其一。
subscriber.onNext("Hello Android !");
subscriber.onNext("Hello Java !");
subscriber.onNext("Hello C !");
subscriber.onCompleted();
}
});
//2.創(chuàng)建觀察者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.i(TAG, "onCompleted ");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: "+e.getLocalizedMessage());
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: "+s);
}
};
//3.訂閱
observable.subscribe(subscriber);
輸出如下:
com.m520it.rxjava I/IT520: onNext: Hello Android !
com.m520it.rxjava I/IT520: onNext: Hello Java !
com.m520it.rxjava I/IT520: onNext: Hello C !
com.m520it.rxjava I/IT520: onCompleted
代碼運行的原理
? onError - 如果無法發(fā)射需要的值,Single發(fā)射一個Throwable對象到這個方法
Single只會調(diào)用這兩個方法中的一個,而且只會調(diào)用一次,調(diào)用了任何一個方法之后,訂閱關系終止。
final Single<String> single = Single.create(new Single.OnSubscribe<String>() {@Override
br/>@Override
//先調(diào)用onNext() 最后調(diào)用onCompleted()
//singleSubscriber.onSuccess("Hello Android !");
//只調(diào)用onError();
singleSubscriber.onError(new NullPointerException("mock Exception !"));
}
});
Observer<String> observer = new Observer<String>() {@Override
br/>@Override
Log.i(TAG, "onCompleted ");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: "+e.getLocalizedMessage());
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: "+s);
}
};
single.subscribe(observer);
6.觀察者變種
Observer觀察者對象,上面我們用Subscriber對象代替。因為該對象本身就是繼承了Observer。
該對象實現(xiàn)了onNext()&onCompleted()&onError()事件,我們?nèi)绻麑δ膫€事件比較關心,只需要實現(xiàn)對應的方法即可,代碼如下:
//創(chuàng)建觀察者
Subscriber<String> subscriber = new Subscriber<String>() {@Override
br/>@Override
Log.i(TAG, "onCompleted ");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: "+e.getLocalizedMessage());
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: "+s);
}
};
//訂閱
observable.subscribe(subscriber);
上面的代碼中,如果你只關心onNext()事件,但卻不得不實現(xiàn)onCompleted()&onError()事件.這樣的代碼就顯得很臃腫。鑒于這種需求,RxJava框架在訂閱方面做了特定的調(diào)整,代碼如下:
//為指定的onNext事件創(chuàng)建獨立的接口
Action1<String> onNextAction = new Action1<String>() {@Override
br/>@Override
Log.i(TAG, "call: "+s);
}
};
//訂閱
observable.subscribe(onNextAction);
不知道大家注意到?jīng)]有,subscribe()訂閱的不再是觀察者,而是特定的onNext接口對象。類似的函數(shù)如下,我們可以根據(jù)需要實現(xiàn)對應的訂閱:
public Subscription subscribe(final Observer observer)
public Subscription subscribe(final Action1 onNext)
public Subscription subscribe(final Action1 onNext, Action1 onError)
public Subscription subscribe(final Action1 onNext, Action1 onError, Action0 onCompleted)
這里還有一個forEach函數(shù)有類似的功能:
public void forEach(final Action1 onNext)
public void forEach(final Action1 onNext, Action1 onError)
public void forEach(final Action1 onNext, Action1 onError, Action0 onComplete)
##7.Subject變種
上面2節(jié)中既介紹了被觀察者變種,又介紹了觀察者變種,這里再介紹一種雌雄同體的對象(既作為被觀察者使用,也可以作為觀察者)。
針對不同的場景一共有四種類型的Subject。他們并不是在所有的實現(xiàn)中全部都存在。
###AsyncSubject
一個AsyncSubject只在原始Observable完成后,發(fā)射來自原始Observable的最后一個值。它會把這最后一個值發(fā)射給任何后續(xù)的觀察者。
以下貼出代碼:
//創(chuàng)建被觀察者final AsyncSubject<String> subject = AsyncSubject.create();//創(chuàng)建觀察者
Subscriber<String> subscriber = new Subscriber<String>() {@Override
br/>@Override
Log.i(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError");
}
@Override
public void onNext(String s) {
Log.i(TAG, "s:" + s);
}
};//訂閱事件
subject.subscribe(subscriber);//被觀察者發(fā)出事件 如果調(diào)用onCompleted(),onNext()則會打印最后一個事件;如果沒有,onNext()則不打印任何事件。
subject.onNext("Hello Android ");
subject.onNext("Hello Java ");
subject.onCompleted();
輸出:
s:Hello Java onCompleted
然而,如果原始的Observable因為發(fā)生了錯誤而終止,AsyncSubject將不會發(fā)射任何數(shù)據(jù),只是簡單的向前傳遞這個錯誤通知。
上面的觀察者被觀察者代碼相同,現(xiàn)在發(fā)出一系列信號,并在最后發(fā)出異常 代碼如下:
subject.onNext("Hello Android ");
subject.onNext("Hello Java ");//因為發(fā)送了異常 所以onNext()無法被打印
subject.onError(null);
###BehaviorSubject
當觀察者訂閱BehaviorSubject時,他會將訂閱前最后一次發(fā)送的事件和訂閱后的所有發(fā)送事件都打印出來,如果訂閱前無發(fā)送事件,則會默認接收構(gòu)造器create(T)里面的對象和訂閱后的所有事件,代碼如下:
BehaviorSubject subject=BehaviorSubject.create("NROMAL");
Subscriber subscriber = new Subscriber() {@Override
br/>@Override
Log.i(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError");
}
@Override
public void onNext(Object o) {
Log.i(TAG, "onNext: " + o);
}
};
//subject.onNext("Hello Android !");//subject.onNext("Hello Java !");//subject.onNext("Hello C !");//這里開始訂閱 如果上面的3個注釋沒去掉,則Hello C的事件和訂閱后面的事件生效//如果上面的三個注釋去掉 則打印構(gòu)造器NORMAL事件生效后和訂閱后面的事件生效
subject.subscribe(subscriber);
subject.onNext("Hello CPP !");
subject.onNext("Hello IOS !");
PublishSubject
PublishSubject只會把在訂閱發(fā)生的時間點之后來自原始Observable的數(shù)據(jù)發(fā)射給觀察者。
需要注意的是,PublishSubject可能會一創(chuàng)建完成就立刻開始發(fā)射數(shù)據(jù),因此這里有一個風險:在Subject被創(chuàng)建后到有觀察者訂閱它之前這個時間段內(nèi),一個或多個數(shù)據(jù)可能會丟失。
代碼如下:
PublishSubject subject= PublishSubject.create();
Action1<String> onNextAction1 = new Action1<String>(){
@Override
public void call(String s) {
Log.i(TAG, "onNextAction1 call: "+s);
}
};
Action1<String> onNextAction2 = new Action1<String>(){
@Override
public void call(String s) {
Log.i(TAG, "onNextAction2 call: "+s);
}
};
subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");
輸出如下:
onNextAction1 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !
ReplaySubject
ReplaySubject會發(fā)射所有來自原始Observable的數(shù)據(jù)給觀察者,無論它們是何時訂閱的。
代碼如下:
ReplaySubject subject= ReplaySubject.create();
Action1<String> onNextAction1 = new Action1<String>(){
@Override
public void call(String s) {
Log.i(TAG, "onNextAction1 call: "+s);
}
};
Action1<String> onNextAction2 = new Action1<String>(){
@Override
public void call(String s) {
Log.i(TAG, "onNextAction2 call: "+s);
}
};
subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");
輸出如下:
onNextAction1 call: Hello Android !
onNextAction1 call: Hello Java !
onNextAction2 call: Hello Android !
onNextAction2 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !
###Subject總結(jié)
AsyncSubject無論何時訂閱 只會接收最后一次onNext()事件,如果最后出現(xiàn)異常,則不會打印任何onNext()
BehaviorSubject會從訂閱前最后一次oNext()開始打印直至結(jié)束。如果訂閱前無調(diào)用onNext(),則調(diào)用默認creat(T)傳入的對象。如果異常后才調(diào)用,則不打印onNext()
PublishSubject只會打印訂閱后的任何事件。
ReplaySubject無論訂閱在何時都會調(diào)用發(fā)送的事件。
創(chuàng)新互聯(lián)www.cdcxhl.cn,專業(yè)提供香港、美國云服務器,動態(tài)BGP最優(yōu)骨干路由自動選擇,持續(xù)穩(wěn)定高效的網(wǎng)絡助力業(yè)務部署。公司持有工信部辦法的idc、isp許可證, 機房獨有T級流量清洗系統(tǒng)配攻擊溯源,準確進行流量調(diào)度,確保服務器高可用性。佳節(jié)活動現(xiàn)已開啟,新人活動云服務器買多久送多久。
當前文章:史上最淺顯易懂的RxJava入門教程-創(chuàng)新互聯(lián)
標題URL:http://jinyejixie.com/article44/pehee.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站設計、商城網(wǎng)站、網(wǎng)站制作、網(wǎng)站導航、網(wǎng)站內(nèi)鏈、營銷型網(wǎng)站建設
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容