rxswift

栏目: Swift · 发布时间: 6年前

内容简介:我们在编程中经常处理各种事件,各种事件也和我们的业务逻辑紧密相关,比如点击了这个btn,我该执行什么逻辑等等、我该用代理、通知、还是target-action来传递我们的事件、我该在main queue还是global queue上执行任务等等。在rxswift看来,无论是什么场景,这里面有一个不变的东西,那就是消息的生产、消息的发送、消息的处理、资源的销毁。我们以前实际上是用业务逻辑去包裹了这一套东西。rxswift认为这样实际上是做反了,我们应该以消息的流动去包裹业务逻辑,而不应该用业务逻辑去包裹消息

我们在编程中经常处理各种事件,各种事件也和我们的业务逻辑紧密相关,比如点击了这个btn,我该执行什么逻辑等等、我该用代理、通知、还是target-action来传递我们的事件、我该在main queue还是global queue上执行任务等等。在rxswift看来,无论是什么场景,这里面有一个不变的东西,那就是消息的生产、消息的发送、消息的处理、资源的销毁。

我们以前实际上是用业务逻辑去包裹了这一套东西。rxswift认为这样实际上是做反了,我们应该以消息的流动去包裹业务逻辑,而不应该用业务逻辑去包裹消息的流动。因为消息的流动是不变的,业务逻辑是变的,根据封装 变化的原则,我们用一个抽象的叫做Observable的东西来封装不同的业务逻辑。然后用Observerbal、Observer、disposable、scheduler等实现一套通用的消息处理机制。这样就能从各种代理、通知、target-action不同的消息处理方式中解放出来,使用同一的Observerable、Observer来实现。而且rx是一整套的,所以如果你切换到java,c++等其他语言,你也可以不用对每种语言都了解线程、传值、通知等,就能直接使用顶层的抽象。ok,按照这样的理解,我们来看看rxswift中的几个比较重要的类。

Observable

消息的流动是从消息的生成开始。而Observable就是代表这个抽象。不过呢,rxswift不止把消息生成的任务交给了Observable,还将消息发送的功能交给了Observable,也就是subscribe函数。

public protocol ObservableType:ObservableConvertibleType{
 //自身将消息交给Observer,让他去处理,处理后返回一个disposable用于取消或者释放资源
 func subscribe<O: ObserverType>(_observer: O) -> Disposable where O.E == E
}

Observer

消息流动过程中消息的处理者。Observer就代表这个抽象。

//Observable发送Event,Observer通过on函数接收,然后执行自己的处理逻辑。
public protocol ObserverType{
    associatedtype E
    func on(_event: Event<E>)
}

Disposable

如果调用了disposable的dispose方法,则代表了,任务已经完成或者取消执行,然后进行资源的清理。disposable有时会很复杂,因为他要清理资源,而我们的Observable可能是由多个Observable按照某种约定生成的,所以最终的disposable需要负责dispose所有的disposable。当然如果情况简单,可以创建一个简单的disposable。

public protocol Disposable{
    /// Dispose resource.
    func dispose()
}

scheduler

提供了统一的方式让我们在不同线程执行任务。MainQueue, GloableQueue…

Producer

producer是实现了Observable协议的一个基类。他重写了subscribe方法,处理了isScheduleRequired的问题,然后将具体的消息发送交给了run方法。

class Producer<Element> :Observable<Element>{
    override init() {
        super.init()
    }
    
    override func subscribe<O : ObserverType>(_observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                print(Thread.current)
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }
    
    func run<O : ObserverType>(_observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        rxAbstractMethod()
    }
}

读过一遍之后,我们可能会有这几个疑问:

  • 为什么要去判断isScheduleRequired?
  • 为什么需要创建SinkDisposer, 用它来包裹两个其他disposable?
  • 这两个disposable究竟是什么?

为什么要去判断isScheduleRequired?

要解释这个问题,我们的先了解什么是CurrentThreadScheduler ,他是拿来干啥的?

