码迷,mamicode.com
首页 > 其他好文 > 详细

ReactiveCocoa的冷信号与热信号 探讨

时间:2016-07-09 19:17:58      阅读:220      评论:0      收藏:0      [点我收藏+]

标签:

背景

ReactiveCocoa(简称RAC)是最初由GitHub团队开发的一套基于Cocoa的FRP框架。FRP即Functional Reactive Programming(函数式响应式编程),其优点是用随时间改变的函数表示用户输入,这样就不需要可变状态了。我们之前的文章“RACSignal的Subscription深入分析”里曾经详细讲解过RAC核心概念之一RACSignal的实现原理。在美团客户端中,我们大量使用了这个框架。冷信号与热信号的概念很容易混淆并造成一定的问题。鉴于这个问题具有一定普遍性,我将用一系列文章讲解RAC中冷信号与热信号的相关知识点,希望可以加深大家的理解。本文是系列文章的第一篇。

p.s. 以下代码和示例基于ReactiveCocoa v2.5

什么是冷信号与热信号

冷热信号的概念源于.NET框架Reactive Extensions(RX)中的Hot Observable和Cold Observable,两者的区别是:

  1. Hot Observable是主动的,尽管你并没有订阅事件,但是它会时刻推送,就像鼠标移动;而Cold Observable是被动的,只有当你订阅的时候,它才会发布消息。

  2. Hot Observable可以有多个订阅者,是一对多,集合可以与订阅者共享信息;而Cold Observable只能一对一,当有不同的订阅者,消息是重新完整发送。

这里面的Observables可以理解为RACSignal。为了加深理解,我们来看这样的几组代码:

    RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@1];
        [subscriber sendNext:@2];
        [subscriber sendNext:@3];
        [subscriber sendCompleted];
        return nil;
    }];
    NSLog(@"Signal was created.");
    [[RACScheduler mainThreadScheduler] afterDelay:0.1 schedule:^{
        [signal subscribeNext:^(id x) {
            NSLog(@"Subscriber 1 recveive: %@", x);
        }];
    }];

    [[RACScheduler mainThreadScheduler] afterDelay:1 schedule:^{
        [signal subscribeNext:^(id x) {
            NSLog(@"Subscriber 2 recveive: %@", x);
        }];
    }];

以上简单地创建了一个信号,并且依次发送@1,@2,@3作为值。下面分别有两个订阅者在不同的时间段进行了订阅,运行的结果如下:

2015-08-11 18:33:21.681 RACDemos[6505:1125196] Signal was created.
2015-08-11 18:33:21.793 RACDemos[6505:1125196] Subscriber 1 recveive: 1
2015-08-11 18:33:21.793 RACDemos[6505:1125196] Subscriber 1 recveive: 2
2015-08-11 18:33:21.793 RACDemos[6505:1125196] Subscriber 1 recveive: 3
2015-08-11 18:33:22.683 RACDemos[6505:1125196] Subscriber 2 recveive: 1
2015-08-11 18:33:22.683 RACDemos[6505:1125196] Subscriber 2 recveive: 2
2015-08-11 18:33:22.683 RACDemos[6505:1125196] Subscriber 2 recveive: 3

我们可以看到,信号在18:33:21.681时被创建,18:33:21.793依次接到1、2、3三个值,而在18:33:22.683再依次接到1、2、3三个值。说明了变量名为signal的这个信号,在两个不同时间段的订阅过程中,分别完整地发送了所有的消息。

我们再对这段代码进行一个小的改动:

    RACMulticastConnection *connection = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [[RACScheduler mainThreadScheduler] afterDelay:1 schedule:^{
            [subscriber sendNext:@1];
        }];

        [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
            [subscriber sendNext:@2];
        }];

        [[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{
            [subscriber sendNext:@3];
        }];

        [[RACScheduler mainThreadScheduler] afterDelay:4 schedule:^{
            [subscriber sendCompleted];
        }];
        return nil;
    }] publish];
    [connection connect];
    RACSignal *signal = connection.signal;

    NSLog(@"Signal was created.");
    [[RACScheduler mainThreadScheduler] afterDelay:1.1 schedule:^{
        [signal subscribeNext:^(id x) {
            NSLog(@"Subscriber 1 recveive: %@", x);
        }];
    }];

    [[RACScheduler mainThreadScheduler] afterDelay:2.1 schedule:^{
        [signal subscribeNext:^(id x) {
            NSLog(@"Subscriber 2 recveive: %@", x);
        }];
    }];

稍微有些复杂,我们来一一分析:

  • 创建了一个信号,在1秒、2秒、3秒分别发送1、2、3这三个值,4秒发送结束信号。
  • 对这个信号调用publish方法得到一个RACMulticastConnection。
  • 让connection进行连接操作。
  • 获得connection的信号。
  • 分别在1.1秒和2.1秒订阅获得的信号。

抛开RACMulticastConnection是个什么东东,我们先来看下结果:

