反应式编程实战 - RxJava 2.x 基本模式

栏目: Java · 发布时间: 6年前

内容简介:RxJava 2.x 提供了五种模式,如下所示。创建 Observable 非常容易,我们首先需要创建一个 Observable 作为被观察者,然后在创建一个 Observer 作为观察者,然后通过 subscribe() 进行订阅。我们可以使用 create 创建一个 Observable,它拥有 onNext, onError, onCompleted 方法。其中,onNext用于发射数据项,可以多次调用,每调用一次发射一条数据, onError 或 onCompleted 只能调用一次,onErro
反应式编程实战 - RxJava 2.x 基本模式

RxJava 2.x 提供了五种模式,如下所示。

模式/类型 描述 接口 消费者
Observable 支持 0…N个数据,不支持背压 io.reactivex.Observable Observer
Flowable 支持 0…N个数据 支持背压 io.reactivex.Flowable Subscriber
Single 只支持1个数据 io.reactivex.Single SingleObserver
Completable 不支持数据 io.reactivex.Completable CompletableObserver
Maybe 只支持0或1个数据 io.reactivex.Maybe MaybeObserver

Observable

创建 Observable 非常容易,我们首先需要创建一个 Observable 作为被观察者,然后在创建一个 Observer 作为观察者,然后通过 subscribe() 进行订阅。

public class ObservableDemo {
    public static void main(String[] args) {
        Observable<String> observable = Observable.create(emitter -> {
            emitter.onNext("Hello World");
            emitter.onNext("Hello World");
            emitter.onComplete();
            emitter.onNext("Hello World");
        });

        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("Observer.onSubscribe");
            }
            @Override
            public void onNext(String s) {
                System.out.println("Observer.onNext: " + s);
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("Observer.onError");
            }
            @Override
            public void onComplete() {
                System.out.println("Observer.onComplete");
            }
        };

        observable.subscribe(observer);
    }
}

我们可以使用 create 创建一个 Observable,它拥有 onNext, onError, onCompleted 方法。其中,onNext用于发射数据项,可以多次调用,每调用一次发射一条数据, onError 或 onCompleted 只能调用一次,onError发射错误事件,除非使用 retry() 操作符来截获错误,否则事件流通常会终止。onCompleted 传递一个完成事件,表示不会再发生onNext调用。两者之间互斥,此后就不能再调用该 Observable 的其他方法。

这里,我们也可以改造成链式调用。

public class ObservableDemo2 {
    public static void main(String[] args) {
        Observable.<String>create(emitter -> {
            emitter.onNext("Hello World");
            emitter.onNext("Hello World");
            emitter.onComplete();
            emitter.onNext("Hello World");
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("Observer.onSubscribe");
            }
            @Override
            public void onNext(String s) {
                System.out.println("Observer.onNext: " + s);
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("Observer.onError");
            }
            @Override
            public void onComplete() {
                System.out.println("Observer.onComplete");
            }
        });
    }
}

阅读 RxJava 2.x 源码 io.reactivex.Observable,我们可以知道 subscribe 具有很多重载的方法。有兴趣的读者,可以深入了解下。

我们可以省略 onComplete(),只实现 onNext() 和 onError()。这将不再对 onComplete() 执行任何操作。我们甚至可以忽略 onError(),只指定 onNext()。但是,不实现 onError() 是在生产环境中应该避免的事情。在事件流的任何地方发生的错误都将传播到 onError() 进行处理,然后终止事件流。如果我们没有为 onError() 指定一个操作,那么该错误将不会处理。当然,如果出现错误,我们可以先尝试使用 retry() 操作符恢复并重新订阅可观察到的数据项。

public final Disposable subscribe()
public final Disposable subscribe(Consumer<? super T> onNext)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) 
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) 
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe)
public final void subscribe(Observer<? super T> observer)

这里,我们简单来了解一下 subscribe(Consumer&lt;? super T> onNext) 的使用吧。

public class ObservableDemo3 {
    public static void main(String[] args) {
        Observable.<String>create(emitter -> {
            emitter.onNext("Hello World");
            emitter.onNext("Hello World");
            emitter.onComplete();
            emitter.onNext("Hello World");
        }).subscribe(System.out::println);
    }
}

注意, onNext, onError, onCompleted 方法不需要直接推送到最终的观察者,它们可以通过 map() 和 filter() 等操作符创建新的 Observable 然后继续发送。

反应式编程实战 - RxJava 2.x 基本模式

Flowable 是唯一支持背压的模式,它的用法与 Observable 非常相似。(关于背压,笔者会在之后的文章中进行讲解。)

