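ReactiveCocoa is an Objective-C framework that implements the ideas of functional reactive programming (FRP), and it is widely used in Meituan's projects. There is plenty of material online about basic ReactiveCocoa usage, so this article does not cover it. RACSignal is one of the most important concepts in ReactiveCocoa, and this article focuses on how RACSignal is implemented. Before reading on, you should be comfortable with the basic usage of RACSignal.
This article has two parts. The first half analyzes RACSignal's subscription process. The second half goes deeper and builds on the subscription process to analyze two ReactiveCocoa operations that are relatively hard to understand: multicast && replay.
PS: To keep the explanation clear, we discuss only next below, not error or completed, which behave similarly to next. This article is based on ReactiveCocoa 2.x.
We start by dissecting RACSignal's subscription process.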
#Common usage of RACSignal
- (RACSignal *)signInSignal {
    // part 1: obtain the signal with [RACSignal createSignal]
    return [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [self.signInService
            signInWithUsername:self.usernameTextField.text
            password:self.passwordTextField.text
            complete:^(BOOL success) {
                // part 3: inside didSubscribe, run the next block via [subscriber sendNext:]
                [subscriber sendNext:@(success)];
                [subscriber sendCompleted];
            }];
        return nil;
    }];
}
// part 2: obtain a subscriber with [signal subscribeNext:], then perform the subscription
[[self signInSignal] subscribeNext:^(id x) {
    NSLog(@"Sign in result: %@", x);
}];
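#Overview of the subscription process
RACSignal's subscription process can be summarized in three steps, matching parts 1 to 3 in the code above:
1. [RACSignal createSignal] obtains the signal.
2. [signal subscribeNext:] obtains a subscriber and then performs the subscription.
3. Execution enters didSubscribe, where [subscriber sendNext:] runs the next block.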
In RACSignal.m:
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
    return [RACDynamicSignal createSignal:didSubscribe];
}
In RACDynamicSignal.m:
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
    RACDynamicSignal *signal = [[self alloc] init];
    signal->_didSubscribe = [didSubscribe copy];
    return [signal setNameWithFormat:@"+createSignal:"];
}
[RACSignal createSignal] calls createSignal on its subclass RACDynamicSignal to return a signal, and stores the didSubscribe block in that signal for later use.
In RACSignal.m:
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
    RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
    return [self subscribe:o];
}
In RACSubscriber.m:
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
    RACSubscriber *subscriber = [[self alloc] init];
    subscriber->_next = [next copy];
    subscriber->_error = [error copy];
    subscriber->_completed = [completed copy];
    return subscriber;
}
In RACDynamicSignal.m:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
    subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
    if (self.didSubscribe != NULL) {
        RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
            RACDisposable *innerDisposable = self.didSubscribe(subscriber);
            [disposable addDisposable:innerDisposable];
        }];
        [disposable addDisposable:schedulingDisposable];
    }
    return disposable;
}
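To make the effect of subscribe: concrete, here is a minimal sketch. It assumes ReactiveCocoa 2.x and that the code runs on the main thread, where RACScheduler.subscriptionScheduler runs the scheduled block immediately. RunSubscriptionCountDemo and its counter are hypothetical names used only for illustration. The sketch suggests that creating the signal runs nothing, while each subscription invokes the stored didSubscribe block again.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveCocoa/ReactiveCocoa.h>

// Hypothetical helper (not part of ReactiveCocoa): returns the number of
// didSubscribe invocations seen right after creation and after two subscriptions.
static NSArray *RunSubscriptionCountDemo(void) {
    __block NSUInteger runs = 0;

    RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        runs++; // counts how often didSubscribe is entered
        [subscriber sendNext:@"value"];
        [subscriber sendCompleted];
        return nil;
    }];
    NSUInteger afterCreate = runs; // creation only stores the block

    [signal subscribeNext:^(id x) {}];
    [signal subscribeNext:^(id x) {}];

    return @[@(afterCreate), @(runs)];
}
```

Called on the main thread, the helper should report zero runs after creation and two runs after the two subscriptions. This per-subscription behavior is what multicast is designed to avoid.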
In RACSubscriber.m:
- (void)sendNext:(id)value {
    @synchronized (self) {
        void (^nextBlock)(id) = [self.next copy];
        if (nextBlock == nil) return;
        nextBlock(value);
    }
}
Whenever [subscriber sendNext:] is called, it directly invokes nextBlock.
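From the three steps above, we can see that:
1. The createSignal and subscribeNext: calls only declare how values in the stream are handled when they arrive.
2. Once didSubscribe runs and the subscriber calls sendNext:, each value is handled automatically by the next block.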
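The three steps can be condensed into a toy model. This is only a sketch that uses Foundation alone: ToySignal, ToySubscriber, and RunToySubscription are hypothetical names, not ReactiveCocoa classes, and schedulers, disposables, error, and completed are deliberately left out.

```objc
#import <Foundation/Foundation.h>

// Toy model only: these classes are hypothetical and merely mimic the
// shape of RACSubscriber and RACDynamicSignal.
@interface ToySubscriber : NSObject
@property (nonatomic, copy) void (^next)(id value);
- (void)sendNext:(id)value;
@end

@implementation ToySubscriber
- (void)sendNext:(id)value {
    // Step 3: sendNext: simply invokes the stored next block.
    if (self.next != nil) self.next(value);
}
@end

@interface ToySignal : NSObject
@property (nonatomic, copy) void (^didSubscribe)(ToySubscriber *subscriber);
+ (instancetype)createSignal:(void (^)(ToySubscriber *subscriber))didSubscribe;
- (void)subscribeNext:(void (^)(id value))nextBlock;
@end

@implementation ToySignal
+ (instancetype)createSignal:(void (^)(ToySubscriber *subscriber))didSubscribe {
    // Step 1: only store the block; nothing runs yet.
    ToySignal *signal = [[self alloc] init];
    signal.didSubscribe = didSubscribe;
    return signal;
}

- (void)subscribeNext:(void (^)(id value))nextBlock {
    // Step 2: wrap the next block in a subscriber, then run didSubscribe.
    ToySubscriber *subscriber = [[ToySubscriber alloc] init];
    subscriber.next = nextBlock;
    if (self.didSubscribe != nil) self.didSubscribe(subscriber);
}
@end

// Hypothetical driver: collects the values the next block receives.
static NSArray *RunToySubscription(void) {
    NSMutableArray *received = [NSMutableArray array];
    ToySignal *signal = [ToySignal createSignal:^(ToySubscriber *subscriber) {
        [subscriber sendNext:@"A"];
        [subscriber sendNext:@"B"];
    }];
    [signal subscribeNext:^(id value) {
        [received addObject:value];
    }];
    return [received copy];
}
```

The real implementation additionally routes didSubscribe through RACScheduler.subscriptionScheduler and wraps the subscriber in a RACPassthroughSubscriber, as the listings above show.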
Now that the RAC subscription process is clear, we build on it to discuss two RACSignal operations that are easily confused: multicast and replay.
In RACSignal+Operations.h:
- (RACMulticastConnection *)publish;
- (RACMulticastConnection *)multicast:(RACSubject *)subject;
- (RACSignal *)replay;
- (RACSignal *)replayLast;
- (RACSignal *)replayLazily;
#Use cases for multicast && replay
"Side effects occur for each subscription by default, but there are certain situations where side effects should only occur once – for example, a network request typically should not be repeated when a new subscriber is added."
// An example quoted from the Documentation directory of the ReactiveCocoa source
// This signal starts a new request on each subscription.
RACSignal *networkRequest = [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
    AFHTTPRequestOperation *operation = [client
        HTTPRequestOperationWithRequest:request
        success:^(AFHTTPRequestOperation *operation, id response) {
            [subscriber sendNext:response];
            [subscriber sendCompleted];
        }
        failure:^(AFHTTPRequestOperation *operation, NSError *error) {
            [subscriber sendError:error];
        }];
    [client enqueueHTTPRequestOperation:operation];
    return [RACDisposable disposableWithBlock:^{
        [operation cancel];
    }];
}];
// Starts a single request, no matter how many subscriptions `connection.signal`
// gets. This is equivalent to the -replay operator, or similar to
// +startEagerlyWithScheduler:block:.
RACMulticastConnection *connection = [networkRequest multicast:[RACReplaySubject subject]];
[connection connect];
[connection.signal subscribeNext:^(id response) {
    NSLog(@"subscriber one: %@", response);
}];
[connection.signal subscribeNext:^(id response) {
    NSLog(@"subscriber two: %@", response);
}];
replay is just a special case of multicast, and the whole multicast process can be split into two steps, which are discussed in detail below.
In RACMulticastConnection.m:
- (id)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
    NSCParameterAssert(source != nil);
    NSCParameterAssert(subject != nil);
    self = [super init];
    if (self == nil) return nil;
    _sourceSignal = source;
    _serialDisposable = [[RACSerialDisposable alloc] init];
    _signal = subject;
    return self;
}
In RACMulticastConnection.m:
- (RACDisposable *)connect {
    BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);
    if (shouldConnect) {
        self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
    }
    return self.serialDisposable;
}
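Because connect guards the subscription with a compare-and-swap on _hasConnected, the source signal is subscribed to at most once, however often connect is called. The following sketch illustrates this. It assumes ReactiveCocoa 2.x and main-thread execution, and uses -publish, which multicasts to a plain RACSubject. RunConnectDemo is a hypothetical helper name.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveCocoa/ReactiveCocoa.h>

// Hypothetical helper: returns the source subscription count before connect,
// the count after two connect calls, and the values one subscriber received.
static NSArray *RunConnectDemo(void) {
    __block NSUInteger subscriptions = 0;

    RACSignal *source = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        subscriptions++;
        [subscriber sendNext:@"value"];
        [subscriber sendCompleted];
        return nil;
    }];

    RACMulticastConnection *connection = [source publish];

    // Subscribing to connection.signal only registers with the subject;
    // the source has not been touched yet.
    NSMutableArray *received = [NSMutableArray array];
    [connection.signal subscribeNext:^(id x) {
        [received addObject:x];
    }];
    NSUInteger beforeConnect = subscriptions;

    [connection connect]; // subscribes the subject to the source
    [connection connect]; // no-op: _hasConnected is already set

    return @[@(beforeConnect), @(subscriptions), [received copy]];
}
```

On the main thread, the source should not be subscribed before connect, should be subscribed exactly once afterwards, and the subscriber registered beforehand should receive the single value.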
Before moving on to step two of multicast, we need to introduce RACSubject and RACReplaySubject.
---------------------annoying divider: start------------------
"A subject can be thought of as a signal that you can manually control by sending next, completed, and error."
A typical use of RACSubject looks like this:
RACSubject *letters = [RACSubject subject];
// Outputs: A B
[letters subscribeNext:^(id x) {
    NSLog(@"%@ ", x);
}];
[letters sendNext:@"A"];
[letters sendNext:@"B"];
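One consequence of this usage is worth spelling out: a plain RACSubject does not buffer, so a subscriber only sees values sent after it subscribed. A small sketch, assuming ReactiveCocoa 2.x (RunSubjectDemo is a hypothetical helper name):

```objc
#import <Foundation/Foundation.h>
#import <ReactiveCocoa/ReactiveCocoa.h>

// Hypothetical helper: returns what an early and a late subscriber received.
static NSArray *RunSubjectDemo(void) {
    RACSubject *letters = [RACSubject subject];
    NSMutableArray *early = [NSMutableArray array];
    NSMutableArray *late = [NSMutableArray array];

    [letters subscribeNext:^(id x) {
        [early addObject:x];
    }];
    [letters sendNext:@"A"]; // only the early subscriber exists at this point

    [letters subscribeNext:^(id x) {
        [late addObject:x];
    }];
    [letters sendNext:@"B"]; // delivered to both subscribers

    return @[[early copy], [late copy]];
}
```

The early subscriber ends up with A and B, while the late one only gets B. Closing exactly this gap is the job of RACReplaySubject.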
Next, let's look at how RACSubject works.
In RACSubject.m:
- (id)init {
    self = [super init];
    if (self == nil) return nil;
    _disposable = [RACCompoundDisposable compoundDisposable];
    _subscribers = [[NSMutableArray alloc] initWithCapacity:1];
    return self;
}
In RACSubject.m:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    NSCParameterAssert(subscriber != nil);
    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
    subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
    NSMutableArray *subscribers = self.subscribers;
    @synchronized (subscribers) {
        [subscribers addObject:subscriber];
    }
    return [RACDisposable disposableWithBlock:^{
        @synchronized (subscribers) {
            // Since newer subscribers are generally shorter-lived, search
            // starting from the end of the list.
            NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
                return obj == subscriber;
            }];
            if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
        }
    }];
}
In RACSubject.m:
- (void)sendNext:(id)value {
    [self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
        [subscriber sendNext:value];
    }];
}
"A replay subject saves the values it is sent (up to its defined capacity) and resends those to new subscribers." As this shows, a replay subject buffers whatever is sent to it via next (as well as error and completed).
RACReplaySubject inherits from RACSubject, and its implementations of methods such as subscribe: and sendNext: also call the super implementation.
In RACReplaySubject.m:
- (instancetype)initWithCapacity:(NSUInteger)capacity {
    self = [super init];
    if (self == nil) return nil;
    _capacity = capacity;
    _valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);
    return self;
}
In RACReplaySubject.m:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
    RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
        @synchronized (self) {
            for (id value in self.valuesReceived) {
                if (compoundDisposable.disposed) return;
                [subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
            }
            if (compoundDisposable.disposed) return;
            if (self.hasCompleted) {
                [subscriber sendCompleted];
            } else if (self.hasError) {
                [subscriber sendError:self.error];
            } else {
                RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
                [compoundDisposable addDisposable:subscriptionDisposable];
            }
        }
    }];
    [compoundDisposable addDisposable:schedulingDisposable];
    return compoundDisposable;
}
In RACReplaySubject.m:
- (void)sendNext:(id)value {
    @synchronized (self) {
        [self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
        [super sendNext:value];
        if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
            [self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
        }
    }
}
As sendNext: shows, a RACReplaySubject buffers the value of every sendNext: call and then calls super, which calls sendNext: on each subscriber in subscribers. The number of buffered values is determined by self.capacity.
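The capacity-based trimming can be observed with a small sketch. It assumes ReactiveCocoa 2.x and main-thread execution, so that the replay inside subscribe: runs immediately. RunReplayCapacityDemo is a hypothetical helper name.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveCocoa/ReactiveCocoa.h>

// Hypothetical helper: returns the values a late subscriber receives from
// a replay subject that keeps only the two most recent values.
static NSArray *RunReplayCapacityDemo(void) {
    RACReplaySubject *subject = [RACReplaySubject replaySubjectWithCapacity:2];

    [subject sendNext:@"A"];
    [subject sendNext:@"B"];
    [subject sendNext:@"C"]; // buffer exceeds capacity, so "A" is dropped

    NSMutableArray *received = [NSMutableArray array];
    [subject subscribeNext:^(id x) {
        [received addObject:x]; // replayed values arrive first
    }];

    [subject sendNext:@"D"]; // live value, delivered through super's sendNext:

    return [received copy];
}
```

The late subscriber should see the two buffered values B and C, followed by the live value D.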
---------------------annoying divider: end------------------
Having introduced RACReplaySubject, we continue with part 2 of multicast.
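In the example above, we subscribed to connection.signal twice. Combined with RACReplaySubject's subscribe: shown above, each subscription proceeds as follows: the subject first replays the values it has buffered to the new subscriber; then, if the source has already completed or errored, it forwards that terminal event, and otherwise it adds the subscriber to subscribers so that later values reach it through sendNext:.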
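The same behavior can be sketched with the -replay operator, which the quoted documentation comment describes as equivalent to multicasting to a RACReplaySubject and connecting right away. The sketch assumes ReactiveCocoa 2.x and main-thread execution. RunReplayDemo is a hypothetical helper name, and the synchronous "response" value stands in for a real network request.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveCocoa/ReactiveCocoa.h>

// Hypothetical helper: returns the request count right after -replay, the
// count after two subscriptions, and what each subscriber received.
static NSArray *RunReplayDemo(void) {
    __block NSUInteger requests = 0;

    RACSignal *request = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        requests++; // stands in for the side effect of a network request
        [subscriber sendNext:@"response"];
        [subscriber sendCompleted];
        return nil;
    }];

    // -replay subscribes to the source immediately and buffers its events.
    RACSignal *shared = [request replay];
    NSUInteger afterReplay = requests;

    NSMutableArray *one = [NSMutableArray array];
    NSMutableArray *two = [NSMutableArray array];
    [shared subscribeNext:^(id x) { [one addObject:x]; }];
    [shared subscribeNext:^(id x) { [two addObject:x]; }];

    return @[@(afterReplay), @(requests), [one copy], [two copy]];
}
```

Under these assumptions the side effect runs once, before any subscriber exists, and both subscribers receive the buffered response. If the work should start only on the first subscription, -replayLazily is the variant to look at.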
ReactiveCocoa GitHub home page
ReactiveCocoa Documentation
ReactiveCocoa material on raywenderlich
Reposted from: http://tech.meituan.com/RACSignalSubscription.html
Original URL: http://www.cnblogs.com/Jenaral/p/5658882.html