import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { Scheduler } from '../Scheduler'; import { async } from '../scheduler/async'; /** * @param scheduler * @return {Observable>|WebSocketSubject|Observable} * @method timestamp * @owner Observable */ export function timestamp(this: Observable, scheduler: Scheduler = async): Observable> { return this.lift(new TimestampOperator(scheduler)); } export class Timestamp { constructor(public value: T, public timestamp: number) { } }; class TimestampOperator implements Operator> { constructor(private scheduler: Scheduler) { } call(observer: Subscriber>, source: any): any { return source.subscribe(new TimestampSubscriber(observer, this.scheduler)); } } class TimestampSubscriber extends Subscriber { constructor(destination: Subscriber>, private scheduler: Scheduler) { super(destination); } protected _next(value: T): void { const now = this.scheduler.now(); this.destination.next(new Timestamp(value, now)); } }