import { Observable } from '../Observable';
import { Operator } from '../Operator';
import { Observer } from '../Observer';
import { Subscription } from '../Subscription';
import { OuterSubscriber } from '../OuterSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
/**
* Converts a higher-order Observable into a first-order Observable which
* concurrently delivers all values that are emitted on the inner Observables.
*
* Flattens an Observable-of-Observables.
*
*
*
* `mergeAll` subscribes to an Observable that emits Observables, also known as
* a higher-order Observable. Each time it observes one of these emitted inner
* Observables, it subscribes to that and delivers all the values from the
* inner Observable on the output Observable. The output Observable only
* completes once all inner Observables have completed. Any error delivered by
* a inner Observable will be immediately emitted on the output Observable.
*
* @example
Spawn a new interval Observable for each click event, and blend their outputs as one Observable
* var clicks = Rx.Observable.fromEvent(document, 'click');
* var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
* var firstOrder = higherOrder.mergeAll();
* firstOrder.subscribe(x => console.log(x));
*
* @example Count from 0 to 9 every second for each click, but only allow 2 concurrent timers
* var clicks = Rx.Observable.fromEvent(document, 'click');
* var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000).take(10));
* var firstOrder = higherOrder.mergeAll(2);
* firstOrder.subscribe(x => console.log(x));
*
* @see {@link combineAll}
* @see {@link concatAll}
* @see {@link exhaust}
* @see {@link merge}
* @see {@link mergeMap}
* @see {@link mergeMapTo}
* @see {@link mergeScan}
* @see {@link switch}
* @see {@link zipAll}
*
* @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of inner
* Observables being subscribed to concurrently.
* @return {Observable} An Observable that emits values coming from all the
* inner Observables emitted by the source Observable.
* @method mergeAll
* @owner Observable
*/
export function mergeAll(this: Observable, concurrent: number = Number.POSITIVE_INFINITY): T {
return this.lift(new MergeAllOperator(concurrent));
}
export class MergeAllOperator implements Operator, T> {
constructor(private concurrent: number) {
}
call(observer: Observer, source: any): any {
return source.subscribe(new MergeAllSubscriber(observer, this.concurrent));
}
}
/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
export class MergeAllSubscriber extends OuterSubscriber, T> {
private hasCompleted: boolean = false;
private buffer: Observable[] = [];
private active: number = 0;
constructor(destination: Observer, private concurrent: number) {
super(destination);
}
protected _next(observable: Observable) {
if (this.active < this.concurrent) {
this.active++;
this.add(subscribeToResult, T>(this, observable));
} else {
this.buffer.push(observable);
}
}
protected _complete() {
this.hasCompleted = true;
if (this.active === 0 && this.buffer.length === 0) {
this.destination.complete();
}
}
notifyComplete(innerSub: Subscription) {
const buffer = this.buffer;
this.remove(innerSub);
this.active--;
if (buffer.length > 0) {
this._next(buffer.shift());
} else if (this.active === 0 && this.hasCompleted) {
this.destination.complete();
}
}
}