timestamp.ts
1.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import { Operator } from '../Operator';
import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { Scheduler } from '../Scheduler';
import { async } from '../scheduler/async';
/**
* @param scheduler
* @return {Observable<Timestamp<any>>|WebSocketSubject<T>|Observable<T>}
* @method timestamp
* @owner Observable
*/
export function timestamp<T>(this: Observable<T>, scheduler: Scheduler = async): Observable<Timestamp<T>> {
return this.lift(new TimestampOperator(scheduler));
}
export class Timestamp<T> {
constructor(public value: T, public timestamp: number) {
}
};
class TimestampOperator<T> implements Operator<T, Timestamp<T>> {
constructor(private scheduler: Scheduler) {
}
call(observer: Subscriber<Timestamp<T>>, source: any): any {
return source.subscribe(new TimestampSubscriber(observer, this.scheduler));
}
}
class TimestampSubscriber<T> extends Subscriber<T> {
constructor(destination: Subscriber<Timestamp<T>>, private scheduler: Scheduler) {
super(destination);
}
protected _next(value: T): void {
const now = this.scheduler.now();
this.destination.next(new Timestamp(value, now));
}
}