在本文中,我们将介绍RXJS调度程序。许多RXJS用户尚未听说过它们或不了解其用例。
a
Scheduler
允许您在哪个执行上下文中定义Observable
将通知向其Observer
传递。
(c)RXJS文档
换句话说,调度程序可以管理Observable
操作的执行顺序和时间。让我们看一个例子。
import { of } from 'rxjs';
console.log('Start');
of('Stream').subscribe(console.log);
console.log('End');
/**
* Logs:
* Start
* Stream
* End
*/
我们可以看到流代码同步执行。换句话说,流代码在同步执行上下文中执行。
有多少个执行环境?
在浏览器代码按以下顺序执行:
- 首先执行同步代码(呼叫堆栈)。
- 然后执行Microtask队列命令(
Promise
)。 - 之后执行了宏大队列命令(
setTimeout
,setInterval
,XMLHttpRequest
,...)。 - 最后,有一个呼叫的队列,这些呼叫在下一个重新读取周期之前就执行(
requestAnimationFrame
)。
对于上面的每个点,RXJS中都有一个调度程序:
-
queueScheduler
-同步 -
asapScheduler
-Microtask -
asyncScheduler
-macrotask -
animationFrameScheduler
-动画框架
如何安排?
- 将使用
observeOn
操作员安排在哪些执行上下文上可观察的值。 - 安排在哪个执行上下文中,
subscribe()
呼叫发生的情况使用subscribeOn
操作员。默认情况下,subscribe()
对Observable
的调用同步发生。
换句话说,observeOn
操作员计划在哪个执行上下文Observable.next()
,Observable.error()
,Observable.complete()
方法将执行,而subscribeOn
操作员会影响Subscriber
,因此subscribe()
呼叫将在另一个上下文中执行。
我们可以使用不同的调度程序确认相同代码的执行顺序。
import {
animationFrameScheduler,
asapScheduler,
asyncScheduler,
merge,
of,
queueScheduler
} from 'rxjs';
import {observeOn} from 'rxjs/operators';
const queue$ = of('queueScheduler').pipe(observeOn(queueScheduler));
const asap$ = of('asapScheduler').pipe(observeOn(asapScheduler));
const asynch$ = of('asyncScheduler').pipe(observeOn(asyncScheduler));
const animationFrame$ = of('animationFrameScheduler').pipe(
observeOn(animationFrameScheduler)
);
merge(
queue$,
asap$,
asynch$,
animationFrame$
).subscribe(console.log);
console.log('synchronous code');
/**
* Logs:
* queueScheduler
* synchronous code
* asapScheduler
* asyncScheduler
* animationFrameScheduler
*/
observeOn
和subscribeOn
操作员以delay
为第二个参数。默认情况下它的值为0
。
注意:对于任何提供的非零
delay
和scheduler
,将使用asyncScheduler
。
在RXJS版本6.5.0之前,我们可以为of
,from
,merge
等几个创建操作员提供调度程序。在较新的版本中,这种行为被弃用。现在有一个称为scheduled
的函数。
import {asyncScheduler, of, scheduled} from 'rxjs';
/**
* Deprecated:
* of('async', asyncScheduler).subscribe(console.log);
*/
scheduled(of('async'), asyncScheduler).subscribe(console.log);
调度程序用例
缓存的可观察物
假设我们有一个Angular Service MovieService
。它具有通过ID获得电影并缓存结果的方法。我们将实施如下:
@Injectable({providedIn: 'root'})
export class MovieService {
private cache: Map<number, Movie> = new Map<number, Movie>();
constructor(private readonly http: HttpClient) {}
// ...
public getById(id: number): Observable<Movie> {
if (this.cache.has(id)) {
return of(this.cache.get(id));
}
return this.http.get<Movie>(...).pipe(
tap((movie: Movie) => this.cache.set(id, movie))
);
}
此实现有效,但是有一个捕获。在首次使用特定ID调用此方法后,结果将异步到达,但是在再次调用后,结果将同步到达。如果您有兴趣,为什么这个不好查看这篇文章: