• 热&冷的 Observables
    • 由冷及热 - 凯蒂·佩里模式
    • 暖的 Observables
    • 天生的热 observables
  • 共享

    热&冷的 Observables

    Observable 有冷热两种类型。我们先来看看什么是冷的 observable 。如果是冷的 observable 的话,那么两个订阅者得到值是两份完全相同的副本,示例如下:

    1. // 冷的 observable 示例
    2. let stream$ = Rx.Observable.of(1,2,3);
    3. //订阅者 1: 1,2,3
    4. stream.subscribe(
    5. data => console.log(data),
    6. err => console.error(err),
    7. () => console.log('completed')
    8. )
    9. //订阅者 2: 1,2,3
    10. stream.subscribe(
    11. data => console.log(data),
    12. err => console.error(err),
    13. () => console.log('completed')
    14. )

    如果是热的 observable 的话,订阅者只能收到当它开始订阅后的值,这很像是足球比赛的实况直播,如果你在开场5分钟后才开始观看,你会错失开场前5分钟的一切,从观看的这一刻起你才开始接收数据:

    1. let liveStreaming$ = Rx.Observable.interval(1000).take(5);
    2. liveStreaming$.subscribe(
    3. data => console.log('subscriber from first minute')
    4. err => console.log(err),
    5. () => console.log('completed')
    6. )
    7. setTimeout(() => {
    8. liveStreaming$.subscribe(
    9. data => console.log('subscriber from 2nd minute')
    10. err => console.log(err),
    11. () => console.log('completed')
    12. )
    13. },2000)

    由冷及热 - 凯蒂·佩里模式

    上面的示例其实并不是真正的热的 observable,事实上两个订阅者接收到的值都是0,1,2,3,4。因为这是一场足球比赛的实况直播,所以这样的结果并不是我们想要的,那么如何来修复呢?

    需要两个部件来将冷的 observable 转变成热的, publish()connect()

    1. let publisher$ = Rx.Observable
    2. .interval(1000)
    3. .take(5)
    4. .publish();
    5. publisher$.subscribe(
    6. data => console.log('subscriber from first minute',data),
    7. err => console.log(err),
    8. () => console.log('completed')
    9. )
    10. setTimeout(() => {
    11. publisher$.subscribe(
    12. data => console.log('subscriber from 2nd minute', data),
    13. err => console.log(err),
    14. () => console.log('completed')
    15. )
    16. }, 3000)
    17. publisher$.connect();

    在这个案例中,我们看到第一个订阅者输出的是0,1,2,3,4,而第二个输出的是3,4。很明显订阅的时间点是很重要的。

    暖的 Observables

    这是 observalbes 的另外一种类型,它的表现很像热的 observable ,但它在某种程度上是惰性的。我想表达的是从本质上来说,在有订阅发生之前它们不会发出任何值。让我们来比较一下热的和暖的 observable

    热的 observable

    1. let stream$ = Rx.Observable
    2. .interval(1000)
    3. .take(4)
    4. .publish();
    5. stream$.connect();
    6. setTimeout(() => {
    7. stream$.subscribe(data => console.log(data))
    8. }, 2000);

    这里我们可以看到热的 observable 会丢失第一个发出的值,因为订阅是延迟发生的。

    和暖的 observable 进行下对比

    暖的 observable

    1. let obs = Rx.Observable.interval(1000).take(3).publish().refCount();
    2. setTimeout(() => {
    3. obs.subscribe(data => console.log('sub1', data));
    4. },1100)
    5. setTimeout(() => {
    6. obs.subscribe(data => console.log('sub2', data));
    7. },2100)

    refCount() 操作符确保 observable 变成暖的,也就是不会发出值直到 sub1 订阅了流。另一方面 sub2 是后加入的,也就是说订阅接收的是当前的值,而无法接收订阅之前的值。

    所以这个输出是:

    1. sub1 : 0
    2. sub1 : 1
    3. sub2 : 1
    4. sub1 : 2
    5. sub2 : 2

    如上面显示的,第一个订阅者是从0开始。如果它是热的话,它会从一个更大的数字开始,因为它是后加入的。当第二个订阅者达到时,它得到的第一个数字不是0而是1,这也说明了它确实变热了。

    天生的热 observables

    通常来说,如果 observable 的值被立即发出而不需要订阅者的话,那么就认为它是热的,一个最常见的例子就是 mousemove 。其它大多数热的 observables 都是通过使用 publish()connect() ,或者使用 share() 操作符将冷的 observables 转变成热的结果。

    共享

    共享意味着要使用一个十分有用,叫做 share() 的操作符。想象一下你有这样一个普通的冷 observable:

    1. let stream$ = Rx.Observable.create((observer) => {
    2. observer.next( 1 );
    3. observer.next( 2 );
    4. observer.next( 3 );
    5. observer.complete();
    6. }).share()
    7. stream$.subscribe(
    8. (data) => console.log('subscriber 1', data),
    9. err => console.error(err),
    10. () => console.log('completed')
    11. );
    12. stream$.subscribe(
    13. (data) => console.log('subscriber 2', data),
    14. err => console.error(err),
    15. () => console.log('completed')
    16. );

    如果在 observer.next(1) 打个断点,你会注意到它执行了两次,每个订阅者一次。这个行为是我们对冷的 observable 的期望。共享操作符用了一种不同的方式将其转换成热的 observable,事实上,它不仅在正确的条件下转变成热的 observable,而且在某些条件下可以回退成冷的 observable 。那么这些条件是?

    1) 创建热的 Observable: 当有新的订阅时 Observable 还未完成并且订阅者数量大于0

    2) 创建冷的 Observable 在新的订阅之前订阅者的数量已经变成了0,也就是说,一个或多个订阅存在过一段时间,但是在新的订阅发生前已经取消了订阅。

    3) 创建冷的 Observable 当新的订阅发生之前 Observable 已经完成

    结果就是一个活动的 Observable 要继续产生值至少要有一个存在的订阅者。我们可以看到情况1)中的 Observable 在第有两个订阅者之前是休眠的,当订阅发生时它会立即转变成热的从而开始共享数据。