Очередь с одновременным выполнением нескольких задач

Написал для одного проекта очередь заданий, которые могут выполняться одновременно.

Модуль использует Backbone.Events для оповещения о ходе выполнения задач и несколько полезных функций из Underscore. Экспорт объекта выполняется через мой фреймворк.


/**
 * Queue for simultaneous task execution.
 * Execution method MUST return the promise object.
 *
 * @param limit {Integer} number of simultaneous tasks
 * @event schedule
 * @event before
 * @event after
 */
(function () {

    var Task = function (obj, execMethod) {
        _.extend(this, {
            id: _.uniqueId("queueitem-"),
            obj: obj,
            execMethod: execMethod,
            active: false
        });
    };

    _.extend(Task.prototype, {
        run: function () {
            var func, value;

            this.active = true;

            func = this.obj[this.execMethod];
            if (_.isFunction(func)) {
                value = func.call(this.obj);
            }
            // return promise object
            return value;
        }
    });

    function runTasks() {
        var activeTasks = _.filter(queue, function (task) {
            return task.active;
        });

        if (queue.length > 0 && activeTasks.length < limit) {
            // we can run another task
            var candidate = _.find(queue, function (task) {
                return !task.active;
            });

            if (candidate) {
                Q.trigger("before", candidate.obj);
                var taskDfd = candidate.run();
                Q.trigger("after", candidate.obj, taskDfd);
                if (taskDfd) {
                    taskDfd.always(function () {
                        var i, id = candidate.id;
                        for (i = 0; i < queue.length; i++) {
                            if (queue[i].id === id) {
                                queue.splice(i, 1);
                                break;
                            }
                        }
                        runTasks();
                    });
                }
                // check tasks one more time
                setTimeout(runTasks, 500);
            }
        }
    }

    var queue, limit;

    var Q = _.extend({
        init: function (opts) {
            queue = [];
            limit = opts.limit;
        },
        schedule: function (obj, execMethod) {
            var task = new Task(obj, execMethod);
            if (queue) {
                queue.push(task);
                Q.trigger("schedule", obj);
                runTasks();
            }
        }
    }, Backbone.Events);

    App.namespace("App.Queue", Q);

}());

Очередь конфигурируется с помощью одного обязательного параметра limit, который задает количество одновременно выполняемых задач.


App.Queue.init({limit: 5});

var message, i;
for (i = 0; i < 20; i += 1) {
    message = new Message("text " + i);
    App.Queue.schedule(message, "delivery");
}

В этом примере будет сгенерировано 20 объектов сообщений, у которых есть метод delivery , реализующий всю работу по доставке этого сообщения. Они все будут сразу поставлены в очередь, но одновременно отправляться смогут только 5 сообщений.

После добавления задания в очередь генерируется событие schedule, а до и после запуска задачи — before и after соответственно. В событии after в качестве параметра передается promise-объект задачи. Через него можно так же отслеживать окончание её выполнения в другом модуле.


App.Queue.on("after", function (messagePromise) {
    messagePromise.done(function () {
        console.log("message " + this.get("id") + " was delivered");
    });
    messagePromise.fail(function () {
        console.log("delivering message " + this.get("id") + " was failed");
    });
});

Контекстом во всех событиях выступает объект, который ставился в очередь.