schedulers是RxJava2中非常重要的概念,也是其最為核心的部分之一。schedulers能夠使RxJava2的執行更加高效,可控,能夠有效地防止RxJava2中常見的一些問題,如線程阻塞、卡頓等。本文將從多個方面對schedulers進行詳細的闡述。
一、schedulers介紹
schedulers是RxJava2中用來控制Observable數據源的線程的調度程序,能夠有效地控制Observable的運行、訂閱和發射。RxJava2中提供了許多的schedulers,如Schedulers.io(),Schedulers.newThread(),Schedulers.computation()和AndroidSchedulers.mainThread()等。
為了便於理解schedulers的作用,我們來看一下下面這個例子:
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
emitter.onNext(1);
}
}).subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
}
});
在這個例子中,我們在Observable中發射了數字1並在Observer中接收這個數字。我們打印出了Observable和Observer所在的線程名。如果我們不使用schedulers,我們會發現Observable和Observer都在同一個線程中,即主線程中。這樣的話,就可能會出現Observable中執行耗時操作導致主線程卡頓或者ANR的問題。
那麼,我們可以使用schedulers來改變Observable和Observer所在的線程。例如,我們可以使用Schedulers.newThread()將Observable切換到一個新的線程中:
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
emitter.onNext(1);
}
}).subscribeOn(Schedulers.newThread()).subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
}
});
這樣,Observable就會切換到一個新的線程中執行,而Observer仍然在主線程中執行。
二、schedulers的類型
RxJava2中提供了多種schedulers類型,每種類型都有不同的用途:
1. Schedulers.io()
Schedulers.io()是一個異步任務線程池,用於執行I/O操作或延遲任務。它比Schedulers.newThread()更有效,因為在同一時間可以有多個I/O操作運行在Schedulers.io()線程池中,而Schedulers.newThread()始終只能運行一個線程。
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
emitter.onNext(1);
}
}).subscribeOn(Schedulers.io()).subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
}
});
2. Schedulers.newThread()
Schedulers.newThread()會創建一個新的線程執行任務。如果你沒有為任務創建專門的線程池,它就會為你創建線程。
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
emitter.onNext(1);
}
}).subscribeOn(Schedulers.newThread()).subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
}
});
3. Schedulers.computation()
Schedulers.computation()會創建一個固定大小的線程池,用於執行CPU密集型操作,比如數學運算。它的線程池大小默認為CPU內核數量,但你也可以通過調用Schedulers.computation(int n)來使用不同大小的線程池。
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
emitter.onNext(1);
}
}).subscribeOn(Schedulers.computation()).subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
}
});
4. AndroidSchedulers.mainThread()
AndroidSchedulers.mainThread()是在Android中使用的schedulers類型,它會將任務切換到Android主線程中運行。一般用於將結果通知給UI組件。
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
textView.setText(String.valueOf(integer));
}
});
三、鏈式調用和指定線程
在RxJava2中,我們可以使用鏈式調用來指定Observable和Observer的線程:
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
textView.setText(String.valueOf(integer));
}
});
在這個例子中,我們先使用just創建了一個包含數字1-4的Observable,然後使用subscribeOn指定Observable的線程為io線程。接着使用observeOn將Observer的線程切換到Android主線程中。最後,在subscribe中指定接收Observable發射出的整數,並將其設置為textView的內容。
四、指定多個線程
有時候,我們需要在不同的地方指定不同的線程,這時我們可以使用RxJava2中的更高級的方法。例如,我們可以使用flatMap來指定多個不同的線程:
Observable.just(1, 2, 3)
.flatMap(new Function<Integer, ObservableSource>() {
@Override
public ObservableSource apply(Integer integer) throws Exception {
return Observable.just(String.valueOf(integer))
.subscribeOn(Schedulers.newThread())
.map(new Function() {
@Override
public String apply(String s) throws Exception {
Log.d(TAG, "map apply thread is : " + Thread.currentThread().getName());
return s + " mapped to " + Thread.currentThread().getName();
}
})
.observeOn(AndroidSchedulers.mainThread());
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "accept: " + s);
}
});
在這個例子中,我們使用flatMap將Observable中的每個整數轉換為一個新的Observable。在新的Observable中,我們先使用subscribeOn將線程切換到新的線程中,然後使用map進行數據轉換並指定線程,最後使用observeOn將線程切換回主線程中。在subscribe中我們指定接收字符串數據並打印出來。
總結
本文從介紹schedulers,schedulers的類型,鏈式調用和指定線程以及指定多個線程4個方面對schedulers進行了詳細的闡述。如果在RxJava2的開發中沒有使用schedulers,就意味着你失去了很多優化和控制的機會。通過本文,希望大家可以更好地理解和使用schedulers。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/153104.html
微信掃一掃
支付寶掃一掃