2015-08-12 11:07:49.943 RACDemos[9418:1186344] Signal was created.
2015-08-12 11:07:52.088 RACDemos[9418:1186344] Subscriber 1 recveive: 2
2015-08-12 11:07:53.044 RACDemos[9418:1186344] Subscriber 1 recveive: 3
2015-08-12 11:07:53.044 RACDemos[9418:1186344] Subscriber 2 recveive: 3

首先告诉大家- [RACSignal publish]- [RACMulticastConnection connect]- [RACMulticastConnection signal]这几个操作生成了一个热信号。
我们再来关注下输出结果的一些细节:

  • 信号在11:07:49.943被创建
  • 11:07:52.088时订阅者1才收到2这个值,说明1这个值没有接收到,时间间隔是2秒多
  • 11:07:53.044时订阅者1和订阅者2同时收到3这个值,时间间隔是3秒多

参考一开始的Hot Observables的论述和两段小程序的输出结果,我们可以确定冷热信号的如下特点:

  1. 热信号是主动的,即使你没有订阅事件,它仍然会时刻推送。如第二个例子,信号在50秒被创建,51秒的时候1这个值就推送出来了,但是当时还没有订阅者。而冷信号是被动的,只有当你订阅的时候,它才会发送消息。如第一个例子。
  2. 热信号可以有多个订阅者,是一对多,信号可以与订阅者共享信息。如第二个例子,订阅者1和订阅者2是共享的,他们都能在同一时间接收到3这个值。而冷信号只能一对一,当有不同的订阅者,消息会从新完整发送。如第一个例子,我们可以观察到两个订阅者没有联系,都是基于各自的订阅时间开始接收消息的。

好的,至此我们知道了什么是冷信号与热信号,了解了它们的特点。下一篇文章我们来看看为什么要区分冷信号与热信号。

 

 

 

 

 

 

前一篇文章我们介绍了冷信号与热信号的概念,可能有同学会问了,为什么RAC要搞得如此复杂呢,只用一种信号不就行了么?要解释这个问题,需要绕一些圈子。

前面可能比较难懂,如果不能很好理解,请仔细阅读相关文档。

最前面提到了RAC是一套基于Cocoa的FRP框架,那就来说说FRP吧。FRP的全称是Functional Reactive Programming,中文译作函数式响应式编程,是RP(Reactive Programm,响应式编程)的FP(Functional Programming,函数式编程)实现。说起来很拗口。太多的细节不多讨论,我们着重关注下FRP的FP特征。

FP有个很重要的概念是和我们的主题相关的,那就是纯函数。

纯函数就是返回值只由输入值决定、而且没有可见副作用的函数或者表达式。这和数学中的函数是一样的,比如:

f(x) = 5x + 1

这个函数在调用的过程中除了返回值以外的没有任何对外界的影响,除了入参x以外也不受任何其他外界因素的影响。

那么副作用都有哪些呢?我来列举以下几个情况:

  • 函数的处理过程中,修改了外部的变量,例如全局变量。一个特殊点的例子,就是如果把OC的一个方法看做一个函数,所有的成员变量的赋值都是对外部变量的修改。是的,从FP的角度看OOP是充满副作用的。
  • 函数的处理过程中,触发了一些额外的动作,例如发送了一个全局的Notification,在console里面输出了一行信息,保存了文件,触发了网络,更新了屏幕等。
  • 函数的处理过程中,受到外部变量的影响,例如全局变量,方法里面用到的成员变量。注意block中捕获的外部变量也算副作用。
  • 函数的处理过程中,受到线程锁的影响算副作用。

由此我们可以看出,在目前的iOS编程中,我们是很难摆脱副作用的。甚至可以这么说,我们iOS编程的目的其实就是产生各种副作用。(基于用户触摸的外界因素,最终反馈到网络变化和屏幕变化上。)

