0%

Android 三方库源码解析系列(五):RxJava 原理及源码解析

一. RxJava 概念

  • RxJava 是基于 Java 虚拟机上的响应式扩展库,它通过使用可观察的序列将异步和基于事件的程序组合起来。与此同时,它扩展了观察者模式来支持数据/事件序列,并且添加了操作符,这些操作符允许你声明性地组合序列,同时抽象出要关注的问题:比如低级线程、同步、线程安全和并发数据结构等
  • RxJava 主要有三大块:订阅流程线程切换操作符。篇幅有限,这里只分析订阅流程和线程切换

二. RxJava 的订阅流程

  • Demo

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    Observable.create(newObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String>emitter) throws Exception {
    emitter.onNext("1");
    emitter.onNext("2");
    emitter.onNext("3");
    emitter.onComplete();
    }
    }).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    Log.d(TAG, "onSubscribe");
    }
    @Override
    public void onNext(String s) {
    Log.d(TAG, "onNext : " + s);
    }
    @Override
    public void onError(Throwable e) {
    Log.d(TAG, "onError : " + e.toString());
    }
    @Override
    public void onComplete() {
    Log.d(TAG, "onComplete");
    }
    });
  • 这里首先创建了一个被观察者,然后创建一个观察者订阅了这个被观察者,因此下面分两个部分对RxJava的订阅流程进行分析:创建被观察者过程订阅过程

1. 创建被观察者过程

  • Observable#create()

    1
    2
    3
    4
    5
    // 省略一些检测性的注解
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    • Observable的create() 里面实际上是创建了一个新的 ObservableCreate 对象,同时,把我们定义好的 ObservableOnSubscribe 对象传入了ObservableCreate对象中,最后调用了 RxJavaPlugins.onAssembly() 方法
  • ObservableCreate

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public final class ObservableCreate<T> extends Observable<T> {

    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
    }

    ...
    }
    • 这里仅仅是把 ObservableOnSubscribe 这个对象保存在 ObservableCreate 中了
  • RxJavaPlugins#onAssembly()

    1
    2
    3
    4
    5
    6
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {

    // 应用hook函数的一些处理,一般用到不到
    ...
    return source;
    }
    • 最终仅仅是把我们的 ObservableCreate 给返回了
  • 创建被观察者过程小结

    • Observable.create() 方法仅仅是先将我们自定义的 ObservableOnSubscribe 对象重新包装成了一个 ObservableCreate 对象

