RXJS调度程序
#javascript #rxjs #scheduler #typescript

在本文中,我们将介绍RXJS调度程序。许多RXJS用户尚未听说过它们或不了解其用例。

a Scheduler允许您在哪个执行上下文中定义Observable将通知向其Observer传递。
(c)RXJS文档

换句话说,调度程序可以管理Observable操作的执行顺序和时间。让我们看一个例子。

import { of } from  'rxjs';

console.log('Start');
of('Stream').subscribe(console.log);
console.log('End');

/**
 * Logs:
 * Start
 * Stream
 * End
*/

我们可以看到流代码同步执行。换句话说,流代码在同步执行上下文中执行


有多少个执行环境?

在浏览器代码按以下顺序执行:

  • 首先执行同步代码(呼叫堆栈)。
  • 然后执行Microtask队列命令(Promise)。
  • 之后执行了宏大队列命令(setTimeoutsetIntervalXMLHttpRequest,...)。
  • 最后,有一个呼叫的队列,这些呼叫在下一个重新读取周期之前就执行(requestAnimationFrame)。

对于上面的每个点,RXJS中都有一个调度程序:

  • queueScheduler-同步
  • asapScheduler -Microtask
  • asyncScheduler -macrotask
  • animationFrameScheduler-动画框架

如何安排?

  • 将使用observeOn操作员安排在哪些执行上下文上可观察的值。
  • 安排在哪个执行上下文中,subscribe()呼叫发生的情况使用subscribeOn操作员。默认情况下,subscribe()Observable的调用同步发生。

换句话说,observeOn操作员计划在哪个执行上下文Observable.next()Observable.error()Observable.complete()方法将执行,而subscribeOn操作员会影响Subscriber,因此subscribe()呼叫将在另一个上下文中执行。

我们可以使用不同的调度程序确认相同代码的执行顺序。

import {
  animationFrameScheduler,
  asapScheduler,
  asyncScheduler,
  merge,
  of,
  queueScheduler
} from 'rxjs';
import {observeOn} from 'rxjs/operators';

const queue$ = of('queueScheduler').pipe(observeOn(queueScheduler));
const asap$ = of('asapScheduler').pipe(observeOn(asapScheduler));
const asynch$ = of('asyncScheduler').pipe(observeOn(asyncScheduler));
const animationFrame$ = of('animationFrameScheduler').pipe(
  observeOn(animationFrameScheduler)
);

merge(
  queue$,
  asap$,
  asynch$,
  animationFrame$
).subscribe(console.log);

console.log('synchronous code');

/**
 * Logs:
 * queueScheduler
 * synchronous code
 * asapScheduler
 * asyncScheduler
 * animationFrameScheduler
 */

observeOnsubscribeOn操作员以delay为第二个参数。默认情况下它的值为0

注意:对于任何提供的非零delayscheduler,将使用asyncScheduler

在RXJS版本6.5.0之前,我们可以为offrommerge等几个创建操作员提供调度程序。在较新的版本中,这种行为被弃用。现在有一个称为scheduled的函数。

import {asyncScheduler, of, scheduled} from 'rxjs';

/**
 * Deprecated:
 * of('async', asyncScheduler).subscribe(console.log);
 */

scheduled(of('async'), asyncScheduler).subscribe(console.log);

调度程序用例

缓存的可观察物

假设我们有一个Angular Service MovieService。它具有通过ID获得电影并缓存结果的方法。我们将实施如下:

@Injectable({providedIn: 'root'})
export class MovieService {
  private cache: Map<number, Movie> = new Map<number, Movie>();
  constructor(private readonly http: HttpClient) {}
  // ...
  public getById(id: number): Observable<Movie> {
    if (this.cache.has(id)) {
      return of(this.cache.get(id));
    }
    return this.http.get<Movie>(...).pipe(
      tap((movie: Movie) => this.cache.set(id, movie))
    );
  }

此实现有效,但是有一个捕获。在首次使用特定ID调用此方法后,结果将异步到达,但是在再次调用后,结果将同步到达。如果您有兴趣,为什么这个不好查看这篇文章:

使用asyncScheduler
很容易解决该问题

@Injectable({providedIn: 'root'})
export class MovieService {
  private cache: Map<number, Movie> = new Map<number, Movie>();
  constructor(private readonly http: HttpClient) {}
  // ...
  public getById(id: number): Observable<Movie> {
    if (this.cache.has(id)) {
      return scheduled(of(this.cache.get(id)), asyncScheduler);
    }
    return this.http.get<Movie>(...);
  }

嘲笑服务

当我们模拟MovieService进行测试时,将出现同样的问题。
假设我们的测试文件看起来像这样:

let movieService: MovieService;

beforeEach(() => {
  TestBed.configureTestingModule({
    providers: [
      {
        provide: MovieService,
        useValue: jasmine.createSpyObj('MovieService', 'getById')
      },
      ...
    ],
    ...
  });
  movieService = TestBed.get(MovieService);
  (movieService.getById as jasmine.Spy).and.returnValue(of(mockMovie));
});

测试将起作用,但这是错误的,因为我们返回同步结果,而不是异步结果,这是现实的情况。同样,问题是用asyncScheduler解决的:

let movieService: MovieService;

beforeEach(() => {
  TestBed.configureTestingModule({
    providers: [
      {
        provide: MovieService,
        useValue: jasmine.createSpyObj('MovieService', 'getById')
      },
      ...
    ],
    ...
  });
  movieService = TestBed.get(MovieService);
  (movieService.getById as jasmine.Spy).and.returnValue(
    scheduled(of(mockMovie), asyncScheduler)
  );
});

ExpressionChangeDafterithasbeenCheckedError

每个角开发人员都遇到了此错误,并找到了解决方案的不同方法。

以下代码将导致错误:

import { AfterViewInit, Component } from '@angular/core';

@Component({
  selector: 'hello',
  template: `<h1>Hello {{name}}!</h1>`,
  styles: [],
})
export class AppComponent implements AfterViewInit {
  public name: string;

  ngAfterViewInit() {
    this.name = 'John';
  }
}

第一个解决方案是使用setTimeout
更新name属性

import { AfterViewInit, Component } from '@angular/core';

@Component({
  selector: 'hello',
  template: `<h1>Hello {{name}}!</h1>`,
  styles: [],
})
export class AppComponent implements AfterViewInit {
  public name: string;

  ngAfterViewInit() {
    setTimeout(() => {
      this.name = 'John';
    })
  }
}

另一个解决方案是在更新属性后运行更改检测:

import {
  AfterViewInit,
  ChangeDetectorRef,
  Component,
} from '@angular/core';

@Component({
  selector: 'hello',
  template: `<h1>Hello {{name}}!</h1>`,
  styles: [],
})
export class AppComponent implements AfterViewInit {
  constructor(private readonly cdRef: ChangeDetectorRef) {}

  public name: string;

  ngAfterViewInit() {
    this.name = 'John';
    this.cdRef.detectChanges();
  }
}

每个RXJS调度程序都有schedule方法,该方法在调度程序的各自的执行上下文中提供了回调函数。使用该方法还有助于解决问题:

import { AfterViewInit, Component } from '@angular/core';

@Component({
  selector: 'hello',
  template: `<h1>Hello {{name}}!</h1>`,
  styles: [],
})
export class AppComponent implements AfterViewInit {
  public name: string;

  ngAfterViewInit() {
    /**
     * any of asyncScheduler, asapScheduler and animationFrameScheduler
     * solves the issue
     */
    asyncScheduler.schedule(() => {
      this.name = 'John';
    });
  }
}