RxJava源码阅读(一)

栏目: Java · 发布时间: 6年前

内容简介:这里被观察者其实就是这里的观察者就是一个接口,只有一个被观察者通过
Observable.just("测试")
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String str) throws Exception {
                        System.out.println(str);
                    }
                });
复制代码
Observable.just()
subscribe
accept

2.创建被观察者

public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
复制代码
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
}
复制代码

这里被观察者其实就是 new ObservableJust<T>(item)

3.创建观察者

这里的观察者就是一个接口,只有一个 accept 方法。被观察者发起订阅后,观察者会回调 accept 方法,将被观察者发送的数据拿到

4.发起订阅

被观察者通过 subscribe 方法订阅观察者

public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
复制代码
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
}
复制代码
public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
}
复制代码

最终调用抽象方法

protected abstract void subscribeActual(Observer<? super T> observer);
复制代码

也就是 Observable 子类 ObservableJust 调用了这个方法,参数是 LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);ls

protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);
        sd.run();
}
复制代码

订阅方法最终执行了 LambdaObserveronSubscribe 方法和 ScalarDisposablerun 方法

4.1 LambdaObserveronSubscribe 方法

public void onSubscribe(Disposable s) {
        if (DisposableHelper.setOnce(this, s)) {
            try {
                onSubscribe.accept(this);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                s.dispose();
                onError(ex);
            }
        }
}
复制代码

这里解释了为什么发起订阅后就会回调观察者的 accept 方法

4.2 ScalarDisposablerun 方法

public void run() {
        if (get() == START && compareAndSet(START, ON_NEXT)) {
            observer.onNext(value);
            if (get() == ON_NEXT) {
                lazySet(ON_COMPLETE);
                observer.onComplete();
            }
        }
}
复制代码

直接结束此次订阅


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Computational Geometry

Computational Geometry

Mark de Berg、Otfried Cheong、Marc van Kreveld、Mark Overmars / Springer / 2008-4-16 / USD 49.95

This well-accepted introduction to computational geometry is a textbook for high-level undergraduate and low-level graduate courses. The focus is on algorithms and hence the book is well suited for st......一起来看看 《Computational Geometry》 这本书的介绍吧!

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具