2. 订阅过程

  • Observable#subscribe()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public final void subscribe(Observer<? super T> observer) {
    ...

    // 1
    observer = RxJavaPlugins.onSubscribe(this,observer);

    ...

    // 2
    subscribeActual(observer);

    ...
    }
    • 在注释1处,在 Observable的subscribe() 方法内部首先调用了 RxJavaPlugins的onSubscribe() 方法
  • RxJavaPlugins#onSubscribe()

    1
    2
    3
    4
    5
    6
    7
    public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {

    // 应用hook函数的一些处理,一般用到不到
    ...

    return observer;
    }
    • 除去 hook 应用的逻辑,这里仅仅是将 observer 返回了
  • Observable#subscribeActual()

    1
    protected abstract void subscribeActual(Observer<? super T> observer);
    • 这是一个抽象的方法,很明显,它对应的具体实现类就是我们在第一步创建的 ObservableCreate 类
  • ObservableCreate#subscribeActual()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
    // 1
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    // 2
    observer.onSubscribe(parent);

    try {
    // 3
    source.subscribe(parent);
    } catch (Throwable ex) {
    Exceptions.throwIfFatal(ex);
    parent.onError(ex);
    }
    }
    • 在注释1处,首先新创建了一个 CreateEmitter 对象,同时传入了我们自定义的 observer 对象进去
  1. CreateEmitter

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

    ...

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
    this.observer = observer;
    }

    ...
    }
    • 从上面可以看出,CreateEmitter 通过继承了Java并发包中的原子引用类 AtomicReference 保证了事件流切断状态 Dispose 的一致性,并实现了 ObservableEmitter 接口和 Disposable 接口。接着我们分析下注释 2 处的 observer.onSubscribe(parent),这个 onSubscribe() 回调的含义其实就是告诉观察者已经成功订阅了被观察者。再看到注释 3 处的 source.subscribe(parent) 这行代码,这里的 source 其实是 ObservableOnSubscribe 对象,我们看到 ObservableOnSubscribe的subscribe() 方法
  2. ObservableOnSubscribe#subscribe()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public voidsubscribe(ObservableEmitter<String> emitter) throws Exception {
    emitter.onNext("1");
    emitter.onNext("2");
    emitter.onNext("3");
    emitter.onComplete();
    }
    });
    • 这里面使用到了 ObservableEmitter 的 onNext() 方法将事件流发送出去,最后调用了 onComplete() 方法完成了订阅过程。 ObservableEmitter 是一个抽象类,实现类就是我们传入的 CreateEmitter 对象
  3. CreateEmitter#onNext() && CreateEmitter#onComplete()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

    ...

    @Override
    public void onNext(T t) {
    ...

    if (!isDisposed()) {
    //调用观察者的onNext()
    observer.onNext(t);
    }
    }

    @Override
    public void onComplete() {
    if (!isDisposed()) {
    try {
    observer.onComplete();
    } finally {
    dispose();
    }
    }
    }


    ...

    }
    • 在 CreateEmitter 的 onNext()onComplete() 方法中首先都要经过一个 isDisposed 的判断,作用就是看当前的事件流是否被切断(废弃)掉了。默认是不切断的,如果想要切断,可以调用 Disposable 的 dispose() 方法将此状态设置为切断(废弃)状态
  4. ObservableEmitter#isDisposed()

    1
    2
    3
    4
    @Override
    public boolean isDisposed() {
    return DisposableHelper.isDisposed(get());
    }
    • 注意到这里通过 get() 方法首先从 ObservableEmitter 的 AtomicReference 中拿到了保存的 Disposable 状态。然后交给了 DisposableHelper 进行判断处理
  5. DisposableHelper#isDisposed() && DisposableHelper#set()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    public enum DisposableHelper implements Disposable {

    DISPOSED;

    public static boolean isDisposed(Disposable d) {
    // 1
    return d == DISPOSED;
    }

    public static boolean set(AtomicReference<Disposable> field, Disposable d) {
    for (;;) {
    Disposable current = field.get();
    if (current == DISPOSED) {
    if (d != null) {
    d.dispose();
    }
    return false;
    }
    // 2
    if (field.compareAndSet(current, d)) {
    if (current != null) {
    current.dispose();
    }
    return true;
    }
    }
    }

    ...

    public static boolean dispose(AtomicReference<Disposable> field) {
    Disposable current = field.get();
    Disposable d = DISPOSED;
    if (current != d) {
    // ...
    current = field.getAndSet(d);
    if (current != d) {
    if (current != null) {
    current.dispose();
    }
    return true;
    }
    }
    return false;
    }

    ...
    }
    • DisposableHelper 是一个枚举类,内部只有一个值即 DISPOSED, 从上面的分析可知它就是用来标记事件流被切断(废弃)状态的。先看到注释 2 和注释 3 处的代码 field.compareAndSet(current, d)field.getAndSet(d),这里使用了原子引用 AtomicReference 内部包装的 CAS 方法处理了标志 Disposable 的并发读写问题。最后看到注释 3 处,将我们传入的 CreateEmitter 这个原子引用类保存的 Dispable 状态和 DisposableHelper 内部的 DISPOSED 进行比较,如果相等,就证明数据流被切断了
  6. CreateEmitter

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    @Override
    public void onNext(T t) {
    ...
    // 1
    if (!isDisposed()) {
    observer.onNext(t);
    }
    }

    @Override
    public void onError(Throwable t) {
    if (!tryOnError(t)) {
    // 2
    RxJavaPlugins.onError(t);
    }
    }

    @Override
    public boolean tryOnError(Throwable t) {
    ...
    // 3
    if (!isDisposed()) {
    try {
    observer.onError(t);
    } finally {
    // 4
    dispose();
    }
    return true;
    }
    return false;
    }

    @Override
    public void onComplete() {
    // 5
    if (!isDisposed()) {
    try {
    observer.onComplete();
    } finally {
    // 6
    dispose();
    }
    }
    }
    • 在注释 1、3、5 处,onNext()onError()onComplete() 方法首先都会判断事件流是否被切断,如果事件流此时被切断了,那么 onNext()onComplete() 则会退出方法体,不做处理,onError() 则会执行到 RxJavaPlugins.onError(t) 这句代码,内部会直接抛出异常,导致崩溃。如果事件流没有被切断,那么在 onError()onComplete() 内部最终会调用到注释 4、6 处的这句 dispose() 代码,将事件流进行切断,由此可知,onError()onComplete() 只能调用一个,如果先执行的是 onComplete(),再调用 onError() 的话就会导致异常崩溃