接下来我们来分析副作用与冷热信号的关系。既然iOS编程中少不了副作用,那么RAC在实际的使用中也不可避免地要接触副作用。下面通过一个业务场景,来看看冷信号中副作用的坑:


    self.sessionManager = [[AFHTTPSessionManager alloc] initWithBaseURL:[NSURL URLWithString:@"http://api.xxxx.com"]];

    self.sessionManager.requestSerializer = [AFJSONRequestSerializer serializer];
    self.sessionManager.responseSerializer = [AFJSONResponseSerializer serializer];

    @weakify(self)
    RACSignal *fetchData = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        @strongify(self)
        NSURLSessionDataTask *task = [self.sessionManager GET:@"fetchData" parameters:@{@"someParameter": @"someValue"} success:^(NSURLSessionDataTask *task, id responseObject) {
            [subscriber sendNext:responseObject];
            [subscriber sendCompleted];
        } failure:^(NSURLSessionDataTask *task, NSError *error) {
            [subscriber sendError:error];
        }];
        return [RACDisposable disposableWithBlock:^{
            if (task.state != NSURLSessionTaskStateCompleted) {
                [task cancel];
            }
        }];
    }];

    RACSignal *title = [fetchData flattenMap:^RACSignal *(NSDictionary *value) {
        if ([value[@"title"] isKindOfClass:[NSString class]]) {
            return [RACSignal return:value[@"title"]];
        } else {
            return [RACSignal error:[NSError errorWithDomain:@"some error" code:400 userInfo:@{@"originData": value}]];
        }
    }];

    RACSignal *desc = [fetchData flattenMap:^RACSignal *(NSDictionary *value) {
        if ([value[@"desc"] isKindOfClass:[NSString class]]) {
            return [RACSignal return:value[@"desc"]];
        } else {
            return [RACSignal error:[NSError errorWithDomain:@"some error" code:400 userInfo:@{@"originData": value}]];
        }
    }];

    RACSignal *renderedDesc = [desc flattenMap:^RACStream *(NSString *value) {
        NSError *error = nil;
        RenderManager *renderManager = [[RenderManager alloc] init];
        NSAttributedString *rendered = [renderManager renderText:value error:&error];
        if (error) {
            return [RACSignal error:error];
        } else {
            return [RACSignal return:rendered];
        }
    }];

    RAC(self.someLablel, text) = [[title catchTo:[RACSignal return:@"Error"]]  startWith:@"Loading..."];
    RAC(self.originTextView, text) = [[desc catchTo:[RACSignal return:@"Error"]] startWith:@"Loading..."];
    RAC(self.renderedTextView, attributedText) = [[renderedDesc catchTo:[RACSignal return:[[NSAttributedString alloc] initWithString:@"Error"]]] startWith:[[NSAttributedString alloc] initWithString:@"Loading..."]];

    [[RACSignal merge:@[title, desc, renderedDesc]] subscribeError:^(NSError *error) {
        UIAlertView *alertView = [[UIAlertView alloc] initWithTitle:@"Error" message:error.domain delegate:nil cancelButtonTitle:@"OK" otherButtonTitles:nil];
        [alertView show];
    }];

不知道大家有没有被这么一大段的代码吓到,我想要表达的是,在真正的工程中,我们的业务逻辑是很复杂的,而一些坑就隐藏在如此看似复杂但是又很合理的代码之下。所以我尽量模拟了一些需求,使得代码看起来更丰富。下面我们还是来仔细看下这段代码的逻辑吧:

  1. 创建了一个AFHTTPSessionManager用来做网络接口的数据获取。
  2. 创建了一个名为fetchData的信号来通过网络获取信息。
  3. 创建一个名为title的信号从获取的data中取得title字段,如果没有该字段则反馈一个错误。
  4. 创建一个名为desc的信号从获取的data中取得desc字段,如果没有该字段则反馈一个错误。
  5. 针对desc这个信号做一个渲染,得到一个名为renderedDesc的新信号,该信号会在渲染失败的时候反馈一个错误。
  6. title信号所有的错误转换为字符串@"Error"并且在没有获取值之前以字符串@"Loading..."占位,之后与self.someLableltext属性绑定。
  7. desc信号所有的错误转换为字符串@"Error"并且在没有获取值之前以字符串@"Loading..."占位,之后与self.originTextViewtext属性绑定。
  8. renderedDesc信号所有的错误转换为属性字符串@"Error"并且在没有获取值之前以属性字符串@"Loading..."占位,之后与self.renderedTextViewtext属性绑定。
  9. 订阅titledescrenderedDesc这三个信号的任何错误,并且弹出UIAlertView

这些代码体现了RAC的一些优势,例如良好的错误处理和各种链式处理。很不错,对不对?但是很遗憾的告诉大家,这段代码其实有很严重的错误。

如果你去尝试运行这段代码,并且打开Charles查看,你会惊奇的发现,这个网络请求发送了6次。没错,是6次请求。我们也可以想象到类似的代码存在其他副作用的问题,重新刷新了6次屏幕,写入6次文件,发了6个全局通知。

下面来分析,为什么是6次网络请求呢?首先根据上面的知识,可以推断出名为fetchData信号是一个冷信号。那么这个信号在订阅的时候就会执行里面的过程。那这个信号是在什么时候被订阅了呢?仔细回看了代码,我们发现并没有订阅这个信号,只是调用这个信号的flattenMap产生了两个新的信号。

这里有一个很重要的概念,就是任何的信号转换即是对原有的信号进行订阅从而产生新的信号。由此我们可以写出flattenMap的伪代码如下:


- (instancetype)flattenMap_:(RACStream * (^)(id value))block {
{
    return [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
       return [self subscribeNext:^(id x) {
           RACSignal *signal = (RACSignal *)block(x);
           [signal subscribeNext:^(id x) {
               [subscriber sendNext:x];
           } error:^(NSError *error) {
               [subscriber sendError:error];
           } completed:^{
               [subscriber sendCompleted];
           }];
       } error:^(NSError *error) {
           [subscriber sendError:error];
       } completed:^{
           [subscriber sendCompleted];
       }];
    }];
}

除了没有高度复用和缺少一些disposable的处理以外,上述代码大致可以比较直观地说明flattenMap的机制。观察会发现其实是在调用这个方法的时候,生成了一个新的信号,并在这个新信号的执行过程中对self进行的了订阅。还需要注意一个细节,就是这个返回信号在未来订阅的时候,才会间接的订阅self。后续的startWithcatchTo等都可以这样理解。

回到我们的问题,那就是说,在fetchDataflattenMap之后,它就会因为名为titledesc信号的订阅而订阅。而后续对desc也会进行flattenMap,得到了renderedDesc,因此未来renderedDesc被订阅的时候,fetchData也会被间接订阅。这就解释了,为什么后续我们用RAC宏进行绑定的时候,fetchData会订阅3次。由于fetchData是冷信号,所以3次订阅意味着它的过程被执行了3次,也就是有3次网络请求。

另外的3次订阅来自RACSignal类的merge方法。根据上述的描述,我们也可以猜测merge方法也一定是创建了一个新的信号,在这个信号被订阅的时候,把它包含的所有信号订阅。所以我们又得到了额外的3次网络请求。

由此可以看到,不熟悉冷热信号对业务造成的影响。我们可以想象对用户流量的影响,对服务器负载的影响,对统计的影响,如果这是一个点赞的接口,会不会造成多次点赞?后果不堪设想啊。而这些都可以通过将fetchData转换为热信号来解决。

接下来也许你会问,如果我的整个计算过程中都没有副作用,是否就不会有这个问题?答案是肯定的。试想下刚才那段代码如果没有网络请求,换成一些标准化的计算会怎样。虽然可以肯定它不会出现bug,但是不要忽视其中的运算也会执行多次。纯函数还有一个概念就是引用透明。在纯函数式语言(例如Haskell)中对此可以进行一定的优化,也就是说纯函数的调用在相同参数下的返回值第二次不需要计算,所以在纯函数式语言里面的FRP并没有冷信号的担忧。然而Objective-C语言中并没有这种纯函数优化,因此有大规模运算的冷信号对性能是有一定影响的。

从上文内容可以看出,如果我们想更好地掌握RAC这个框架,区分冷信号与热信号是十分重要的。接下来的系列第三篇文章,我会揭示冷信号与热信号的本质,帮助大家正确的理解冷信号与热信号。

 

 

 

 

 

第一篇文章中我们介绍了冷信号与热信号的概念,前一篇文章我们也讨论了为什么要区分冷信号与热信号,下面我会先为大家揭晓热信号的本质,再给出冷信号转换成热信号的方法。

揭示热信号的本质

ReactiveCocoa中,究竟什么才是热信号呢?冷信号是比较常见的,map一下就会得到一个冷信号。但在RAC中,好像并没有“hot signal”这个单独的说法。原来在RAC的世界中,所有的热信号都属于一个类——RACSubject。接下来我们来看看究竟它为什么这么“神奇”。

在RAC2.5文档的框架概述中,有着这样一段描述:

A subject, represented by the RACSubject class, is a signal that can be manually controlled.

Subjects can be thought of as the "mutable" variant of a signal, much like NSMutableArray is for NSArray. They are extremely useful for bridging non-RAC code into the world of signals.

For example, instead of handling application logic in block callbacks, the blocks can simply send events to a shared subject instead. The subject can then be returned as a RACSignal, hiding the implementation detail of the callbacks.

Some subjects offer additional behaviors as well. In particular, RACReplaySubject can be used to buffer events for future subscribers, like when a network request finishes before anything is ready to handle the result.

从这段描述中,我们可以发现Subject具备如下三个特点:

  1. Subject是“可变”的。
  2. Subject是非RAC到RAC的一个桥梁。
  3. Subject可以附加行为,例如RACReplaySubject具备为未来订阅者缓冲事件的能力。

从第三个特点来看,Subject具备为未来订阅者缓冲事件的能力,那也就说明它是自身是有状态的。根据上文的介绍,Subject是符合热信号的特点的。为了验证它,我们再来做个简单实验:

    RACSubject *subject = [RACSubject subject];
    RACSubject *replaySubject = [RACReplaySubject subject];

    [[RACScheduler mainThreadScheduler] afterDelay:0.1 schedule:^{
        // Subscriber 1
        [subject subscribeNext:^(id x) {
            NSLog(@"Subscriber 1 get a next value: %@ from subject", x);
        }];
        [replaySubject subscribeNext:^(id x) {
            NSLog(@"Subscriber 1 get a next value: %@ from replay subject", x);
        }];

        // Subscriber 2
        [subject subscribeNext:^(id x) {
            NSLog(@"Subscriber 2 get a next value: %@ from subject", x);
        }];
        [replaySubject subscribeNext:^(id x) {
            NSLog(@"Subscriber 2 get a next value: %@ from replay subject", x);
        }];
    }];

    [[RACScheduler mainThreadScheduler] afterDelay:1 schedule:^{
        [subject sendNext:@"send package 1"];
        [replaySubject sendNext:@"send package 1"];
    }];

    [[RACScheduler mainThreadScheduler] afterDelay:1.1 schedule:^{
        // Subscriber 3
        [subject subscribeNext:^(id x) {
            NSLog(@"Subscriber 3 get a next value: %@ from subject", x);
        }];
        [replaySubject subscribeNext:^(id x) {
            NSLog(@"Subscriber 3 get a next value: %@ from replay subject", x);
        }];

        // Subscriber 4
        [subject subscribeNext:^(id x) {
            NSLog(@"Subscriber 4 get a next value: %@ from subject", x);
        }];
        [replaySubject subscribeNext:^(id x) {
            NSLog(@"Subscriber 4 get a next value: %@ from replay subject", x);
        }];
    }];

    [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
        [subject sendNext:@"send package 2"];
        [replaySubject sendNext:@"send package 2"];
    }];

