delay.js
5.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"use strict";
// Inheritance helper: reuses an `__extends` already present on `this` if there
// is one, otherwise falls back to the local implementation below.
//   d - the derived constructor
//   b - the base constructor, or null for a prototype with no parent
var __extends = (this && this.__extends) || function (d, b) {
    var key;
    // Copy the base's own enumerable members (its "statics") onto the derived
    // constructor; inherited members are deliberately skipped.
    for (key in b) {
        if (b.hasOwnProperty(key)) {
            d[key] = b[key];
        }
    }
    // Intermediate constructor: lets the derived prototype inherit from the
    // base prototype without invoking the base constructor itself.
    function Surrogate() { this.constructor = d; }
    if (b === null) {
        d.prototype = Object.create(b);
    }
    else {
        Surrogate.prototype = b.prototype;
        d.prototype = new Surrogate();
    }
};
var async_1 = require('../scheduler/async');
var isDate_1 = require('../util/isDate');
var Subscriber_1 = require('../Subscriber');
var Notification_1 = require('../Notification');
/**
 * Delays the emission of items from the source Observable by a given timeout or
 * until a given Date.
 *
 * <span class="informal">Time shifts each item by some specified amount of
 * milliseconds.</span>
 *
 * <img src="./img/delay.png" width="100%">
 *
 * When the delay argument is a Number, the source Observable is time shifted
 * by that many milliseconds and the relative time intervals between the values
 * are preserved. The absolute value of the number is used, so a negative
 * number delays by the same amount as its positive counterpart.
 *
 * When the delay argument is a Date, the start of the Observable execution is
 * time shifted until the given date occurs. The remaining time is computed
 * once, when the operator is applied, as the difference between the Date and
 * the scheduler's current time.
 *
 * @example <caption>Delay each click by one second</caption>
 * var clicks = Rx.Observable.fromEvent(document, 'click');
 * var delayedClicks = clicks.delay(1000); // each click emitted after 1 second
 * delayedClicks.subscribe(x => console.log(x));
 *
 * @example <caption>Delay all clicks until a future date happens</caption>
 * var clicks = Rx.Observable.fromEvent(document, 'click');
 * var date = new Date('March 15, 2050 12:00:00'); // in the future
 * var delayedClicks = clicks.delay(date); // click emitted only after that date
 * delayedClicks.subscribe(x => console.log(x));
 *
 * @see {@link debounceTime}
 * @see {@link delayWhen}
 *
 * @param {number|Date} delay The delay duration in milliseconds (a `number`) or
 * a `Date` until which the emission of the source items is delayed.
 * @param {Scheduler} [scheduler=async] The Scheduler to use for
 * managing the timers that handle the time-shift for each item.
 * @return {Observable} An Observable that delays the emissions of the source
 * Observable by the specified timeout or Date.
 * @method delay
 * @owner Observable
 */
function delay(delay, scheduler) {
    if (scheduler === void 0) {
        scheduler = async_1.async;
    }
    var offset;
    if (isDate_1.isDate(delay)) {
        // Absolute deadline: convert to a duration relative to "now".
        offset = +delay - scheduler.now();
    }
    else {
        // Relative duration: the sign of the number is ignored.
        offset = Math.abs(delay);
    }
    var operator = new DelayOperator(offset, scheduler);
    return this.lift(operator);
}
exports.delay = delay;
// Operator object handed to `lift`: it remembers the delay and the scheduler,
// and wires a DelaySubscriber between the source and the downstream
// subscriber when `call` is invoked.
var DelayOperator = (function () {
    function DelayOperator(delay, scheduler) {
        this.delay = delay;
        this.scheduler = scheduler;
    }
    // Subscribes to `source` through a new DelaySubscriber that wraps
    // `subscriber`, and returns whatever `source.subscribe` returns.
    DelayOperator.prototype.call = function (subscriber, source) {
        var delayed = new DelaySubscriber(subscriber, this.delay, this.scheduler);
        return source.subscribe(delayed);
    };
    return DelayOperator;
}());
/**
 * We need this JSDoc comment for affecting ESDoc.
 *
 * Subscriber that wraps every `next`/`complete` notification in a
 * DelayMessage stamped with the time at which it becomes due, queues it, and
 * replays the queue to the destination from a scheduled `dispatch` callback.
 * Errors are not queued: they are forwarded immediately and drop the queue.
 * @ignore
 * @extends {Ignored}
 */
