码迷,mamicode.com
首页 > Web开发 > 详细

[RxJS] Reusable multicasting with Subject factories

时间:2016-10-26 20:03:47      阅读:236      评论:0      收藏:0      [点我收藏+]

标签:err   new   timeout   cut   each   lis   log   ror   set   

The way we use publish() (or multicast with an RxJS Subject) makes the shared Observable not reusable if the shared execution happens to complete or emit an error. In this lesson we will see how to use a simple Subject factory function in order to create a new Subject, one for each shared execution, whenever connect() is called.

 

var shared = Rx.Observable.interval(1000).take(3)
  .do(x => console.log(source  + x))
  .multicast(new Rx.Subject())
  .refCount();

The code above, after subject emit 0,1,2, three values, then it completes. It means if you want to subscribe the subject again, it won‘t emit anything because it is completed. 

 

If you want to reuse the ‘shared‘ subject even after subject complete, you need to use subject factories, which simply just a function return new Subject():

function subjectFactory() {
  return new Rx.Subject(); 
}

var shared = Rx.Observable.interval(1000).take(3)
  .do(x => console.log(source  + x))
  .multicast(subjectFactory)
  .refCount();

 

So now even you resubscribe after subject complete, it will emit you new value.

function subjectFactory() {
  return new Rx.Subject(); 
}

var shared = Rx.Observable.interval(1000).take(3)
  .do(x => console.log(source  + x))
  .multicast(subjectFactory)
  .refCount();

// subject: --0--1--2--3--4--5|
//                               A
// subject2:                     --0--1--2--3--4--5|

var observerA = {
  next: function (x) { console.log(A next  + x); },
  error: function (err) { console.log(A error  + err); },
  complete: function () { console.log(A done); },
};

var subA = shared.subscribe(observerA); // 0 => 1
console.log(subscribed A);

var observerB = {
  next: function (x) { console.log(B next  + x); },
  error: function (err) { console.log(B error  + err); },
  complete: function () { console.log(B done); },
};

var subB;
setTimeout(function () {
  subB = shared.subscribe(observerB);
  console.log(subscribed B);
}, 2000);

setTimeout(function () {
  subA.unsubscribe();
  console.log(unsubscribed A);
}, 3000);

setTimeout(function () {
  subB.unsubscribe();
  console.log(unsubscribed B);
}, 5000);

setTimeout(function () {
  subA = shared.subscribe(observerA); // 0 => 1 (connect)
  console.log(subscribed A);
}, 6000);

 

/**
"subscribed A"
"source 0"
"A next 0"
"source 1"
"A next 1"
"subscribed B"
"source 2"
"A next 2"
"B next 2"
"A done"
"B done"
"unsubscribed A"
"unsubscribed B"
"subscribed A"
"source 0"
"A next 0"
"source 1"
"A next 1"
"source 2"
"A next 2"
"A done"

*/

 

[RxJS] Reusable multicasting with Subject factories

标签:err   new   timeout   cut   each   lis   log   ror   set   

原文地址:http://www.cnblogs.com/Answer1215/p/6001403.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!