public class FlowableDemo {
    public static void main(String[] args) {
        Flowable.<String>create(e -> {
            e.onNext("Hello world!");
            e.onNext("Hello World");
            e.onComplete();
            e.onNext("Hello World");
        }, BackpressureStrategy.MISSING).subscribe(new Subscriber<String>(){
            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println("Subscriber.onSubscribe");
            }
            @Override
            public void onNext(String s) {
                System.out.println("Subscriber.onNext: " + s);
            }
            @Override
            public void onError(Throwable throwable) {
                System.out.println("Subscriber.onError");
            }
            @Override
            public void onComplete() {
                System.out.println("Subscriber.onComplete");
            }
        });
    }
}

阅读 RxJava 2.x 源码 io.reactivex.Flowable ,我们可以知道 subscribe 也具有很多重载的方法。

public final Disposable subscribe() 
public final Disposable subscribe(Consumer<? super T> onNext)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Subscription> onSubscribe)
public final void subscribe(FlowableSubscriber<? super T> s)
public final void subscribe(Subscriber<? super T> s)

Single

Single 的工作就像 Observable 一样,但是它只有 onSuccess ?事件和 onError 事件,并且它有自己的 SingleObserver 接口。 onSuccess 整合了 onNextonComplete 事件,因此,这里 onSuccess 只能发送一个数据,换句话说,即使多次发送也不会产生效果。

public class SingleDemo {
    public static void main(String[] args) {
        Single.<String>create(e -> {
            e.onSuccess("success");
            e.onSuccess("success");
        }).subscribe(new SingleObserver<String>(){
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("SingleObserver.onSubscribe");
            }
            @Override
            public void onSuccess(String s) {
                System.out.println("SingleObserver.onSuccess:"+s);
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("SingleObserver.onError");
            }
        });
    }
}

从控制台的打印结果可以看出,即使多次发送“success”,但是只会消费一次。

阅读 RxJava 2.x 源码 io.reactivex.Single ,我们可以知道 subscribe 也具有很多重载的方法。

public final Disposable subscribe()
public final Disposable subscribe(final BiConsumer<? super T, ? super Throwable> onCallback)
public final Disposable subscribe(Consumer<? super T> onSuccess)
public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError)
public final void subscribe(SingleObserver<? super T> subscriber)

这里,我们简单来了解一下 subscribe(Consumer&lt;? super T> onSuccess) 的使用吧。

public class SingleDemo2 {
    public static void main(String[] args) {
        Single.<String>create(e -> {
            e.onSuccess("success");
        }).subscribe(System.out::println);
    }
}

我们可以通过 toObservable 转换成一个 Observable 对象。

Single.just("success").toObservable().subscribe(System.out::println);

Completable

Completable 不发送数据,只有 onComplete 事件和 onError 事件。

public class CompletableDemo {
    public static void main(String[] args) {
        Completable.create(e -> {
            e.onComplete();
        })
        .subscribe(System.out::println);
    }
}

此外,我们可以通过 complete() 快速创建一个 Completable 对象,它会立即调用 onComplete 事件。

Completable.complete().subscribe(System.out::println);

或者,也可以通过 fromAction()fromRunnable() 在调用 onComplete 事件之前执行指定的操作。

Completable.fromAction(System.out::println).subscribe();

Maybe

Maybe 结合了 Single 和 Completable 特性。Maybe 包含 onSuccessonErroronComplete 事件。 这里, onSuccess 可以发送 0 ~ 1 个数据,换句话说,即使多次发送也不会产生效果。如果调用 onComplete 事件,就会停止发送数据。

public class MaybeDemo {
    public static void main(String[] args) {
        Maybe.<String>create(e -> {
            e.onComplete();
            e.onSuccess("success");
            e.onSuccess("success");
        }).subscribe(System.out::println);
    }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Ajax模式与最佳实践

Ajax模式与最佳实践

Christian Gross / 李锟、张祖良、蔡毅、赵泽欣 / 电子工业出版社 / 2007-3 / 49.80元

Ajax 正在将我们带入到下一代的网络应用中。 本书深入探讨了动态的网络应用,将Ajax和REST集成在一起作为单独的解决方案。一个很大的优势是,与Ajax相似,REST可以和现今存在的技术一起使用。现在上百万的客户端计算机都是基于Ajax的,上百万的服务器是基于REST的。   无论你是否已经开发过Ajax应用程序,这都是一本理想的书。因为这本书描述了各种各样的模式和最好的实践经验。通过此......一起来看看 《Ajax模式与最佳实践》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试