import { eventChannel, buffers } from 'redux-saga';
import { call, cancelled, fork, take } from 'redux-saga/effects';

/**
 * An observer that subscribes to a Firestore `query.onSnapshot` and processes the changes.
 */
class SnapshotObserver {
  /**
   * Creates an observer with an empty registry of event channels, indexed by subscription key.
   */
  constructor() {
    // Prototype-less dictionary: arbitrary keys such as "toString" or "constructor"
    // must never resolve to members inherited from Object.prototype.
    this.channels = Object.create(null);
  }

  /**
   * Wraps `query.onSnapshot` in a redux-saga event channel.
   * The value returned by `query.onSnapshot` is handed to `eventChannel` as its unsubscribe callback.
   * @param {object} query A firebase query exposing `onSnapshot`.
   * @param {fn} snapshotHandlerFn Called as (emit, snapshot) for every snapshot received.
   * @returns {object} The event channel created for the query.
   */
  _getEventChannel(query, snapshotHandlerFn) {
    const subscribeToQuery = emit => query.onSnapshot(snapshot => snapshotHandlerFn(emit, snapshot));
    // Use an expanding buffer to make sure to capture and process every message.
    return eventChannel(subscribeToQuery, buffers.expanding(10));
  }

  /**
   * Saga that takes every message from `channel` and passes it to `processorFn`.
   * However the saga stops (cancellation, closed channel or an error thrown by
   * `processorFn`), its own channel is released so the underlying listener is not leaked.
   * @param {string} key The subscription key the channel was registered under.
   * @param {object} channel The event channel to read from.
   * @param {fn} processorFn Called (through the `call` effect) with each message taken from the channel.
   */
  *_processChannel(key, channel, processorFn) {
    try {
      while (true) {
        const data = yield take(channel);
        yield call(processorFn, data);
      }
    } finally {
      this._release(key, channel);
    }
  }

  /**
   * Closes `channel` and forgets it, but only drops the registry entry when it still
   * refers to this exact channel. A newer subscription registered under the same key
   * is therefore left untouched by a task that is finishing late.
   * @param {string} key The subscription key the channel was registered under.
   * @param {object} channel The channel owned by the finishing task.
   */
  _release(key, channel) {
    const _key = `${key}`;
    if (this.channels[_key] === channel) {
      delete this.channels[_key];
    }
    // NOTE(review): relies on `close()` being safe to call on an already closed channel — confirm for the redux-saga version in use.
    channel.close();
  }

  /**
   * Closes and removes the channel registered under `key`, if there is one.
   * @param {string} key A subscription key; it is converted to a string before the lookup.
   */
  unsubscribe(key) {
    const _key = `${key}`;
    const channel = this.channels[_key];
    if (!channel) {
      return;
    }
    // Remove the entry before closing so cleanup code triggered by `close()` sees a consistent registry.
    delete this.channels[_key];
    channel.close();
  }

  /**
   * Sets up a firebase onSnapshot subscription. Returns the `fork` effect for the subscription processor;
   * the calling saga is expected to yield it.
   * NOTE: `subscribe` cannot be a generator, it will block until the forked process finishes (only when cancelled).
   * @param {object} query A firebase query that will have `onSnapshot` applied to it.
   * @param {fn} processorFn A function expecting whatever is emitted from `snapshotHandlerFn`.
   * @param {fn} snapshotHandlerFn A function expecting (emit, snapshot), which can emit data to be handled by `processorFn`.
   * @param {string} key A subscription key; will unsubscribe by key prior to making new subscriptions.
   * @returns {object} The effect created by `fork` for `_processChannel`, bound to this observer.
   */
  subscribe({ query, processorFn, snapshotHandlerFn, key }) {
    const _key = `${key}`;

    // First unsubscribe from an existing subscription.
    this.unsubscribe(_key);

    const channel = this._getEventChannel(query, snapshotHandlerFn);
    this.channels[_key] = channel;
    return fork([this, this._processChannel], _key, channel, processorFn);
  }
}

export default SnapshotObserver;
