RxJava
a library for composing asynchronous and event-based programs using observable sequences for the Java VM
一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库
随着程序逻辑变得越来越复杂,它依然能够保持简洁
最早,我们这么写代码
private void requestFinished(JSONObject json) {
new DealAsync().execute(json);
}
private class DealAsync extends AsyncTask<Void, Void, Void> {
@Override
protected List<Object> doInBackground(JSONArray... params) {
//TOO deal Data
dealData(params);
return data;
}
@Override
protected void onPostExecute(List<Object> entities) {
super.onPostExecute(entities);
//TODO deal data to view
}
}
现在,可以这么写
private void requestFinished(JSONObject json) {
Observable.just(dealData(json))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Object>() {
@Override
public void call(Object object) {
//TODO deal data to view
}
});
}
sample again!
一组文件夹下所有的 png 图片都加载出来,并显示在 imageCollectorView 内
new Thread() {
@Override
public void run() {
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
imageCollectorView.addImage(bitmap);
}
});
}
}
}
}
}.start();
一组文件夹下所有的 png 图片都加载出来,并显示在 imageCollectorView 内
Observable.from(folders)
.flatMap(new Func1<File, Observable<File>>() {
@Override
public Observable<File> call(File file) {
return Observable.from(file.listFiles());
}
})
.filter(new Func1<File, Boolean>() {
@Override
public Boolean call(File file) {
return file.getName().endsWith(".png");
}
})
.map(new Func1<File, Bitmap>() {
@Override
public Bitmap call(File file) {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
imageCollectorView.addImage(bitmap);
}
});
每次你打开,看到的是这样的:
Observable.from(folders)
.flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
.filter((Func1) (file) -> { file.getName().endsWith(".png") })
.map((Func1) (file) -> { getBitmapFromFile(file) })
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });
核心:Observables(被观察者,事件源)
Subscribers(观察者)
Observables 发出事件, Subscribers 处理事件
eg: 触摸事件,web接口调用返回的数据
常见观察者模式
RxJava
区别:Observable 没有 Subscriber 的话,则不会发出任何事件
Observable<String> myObservable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) {
sub.onNext("Hello, world!");
sub.onCompleted();
}
}
);
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) { System.out.println(s); }
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
};
myObservable.subscribe(mySubscriber);
Observable.just("Hello, world!")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
或
Observable.just("Hello, world!")
.subscribe(s -> System.out.println(s));
Hello, world!中加签名:
Hello, world! -MrFu
Observable.just("Hello, world! -MrFu")
.subscribe(s -> System.out.println(s + " -MrFu"));
Observable.just("Hello, world!")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s + " -Dan";
}
})
.subscribe(s -> System.out.println(s));
Observable.just("Hello, world!")
.map(s -> s + " -Dan")
.subscribe(s -> System.out.println(s));
Observable.just("Hello, world!")
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return s.hashCode();
}
})
.subscribe(i -> System.out.println(Integer.toString(i)));
Observable.just("Hello, world!")
.map(s -> s.hashCode())
.subscribe(i -> System.out.println(Integer.toString(i)));
Observable 可以是一个数据库查询,Subscriber 用来显示查询结果;
Observable 可以是屏幕上的点击事件,Subscriber 用来响应点击事件;
Observable 可以是一个网络请求,Subscriber 用来显示请求结果。
Observable 和 Subscriber 是独立于中间的变换过程的
此处应该有表情!
Observable<List<String>> query(String text); //返回 urls
按照之前做法,我们会这么做:
query("Hello, world!")
.subscribe(urls -> {
for (String url : urls) {
System.out.println(url);
}
});
但是,还能这么做:
query("Hello, world!")
.subscribe(urls -> {
Observable.from(urls)
.subscribe(url -> System.out.println(url));
});
不够!!!
超牛逼的操作符:flatMap
query("Hello, world!")
.flatMap(new Func1<List<String>, Observable<String>>() {
@Override
public Observable<String> call(List<String> urls) {
return Observable.from(urls);
}
})
.subscribe(url -> System.out.println(url));
query("Hello, world!")
.flatMap(urls -> Observable.from(urls))
.subscribe(url -> System.out.println(url));
Observable.flatMap() 接收一个 Observable 的输出作为输入,同时输出另外一个 Observable。
flatMap 输出的新的 Observable 正是我们在 Subscriber 想要接收的。现在 Subscriber 不再收到 List(String),而是收到一系列单个的字符串,就像 Observable.from() 的输出一样。
See More!
query("Hello, world!")
.flatMap(urls -> Observable.from(urls))
.flatMap(url -> getTitle(url))
.subscribe(title -> System.out.println(title));
query("Hello, world!")
.flatMap(urls -> Observable.from(urls))
.flatMap(url -> getTitle(url))
.filter(title -> title != null)//过滤不为 null
.take(5)//只要5个结果
.doOnNext(title -> saveTitle(title))//打印前保存标题
.subscribe(title -> System.out.println(title));
Operators 可以让你对数据流做任何操作
响应式函数编程的魅力:一系列的 Operators 链接起来就可以完成复杂的逻辑。代码被分解成一系列可以组合的片段
Deal Error
Observable.just("Hello, world!")
.map(s -> potentialException(s))
.map(s -> anotherPotentialException(s))
.subscribe(new Subscriber<String>() {
@Override
public void onNext(String s) { System.out.println(s); }
@Override
public void onCompleted() { System.out.println("Completed!"); }
@Override
public void onError(Throwable e) { System.out.println("Ouch!"); }
});
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
常用于:后台线程取数据,主线程显示
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定
Sample
getUser(userId, new Callback<User>() {
@Override
public void success(User user) {
new Thread() {
@Override
public void run() {
processUser(user); // 尝试修正 User 数据
runOnUiThread(new Runnable() { // 切回 UI 线程
@Override
public void run() {
userView.setUser(user);
}
});
}).start();
}
@Override
public void failure(RetrofitError error) {
// Error handling
...
}
};
getUser(userId)
.doOnNext(new Action1<User>() {
@Override
public void call(User user) {
processUser(user);
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onNext(User user) {
userView.setUser(user);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable error) {
// Error handling
...
}
});
Anything else?
Button button = ...;
RxView.clickEvents(button) // 以 Observable 形式来反馈点击事件
.subscribe(new Action1<ViewClickEvent>() {
@Override
public void call(ViewClickEvent event) {
// Click handling
}
});
RxView.clickEvents(button)
.throttleFirst(500, TimeUnit.MILLISECONDS)
.subscribe(clickAction);
一种模式,使用 RxJava 实现 EventBus,从而不需要使用 Otto 或者 EventBus。[1]
Hey, do not be shy.