• 错误处理
    • 重试 - 现在怎么样?
      • retry
      • retryWhen
        • retryWhen 和 delay 一起使用没有次数限制
    • 转换 - 这个没什么好看的
      • 修补它
      • 多个流呢?
        • 适者生存

    错误处理

    有两种主要的方法来处理流中的错误。你可以重试流并保证流最终会正常运行,或者处理错误并进行转换。

    重试 - 现在怎么样?

    当你认为错误是由于某些原因是暂时导致的,那么这种方法是适用的。通常不稳定的网络是个很好的例子。当网络不稳定时端点可能会在你多次尝试后才能回应。要点是你的首次尝试可能失败,但重试x次并且在两次尝试之间有一定的时间间隔,最终端点会回应。

    retry

    retry() 操作符可以让我们重试整个流,只接收一个参数,参数的值是要重试的次数,函数签名如下:

    1. retry([times])

    重要的是要注意当错误回调被调用的话, retry() 操作符会有延迟。下面代码中的错误回调会立即被调用:

    1. let stream$ = Rx.Observable.of(1,2,3)
    2. .map(value => {
    3. if(value > 2) { throw 'error' }
    4. });
    5. stream$.subscribe(
    6. data => console.log(data),
    7. err => console.log(err)
    8. )

    这个流很快的就死了,错误回调被调用,这个时候 retry() 操作符登场。像下面这样把它附加上即可:

    1. let stream$ = Rx.Observable.of(1,2,3)
    2. .map(value => {
    3. if(value > 2) { throw 'error' }
    4. })
    5. .retry(5)

    这将运行值序列5次,最后放弃并进入错误回调。然而在这个案例中,由于编写代码的方式,它只会生成5次1,2。所以我们的代码并没有真正利用操作符的最大潜力。你可能想要的是能够在每次尝试之间改变一些东西。想象下你的 observable 看起来像这样:

    1. let urlsToHit$ = Rx.Observable.of(url, url2, url3);

    在这一点上,它清楚地表明,在你的第一次尝试中,端点可能回应的不好,或者根本就没有回应,所以重试x次是很有用的。

    然而在调用 ajax 的情况下,并想象一下我们的业务场景中网络不稳定,那么立即重试是没有意义的,所以我们需要再找到一个更好的操作符,那就是 retryWhen()

    retryWhen

    retryWhen() 操作符让我们有机会对流进行操作并恰当地处理。

    1. retryWhen( stream => {
    2. // 希望能在更好的条件下返回
    3. })

    现在我们来写段简单的代码:

    1. let values$ = Rx.Observable
    2. .of( 1,2,3,4 )
    3. .map(val => {
    4. if(val === 2) { throw 'err'; }
    5. else return val;
    6. })
    7. .retryWhen( stream => {
    8. return stream;
    9. } );
    10. values$.subscribe(
    11. data => console.log('Retry when - data',data),
    12. err => console.error('Retry when - Err',err)
    13. )

    这样写的话会一直返回 1,直到我们用完内存为止,由于缺少结束条件,算法总是会在值2上崩溃,并将永远重试流。我们需要做的就是以某种方式告知错误已经修复。如果流尝试点击网址而不是发出数字,响应端点将会被压垮,所以在这种情况下,我们必须写这样的东西:

    1. let values$ = Rx.Observable.interval(1000).take(5);
    2. let errorFixed = false;
    3. values$
    4. .map((val) => {
    5. if(errorFixed) { return val; }
    6. else if( val > 0 && val % 2 === 0) {
    7. errorFixed = true;
    8. throw { error : 'error' };
    9. } else {
    10. return val;
    11. }
    12. })
    13. .retryWhen((err) => {
    14. console.log('retrying the entire sequence');
    15. return err.delay(200);
    16. })
    17. .subscribe((val) => { console.log('value',val) })
    18. // 0 1 '等待200毫秒' retrying the whole sequence 0 1 2 3 4

    然而,这与我们用 retry() 运算符所做的很多类似,上面的代码只会重试一次。使用 retryWhen() 真正的好处是可以改变操作符中返回的流,也就是这里调用的 delay() 操作符,像这样:

    1. .retryWhen((err) => {
    2. console.log('retrying the entire sequence');
    3. return err.delay(200)
    4. })

    这会确保在流重试前有200毫秒的延迟,如果是在 ajax 场景下,可以确保端点有足够的时间重整旗鼓,然后开始响应。

    陷阱

    retryWhen() 中使用 delay() 操作符来确保重试晚一点发生,在这个案例中可以给网络一个恢复的机会。

    retryWhen 和 delay 一起使用没有次数限制

    到目前为止,当我们想要重试整个流x次时使用的是 retry() 操作符,当我们想要在重试之间有一些延迟时间时使用的是 retryWhen() 操作符,但是如果我们两者都想要,可以做到吗?可以的。我们需要考虑一下要以某种方式记住到目前为止我们的尝试次数。引入一个外部变量用来保持这个数量是非常诱人的,但那不是函数式做事的方式,记住副作用是被禁止的。那么我们该如何解决呢?有一个叫做 scan() 的操作符,它允许我们累积每次迭代的值。所以如果在 retryWhen() 中使用 scan 的话,我们就可以追踪尝试的次数:

    1. let ATTEMPT_COUNT = 3;
    2. let DELAY = 1000;
    3. let delayWithTimes$ = Rx.Observable.of(1,2,3)
    4. .map( val => {
    5. if(val === 2) throw 'err'
    6. else return val;
    7. })
    8. .retryWhen(e => e.scan((errorCount, err) => {
    9. if (errorCount >= ATTEMPT_COUNT) {
    10. throw err;
    11. }
    12. return errorCount + 1;
    13. }, 0).delay(DELAY));
    14. delayWithTimes$.subscribe(
    15. val => console.log('delay and times - val',val),
    16. err => console.error('delay and times - err',err)
    17. )

    转换 - 这个没什么好看的

    这个方法是当出现错误时你选择将错误重制成一个有效的 Observable 。

    所以我们可以通过创建一个 Observable 来体现这一点,这个 Observable 的使命就是报错

    1. let error$ = Rx.Observable.throw('crash');
    2. error$.subscribe(
    3. data => console.log( data ),
    4. err => console.log( err ),
    5. () => console.log('complete')
    6. )

    这段代码只会执行错误回调而不会执行完成回调。

    修补它

    我们可以通过引入 catch() 操作符来进行修补。它是这样使用的:

    1. let errorPatched$ = error$.catch(err => { return Rx.Observable.of('Patched' + err) });
    2. errorPatched$.subscribe((data) => console.log(data) );

    如你所见,使用 .catch() 进行修补并返回一个新的 Observable 修复 流。问题是这是否是你想要的。流确实存活下来最终完成了,它可以发出崩溃之后发生的任何值。

    如果这不是你想要的,那么上面的重试方法可能会更适合你,决定权在你手中。

    多个流呢?

    你没想到会这么容易吧?当你编写 RxJS 代码时,通常会处理多个流,如果你知道在哪放置 catch() 操作符的话,那么使用 catch() 的方法是很棒的。

    1. let badStream$ = Rx.Observable.throw('crash');
    2. let goodStream$ = Rx.Observable.of(1,2,3,);
    3. let merged$ = Rx.Observable.merge(
    4. badStream$,
    5. goodStream$
    6. );
    7. merged$.subscribe(
    8. data => console.log(data),
    9. err => console.error(err),
    10. () => console.log('merge completed')
    11. )

    猜猜发生了什么?1)错误和值都发出了,流也完成了 2)错误和值都发出了 3)只发出了错误

    遗憾的是发生的是 3)。这意味着我们几乎没有处理错误。

    修复 - 所以我们需要修复错误。我们使用 catch() 操作符进行修复。问题在哪呢?

    来试试这个?

    1. let mergedPatched$ = Rx.Observable.merge(
    2. badStream$,
    3. goodStream$
    4. ).catch(err => Rx.Observable.of(err));
    5. mergedPatched$.subscribe(
    6. data => console.log(data),
    7. err => console.error(err),
    8. () => console.log('patchedMerged completed')
    9. )

    在这个案例中,得到结果的是 crashpatchedMerged completed 。所以流是完成了的,但我们还是没有得到 goodStream$ 的值。所以这是一个更好的解决方法,但还不够好。

    更好的修复 - 在 merge() 后面添加 catch() 操作符可以确保流完成,但是还不够好。我们来尝试下在 merge 之前进行 catch 操作。

    1. let preMergedPatched$ = Rx.Observable.merge(
    2. badStream$.catch(err => Rx.Observable.of(err)),
    3. goodStream$
    4. ).catch(err => Rx.Observable.of(err));
    5. preMergedPatched$.subscribe(
    6. data => console.log(data),
    7. err => console.error(err),
    8. () => console.log('pre patched merge completed')
    9. )

    瞧,我们得到了值,我们捕获了错误并将错误作为一个新的 Observable 发出,并且流也完成了。

    陷阱 catch() 所放的位置很重要。

    适者生存

    还有另外一种情况可能会很有意思。上面的场景假设你希望发出所有的,错误信息、值、所有的一切。

    如果你关心的不是这些呢,你只关心流的值怎么办?我们来假设一下,有一个叫做 onErrorResumeNext() 的操作符

    1. let secondBadStream$ = Rx.Observable.throw('bam');
    2. let gloriaGaynorStream$ = Rx.Observable.of('I will survive');
    3. let emitSurviving = Rx.Observable.onErrorResumeNext(
    4. badStream$,
    5. secondBadStream$,
    6. gloriaGaynorStream$
    7. );
    8. emitSurviving.subscribe(
    9. data => console.log(data),
    10. err => console.error(err),
    11. () => console.log('Survival of the fittest, completed')
    12. )

    输出的结果是 ‘I will survive’ 和 ‘Survival of the fittest, completed’ 。