RxJava 2.x 实战系列(二):RxJava 基础知识

1. 怎样理解 Observable?

答:

RxJava 的使用通常需要三步:

  • 创建 ObservableObservable 的字面意思是被观察者,使用 RxJava 时需要创建一个被观察者,它会决定什么时候触发事件以及触发怎样的事件。有点类似上游发送命令,可以在这里决定异步操作模块的顺序和异步操作模块的次数。
  • 创建 ObserverObserver 即观察者,它可以在不同的线程中执行任务。这种模式可以极大地简化并发操作,因为它创建了一个处于待命状态的观察者哨兵,可以在未来某个时刻响应 Observable 的通知,而不需要阻塞等待 Observable 发射数据。
  • 使用 subscribe() 进行订阅:创建了 ObservableObserver 之后,我们还需要使用 subscribe() 方法将它们连接起来,这样整个上下游就能衔接起来实现链式调用。

2. subscribe 的重载方法有?

答:

  • subscribe(onNext)
  • subscribe(onNext, onError)
  • subscribe(onNext, onError, onComplete): 在 subscribeonComplete 是执行完 onNext 之后再执行的。
  • subscribe(onNext, onError, onComplete, onSubscribe): 先执行 onSubscribe,再执行 onNextonComplete

3. 【笔试题】写出下面代码的输出结果?

答:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.just("Hello World)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println(throwable.getMessage());
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete());
}
});

执行结果:
Hello World
onComplete()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Observable.just("Hello World)
.subscribe(new Consumer<String> () {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
}, new Consumer<Throwable> () {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println(throwable.getMessage());
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
}, new Consumer<Disposable> () {
System.out.println("subscribe);
});

执行结果:
subscribe
Hello World
onComplete()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable.just("Hello World)
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("subscribe");
}

@Override
public void onNext(String s) {
System.out.println(s);
}

@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}

@Override
public void onComplete() {
System.out.println("onComplete()");
}
});

执行结果:
subscribe
Hello World
onComplete()

4. RxJava 2 的 5 种观察者模式?

答:

  • ObservableObserver

    • Observable: 能够发射 0 或 n 个数据,并以成功或错误事件终止。
  • FlowableSubscriber

    • Flowable:能够发射 0 或 n 个数据,并以成功或错误事件终止。支持背压,可以控制数据源发射的速度
  • SingleSingleObserver

    • Single:只发射单个数据或错误事件。
  • CompletableCompletableObserver

    • Completable:从来不发射数据,只处理 onCompleteonError 事件。可以看成 RxRunnable
  • MaybeMaybeObserver

    • Maybe:能够发射 0 或者 1 个数据,要么成功、要么失败。有点类似 Optional

5. 怎样理解 do 操作符?

答:

do 操作符可以给 Observable 的生命周期的各个阶段加上一系列的回调监听,当 Observable 执行到这个阶段时,这些回调就会被触发

6. 下面代码的执行结果是?

答:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
Observable.just("Hello")
.doOnNext(new Consumer<String> () {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("doOnNext: " + s);
}
})
.doAfterNext(new Consumer<String> () {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("doAfterNext: " + s);
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
System.out.println("doOnComplete: ");
}
})
.doOnSubscribe(new Consumer<Disposable> () { // 订阅之后回调的方法
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
System.out.println("doOnSubscribe: ");
}
})
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
System.out.println("doAfterTerminate: ");
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
System.out.println("doFinally: ");
}
})
.doOnEach(new Consumer<Notification<String>> () { // Observable 每发射一个数据就会触发这个回调,不仅包括 onNext, 还包括 onError 和 onCompleted
@Override
public void accept(@NonNull Notification<String> stringNotification) throws Exception {
System.out.println("doOnEach: " + (stringNotification.isOnNext()?"onNext":stringNotification.isOnComplete()?"onComplete":"onError"));
}
})
.doOnLifecycle(new Consumer<Disposable>() { // 订阅之后可以取消订阅
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
System.out.println("doOnLifecycle: " + disposable.isDisposed());
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("doOnLifecycle run: ");
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("收到消息:" + s);
}
});

执行结果:
doOnSubscribe:
doOnLifecycle: false
doOnNext: Hello
doOnEach: onNext
收到消息:Hello
doAfterNext: Hello
doOnComplete:
doOnEach: onComplete
doFinally:
doAfterTerminate:

7. 描述一下常用的 do 操作符?

答:

  • doOnSubscribe:

    • 一旦观察者订阅了 Observable,它就会被调用。
  • doOnLifecycle:

    • 可以在观察者订阅之后,设置是否取消订阅。
  • doOnNext:

    • 它产生的 Observable 每发射一项数据就会调用它一次,它的 Consumer 接受发射的数据项。一般用于在 subscribe 之前对数据进行处理。
  • doOnEach:

    • 它产生的 Observable 每发射一项数据就会调用它一次,不仅包括 onNext,还包括 onErroronCompleted
  • doAfterNext:

    • onNext 之后执行,doOnNext() 是在 onNext 之前执行
  • doOnComplete:

    • 当它产生的 Observable正常终止调用 onComplete 时会被调用。
  • doFinally:

    • 在它产生的 Observable 终止之后会被调用,无论是正常终止还是异常终止doFinally 优先于 doAfterTerminate 的调用。
  • doAfterTerminate:

    • 注册一个 Action,当 Observable 调用 onCompleteonError 时触发。
-------------本文结束感谢您的阅读-------------