内容简介:我们在编程中经常处理各种事件,各种事件也和我们的业务逻辑紧密相关,比如点击了这个btn,我该执行什么逻辑等等、我该用代理、通知、还是target-action来传递我们的事件、我该在main queue还是global queue上执行任务等等。在rxswift看来,无论是什么场景,这里面有一个不变的东西,那就是消息的生产、消息的发送、消息的处理、资源的销毁。我们以前实际上是用业务逻辑去包裹了这一套东西。rxswift认为这样实际上是做反了,我们应该以消息的流动去包裹业务逻辑,而不应该用业务逻辑去包裹消息
我们在编程中经常处理各种事件,各种事件也和我们的业务逻辑紧密相关,比如点击了这个btn,我该执行什么逻辑等等、我该用代理、通知、还是target-action来传递我们的事件、我该在main queue还是global queue上执行任务等等。在rxswift看来,无论是什么场景,这里面有一个不变的东西,那就是消息的生产、消息的发送、消息的处理、资源的销毁。
我们以前实际上是用业务逻辑去包裹了这一套东西。rxswift认为这样实际上是做反了,我们应该以消息的流动去包裹业务逻辑,而不应该用业务逻辑去包裹消息的流动。因为消息的流动是不变的,业务逻辑是变的,根据封装 变化的原则,我们用一个抽象的叫做Observable的东西来封装不同的业务逻辑。然后用Observerbal、Observer、disposable、scheduler等实现一套通用的消息处理机制。这样就能从各种代理、通知、target-action不同的消息处理方式中解放出来,使用同一的Observerable、Observer来实现。而且rx是一整套的,所以如果你切换到java,c++等其他语言,你也可以不用对每种语言都了解线程、传值、通知等,就能直接使用顶层的抽象。ok,按照这样的理解,我们来看看rxswift中的几个比较重要的类。
Observable
消息的流动是从消息的生成开始。而Observable就是代表这个抽象。不过呢,rxswift不止把消息生成的任务交给了Observable,还将消息发送的功能交给了Observable,也就是subscribe函数。
public protocol ObservableType:ObservableConvertibleType{ //自身将消息交给Observer,让他去处理,处理后返回一个disposable用于取消或者释放资源 func subscribe<O: ObserverType>(_observer: O) -> Disposable where O.E == E }
Observer
消息流动过程中消息的处理者。Observer就代表这个抽象。
//Observable发送Event,Observer通过on函数接收,然后执行自己的处理逻辑。 public protocol ObserverType{ associatedtype E func on(_event: Event<E>) }
Disposable
如果调用了disposable的dispose方法,则代表了,任务已经完成或者取消执行,然后进行资源的清理。disposable有时会很复杂,因为他要清理资源,而我们的Observable可能是由多个Observable按照某种约定生成的,所以最终的disposable需要负责dispose所有的disposable。当然如果情况简单,可以创建一个简单的disposable。
public protocol Disposable{ /// Dispose resource. func dispose() }
scheduler
提供了统一的方式让我们在不同线程执行任务。MainQueue, GloableQueue…
Producer
producer是实现了Observable协议的一个基类。他重写了subscribe方法,处理了isScheduleRequired的问题,然后将具体的消息发送交给了run方法。
class Producer<Element> :Observable<Element>{ override init() { super.init() } override func subscribe<O : ObserverType>(_observer: O) -> Disposable where O.E == Element { if !CurrentThreadScheduler.isScheduleRequired { // The returned disposable needs to release all references once it was disposed. let disposer = SinkDisposer() let sinkAndSubscription = run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } else { return CurrentThreadScheduler.instance.schedule(()) { _ in print(Thread.current) let disposer = SinkDisposer() let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } } } func run<O : ObserverType>(_observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element { rxAbstractMethod() } }
读过一遍之后,我们可能会有这几个疑问:
- 为什么要去判断isScheduleRequired?
- 为什么需要创建SinkDisposer, 用它来包裹两个其他disposable?
- 这两个disposable究竟是什么?
为什么要去判断isScheduleRequired?
要解释这个问题,我们的先了解什么是CurrentThreadScheduler ,他是拿来干啥的?
CurrentThreadScheduler抽象了在当前线程执行某个任务的这样一个功能。非常重要的一点是,他是一个 Serail scheduler
。也就是说我是串行执行的,因为我们可能在其他不同线程上dispatch了一些任务到这个线程。那要怎么保证串行执行呢?恩,我们定义一个pthread_key 。如果当前线程有任务再执行,那么我们在当前的线程中设置一个specific存储这个key和value。任务执行完成后,我们清除这个标记。当有其他任务一起进来时,会先判断有没有这个key,有的话表示线程正在执行任务,其他任务会被放到一个Queue中缓存起来,在当前任务执行完成之后执行。所以 isScheduleRequired
其实是一个防止并发的标记。我们来看看代码:
public static fileprivate(set) var isScheduleRequired: Bool { get { //从线程中获取key的值,如果是首次进入,那么会为nil,isScheduleRequired也就是true return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil } set(isScheduleRequired) { if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 { rxFatalError("pthread_setspecific failed") } } }
public func schedule<StateType>(_state: StateType, action: @escaping(StateType) -> Disposable) -> Disposable { //如果当前没有任务在执行,isScheduleRequired = true if CurrentThreadScheduler.isScheduleRequired { //将其设置为false,就是设置 pthread_setspecific CurrentThreadScheduler.isScheduleRequired = false //执行任务,拿到disposable let disposable = action(state) defer { //任务执行完毕,将isScheduleRequired设置为true CurrentThreadScheduler.isScheduleRequired = true CurrentThreadScheduler.queue = nil } //如果当前任务执行完了,queue里也没有什么东西,退出 guard let queue = CurrentThreadScheduler.queue else { return disposable } //queue里还有缓存的schduledItem, 遍历执行 while let latest = queue.value.dequeue() { if latest.isDisposed { continue } latest.invoke() } return disposable } //isScheduleRequired为false时执行这部分代码 let existingQueue = CurrentThreadScheduler.queue let queue: RxMutableBox<Queue<ScheduledItemType>> if let existingQueue = existingQueue { queue = existingQueue } else { queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1)) CurrentThreadScheduler.queue = queue } //构建一个ScheduledItem入队 let scheduledItem = ScheduledItem(action: action, state: state) queue.value.enqueue(scheduledItem) return scheduledItem }
好了,现在我们知道Producer处理了通用情况,在当前线程执行任务,并且通过isScheduleRequired处理了并发的情况。
为什么需要创建SinkDisposer, 用它来包裹两个其他disposable?
我的理解是用于取消。在subscribe过程中,我们可能创建许多的disposable,但是假如我们要取消,怎么办呢?没错,所有disposable都调用dispose就行啦。好的,那总有一个东西记录所有的disposable吧,没错,于是我们定义了SinkDisposable,他包裹了两个disposable。在取消时,调用sinkdisposable的dispose就行了,sinkdisposable又会去调用sink和subscribe返回的disposable的dispose。这样所有都释放完了,我们不用知道这个过程中生成了那些disposable。
///初始化一个SinkDisposer let disposer = SinkDisposer() //调用run,得到一个sink和subscription let sinkAndSubscription = self.run(observer, cancel: disposer) //将sink和subscription设置到disposer里 disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
///sinkdisposer的dispose func dispose() { let previousState = AtomicOr(DisposeState.disposed.rawValue, &_state) if (previousState & DisposeStateInt32.disposed.rawValue) != 0 { return } if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 { guard let sink = _sink else { rxFatalError("Sink not set") } guard let subscription = _subscription else { rxFatalError("Subscription not set") } //前面是一坨判断,用于处理已经dispose的情况,dispose可能被调多次,原因在于Sink的dispose。 sink.dispose() subscription.dispose() _sink = nil _subscription = nil } }
这两个disposable究竟是什么?
subscription这个disposable,我们很好理解。subscribe方法就会返回一个disposable。这里的就是那个disposable。
sink是一个observer+cancelable组合成的disposable。我们从上面可以看到,subscribe方法将具体的消息发送交给了run方法。所以我们需要在run方法里实现类似于Observer.on()这样的代码。而Sink就是对这部分代码的封装。他们都有一些共同的特性,比如都需要一个Observer来执行on,需要一个cancelabl来取消,都需要一个方法来执行Observer.on方法。将这些通用的东西提取出来就是Sink类。
class Sink<O:ObserverType> :Disposable{ fileprivate let _observer: O fileprivate let _cancel: Cancelable fileprivate var _disposed: Bool init(observer: O, cancel: Cancelable) { _observer = observer _cancel = cancel _disposed = false } final func forwardOn(_event: Event<O.E>) { if _disposed { return } _observer.on(event) } //SinkForward也封装了on方法,和forwardOn的区别在于,会调用cancel.dispose。 final func forwarder() -> SinkForward<O> { return SinkForward(forward: self) } final var disposed: Bool { return _disposed } func dispose() { _disposed = true _cancel.dispose() } }
///只是简单的对on方法的封装 final class SinkForward<O:ObserverType>:ObserverType{ typealias E = O.E private let _forward: Sink<O> init(forward: Sink<O>) { _forward = forward } final func on(_event: Event<E>) { switch event { case .next: _forward._observer.on(event) case .error, .completed: _forward._observer.on(event) _forward._cancel.dispose() } } }
实际例子
好了,现在来看看一个完整的例子,我们选简单的Just来分析下。Just只是简单的发送一个element而已。
public static func just(_element: E) -> Observable<E> { return Just(element: element) }
//继承自producer final fileprivate class Just<Element> :Producer<Element>{ private let _element: Element init(element: Element) { _element = element } ///做了优化,不是重写run方法,而是直接重写subscribe,observer.on,然后返回NopDisposable。 override func subscribe<O : ObserverType>(_observer: O) -> Disposable where O.E == Element { observer.on(.next(_element)) observer.on(.completed) return Disposables.create() } }
public static func just(_element: E, scheduler: ImmediateSchedulerType) -> Observable<E> { return JustScheduled(element: element, scheduler: scheduler) }
//也是继承producer final fileprivate class JustScheduled<Element> :Producer<Element>{ fileprivate let _scheduler: ImmediateSchedulerType fileprivate let _element: Element init(element: Element, scheduler: ImmediateSchedulerType) { _scheduler = scheduler _element = element } //重写run方法,将observer.on这部分代码,交给Sink去实现 override func run<O : ObserverType>(_observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E { //sink持有cancelable,也就是Producer中的SinkDispose。当on接收了所有sequence之后调用了Sink的dispose,Sink的dispose会调用cancelable的dispose。cancelable实际上有持有sink和subscruption。此时sink是已经dispose的了,不会再次执行,subscruption的dispose正常执行。 let sink = JustScheduledSink(parent: self, observer: observer, cancel: cancel) let subscription = sink.run() return (sink: sink, subscription: subscription) } }
final fileprivate class JustScheduledSink<O:ObserverType> :Sink<O>{ typealias Parent = JustScheduled<O.E> private let _parent: Parent init(parent: Parent, observer: O, cancel: Cancelable) { _parent = parent super.init(observer: observer, cancel: cancel) } //调用Observer.on func run() -> Disposable { let scheduler = _parent._scheduler return scheduler.schedule(_parent._element) { element in self.forwardOn(.next(element)) return scheduler.schedule(()) { _ in self.forwardOn(.completed) //既然sinkDisposer都已经释放了,我们自己创建一个什么也不做的disposable返回 self.dispose() return Disposables.create() } } } }
好了,其实很多rxswift operater都是按照这样写法完成的,我们其实可以先不看他的实现,自己先按照这一套东西写一遍,再比对,可以加深我们对泛型的理解还有具体的实现。
Subject
在消息传递的线路中,我们可能会需要这样的一种中间节点。他从别的地方接收消息(也可能自己直接onEvent),然后将这个消息分发给其他多个Observer。相当于一个中间的消息转发装置。Subject就是对这样的节点的一个抽象。
由于他能接收消息,所以应该实现ObserverType协议。他又能将消息发送给其他多个Observer,所以他还必须是一个Observable 。同时他还能断开这个节点,所以他还需要是一个disposable,在dispos时移除所有observer。
public final class PublishSubject<Element> : Observable<Element> , SubjectType , Cancelable , ObserverType , SynchronizedUnsubscribeType { public typealias SubjectObserverType = PublishSubject<Element> typealias Observers = AnyObserver<Element>.s typealias DisposeKey = Observers.KeyType /// Indicates whether the subject has any observers public var hasObservers: Bool { _lock.lock() let count = _observers.count > 0 _lock.unlock() return count } private let _lock = RecursiveLock() // state private var _isDisposed = false private var _observers = Observers()//一个Bag,保存了所有Observer的on方法 private var _stopped = false private var _stoppedEvent = nil as Event<Element>? #if DEBUG fileprivate let _synchronizationTracker = SynchronizationTracker() #endif /// Indicates whether the subject has been isDisposed. public var isDisposed: Bool { return _isDisposed } /// Creates a subject. public override init() { super.init() #if TRACE_RESOURCES _ = Resources.incrementTotal() #endif } /// 可以通过on方法接收事件,接收后,如果连接的有任何observer会将event发送给他们 public func on(_event: Event<Element>) { dispatch(_synchronized_on(event), event) } func _synchronized_on(_event: Event<E>) -> Observers { _lock.lock(); defer { _lock.unlock() } switch event { case .next(_): if _isDisposed || _stopped { return Observers() } return _observers case .completed, .error: if _stoppedEvent == nil { _stoppedEvent = event _stopped = true let observers = _observers _observers.removeAll() return observers } return Observers() } } //当subjectsubscribe某个observer,实际上是将其添加到了自己的Observers中。 public override func subscribe<O : ObserverType>(_observer: O) -> Disposable where O.E == Element { _lock.lock() let subscription = _synchronized_subscribe(observer) _lock.unlock() return subscription } func _synchronized_subscribe<O : ObserverType>(_observer: O) -> Disposable where O.E == E { //如果已经stop了,返回一个stoppedEvent if let stoppedEvent = _stoppedEvent { observer.on(stoppedEvent) return Disposables.create() } //如果已经dispose了,返回一个Error if _isDisposed { observer.on(.error(RxError.disposed(object: self))) return Disposables.create() } //将observer的on方法插入observers,并得到一个key,返回一个SubscriptionDisposable,在他dispose时会通过key移除对应的on。 let key = _observers.insert(observer.on) return SubscriptionDisposable(owner: self, key: key) } //SubscriptionDisposable dispose时会调用他 func synchronizedUnsubscribe(_disposeKey: DisposeKey) { _lock.lock() _synchronized_unsubscribe(disposeKey) _lock.unlock() } func _synchronized_unsubscribe(_disposeKey: DisposeKey) { //通过key移除对应的on。 _ = _observers.removeKey(disposeKey) } /// Returns observer interface for subject. public func asObserver() -> PublishSubject<Element> { return self } /// Unsubscribe all observers and release resources. public func dispose() { _lock.lock() _synchronized_dispose() _lock.unlock() } final func _synchronized_dispose() { _isDisposed = true _observers.removeAll() _stoppedEvent = nil } #if TRACE_RESOURCES deinit { _ = Resources.decrementTotal() } #endif }
还有其他几种subject,他们都是处理某些特殊情况,比如:如果我的observer是在事件流动的过程中加入的,那么我想加入后能够立即收到SUbject最后收到的那个消息,这就是 BehaviorSubject
完成的工作。还有ReplaySubject和AsyncSubject,都比较简单。
谢谢你请我吃糖果
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。