Java Functional Reactive 编程

栏目: 编程语言 · 发布时间: 7年前

内容简介:Java Functional Reactive 编程

我们如何才能将传统的变量式线程同步管理转移到基于数据的异步流式编程上?试试 Reactive Programming 吧!这篇分享包含的主题有: reactive 扩展,observers ,以及 RxJava 的工作原理。尽管不容易上手,但是一旦上手,其乐无穷。

Save the date for Droidcon SF in March — a conference with best-in-class presentations from leaders in all parts of the Android ecosystem.

我叫 Juan Gomez 。我今天在这里想和大家聊聊 Reactive Programming (在这里,Reactive Programming 是一个有争议的名词。很多时候,Reactive Programming 指的是响应式编程或者响应式扩展,查看 André Staltz 的文章了解跟多关于响应式编程的论战)。我今天会跟大家介绍一些 Reactive Extensions 的概念,以及在 Reactive extension 中的 Observable 和 Observable Pattern,以及 Subscription。最后我会跟大家分享一些 RxJava 相关的主题。

什么是 Function Reactive 编程?

Erik Meijer 是推动函数式响应编程(function reactive programming)的主力人物。他在微软的时候亲手创建了.NET 环境下的 Reactive Extensions (Rx)。由于 Reactive Programming 包含些别的意思,以及有些历史,所以我一直把这个叫做 函数式 响应型编程。

什么是 reactive extensions?