按照时间线来解读一下上述代码:

  1. 0s时创建subjectreplaySubject这两个subject。
  2. 0.1s时Subscriber 1分别订阅了subjectreplaySubject
  3. 0.1s时Subscriber 2也分别订阅了subjectreplaySubject
  4. 1s时分别向subjectreplaySubject发送了"send package 1"这个字符串作为
  5. 1.1s时Subscriber 3分别订阅了subjectreplaySubject
  6. 1.1s时Subscriber 4也分别订阅了subjectreplaySubject
  7. 2s时再分别向subjectreplaySubject发送了"send package 2"这个字符串作为

接下来看一下输出的结果:

2015-09-28 13:35:22.855 RACDemos[13646:1269269] Start
2015-09-28 13:35:23.856 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 1 from subject
2015-09-28 13:35:23.856 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 1 from subject
2015-09-28 13:35:23.857 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 1 from replay subject
2015-09-28 13:35:23.857 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 1 from replay subject
2015-09-28 13:35:24.059 RACDemos[13646:1269269] Subscriber 3 get a next value: send package 1 from replay subject
2015-09-28 13:35:24.059 RACDemos[13646:1269269] Subscriber 4 get a next value: send package 1 from replay subject
2015-09-28 13:35:25.039 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 2 from subject
2015-09-28 13:35:25.039 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 2 from subject
2015-09-28 13:35:25.039 RACDemos[13646:1269269] Subscriber 3 get a next value: send package 2 from subject
2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 4 get a next value: send package 2 from subject
2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 2 from replay subject
2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 2 from replay subject
2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 3 get a next value: send package 2 from replay subject
2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 4 get a next value: send package 2 from replay subject

结合结果可以分析出如下内容:

  1. 22.855s时,测试启动,subjectreplaySubject创建完毕。
  2. 23.856s时,距离启动大约1s后,Subscriber 1Subscriber 2同时subject接收到了"send package 1"这个值。
  3. 23.857s时,也是距离启动大约1s后,Subscriber 1Subscriber 2同时replaySubject接收到了"send package 1"这个值。
  4. 24.059s时,距离启动大约1.2s后,Subscriber 3Subscriber 4同时replaySubject接收到了"send package 1"这个值。注意Subscriber 3Subscriber 4并没有从subject接收"send package 1"这个值。
  5. 25.039s时,距离启动大约2.1s后,Subscriber 1Subscriber 2Subscriber 3Subscriber 4同时subject接收到了"send package 2"这个值。
  6. 25.040s时,距离启动大约2.1s后,Subscriber 1Subscriber 2Subscriber 3Subscriber 4同时replaySubject接收到了"send package 2"这个值。

只关注subject,根据时间线,我们可以得到下图:

技术分享

经过观察不难发现,4个订阅者实际上是共享subject的,一旦这个subject发送了值,当前的订阅者就会同时接收到。由于Subscriber 3Subscriber 4的订阅时间稍晚,所以错过了第一次值的发送。这与冷信号是截然不同的反应。冷信号的图类似下图:

技术分享

对比上面两张图,是不是可以发现,subject类似“直播”,错过了就不再处理。而signal类似“点播”,每次订阅都会从头开始。所以我们有理由认定subject天然就是热信号。

下面再来看看replaySubject,根据时间线,我们能得到另一张图:

技术分享

将图3与图1对比会发现,Subscriber 3Subscriber 4在订阅后马上接收到了“历史值”。对于Subscriber 3Subscriber 4来说,它们只关心“历史的值”而不关心“历史的时间线”,因为实际上12是间隔1s发送的,但是它们接收到的显然不是。举个生动的例子,就好像科幻电影里面主人公穿越时间线后会先把所有的回忆快速闪过再来到现实一样。(见《X战警:逆转未来》、《蝴蝶效应》)所以我们也有理由认定replaySubject天然也是热信号。

看到这里,我们终于揭开了热信号的面纱,结论就是:

  1. RACSubject及其子类是热信号
  2. RACSignal排除RACSubject类以外的是冷信号

如何将一个冷信号转化成热信号——广播

