- 错误处理
- 重试 - 现在怎么样?
- retry
- retryWhen
- retryWhen 和 delay 一起使用没有次数限制
- 转换 - 这个没什么好看的
- 修补它
- 多个流呢?
- 适者生存
- 适者生存
- 重试 - 现在怎么样?
错误处理
有两种主要的方法来处理流中的错误。你可以重试流并保证流最终会正常运行,或者处理错误并进行转换。
重试 - 现在怎么样?
当你认为错误是由于某些原因是暂时导致的,那么这种方法是适用的。通常不稳定的网络是个很好的例子。当网络不稳定时端点可能会在你多次尝试后才能回应。要点是你的首次尝试可能失败,但重试x次并且在两次尝试之间有一定的时间间隔,最终端点会回应。
retry
retry()
操作符可以让我们重试整个流,只接收一个参数,参数的值是要重试的次数,函数签名如下:
retry([times])
重要的是要注意当错误回调被调用的话, retry()
操作符会有延迟。下面代码中的错误回调会立即被调用:
let stream$ = Rx.Observable.of(1,2,3)
.map(value => {
if(value > 2) { throw 'error' }
});
stream$.subscribe(
data => console.log(data),
err => console.log(err)
)
这个流很快的就死了,错误回调被调用,这个时候 retry()
操作符登场。像下面这样把它附加上即可:
let stream$ = Rx.Observable.of(1,2,3)
.map(value => {
if(value > 2) { throw 'error' }
})
.retry(5)
这将运行值序列5次,最后放弃并进入错误回调。然而在这个案例中,由于编写代码的方式,它只会生成5次1,2
。所以我们的代码并没有真正利用操作符的最大潜力。你可能想要的是能够在每次尝试之间改变一些东西。想象下你的 observable 看起来像这样:
let urlsToHit$ = Rx.Observable.of(url, url2, url3);
在这一点上,它清楚地表明,在你的第一次尝试中,端点可能回应的不好,或者根本就没有回应,所以重试x次是很有用的。
然而在调用 ajax 的情况下,并想象一下我们的业务场景中网络不稳定,那么立即重试是没有意义的,所以我们需要再找到一个更好的操作符,那就是 retryWhen()
retryWhen
retryWhen()
操作符让我们有机会对流进行操作并恰当地处理。
retryWhen( stream => {
// 希望能在更好的条件下返回
})
现在我们来写段简单的代码:
let values$ = Rx.Observable
.of( 1,2,3,4 )
.map(val => {
if(val === 2) { throw 'err'; }
else return val;
})
.retryWhen( stream => {
return stream;
} );
values$.subscribe(
data => console.log('Retry when - data',data),
err => console.error('Retry when - Err',err)
)
这样写的话会一直返回 1
,直到我们用完内存为止,由于缺少结束条件,算法总是会在值2
上崩溃,并将永远重试流。我们需要做的就是以某种方式告知错误已经修复。如果流尝试点击网址而不是发出数字,响应端点将会被压垮,所以在这种情况下,我们必须写这样的东西:
let values$ = Rx.Observable.interval(1000).take(5);
let errorFixed = false;
values$
.map((val) => {
if(errorFixed) { return val; }
else if( val > 0 && val % 2 === 0) {
errorFixed = true;
throw { error : 'error' };
} else {
return val;
}
})
.retryWhen((err) => {
console.log('retrying the entire sequence');
return err.delay(200);
})
.subscribe((val) => { console.log('value',val) })
// 0 1 '等待200毫秒' retrying the whole sequence 0 1 2 3 4
然而,这与我们用 retry()
运算符所做的很多类似,上面的代码只会重试一次。使用 retryWhen()
真正的好处是可以改变操作符中返回的流,也就是这里调用的 delay()
操作符,像这样:
.retryWhen((err) => {
console.log('retrying the entire sequence');
return err.delay(200)
})
这会确保在流重试前有200毫秒的延迟,如果是在 ajax 场景下,可以确保端点有足够的时间重整旗鼓,然后开始响应。
陷阱
在 retryWhen()
中使用 delay()
操作符来确保重试晚一点发生,在这个案例中可以给网络一个恢复的机会。
retryWhen 和 delay 一起使用没有次数限制
到目前为止,当我们想要重试整个流x次时使用的是 retry()
操作符,当我们想要在重试之间有一些延迟时间时使用的是 retryWhen()
操作符,但是如果我们两者都想要,可以做到吗?可以的。我们需要考虑一下要以某种方式记住到目前为止我们的尝试次数。引入一个外部变量用来保持这个数量是非常诱人的,但那不是函数式做事的方式,记住副作用是被禁止的。那么我们该如何解决呢?有一个叫做 scan()
的操作符,它允许我们累积每次迭代的值。所以如果在 retryWhen()
中使用 scan
的话,我们就可以追踪尝试的次数:
let ATTEMPT_COUNT = 3;
let DELAY = 1000;
let delayWithTimes$ = Rx.Observable.of(1,2,3)
.map( val => {
if(val === 2) throw 'err'
else return val;
})
.retryWhen(e => e.scan((errorCount, err) => {
if (errorCount >= ATTEMPT_COUNT) {
throw err;
}
return errorCount + 1;
}, 0).delay(DELAY));
delayWithTimes$.subscribe(
val => console.log('delay and times - val',val),
err => console.error('delay and times - err',err)
)
转换 - 这个没什么好看的
这个方法是当出现错误时你选择将错误重制成一个有效的 Observable 。
所以我们可以通过创建一个 Observable 来体现这一点,这个 Observable 的使命就是报错
let error$ = Rx.Observable.throw('crash');
error$.subscribe(
data => console.log( data ),
err => console.log( err ),
() => console.log('complete')
)
这段代码只会执行错误回调而不会执行完成回调。
修补它
我们可以通过引入 catch()
操作符来进行修补。它是这样使用的:
let errorPatched$ = error$.catch(err => { return Rx.Observable.of('Patched' + err) });
errorPatched$.subscribe((data) => console.log(data) );
如你所见,使用 .catch()
进行修补并返回一个新的 Observable 修复 流。问题是这是否是你想要的。流确实存活下来最终完成了,它可以发出崩溃之后发生的任何值。
如果这不是你想要的,那么上面的重试方法可能会更适合你,决定权在你手中。
多个流呢?
你没想到会这么容易吧?当你编写 RxJS 代码时,通常会处理多个流,如果你知道在哪放置 catch()
操作符的话,那么使用 catch()
的方法是很棒的。
let badStream$ = Rx.Observable.throw('crash');
let goodStream$ = Rx.Observable.of(1,2,3,);
let merged$ = Rx.Observable.merge(
badStream$,
goodStream$
);
merged$.subscribe(
data => console.log(data),
err => console.error(err),
() => console.log('merge completed')
)
猜猜发生了什么?1)错误和值都发出了,流也完成了 2)错误和值都发出了 3)只发出了错误
遗憾的是发生的是 3)。这意味着我们几乎没有处理错误。
修复 - 所以我们需要修复错误。我们使用 catch()
操作符进行修复。问题在哪呢?
来试试这个?
let mergedPatched$ = Rx.Observable.merge(
badStream$,
goodStream$
).catch(err => Rx.Observable.of(err));
mergedPatched$.subscribe(
data => console.log(data),
err => console.error(err),
() => console.log('patchedMerged completed')
)
在这个案例中,得到结果的是 crash
和 patchedMerged completed
。所以流是完成了的,但我们还是没有得到 goodStream$
的值。所以这是一个更好的解决方法,但还不够好。
更好的修复 - 在 merge()
后面添加 catch()
操作符可以确保流完成,但是还不够好。我们来尝试下在 merge
之前进行 catch
操作。
let preMergedPatched$ = Rx.Observable.merge(
badStream$.catch(err => Rx.Observable.of(err)),
goodStream$
).catch(err => Rx.Observable.of(err));
preMergedPatched$.subscribe(
data => console.log(data),
err => console.error(err),
() => console.log('pre patched merge completed')
)
瞧,我们得到了值,我们捕获了错误并将错误作为一个新的 Observable 发出,并且流也完成了。
陷阱 catch()
所放的位置很重要。
适者生存
还有另外一种情况可能会很有意思。上面的场景假设你希望发出所有的,错误信息、值、所有的一切。
如果你关心的不是这些呢,你只关心流的值怎么办?我们来假设一下,有一个叫做 onErrorResumeNext()
的操作符
let secondBadStream$ = Rx.Observable.throw('bam');
let gloriaGaynorStream$ = Rx.Observable.of('I will survive');
let emitSurviving = Rx.Observable.onErrorResumeNext(
badStream$,
secondBadStream$,
gloriaGaynorStream$
);
emitSurviving.subscribe(
data => console.log(data),
err => console.error(err),
() => console.log('Survival of the fittest, completed')
)
输出的结果是 ‘I will survive’ 和 ‘Survival of the fittest, completed’ 。