import { async } from '../scheduler/async';
import { Operator } from '../Operator';
import { Scheduler } from '../Scheduler';
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import { Subscription, TeardownLogic } from '../Subscription';

/**
 * Ignores source values for `duration` milliseconds, then emits the most recent
 * value from the source Observable, then repeats this process.
 *
 * When it sees a source value, it ignores that plus
 * the next ones for `duration` milliseconds, and then it emits the most recent
 * value from the source.
 *
 * `auditTime` is similar to `throttleTime`, but emits the last value from the
 * silenced time window, instead of the first value. `auditTime` emits the most
 * recent value from the source Observable on the output Observable as soon as
 * its internal timer becomes disabled, and ignores source values while the
 * timer is enabled. Initially, the timer is disabled. As soon as the first
 * source value arrives, the timer is enabled. After `duration` milliseconds (or
 * the time unit determined internally by the optional `scheduler`) has passed,
 * the timer is disabled, then the most recent source value is emitted on the
 * output Observable, and this process repeats for the next source value.
 * Optionally takes a {@link Scheduler} for managing timers.
 *
 * @example <caption>Emit clicks at a rate of at most one click per second</caption>
 * var clicks = Rx.Observable.fromEvent(document, 'click');
 * var result = clicks.auditTime(1000);
 * result.subscribe(x => console.log(x));
 *
 * @see {@link audit}
 * @see {@link debounceTime}
 * @see {@link delay}
 * @see {@link sampleTime}
 * @see {@link throttleTime}
 *
 * @param {number} duration Time to wait before emitting the most recent source
 * value, measured in milliseconds or the time unit determined internally
 * by the optional `scheduler`.
 * @param {Scheduler} [scheduler=async] The {@link Scheduler} to use for
 * managing the timers that handle the rate-limiting behavior.
 * @return {Observable<T>} An Observable that performs rate-limiting of
 * emissions from the source Observable.
 * @method auditTime
 * @owner Observable
 */
export function auditTime<T>(this: Observable<T>, duration: number, scheduler: Scheduler = async): Observable<T> {
  return this.lift(new AuditTimeOperator(duration, scheduler));
}

class AuditTimeOperator<T> implements Operator<T, T> {
  constructor(private duration: number,
              private scheduler: Scheduler) {
  }

  call(subscriber: Subscriber<T>, source: any): TeardownLogic {
    return source.subscribe(new AuditTimeSubscriber(subscriber, this.duration, this.scheduler));
  }
}

/**
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class AuditTimeSubscriber<T> extends Subscriber<T> {

  private value: T;
  private hasValue: boolean = false;
  private throttled: Subscription;

  constructor(destination: Subscriber<T>,
              private duration: number,
              private scheduler: Scheduler) {
    super(destination);
  }

  protected _next(value: T): void {
    // Always remember the most recent value; only start a timer if none is running.
    this.value = value;
    this.hasValue = true;
    if (!this.throttled) {
      this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
    }
  }

  clearThrottle(): void {
    const { value, hasValue, throttled } = this;
    // Disable the timer first so that the next source value can start a new one.
    if (throttled) {
      this.remove(throttled);
      this.throttled = null;
      throttled.unsubscribe();
    }
    // Then emit the value recorded during the silenced window, if any.
    if (hasValue) {
      this.value = null;
      this.hasValue = false;
      this.destination.next(value);
    }
  }
}

// Scheduler callback: invoked once `duration` has elapsed for the given subscriber.
function dispatchNext<T>(subscriber: AuditTimeSubscriber<T>): void {
  subscriber.clearThrottle();
}
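
The timer behavior described in the doc comment above can be traced without RxJS or real timers. The following is a minimal sketch, not part of the library: `Timed` and `simulateAuditTime` are hypothetical names introduced only for illustration. It assumes the input is sorted by time, that the source stays open until the last timer fires, and that a timer expiring at the same instant a value arrives is handled before that value.

```typescript
// Hypothetical helper type (not part of RxJS): a value stamped with a
// virtual time in milliseconds.
interface Timed<T> {
  time: number;
  value: T;
}

// Hypothetical simulation of the audit window logic over virtual time.
// Assumes `source` is sorted by `time` and the source never completes
// before the last pending timer fires.
function simulateAuditTime<T>(source: Timed<T>[], duration: number): Timed<T>[] {
  const output: Timed<T>[] = [];
  let timerEnd: number | null = null; // null means the timer is disabled
  let latest: T | undefined;

  for (const { time, value } of source) {
    // If the running timer expired at or before this arrival, emit the
    // most recent value at the expiry time and disable the timer.
    if (timerEnd !== null && time >= timerEnd) {
      output.push({ time: timerEnd, value: latest as T });
      timerEnd = null;
    }
    // Every source value replaces the remembered one.
    latest = value;
    // A value arriving while the timer is disabled enables it.
    if (timerEnd === null) {
      timerEnd = time + duration;
    }
  }

  // Let the last pending timer fire (assumption: the source is still open).
  if (timerEnd !== null) {
    output.push({ time: timerEnd, value: latest as T });
  }
  return output;
}

// Five values with a 1000 ms window.
const sampleSource: Timed<string>[] = [
  { time: 0, value: 'a' },
  { time: 300, value: 'b' },
  { time: 900, value: 'c' },
  { time: 1500, value: 'd' },
  { time: 1600, value: 'e' },
];
const sampleOutput = simulateAuditTime(sampleSource, 1000);
console.log(sampleOutput);
```

In this trace the first window opens at 0 ms and closes at 1000 ms with `'c'`; the second opens at 1500 ms and closes at 2500 ms with `'e'`. Note that in `AuditTimeSubscriber` above, completion of the source unsubscribes the pending scheduled action, so a value still waiting in an open window would not be emitted; the sketch sidesteps this by assuming the source stays open.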