冷信号与热信号的本质区别在于是否保持状态,冷信号的多次订阅是不保持状态的,而热信号的多次订阅可以保持状态。所以一种将冷信号转换为热信号的方法就是,将冷信号订阅,订阅到的每一个时间通过RACSbuject发送出去,其他订阅者只订阅这个RACSubject

观察下面的代码:

    RACSignal *coldSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        NSLog(@"Cold signal be subscribed.");
        [[RACScheduler mainThreadScheduler] afterDelay:1.5 schedule:^{
            [subscriber sendNext:@"A"];
        }];

        [[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{
            [subscriber sendNext:@"B"];
        }];

        [[RACScheduler mainThreadScheduler] afterDelay:5 schedule:^{
            [subscriber sendCompleted];
        }];

        return nil;
    }];

    RACSubject *subject = [RACSubject subject];
    NSLog(@"Subject created.");

    [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
        [coldSignal subscribe:subject];
    }];

    [subject subscribeNext:^(id x) {
        NSLog(@"Subscriber 1 recieve value:%@.", x);
    }];

    [[RACScheduler mainThreadScheduler] afterDelay:4 schedule:^{
        [subject subscribeNext:^(id x) {
            NSLog(@"Subscriber 2 recieve value:%@.", x);
        }];

执行顺序是这样的:

  1. 创建一个冷信号:coldSignal。该信号声明了“订阅后1.5秒发送‘A’,3秒发送‘B‘,5秒发送完成事件”。
  2. 创建一个RACSubject:subject
  3. 在2秒后使用这个subject订阅coldSignal
  4. 立即订阅这个subject
  5. 4秒后订阅这个subject

如果所料不错的话,通过订阅这个subject并不会引起coldSignal重复执行block的内容。我们来看下结果:

2015-09-28 19:36:45.703 RACDemos[14110:1556061] Subject created.
2015-09-28 19:36:47.705 RACDemos[14110:1556061] Cold signal be subscribed.
2015-09-28 19:36:49.331 RACDemos[14110:1556061] Subscriber 1 recieve value:A.
2015-09-28 19:36:50.999 RACDemos[14110:1556061] Subscriber 1 recieve value:B.
2015-09-28 19:36:50.999 RACDemos[14110:1556061] Subscriber 2 recieve value:B.

参考时间线,会得到下图:
技术分享

不难发现其中的几个重点:

  1. subject是从一开始就创建好的,等到2s后便开始订阅coldSignal
  2. Subscriber 1subject创建后就开始订阅的,但是第一个接收时间与subject接收coldSignal第一个值的时间是一样的。
  3. Subscriber 2subject创建4s后开始订阅的,所以只能接收到第二个值。

通过观察可以确定,subject就是coldSignal转化的热信号。所以使用RACSubject来将冷信号转化为热信号是可行的。

当然,使用这种RACSubject来订阅冷信号得到热信号的方式仍有一些小的瑕疵。例如subject的订阅者提前终止了订阅,而subject并不能终止对coldSignal的订阅。(RACDisposable是一个比较大的话题,我计划在其他的文章中详细阐述它,也希望感兴趣的同学自己来理解。)所以在RAC库中对于冷信号转化成热信号有如下标准的封装:

- (RACMulticastConnection *)publish;
- (RACMulticastConnection *)multicast:(RACSubject *)subject;
- (RACSignal *)replay;
- (RACSignal *)replayLast;
- (RACSignal *)replayLazily;

这5个方法中,最为重要的就是- (RACMulticastConnection *)multicast:(RACSubject *)subject;这个方法了,其他几个方法也是间接调用它的。我们来看看它的实现:

/// implementation RACSignal (Operations)
- (RACMulticastConnection *)multicast:(RACSubject *)subject {
    [subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name];
    RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
    return connection;
}

/// implementation RACMulticastConnection

- (id)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
    NSCParameterAssert(source != nil);
    NSCParameterAssert(subject != nil);

    self = [super init];
    if (self == nil) return nil;

    _sourceSignal = source;
    _serialDisposable = [[RACSerialDisposable alloc] init];
    _signal = subject;

    return self;
}

#pragma mark Connecting

- (RACDisposable *)connect {
    BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);

    if (shouldConnect) {
        self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
    }

    return self.serialDisposable;
}

- (RACSignal *)autoconnect {
    __block volatile int32_t subscriberCount = 0;

    return [[RACSignal
        createSignal:^(id<RACSubscriber> subscriber) {
            OSAtomicIncrement32Barrier(&subscriberCount);

            RACDisposable *subscriptionDisposable = [self.signal subscribe:subscriber];
            RACDisposable *connectionDisposable = [self connect];

            return [RACDisposable disposableWithBlock:^{
                [subscriptionDisposable dispose];

                if (OSAtomicDecrement32Barrier(&subscriberCount) == 0) {
                    [connectionDisposable dispose];
                }
            }];
        }]
        setNameWithFormat:@"[%@] -autoconnect", self.signal.name];
}

