反应式编程实战 - RxJava 2.x 基本模式

栏目: Java · 发布时间: 6年前

内容简介:RxJava 2.x 提供了五种模式,如下所示。创建 Observable 非常容易,我们首先需要创建一个 Observable 作为被观察者,然后在创建一个 Observer 作为观察者,然后通过 subscribe() 进行订阅。我们可以使用 create 创建一个 Observable,它拥有 onNext, onError, onCompleted 方法。其中,onNext用于发射数据项,可以多次调用,每调用一次发射一条数据, onError 或 onCompleted 只能调用一次,onErro
反应式编程实战 - RxJava 2.x 基本模式

RxJava 2.x 提供了五种模式,如下所示。

模式/类型 描述 接口 消费者
Observable 支持 0…N个数据,不支持背压 io.reactivex.Observable Observer
Flowable 支持 0…N个数据 支持背压 io.reactivex.Flowable Subscriber
Single 只支持1个数据 io.reactivex.Single SingleObserver
Completable 不支持数据 io.reactivex.Completable CompletableObserver
Maybe 只支持0或1个数据 io.reactivex.Maybe MaybeObserver

Observable

创建 Observable 非常容易,我们首先需要创建一个 Observable 作为被观察者,然后在创建一个 Observer 作为观察者,然后通过 subscribe() 进行订阅。

public class ObservableDemo {
    public static void main(String[] args) {
        Observable<String> observable = Observable.create(emitter -> {
            emitter.onNext("Hello World");
            emitter.onNext("Hello World");
            emitter.onComplete();
            emitter.onNext("Hello World");
        });

        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("Observer.onSubscribe");
            }
            @Override
            public void onNext(String s) {
                System.out.println("Observer.onNext: " + s);
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("Observer.onError");
            }
            @Override
            public void onComplete() {
                System.out.println("Observer.onComplete");
            }
        };

        observable.subscribe(observer);
    }
}

我们可以使用 create 创建一个 Observable,它拥有 onNext, onError, onCompleted 方法。其中,onNext用于发射数据项,可以多次调用,每调用一次发射一条数据, onError 或 onCompleted 只能调用一次,onError发射错误事件,除非使用 retry() 操作符来截获错误,否则事件流通常会终止。onCompleted 传递一个完成事件,表示不会再发生onNext调用。两者之间互斥,此后就不能再调用该 Observable 的其他方法。

这里,我们也可以改造成链式调用。

public class ObservableDemo2 {
    public static void main(String[] args) {
        Observable.<String>create(emitter -> {
            emitter.onNext("Hello World");
            emitter.onNext("Hello World");
            emitter.onComplete();
            emitter.onNext("Hello World");
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("Observer.onSubscribe");
            }
            @Override
            public void onNext(String s) {
                System.out.println("Observer.onNext: " + s);
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("Observer.onError");
            }
            @Override
            public void onComplete() {
                System.out.println("Observer.onComplete");
            }
        });
    }
}

阅读 RxJava 2.x 源码 io.reactivex.Observable,我们可以知道 subscribe 具有很多重载的方法。有兴趣的读者,可以深入了解下。

我们可以省略 onComplete(),只实现 onNext() 和 onError()。这将不再对 onComplete() 执行任何操作。我们甚至可以忽略 onError(),只指定 onNext()。但是,不实现 onError() 是在生产环境中应该避免的事情。在事件流的任何地方发生的错误都将传播到 onError() 进行处理,然后终止事件流。如果我们没有为 onError() 指定一个操作,那么该错误将不会处理。当然,如果出现错误,我们可以先尝试使用 retry() 操作符恢复并重新订阅可观察到的数据项。

public final Disposable subscribe()
public final Disposable subscribe(Consumer<? super T> onNext)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) 
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) 
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe)
public final void subscribe(Observer<? super T> observer)

这里,我们简单来了解一下 subscribe(Consumer&lt;? super T> onNext) 的使用吧。

public class ObservableDemo3 {
    public static void main(String[] args) {
        Observable.<String>create(emitter -> {
            emitter.onNext("Hello World");
            emitter.onNext("Hello World");
            emitter.onComplete();
            emitter.onNext("Hello World");
        }).subscribe(System.out::println);
    }
}

注意, onNext, onError, onCompleted 方法不需要直接推送到最终的观察者,它们可以通过 map() 和 filter() 等操作符创建新的 Observable 然后继续发送。

反应式编程实战 - RxJava 2.x 基本模式

Flowable 是唯一支持背压的模式,它的用法与 Observable 非常相似。(关于背压,笔者会在之后的文章中进行讲解。)

