• shareReplay
    • 签名: shareReplay(bufferSize?: number, windowTime?: number, scheduler?I IScheduler): Observable
  • 共享源 observable 并重放指定次数的发出。
    • 为什么使用 shareReplay
    • 示例
      • 示例 1: 多个订阅者共享源 observable
  • 其他资源

    shareReplay

    签名: shareReplay(bufferSize?: number, windowTime?: number, scheduler?I IScheduler): Observable

    共享源 observable 并重放指定次数的发出。

    为什么使用 shareReplay

    通常啊,当有副作用或繁重的计算时,你不希望在多个订阅者之间重复执行时,会使用 shareReplay
    当你知道流的后来订阅者也需要访问之前发出的值,shareReplay 在这种场景下也是有价值的。
    这种在订阅过程中重放值的能力是区分 shareshareReplay 的关键。

    例如,加入你有一个发出最后访问 url 的 observable 。
    在第一个示例中,我们将使用 share:

    1. // 使用 subject 模拟 url 的变化
    2. const routeEnd = new Subject<{data: any, url: string}>();
    3. // 提取 url 并与后来订阅者共享
    4. const lastUrl = routeEnd.pipe(
    5. pluck('url'),
    6. share()
    7. );
    8. // 起始订阅者是必须的
    9. const initialSubscriber = lastUrl.subscribe(console.log);
    10. // 模拟路由变化
    11. routeEnd.next({data: {}, url: 'my-path'});
    12. // 没有任何输出
    13. const lateSubscriber = lastUrl.subscribe(console.log);

    上面的示例中,lateSubscriber 订阅源 observable 后没有任何输出。
    现在我们想要访问订阅中的最新发出值,可以通过 shareReplay 来完成:

    1. import { Subject } from 'rxjs/Subject';
    2. import { ReplaySubject } from 'rxjs/ReplaySubject';
    3. import { pluck, share, shareReplay, tap } from 'rxjs/operators';
    4. // 使用 subject 模拟 url 的变化
    5. const routeEnd = new Subject<{data: any, url: string}>();
    6. // 提取 url 并与后来订阅者共享
    7. const lastUrl = routeEnd.pipe(
    8. tap(_ => console.log('executed')),
    9. pluck('url'),
    10. // 默认为重放最后一个值
    11. shareReplay()
    12. );
    13. // 起始订阅者是必须的
    14. const initialSubscriber = lastUrl.subscribe(console.log);
    15. // 模拟路由变化
    16. // 输出: 'executed', 'my-path'
    17. routeEnd.next({data: {}, url: 'my-path'});
    18. // 输出: 'my-path'
    19. const lateSubscriber = lastUrl.subscribe(console.log);

    注意,如果使用 ReplaySubject 订阅 lastUrl 流,然后再订阅 ReplaySubject
    这种行为与使用 shareReplay 类似:

    1. // 使用 subject 模拟 url 的变化
    2. const routeEnd = new Subject<{data: any, url: string}>();
    3. // 使用 ReplaySubject 来替代 shareReplay
    4. const shareWithReplay = new ReplaySubject();
    5. // 取 url 并与后来订阅者共享
    6. const lastUrl = routeEnd.pipe(
    7. pluck('url')
    8. )
    9. .subscribe(val => shareWithReplay.next(val));
    10. // 模拟路由变化
    11. routeEnd.next({data: {}, url: 'my-path'});
    12. // 订阅 ReplaySubject
    13. // 输出: 'my path'
    14. shareWithReplay.subscribe(console.log);

    事实上,如果深入源码,我们可以发现两者之间使用的技术是类似的。
    当订阅发生后,shareReplay 会订阅源 observable,并通过内部的 ReplaySubject
    来发送值:

    (
    source
    )

    1. return function shareReplayOperation(this: Subscriber<T>, source: Observable<T>) {
    2. refCount++;
    3. if (!subject || hasError) {
    4. hasError = false;
    5. subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
    6. subscription = source.subscribe({
    7. next(value) { subject.next(value); },
    8. error(err) {
    9. hasError = true;
    10. subject.error(err);
    11. },
    12. complete() {
    13. isComplete = true;
    14. subject.complete();
    15. },
    16. });
    17. }
    18. const innerSub = subject.subscribe(this);
    19. return () => {
    20. refCount--;
    21. innerSub.unsubscribe();
    22. if (subscription && refCount === 0 && isComplete) {
    23. subscription.unsubscribe();
    24. }
    25. };
    26. };
    27. }
    shareReplay - 图1

    示例

    示例 1: 多个订阅者共享源 observable

    ( Stackblitz )

    1. import { Subject } from 'rxjs/Subject';
    2. import { ReplaySubject } from 'rxjs/ReplaySubject';
    3. import { pluck, share, shareReplay, tap } from 'rxjs/operators';
    4. // 使用 subject 模拟 url 的变化
    5. const routeEnd = new Subject<{data: any, url: string}>();
    6. // 提取 url 并与后来订阅者共享
    7. const lastUrl = routeEnd.pipe(
    8. tap(_ => console.log('executed')),
    9. pluck('url'),
    10. // 默认为重放最后一个值
    11. shareReplay()
    12. );
    13. // 起始订阅者是必须的
    14. const initialSubscriber = lastUrl.subscribe(console.log)
    15. // 模拟路由变化
    16. // 输出: 'executed', 'my-path'
    17. routeEnd.next({data: {}, url: 'my-path'});
    18. // 输出: 'my-path'
    19. const lateSubscriber = lastUrl.subscribe(console.log);

    其他资源

    • shareReplay
      :newspaper: - 官方文档

    :file_folder: 源码:
    https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/shareReplay.ts