虽然代码比较短但不是很好懂,大概来说明一下:

  1. RACSignal类的实例调用- (RACMulticastConnection *)multicast:(RACSubject *)subject时,以selfsubject作为构造参数创建一个RACMulticastConnection实例。
  2. RACMulticastConnection构造的时候,保存sourcesubject作为成员变量,创建一个RACSerialDisposable对象,用于取消订阅。
  3. RACMulticastConnection类的实例调用- (RACDisposable *)connect这个方法的时候,判断是否是第一次。如果是的话_signal这个成员变量来订阅sourceSignal之后返回self.serialDisposable;否则直接返回self.serialDisposable。这里面订阅sourceSignal是重点。
  4. RACMulticastConnectionsignal只读属性,就是一个热信号,订阅这个热信号就避免了各种副作用的问题。它会在- (RACDisposable *)connect第一次调用后,根据sourceSignal的订阅结果来传递事件。
  5. 想要确保第一次订阅就能成功订阅sourceSignal,可以使用- (RACSignal *)autoconnect这个方法,它保证了第一个订阅者触发sourceSignal的订阅,也保证了当返回的信号所有订阅者都关闭连接后sourceSignal被正确关闭连接。

由于RAC是一个线程安全的框架,所以好奇的同学可以了解下“OSAtomic*”这一系列的原子操作。抛开这些应该不难理解上述代码。

了解源码之后,这个方法的正确使用就清楚了,应该像这样:

    RACSignal *coldSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        NSLog(@"Cold signal be subscribed.");
        [[RACScheduler mainThreadScheduler] afterDelay:1.5 schedule:^{
            [subscriber sendNext:@"A"];
        }];

        [[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{
            [subscriber sendNext:@"B"];
        }];

        [[RACScheduler mainThreadScheduler] afterDelay:5 schedule:^{
            [subscriber sendCompleted];
        }];


        return nil;
    }];

    RACSubject *subject = [RACSubject subject];
    NSLog(@"Subject created.");

    RACMulticastConnection *multicastConnection = [coldSignal multicast:subject];
    RACSignal *hotSignal = multicastConnection.signal;

    [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
        [multicastConnection connect];
    }];

    [hotSignal subscribeNext:^(id x) {
        NSLog(@"Subscribe 1 recieve value:%@.", x);
    }];

    [[RACScheduler mainThreadScheduler] afterDelay:4 schedule:^{
        [hotSignal subscribeNext:^(id x) {
            NSLog(@"Subscribe 2 recieve value:%@.", x);
        }];
    }];

或者这样:

    RACSignal *coldSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        NSLog(@"Cold signal be subscribed.");
        [[RACScheduler mainThreadScheduler] afterDelay:1.5 schedule:^{
            [subscriber sendNext:@"A"];
        }];

        [[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{
            [subscriber sendNext:@"B"];
        }];

        [[RACScheduler mainThreadScheduler] afterDelay:5 schedule:^{
            [subscriber sendCompleted];
        }];


        return nil;
    }];

    RACSubject *subject = [RACSubject subject];
    NSLog(@"Subject created.");

    RACMulticastConnection *multicastConnection = [coldSignal multicast:subject];
    RACSignal *hotSignal = multicastConnection.autoconnect;

    [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
        [hotSignal subscribeNext:^(id x) {
            NSLog(@"Subscribe 1 recieve value:%@.", x);
        }];
    }];


    [[RACScheduler mainThreadScheduler] afterDelay:4 schedule:^{
        [hotSignal subscribeNext:^(id x) {
            NSLog(@"Subscribe 2 recieve value:%@.", x);
        }];
    }];

以上的两种写法和之前用Subject来传递的例子都可以得到相同的结果。

下面再来看看其他几个方法的实现:

/// implementation RACSignal (Operations)
- (RACMulticastConnection *)publish {
    RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name];
    RACMulticastConnection *connection = [self multicast:subject];
    return connection;
}

- (RACSignal *)replay {
    RACReplaySubject *subject = [[RACReplaySubject subject] setNameWithFormat:@"[%@] -replay", self.name];

    RACMulticastConnection *connection = [self multicast:subject];
    [connection connect];

    return connection.signal;
}

- (RACSignal *)replayLast {
    RACReplaySubject *subject = [[RACReplaySubject replaySubjectWithCapacity:1] setNameWithFormat:@"[%@] -replayLast", self.name];

    RACMulticastConnection *connection = [self multicast:subject];
    [connection connect];

    return connection.signal;
}

- (RACSignal *)replayLazily {
    RACMulticastConnection *connection = [self multicast:[RACReplaySubject subject]];
    return [[RACSignal
        defer:^{
            [connection connect];
            return connection.signal;
        }]
        setNameWithFormat:@"[%@] -replayLazily", self.name];
}

