标签:
从本篇博客开始,我将叙述一序列ReactiveX模式的讲解,ReactiveX是微软推出的开源一个项目,里面包含了RxJava,RxJs,RxSwift,RxCpp,Rx.Net,RxPhp等一序列的Functional Reactive Programming(FRP,函数响应式编程)。从这篇博客开始我将详细介绍以上提到的6个语言RX的FRP,及其内部具体实现。
Swift是苹果公司新推出的一门现代化的编程语言,并且将其开源出来了,Swift具有很多的优点,这也使得这门语言推出的短时间引起了很大反应的原因,在最近的2016年3月的编程语言排行榜处于第14位,甚至超过了OC(15位)。可见Swift的在开发者心中的地位。
在RxSwift中,可以有多种创建Observable对象的方法,主要有以下几种:
要弄明白Observable就要先弄清楚Observable是继承了哪些类和协议,从源码开始分析:
首先第一个是ObservableConvertibleType:
/**
Type that can be converted to observable sequence (`Observer<E>`).
*/
public protocol ObservableConvertibleType {
/**
Type of elements in sequence.
*/
typealias E
/**
Converts `self` to `Observable` sequence.
- returns: Observable sequence that represents `self`.
*/
func asObservable() -> Observable<E>
}
从ObservableConvertibleType协议源码可以看出,它定义了一个typealias类型别名和asObservable方法,类型别名是用来定义将要处理的类型(例如String,Int等等),而asObervable这个我们在后面会具体叙述。其次是ObservableType,它继承了ObservableConvertibleType,ObservableType主要干了两个事情,第一个是创建出subscribe方法,它是用来执行订阅事件的(onNext、onError/onComplete),第二个就是简易实现asObservable方法(通过extension ObservableType 实现),asObservable主要是通过Observable.create(subscrible())实现的。再上来就是Observable,它是一个类,继承了ObservableType协议接口。
下面我们分别对以上几种创建Observable对象做详细的介绍。
asObservable其实是相当于clone方法,其内部实现如下:
public func asObservable() -> Observable<E> {
return self
}
从这里看,它return self也就是自己,这就意味着,你必须先有Observable对象才能调用asObservable方法。例如:
var obs = Observable<String>.create { (observer) -> Disposable in
observer.on(.Next("hahah"))
observer.on(.Next("deasd"))
observer.on(.Completed)
return NopDisposable.instance
}
let observable = obs.asObservable()
observable.subscribeOn(MainScheduler.instance)
.subscribe{ event in
print(event.debugDescription)
}
第二个是subscribe方法,这个方法具体实现调用了一个“抽象”方法,这个“抽象”方法就是打印出来一个错误日志并且停止运行。
public func subscribe<O: ObserverType where O.E == E>(observer: O) -> Disposable {
abstractMethod()
}
当然,这个Observable类中方法,但是extension Observable其实是有很多用法的。这也是我们上面提到创建Observable的各种方法。
2、create方法
public static func create(subscribe: (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
这是一个“静态方法”(在class中用static关键字标注,在struct和enum中使用class关键字标注),这个方法的参数是一个函数(通常我们会用闭包的方式),函数的参数是AnyObserver,返回的是Disposable。AnyObserver其实就是订阅者,Disposable是一个协议接口,里面只有一个dispose方法,用来释放一些资源。整个create方法返回的是一个AnonymousObservable(匿名Observable),AnonymousObservable继承自Producer,Producer实现了线程调度功能,可以安排某个线程来执行run方法。因此create方法返回的AnonymousObservable是可以运行在指定线程中Observable。完整的create例子:
var obs = Observable<String>
.create ({ (observer) -> Disposable in
observer.on(.Next("hahah"))
observer.on(.Next("deasd"))
observer.on(.Completed)
return NopDisposable.instance
})
.observeOn(MainScheduler.instance)
.subscribe({event in
if let str = event.element {
print(str)
}
})
//.dispose()
最后obs变量是一个Disposable类型变量,可以继续调用dispose方法释放资源。整个代码输出结果:
hahah
deasd
3、empty方法
public static func empty() -> Observable<E> {
return Empty<E>()
}
empty方法是一个空方法,里面没有onNext事件处理,只会处理onComplete方法。empty创建Observable对象比较简单。代码例子:
let obs1 = Observable<String>.empty()
obs1.subscribe(
onNext: {str in print(str)},
onError: { (errorType) -> Void in
print(errorType)
},
onCompleted: { () -> Void in
print("complete")
})
{ () -> Void in
print("dispose")
}
输出结果:
complete
dispose
这个例子中有四个闭包,其中最后一个是尾随闭包,而且这些闭包都是可选类型。当然你也可以如下写法:
let obs1 = Observable<String>.empty()
obs1.subscribe(
onNext: {str in
print(str)
},
onError: { (errorType) -> Void in
print(errorType)
},
onCompleted: { () -> Void in
print("complete")
},
onDisposed: {() -> Void in
print("dispose")
})
4、never方法
public static func never() -> Observable<E> {
return Never()
}
官方解释是返回一个无终止的观察者事件序列,可以用来表示无限持续时间。尽管我们给安排了next事件,但实际上,他是不会执行的。不会输出onNext
Observable<String>
.never()
.subscribeNext( { (str) -> Void in
print("onNext")
})
//.dispose()
5、just方法
public static func just(element: E, scheduler: ImmediateSchedulerType) -> Observable<E> {
return JustScheduled(element: element, scheduler: scheduler)
}
just方法只能处理单个事件,简单来说,我们使用just方法不能将一组数据一起处理,只能一个一个处理。例如:
Observable<String>
.just("just test")
.subscribeOn(MainScheduler.instance)
.subscribeNext({ (str) -> Void in
print(str)
})
.dispose()
输出结果:
just test
just方法是一个多态方法,允许在传入参数时候指定线程,例如:
它指定当前线程完成subscribe相关事件。
Observable<String>
.just("just with Scheduler", scheduler: CurrentThreadScheduler.instance)
.subscribeNext({ (str) -> Void in
print(str)
})
.dispose()
6、error方法
public static func error(error: ErrorType) -> Observable<E> {
return Error(error: error)
}
error方法是返回一个只能调用onError方法的Observable序列。其中的onNext和OnComleted方法是不会执行的。例如:
public static func error(error: ErrorType) -> Observable<E> {
return Error(error: error)
}
Observable<String>
.error(RxError.Timeout)
.subscribe(
onNext: { (str) -> Void in
print(str)
print("onNext")
},
onError: { (error)-> Void in
print(error)
},
onCompleted: { () -> Void in
print("onCompleted")
},
onDisposed: { () -> Void in
print("dispose")
})
.dispose()
最后的输出结果是:
Sequence timeout
dispose
7、of方法
可以说of方法是just方法的升级版,它可以将一序列的事情组合起来一起处理。极大方便了开发者对数组(Array)、字典(Dictionary)进行分布处理。
public static func of(elements: E ..., scheduler: ImmediateSchedulerType? = nil) -> Observable<E> {
return Sequence(elements: elements, scheduler: scheduler)
}
Observable<String>
.of("d1","d2", "d3", "d4")
.subscribe( { (event) -> Void in
if let els = event.element {
print(els)
}
})
.dispose()
这里解释一下subscribe(on: Event->Void)方法,例子中event.element在调用get属性的时候其实会执行一个onNext方法,它返回的是一个可选类型,因此要用if let解析处理。当然如果代码改成如下,那么是不会输出结果的,因为event.error执行的是错误监听(也就是执行的onError方法,因此不会输出结果)。of和just一样,存在一个多态方法,可以带入线程控制。
Observable<String>
.of("d1","d2", "d3", "d4")
.subscribe( { (event) -> Void in
if let els = event.error{
print(els)
}
})
.dispose()
8、deferred方法
deferred方法是延时创建Observable对象,当subscribe的时候才去创建,它为每一个bserver创建一个新的Observable; deferred采用一个Factory函数型作为参数,Factory函数返回的是Observable类型。这也是其延时创建Observable的主要实现。
public static func deferred(observableFactory: () throws -> Observable<E>)
-> Observable<E> {
return Deferred(observableFactory: observableFactory)
}
整个deferred方法的原理如上图,从图中可以看出,deferred不是第一步创建Observable,而是在subscriber的时候创建的。(图中红色的是error,绿色的是next事件)
9、generate方法
public static func generate(initialState initialState: E, condition: E throws -> Bool, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance, iterate: E throws -> E) -> Observable<E> {
return Generate(initialState: initialState, condition: condition, iterate: iterate, resultSelector: { $0 }, scheduler: scheduler)
}
generate方法是一个迭代器,它一直循环onNext事件,直到condition不满足要求退出。generate有四个参数,第一个是最开始的循环变量,第二个是条件,第三个是迭代器,这个迭代器每次运行都会返回一个E类型,作为下一次是否执行onNext事件源,而是否正的要执行则看是否满足condition条件。其实我们可以理解generate就是一个循环体(其内部实现也正是一个循环,代码在:GenerateSink的run方法中)。例子:
Observable<String>
.generate(
initialState: "ah",
condition: ({ (str) -> Bool in
return str.hasPrefix("ah")
}),
iterate: ({ (str1) -> String in
return "h" + str1
}))
//.subscribeOn(MainScheduler.instance)
.subscribe ({ (event) -> Void in
if let res = event.element {
print(res)
}
})
.dispose()
输出结果:
ah
上面这个例子说的是,初始的变量是“ah”,第一个条件满足,执行onNext事件,同时生成一个hah,不满足条件,不执行onNext事件。generate是一个具有高度可变的of方法,它同时兼备了后面要介绍的过滤(filter)特性。当然generate还有一个多态方法,允许传入执行线程。这个线程是为循环体而生的,并不是为subscrible而生的。
10、repeatElement方法
public static func repeatElement(element: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
return RepeatElement(element: element, scheduler: scheduler)
}
repeatElement方法是一个无限循环的,它会一直循环onNext方法。当然这种循环是可以指定线程的。例子:
Observable<String>
.repeatElement("daa")
.subscribeNext { (str) -> Void in
print(str)
}
.dispose()
其中subscribeNext是一个尾随闭包。
11、using方法
public static func using<R: Disposable>(resourceFactory: () throws -> R, observableFactory: R throws -> Observable<E>) -> Observable<E> {
return Using(resourceFactory: resourceFactory, observableFactory: observableFactory)
}
using方法是通过Factory方法生成一个对象(resourceFactory)再转换成Observable,中间我们要使用Factory方法,上面已经介绍过一次Factory方法。using方法相对其他的方法比较复杂和特殊,原因是using方法是由我们构建出资源和构建清除资源的,中间通过一个转换生成Observable对象。
Observable<String>
.using( { () -> Student<String> in
return Student(source: Observable<String>.just("jarlene"), disposeAction: { () -> () in
print("hah")
})
},
observableFactory: { (r) -> Observable<String> in
return r.asObservable()
})
.subscribeNext( { (ss) -> Void in
print(ss)
})
.dispose()
其中Student类继承了两个协议:ObservableConvertibleType和Disposable;ObservableConvertibleType是为了生成Observable对象(通过调用asObservable方法),Disposable是为了清除资源。源码如下:
class Student<E>: ObservableConvertibleType, Disposable{
private let _source: Observable<E>
private let _dispose: AnonymousDisposable
init(source: Observable<E>, disposeAction: () -> ()) {
_source = source
_dispose = AnonymousDisposable(disposeAction)
}
func dispose() {
_dispose.dispose()
}
func asObservable() -> Observable<E> {
return _source
}
var name :String{
get {
return self.name
}
set {
self.name = newValue
}
}
}
在上面例子中,我们采用Observable.just方法生成了一个Observable对象传递给Student对象,同时也定义了一个释放资源的方法。等到调用dispose()方法,就会执行我们定义的释放资源的方法。例子结果为:
jarlene
hah
12、range方法
public static func range(start start: E, count: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
return RangeProducer<E>(start: start, count: count, scheduler: scheduler)
}
range方法其实方便版of方法,其功能和of差不多,我们只要输出start和count然后就能生成一组数据,让他们执行onNext。值得注意的是,range方法只生成Observable型。在调用bindNext的时候可以将其对应成其他相应的类型。
例如:
let arr: [String] = ["ad", "cd", "ef", "gh"]
Observable<Int>
.range(start: 1, count: 3)
.subscribeNext { (n) -> Void in
print(arr[n])
}
.dispose()
结果
cd
ef
gh
13、toObservable(from)
public func toObservable(scheduler: ImmediateSchedulerType? = nil) -> Observable<Generator.Element> {
return Sequence(elements: self, scheduler: scheduler)
}
toObservable方法是扩展自Array,是将一个一个array转换成Observable,其内部实调用了一个序列Sequence,其用法很简单。
let arr: [String] = ["ab", "cd", "ef", "gh"]
arr.toObservable()
.subscribeNext { (s) -> Void in
print(s)
}
.dispose()
运行结果:
ab
cd
ef
gh
14、interval/timer
public static func interval(period: RxTimeInterval, scheduler: SchedulerType)
-> Observable<E> {
return Timer(dueTime: period,
period: period,
scheduler: scheduler
)
}
public static func timer(dueTime: RxTimeInterval, period: RxTimeInterval? = nil, scheduler: SchedulerType)
-> Observable<E> {
return Timer(
dueTime: dueTime,
period: period,
scheduler: scheduler
)
}
interval方法是定时产生一个序列,interval第一个参数就是时间间隔,第二个参数是指定线程。 可以看出interval是range和repeateElement的结合。timer方法和interval方法类似。差别在于timer可以设置间隔时间和持续时间,而interval的间隔时间和持续时间是一样的。
至此,我们将Observable对象基本的产生方法都讲述完了,下一节开始我们详细讲述Observer的创建以及制作器Producer,其次将详细叙述Producer和事件方法onNext、onError、onComplete之间的联系,以及Producer是怎么调度线程来完成线程控制的。
对观察着对象进行变换使得一个对象变换成另一个对象,这个是RxSwift核心之一,因此对于熟悉RxSwift特别重要。RxSwift存在以下变换方法:
- buffer
- flatMap
- flatMapFirst
- flatMapLatest
- map
- scan
- window
过滤方法
- debounce / throttle
- distinctUntilChanged
- elementAt
- filter
- sample
- skip
- take
- takeLast
- single
下面我们分别对以上几种对Observable对象变换做详细的介绍(不全部阐述)。
1、 buffer方法:
buffer方法是extension ObservableType中的一个方法,它的作用是缓冲组合,第一个参数是缓冲时间,第二个参数是缓冲个数,第三个参数是线程。总体来说就是经过一定时间,将一定个数的事件组合成一个数组,一起处理,在组合的过程中,你可以选择线程。
public func buffer(timeSpan timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType)
-> Observable<[E]> {
return BufferTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
}
例子:
Observable<String>
.of("ab", "cd", "ef", "gh")
.buffer(timeSpan: 1, count: 2, scheduler: MainScheduler.instance)
.subscribeNext({ (n) -> Void in
if !n.isEmpty {
print(n)
}
})
.dispose()
输出结果
["ab", "cd"]
["ef", "gh"]
2、flatMap
flatMap也是扩展自ObservableType,它的作用是将一种类型变换成另一种类型。flatMap的参数是一个方法,这个方法的输入参数与Observable的E是同一种类型,输出依然是Observable类型。
public func flatMap<O: ObservableConvertibleType>(selector: (E) throws -> O)
-> Observable<O.E> {
return FlatMap(source: asObservable(), selector: selector)
}
我们看一个例子,例子中首先是一组Observable,通过flatMap后还是一组Observable,但是flatMap作用是,如果元素中遇到“a”字母开头的,那么它就重新组装一个数组,这个数组是只有元素和“a”;如果元素不是“a”字母开头的就与“b”字母组装成另一个数组。这两种情况都通过调用toObservable返回Observable。flatMapFirst、flatMapLast、flatMapWithIndex都是类似的作用,这里就不重复。
Observable<String>
.of("ab", "cd", "aef", "gh")
.flatMap({ (element: String) -> Observable<String> in
if element.hasPrefix("a") {
let sd : [String] = [element, "a"]
return sd.toObservable()
} else {
let sd : [String] = [element, "b"]
return sd.toObservable()
}
})
.subscribeNext({ (n) -> Void in
if !n.isEmpty {
print(n)
}
})
.dispose()
结果
ab
a
cd
b
aef
a
gh
b
3、map
map方法是通过其实flatMap的简化版本,它返回的可以是任何类型。其中R是返回类型。
public func map<R>(selector: Self.E throws -> R) -> RxSwift.Observable<R>
例子:
Observable<String>
.of("ab", "cd", "aef", "gh")
.map({ (str) -> String in
return str+"ss"
})
.subscribeNext({ (n) -> Void in
if !n.isEmpty {
print(n)
}
})
.dispose()
结果
abss
cdss
aefss
ghss
4、scan方法
scan方法有两个参数,第一个参数是种子,第二个参数是加速器。所谓的种子就是最初的状态,加速器就是将每一次运行的结果延续到下一次。scan方法也是扩展自ObservableType
public func scan<A>(seed: A, accumulator: (A, E) throws -> A)
-> Observable<A> {
return Scan(source: self.asObservable(), seed: seed, accumulator: accumulator)
}
例子:
Observable<String>
.of("a", "b", "c", "d", "e")
.scan("s", accumulator: { (a, b) -> String in
return a+b
})
.subscribeNext({ (n) -> Void in
print(n)
})
.dispose()
这个例子中是将所有的字符依次串起来,运行结果是:
sa
sab
sabc
sabcd
sabcde
5、window
window方法同样扩展自ObservableType,它有三个参数,第一个是时间间隔,第二个是数量,第三个是线程。时间间隔指的的是window方法开窗的时间间隔;第二个参数数量指的的是每次通过窗口的个数;线程就是这种操作执行在什么线程上。起源码如下:
public func window(timeSpan timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType)
-> Observable<Observable<E>> {
return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
}
需要特别注意的是window方法之后,返回的是Observable
Observable<String>
.of("ab", "bc", "cd", "de", "ef")
.window(timeSpan: 1, count: 2, scheduler: MainScheduler.instance)
.subscribeNext({ (n) -> Void in
n.subscribeNext({ (ss) -> Void in
print(ss)
})
})
.dispose()
结果:
ab
bc
cd
de
ef
变换的方法基本就这些,但是开发者可以通过自定义的方式扩展变换的方法以达到所需的目的。接下来我们看看过滤方法。
1、debounce / throttle
debounce/throttle 方法在规定的时间中过滤序列元素,就如上图描述的一样,当debounce打开的时候,刚好那个黄色的序列元素过来,那么它就不会通知到事件(onNext、onError、onComplete)上去。下面是debounce方法源码。
public func throttle(dueTime: RxTimeInterval, scheduler: SchedulerType)
-> Observable<E> {
return Throttle(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
}
例子:
Observable<String>
.of("a", "b", "c", "d", "e", "f")
.debounce(1, scheduler: MainScheduler.instance)
.subscribeNext { (str) -> Void in
print(str)
}
.dispose()
输出结果
f
2、distinctUntilChanged
distinctUntilChanged 主要是过滤相邻两个元素是否重复,重复的话就过滤掉其中之一。
public func distinctUntilChanged<K>(keySelector: (E) throws -> K, comparer: (lhs: K, rhs: K) throws -> Bool)
-> Observable<E> {
return DistinctUntilChanged(source: self.asObservable(), selector: keySelector, comparer: comparer)
}
例子:
Observable<String>
.of("a", "a", "c", "e", "e", "f")
.distinctUntilChanged({ (lhs, rhs) -> Bool in
return lhs==rhs
})
.subscribeNext { (str) -> Void in
print(str)
}
.dispose()
输出结果:
a
c
e
f
3、elementAt
elementAt方法其实就挑选出所需要的序列元素,上图描述的很清楚。
这个方法很简单。没有什么难点。当index超界的时候,throwOnEmpty参数是否抛出异常。
public func elementAt(index: Int)
-> Observable<E> {
return ElementAt(source: asObservable(), index: index, throwOnEmpty: true)
}
例子:
Observable<String>
.of("aa", "av", "cs", "ed", "ee", "ff")
.elementAt(2)
.subscribeNext { (str) -> Void in
print(str)
}
.dispose()
输出结果
cs
4、filter
filter方法很简单,指出过滤条件就行,满足过滤条件的就能执行事件通知,否则不行
public func filter(predicate: (E) throws -> Bool)
-> Observable<E> {
return Filter(source: asObservable(), predicate: predicate)
}
例子:
Observable<String>
.of("aa", "av", "cs", "ed", "ee", "ff")
.filter({ (ss) -> Bool in
return ss.hasPrefix("a")
})
.subscribeNext { (str) -> Void in
print(str)
}
.dispose()
输出结果
aa
av
接下来的几个方法都是类似的,这里不就在详细叙述啦。
由第一部分可以知道,几乎在创建所有的Observable的时候都要用到Producer,而在事件处理(onNext、onError、onComplete)过程中经常要用到线程调度(Scheduler),它们之间存在一种很巧妙的设计。首先先看看Producer源码。
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
if !CurrentThreadScheduler.isScheduleRequired {
return run(observer)
} else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
return self.run(observer)
}
}
}
func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
abstractMethod()
}
}
Producer是继承了Observable类,我们在创建Observable类时候都用到了Producer,那么Producer主要做了两件事情,第一个实现subscribe方法,在subscribe方法中传入了observer参数,observer类型是ObserverType,在上一部分介绍了ObserverType中有一个类型别名E,那么在Producer的范型element就必须和ObserverType中类型别名E一样。回过头来说subscribe,我们首先看CurrentThreadScheduler 的源码,CurrentThreadScheduler 是继承ImmediateSchedulerType协议,它里面就定义了一个方法:
func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable
而这个方法在CurrentThreadScheduler 具体实现了。
public class CurrentThreadScheduler : ImmediateSchedulerType {
typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>
/**
The singleton instance of the current thread scheduler.
*/
public static let instance = CurrentThreadScheduler()
static var queue : ScheduleQueue? {
get {
return NSThread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKeyInstance)
}
set {
NSThread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKeyInstance)
}
}
/**
Gets a value that indicates whether the caller must call a `schedule` method.
*/
public static private(set) var isScheduleRequired: Bool {
get {
let value: CurrentThreadSchedulerValue? = NSThread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerKeyInstance)
return value == nil
}
set(isScheduleRequired) {
NSThread.setThreadLocalStorageValue(isScheduleRequired ? nil : CurrentThreadSchedulerValueInstance, forKey: CurrentThreadSchedulerKeyInstance)
}
}
/**
Schedules an action to be executed as soon as possible on current thread.
If this method is called on some thread that doesn‘t have `CurrentThreadScheduler` installed, scheduler will be
automatically installed and uninstalled after all work is performed.
- parameter state: State passed to the action to be executed.
- parameter action: Action to be executed.
- returns: The disposable object used to cancel the scheduled action (best effort).
*/
public func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable {
if CurrentThreadScheduler.isScheduleRequired {
CurrentThreadScheduler.isScheduleRequired = false
let disposable = action(state)
defer {
CurrentThreadScheduler.isScheduleRequired = true
CurrentThreadScheduler.queue = nil
}
guard let queue = CurrentThreadScheduler.queue else {
return disposable
}
while let latest = queue.value.dequeue() {
if latest.disposed {
continue
}
latest.invoke()
}
return disposable
}
let existingQueue = CurrentThreadScheduler.queue
let queue: RxMutableBox<Queue<ScheduledItemType>>
if let existingQueue = existingQueue {
queue = existingQueue
}
else {
queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
CurrentThreadScheduler.queue = queue
}
let scheduledItem = ScheduledItem(action: action, state: state)
queue.value.enqueue(scheduledItem)
return scheduledItem
}
}
其实主要是根据CurrentThreadScheduler.isScheduleRequired参数来选择是否需要当前线程运行,如果需要,首调用action方法,而这个action方法其实就是onNext、onError、onCompelete方法。然后做了一个延迟清除(defer)和一个判断(guard)。然后循环一个queue其实主要是看看是否还有没有执行完的onNext时间。latest.invoke()其实就是做action(state),然后返回Disposable。如果不需要,则组合queue,生成Disposable返回。接下来我们看看怎么设置线程执行的,首选看看subscribleOn方法,这个方法就是指定接下来事情要发生在那个线程中,具体看一下代码:
public func observeOn(scheduler: ImmediateSchedulerType)
-> Observable<E> {
if let scheduler = scheduler as? SerialDispatchQueueScheduler {
return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
}
else {
return ObserveOn(source: self.asObservable(), scheduler: scheduler)
}
}
方法是定义在extension ObservableType 中的,它指定ObservableType 运行线程,这里面指定了两种运行方式,第一种是运行ObserveOnSerialDispatchQueue,第二种是ObserveOn这两个都继承自Producer,上面我们已经叙述了Producer,不管是ObserveOnSerialDispatchQueue还是ObserveOn都重写了run方法,他们返回的都是ObserverBase。ObserverBase其实就是在执行onNext、onError、onComplete方法。
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
private var _isStopped: AtomicInt = 0
func on(event: Event<E>) {
switch event {
case .Next:
if _isStopped == 0 {
onCore(event)
}
case .Error, .Completed:
if !AtomicCompareAndSwap(0, 1, &_isStopped) {
return
}
onCore(event)
}
}
func onCore(event: Event<E>) {
abstractMethod()
}
func dispose() {
_isStopped = 1
}
}
onCore方法是由继承者实现,比如在ObserveOnSink类中及具体实现了onCore方法
override func onCore(event: Event<E>) {
let shouldStart = _lock.calculateLocked { () -> Bool in
self._queue.enqueue(event)
switch self._state {
case .Stopped:
self._state = .Running
return true
case .Running:
return false
}
}
if shouldStart {
_scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run)
}
}
这个onCore方法是判断当前运行到那一步(onNext,onError,onComplete)。现在我们回过头来看Producer中的subscribe其实就是执行事件,只不过这个事件是在某个线程上执行的。我们可以绘制一个简单的流程图描述这些。
Observable执行subscribleOn方法,产生一个新的Observable,这个新Observable是Produce,他继承了Observable,当Observable执行subscrible方法的时候,会根据线程来执行,如果指定了线程,那么就会通过run方法去执行事件。如果没有指定线程,就用当前线程执行run方法去执行事件。当然如果要用到变换或者过滤,也可以通过指定线程来执行变换和过滤,其原理是一样的。
对观察着合并就是将多个观察着(Observables)合并起来处理,使用起来更方便。它主要由以下方法:
- merge
- startWith
- switchLatest
- combineLatest
- zip
链接器
- multicast
- publish
- refCount
- replay
- shareReplay
当然为了将多个相同类型观察者对象合并起来处理,可以极大减少重复代码的工作量。从本节开始我们将会叙述观察者对象的合并和发布。
1、merge
从图中很容易看出merge方法就是将多个Observable对象合并处理。
public func merge() -> Observable<E.E> {
return Merge(source: asObservable())
}
例子:
Observable.of(
Observable.of("a", "b"),
Observable.of("c", "d"),
Observable.of("e", "f"),
Observable.of("g", "h"))
.merge()
.subscribeNext { (str) -> Void in
print(str)
}
.dispose()
结果:
a
b
c
d
e
f
g
h
merge方法看起来特别简单。
2、startWith
startWith方法可以说是定制开始位置的,是一种比较特殊的merge方法。
public func startWith(elements: E ...)
-> Observable<E> {
return StartWith(source: self.asObservable(), elements: elements)
}
startWith方法其实就是指定一个特殊的开头,
例子:
Observable.of(
Observable.of("a", "b"),
Observable.of("c", "d"),
Observable.of("e", "f"),
Observable.of("g", "h"))
.merge()
.startWith("x")
.subscribeNext { (str) -> Void in
print(str)
}
.dispose()
结果:
x
a
b
c
d
e
f
g
h
3、switchLatest
RxSwift中switchLatest相当与其他语言的switch方法,从图中可以很明显的看出来,第一个序列的最后一个元素被去掉了,没有执行OnNext方法。
public func switchLatest() -> Observable<E.E> {
return Switch(source: asObservable())
}
例子:
let var1 = Variable(0)
let var2 = Variable(200)
// var3 is like an Observable<Observable<Int>>
let var3 = Variable(var1.asObservable())
let d = var3
.asObservable()
.switchLatest()
.subscribeNext { (str) -> Void in
print(str)
}
var1.value = 1
var1.value = 2
var1.value = 3
var1.value = 4
var3.value = var2.asObservable()
var2.value = 201
var1.value = 5
var1.value = 6
var1.value = 7
结果:
0
1
2
3
4
200
201
Variable是一个类,它里面包含了一个Observable对象(BehaviorSubject),另外Variable中还是实现了asObservable方法,而这个方法返回的就是里面的Observable对象。Variable源码很简单,这里不做特别的介绍。至于var1.value=5\6\7没有执行,这个正是switchLatest的作用,当var1的作用完成后,切换到var2的Observable。那么var1后续变化,是不会通知到var3的。
4、combineLatest
combineLatest和其他方法一样都是扩展自ObservableType协议,
public func combineLatest<R>(resultSelector: [Generator.Element.E] throws -> R) -> Observable<R> {
return CombineLatestCollectionType(sources: self, resultSelector: resultSelector)
}
例子:
let var1 = Variable(0)
let var2 = Variable(200)
// var3 is like an Observable<Observable<Int>>
let var3 = Variable(var1.asObservable())
let d = var3
.asObservable()
.switchLatest()
.subscribeNext { (str) -> Void in
print(str)
}
var1.value = 1
var1.value = 2
var1.value = 3
var1.value = 4
var3.value = var2.asObservable()
var2.value = 201
var1.value = 5
var1.value = 6
var1.value = 7
Observable.combineLatest(var1.asObservable(), var2.asObservable()) { (as1, as2) -> Int in
return as1 + as2
}
.subscribeNext { (mon) -> Void in
print(mon)
}
.dispose()
输出结果:
208
简单的分析一下,就可以看出来,var2.value = 201; var1.value = 7,最后就是208;combineLatest是将多个Observable方法按照一定意愿组合起来。它提供了开发者组合的方法,自己实现就行了。
5、zip
zip和combineLatest差不多,可以将多个Observable合并起来处理,上面的例子同样可以用zip来实现,具体看例子,下面是zip方法的源码
public static func zip<O1: ObservableType, O2: ObservableType>
(source1: O1, _ source2: O2, resultSelector: (O1.E, O2.E) throws -> E)
-> Observable<E> {
return Zip2(
source1: source1.asObservable(), source2: source2.asObservable(),
resultSelector: resultSelector
)
}
例子:
Observable.zip(var1.asObservable(), var2.asObservable()) { (s1, s2) -> Int in
return s1+s2
}
.subscribeNext { (mon) -> Void in
print(mon)
}
.dispose()
当所有的工作都做完之后,我们要的观察着对象进行发布,那么这个时候就要用到Connect,它是链接观察着对象和被观察着之间的一个链接器。它主要由以下方法:
1、multicast/publish
multicast和publish方法一样,它们都是通过发布/多播将Observable发出去,当然发出去必须要有一个连接(connect)的过程。只有链接的对象才会收到publish/multicast的通知。下面是源码:
public func multicast<S: SubjectType where S.SubjectObserverType.E == E>(subject: S)
-> ConnectableObservable<S.E> {
return ConnectableObservableAdapter(source: self.asObservable(), subject: subject)
}
public func publish() -> ConnectableObservable<E> {
return self.multicast(PublishSubject())
}
例子:
let subject1 = PublishSubject<Int>()
_ = subject1
.subscribe {
print("Subject \($0)")
}
let int1 = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.multicast(subject1)
_ = int1
.subscribe {
print("first subscription \($0)")
}
int1.connect()
这个例子通过使用一个interval方法一直放送,然后通过multicast将subject1通知(多播)出去,int1.subscribe来接受subject1的变化。
输出的结果:
Subject Next(0)
first subscription Next(0)
Subject Next(1)
first subscription Next(1)
Subject Next(2)
first subscription Next(2)
Subject Next(3)
first subscription Next(3)
Subject Next(4)
first subscription Next(4)
当然publish也有类似的功能。
let subject1 = PublishSubject<Int>()
_ = subject1
.subscribe {
print("Subject \($0)")
}
let int1 = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.publish()
_ = int1
.subscribe {
print("first subscription \($0)")
}
int1.connect()
输出结果:
first subscription Next(0)
first subscription Next(1)
first subscription Next(2)
first subscription Next(3)
first subscription Next(4)
first subscription Next(5)
first subscription Next(6)
first subscription Next(7)
first subscription Next(8)
first subscription Next(9)
first subscription Next(10)
publish方法只是将发布出去的结果的变化告知,原变化没有告知出来,当然这个事publish和multicast一点小区别。
2、refCount
refCount是结合了publish方法使用的,当Observable发布出去,通过一个引用计数(refCount)方法来记录,其实refCount就是相当于connect方法。refCount源码如下,
public func refCount() -> Observable<E> {
return RefCount(source: self)
}
例子:
let int1 = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.publish()
.refCount()
.subscribeNext({ (ss) -> Void in
print(ss)
})
输出结果:
0
1
2
3
4
...
3、replay/shareReplay
其实replay和multicast是一样,在其源码中其实也是调用multicast方法。起源码如下:
public func replay(bufferSize: Int)
-> ConnectableObservable<E> {
return self.multicast(ReplaySubject.create(bufferSize: bufferSize))
}
public func shareReplay(bufferSize: Int)
-> Observable<E> {
if bufferSize == 1 {
return ShareReplay1(source: self.asObservable())
}
else {
return self.replay(bufferSize).refCount()
}
}
只不过,之前multicast传入的PublishSubject,这里是ReplaySubject,两者其实区别不大。
ReplaySubject中调用了ReplayMany,ReplayMany有一个事件队列来轮循事件。ReplaySubject和ReplayMany比较简单,这里不再叙述。
例子:
let int2 = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.replay(1)
_ = int2.subscribeNext({ (ss) -> Void in
print(ss)
})
int2.connect()
_ = int2.subscribeNext({ (ss) -> Void in
print("a..\(ss)")
})
输出结果:
0
a..0
1
a..1
2
a..2
3
a..3
4
a..4
5
a..5
其中shareReplay和replay是一回事,从源码中酒可以看出来,shareReplay是replay和refCount的组合,这里不再叙述。
至此为止,关于Swift的一些基本用法和基本的概念都讲述完毕,当然还有一些相关的扩展,但是这个都和上面讲述原理是一样的,大家可以参考源码理解。
从本节开始讲叙述与RxSwift配套使用的RxCocoa,RxCocoa主要是针对上层Ui做扩展,这些扩展主要是将上面所叙述的东西结合Ui控件使用。将RxSwift和RxCocoa结合起来使用对ios开发可以节省很大开发的时间,RxCocoa扩展性和RxSwift一样灵活,可以针对不同的业务进行不同的扩展,很方便开发者使用。由于本人对于ios sdk不是很了解。所有这里不具体讨论
整篇博客写下来字数达到2w多,可以当作一个小论文了,作为一个swift新手,我花了10天左右的时间熟悉swift基本语法,为了能够更加熟悉swift,于是选择研究RxSwift,已达到更加熟练掌握swift的目的,整篇博客下下来花费时间就长达一个多月,很多swift的基本东西看完之后就忘记,又不得不重新看起语法来。不过总算完成了,当然研究完RxSwift,我更加可以熟练的swift进行ios开发。之后我会继续研究ReactiveX其他语言。期待越来越好吧。
标签:
原文地址:http://blog.csdn.net/xwl198937/article/details/50823558