var DelaySubscriber = (function (Base) {
    __extends(DelaySubscriber, Base);
    function DelaySubscriber(destination, delay, scheduler) {
        Base.call(this, destination);
        this.delay = delay;
        this.scheduler = scheduler;
        // Pending DelayMessage entries; appended at the end, consumed from the front.
        this.queue = [];
        // True from the moment a dispatch is scheduled until dispatch finds the queue empty.
        this.active = false;
        // Set once the source has errored; later notifications are then ignored.
        this.errored = false;
    }
    // Scheduled work function. `state` carries { source, destination, scheduler }.
    // `this` must provide `schedule(state, delay)` for re-arming; presumably it is
    // the action created by `scheduler.schedule` (confirm against the scheduler).
    DelaySubscriber.dispatch = function (state) {
        var subscriber = state.source;
        var pending = subscriber.queue;
        var clock = state.scheduler;
        var sink = state.destination;
        // Deliver every message whose due time has been reached.
        while (pending.length > 0 && (pending[0].time - clock.now()) <= 0) {
            var due = pending.shift();
            due.notification.observe(sink);
        }
        if (pending.length === 0) {
            // Nothing left to deliver: allow the next notification to schedule anew.
            subscriber.active = false;
            return;
        }
        // Re-arm for the next pending message, never with a negative wait.
        var wait = Math.max(0, pending[0].time - clock.now());
        this.schedule(state, wait);
    };
    // Marks the subscriber as active and schedules `dispatch` after `this.delay`,
    // registering the result of `scheduler.schedule` through `this.add`.
    DelaySubscriber.prototype._schedule = function (scheduler) {
        this.active = true;
        var state = {
            source: this,
            destination: this.destination,
            scheduler: scheduler
        };
        var action = scheduler.schedule(DelaySubscriber.dispatch, this.delay, state);
        this.add(action);
    };
    // Queues `notification` to become due `this.delay` after the scheduler's
    // current time, and starts a dispatch if none is currently active.
    DelaySubscriber.prototype.scheduleNotification = function (notification) {
        if (this.errored === true) {
            return;
        }
        var clock = this.scheduler;
        var dueAt = clock.now() + this.delay;
        this.queue.push(new DelayMessage(dueAt, notification));
        if (this.active === false) {
            this._schedule(clock);
        }
    };
    // A value is delayed by queueing it as a "next" notification.
    DelaySubscriber.prototype._next = function (value) {
        var nextNotification = Notification_1.Notification.createNext(value);
        this.scheduleNotification(nextNotification);
    };
    // An error bypasses the delay: pending messages are discarded and the error
    // is passed straight to the destination.
    DelaySubscriber.prototype._error = function (err) {
        this.errored = true;
        this.queue = [];
        this.destination.error(err);
    };
    // Completion is delayed like a value, so it is delivered after queued values.
    DelaySubscriber.prototype._complete = function () {
        var completeNotification = Notification_1.Notification.createComplete();
        this.scheduleNotification(completeNotification);
    };
    return DelaySubscriber;
}(Subscriber_1.Subscriber));
// Queue entry pairing a notification with the scheduler time at which it
// becomes due for delivery.
var DelayMessage = (function () {
    var Message = function DelayMessage(time, notification) {
        this.time = time; // due time, compared against scheduler.now()
        this.notification = notification; // the notification to replay
    };
    return Message;
}());
//# sourceMappingURL=delay.js.map