• Observable 剖析
    • Unsubscribe

    Observable 剖析

    Observable 的 subscribe 方法签名如下:

    1. stream.subscribe(fnValue, fnError, fnComplete)

    下面所演示的是第一个参数 fnValue

    1. let stream$ = Rx.Observable.create((observer) => {
    2. observer.next(1)
    3. });
    4. stream$.subscribe((data) => {
    5. console.log('Data', data);
    6. })
    7. // 1

    当执行 observer.next(<value>) 时, fnValue 就会被调用。

    第二个回调函数 fnError 是错误回调,通过下面的代码来调用,例如 observer.error(<message>)

    1. let stream$ = Rx.Observable.create((observer) => {
    2. observer.error('error message');
    3. })
    4. stream$.subscribe(
    5. (data) => console.log('Data', data)),
    6. (error) => console.log('Error', error)

    最后是 fnComplete,当流完成时调用,并且不会再发出任何值。它是通过 observer.complete() 来触发的,像这样:

    1. let stream$ = Rx.Observable.create((observer) => {
    2. // 多次调用 observer.next(<value>)
    3. observer.complete();
    4. })

    Unsubscribe

    目前为止,我们创建的是一个不负责任的 Observable 。在这里不负责任是指它并没有清理它自身。那么我们来看下如何做到这点:

    1. let stream$ = new Rx.Observable.create((observer) => {
    2. let i = 0;
    3. let id = setInterval(() => {
    4. observer.next(i++);
    5. },1000)
    6. return function(){
    7. clearInterval( id );
    8. }
    9. })
    10. let subscription = stream$.subscribe((value) => {
    11. console.log('Value', value)
    12. });
    13. setTimeout(() => {
    14. subscription.unsubscribe() // 在这我们调用了清理函数
    15. }, 3000)

    所以你要确保

    • 定义一个清理函数
    • 通过调用 subscription.unsubscribe() 隐式的调用清理函数