三. RxJava 的线程切换

  • Demo

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public voidsubscribe(ObservableEmitter<String>emitter) throws Exception {
    emitter.onNext("1");
    emitter.onNext("2");
    emitter.onNext("3");
    emitter.onComplete();
    }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    Log.d(TAG, "onSubscribe");
    }
    @Override
    public void onNext(String s) {
    Log.d(TAG, "onNext : " + s);
    }
    @Override
    public void onError(Throwable e) {
    Log.d(TAG, "onError : " +e.toString());
    }
    @Override
    public void onComplete() {
    Log.d(TAG, "onComplete");
    }
    });
  • 可以看到,RxJava 的线程切换主要分为 subscribeOn()observeOn() 方法

1. subscribeOn(Scheduler.io())

  • Schedulers.io() 方法中,我们需要先传入一个 Scheduler 调度类,这里是传入了一个调度到 io子线程的调度类

2. Schedulers#io()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static final Scheduler IO;

...

public static Scheduler io() {
// 1
return RxJavaPlugins.onIoScheduler(IO);
}

static {
...

// 2
IO = RxJavaPlugins.initIoScheduler(new IOTask());
}

static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
// 3
return IoHolder.DEFAULT;
}
}

static final class IoHolder {
// 4
static final Scheduler DEFAULT = new IoScheduler();
}
  • Schedulers 这个类的代码很多,这里我只拿出有关 Schedulers.io() 这个方法涉及的逻辑代码进行讲解。首先,在注释 1 处,同前面分析的订阅流程的处理一样,只是一个处理 hook 的逻辑,最终返回的还是传入的这个 IO 对象
  • 再看到注释2处,在 Schedulers 的静态代码块中将 IO 对象进行了初始化,其实质就是新建了一个 IOTask 的静态内部类,在 IOTask 的call() 方法中,也就是注释 3 处,可以了解到使用了静态内部类的方式把创建的 IOScheduler 对象给返回出去了。实际上,Schedulers.io() 方法其实质就是返回了一个 IOScheduler 对象

3. Observable#subscribeOn()

1
2
3
4
5
  public final Observable<T> subscribeOn(Scheduler scheduler) {
...

return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
  • subscribeOn() 方法里面,又将 ObservableCreate 包装成了一个 ObservableSubscribeOn 对象

4. ObservableSubscribeOn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
// 1
super(source);
this.scheduler = scheduler;
}

@Override
public void subscribeActual(final Observer<? super T> observer) {
// 2
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

// 3
observer.onSubscribe(parent);

// 4
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

...
}
  • 首先,在注释 1 处,将传进来的 source 和 scheduler 保存起来。接着,等到实际订阅的时候,就会执行到这个 subscribeActual() 方法,在注释 2 处,将我们自定义的 Observer 包装成了一个 SubscribeOnObserver 对象。在注释 3 处,通知观察者订阅了被观察者。在注释 4 处,内部先创建了一个 SubscribeTask 对象