public class FlowableDemo {
    public static void main(String[] args) {
        Flowable.<String>create(e -> {
            e.onNext("Hello world!");
            e.onNext("Hello World");
            e.onComplete();
            e.onNext("Hello World");
        }, BackpressureStrategy.MISSING).subscribe(new Subscriber<String>(){
            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println("Subscriber.onSubscribe");
            }
            @Override
            public void onNext(String s) {
                System.out.println("Subscriber.onNext: " + s);
            }
            @Override
            public void onError(Throwable throwable) {
                System.out.println("Subscriber.onError");
            }
            @Override
            public void onComplete() {
                System.out.println("Subscriber.onComplete");
            }
        });
    }
}

阅读 RxJava 2.x 源码 io.reactivex.Flowable ,我们可以知道 subscribe 也具有很多重载的方法。

public final Disposable subscribe() 
public final Disposable subscribe(Consumer<? super T> onNext)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Subscription> onSubscribe)
public final void subscribe(FlowableSubscriber<? super T> s)
public final void subscribe(Subscriber<? super T> s)

Single

Single 的工作就像 Observable 一样,但是它只有 onSuccess ?事件和 onError 事件,并且它有自己的 SingleObserver 接口。 onSuccess 整合了 onNextonComplete 事件,因此,这里 onSuccess 只能发送一个数据,换句话说,即使多次发送也不会产生效果。

public class SingleDemo {
    public static void main(String[] args) {
        Single.<String>create(e -> {
            e.onSuccess("success");
            e.onSuccess("success");
        }).subscribe(new SingleObserver<String>(){
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("SingleObserver.onSubscribe");
            }
            @Override
            public void onSuccess(String s) {
                System.out.println("SingleObserver.onSuccess:"+s);
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("SingleObserver.onError");
            }
        });
    }
}

从控制台的打印结果可以看出,即使多次发送“success”,但是只会消费一次。

阅读 RxJava 2.x 源码 io.reactivex.Single ,我们可以知道 subscribe 也具有很多重载的方法。

public final Disposable subscribe()
public final Disposable subscribe(final BiConsumer<? super T, ? super Throwable> onCallback)
public final Disposable subscribe(Consumer<? super T> onSuccess)
public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError)
public final void subscribe(SingleObserver<? super T> subscriber)

这里,我们简单来了解一下 subscribe(Consumer&lt;? super T> onSuccess) 的使用吧。

public class SingleDemo2 {
    public static void main(String[] args) {
        Single.<String>create(e -> {
            e.onSuccess("success");
        }).subscribe(System.out::println);
    }
}

我们可以通过 toObservable 转换成一个 Observable 对象。

Single.just("success").toObservable().subscribe(System.out::println);

Completable

Completable 不发送数据,只有 onComplete 事件和 onError 事件。

public class CompletableDemo {
    public static void main(String[] args) {
        Completable.create(e -> {
            e.onComplete();
        })
        .subscribe(System.out::println);
    }
}

此外,我们可以通过 complete() 快速创建一个 Completable 对象,它会立即调用 onComplete 事件。

Completable.complete().subscribe(System.out::println);

或者,也可以通过 fromAction()fromRunnable() 在调用 onComplete 事件之前执行指定的操作。

Completable.fromAction(System.out::println).subscribe();

Maybe

Maybe 结合了 Single 和 Completable 特性。Maybe 包含 onSuccessonErroronComplete 事件。 这里, onSuccess 可以发送 0 ~ 1 个数据,换句话说,即使多次发送也不会产生效果。如果调用 onComplete 事件,就会停止发送数据。

public class MaybeDemo {
    public static void main(String[] args) {
        Maybe.<String>create(e -> {
            e.onComplete();
            e.onSuccess("success");
            e.onSuccess("success");
        }).subscribe(System.out::println);
    }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

C++Templates中文版

C++Templates中文版

David Vandevoorde、Nicolai M.Josuttis / 陈伟柱 / 人民邮电出版社 / 2008-2 / 69.00元

本书是C++模板编程的完全指南,旨在通过基本概念、常用技巧和应用实例3方面的有用资料,为读者打下C++模板知识的坚实基础。 全书共22章。第1章全面介绍了本书的内容结构和相关情况。第1部分(第2~7章)以教程的风格介绍了模板的基本概念,第2部分(第8~13章)阐述了模板的语言细节,第3部分(第14~18章)介绍了C++模板所支持的基本设计技术,第4部分(第19~22章)深入探讨了各种使用模板......一起来看看 《C++Templates中文版》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具