一. RxJava 概念
- RxJava 是基于 Java 虚拟机上的响应式扩展库,它通过使用可观察的序列将异步和基于事件的程序组合起来。与此同时,它扩展了观察者模式来支持数据/事件序列,并且添加了操作符,这些操作符允许你声明性地组合序列,同时抽象出要关注的问题:比如低级线程、同步、线程安全和并发数据结构等
- RxJava 主要有三大块:订阅流程、线程切换和操作符。篇幅有限,这里只分析订阅流程和线程切换
二. RxJava 的订阅流程
Demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26Observable.create(newObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String>emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});这里首先创建了一个被观察者,然后创建一个观察者订阅了这个被观察者,因此下面分两个部分对RxJava的订阅流程进行分析:创建被观察者过程和订阅过程
1. 创建被观察者过程
Observable#create()
1
2
3
4
5// 省略一些检测性的注解
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}- 在
Observable的create()
里面实际上是创建了一个新的 ObservableCreate 对象,同时,把我们定义好的 ObservableOnSubscribe 对象传入了ObservableCreate对象中,最后调用了RxJavaPlugins.onAssembly()
方法
- 在
ObservableCreate
1
2
3
4
5
6
7
8
9
10public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
...
}- 这里仅仅是把 ObservableOnSubscribe 这个对象保存在 ObservableCreate 中了
RxJavaPlugins#onAssembly()
1
2
3
4
5
6public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
// 应用hook函数的一些处理,一般用到不到
...
return source;
}- 最终仅仅是把我们的 ObservableCreate 给返回了
创建被观察者过程小结
Observable.create()
方法仅仅是先将我们自定义的 ObservableOnSubscribe 对象重新包装成了一个 ObservableCreate 对象
2. 订阅过程
Observable#subscribe()
1
2
3
4
5
6
7
8
9
10
11
12
13public final void subscribe(Observer<? super T> observer) {
...
// 1
observer = RxJavaPlugins.onSubscribe(this,observer);
...
// 2
subscribeActual(observer);
...
}- 在注释1处,在
Observable的subscribe()
方法内部首先调用了RxJavaPlugins的onSubscribe()
方法
- 在注释1处,在
RxJavaPlugins#onSubscribe()
1
2
3
4
5
6
7public static <T> Observer<? super T> onSubscribe( Observable<T> source, Observer<? super T> observer) {
// 应用hook函数的一些处理,一般用到不到
...
return observer;
}- 除去 hook 应用的逻辑,这里仅仅是将 observer 返回了
Observable#subscribeActual()
1
protected abstract void subscribeActual(Observer<? super T> observer);
- 这是一个抽象的方法,很明显,它对应的具体实现类就是我们在第一步创建的 ObservableCreate 类
ObservableCreate#subscribeActual()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15@Override
protected void subscribeActual(Observer<? super T> observer) {
// 1
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 2
observer.onSubscribe(parent);
try {
// 3
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}- 在注释1处,首先新创建了一个 CreateEmitter 对象,同时传入了我们自定义的 observer 对象进去
CreateEmitter
1
2
3
4
5
6
7
8
9
10
11
12
13
14static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
...
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
...
}- 从上面可以看出,CreateEmitter 通过继承了Java并发包中的原子引用类 AtomicReference 保证了事件流切断状态 Dispose 的一致性,并实现了 ObservableEmitter 接口和 Disposable 接口。接着我们分析下注释 2 处的
observer.onSubscribe(parent)
,这个onSubscribe()
回调的含义其实就是告诉观察者已经成功订阅了被观察者。再看到注释 3 处的source.subscribe(parent)
这行代码,这里的 source 其实是 ObservableOnSubscribe 对象,我们看到ObservableOnSubscribe的subscribe()
方法
- 从上面可以看出,CreateEmitter 通过继承了Java并发包中的原子引用类 AtomicReference 保证了事件流切断状态 Dispose 的一致性,并实现了 ObservableEmitter 接口和 Disposable 接口。接着我们分析下注释 2 处的
ObservableOnSubscribe#subscribe()
1
2
3
4
5
6
7
8
9Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public voidsubscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
});- 这里面使用到了 ObservableEmitter 的
onNext()
方法将事件流发送出去,最后调用了onComplete()
方法完成了订阅过程。 ObservableEmitter 是一个抽象类,实现类就是我们传入的 CreateEmitter 对象
- 这里面使用到了 ObservableEmitter 的
CreateEmitter#onNext() && CreateEmitter#onComplete()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
...
public void onNext(T t) {
...
if (!isDisposed()) {
//调用观察者的onNext()
observer.onNext(t);
}
}
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
...
}- 在 CreateEmitter 的
onNext()
和onComplete()
方法中首先都要经过一个 isDisposed 的判断,作用就是看当前的事件流是否被切断(废弃)掉了。默认是不切断的,如果想要切断,可以调用 Disposable 的dispose()
方法将此状态设置为切断(废弃)状态
- 在 CreateEmitter 的
ObservableEmitter#isDisposed()
1
2
3
4@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}- 注意到这里通过
get()
方法首先从 ObservableEmitter 的 AtomicReference 中拿到了保存的 Disposable 状态。然后交给了 DisposableHelper 进行判断处理
- 注意到这里通过
DisposableHelper#isDisposed() && DisposableHelper#set()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48public enum DisposableHelper implements Disposable {
DISPOSED;
public static boolean isDisposed(Disposable d) {
// 1
return d == DISPOSED;
}
public static boolean set(AtomicReference<Disposable> field, Disposable d) {
for (;;) {
Disposable current = field.get();
if (current == DISPOSED) {
if (d != null) {
d.dispose();
}
return false;
}
// 2
if (field.compareAndSet(current, d)) {
if (current != null) {
current.dispose();
}
return true;
}
}
}
...
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
// ...
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
...
}- DisposableHelper 是一个枚举类,内部只有一个值即 DISPOSED, 从上面的分析可知它就是用来标记事件流被切断(废弃)状态的。先看到注释 2 和注释 3 处的代码
field.compareAndSet(current, d)
和field.getAndSet(d)
,这里使用了原子引用 AtomicReference 内部包装的 CAS 方法处理了标志 Disposable 的并发读写问题。最后看到注释 3 处,将我们传入的 CreateEmitter 这个原子引用类保存的 Dispable 状态和 DisposableHelper 内部的 DISPOSED 进行比较,如果相等,就证明数据流被切断了
- DisposableHelper 是一个枚举类,内部只有一个值即 DISPOSED, 从上面的分析可知它就是用来标记事件流被切断(废弃)状态的。先看到注释 2 和注释 3 处的代码
CreateEmitter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45@Override
public void onNext(T t) {
...
// 1
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
// 2
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
...
// 3
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
// 4
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
// 5
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
// 6
dispose();
}
}
}- 在注释 1、3、5 处,
onNext()
和onError()
、onComplete()
方法首先都会判断事件流是否被切断,如果事件流此时被切断了,那么onNext()
和onComplete()
则会退出方法体,不做处理,onError()
则会执行到RxJavaPlugins.onError(t)
这句代码,内部会直接抛出异常,导致崩溃。如果事件流没有被切断,那么在onError()
和onComplete()
内部最终会调用到注释 4、6 处的这句dispose()
代码,将事件流进行切断,由此可知,onError()
和onComplete()
只能调用一个,如果先执行的是onComplete()
,再调用onError()
的话就会导致异常崩溃
- 在注释 1、3、5 处,
三. RxJava 的线程切换
Demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29Observable.create(new ObservableOnSubscribe<String>() {
@Override
public voidsubscribe(ObservableEmitter<String>emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " +e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});可以看到,RxJava 的线程切换主要分为
subscribeOn()
和observeOn()
方法
1. subscribeOn(Scheduler.io())
- 在
Schedulers.io()
方法中,我们需要先传入一个 Scheduler 调度类,这里是传入了一个调度到 io子线程的调度类
2. Schedulers#io()
1 | static final Scheduler IO; |
- Schedulers 这个类的代码很多,这里我只拿出有关
Schedulers.io()
这个方法涉及的逻辑代码进行讲解。首先,在注释 1 处,同前面分析的订阅流程的处理一样,只是一个处理 hook 的逻辑,最终返回的还是传入的这个 IO 对象 - 再看到注释2处,在 Schedulers 的静态代码块中将 IO 对象进行了初始化,其实质就是新建了一个 IOTask 的静态内部类,在 IOTask 的
call()
方法中,也就是注释 3 处,可以了解到使用了静态内部类的方式把创建的 IOScheduler 对象给返回出去了。实际上,Schedulers.io()
方法其实质就是返回了一个 IOScheduler 对象
3. Observable#subscribeOn()
1 | public final Observable<T> subscribeOn(Scheduler scheduler) { |
- 在
subscribeOn()
方法里面,又将 ObservableCreate 包装成了一个 ObservableSubscribeOn 对象
4. ObservableSubscribeOn
1 | public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { |
- 首先,在注释 1 处,将传进来的 source 和 scheduler 保存起来。接着,等到实际订阅的时候,就会执行到这个
subscribeActual()
方法,在注释 2 处,将我们自定义的 Observer 包装成了一个 SubscribeOnObserver 对象。在注释 3 处,通知观察者订阅了被观察者。在注释 4 处,内部先创建了一个 SubscribeTask 对象
5. ObservableSubscribeOn#SubscribeTask
1 | final class SubscribeTask implements Runnable { |
- SubscribeTask 是 ObservableSubscribeOn 的内部类,它实质上就是一个任务类,在它的
run()
方法中会执行到source.subscribe(parent)
的订阅方法,这个 source 其实就是我们在 ObservableSubscribeOn 构造方法中传进来的 ObservableCreate 对象
6. Scheduler#scheduleDirect()
1 | public Disposable scheduleDirect(@NonNull Runnable run) { |
这里最后会执行到上面这个
scheduleDirect()
重载方法。首先,在注释 1 处,会调用createWorker()
方法创建一个工作者对象 Worker,它是一个抽象类,这里的实现类就是 IoSchedulerIOScheduler#createWorker()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29final AtomicReference<CachedWorkerPool> pool;
...
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
...
public Worker createWorker() {
// 1
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
...
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
// 2
this.threadWorker = pool.get();
}
}- 首先,在注释 1 处调用了
pool.get()
这个方法,pool 是一个 CachedWorkerPool 类型的原子引用对象,它的作用就是用于缓存工作者对象 Worker 的。然后,将得到的 CachedWorkerPool 传入新创建的 EventLoopWorker 对象中。重点关注一下注释 2 处,这里将 CachedWorkerPool 缓存的 threadWorker 对象保存起来了 - 继续分析 3.6 处代码段的注释 2 处的代码,这里又是一个关于 hook 的封装处理,最终还是返回的当前的 Runnable 对象。在注释 3 处新建了一个切断任务 DisposeTask 将 decoratedRun 和 w 对象包装了起来。最后在注释 4 处调用了工作者的
schedule()
方法
- 首先,在注释 1 处调用了
IoScheduler#schedule()
1
2
3
4
5
6@Override
public Disposable schedule(@NonNull Runnableaction, long delayTime, @NonNull TimeUnit unit){
...
return threadWorker.scheduleActual(action,delayTime, unit, tasks);
}- 内部调用了 threadWorker 的
scheduleActual()
方法,实际上是调用到了父类 NewThreadWorker 的scheduleActual()
方法
- 内部调用了 threadWorker 的
NewThreadWorker#scheduleActual()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 1
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
// 2
if (delayTime <= 0) {
// 3
f = executor.submit((Callable<Object>)sr);
} else {
// 4
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}- 在 NewThreadWorker 的
scheduleActual()
方法的内部,在注释 1 处首先会新建一个 ScheduledRunnable 对象,将 Runnable 对象和 parent 包装起来了,这里 parent 是一个 DisposableContainer 对象,它实际的实现类是 CompositeDisposable 类,它是一个保存所有事件流是否被切断状态的容器,其内部的实现是使用了 RxJava 自己定义的一个简单的 OpenHashSet类进行存储 - 最后注释 2 处,判断是否设置了延迟时间,如果设置了,则调用线程池的
submit()
方法立即进行线程切换,否则,调用schedule()
方法进行延时执行线程切换
- 在 NewThreadWorker 的
7. 为什么多次执行 subscribeOn(),只有第一次有效
- 从上面的分析可以很容易了解到被观察者被订阅时是从最外面的一层(ObservableSubscribeOn)通知到里面的一层(ObservableOnSubscribe),当连续执行了到多次
subscribeOn()
的时候,其实就是先执行倒数第一次的subscribeOn()
方法,直到最后一次执行的subscribeOn()
方法,这样肯定会覆盖前面的线程切换
8. observeOn(AndroidSchedulers.mainThread())
1 | public final Observable<T> observeOn(Scheduler scheduler) { |
- 可以看到,
observeOn()
方法内部最终也是返回了一个 ObservableObserveOn 对象
9. ObservableObserveOn#subscribeActual()
1 |
|
- 首先,在注释 1 处,判断指定的调度器是不是 TrampolineScheduler,这是一个不进行线程切换,立即执行当前代码的调度器。如果是,则会直接调用 ObservableSubscribeOn 的
subscribe()
方法;如果不是,则会在注释 3 处创建一个工作者对象 - 然后,在注释 4 处创建一个新的 ObserveOnObserver 将 SubscribeOnobserver 对象包装起来,并传入 ObservableSubscribeOn 的
subscribe()
方法进行订阅
10. ObserveOnObserver
1 | @Override |
- 去除非主线逻辑的代码,在 ObserveOnObserver 的
onNext()
和onError()
、onComplete()
方法中最后都会调用到schedule()
方法。接着看schedule()
方法,其中onNext()
还会把消息存放到队列中
11. ObserveOnObserver#schedule()
1 | void schedule() { |
- 这里使用了 worker 进行调度 ObserveOnObserver 这个实现了 Runnable 的任务。worker 就是在
AndroidSchedulers.mainThread()
中创建的,内部其实就是使用 Handler 进行线程切换的
12. ObserveOnObserver#run()
1 |
|
- 在注释 1 处会先判断 outputFused 这个标志位,它表示事件流是否被融化掉,默认是 false,所以,最后会执行到
drainNormal()
方法
13. ObserveOnObserver#drainNormal()
1 | void drainNormal() { |
- 在注释 1 处,这里的 downstream 实际上是从外面传进来的 SubscribeOnObserver 对象。在注释 2 处将队列中的消息取出来,接着在注释 3 处调用了 SubscribeOnObserver 的
onNext()
方法。最终,会从我们包装类的最外层一直调用到最里面的我们自定义的 Observer 中的onNext()
方法,所以,在observeOn()
方法下面的链式代码都会执行到它所指定的线程中