5. ObservableSubscribeOn#SubscribeTask

1
2
3
4
5
6
7
8
9
10
11
12
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;

SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}

@Override
public void run() {
source.subscribe(parent);
}
}
  • SubscribeTask 是 ObservableSubscribeOn 的内部类,它实质上就是一个任务类,在它的 run() 方法中会执行到 source.subscribe(parent) 的订阅方法,这个 source 其实就是我们在 ObservableSubscribeOn 构造方法中传进来的 ObservableCreate 对象

6. Scheduler#scheduleDirect()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {

// 1
final Worker w = createWorker();

// 2
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

// 3
DisposeTask task = new DisposeTask(decoratedRun, w);

// 4
w.schedule(task, delay, unit);

return task;
}
  • 这里最后会执行到上面这个 scheduleDirect() 重载方法。首先,在注释 1 处,会调用 createWorker() 方法创建一个工作者对象 Worker,它是一个抽象类,这里的实现类就是 IoScheduler

    • IOScheduler#createWorker()

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      final AtomicReference<CachedWorkerPool> pool;

      ...

      public IoScheduler(ThreadFactory threadFactory) {
      this.threadFactory = threadFactory;
      this.pool = new AtomicReference<CachedWorkerPool>(NONE);
      start();
      }

      ...

      @Override
      public Worker createWorker() {
      // 1
      return new EventLoopWorker(pool.get());
      }

      static final class EventLoopWorker extends Scheduler.Worker {
      ...

      EventLoopWorker(CachedWorkerPool pool) {
      this.pool = pool;
      this.tasks = new CompositeDisposable();
      // 2
      this.threadWorker = pool.get();
      }

      }
      • 首先,在注释 1 处调用了 pool.get() 这个方法,pool 是一个 CachedWorkerPool 类型的原子引用对象,它的作用就是用于缓存工作者对象 Worker 的。然后,将得到的 CachedWorkerPool 传入新创建的 EventLoopWorker 对象中。重点关注一下注释 2 处,这里将 CachedWorkerPool 缓存的 threadWorker 对象保存起来了
      • 继续分析 3.6 处代码段的注释 2 处的代码,这里又是一个关于 hook 的封装处理,最终还是返回的当前的 Runnable 对象。在注释 3 处新建了一个切断任务 DisposeTask 将 decoratedRun 和 w 对象包装了起来。最后在注释 4 处调用了工作者的 schedule() 方法
    • IoScheduler#schedule()

      1
      2
      3
      4
      5
      6
      @Override
      public Disposable schedule(@NonNull Runnableaction, long delayTime, @NonNull TimeUnit unit){
      ...

      return threadWorker.scheduleActual(action,delayTime, unit, tasks);
      }
      • 内部调用了 threadWorker 的 scheduleActual() 方法,实际上是调用到了父类 NewThreadWorker 的 scheduleActual() 方法
    • NewThreadWorker#scheduleActual()

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      public NewThreadWorker(ThreadFactory threadFactory) {
      executor = SchedulerPoolFactory.create(threadFactory);
      }


      @NonNull
      public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
      Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

      // 1
      ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);


      if (parent != null) {
      if (!parent.add(sr)) {
      return sr;
      }
      }

      Future<?> f;
      try {
      // 2
      if (delayTime <= 0) {
      // 3
      f = executor.submit((Callable<Object>)sr);
      } else {
      // 4
      f = executor.schedule((Callable<Object>)sr, delayTime, unit);
      }
      sr.setFuture(f);
      } catch (RejectedExecutionException ex) {
      if (parent != null) {
      parent.remove(sr);
      }
      RxJavaPlugins.onError(ex);
      }

      return sr;
      }
      • 在 NewThreadWorker 的 scheduleActual() 方法的内部,在注释 1 处首先会新建一个 ScheduledRunnable 对象,将 Runnable 对象和 parent 包装起来了,这里 parent 是一个 DisposableContainer 对象,它实际的实现类是 CompositeDisposable 类,它是一个保存所有事件流是否被切断状态的容器,其内部的实现是使用了 RxJava 自己定义的一个简单的 OpenHashSet类进行存储
      • 最后注释 2 处,判断是否设置了延迟时间,如果设置了,则调用线程池的 submit() 方法立即进行线程切换,否则,调用 schedule() 方法进行延时执行线程切换

