| /* |
| * Copyright (C) 2005-2011 Martin Willi |
| * Copyright (C) 2011 revosec AG |
| * Copyright (C) 2008-2013 Tobias Brunner |
| * Copyright (C) 2005 Jan Hutter |
| * HSR Hochschule fuer Technik Rapperswil |
| * |
| * This program is free software; you can redistribute it and/or modify it |
| * under the terms of the GNU General Public License as published by the |
| * Free Software Foundation; either version 2 of the License, or (at your |
| * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. |
| * |
| * This program is distributed in the hope that it will be useful, but |
| * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY |
| * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| * for more details. |
| */ |
| |
| #include <stdlib.h> |
| #include <string.h> |
| #include <errno.h> |
| |
| #include "processor.h" |
| |
| #include <utils/debug.h> |
| #include <threading/thread.h> |
| #include <threading/condvar.h> |
| #include <threading/mutex.h> |
| #include <threading/thread_value.h> |
| #include <collections/linked_list.h> |
| |
| typedef struct private_processor_t private_processor_t; |
| |
| /** |
| * Private data of processor_t class. |
| */ |
| struct private_processor_t { |
| |
| /** |
| * Public processor_t interface. |
| */ |
| processor_t public; |
| |
| /** |
| * Number of running threads |
| */ |
| u_int total_threads; |
| |
| /** |
| * Desired number of threads |
| */ |
| u_int desired_threads; |
| |
| /** |
| * Number of threads currently working, for each priority |
| */ |
| u_int working_threads[JOB_PRIO_MAX]; |
| |
| /** |
| * All threads managed in the pool (including threads that have been |
| * canceled, this allows to join them later), as worker_thread_t |
| */ |
| linked_list_t *threads; |
| |
| /** |
| * A list of queued jobs for each priority |
| */ |
| linked_list_t *jobs[JOB_PRIO_MAX]; |
| |
| /** |
| * Threads reserved for each priority |
| */ |
| int prio_threads[JOB_PRIO_MAX]; |
| |
| /** |
| * access to job lists is locked through this mutex |
| */ |
| mutex_t *mutex; |
| |
| /** |
| * Condvar to wait for new jobs |
| */ |
| condvar_t *job_added; |
| |
| /** |
| * Condvar to wait for terminated threads |
| */ |
| condvar_t *thread_terminated; |
| }; |
| |
| /** |
| * Worker thread |
| */ |
| typedef struct { |
| |
| /** |
| * Reference to the processor |
| */ |
| private_processor_t *processor; |
| |
| /** |
| * The actual thread |
| */ |
| thread_t *thread; |
| |
| /** |
| * Job currently being executed by this worker thread |
| */ |
| job_t *job; |
| |
| /** |
| * Priority of the current job |
| */ |
| job_priority_t priority; |
| |
| } worker_thread_t; |
| |
| static void process_jobs(worker_thread_t *worker); |
| |
| /** |
| * restart a terminated thread |
| */ |
| static void restart(worker_thread_t *worker) |
| { |
| private_processor_t *this = worker->processor; |
| job_t *job; |
| |
| DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id()); |
| |
| this->mutex->lock(this->mutex); |
| /* cleanup worker thread */ |
| this->working_threads[worker->priority]--; |
| worker->job->status = JOB_STATUS_CANCELED; |
| job = worker->job; |
| /* unset the job before releasing the mutex, otherwise cancel() might |
| * interfere */ |
| worker->job = NULL; |
| /* release mutex to avoid deadlocks if the same lock is required |
| * during queue_job() and in the destructor called here */ |
| this->mutex->unlock(this->mutex); |
| job->destroy(job); |
| this->mutex->lock(this->mutex); |
| |
| /* respawn thread if required */ |
| if (this->desired_threads >= this->total_threads) |
| { |
| worker_thread_t *new_worker; |
| |
| INIT(new_worker, |
| .processor = this, |
| ); |
| new_worker->thread = thread_create((thread_main_t)process_jobs, |
| new_worker); |
| if (new_worker->thread) |
| { |
| this->threads->insert_last(this->threads, new_worker); |
| this->mutex->unlock(this->mutex); |
| return; |
| } |
| free(new_worker); |
| } |
| this->total_threads--; |
| this->thread_terminated->signal(this->thread_terminated); |
| this->mutex->unlock(this->mutex); |
| } |
| |
| /** |
| * Get number of idle threads, non-locking variant |
| */ |
| static u_int get_idle_threads_nolock(private_processor_t *this) |
| { |
| u_int count, i; |
| |
| count = this->total_threads; |
| for (i = 0; i < JOB_PRIO_MAX; i++) |
| { |
| count -= this->working_threads[i]; |
| } |
| return count; |
| } |
| |
| /** |
| * Get a job from any job queue, starting with the highest priority. |
| * |
| * this->mutex is expected to be locked. |
| */ |
| static bool get_job(private_processor_t *this, worker_thread_t *worker) |
| { |
| int i, reserved = 0, idle; |
| |
| idle = get_idle_threads_nolock(this); |
| |
| for (i = 0; i < JOB_PRIO_MAX; i++) |
| { |
| if (reserved && reserved >= idle) |
| { |
| DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, " |
| "but %d reserved for higher priorities", |
| job_priority_names, i, idle, reserved); |
| /* wait until a job of higher priority gets queued */ |
| return FALSE; |
| } |
| if (this->working_threads[i] < this->prio_threads[i]) |
| { |
| reserved += this->prio_threads[i] - this->working_threads[i]; |
| } |
| if (this->jobs[i]->remove_first(this->jobs[i], |
| (void**)&worker->job) == SUCCESS) |
| { |
| worker->priority = i; |
| return TRUE; |
| } |
| } |
| return FALSE; |
| } |
| |
| /** |
| * Process a single job (provided in worker->job, worker->priority is also |
| * expected to be set) |
| * |
| * this->mutex is expected to be locked. |
| */ |
| static void process_job(private_processor_t *this, worker_thread_t *worker) |
| { |
| job_t *to_destroy = NULL; |
| job_requeue_t requeue; |
| |
| this->working_threads[worker->priority]++; |
| worker->job->status = JOB_STATUS_EXECUTING; |
| this->mutex->unlock(this->mutex); |
| /* canceled threads are restarted to get a constant pool */ |
| thread_cleanup_push((thread_cleanup_t)restart, worker); |
| while (TRUE) |
| { |
| requeue = worker->job->execute(worker->job); |
| if (requeue.type != JOB_REQUEUE_TYPE_DIRECT) |
| { |
| break; |
| } |
| else if (!worker->job->cancel) |
| { /* only allow cancelable jobs to requeue directly */ |
| requeue.type = JOB_REQUEUE_TYPE_FAIR; |
| break; |
| } |
| } |
| thread_cleanup_pop(FALSE); |
| this->mutex->lock(this->mutex); |
| this->working_threads[worker->priority]--; |
| if (worker->job->status == JOB_STATUS_CANCELED) |
| { /* job was canceled via a custom cancel() method or did not |
| * use JOB_REQUEUE_TYPE_DIRECT */ |
| to_destroy = worker->job; |
| } |
| else |
| { |
| switch (requeue.type) |
| { |
| case JOB_REQUEUE_TYPE_NONE: |
| worker->job->status = JOB_STATUS_DONE; |
| to_destroy = worker->job; |
| break; |
| case JOB_REQUEUE_TYPE_FAIR: |
| worker->job->status = JOB_STATUS_QUEUED; |
| this->jobs[worker->priority]->insert_last( |
| this->jobs[worker->priority], worker->job); |
| this->job_added->signal(this->job_added); |
| break; |
| case JOB_REQUEUE_TYPE_SCHEDULE: |
| /* scheduler_t does not hold its lock when queuing jobs |
| * so this should be safe without unlocking our mutex */ |
| switch (requeue.schedule) |
| { |
| case JOB_SCHEDULE: |
| lib->scheduler->schedule_job(lib->scheduler, |
| worker->job, requeue.time.rel); |
| break; |
| case JOB_SCHEDULE_MS: |
| lib->scheduler->schedule_job_ms(lib->scheduler, |
| worker->job, requeue.time.rel); |
| break; |
| case JOB_SCHEDULE_TV: |
| lib->scheduler->schedule_job_tv(lib->scheduler, |
| worker->job, requeue.time.abs); |
| break; |
| } |
| break; |
| default: |
| break; |
| } |
| } |
| /* unset the current job to avoid interference with cancel() when |
| * destroying the job below */ |
| worker->job = NULL; |
| |
| if (to_destroy) |
| { /* release mutex to avoid deadlocks if the same lock is required |
| * during queue_job() and in the destructor called here */ |
| this->mutex->unlock(this->mutex); |
| to_destroy->destroy(to_destroy); |
| this->mutex->lock(this->mutex); |
| } |
| } |
| |
| /** |
| * Process queued jobs, called by the worker threads |
| */ |
| static void process_jobs(worker_thread_t *worker) |
| { |
| private_processor_t *this = worker->processor; |
| |
| /* worker threads are not cancelable by default */ |
| thread_cancelability(FALSE); |
| |
| DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id()); |
| |
| this->mutex->lock(this->mutex); |
| while (this->desired_threads >= this->total_threads) |
| { |
| if (get_job(this, worker)) |
| { |
| process_job(this, worker); |
| } |
| else |
| { |
| this->job_added->wait(this->job_added, this->mutex); |
| } |
| } |
| this->total_threads--; |
| this->thread_terminated->signal(this->thread_terminated); |
| this->mutex->unlock(this->mutex); |
| } |
| |
| METHOD(processor_t, get_total_threads, u_int, |
| private_processor_t *this) |
| { |
| u_int count; |
| |
| this->mutex->lock(this->mutex); |
| count = this->total_threads; |
| this->mutex->unlock(this->mutex); |
| return count; |
| } |
| |
| METHOD(processor_t, get_idle_threads, u_int, |
| private_processor_t *this) |
| { |
| u_int count; |
| |
| this->mutex->lock(this->mutex); |
| count = get_idle_threads_nolock(this); |
| this->mutex->unlock(this->mutex); |
| return count; |
| } |
| |
| /** |
| * Check priority bounds |
| */ |
| static job_priority_t sane_prio(job_priority_t prio) |
| { |
| if ((int)prio < 0 || prio >= JOB_PRIO_MAX) |
| { |
| return JOB_PRIO_MAX - 1; |
| } |
| return prio; |
| } |
| |
| METHOD(processor_t, get_working_threads, u_int, |
| private_processor_t *this, job_priority_t prio) |
| { |
| u_int count; |
| |
| this->mutex->lock(this->mutex); |
| count = this->working_threads[sane_prio(prio)]; |
| this->mutex->unlock(this->mutex); |
| return count; |
| } |
| |
| METHOD(processor_t, get_job_load, u_int, |
| private_processor_t *this, job_priority_t prio) |
| { |
| u_int load; |
| |
| prio = sane_prio(prio); |
| this->mutex->lock(this->mutex); |
| load = this->jobs[prio]->get_count(this->jobs[prio]); |
| this->mutex->unlock(this->mutex); |
| return load; |
| } |
| |
| METHOD(processor_t, queue_job, void, |
| private_processor_t *this, job_t *job) |
| { |
| job_priority_t prio; |
| |
| prio = sane_prio(job->get_priority(job)); |
| job->status = JOB_STATUS_QUEUED; |
| |
| this->mutex->lock(this->mutex); |
| this->jobs[prio]->insert_last(this->jobs[prio], job); |
| this->job_added->signal(this->job_added); |
| this->mutex->unlock(this->mutex); |
| } |
| |
| METHOD(processor_t, execute_job, void, |
| private_processor_t *this, job_t *job) |
| { |
| job_priority_t prio; |
| bool queued = FALSE; |
| |
| this->mutex->lock(this->mutex); |
| if (this->desired_threads && get_idle_threads_nolock(this)) |
| { |
| prio = sane_prio(job->get_priority(job)); |
| job->status = JOB_STATUS_QUEUED; |
| /* insert job in front to execute it immediately */ |
| this->jobs[prio]->insert_first(this->jobs[prio], job); |
| queued = TRUE; |
| } |
| this->job_added->signal(this->job_added); |
| this->mutex->unlock(this->mutex); |
| |
| if (!queued) |
| { |
| job->execute(job); |
| job->destroy(job); |
| } |
| } |
| |
| METHOD(processor_t, set_threads, void, |
| private_processor_t *this, u_int count) |
| { |
| int i; |
| |
| this->mutex->lock(this->mutex); |
| for (i = 0; i < JOB_PRIO_MAX; i++) |
| { |
| this->prio_threads[i] = lib->settings->get_int(lib->settings, |
| "%s.processor.priority_threads.%N", 0, lib->ns, |
| job_priority_names, i); |
| } |
| if (count > this->total_threads) |
| { /* increase thread count */ |
| worker_thread_t *worker; |
| int i; |
| |
| this->desired_threads = count; |
| DBG1(DBG_JOB, "spawning %d worker threads", count - this->total_threads); |
| for (i = this->total_threads; i < count; i++) |
| { |
| INIT(worker, |
| .processor = this, |
| ); |
| worker->thread = thread_create((thread_main_t)process_jobs, worker); |
| if (worker->thread) |
| { |
| this->threads->insert_last(this->threads, worker); |
| this->total_threads++; |
| } |
| else |
| { |
| free(worker); |
| } |
| } |
| } |
| else if (count < this->total_threads) |
| { /* decrease thread count */ |
| this->desired_threads = count; |
| } |
| this->job_added->broadcast(this->job_added); |
| this->mutex->unlock(this->mutex); |
| } |
| |
| METHOD(processor_t, cancel, void, |
| private_processor_t *this) |
| { |
| enumerator_t *enumerator; |
| worker_thread_t *worker; |
| job_t *job; |
| int i; |
| |
| this->mutex->lock(this->mutex); |
| this->desired_threads = 0; |
| /* cancel potentially blocking jobs */ |
| enumerator = this->threads->create_enumerator(this->threads); |
| while (enumerator->enumerate(enumerator, (void**)&worker)) |
| { |
| if (worker->job && worker->job->cancel) |
| { |
| worker->job->status = JOB_STATUS_CANCELED; |
| if (!worker->job->cancel(worker->job)) |
| { /* job requests to be canceled explicitly, otherwise we assume |
| * the thread terminates itself and can be joined */ |
| worker->thread->cancel(worker->thread); |
| } |
| } |
| } |
| enumerator->destroy(enumerator); |
| while (this->total_threads > 0) |
| { |
| this->job_added->broadcast(this->job_added); |
| this->thread_terminated->wait(this->thread_terminated, this->mutex); |
| } |
| while (this->threads->remove_first(this->threads, |
| (void**)&worker) == SUCCESS) |
| { |
| worker->thread->join(worker->thread); |
| free(worker); |
| } |
| for (i = 0; i < JOB_PRIO_MAX; i++) |
| { |
| while (this->jobs[i]->remove_first(this->jobs[i], |
| (void**)&job) == SUCCESS) |
| { |
| job->destroy(job); |
| } |
| } |
| this->mutex->unlock(this->mutex); |
| } |
| |
| METHOD(processor_t, destroy, void, |
| private_processor_t *this) |
| { |
| int i; |
| |
| cancel(this); |
| this->thread_terminated->destroy(this->thread_terminated); |
| this->job_added->destroy(this->job_added); |
| this->mutex->destroy(this->mutex); |
| for (i = 0; i < JOB_PRIO_MAX; i++) |
| { |
| this->jobs[i]->destroy(this->jobs[i]); |
| } |
| this->threads->destroy(this->threads); |
| free(this); |
| } |
| |
| /* |
| * Described in header. |
| */ |
| processor_t *processor_create() |
| { |
| private_processor_t *this; |
| int i; |
| |
| INIT(this, |
| .public = { |
| .get_total_threads = _get_total_threads, |
| .get_idle_threads = _get_idle_threads, |
| .get_working_threads = _get_working_threads, |
| .get_job_load = _get_job_load, |
| .queue_job = _queue_job, |
| .execute_job = _execute_job, |
| .set_threads = _set_threads, |
| .cancel = _cancel, |
| .destroy = _destroy, |
| }, |
| .threads = linked_list_create(), |
| .mutex = mutex_create(MUTEX_TYPE_DEFAULT), |
| .job_added = condvar_create(CONDVAR_TYPE_DEFAULT), |
| .thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT), |
| ); |
| |
| for (i = 0; i < JOB_PRIO_MAX; i++) |
| { |
| this->jobs[i] = linked_list_create(); |
| } |
| return &this->public; |
| } |