内容简介:我们在编程中经常处理各种事件,各种事件也和我们的业务逻辑紧密相关,比如点击了这个btn,我该执行什么逻辑等等、我该用代理、通知、还是target-action来传递我们的事件、我该在main queue还是global queue上执行任务等等。在rxswift看来,无论是什么场景,这里面有一个不变的东西,那就是消息的生产、消息的发送、消息的处理、资源的销毁。我们以前实际上是用业务逻辑去包裹了这一套东西。rxswift认为这样实际上是做反了,我们应该以消息的流动去包裹业务逻辑,而不应该用业务逻辑去包裹消息
我们在编程中经常处理各种事件,各种事件也和我们的业务逻辑紧密相关,比如点击了这个btn,我该执行什么逻辑等等、我该用代理、通知、还是target-action来传递我们的事件、我该在main queue还是global queue上执行任务等等。在rxswift看来,无论是什么场景,这里面有一个不变的东西,那就是消息的生产、消息的发送、消息的处理、资源的销毁。
我们以前实际上是用业务逻辑去包裹了这一套东西。rxswift认为这样实际上是做反了,我们应该以消息的流动去包裹业务逻辑,而不应该用业务逻辑去包裹消息的流动。因为消息的流动是不变的,业务逻辑是变的,根据封装 变化的原则,我们用一个抽象的叫做Observable的东西来封装不同的业务逻辑。然后用Observerbal、Observer、disposable、scheduler等实现一套通用的消息处理机制。这样就能从各种代理、通知、target-action不同的消息处理方式中解放出来,使用同一的Observerable、Observer来实现。而且rx是一整套的,所以如果你切换到java,c++等其他语言,你也可以不用对每种语言都了解线程、传值、通知等,就能直接使用顶层的抽象。ok,按照这样的理解,我们来看看rxswift中的几个比较重要的类。
Observable
消息的流动是从消息的生成开始。而Observable就是代表这个抽象。不过呢,rxswift不止把消息生成的任务交给了Observable,还将消息发送的功能交给了Observable,也就是subscribe函数。
public protocol ObservableType:ObservableConvertibleType{
//自身将消息交给Observer,让他去处理,处理后返回一个disposable用于取消或者释放资源
func subscribe<O: ObserverType>(_observer: O) -> Disposable where O.E == E
}
Observer
消息流动过程中消息的处理者。Observer就代表这个抽象。
//Observable发送Event,Observer通过on函数接收,然后执行自己的处理逻辑。
public protocol ObserverType{
associatedtype E
func on(_event: Event<E>)
}
Disposable
如果调用了disposable的dispose方法,则代表了,任务已经完成或者取消执行,然后进行资源的清理。disposable有时会很复杂,因为他要清理资源,而我们的Observable可能是由多个Observable按照某种约定生成的,所以最终的disposable需要负责dispose所有的disposable。当然如果情况简单,可以创建一个简单的disposable。
public protocol Disposable{
/// Dispose resource.
func dispose()
}
scheduler
提供了统一的方式让我们在不同线程执行任务。MainQueue, GloableQueue…
Producer
producer是实现了Observable协议的一个基类。他重写了subscribe方法,处理了isScheduleRequired的问题,然后将具体的消息发送交给了run方法。
class Producer<Element> :Observable<Element>{
override init() {
super.init()
}
override func subscribe<O : ObserverType>(_observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
print(Thread.current)
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<O : ObserverType>(_observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
rxAbstractMethod()
}
}
读过一遍之后,我们可能会有这几个疑问:
- 为什么要去判断isScheduleRequired?
- 为什么需要创建SinkDisposer, 用它来包裹两个其他disposable?
- 这两个disposable究竟是什么?
为什么要去判断isScheduleRequired?
要解释这个问题,我们的先了解什么是CurrentThreadScheduler ,他是拿来干啥的?
CurrentThreadScheduler抽象了在当前线程执行某个任务的这样一个功能。非常重要的一点是,他是一个 Serail scheduler
。也就是说我是串行执行的,因为我们可能在其他不同线程上dispatch了一些任务到这个线程。那要怎么保证串行执行呢?恩,我们定义一个pthread_key 。如果当前线程有任务再执行,那么我们在当前的线程中设置一个specific存储这个key和value。任务执行完成后,我们清除这个标记。当有其他任务一起进来时,会先判断有没有这个key,有的话表示线程正在执行任务,其他任务会被放到一个Queue中缓存起来,在当前任务执行完成之后执行。所以 isScheduleRequired
其实是一个防止并发的标记。我们来看看代码:
public static fileprivate(set) var isScheduleRequired: Bool {
get {
//从线程中获取key的值,如果是首次进入,那么会为nil,isScheduleRequired也就是true
return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
}
set(isScheduleRequired) {
if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
rxFatalError("pthread_setspecific failed")
}
}
}
public func schedule<StateType>(_state: StateType, action: @escaping(StateType) -> Disposable) -> Disposable {
//如果当前没有任务在执行,isScheduleRequired = true
if CurrentThreadScheduler.isScheduleRequired {
//将其设置为false,就是设置 pthread_setspecific
CurrentThreadScheduler.isScheduleRequired = false
//执行任务,拿到disposable
let disposable = action(state)
defer {
//任务执行完毕,将isScheduleRequired设置为true
CurrentThreadScheduler.isScheduleRequired = true
CurrentThreadScheduler.queue = nil
}
//如果当前任务执行完了,queue里也没有什么东西,退出
guard let queue = CurrentThreadScheduler.queue else {
return disposable
}
//queue里还有缓存的schduledItem, 遍历执行
while let latest = queue.value.dequeue() {
if latest.isDisposed {
continue
}
latest.invoke()
}
return disposable
}
//isScheduleRequired为false时执行这部分代码
let existingQueue = CurrentThreadScheduler.queue
let queue: RxMutableBox<Queue<ScheduledItemType>>
if let existingQueue = existingQueue {
queue = existingQueue
}
else {
queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
CurrentThreadScheduler.queue = queue
}
//构建一个ScheduledItem入队
let scheduledItem = ScheduledItem(action: action, state: state)
queue.value.enqueue(scheduledItem)
return scheduledItem
}
好了,现在我们知道Producer处理了通用情况,在当前线程执行任务,并且通过isScheduleRequired处理了并发的情况。
为什么需要创建SinkDisposer, 用它来包裹两个其他disposable?
我的理解是用于取消。在subscribe过程中,我们可能创建许多的disposable,但是假如我们要取消,怎么办呢?没错,所有disposable都调用dispose就行啦。好的,那总有一个东西记录所有的disposable吧,没错,于是我们定义了SinkDisposable,他包裹了两个disposable。在取消时,调用sinkdisposable的dispose就行了,sinkdisposable又会去调用sink和subscribe返回的disposable的dispose。这样所有都释放完了,我们不用知道这个过程中生成了那些disposable。
///初始化一个SinkDisposer let disposer = SinkDisposer() //调用run,得到一个sink和subscription let sinkAndSubscription = self.run(observer, cancel: disposer) //将sink和subscription设置到disposer里 disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
///sinkdisposer的dispose
func dispose() {
let previousState = AtomicOr(DisposeState.disposed.rawValue, &_state)
if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
return
}
if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
guard let sink = _sink else {
rxFatalError("Sink not set")
}
guard let subscription = _subscription else {
rxFatalError("Subscription not set")
}
//前面是一坨判断,用于处理已经dispose的情况,dispose可能被调多次,原因在于Sink的dispose。
sink.dispose()
subscription.dispose()
_sink = nil
_subscription = nil
}
}
这两个disposable究竟是什么?
subscription这个disposable,我们很好理解。subscribe方法就会返回一个disposable。这里的就是那个disposable。
sink是一个observer+cancelable组合成的disposable。我们从上面可以看到,subscribe方法将具体的消息发送交给了run方法。所以我们需要在run方法里实现类似于Observer.on()这样的代码。而Sink就是对这部分代码的封装。他们都有一些共同的特性,比如都需要一个Observer来执行on,需要一个cancelabl来取消,都需要一个方法来执行Observer.on方法。将这些通用的东西提取出来就是Sink类。
class Sink<O:ObserverType> :Disposable{
fileprivate let _observer: O
fileprivate let _cancel: Cancelable
fileprivate var _disposed: Bool
init(observer: O, cancel: Cancelable) {
_observer = observer
_cancel = cancel
_disposed = false
}
final func forwardOn(_event: Event<O.E>) {
if _disposed {
return
}
_observer.on(event)
}
//SinkForward也封装了on方法,和forwardOn的区别在于,会调用cancel.dispose。
final func forwarder() -> SinkForward<O> {
return SinkForward(forward: self)
}
final var disposed: Bool {
return _disposed
}
func dispose() {
_disposed = true
_cancel.dispose()
}
}
///只是简单的对on方法的封装
final class SinkForward<O:ObserverType>:ObserverType{
typealias E = O.E
private let _forward: Sink<O>
init(forward: Sink<O>) {
_forward = forward
}
final func on(_event: Event<E>) {
switch event {
case .next:
_forward._observer.on(event)
case .error, .completed:
_forward._observer.on(event)
_forward._cancel.dispose()
}
}
}
实际例子
好了,现在来看看一个完整的例子,我们选简单的Just来分析下。Just只是简单的发送一个element而已。
public static func just(_element: E) -> Observable<E> {
return Just(element: element)
}
//继承自producer
final fileprivate class Just<Element> :Producer<Element>{
private let _element: Element
init(element: Element) {
_element = element
}
///做了优化,不是重写run方法,而是直接重写subscribe,observer.on,然后返回NopDisposable。
override func subscribe<O : ObserverType>(_observer: O) -> Disposable where O.E == Element {
observer.on(.next(_element))
observer.on(.completed)
return Disposables.create()
}
}
public static func just(_element: E, scheduler: ImmediateSchedulerType) -> Observable<E> {
return JustScheduled(element: element, scheduler: scheduler)
}
//也是继承producer
final fileprivate class JustScheduled<Element> :Producer<Element>{
fileprivate let _scheduler: ImmediateSchedulerType
fileprivate let _element: Element
init(element: Element, scheduler: ImmediateSchedulerType) {
_scheduler = scheduler
_element = element
}
//重写run方法,将observer.on这部分代码,交给Sink去实现
override func run<O : ObserverType>(_observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
//sink持有cancelable,也就是Producer中的SinkDispose。当on接收了所有sequence之后调用了Sink的dispose,Sink的dispose会调用cancelable的dispose。cancelable实际上有持有sink和subscruption。此时sink是已经dispose的了,不会再次执行,subscruption的dispose正常执行。
let sink = JustScheduledSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
final fileprivate class JustScheduledSink<O:ObserverType> :Sink<O>{
typealias Parent = JustScheduled<O.E>
private let _parent: Parent
init(parent: Parent, observer: O, cancel: Cancelable) {
_parent = parent
super.init(observer: observer, cancel: cancel)
}
//调用Observer.on
func run() -> Disposable {
let scheduler = _parent._scheduler
return scheduler.schedule(_parent._element) { element in
self.forwardOn(.next(element))
return scheduler.schedule(()) { _ in
self.forwardOn(.completed)
//既然sinkDisposer都已经释放了,我们自己创建一个什么也不做的disposable返回
self.dispose()
return Disposables.create()
}
}
}
}
好了,其实很多rxswift operater都是按照这样写法完成的,我们其实可以先不看他的实现,自己先按照这一套东西写一遍,再比对,可以加深我们对泛型的理解还有具体的实现。
Subject
在消息传递的线路中,我们可能会需要这样的一种中间节点。他从别的地方接收消息(也可能自己直接onEvent),然后将这个消息分发给其他多个Observer。相当于一个中间的消息转发装置。Subject就是对这样的节点的一个抽象。
由于他能接收消息,所以应该实现ObserverType协议。他又能将消息发送给其他多个Observer,所以他还必须是一个Observable 。同时他还能断开这个节点,所以他还需要是一个disposable,在dispos时移除所有observer。
public final class PublishSubject<Element>
: Observable<Element>
, SubjectType
, Cancelable
, ObserverType
, SynchronizedUnsubscribeType {
public typealias SubjectObserverType = PublishSubject<Element>
typealias Observers = AnyObserver<Element>.s
typealias DisposeKey = Observers.KeyType
/// Indicates whether the subject has any observers
public var hasObservers: Bool {
_lock.lock()
let count = _observers.count > 0
_lock.unlock()
return count
}
private let _lock = RecursiveLock()
// state
private var _isDisposed = false
private var _observers = Observers()//一个Bag,保存了所有Observer的on方法
private var _stopped = false
private var _stoppedEvent = nil as Event<Element>?
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
/// Indicates whether the subject has been isDisposed.
public var isDisposed: Bool {
return _isDisposed
}
/// Creates a subject.
public override init() {
super.init()
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
}
/// 可以通过on方法接收事件,接收后,如果连接的有任何observer会将event发送给他们
public func on(_event: Event<Element>) {
dispatch(_synchronized_on(event), event)
}
func _synchronized_on(_event: Event<E>) -> Observers {
_lock.lock(); defer { _lock.unlock() }
switch event {
case .next(_):
if _isDisposed || _stopped {
return Observers()
}
return _observers
case .completed, .error:
if _stoppedEvent == nil {
_stoppedEvent = event
_stopped = true
let observers = _observers
_observers.removeAll()
return observers
}
return Observers()
}
}
//当subjectsubscribe某个observer,实际上是将其添加到了自己的Observers中。
public override func subscribe<O : ObserverType>(_observer: O) -> Disposable where O.E == Element {
_lock.lock()
let subscription = _synchronized_subscribe(observer)
_lock.unlock()
return subscription
}
func _synchronized_subscribe<O : ObserverType>(_observer: O) -> Disposable where O.E == E {
//如果已经stop了,返回一个stoppedEvent
if let stoppedEvent = _stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
//如果已经dispose了,返回一个Error
if _isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
//将observer的on方法插入observers,并得到一个key,返回一个SubscriptionDisposable,在他dispose时会通过key移除对应的on。
let key = _observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
//SubscriptionDisposable dispose时会调用他
func synchronizedUnsubscribe(_disposeKey: DisposeKey) {
_lock.lock()
_synchronized_unsubscribe(disposeKey)
_lock.unlock()
}
func _synchronized_unsubscribe(_disposeKey: DisposeKey) {
//通过key移除对应的on。
_ = _observers.removeKey(disposeKey)
}
/// Returns observer interface for subject.
public func asObserver() -> PublishSubject<Element> {
return self
}
/// Unsubscribe all observers and release resources.
public func dispose() {
_lock.lock()
_synchronized_dispose()
_lock.unlock()
}
final func _synchronized_dispose() {
_isDisposed = true
_observers.removeAll()
_stoppedEvent = nil
}
#if TRACE_RESOURCES
deinit {
_ = Resources.decrementTotal()
}
#endif
}
还有其他几种subject,他们都是处理某些特殊情况,比如:如果我的observer是在事件流动的过程中加入的,那么我想加入后能够立即收到SUbject最后收到的那个消息,这就是 BehaviorSubject
完成的工作。还有ReplaySubject和AsyncSubject,都比较简单。
谢谢你请我吃糖果
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
数据结构与算法分析
Frank.M.Carrano / 金名 / 清华大学出版社 / 2007-11 / 98.00元
“数据结构”是计算机专业的基础与核心课程之一,Java是现今一种热门的语言。本书在编写过程中特别考虑到了面向对象程序设计(OOP)的思想与Java语言的特性。它不是从基于另一种程序设计语言的数据结构教材简单地“改编”而来的,因此在数据结构的实现上更加“地道”地运用了Java语言,并且自始至终强调以面向对象的方式来思考、分析和解决问题。 本书是为数据结构入门课程(通常课号是CS-2)而编写的教......一起来看看 《数据结构与算法分析》 这本书的介绍吧!