7. 为什么多次执行 subscribeOn(),只有第一次有效

  • 从上面的分析可以很容易了解到被观察者被订阅时是从最外面的一层(ObservableSubscribeOn)通知到里面的一层(ObservableOnSubscribe),当连续执行了到多次 subscribeOn() 的时候,其实就是先执行倒数第一次的 subscribeOn() 方法,直到最后一次执行的 subscribeOn() 方法,这样肯定会覆盖前面的线程切换

8. observeOn(AndroidSchedulers.mainThread())

1
2
3
4
5
6
7
8
9
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
....

return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
  • 可以看到,observeOn() 方法内部最终也是返回了一个 ObservableObserveOn 对象

9. ObservableObserveOn#subscribeActual()

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 1
if (scheduler instanceof TrampolineScheduler) {
// 2
source.subscribe(observer);
} else {
// 3
Scheduler.Worker w = scheduler.createWorker();
// 4
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
  • 首先,在注释 1 处,判断指定的调度器是不是 TrampolineScheduler,这是一个不进行线程切换,立即执行当前代码的调度器。如果是,则会直接调用 ObservableSubscribeOn 的 subscribe() 方法;如果不是,则会在注释 3 处创建一个工作者对象
  • 然后,在注释 4 处创建一个新的 ObserveOnObserver 将 SubscribeOnobserver 对象包装起来,并传入 ObservableSubscribeOn 的 subscribe() 方法进行订阅

10. ObserveOnObserver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public void onNext(T t) {
...
if (sourceMode != QueueDisposable.ASYNC) {
// 1
queue.offer(t);
}
schedule();
}

@Override
public void onError(Throwable t) {
...
schedule();
}

@Override
public void onComplete() {
...
schedule();
}
  • 去除非主线逻辑的代码,在 ObserveOnObserver 的 onNext()onError()onComplete() 方法中最后都会调用到 schedule() 方法。接着看 schedule() 方法,其中 onNext() 还会把消息存放到队列中

11. ObserveOnObserver#schedule()

1
2
3
4
5
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
  • 这里使用了 worker 进行调度 ObserveOnObserver 这个实现了 Runnable 的任务。worker 就是在 AndroidSchedulers.mainThread() 中创建的,内部其实就是使用 Handler 进行线程切换的

12. ObserveOnObserver#run()

1
2
3
4
5
6
7
8
9
10
@Override
public void run() {
// 1
if (outputFused) {
drainFused();
} else {
// 2
drainNormal();
}
}
  • 在注释 1 处会先判断 outputFused 这个标志位,它表示事件流是否被融化掉,默认是 false,所以,最后会执行到 drainNormal() 方法

13. ObserveOnObserver#drainNormal()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void drainNormal() {
int missed = 1;

final SimpleQueue<T> q = queue;

// 1
final Observer<? super T> a = downstream;

...

// 2
v = q.poll();

...
// 3
a.onNext(v);

...
}
  • 在注释 1 处,这里的 downstream 实际上是从外面传进来的 SubscribeOnObserver 对象。在注释 2 处将队列中的消息取出来,接着在注释 3 处调用了 SubscribeOnObserver 的 onNext() 方法。最终,会从我们包装类的最外层一直调用到最里面的我们自定义的 Observer 中的 onNext() 方法,所以,在 observeOn() 方法下面的链式代码都会执行到它所指定的线程中
-------------------- 本文结束感谢您的阅读 --------------------