import { buffers, channel } from 'redux-saga';
import { all, call, fork, put, take } from 'redux-saga/effects';

/**
 * Creates a concurrent task queue backed by redux-saga channels.
 *
 * This function is itself a saga (it yields an effect to create the queue
 * channel), so it has to be driven by the saga middleware, e.g.
 * `const { watcher, queueChannel } = yield call(createConcurrentTaskQueue, handler, 3);`
 *
 * Tasks are actions placed onto `queueChannel`. Once the returned `watcher`
 * saga is running, it forwards every queued action to a pool of
 * `workersCount` workers. A worker first dispatches the action itself and
 * then invokes `handler` with `action.payload`.
 *
 * @param {Function} handler request handler, invoked as `handler(action.payload)`
 *     through a `call` effect (required)
 * @param {number} [workersCount=1] number of workers; must be a positive integer
 * @returns {{watcher: GeneratorFunction, queueChannel: Object}} the saga that
 *     runs the worker pool and the channel on which tasks are enqueued
 * @throws {TypeError} if `handler` is not a function
 * @throws {RangeError} if `workersCount` is not a positive integer
 */
function* createConcurrentTaskQueue(handler, workersCount = 1) {
    // Fail fast: without these checks a bad handler only surfaces when the
    // first task is processed, and a count of 0 (or a numeric string, which
    // `Array(n)` treats as a single element) silently yields a wrong pool size.
    if (typeof handler !== 'function') {
        throw new TypeError('createConcurrentTaskQueue: handler must be a function');
    }
    if (!Number.isInteger(workersCount) || workersCount < 1) {
        throw new RangeError('createConcurrentTaskQueue: workersCount must be a positive integer');
    }

    // a channel to queue incoming actions
    const queueChannel = yield call(channel, buffers.expanding());

    function* watcher() {
        // a channel used to hand tasks over to the workers
        const workersChannel = yield call(channel, buffers.expanding());

        // create n worker 'threads'; each slot gets its own fork effect
        // instead of n references to one shared effect object
        yield all(Array.from({ length: workersCount }, () => fork(worker, workersChannel)));

        // wait for tasks
        while (true) {
            // incoming task
            const action = yield take(queueChannel);

            // hand the task over to whichever worker takes it first
            yield put(workersChannel, action);
        }
    }

    // a single worker
    function* worker(chan) {
        while (true) {
            // incoming task from workersChannel
            const action = yield take(chan);

            // dispatch the action for possible side effects
            yield put(action);

            // Run the handler through a declarative `call` effect so the
            // middleware (and tests) can see what is being invoked.
            // NOTE(review): there is no try/catch here, so an error thrown by
            // the handler ends this worker's loop; under redux-saga's fork
            // model it is expected to propagate to the watcher as well.
            // Confirm that this is the intended failure mode.
            yield call(handler, action.payload);
        }
    }

    return {
        watcher,
        queueChannel
    };
}

export default createConcurrentTaskQueue;
