RxJava 是如何实现线程切换的(上)

栏目: Java · 发布时间: 6年前

内容简介:RxJava 是如何实现线程切换的(上)

通过前一篇的 从观察者模式出发,聊聊RxJava ,我们大致理解了RxJava的实现原理,在RxJava中可以非常方便的实现不同线程间的切换。subscribeOn 用于指定上游线程,observeOn 用于指定下游线程,多次用 subscribeOn 指定上游线程只有第一次有效,多次用 observeOn 指定下次线程,每次都有效;简直太方便了,比直接使用Handler省了不少力气,同时也不用去关注内存泄漏的问题了。本篇就来看看在RxJava中上游是如何实现线程切换。

RxJava 基础原理

为了方便后面的叙述,这里通过下面的UML图简单回顾一下上一篇的内容。

RxJava 是如何实现线程切换的(上)

此图并没有完整的展现图中各个接口和类之间的各种关系,因为那样会导致整个图错综复杂,不便于查看,这里只绘制出了RxJava各个类之间核心关系网络

从上面的UML图中可以看出,具体的实现类只有ObservableCreate和CreateEmitter。CreateEmitter是ObservableCreate的内部类(PlantUML 怎么绘制内部类,没搞懂,玩的转的同学请赐教呀( ^▽^ ))。

上篇说过Observable创建的过程,可以简化如下:

Observable mObservable=new ObservableCreate(new ObservableOnSubscribe())

结合图可以更直观的体现出这一点。ObservableCreate 内部持有ObservableOnSubscribe的引用。

当观察者订阅主题后:

mObservable.subscribe(mObserver);

ObservableCreate 中的subscribeActual()方法就会执行,

protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

在这个过程中会创建CreateEmitter 的实例,而这个CreateEmitter实现了Emitter和Disposable接口,同时又持有Observer的引用(当然这个引用是ObservableCreate传递给他的)。 接着就会执行ObservableOnSubscribe的subscribe 方法 ,方法的参数即为刚刚创建的CreateEmitter 的实例,接着一系列连锁反应,Emitter 接口中的方法(onNext,onComplete等)开始执行,在CreateEmitter内部,Observer接口中对应的方法依次执行,这样就实现了一次从主题(上游)到观察者(下游)的事件传递。

source.subscribe(parent)

这里的 source 是ObservableOnSubscribe的实例,parent是CreateEmitter的实例。上面加粗文本叙述的内容,就是这行代码,可以说这是整个订阅过程最核心的实现。

好了,回顾完基础知识后,马上进入正题,看看RxJava是如何实现线程切换的。

RxJava 之 subscribeOn

我们知道正常情况下,所有的内容都是在主线程执行,既然这里提到了线程切换,那么必然是切换到了子线程,因此,这里需要关注线程的问题,我们就带着下面这几个问题去阅读代码。

  • 1. 是哪个对象在什么时候创建了子线程,是一种怎样的方式创建的?
  • 2. 子线程又是如何启动的?
  • 3. 上游事件是怎么跑到子线程里执行的?
  • 4. 多次用 subscribeOn 指定上游线程为什么只有第一次有效 ?

示例

首先看一下,日常开发中实现线程切换的具体实现

private void multiThread() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("This msg from work thread :" + Thread.currentThread().getName());
                sb.append("\nsubscribe: currentThreadName==" + Thread.currentThread().getName());
            }
        })
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, "accept: s= " + s);
                    }
                });
    }

这段代码,使用过RxJava的同学再熟悉不过了,上游事件会在一个名为 RxNewThreadScheduler-1 的线程执行,下游线程会切换回我们熟悉的Android UI线程。

我们就从subscribeOn(Schedulers.newThread()) 出发,看看这个代码的背后,到底发生了什么。

RxJava 是如何实现线程切换的(上)

subscribeOn

这里我们先不管Schedulers.newThread() 是什么鬼,首先看看这个subscribeOn()方法。

Observable.java--- subscribeOn(Scheduler scheduler)

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

可以看到,这个方法需要一个Scheduler 类型的参数。

RxJavaPlugins.java--- onAssembly(@NonNull Observable source)

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

O(∩_∩)O哈哈~,是不是觉得似曾相识,和create操作符一个套路呀。因此,observeOn也可以简化如下:

new ObservableSubscribeOn<T>(this, Schedulers.newThread());

这里你也许会有疑问,这个this是什么呢?其实这个this就是Observable,具体到上面的代码来说就是ObservableCreate,总之就是一个具体的Observable。

接着看ObservableSubscribeOn 这个类

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
}

看一下 AbstractObservableWithUpstream.java

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    /** The source consumable Observable. */
    protected final ObservableSource<T> source;

    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }

}

再看一下 HasUpstreamObservableSource.java

