- exhaustMap
- 签名:
exhaustMap(project: function, resultSelector: function): Observable
- 签名:
- 映射成内部 observable,忽略其他值直到该 observable 完成。
- 示例
- 示例 1: 使用 interval 的 exhaustMap
- 示例 2: 另一个使用 interval 的 exhaustMap
- 示例
- 外部示例
exhaustMap
用于 @ngrx 示例应用 的 login Effect
- 其他资源
exhaustMap
签名: exhaustMap(project: function, resultSelector: function): Observable
映射成内部 observable,忽略其他值直到该 observable 完成。
示例
示例 1: 使用 interval 的 exhaustMap
( Stackblitz |
jsBin |
jsFiddle )
import { interval } from 'rxjs/observable/interval';
import { merge } from 'rxjs/observable/merge';
import { of } from 'rxjs/observable/of';
import { delay, take, exhaustMap } from 'rxjs/operators';
const sourceInterval = interval(1000);
const delayedInterval = sourceInterval.pipe(delay(10), take(4));
const exhaustSub = merge(
// 延迟10毫秒,然后开始 interval 并发出4个值
delayedInterval,
// 立即发出
of(true)
)
.pipe(exhaustMap(_ => sourceInterval.pipe(take(5))))
/*
* 第一个发出的值 (of(true)) 会被映射成每秒发出值、
* 5秒后完成的 interval observable 。
* 因为 delayedInterval 的发送是晚于前者的,虽然 observable
* 仍然是活动的,但它们会被忽略。
*
* 与类似的操作符进行下对比:
* concatMap 会进行排队
* switchMap 会在每次发送时切换成新的内部 observable
* mergeMap 会为每个发出值维护新的 subscription
*/
// 输出: 0, 1, 2, 3, 4
.subscribe(val => console.log(val));
示例 2: 另一个使用 interval 的 exhaustMap
( Stackblitz |
jsBin |
jsFiddle )
import { interval } from 'rxjs/observable/interval';
import { exhaustMap, tap, take } from 'rxjs/operators';
const firstInterval = interval(1000).pipe(take(10));
const secondInterval = interval(1000).pipe(take(2));
const exhaustSub = firstInterval
.pipe(
exhaustMap(f => {
console.log(`Emission Corrected of first interval: ${f}`);
return secondInterval;
})
)
/*
当我们订阅第一个 interval 时,它开始发出值(从0开始)。
这个值会映射成第二个 interval,然后它开始发出值(从0开始)。
当第二个 interval 出于激活状态时,第一个 interval 的值会被忽略。
我们可以看到 firstInterval 发出的数字为3,6,等等...
输出:
Emission of first interval: 0
0
1
Emission of first interval: 3
0
1
Emission of first interval: 6
0
1
Emission of first interval: 9
0
1
*/
.subscribe(s => console.log(s));
外部示例
exhaustMap
用于 @ngrx 示例应用 的 login Effect
(
Source
)
@Effect()
login$ = this.actions$.pipe(
ofType(AuthActionTypes.Login),
map((action: Login) => action.payload),
exhaustMap((auth: Authenticate) =>
this.authService
.login(auth)
.pipe(
map(user => new LoginSuccess({ user })),
catchError(error => of(new LoginFailure(error)))
)
)
);
其他资源
- exhaustMap - 官方文档
源码: https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/exhaustMap.ts