这几个方法的实现都相当简单,只是为了简化而封装,具体说明一下:

  1. - (RACMulticastConnection *)publish就是帮忙创建了RACSubject
  2. - (RACSignal *)replay就是用RACReplaySubject来作为subject,并立即执行connect操作,返回connection.signal。其作用是上面提到的replay功能,即后来的订阅者可以收到历史值。
  3. - (RACSignal *)replayLast就是用Capacity为1的RACReplaySubject来替换- (RACSignal *)replay的`subject。其作用是使后来订阅者只收到最后的历史值。
  4. - (RACSignal *)replayLazily- (RACSignal *)replay的区别就是replayLazily会在第一次订阅的时候才订阅sourceSignal

所以,其实本质仍然是

使用一个Subject来订阅原始信号,并让其他订阅者订阅这个Subject,这个Subject就是热信号。

现在再回过来看下之前系列文章第二篇中那个业务场景的例子,其实修改的方法很简单,就是在网络获取的fetchData这个信号后面,增加一个replayLazily变换,就不会出现网络请求重发6次的问题了。

修改后的代码如下,大家可以试试:


    self.sessionManager = [[AFHTTPSessionManager alloc] initWithBaseURL:[NSURL URLWithString:@"http://api.xxxx.com"]];

    self.sessionManager.requestSerializer = [AFJSONRequestSerializer serializer];
    self.sessionManager.responseSerializer = [AFJSONResponseSerializer serializer];

    @weakify(self)
    RACSignal *fetchData = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        @strongify(self)
        NSURLSessionDataTask *task = [self.sessionManager GET:@"fetchData" parameters:@{@"someParameter": @"someValue"} success:^(NSURLSessionDataTask *task, id responseObject) {
            [subscriber sendNext:responseObject];
            [subscriber sendCompleted];
        } failure:^(NSURLSessionDataTask *task, NSError *error) {
            [subscriber sendError:error];
        }];
        return [RACDisposable disposableWithBlock:^{
            if (task.state != NSURLSessionTaskStateCompleted) {
                [task cancel];
            }
        }];
    }] replayLazily];  // modify here!!

    RACSignal *title = [fetchData flattenMap:^RACSignal *(NSDictionary *value) {
        if ([value[@"title"] isKindOfClass:[NSString class]]) {
            return [RACSignal return:value[@"title"]];
        } else {
            return [RACSignal error:[NSError errorWithDomain:@"some error" code:400 userInfo:@{@"originData": value}]];
        }
    }];

    RACSignal *desc = [fetchData flattenMap:^RACSignal *(NSDictionary *value) {
        if ([value[@"desc"] isKindOfClass:[NSString class]]) {
            return [RACSignal return:value[@"desc"]];
        } else {
            return [RACSignal error:[NSError errorWithDomain:@"some error" code:400 userInfo:@{@"originData": value}]];
        }
    }];

    RACSignal *renderedDesc = [desc flattenMap:^RACStream *(NSString *value) {
        NSError *error = nil;
        RenderManager *renderManager = [[RenderManager alloc] init];
        NSAttributedString *rendered = [renderManager renderText:value error:&error];
        if (error) {
            return [RACSignal error:error];
        } else {
            return [RACSignal return:rendered];
        }
    }];

    RAC(self.someLablel, text) = [[title catchTo:[RACSignal return:@"Error"]]  startWith:@"Loading..."];
    RAC(self.originTextView, text) = [[desc catchTo:[RACSignal return:@"Error"]] startWith:@"Loading..."];
    RAC(self.renderedTextView, attributedText) = [[renderedDesc catchTo:[RACSignal return:[[NSAttributedString alloc] initWithString:@"Error"]]] startWith:[[NSAttributedString alloc] initWithString:@"Loading..."]];

    [[RACSignal merge:@[title, desc, renderedDesc]] subscribeError:^(NSError *error) {
        UIAlertView *alertView = [[UIAlertView alloc] initWithTitle:@"Error" message:error.domain delegate:nil cancelButtonTitle:@"OK" otherButtonTitles:nil];
        [alertView show];
    }];

当然,细心的同学会发现这样修改,仍然有许多计算上的浪费,例如将fetchData转换为title的block会执行多次,将fetchData转换为desc的block也会执行多次。但是由于这些block都是无副作用的,计算量并不大,可以忽略不计。如果计算量大的,也需要对中间的信号进行热信号的转换。不过请不要忽略冷热信号的转换本身也是有计算代价的。

好的,写到这里,我们终于揭开RAC中冷信号与热信号的全部面纱,也知道如何使用了。希望这个系列文章可以让大家更好地了解RAC,避免使用RAC遇到的误区。谢谢大家。

 

 

参考链接:

1.http://tech.meituan.com/talk-about-reactivecocoas-cold-signal-and-hot-signal-part-1.html

2.http://tech.meituan.com/talk-about-reactivecocoas-cold-signal-and-hot-signal-part-2.html

3.http://tech.meituan.com/talk-about-reactivecocoas-cold-signal-and-hot-signal-part-3.html

ReactiveCocoa的冷信号与热信号 探讨

标签:

原文地址:http://www.cnblogs.com/Jenaral/p/5656317.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!