/**
 * Interface indicating the implementor has an upstream ObservableSource-like source available
 * via {@link #source()} method.
 *
 * @param <T> the value type
 */
public interface HasUpstreamObservableSource<T> {
    /**
     * Returns the upstream source of this Observable.
     * <p>Allows discovering the chain of observables.
     * @return the source ObservableSource
     */
    ObservableSource<T> source();
}

饶了半天,ObservableSubscribeOn 原来和上一篇说的ObservableCreate一样,也是Observable的一个子类。只不过比ObservableCreate多实现了一个接口HasUpstreamObservableSource,这个接口很有意思,他的source()方法返回类型是ObservableSource(还记得这个类的角色吗?)。也就是说ObservableSubscribeOn这个Observable是一个拥有上游的Observable。他有一个非常关键的属性source,这个source就代表了他的上游。

我们接着看ObservableSubscribeOn的具体实现。

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
		// observer 调用onSubscribe方法,获取上游的控制权
        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
}
  • 首先看他的构造函数,参数source就是我们之前提到过的this,scheduler就是Schedulers.newThread()。同时调用了父类AbstractObservableWithUpstream的构造函数,这里结合之前的结论,我们可以确定通过这个构造函数,就创建出来了一个包含上游的ObservableSubscribeOn实例。
  • 再看实现订阅关系的关键方法subscribeActual(),在这里创建了一个SubscribeOnObserver的实例,SubscribeOnObserver 是AtomicReference的子类(保证原子性),同时实现了 Observer接口 和 Disposable 接口;你可以把他理解成一个Observer。

我们之前说过,subscribeActual()是实现上下游之间订阅关系的重要方法。因为只有真正实现了订阅关系,上下游之间才能连接起来。我们看这个方法的最后一句代码。

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

这句代码,可以说就是非常关键,因为从这里开始了一系列的连锁反应。首先看一下SubscribeTask

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

看到这句 source.subscribe(parent) ,是不是觉得似曾相识呢?

SubscribeTask 实现了是Runnable接口,在其run方法中,定义了一个需要在线程中执行的任务。按照类的继承关系,很明显source 就是ObservableSubscribeOn 的上游Observable,parent是一个Observer。也就是说这个run方法要执行的内容就是实现ObservableSubscribeOn的上游和Observer的订阅。 一旦某个线程执行了这个Runnable(SubscribeTask),就会触发了这个run方法,从而实现订阅 ,而一旦这个订阅实现,那么后面的流程就是上节所说的事情了。

这里可以解答第三个问题了,上游事件是怎么给弄到子线程里去的,这里很明显了,就是直接把订阅方法放在了一个Runnable中去执行,这样就一旦这个Runnable在某个子线程执行,那么上游所有事件只能在这个子线程中执行了。

好了,线程要执行的任务 似乎 创建完了,下面就接着找看看子线程是怎么创建的。回过头继续看刚才的方法,

scheduler.scheduleDirect(new SubscribeTask(parent))

Scheduler.java----scheduleDirect

public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }


    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
		// 对run进行了一次装饰
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

	@NonNull
	// 抽象方法
    public abstract Worker createWorker();

首先看一下Worker类

/**
     * Sequential Scheduler for executing actions on a single thread or event loop.
     * <p>
     * Disposing the {@link Worker} cancels all outstanding work and allows resource cleanup.
     */
    public abstract static class Worker implements Disposable {
  
        @NonNull
        public Disposable schedule(@NonNull Runnable run) {
            return schedule(run, 0L, TimeUnit.NANOSECONDS);
        }

  
        @NonNull
        public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);

        
    }

Worker是Scheduler内部的一个静态抽象类,实现了Disposable接口,其schedule()方法也是抽象的。

再看一下DisposeTask

static final class DisposeTask implements Runnable, Disposable {
        final Runnable decoratedRun;
        final Worker w;

        Thread runner;

        DisposeTask(Runnable decoratedRun, Worker w) {
            this.decoratedRun = decoratedRun;
            this.w = w;
        }

        @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                decoratedRun.run();
            } finally {
                dispose();
                runner = null;
            }
        }

        @Override
        public void dispose() {
            if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
                ((NewThreadWorker)w).shutdown();
            } else {
                w.dispose();
            }
        }

        @Override
        public boolean isDisposed() {
            return w.isDisposed();
        }
    }

DisposeTask 又是一个Runnable,同时也实现了Disposable接口。可以看到在他的run方法中会执行decoratedRun的run方法,这个decoratedRun其实就是参数中传递进来的run, 也就是说,执行了这个DisposeTask的run方法,就会触发SubscribeTask中的run方法 ,因此,我们就要关注是谁执行了这个DisposeTask。