Rx 系列,发起于 Erik Meijer (一开始只有 C# 和 .Net,后来才出现其他语言)。最早是一个库,协助你完成 函数式响应型编程。在 Rx 里,利用『Schedule Operation』便可以参数化异步数据流并发。简单来说,Rx 把你从底层的线程、同步、线程安全、并发数据结构里拯救出来。

Rx 已经实现了很多语言版本了,比如: C# (最早), JavaScript, RxJava (一个 JVM 库,所以说所有基于 JVM 的语言都可以用), Ruby, RxPY for Python (by Microsoft) 以及 更多

为什么是 functional reactive programming?

写并发程序是个很麻烦的事情。FRP 帮助你从一个更高的层面去思考你的异步程序,让你的程序流程更简单些,帮助你优化错误处理,写更少的代码,避免更多的 Bug,这些主要是 Reactive 的部分。Function 部分是 Reactive Extension。Rx 允许你操作和合并事件流。这也正是 function reative programming 强大之处:合并函数,操作和变换事件流。

Receive news and updates from Realm straight to your inbox

当你自以为很了解多线程编程的时候,想想 Hackborn 的那句话:

“Once you think you now understand it and are an expert, you are heading soon to another painful lesson that multithreading is hard.”

函数式响应型编程的本质是:我们处理事务的时候,通常迭代式同步处理的(比如:从时间线上获取推文 - 我们 pull 一个迭代器,同步处理,然后返回结果)。 FRP 的核心是,创建 Push 模型。产生一系列的事件流(或者事件集合),

for (Iterator<Tweet> iterator = timeline.iterator(); iterator.hasNext();)
{
    Tweet tweet = iterator.next();
    Log.i(TAG, tweet.getTitle());
}

Observable 描述了二元性(duality)的概念(查看 Erik Meijer 的在线 视频 了解更多),你可以从一个 Iterable 的对象(Pull-Based 模型)继承出 Observable 类型(Push-Based 模型)。这种情况下,Observable 是一个类似镜像的东西,Iterables 会持续供给下一个元素。在一个 Observable 对象里,有 onNext :一个新的事件被引入。你 Pull 了下一个元素;另一个 Push 出去,你得到了事件,然后处理掉。

Observable 有三个方法: onNextonErroronComplete 。在一个 Observable 里,出现错误的时候,会回调 onError 。Observable 停止执行, hasNext 返回 false。如果出现错误,会立即停止遍历。如果执行成功,会调用 onComplete 方法。

The “Subscribe” method

在一个 Subscription 里要实现 onNextonErroronComplete

Observable<Tweet> tweetObservable =
Observable.create(new Observable.OnSubscribe<Tweet>() {
    @Override
    public void call(Subscriber<? super Tweet> subscriber) {
        if (!subscriber.isUnsubscribed()){
            try {
                Tweet favorite = TwitterService.getNewFavorites();
                subscriber.onNext(favorite);
                Tweet mention = TwitterService.getNewMentions();
                subscriber.onNext(mention);
                subscriber.onCompleted();
            } catch (JsonParseException e) {
                subscriber.onError(e);
            }
        }
    }
});

在上面的几行代码里,你可以看到处理事件都是流式的。我有一个 Observable,然后将它赋给一个 Subscription Observer 。然后 Observer 挨个处理每个到来的 tweets。这是一个很简单的例子来掩饰处理推文。正如你所见,我们在最下面有一个 catch 语句,当有异常的 JSON 的时候,我们会直接调用 onError 方法通知 observer 出错了,后续的 event 也会被停止。

Uncle Bob Martin 发过一篇文章:『 “Make the Magic go away” 』。它讲述说:Rx 的概念已经出现很久了,不是什么新鲜词。尽管我不同意他的某些观点,但是我同意他文末说到的:『任何你使用的库,对你来说都是 Magic 一般的存在,你需要了解这些库为什么能够工作,这样才能避免一些问题。』我另外要介绍一个 Reactivex.IO 官方站下的 一个 链接 ,你可以从中学习如何创建一个非常简单的 Rx 框架。更好的理解 Reactive extensions,再也不会有:『哦 天哪,这个东西好神奇』的感觉了。

RxJava 和 Retrolambda

RxJava 是 Rx 的 JVM 实现。

aObservable.filter(new Func1<Integer, Boolean>() {
        public Boolean call(Integer n) {
            return n % 2 == 0;
        }
    })
    .map(new Func1<Integer, Integer>() {
        public Integer call(Integer n) {
            return n * n;
        }
    })
    .subscribe(new Action1<Integer>() {
        public void call(Integer n) {
            System.out.println(n);
        }
    });

这是 Java 7 里 Observable 的操作:看着很啰嗦对吧?你可以用 Retrolambda ( Gradle plugin ; Java 8) 来简化它。

aObservable.filter(n -> n % 2 == 0)
    .map(n -> n * n)
    .subscribe(System.out::println);

RxJava 里的 Operators

Rx 最厉害的是它的一切都基于事件流。你需要决策的是:如何高效地组合操作。

aObservable.map(x -> x*x) //Square
.reduce((a, b) -> a+b) //Sum
.subscribe(x -> println(x)); //Show

上面的代码段提及的操作有:map,reduce,和 subscribe。实现的东西也很简单:先算对每个数求乘法得到面积,再把他们的面积都加起来。然后打印结果出来。随后我会用 marble diagram 来跟大家展示下。

Marble diagram (译者注:参考这个图来看)里,有一个时间的概念,时间轴是从左到右。你可以看到最上面有个带线的箭头,它有起点和终点。起点代表事件开始,终点代表结束。map 操作在时间轴中间某部分。map 操作把这些 marble 变换成了 diamonds。

Creating an Observable

List<String> aList = …;
ob = Observable.create(subscriber -> {
    try {
        for (String s : aList) {
            if (subscriber.isUnsubscribed())
                return;
            subscriber.onNext(s);
        }
        subscriber.onCompleted();
    } catch (Exception e) {
        subscriber.onError(e);
    }
});

Observer 必须得实现至少一个基本方法,在这里,上面的例子里,我们没有实现 onError ,我们只是呼叫了 onNext ,调用 onNext 结束后,执行了 onComplete 方法。同样,由于执行间隙可能���被调用 unsubscribe ,所以我们还得不间断的检查是否被 unsubscribe 过。

List<String> aList = …;
Observable<String> ob = Observable.from(aList);

如果你有一个集合,你想让每个数据都被挨个执行,你可以直接调用 Observable.from

Observable<List<String>> ob = Observable.just(aList); Observable<String> ob2 = Observable.just(“Some String”);

上面的代码用到了 Observable.just ,传入了一个集合。留意:用 Observable.from 的时候,Observable 在一个接一个的发射元素的。而 Observable.just 是一次发出整个集合。另外还有很多其他的操作函数来加速你的开发。

Transforming Observables

Map 可能是最为大家所知道的一个高阶函数。它可以对每个输入的内容进行变换。

这是一个具有代表性的 marble diagram 的例子。

Observable.range(0, 5)
    .map(x -> toBinaryString(x*x))
    .subscribe(s -> println(s),
        err -> err.printStackTrace(),
        () -> println(“done”));

上面的例子是将数字的十进制表示转换成为二进制表示的一个例子,然后将转换的结果在控制台输出出来。上面的操作都是异步操作:拿到每个发射的数字,变换,然后把结果输出出来。

Observable.range(1, 3)
.flatMap(x -> Observable.just(x).repeat(x)) .subscribe(System.out::println);

还有一个操作叫做 flatMap :他接受一个 Observable (这个 Observable 是从多个 Observable 中创建的) 参数。同样:我们有 Observable.Range 函数来发射数字,然后用 just 和 repeat 来重复发射这 3 个数字。并且用 flatMap 来平整 ( flatten ) 这些操作,让他们以事件流来发射。

Filtering Observables

当你只需要一个事件流里满足条件的一部分的时候,就需要用到 filter 过滤器。下面的例子是:我们传入0 - 10,我们只需要打印出 2 的整数倍的数。

Observable.range(0, 10)
    .filter(x -> (x % 2) == 0)
    .subscribe(System.out::println);

还有很多其他的变换函数,你可以查看这个 链接 了解更多。

你可以加和或者聚合一切事件流的元素。然后发射一个单一的输出出去。下面的例子是 10 的阶乘的写法。

Observable.range(1, 10)
    .reduce((a, b) -> a*b)
    .subscribe(System.out::println);

Combining Observables

你可以使用 zip 组合不同的 Observable ,生成一个新的 Observable。比如:你可以将读取磁盘的 Observable 和 读取 RAM 的 Observable 结合起来生成一个 Observable。

Observable<String> lower =
Observable.from(new String[]{“a”, “b”, “c”});
Observable<String> upper =
Observable.from(new String[]{“A”, “B”, “C”});

Observable.zip(lower, upper, Pair::create)
    .map(pair -> pair.first +”_” +pair.second)
    .subscribe(System.out::println);

上面的例子把两个 Observable 结合起来了。这就可能带来一个新的问题:假使有一个 Observable 发射事件的速度比另一个要快很多,那么 zip 后的 Observable,就可能产生快的等待慢的的情况。

使用 schedulers 的一个好处是你可以决定发射事件活着订阅回调以及处理事件所在的线程。使用 observeOnsubscribeOn 来设置你想让 observerable 以及 subscription 发生在哪个线程里。而且这些事件都是异步调用的,所以你不需要手动的操作线程。只要专注在组合 Operator 以及处理你的流式事件即可。

当你有两个执行速度差异很大的 Observable 的时候,可能会出现 backpressure 的情况。RxJava 允许你手动处理缓冲,不过一旦缓冲区满载以后,就会出现 backpressure 的错误。

Hot and Cold Observables

Cold observable 只发射那些你 Subscribe 的事件。而 Hot observable 会发射所有事件。

RxAndroid 是基于 Android 的 RxJava 的绑定。有了 RxAndroid 后,你就有能力在 Handler 线程做 Schedule 操作,实现随意的切换线程操作了。

拥抱 Rx 吧,了解全新的编程概念,把玩把玩各个语言版本的函数式编程。操作数据流,专注于逻辑,让你的代码更上一层楼。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Linux Programming Interface

The Linux Programming Interface

Michael Kerrisk / No Starch Press / 2010-11-6 / GBP 79.99

The Linux Programming Interface describes the Linux API (application programming interface)-the system calls, library functions, and other low-level interfaces that are used, directly or indirectly, b......一起来看看 《The Linux Programming Interface》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

html转js在线工具
html转js在线工具

html转js在线工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具