Fork me on GitHub

Rxjava

RxJava

定义

> Rxjava是一个<kbd>基于事件流</kbd>、<kbd>实现异步操作</kbd>的库

特点

逻辑简洁

实现优雅

使用简单

原理

RxJava原理基于一种扩展的观察者模式

RxJava的扩展观察者模式分为4个角色

角色 作用 类比
被观察者(Observable) 产生事件 顾客
观察者(Observer) 接收事件&给出对应的动作 厨房
订阅(Subscribe) 连接被观察者&观察者 服务员
事件(Event) 被观察者&观察者沟通的载体 菜式

原理可以总结出一句话

Observable通过Subscribe按顺序发送Event给Observer
Observer按顺序接收Event并作出对应的相应动作

基本使用

分步使用

  1. 创建Observable生产Event
  2. 创建Observe,定义响应事件行为
  3. 通过Subscribe连接Observable和Observer

链式调用

  • 实际使用中,使用链式调用

RxJava操作符

详见另外一个文章(单独写一个)

什么时候使用RxJava

网络请求轮询(无条件)

网络请求轮询(有条件)

网络请求嵌套回调

发送网络请求时的差错重试机制

合并数据源&同时展示

获取缓存数据

联合判断

线程操作(切换/调度/控制)

功能防抖

联想搜索请求优化

背压

网络请求轮询(无条件)

客户端隔固定时间主动向服务器发送请求获取信息(pull)(轮询)

RxJava有延时操作符interval()intervalRange()

例子的场景是:RxJava和Retrofit结合,对某一接口实现无条件轮询,部分步骤略,只展示关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/*
* 步骤1:采用interval()延迟发送
* 注:此处主要展示无限次轮询,若要实现有限次轮询,仅需将interval()改成intervalRange()即可
**/
Observable.interval(2,1, TimeUnit.SECONDS)
// 参数说明:
// 参数1 = 第1次延迟时间;
// 参数2 = 间隔时间数字;
// 参数3 = 时间单位;
// 该例子发送的事件特点:延迟2s后发送事件,每隔1秒产生1个数字(从0开始递增1,无限个)

/*
* 步骤2:每次发送数字前发送1次网络请求(doOnNext()在执行Next事件前调用)
* 即每隔1秒产生1个数字前,就发送1次网络请求,从而实现轮询需求
**/
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long integer) throws Exception {
Log.d(TAG, "第 " + integer + " 次轮询" );

/*
* 步骤3:通过Retrofit发送网络请求
**/
// a. 创建Retrofit对象
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();

// b. 创建 网络请求接口 的实例
GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);

// c. 采用Observable<...>形式 对 网络请求 进行封装
Observable<Translation> observable = request.getCall();
// d. 通过线程切换发送网络请求
observable.subscribeOn(Schedulers.io())// 切换到IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread())// 切换回到主线程 处理请求结果
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
}

@Override
public void onNext(Translation result) {
// e.接收服务器返回的数据
result.show() ;
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "请求失败");
}

@Override
public void onComplete() {

}
});

}
}).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {

}
@Override
public void onNext(Long value) {

}

@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}

@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});

网络请求轮询(有条件)

客户端隔固定时间主动向服务器发送请求获取信息(pull)(轮询)

根据服务器返回信息停止轮询

RxJava有repeatWhen()

例子的场景是:RxJava和Retrofit结合,对某一接口实现有条件轮询,部分步骤略,只展示关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// 步骤1:创建Retrofit对象
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
.addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
.build();

// 步骤2:创建 网络请求接口 的实例
GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);

// 步骤3:采用Observable<...>形式 对 网络请求 进行封装
Observable<Translation> observable = request.getCall();

// 步骤4:发送网络请求 & 通过repeatWhen()进行轮询
observable.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
// 在Function函数中,必须对输入的 Observable<Object>进行处理,此处使用flatMap操作符接收上游的数据
public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
// 将原始 Observable 停止发送事件的标识(Complete() / Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable)
// 以此决定是否重新订阅 & 发送原来的 Observable,即轮询
// 此处有2种情况:
// 1. 若返回1个Complete() / Error()事件,则不重新订阅 & 发送原来的 Observable,即轮询结束
// 2. 若返回其余事件,则重新订阅 & 发送原来的 Observable,即继续轮询
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Object throwable) throws Exception {

// 加入判断条件:当轮询次数 = 5次后,就停止轮询
if (i > 3) {
// 此处选择发送onError事件以结束轮询,因为可触发下游观察者的onError()方法回调
return Observable.error(new Throwable("轮询结束"));
}
// 若轮询次数<4次,则发送1Next事件以继续轮询
// 注:此处加入了delay操作符,作用 = 延迟一段时间发送(此处设置 = 2s),以实现轮询间间隔设置
return Observable.just(1).delay(2000, TimeUnit.MILLISECONDS);
}
});

}
}).subscribeOn(Schedulers.io()) // 切换到IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) // 切换回到主线程 处理请求结果
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
}

@Override
public void onNext(Translation result) {
// e.接收服务器返回的数据
result.show() ;
i++;
}

@Override
public void onError(Throwable e) {
// 获取轮询结束信息
Log.d(TAG, e.toString());
}

@Override
public void onComplete() {

}
});

网络请求嵌套回调

在第一次请求成功后,再进行第二次回调

RxJava2 中有FlatMap()

例子的场景是:RxJava和Retrofit结合,对两个接口实现网络请求嵌套回调,部分步骤略,只展示关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class MainActivity extends AppCompatActivity {

private static final String TAG = "Rxjava";

// 定义Observable接口类型的网络请求对象
Observable<Translation1> observable1;
Observable<Translation2> observable2;

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);

// 步骤1:创建Retrofit对象
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
.addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
.build();

// 步骤2:创建 网络请求接口 的实例
GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);

// 步骤3:采用Observable<...>形式 对 2个网络请求 进行封装
observable1 = request.getCall();
observable2 = request.getCall_2();


observable1.subscribeOn(Schedulers.io()) // (初始被观察者)切换到IO线程进行网络请求1
.observeOn(AndroidSchedulers.mainThread()) // (新观察者)切换到主线程 处理网络请求1的结果
.doOnNext(new Consumer<Translation1>() {
@Override
public void accept(Translation1 result) throws Exception {
Log.d(TAG, "第1次网络请求成功");
result.show();
// 对第1次网络请求返回的结果进行操作 = 显示翻译结果
}
})

.observeOn(Schedulers.io()) // (新被观察者,同时也是新观察者)切换到IO线程去发起登录请求
// 特别注意:因为flatMap是对初始被观察者作变换,所以对于旧被观察者,它是新观察者,所以通过observeOn切换线程
// 但对于初始观察者,它则是新的被观察者
.flatMap(new Function<Translation1, ObservableSource<Translation2>>() { // 作变换,即作嵌套网络请求
@Override
public ObservableSource<Translation2> apply(Translation1 result) throws Exception {
// 将网络请求1转换成网络请求2,即发送网络请求2
return observable2;
}
})

.observeOn(AndroidSchedulers.mainThread()) // (初始观察者)切换到主线程 处理网络请求2的结果
.subscribe(new Consumer<Translation2>() {
@Override
public void accept(Translation2 result) throws Exception {
Log.d(TAG, "第2次网络请求成功");
result.show();
// 对第2次网络请求返回的结果进行操作 = 显示翻译结果
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("登录失败");
}
});
}
}
-------------本文结束感谢阅读-------------