回到scheduleDirect()方法

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
		// 对run进行了一次装饰
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

scheduleDirect()方法的实现我们总结一下:

  1. 创建一个Worker对象w,而在Scheduler类中createWorker()方法被定义为抽象方法,因此我们需要去Scheduler的具体实现中了解这个Worker的具体实现。
  2. 对参数run通过RxJavaPlugins进行一次装饰,生成一个decoratedRun的Runnable(通过源码可以发现,其实什么也没干,就是原样返回)
  3. 通过decoratedRun和w生成一个DisposeTask对象task
  4. 通过Worker的schedule方法开始执行这个task。

ε=(´ο`*)))唉,说了这么久,子线程是如何创建的依然不清楚,无论是SubscribeTask还是DisposeTask只是定义会在某个子线程中执行的任务,并不代表子线程已被创建。但是通过以上代码,我们也可以收获一些有价值的结论:

  • 最终的Runnable任务,将由某个具体的Worker对象的scheduler()方法执行。
  • 这个scheduleDirect会返回一个Disposable对象,这样我们就可以通过Observer去控制整个上游的执行了。

好了,到这里对于subscribeOn()方法的分析已经到了尽头,我们找了最终需要运行子任务的对象Worker,而这个Worker是个抽象类,因此我们需要关注Worker的具体实现了。

下面我们就从刚才丢下的Schedulers.newThread() 换个角度来分析,看看能不能找到这个Worker的具体实现。

Schedulers.newThread()

前面说了subscribeOn()方法需要一个Scheduler 类型的参数,然而通过前面的分析我们知道Scheduler是个抽象类,是无法被实例化的。因此,这里就从Schedulers类出发。

/**
 * Static factory methods for returning standard Scheduler instances.
 */
public final class Schedulers {
}

注释很清楚,这个Schedulers就是一个用于生成Scheduler实例的静态工厂。

下面我们就来看看,在这个工厂中newThread() 生成了一个什么样的Scheduler实例。

@NonNull
    public static Scheduler newThread() {
        return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
    }

	NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());

    static final class NewThreadTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return NewThreadHolder.DEFAULT;
        }
    }

    static final class NewThreadHolder {
        static final Scheduler DEFAULT = new NewThreadScheduler();
    }

newThread() 方法经过层层委托处理(最终的创建方式,有点单例模式的意味),最终我们需要的就是一个NewThreadScheduler的实例。

NewThreadScheduler.java

public final class NewThreadScheduler extends Scheduler {

    final ThreadFactory threadFactory;

    private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
    private static final RxThreadFactory THREAD_FACTORY;

    /** The name of the system property for setting the thread priority for this Scheduler. */
    private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

    static {
        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));

        THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
    }

    public NewThreadScheduler() {
        this(THREAD_FACTORY);
    }

    public NewThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    @NonNull
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }
}

不出所料NewThreadScheduler 是Scheduler的一个子类,在他的静态代码块中构造了一个Priority=5的线程工厂。而在我们最最关注的 createWorker()方法中他又用这个线程工厂创建了一个NewThreadWorker 的实例 。下面就让我们看看最终的NewThreadWorker 做了些什么工作。

NewThreadWorker.java(节选关键内容)

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable run) {
        return schedule(run, 0, null);
    }

    

    @Override
    public void dispose() {
        if (!disposed) {
            disposed = true;
            executor.shutdownNow();
        }
    }

}

众里寻他千百度,终于找到了Worker的实现了,同时再一次不出所料的又一次实现了Disposable接口,o(╥﹏╥)o。

在其构造函数中,通过NewThreadScheduler中提供的线程工厂threadFactory创建了一个ScheduledExecutorService。

ScheduledExecutorService.java ---create

public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
            ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
            POOLS.put(e, exec);
        }
        return exec;
    }

用大名鼎鼎的Executors(Executor的 工具 类),创建了一个核心线程为1的线程。

至此,我们终于找到了第一个问题的答案,子线程是谁如何创建的;在NewThreadScheduler的createWorker()方法中,通过其构建好的线程工厂,在Worker实现类的构造函数中创建了一个ScheduledExecutorService的实例,是通过SchedulerPoolFactory创建的。

同时可以看到,通过执行dispose 方法,可以使用ScheduledExecutorService的shutdown()方法,停止线程的执行。

线程已经创建好了,下面就来看看到底是谁启动了这个线程。前面我们说过,Worker的schedule()方法如果执行了,就会执行我们定义好的Runnable,通过这个Runnable中run方法的执行,就可以实现上下游订阅关系。下面就来看看这个scheduler()方法。

@NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }

    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

到这里,已经很明显了,在schedulerActual方法中,会通过刚才创建好的子线程对象executor通过submit或schedule执行一个Runnable任务(虽然这个Runnable对象再一次经过了各种装饰和包装,但其本质没有发生变化),并将执行结果封装后返回。而这个Runnable对象追根溯源来说,就是我们在ObservableSubscribeOn类中创建的一个SubscribeTask对象。因此,当这个子线程开始运行的时候就是执行SubscribeTask中run()方法的时机;一旦这个run方法执行,那么

source.subscribe(parent)

这句最关键的代码就开始执行了,一切的一切又回到了我们上一篇那熟悉的流程了。

好了,按照上面的流程捋下来,感觉还是有点分散,那么就用UML图看看整体的结构。

RxJava 是如何实现线程切换的(上)

我们看最下面的ObservableSubscribeOn, 他是subscribeOn 返回的Observable对象,他持有一个Scheduler 实例的引用,而这个Scheduler实例就是NewThreadScheduler(即Schedulers.newThreade())的一个实例。ObservableSubscribeOn 的subscribeActual方法,会触发NewThreadScheduler去执行SubscribeTask中定义的任务,而这个具体的任务又将由Worker类创建的子线程去执行。这样就把上游事件放到了一个子线程中实现。

至于最后一个问题, 多次用 subscribeOn 指定上游线程为什么只有第一次有效? ,看完通篇其实也很好理解了,因为上游Observable只有一个任务,就是subscribe(准确的来说是subscribeActual()),而subscribeOn 要做的事情就是把上游任务切换到一个指定线程里,那么一旦被切换到了某个指定的线程里,后面的切换不就是没有意义了吗。

好了,至此上游事件切换到子线程的过程我们就明白了。下游事件又是如何切换的且听下回分解,本来想一篇写完的,结果发现越写越多,只能分成两篇了!!!o(╯□╰)o。

写在后面的话

关于Disposable

在RxJava的分析中,我们经常会遇到Disposable这个单词,确切的说是接口,这里简单说一说这个接口。

/**
 * Represents a disposable resource.
 */
public interface Disposable {
    void dispose();
    boolean isDisposed();
}

我们知道,在 Java 中,类实现某个接口,通俗来说就是代表这个类多了一项功能,比如一个类实现Serializable接口,代表这个类是可以序列化的。这里Disposable也是代表一种能力,这个能力就是Disposable,就是代表一次性的,用后就丢弃的,比如一次性筷子,还有那啥。

在RxJava中很多类都实现了这个接口,这个接口有两个方法,isDisposed()顾名思义返回当前类是否被抛弃,dispose()就是主动抛弃。因此,所有实现了这个接口的类,都拥有了这样一种能力,就是可以判断自己是否被抛弃,同时也可以主动抛弃自己。

上一篇我们说了,Observer通过onSubscribe(@NonNull Disposable d),会获得一个Disposable,这样就有能力控制上游的事件发送了。这样,我们就不难理解,为什么那么多类实现了这个接口,因为下游获取到的是一个拥有Disposable的对象,而一旦拥有了一个这样的对象,那么就可以通过下游控制上游了。可以说,这是RxJava对常规的观察者模式所做的最给力的改变。

关于各种ObservableXXX ,subscribeXXX,ObserverXXX

在查看RxJava的源码时,可能很多人都和我一样,有一个巨大的困扰,就是这些类的名字好他妈难记,感觉长得都差不多,关键念起来好像也差不多。但其实本质上来说,RxJava对类的命名还是非常规范的,只是我们不太习惯而已。按照英文单词翻译:

  • Observable 可观察的
  • Observer 观察者
  • Subscribe 订阅

其实就这么三个主语,其他的什么ObservableCreate,ObservableSubscribeOn,AbstractObservableWithUpstream,还有上面提到的Disposable,都是对各种各样的Observable和Observer的变形和修饰结果,只要理解这个类的核心含义是什么,就不会被这些名字搞晕了。

RxJava 可以说是博大精深,以上所有分析完全是个人平时使用时的总结与感悟,有任何错误之处,还望各位读者提出,共同进步。

关于RxJava 这里墙裂推荐一篇文章 一篇不太一样的RxJava介绍 ,感觉是自扔物线那篇之后,对RxJava思想感悟最深的一篇了。对RxJava 有兴趣的同学,可以多度几遍,每次都会有收获!!


以上所述就是小编给大家介绍的《RxJava 是如何实现线程切换的(上)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Design Accessible Web Sites

Design Accessible Web Sites

Jeremy Sydik / Pragmatic Bookshelf / 2007-11-05 / USD 34.95

It's not a one-browser web anymore. You need to reach audiences that use cell phones, PDAs, game consoles, or other "alternative" browsers, as well as users with disabilities. Legal requirements for a......一起来看看 《Design Accessible Web Sites》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

MD5 加密
MD5 加密

MD5 加密工具