CurrentThreadScheduler抽象了在当前线程执行某个任务的这样一个功能。非常重要的一点是,他是一个 Serail scheduler 。也就是说我是串行执行的,因为我们可能在其他不同线程上dispatch了一些任务到这个线程。那要怎么保证串行执行呢?恩,我们定义一个pthread_key 。如果当前线程有任务再执行,那么我们在当前的线程中设置一个specific存储这个key和value。任务执行完成后,我们清除这个标记。当有其他任务一起进来时,会先判断有没有这个key,有的话表示线程正在执行任务,其他任务会被放到一个Queue中缓存起来,在当前任务执行完成之后执行。所以 isScheduleRequired 其实是一个防止并发的标记。我们来看看代码:

public static fileprivate(set) var isScheduleRequired: Bool {
    get {
        //从线程中获取key的值,如果是首次进入,那么会为nil,isScheduleRequired也就是true
        return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
    }
    set(isScheduleRequired) {
        if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
            rxFatalError("pthread_setspecific failed")
        }
    }
}
  public func schedule<StateType>(_state: StateType, action: @escaping(StateType) -> Disposable) -> Disposable {
      //如果当前没有任务在执行,isScheduleRequired = true
      if CurrentThreadScheduler.isScheduleRequired {
          //将其设置为false,就是设置 pthread_setspecific
          CurrentThreadScheduler.isScheduleRequired = false
          //执行任务,拿到disposable
          let disposable = action(state)

          defer {
              //任务执行完毕,将isScheduleRequired设置为true
              CurrentThreadScheduler.isScheduleRequired = true
              CurrentThreadScheduler.queue = nil
          }
          //如果当前任务执行完了,queue里也没有什么东西,退出
          guard let queue = CurrentThreadScheduler.queue else {
              return disposable
          }
	//queue里还有缓存的schduledItem, 遍历执行
          while let latest = queue.value.dequeue() {
              if latest.isDisposed {
                  continue
              }
              latest.invoke()
          }

          return disposable
      }
	//isScheduleRequired为false时执行这部分代码
      let existingQueue = CurrentThreadScheduler.queue

      let queue: RxMutableBox<Queue<ScheduledItemType>>
      if let existingQueue = existingQueue {
          queue = existingQueue
      }
      else {
          queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
          CurrentThreadScheduler.queue = queue
      }
//构建一个ScheduledItem入队
      let scheduledItem = ScheduledItem(action: action, state: state)
      queue.value.enqueue(scheduledItem)

      return scheduledItem
  }

好了,现在我们知道Producer处理了通用情况,在当前线程执行任务,并且通过isScheduleRequired处理了并发的情况。

为什么需要创建SinkDisposer, 用它来包裹两个其他disposable?

我的理解是用于取消。在subscribe过程中,我们可能创建许多的disposable,但是假如我们要取消,怎么办呢?没错,所有disposable都调用dispose就行啦。好的,那总有一个东西记录所有的disposable吧,没错,于是我们定义了SinkDisposable,他包裹了两个disposable。在取消时,调用sinkdisposable的dispose就行了,sinkdisposable又会去调用sink和subscribe返回的disposable的dispose。这样所有都释放完了,我们不用知道这个过程中生成了那些disposable。

///初始化一个SinkDisposer
let disposer = SinkDisposer()
//调用run,得到一个sink和subscription
let sinkAndSubscription = self.run(observer, cancel: disposer)
//将sink和subscription设置到disposer里
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
///sinkdisposer的dispose
func dispose() {
    let previousState = AtomicOr(DisposeState.disposed.rawValue, &_state)

    if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
        return
    }

    if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
        guard let sink = _sink else {
            rxFatalError("Sink not set")
        }
        guard let subscription = _subscription else {
            rxFatalError("Subscription not set")
        }
		//前面是一坨判断,用于处理已经dispose的情况,dispose可能被调多次,原因在于Sink的dispose。
        sink.dispose()
        subscription.dispose()

        _sink = nil
        _subscription = nil
    }
}

这两个disposable究竟是什么?

subscription这个disposable,我们很好理解。subscribe方法就会返回一个disposable。这里的就是那个disposable。

