rxjava-keynote



rxjava-keynote

0 7


rxjava-keynote

rxjava-keynote

On Github MrFuFuFu / rxjava-keynote

RxJava

MrFu / @MrFuFuFu

What is the RxJava?

a library for composing asynchronous and event-based programs using observable sequences for the Java VM

一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库

Why use it?

简洁,异步

随着程序逻辑变得越来越复杂,它依然能够保持简洁

最早,我们这么写代码

private void requestFinished(JSONObject json) {
    new DealAsync().execute(json);
}

private class DealAsync extends AsyncTask<Void, Void, Void> {
    @Override
    protected List<Object> doInBackground(JSONArray... params) {
        //TOO deal Data
        dealData(params);
        return data;
    }
    @Override
    protected void onPostExecute(List<Object> entities) {
        super.onPostExecute(entities);
        //TODO deal data to view
    }
}

现在,可以这么写

private void requestFinished(JSONObject json) {
    Observable.just(dealData(json))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<Object>() {
                @Override
                public void call(Object object) {
                    //TODO deal data to view
                }
            });
}

没啥变化?

sample again!

一组文件夹下所有的 png 图片都加载出来,并显示在 imageCollectorView 内

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {

@Override

public void run() {

    imageCollectorView.addImage(bitmap);

}
                    });
                }
            }
        }
    }
}.start();

一组文件夹下所有的 png 图片都加载出来,并显示在 imageCollectorView 内

Observable.from(folders)
.flatMap(new Func1<File, Observable<File>>() {
    @Override
    public Observable<File> call(File file) {
        return Observable.from(file.listFiles());
    }
})
.filter(new Func1<File, Boolean>() {
    @Override
    public Boolean call(File file) {
        return file.getName().endsWith(".png");
    }
})
.map(new Func1<File, Bitmap>() {
    @Override
    public Bitmap call(File file) {
        return getBitmapFromFile(file);
    }
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
    @Override
    public void call(Bitmap bitmap) {
        imageCollectorView.addImage(bitmap);
    }
});

每次你打开,看到的是这样的:

Observable.from(folders)
    .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
    .filter((Func1) (file) -> { file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

入了个门

核心:Observables(被观察者,事件源)

Subscribers(观察者)

Observables 发出事件, Subscribers 处理事件

eg: 触摸事件,web接口调用返回的数据

常见观察者模式

RxJava

区别:Observable 没有 Subscriber 的话,则不会发出任何事件

Observable<String> myObservable = Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> sub) {
            sub.onNext("Hello, world!");
            sub.onCompleted();
        }
    }
);
Subscriber<String> mySubscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) { System.out.println(s); }

    @Override
    public void onCompleted() { }

    @Override
    public void onError(Throwable e) { }
};
myObservable.subscribe(mySubscriber);

太复杂?

Observable.just("Hello, world!")
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        });

Observable.just("Hello, world!")
    .subscribe(s -> System.out.println(s));

需求:

Hello, world!中加签名:

Hello, world! -MrFu

  • 修改 Observable 对象
    Observable.just("Hello, world! -MrFu")
    
  • Subscriber 中修改:
    .subscribe(s -> System.out.println(s + " -MrFu"));
    

NO!!!!

操作符

Observable.just("Hello, world!")
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                return s + " -Dan";
            }
        })
        .subscribe(s -> System.out.println(s));
Observable.just("Hello, world!")
        .map(s -> s + " -Dan")
        .subscribe(s -> System.out.println(s));
Observable.just("Hello, world!")
    .map(new Func1<String, Integer>() {
        @Override
        public Integer call(String s) {
            return s.hashCode();
        }
    })
    .subscribe(i -> System.out.println(Integer.toString(i)));
Observable.just("Hello, world!")
    .map(s -> s.hashCode())
    .subscribe(i -> System.out.println(Integer.toString(i)));
  • Observable 可以是一个数据库查询,Subscriber 用来显示查询结果;

  • Observable 可以是屏幕上的点击事件,Subscriber 用来响应点击事件;

  • Observable 可以是一个网络请求,Subscriber 用来显示请求结果。

Observable 和 Subscriber 是独立于中间的变换过程的

来点牛逼的?

此处应该有表情!

造个搜索引擎!

Observable<List<String>> query(String text);  //返回 urls

按照之前做法,我们会这么做:

query("Hello, world!")
    .subscribe(urls -> {
        for (String url : urls) {
            System.out.println(url);
        }
    });

但是,还能这么做:

query("Hello, world!")
    .subscribe(urls -> {
        Observable.from(urls)
            .subscribe(url -> System.out.println(url));
    });

不够!!!

超牛逼的操作符:flatMap

query("Hello, world!")
    .flatMap(new Func1<List<String>, Observable<String>>() {
        @Override
        public Observable<String> call(List<String> urls) {
            return Observable.from(urls);
        }
    })
    .subscribe(url -> System.out.println(url));
query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .subscribe(url -> System.out.println(url));
  • Observable.flatMap() 接收一个 Observable 的输出作为输入,同时输出另外一个 Observable。

  • flatMap 输出的新的 Observable 正是我们在 Subscriber 想要接收的。现在 Subscriber 不再收到 List(String),而是收到一系列单个的字符串,就像 Observable.from() 的输出一样。

See More!

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .subscribe(title -> System.out.println(title));
query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .filter(title -> title != null)//过滤不为 null
    .take(5)//只要5个结果
    .doOnNext(title -> saveTitle(title))//打印前保存标题
    .subscribe(title -> System.out.println(title));
  • Operators 可以让你对数据流做任何操作

  • 响应式函数编程的魅力:一系列的 Operators 链接起来就可以完成复杂的逻辑。代码被分解成一系列可以组合的片段

Deal Error

Observable.just("Hello, world!")
    .map(s -> potentialException(s))
    .map(s -> anotherPotentialException(s))
    .subscribe(new Subscriber<String>() {
        @Override
        public void onNext(String s) { System.out.println(s); }

        @Override
        public void onCompleted() { System.out.println("Completed!"); }

        @Override
        public void onError(Throwable e) { System.out.println("Ouch!"); }
    });
  • 错误处理
  • 所有的异常交给订阅者处理
  • 订阅者知道什么时候已经接收了全部数据

牛逼哄哄的 Scheduler

  • subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
  • observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });

常用于:后台线程取数据,主线程显示

切一次切不够!!

Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新线程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 线程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread)
    .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定

Sample

getUser(userId, new Callback<User>() {
    @Override
    public void success(User user) {
        new Thread() {
            @Override
            public void run() {
                processUser(user); // 尝试修正 User 数据
                runOnUiThread(new Runnable() { // 切回 UI 线程
                    @Override
                    public void run() {

userView.setUser(user);
                    }
                });
            }).start();
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
};

Callback hell !!!!!

Solve it!

getUser(userId)
    .doOnNext(new Action1<User>() {
        @Override
        public void call(User user) {
            processUser(user);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });

Anything else?

RxBinding

Button button = ...;
RxView.clickEvents(button) // 以 Observable 形式来反馈点击事件
    .subscribe(new Action1<ViewClickEvent>() {
        @Override
        public void call(ViewClickEvent event) {
            // Click handling
        }
    });
RxView.clickEvents(button)
    .throttleFirst(500, TimeUnit.MILLISECONDS)
    .subscribe(clickAction);

RxBus

一种模式,使用 RxJava 实现 EventBus,从而不需要使用 Otto 或者 EventBus。[1]

各种异步!!

Question

Hey, do not be shy.

THE END

MrFu
Blog / GitHub
RxJava MrFu / @MrFuFuFu