• forkJoin
    • 签名: forkJoin(...args, selector : function): Observable
  • 当所有 observables 完成时,发出每个 observable 的最新值。
    • 为什么使用 forkJoin
    • 示例
      • 示例 1: Observables 再不同的时间间隔后完成
      • 示例 2: 发起任意多个请求
      • 示例 3: 在外部处理错误
      • 示例 4: 当某个内部 observable 报错时得到成功结果
      • 示例 5: Angular 中的 forkJoin
  • 其他资源

    forkJoin

    签名: forkJoin(...args, selector : function): Observable

    当所有 observables 完成时,发出每个 observable 的最新值。


    :bulb: 如果你想要多个 observables 按发出顺序相对应的值的组合,试试 zip!

    :warning: 如果内部 observable 不完成的话,forkJoin 永远不会发出值!


    为什么使用 forkJoin

    当有一组 observables,但你只关心每个 observable 最后发出的值时,此操作符是最适合的。此操作符的一个常见用例是在页面加载(或其他事件)时你希望发起多个请求,并在所有请求都响应后再采取行动。它可能与 Promise.all 的使用方式类似。

    注意,如果任意作用于 forkJoin 的内部 observable 报错的话,对于那些在内部 observable 上没有正确 catch 错误,从而导致完成的 observable,你将丢失它们的值 (参见示例 4)。如果你只关心所有内部 observables 是否成功完成的话,可以在外部捕获错误。

    还需要注意的是如果 observable 发出的项多于一个的话,并且你只关心前一个发出的话,那么 forkJoin 并非正确的选择。在这种情况下,应该选择像 combineLatest 或 zip 这样的操作符。

    forkJoin - 图3

    示例

    示例 1: Observables 再不同的时间间隔后完成

    ( StackBlitz |
    jsBin |
    jsFiddle )

    1. import { delay, take } from 'rxjs/operators';
    2. import { forkJoin } from 'rxjs/observable/forkJoin';
    3. import { of } from 'rxjs/observable/of';
    4. import { interval } from 'rxjs/observable/interval';
    5. const myPromise = val =>
    6. new Promise(resolve =>
    7. setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000)
    8. );
    9. /*
    10. 当所有 observables 完成时,将每个 observable
    11. 的最新值作为数组发出
    12. */
    13. const example = forkJoin(
    14. // 立即发出 'Hello'
    15. of('Hello'),
    16. // 1秒后发出 'World'
    17. of('World').pipe(delay(1000)),
    18. // 1秒后发出0
    19. interval(1000).pipe(take(1)),
    20. // 以1秒的时间间隔发出0和1
    21. interval(1000).pipe(take(2)),
    22. // 5秒后解析 'Promise Resolved' 的 promise
    23. myPromise('RESULT')
    24. );
    25. //输出: ["Hello", "World", 0, 1, "Promise Resolved: RESULT"]
    26. const subscribe = example.subscribe(val => console.log(val));
    示例 2: 发起任意多个请求

    ( StackBlitz |
    jsBin |
    jsFiddle )

    1. import { mergeMap } from 'rxjs/operators';
    2. import { forkJoin } from 'rxjs/observable/forkJoin';
    3. import { of } from 'rxjs/observable/of';
    4. const myPromise = val =>
    5. new Promise(resolve =>
    6. setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000)
    7. );
    8. const source = of([1, 2, 3, 4, 5]);
    9. // 发出数组的全部5个结果
    10. const example = source.pipe(mergeMap(q => forkJoin(...q.map(myPromise))));
    11. /*
    12. 输出:
    13. [
    14. "Promise Resolved: 1",
    15. "Promise Resolved: 2",
    16. "Promise Resolved: 3",
    17. "Promise Resolved: 4",
    18. "Promise Resolved: 5"
    19. ]
    20. */
    21. const subscribe = example.subscribe(val => console.log(val));
    示例 3: 在外部处理错误

    ( StackBlitz |
    jsBin |
    jsFiddle )

    1. import { delay, catchError } from 'rxjs/operators';
    2. import { forkJoin } from 'rxjs/observable/forkJoin';
    3. import { of } from 'rxjs/observable/of';
    4. import { _throw } from 'rxjs/observable/throw';
    5. /*
    6. 当所有 observables 完成时,将每个 observable
    7. 的最新值作为数组发出
    8. */
    9. const example = forkJoin(
    10. // 立即发出 'Hello'
    11. of('Hello'),
    12. // 1秒后发出 'World'
    13. of('World').pipe(delay(1000)),
    14. // 抛出错误
    15. _throw('This will error')
    16. ).pipe(catchError(error => of(error)));
    17. // 输出: 'This will Error'
    18. const subscribe = example.subscribe(val => console.log(val));
    示例 4: 当某个内部 observable 报错时得到成功结果

    ( StackBlitz |
    jsBin |
    jsFiddle )

    1. import { delay, catchError } from 'rxjs/operators';
    2. import { forkJoin } from 'rxjs/observable/forkJoin';
    3. import { of } from 'rxjs/observable/of';
    4. import { _throw } from 'rxjs/observable/throw';
    5. /*
    6. 当所有 observables 完成时,将每个 observable
    7. 的最新值作为数组发出
    8. */
    9. const example = forkJoin(
    10. // 立即发出 'Hello'
    11. of('Hello'),
    12. // 1秒后发出 'World'
    13. of('World').pipe(delay(1000)),
    14. // 抛出错误
    15. _throw('This will error').pipe(catchError(error => of(error)))
    16. );
    17. // 输出: ["Hello", "World", "This will error"]
    18. const subscribe = example.subscribe(val => console.log(val));
    示例 5: Angular 中的 forkJoin

    ( plunker )

    1. @Injectable()
    2. export class MyService {
    3. makeRequest(value: string, delayDuration: number) {
    4. // 模拟 http 请求
    5. return of(`Complete: ${value}`).pipe(
    6. delay(delayDuration)
    7. );
    8. }
    9. }
    10. @Component({
    11. selector: 'my-app',
    12. template: `
    13. <div>
    14. <h2>forkJoin Example</h2>
    15. <ul>
    16. <li> {{propOne}} </li>
    17. <li> {{propTwo}} </li>
    18. <li> {{propThree}} </li>
    19. </ul>
    20. </div>
    21. `,
    22. })
    23. export class App {
    24. public propOne: string;
    25. public propTwo: string;
    26. public propThree: string;
    27. constructor(private _myService: MyService) {}
    28. ngOnInit() {
    29. // 使用不同的延迟模拟3个请求
    30. forkJoin(
    31. this._myService.makeRequest('Request One', 2000),
    32. this._myService.makeRequest('Request Two', 1000)
    33. this._myService.makeRequest('Request Three', 3000)
    34. )
    35. .subscribe(([res1, res2, res3]) => {
    36. this.propOne = res1;
    37. this.propTwo = res2;
    38. this.propThree = res3;
    39. });
    40. }
    41. }

    其他资源

    • forkJoin :newspaper: - 官方文档

    :file_folder: 源码: https://github.com/ReactiveX/rxjs/blob/master/src/observable/ForkJoinObservable.ts