sink是一个observer+cancelable组合成的disposable。我们从上面可以看到,subscribe方法将具体的消息发送交给了run方法。所以我们需要在run方法里实现类似于Observer.on()这样的代码。而Sink就是对这部分代码的封装。他们都有一些共同的特性,比如都需要一个Observer来执行on,需要一个cancelabl来取消,都需要一个方法来执行Observer.on方法。将这些通用的东西提取出来就是Sink类。

class Sink<O:ObserverType> :Disposable{
    fileprivate let _observer: O
    fileprivate let _cancel: Cancelable
    fileprivate var _disposed: Bool

    init(observer: O, cancel: Cancelable) {
        _observer = observer
        _cancel = cancel
        _disposed = false
    }
    
    final func forwardOn(_event: Event<O.E>) {
        if _disposed {
            return
        }
        _observer.on(event)
    }
    //SinkForward也封装了on方法,和forwardOn的区别在于,会调用cancel.dispose。
    final func forwarder() -> SinkForward<O> {
        return SinkForward(forward: self)
    }

    final var disposed: Bool {
        return _disposed
    }

    func dispose() {
        _disposed = true
        _cancel.dispose()
    }
}
///只是简单的对on方法的封装
final class SinkForward<O:ObserverType>:ObserverType{
    typealias E = O.E
    
    private let _forward: Sink<O>
    
    init(forward: Sink<O>) {
        _forward = forward
    }
    
    final func on(_event: Event<E>) {
        switch event {
        case .next:
            _forward._observer.on(event)
        case .error, .completed:
            _forward._observer.on(event)
            _forward._cancel.dispose()
        }
    }
}

实际例子

好了,现在来看看一个完整的例子,我们选简单的Just来分析下。Just只是简单的发送一个element而已。

