import { BehaviorSubject, asyncScheduler, race, of, forkJoin, merge } from 'rxjs';
import { withLatestFrom, filter, map, share, scan, switchMap, mapTo, take, shareReplay, subscribeOn, catchError } from 'rxjs/operators';
import { INTERRUPTOR_INIT, INTERRUPTOR_READY, INTERRUPTION_WILL_START, INTERRUPTION_END, INTERRUPTION_CANCELED, INTERRUPTION_DID_START } from './types';
import { Disposable } from '..';

/**
 * Interruption coordinator built on RxJS streams.
 *
 * - `interruptions$` is the status bus: every event is a `{ type, status, ... }` object.
 * - `registeredHooks$` / `hooks$` hold the hooks registered per interruption type.
 * - `cancel$` emits the type of the interruption in progress when the external
 *   cancel stream fires while that interruption has started.
 *
 * A hook ("interruption") is an object exposing `onWillStart()`, `onDidStart(data)`,
 * `onResolve(data)` and `onCancel(data)`, each returning an Observable.
 * `onDidStart` is expected to emit objects carrying a boolean `resolved` flag.
 */
export default class Interruptor extends Disposable {
  /**
   * @param {Object} options
   * @param {Observable} options.cancel$ - source stream requesting cancellation
   */
  constructor({ cancel$ }) {
    super();
    this.interruptions$ = new BehaviorSubject({ type: INTERRUPTOR_INIT, status: INTERRUPTOR_READY });
    /*
     * NOTE(review): the `[]` seed is emitted like any other value, so the scan in
     * createHookStream stores it as a first entry of the hook list. It has no `type`,
     * hence it never matches a lookup by hook type.
     */
    this.registeredHooks$ = new BehaviorSubject([]);
    this.hooks$ = Interruptor.createHookStream(this.registeredHooks$);
    this.cancel$ = Interruptor.createInterruptionCancel(cancel$, this.interruptions$);
  }

  /**
   * Util to register an interruption on a specific hook type.
   *
   * @param {*} type - hook type the interruption is attached to
   * @param {Object} interruption - hook object (onWillStart / onDidStart / onResolve / onCancel)
   */
  registerInterruption(type, interruption) {
    this.registeredHooks$.next({ type, interruption });
  }

  /**
   * Util to map a source$ stream to a specific interruption: each emission of source$
   * pushes an INTERRUPTION_WILL_START event of the given type on interruptions$.
   * `autoComplete` is true when no hook is registered for that type at emission time.
   *
   * If your source$ stream is subscribed to elsewhere, make sure to call share()
   * to avoid side-effects.
   *
   * NOTE(review): interruptions$ is passed as the observer, so a completion or an
   * error of source$ is forwarded to interruptions$ as well — confirm that callers
   * only pass sources that outlive the interruptor.
   *
   * @param {Observable} source$ - trigger stream
   * @param {*} type - interruption type to emit
   * @returns {Subscription} the subscription object, if you need to unsubscribe
   */
  mapToInterruption(source$, type) {
    return source$.pipe(
      withLatestFrom(this.hooks$),
      map(([, hooks]) => ({
        type,
        status: INTERRUPTION_WILL_START,
        autoComplete: !hooks.some(({ type: hookType }) => hookType === type)
      }))
    ).subscribe(this.interruptions$);
  }

  /**
   * Hook executor stream:
   * subscribes to the interruption stream and triggers the sequence of registered hook
   * functions for a specific hook type via Interruptor::createInterruptionSequence.
   *
   * - onWillStart determines if the interruption should actually be processed: its first
   *   emission is kept as `data` and handed to onDidStart / onResolve / onCancel.
   * - If onWillStart throws: the hook's onCancel is called with the error and the hook is
   *   skipped; remaining hooks, if any, are still processed.
   * - If every hook throws: the interruption is canceled (`didStart: false`).
   * - When at least one hook is ready, INTERRUPTION_DID_START is emitted, then the
   *   sequence result maps to INTERRUPTION_END (resolved) or INTERRUPTION_CANCELED.
   *
   * The async scheduler is used to force asynchronicity in the case of a synchronous
   * callback.
   * see: http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-bindCallback
   *
   * NOTE(review): for `autoComplete` events the stream emits `{ resolved: true }`, which
   * is not an array; it has no `length`, so it takes the "no ready hooks" branch below and
   * ends as INTERRUPTION_CANCELED with `didStart: false`. Confirm this is intended.
   *
   * @param {*} hookType - interruption type handled by this executor
   * @returns {Subscription} subscription feeding the results back into interruptions$
   */
  createHook(hookType) {
    const willStart$ = this.interruptions$.pipe(
      filter(({ status, type }) => status === INTERRUPTION_WILL_START && type === hookType),
      /* keep only the hook objects registered for this type */
      withLatestFrom(this.hooks$, (event, hooks) => ({
        event,
        hooks: hooks
          .filter(({ type }) => type === hookType)
          .map(({ interruption }) => interruption)
      })),
      switchMap(({ event: { autoComplete }, hooks }) => (
        autoComplete
          ? of({ resolved: true }).pipe(subscribeOn(asyncScheduler))
          /* array form of forkJoin: waits for every hook, emits the list of results */
          : forkJoin(hooks.map((hook) => hook.onWillStart().pipe(
            map((data) => ({ hook, data })),
            /* a failing hook is canceled and replaced by null so it can be filtered out */
            catchError((data = {}) => hook.onCancel(data).pipe(subscribeOn(asyncScheduler), mapTo(null))),
            take(1)
          ))).pipe(
            map((readyHooks) => readyHooks.filter(Boolean))
          )
      )),
      /* shared: consumed both by didStart$ and by the DID_START notification below */
      share()
    );

    const didStart$ = willStart$.pipe(
      switchMap((readyHooks) => (
        readyHooks.length
          ? Interruptor.createInterruptionSequence(readyHooks, this.cancel$)
          : of({ resolved: false, didStart: false }) /* if no registered hooks successfully started */
      )),
      map(({ resolved, didStart = true }) => ({
        type: hookType,
        status: resolved ? INTERRUPTION_END : INTERRUPTION_CANCELED,
        didStart
      }))
    );

    return merge(
      willStart$.pipe(
        filter((readyHooks) => readyHooks.length),
        mapTo({ type: hookType, status: INTERRUPTION_DID_START })
      ),
      didStart$
    ).subscribe(this.interruptions$);
  }

