RxJava
a library for composing asynchronous and event-based programs using observable sequences for the Java VM
一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库
随着程序逻辑变得越来越复杂,它依然能够保持简洁
最早,我们这么写代码
private void requestFinished(JSONObject json) { new DealAsync().execute(json); } private class DealAsync extends AsyncTask<Void, Void, Void> { @Override protected List<Object> doInBackground(JSONArray... params) { //TOO deal Data dealData(params); return data; } @Override protected void onPostExecute(List<Object> entities) { super.onPostExecute(entities); //TODO deal data to view } }
现在,可以这么写
private void requestFinished(JSONObject json) { Observable.just(dealData(json)) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Object>() { @Override public void call(Object object) { //TODO deal data to view } }); }
sample again!
一组文件夹下所有的 png 图片都加载出来,并显示在 imageCollectorView 内
new Thread() { @Override public void run() { super.run(); for (File folder : folders) { File[] files = folder.listFiles(); for (File file : files) { if (file.getName().endsWith(".png")) { final Bitmap bitmap = getBitmapFromFile(file); getActivity().runOnUiThread(new Runnable() { @Override public void run() { imageCollectorView.addImage(bitmap); } }); } } } } }.start();
一组文件夹下所有的 png 图片都加载出来,并显示在 imageCollectorView 内
Observable.from(folders) .flatMap(new Func1<File, Observable<File>>() { @Override public Observable<File> call(File file) { return Observable.from(file.listFiles()); } }) .filter(new Func1<File, Boolean>() { @Override public Boolean call(File file) { return file.getName().endsWith(".png"); } }) .map(new Func1<File, Bitmap>() { @Override public Bitmap call(File file) { return getBitmapFromFile(file); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { imageCollectorView.addImage(bitmap); } });
每次你打开,看到的是这样的:
Observable.from(folders) .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) }) .filter((Func1) (file) -> { file.getName().endsWith(".png") }) .map((Func1) (file) -> { getBitmapFromFile(file) }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });
核心:Observables(被观察者,事件源)
Subscribers(观察者)
Observables 发出事件, Subscribers 处理事件
eg: 触摸事件,web接口调用返回的数据
常见观察者模式
RxJava
区别:Observable 没有 Subscriber 的话,则不会发出任何事件
Observable<String> myObservable = Observable.create( new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> sub) { sub.onNext("Hello, world!"); sub.onCompleted(); } } );
Subscriber<String> mySubscriber = new Subscriber<String>() { @Override public void onNext(String s) { System.out.println(s); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { } };
myObservable.subscribe(mySubscriber);
Observable.just("Hello, world!") .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } });
或
Observable.just("Hello, world!") .subscribe(s -> System.out.println(s));
Hello, world!中加签名:
Hello, world! -MrFu
Observable.just("Hello, world! -MrFu")
.subscribe(s -> System.out.println(s + " -MrFu"));
Observable.just("Hello, world!") .map(new Func1<String, String>() { @Override public String call(String s) { return s + " -Dan"; } }) .subscribe(s -> System.out.println(s));
Observable.just("Hello, world!") .map(s -> s + " -Dan") .subscribe(s -> System.out.println(s));
Observable.just("Hello, world!") .map(new Func1<String, Integer>() { @Override public Integer call(String s) { return s.hashCode(); } }) .subscribe(i -> System.out.println(Integer.toString(i)));
Observable.just("Hello, world!") .map(s -> s.hashCode()) .subscribe(i -> System.out.println(Integer.toString(i)));
Observable 可以是一个数据库查询,Subscriber 用来显示查询结果;
Observable 可以是屏幕上的点击事件,Subscriber 用来响应点击事件;
Observable 可以是一个网络请求,Subscriber 用来显示请求结果。
Observable 和 Subscriber 是独立于中间的变换过程的
此处应该有表情!
Observable<List<String>> query(String text); //返回 urls
按照之前做法,我们会这么做:
query("Hello, world!") .subscribe(urls -> { for (String url : urls) { System.out.println(url); } });
但是,还能这么做:
query("Hello, world!") .subscribe(urls -> { Observable.from(urls) .subscribe(url -> System.out.println(url)); });
不够!!!
超牛逼的操作符:flatMap
query("Hello, world!") .flatMap(new Func1<List<String>, Observable<String>>() { @Override public Observable<String> call(List<String> urls) { return Observable.from(urls); } }) .subscribe(url -> System.out.println(url));
query("Hello, world!") .flatMap(urls -> Observable.from(urls)) .subscribe(url -> System.out.println(url));
Observable.flatMap() 接收一个 Observable 的输出作为输入,同时输出另外一个 Observable。
flatMap 输出的新的 Observable 正是我们在 Subscriber 想要接收的。现在 Subscriber 不再收到 List(String),而是收到一系列单个的字符串,就像 Observable.from() 的输出一样。
See More!
query("Hello, world!") .flatMap(urls -> Observable.from(urls)) .flatMap(url -> getTitle(url)) .subscribe(title -> System.out.println(title));
query("Hello, world!") .flatMap(urls -> Observable.from(urls)) .flatMap(url -> getTitle(url)) .filter(title -> title != null)//过滤不为 null .take(5)//只要5个结果 .doOnNext(title -> saveTitle(title))//打印前保存标题 .subscribe(title -> System.out.println(title));
Operators 可以让你对数据流做任何操作
响应式函数编程的魅力:一系列的 Operators 链接起来就可以完成复杂的逻辑。代码被分解成一系列可以组合的片段
Deal Error
Observable.just("Hello, world!") .map(s -> potentialException(s)) .map(s -> anotherPotentialException(s)) .subscribe(new Subscriber<String>() { @Override public void onNext(String s) { System.out.println(s); } @Override public void onCompleted() { System.out.println("Completed!"); } @Override public void onError(Throwable e) { System.out.println("Ouch!"); } });
Observable.just(1, 2, 3, 4) .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程 .subscribe(new Action1<Integer>() { @Override public void call(Integer number) { Log.d(tag, "number:" + number); } });
常用于:后台线程取数据,主线程显示
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定 .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .map(mapOperator) // 新线程,由 observeOn() 指定 .observeOn(Schedulers.io()) .map(mapOperator2) // IO 线程,由 observeOn() 指定 .observeOn(AndroidSchedulers.mainThread) .subscribe(subscriber); // Android 主线程,由 observeOn() 指定
Sample
getUser(userId, new Callback<User>() { @Override public void success(User user) { new Thread() { @Override public void run() { processUser(user); // 尝试修正 User 数据 runOnUiThread(new Runnable() { // 切回 UI 线程 @Override public void run() { userView.setUser(user); } }); }).start(); } @Override public void failure(RetrofitError error) { // Error handling ... } };
getUser(userId) .doOnNext(new Action1<User>() { @Override public void call(User user) { processUser(user); }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<User>() { @Override public void onNext(User user) { userView.setUser(user); } @Override public void onCompleted() { } @Override public void onError(Throwable error) { // Error handling ... } });
Anything else?
Button button = ...; RxView.clickEvents(button) // 以 Observable 形式来反馈点击事件 .subscribe(new Action1<ViewClickEvent>() { @Override public void call(ViewClickEvent event) { // Click handling } });
RxView.clickEvents(button) .throttleFirst(500, TimeUnit.MILLISECONDS) .subscribe(clickAction);
一种模式,使用 RxJava 实现 EventBus,从而不需要使用 Otto 或者 EventBus。[1]
Hey, do not be shy.