public static func just(_element: E) -> Observable<E> {
    return Just(element: element)
}
//继承自producer
final fileprivate class Just<Element> :Producer<Element>{
private let _element: Element

init(element: Element) {
    _element = element
}
///做了优化,不是重写run方法,而是直接重写subscribe,observer.on,然后返回NopDisposable。
override func subscribe<O : ObserverType>(_observer: O) -> Disposable where O.E == Element {
    observer.on(.next(_element))
    observer.on(.completed)
    return Disposables.create()
}
}
public static func just(_element: E, scheduler: ImmediateSchedulerType) -> Observable<E> {
    return JustScheduled(element: element, scheduler: scheduler)
}
//也是继承producer
final fileprivate class JustScheduled<Element> :Producer<Element>{
    fileprivate let _scheduler: ImmediateSchedulerType
    fileprivate let _element: Element

    init(element: Element, scheduler: ImmediateSchedulerType) {
        _scheduler = scheduler
        _element = element
    }
  //重写run方法,将observer.on这部分代码,交给Sink去实现
    override func run<O : ObserverType>(_observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
        //sink持有cancelable,也就是Producer中的SinkDispose。当on接收了所有sequence之后调用了Sink的dispose,Sink的dispose会调用cancelable的dispose。cancelable实际上有持有sink和subscruption。此时sink是已经dispose的了,不会再次执行,subscruption的dispose正常执行。
        let sink = JustScheduledSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}
final fileprivate class JustScheduledSink<O:ObserverType> :Sink<O>{
    typealias Parent = JustScheduled<O.E>

    private let _parent: Parent

    init(parent: Parent, observer: O, cancel: Cancelable) {
        _parent = parent
        super.init(observer: observer, cancel: cancel)
    }
    //调用Observer.on
    func run() -> Disposable {
        let scheduler = _parent._scheduler
        return scheduler.schedule(_parent._element) { element in
            self.forwardOn(.next(element))
            return scheduler.schedule(()) { _ in
                self.forwardOn(.completed)
				//既然sinkDisposer都已经释放了,我们自己创建一个什么也不做的disposable返回
                self.dispose()
                return Disposables.create()
            }
        }
    }
}

好了,其实很多rxswift operater都是按照这样写法完成的,我们其实可以先不看他的实现,自己先按照这一套东西写一遍,再比对,可以加深我们对泛型的理解还有具体的实现。

Subject

在消息传递的线路中,我们可能会需要这样的一种中间节点。他从别的地方接收消息(也可能自己直接onEvent),然后将这个消息分发给其他多个Observer。相当于一个中间的消息转发装置。Subject就是对这样的节点的一个抽象。

由于他能接收消息,所以应该实现ObserverType协议。他又能将消息发送给其他多个Observer,所以他还必须是一个Observable 。同时他还能断开这个节点,所以他还需要是一个disposable,在dispos时移除所有observer。

public final class PublishSubject<Element>
    : Observable<Element>
    , SubjectType
    , Cancelable
    , ObserverType
    , SynchronizedUnsubscribeType {
    public typealias SubjectObserverType = PublishSubject<Element>

    typealias Observers = AnyObserver<Element>.s
    typealias DisposeKey = Observers.KeyType
    
    /// Indicates whether the subject has any observers
    public var hasObservers: Bool {
        _lock.lock()
        let count = _observers.count > 0
        _lock.unlock()
        return count
    }
    
    private let _lock = RecursiveLock()
    
    // state
    private var _isDisposed = false
    private var _observers = Observers()//一个Bag,保存了所有Observer的on方法
    private var _stopped = false
    private var _stoppedEvent = nil as Event<Element>?

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    /// Indicates whether the subject has been isDisposed.
    public var isDisposed: Bool {
        return _isDisposed
    }
    
    /// Creates a subject.
    public override init() {
        super.init()
        #if TRACE_RESOURCES
            _ = Resources.incrementTotal()
        #endif
    }
    
    /// 可以通过on方法接收事件,接收后,如果连接的有任何observer会将event发送给他们
    public func on(_event: Event<Element>) {
        dispatch(_synchronized_on(event), event)
    }

    func _synchronized_on(_event: Event<E>) -> Observers {
        _lock.lock(); defer { _lock.unlock() }
        switch event {
        case .next(_):
            if _isDisposed || _stopped {
                return Observers()
            }
            
            return _observers
        case .completed, .error:
            if _stoppedEvent == nil {
                _stoppedEvent = event
                _stopped = true
                let observers = _observers
                _observers.removeAll()
                return observers
            }

            return Observers()
        }
    }
    
    //当subjectsubscribe某个observer,实际上是将其添加到了自己的Observers中。
    public override func subscribe<O : ObserverType>(_observer: O) -> Disposable where O.E == Element {
        _lock.lock()
        let subscription = _synchronized_subscribe(observer)
        _lock.unlock()
        return subscription
    }

    func _synchronized_subscribe<O : ObserverType>(_observer: O) -> Disposable where O.E == E {
        //如果已经stop了,返回一个stoppedEvent
        if let stoppedEvent = _stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        //如果已经dispose了,返回一个Error
        if _isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
        //将observer的on方法插入observers,并得到一个key,返回一个SubscriptionDisposable,在他dispose时会通过key移除对应的on。
        let key = _observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }
    //SubscriptionDisposable dispose时会调用他
    func synchronizedUnsubscribe(_disposeKey: DisposeKey) {
        _lock.lock()
        _synchronized_unsubscribe(disposeKey)
        _lock.unlock()
    }

    func _synchronized_unsubscribe(_disposeKey: DisposeKey) {
        //通过key移除对应的on。
        _ = _observers.removeKey(disposeKey)
    }
    
    /// Returns observer interface for subject.
    public func asObserver() -> PublishSubject<Element> {
        return self
    }
    
    /// Unsubscribe all observers and release resources.
    public func dispose() {
        _lock.lock()
        _synchronized_dispose()
        _lock.unlock()
    }

    final func _synchronized_dispose() {
        _isDisposed = true
        _observers.removeAll()
        _stoppedEvent = nil
    }

    #if TRACE_RESOURCES
        deinit {
            _ = Resources.decrementTotal()
        }
    #endif
}

还有其他几种subject,他们都是处理某些特殊情况,比如:如果我的observer是在事件流动的过程中加入的,那么我想加入后能够立即收到SUbject最后收到的那个消息,这就是 BehaviorSubject 完成的工作。还有ReplaySubject和AsyncSubject,都比较简单。

谢谢你请我吃糖果


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

遗传算法原理及应用

遗传算法原理及应用

周明、孙树栋 / 国防工业出版社 / 1999-6 / 18.0

一起来看看 《遗传算法原理及应用》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换