标签:
RxJS(Reactive Extensions)是个Angular提供的第三方库,实现异步观察模式(asynchronous observable pattern)。
RxJS非常大,通常只要我们所需要的特性就好了。Angular在rxjs/Observable
模块中提供了简版的Observable
,但是它缺乏我们所需要的所有的操作,包括上面提到的map
方法。
我们在应用启动时引入所有RxJS操作:
import ‘rxjs/Rx‘;
首先我们观‘rxjs/Observable
‘中引入Observable方法
import {Observable} from ‘rxjs/Observable‘;
接下来,在我们的构造函数中,我们创建了一个新的可观察的。
public data:Observable<Array<number>>; this.data = new Observable(observer => { setTimeout(() => { observer.next(42); observer.next(43); observer.complete(); }, 2000); console.log(‘Started Observable sequence!‘); });
最后我们订阅该对象:
let subscription = this.data.subscribe( value => this.values.push(value), error => this.anyErrors = true, () => this.finished = true );
filter,map
import {Component} from ‘angular2/core‘ import { Observable } from ‘rxjs/Observable‘; import ‘rxjs/Rx‘; @Component({ template : ` ` }) export class MyrxComponent { public data:Observable; constructor(){ this.data = new Observable(observer=>{ var count = 0; setInterval(()=>{ observer.next(count++); },1000) }) this.data.filter(value => value<3) .subscribe( value=> console.log(value) ) } }
尽管http
API返回的是Observable<Response>
,但是我们可以把它转换成Promise
observable转换成promise只需要调用toPromise(success, fail)
。
this.http.get(‘./friends.json‘).toPromise() .then((res: Response) => { this.friendsAsPromise.friends = res.json().friends; });
在angular1.x中这是使用$ q.all完成的,但在新的http服务,这可以通过使用forkJoin操作者来完成。这个想法是十分相似,虽然在您列出的并行调用,并得到结果返回在数组中。
import {Observable} from ‘rxjs/Observable‘ Observable.forkJoin( this.http.get(‘./friends.json‘).map((res: Response) =>res.json()), this.http.get(‘./customer.json‘).map((res: Response) =>res.json()) ).subscribe(res => this.combined = {friends:res[0].friends, customer:res[1]});
Observales支持取消订阅。如果你在不经意间做一个HTTP请求,并希望取消响应的处理。
getCapitol(country){ if(this.pendingRequest){ this.pendingRequest.unsubscribe(); } this.pendingRequest = this.http.get(‘./country-info/‘ + country) .map((res: Response) => res.json()) .subscribe(res => this.capitol = res.capitol); }
unsubscribe()有效地消除了HTTP响应的处理。这使得以确保我们只兑现了最新的请求,并避免了请求的订单处理。
在现实中,我们希望不要每个字符输入都请求服务器,我们希望300毫秒以内的字符输入都被中断掉!
Subject用来创建流:
import { Subject } from ‘rxjs/Subject‘
声明一个流:
country = new Subject();
传递数据,主要数据改变,就触发Observable:
country.next(‘data‘)
定义请求方法:
this.country .switchMap((country) =>this.http.get(‘./country-info/‘ + country + ‘.json‘)) .map((res: Response) => res.json()) .subscribe(res => this.capitol = res.capitol);
import { Jsonp, Response } from ‘angular2/http‘; import { Observable } from ‘rxjs/Observable‘; import { Subject } from ‘rxjs/Subject‘ @Injectable() export class SearchService { queryStream = new Subject(); generateSuggestions(query: string) { this.queryStream.onNext(query) .debounce(500) .map( query => this.jsonp.request( `http://urlendpoint/${query}` ) .map( (res: Response) => res.json() ) .subscribe( results => console.log(results) ); ) } }
标签:
原文地址:http://www.cnblogs.com/bq-med/p/5390240.html