内容简介:这里被观察者其实就是这里的观察者就是一个接口,只有一个被观察者通过
Observable.just("测试") .subscribe(new Consumer<String>() { @Override public void accept(String str) throws Exception { System.out.println(str); } }); 复制代码
Observable.just() subscribe accept
2.创建被观察者
public static <T> Observable<T> just(T item) { ObjectHelper.requireNonNull(item, "The item is null"); return RxJavaPlugins.onAssembly(new ObservableJust<T>(item)); } 复制代码
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; } 复制代码
这里被观察者其实就是 new ObservableJust<T>(item)
3.创建观察者
这里的观察者就是一个接口,只有一个 accept
方法。被观察者发起订阅后,观察者会回调 accept
方法,将被观察者发送的数据拿到
4.发起订阅
被观察者通过 subscribe
方法订阅观察者
public final Disposable subscribe(Consumer<? super T> onNext) { return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } 复制代码
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) { ObjectHelper.requireNonNull(onNext, "onNext is null"); ObjectHelper.requireNonNull(onError, "onError is null"); ObjectHelper.requireNonNull(onComplete, "onComplete is null"); ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"); LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls; } 复制代码
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } } 复制代码
最终调用抽象方法
protected abstract void subscribeActual(Observer<? super T> observer); 复制代码
也就是 Observable
子类 ObservableJust
调用了这个方法,参数是 LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
的 ls
protected void subscribeActual(Observer<? super T> s) { ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value); s.onSubscribe(sd); sd.run(); } 复制代码
订阅方法最终执行了 LambdaObserver
的 onSubscribe
方法和 ScalarDisposable
的 run
方法
4.1 LambdaObserver
的 onSubscribe
方法
public void onSubscribe(Disposable s) { if (DisposableHelper.setOnce(this, s)) { try { onSubscribe.accept(this); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.dispose(); onError(ex); } } } 复制代码
这里解释了为什么发起订阅后就会回调观察者的 accept
方法
4.2 ScalarDisposable
的 run
方法
public void run() { if (get() == START && compareAndSet(START, ON_NEXT)) { observer.onNext(value); if (get() == ON_NEXT) { lazySet(ON_COMPLETE); observer.onComplete(); } } } 复制代码
直接结束此次订阅
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 【源码阅读】AndPermission源码阅读
- 【源码阅读】Gson源码阅读
- 如何阅读Java源码 ,阅读java的真实体会
- 我的源码阅读之路:redux源码剖析
- JDK源码阅读(六):HashMap源码分析
- 如何阅读源码?
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Django企业开发实战
胡阳 / 人民邮电出版社 / 2019-2 / 99.00元
本书以博客系统贯穿始末,介绍了Django的方方面面。书中共分四部分,第一部分介绍了正式进入编码之前的准备工作,内容包括需求分析、基础知识和Demo系统的开发;第二部分开始实现需求,内容涉及环境配置、编码规范以及项目结构规划,编写了Model层、admin页面、Form代码和View逻辑,引入了Bootstrap框架;第三部分重点介绍xadmin、django-autocomple-light和d......一起来看看 《Django企业开发实战》 这本书的介绍吧!