  /**
   * Accumulates every value emitted by source$ into a growing array and replays
   * the latest array to late subscribers.
   *
   * @param {Observable} source$ - stream of hook registrations
   * @returns {Observable<Array>} stream of the accumulated registrations
   */
  static createHookStream(source$) {
    return source$.pipe(scan((curr, next) => [...curr, next], []), shareReplay(1));
  }

  /**
   * Runs a single started hook: races onDidStart(data) against cancel$, takes the first
   * outcome, then calls onResolve(data) or onCancel(data) accordingly.
   *
   * The callbacks are invoked on the hook object (not as detached functions) so that
   * hooks implemented as class instances or with method shorthand keep their `this`,
   * consistently with how createHook calls onWillStart / onCancel.
   *
   * @param {{ hook: Object, data: * }} readyHook - hook and the data emitted by its onWillStart
   * @param {Observable} cancel$ - cancellation stream
   * @returns {Observable<{ resolved: boolean }>} outcome of the hook
   */
  static interruptionToObservable({ hook, data }, cancel$) {
    return race(hook.onDidStart(data), cancel$.pipe(mapTo({ resolved: false })))
      .pipe(
        take(1),
        switchMap(({ resolved }) => (
          resolved ? hook.onResolve(data) : hook.onCancel(data)
        ).pipe(mapTo({ resolved })))
      );
  }

  /**
   * Interruption Sequence creator:
   * given an array of ready hooks, returns a sequential observable waiting for each
   * successive interruption to call its cancellation or resolve logic.
   * Once a hook ends unresolved (e.g. via the cancel$ stream), the remaining hooks
   * are skipped and the unresolved outcome is passed through.
   *
   * @param {Array<{ hook: Object, data: * }>} hooks - non-empty list of ready hooks
   * @param {Observable} cancel$ - cancellation stream
   * @returns {Observable<{ resolved: boolean }>} outcome of the whole sequence
   */
  static createInterruptionSequence(hooks, cancel$) {
    return hooks.slice(1)
      /* hooks[0] used as starting value of reduce */
      .reduce(
        (curr, hook) => curr.pipe(switchMap(({ resolved }) => (
          resolved
            ? Interruptor.interruptionToObservable(hook, cancel$)
            : of({ resolved })
        ))),
        Interruptor.interruptionToObservable(hooks[0], cancel$)
      );
  }

  /**
   * Creates an observable emitting when source cancel$ emits while the latest event
   * on interruptions$ has the INTERRUPTION_DID_START status.
   * Emits the interruption hook type.
   *
   * @param {Observable} cancel$ - source cancellation stream
   * @param {Observable} interruptions$ - interruption status stream
   * @returns {Observable} shared stream of canceled interruption types
   */
  static createInterruptionCancel(cancel$, interruptions$) {
    return cancel$.pipe(
      withLatestFrom(interruptions$),
      filter(([, { status }]) => status === INTERRUPTION_DID_START),
      map(([, { type }]) => type),
      share()
    );
  }
}

/*
 * Status groupings exposed as static arrays on Interruptor.
 * NOTE(review): how each group is consumed is not visible in this file; the
 * descriptions below only restate which statuses each array contains.
 */
Object.assign(Interruptor, {
  /* initial ready status plus both terminal statuses (canceled, ended) */
  READY_FLAGS: [INTERRUPTOR_READY, INTERRUPTION_CANCELED, INTERRUPTION_END],

  /* initial ready status plus both start statuses (did start, will start) */
  BEFORE_FLAGS: [INTERRUPTOR_READY, INTERRUPTION_DID_START, INTERRUPTION_WILL_START],

  /* terminal statuses only (canceled, ended) */
  AD_END_FLAGS: [INTERRUPTION_CANCELED, INTERRUPTION_END]
});
