| /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ |
| /* ***** BEGIN LICENSE BLOCK ***** |
| * Version: MPL 1.1/GPL 2.0/LGPL 2.1 |
| * |
| * The contents of this file are subject to the Mozilla Public License Version |
| * 1.1 (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * http://www.mozilla.org/MPL/ |
| * |
| * Software distributed under the License is distributed on an "AS IS" basis, |
| * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License |
| * for the specific language governing rights and limitations under the |
| * License. |
| * |
| * The Original Code is the Netscape Portable Runtime (NSPR). |
| * |
| * The Initial Developer of the Original Code is |
| * Netscape Communications Corporation. |
| * Portions created by the Initial Developer are Copyright (C) 1999-2000 |
| * the Initial Developer. All Rights Reserved. |
| * |
| * Contributor(s): |
| * |
| * Alternatively, the contents of this file may be used under the terms of |
| * either the GNU General Public License Version 2 or later (the "GPL"), or |
| * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"), |
| * in which case the provisions of the GPL or the LGPL are applicable instead |
| * of those above. If you wish to allow use of your version of this file only |
| * under the terms of either the GPL or the LGPL, and not to allow others to |
| * use your version of this file under the terms of the MPL, indicate your |
| * decision by deleting the provisions above and replace them with the notice |
| * and other provisions required by the GPL or the LGPL. If you do not delete |
| * the provisions above, a recipient may use your version of this file under |
| * the terms of any one of the MPL, the GPL or the LGPL. |
| * |
| * ***** END LICENSE BLOCK ***** */ |
| |
| #include "nspr.h" |
| |
| /* |
| * Thread pools |
| * Thread pools create and manage threads to provide support for |
| * scheduling jobs onto one or more threads. |
| * |
| */ |
| #ifdef OPT_WINNT |
| #include <windows.h> |
| #endif |
| |
| /* |
| * worker thread |
| */ |
| typedef struct wthread { |
| PRCList links; |
| PRThread *thread; |
| } wthread; |
| |
| /* |
| * queue of timer jobs |
| */ |
| typedef struct timer_jobq { |
| PRCList list; |
| PRLock *lock; |
| PRCondVar *cv; |
| PRInt32 cnt; |
| PRCList wthreads; |
| } timer_jobq; |
| |
| /* |
| * queue of jobs |
| */ |
| typedef struct tp_jobq { |
| PRCList list; |
| PRInt32 cnt; |
| PRLock *lock; |
| PRCondVar *cv; |
| PRCList wthreads; |
| #ifdef OPT_WINNT |
| HANDLE nt_completion_port; |
| #endif |
| } tp_jobq; |
| |
| /* |
| * queue of IO jobs |
| */ |
| typedef struct io_jobq { |
| PRCList list; |
| PRPollDesc *pollfds; |
| PRInt32 npollfds; |
| PRJob **polljobs; |
| PRLock *lock; |
| PRInt32 cnt; |
| PRFileDesc *notify_fd; |
| PRCList wthreads; |
| } io_jobq; |
| |
| /* |
| * Threadpool |
| */ |
| struct PRThreadPool { |
| PRInt32 init_threads; |
| PRInt32 max_threads; |
| PRInt32 current_threads; |
| PRInt32 idle_threads; |
| PRUint32 stacksize; |
| tp_jobq jobq; |
| io_jobq ioq; |
| timer_jobq timerq; |
| PRLock *join_lock; /* used with jobp->join_cv */ |
| PRCondVar *shutdown_cv; |
| PRBool shutdown; |
| }; |
| |
/* Kind of I/O event an io job is waiting for. */
typedef enum io_op_type {
    JOB_IO_READ,
    JOB_IO_WRITE,
    JOB_IO_CONNECT,
    JOB_IO_ACCEPT
} io_op_type;
| |
#ifdef OPT_WINNT
/*
 * Packet posted to the NT completion port for a job.  The worker thread
 * casts the OVERLAPPED pointer it dequeues back to an NT_notifier, which
 * is why 'overlapped' has to be the first member.
 */
typedef struct NT_notifier {
    OVERLAPPED  overlapped;     /* must be first */
    PRJob      *jobp;           /* job this packet stands for */
} NT_notifier;
#endif
| |
| struct PRJob { |
| PRCList links; /* for linking jobs */ |
| PRBool on_ioq; /* job on ioq */ |
| PRBool on_timerq; /* job on timerq */ |
| PRJobFn job_func; |
| void *job_arg; |
| PRCondVar *join_cv; |
| PRBool join_wait; /* == PR_TRUE, when waiting to join */ |
| PRCondVar *cancel_cv; /* for cancelling IO jobs */ |
| PRBool cancel_io; /* for cancelling IO jobs */ |
| PRThreadPool *tpool; /* back pointer to thread pool */ |
| PRJobIoDesc *iod; |
| io_op_type io_op; |
| PRInt16 io_poll_flags; |
| PRNetAddr *netaddr; |
| PRIntervalTime timeout; /* relative value */ |
| PRIntervalTime absolute; |
| #ifdef OPT_WINNT |
| NT_notifier nt_notifier; |
| #endif |
| }; |
| |
/* Map a pointer to the 'links' member of a PRJob back to the PRJob. */
#define JOB_LINKS_PTR(_qp) \
    ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links)))

/* Map a pointer to the 'links' member of a wthread back to the wthread. */
#define WTHREAD_LINKS_PTR(_qp) \
    ((wthread *) ((char *) (_qp) - offsetof(wthread, links)))

/* A job is joinable exactly when it owns a join condition variable. */
#define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)

/*
 * Mark a joinable job as finished and wake a thread waiting to join it.
 * The argument is evaluated more than once; pass a plain pointer.
 */
#define JOIN_NOTIFY(_jobp)                                  \
    PR_BEGIN_MACRO                                          \
        PR_Lock((_jobp)->tpool->join_lock);                 \
        (_jobp)->join_wait = PR_FALSE;                      \
        PR_NotifyCondVar((_jobp)->join_cv);                 \
        PR_Unlock((_jobp)->tpool->join_lock);               \
    PR_END_MACRO

/*
 * Take a cancelled job off the io queue and wake the thread waiting on its
 * cancel_cv.  Every use in this file holds the owning pool's ioq.lock.
 * The pool is reached through the job's back pointer, so the macro does
 * not depend on any variable of the enclosing function.
 * The argument is evaluated more than once; pass a plain pointer.
 */
#define CANCEL_IO_JOB(jobp)                                 \
    PR_BEGIN_MACRO                                          \
        (jobp)->cancel_io = PR_FALSE;                       \
        (jobp)->on_ioq = PR_FALSE;                          \
        PR_REMOVE_AND_INIT_LINK(&(jobp)->links);            \
        (jobp)->tpool->ioq.cnt--;                           \
        PR_NotifyCondVar((jobp)->cancel_cv);                \
    PR_END_MACRO
| |
| static void delete_job(PRJob *jobp); |
| static PRThreadPool * alloc_threadpool(void); |
| static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp); |
| static void notify_ioq(PRThreadPool *tp); |
| static void notify_timerq(PRThreadPool *tp); |
| |
/*
 * Locks are acquired in the following order:
 *
 *    tp->ioq.lock, tp->timerq.lock
 *            |
 *            V
 *    tp->jobq.lock
 */
| |
| /* |
| * worker thread function |
| */ |
| static void wstart(void *arg) |
| { |
| PRThreadPool *tp = (PRThreadPool *) arg; |
| PRCList *head; |
| |
| /* |
| * execute jobs until shutdown |
| */ |
| while (!tp->shutdown) { |
| PRJob *jobp; |
| #ifdef OPT_WINNT |
| BOOL rv; |
| DWORD unused, shutdown; |
| LPOVERLAPPED olp; |
| |
| PR_Lock(tp->jobq.lock); |
| tp->idle_threads++; |
| PR_Unlock(tp->jobq.lock); |
| rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port, |
| &unused, &shutdown, &olp, INFINITE); |
| |
| PR_ASSERT(rv); |
| if (shutdown) |
| break; |
| jobp = ((NT_notifier *) olp)->jobp; |
| PR_Lock(tp->jobq.lock); |
| tp->idle_threads--; |
| tp->jobq.cnt--; |
| PR_Unlock(tp->jobq.lock); |
| #else |
| |
| PR_Lock(tp->jobq.lock); |
| while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) { |
| tp->idle_threads++; |
| PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT); |
| tp->idle_threads--; |
| } |
| if (tp->shutdown) { |
| PR_Unlock(tp->jobq.lock); |
| break; |
| } |
| head = PR_LIST_HEAD(&tp->jobq.list); |
| /* |
| * remove job from queue |
| */ |
| PR_REMOVE_AND_INIT_LINK(head); |
| tp->jobq.cnt--; |
| jobp = JOB_LINKS_PTR(head); |
| PR_Unlock(tp->jobq.lock); |
| #endif |
| |
| jobp->job_func(jobp->job_arg); |
| if (!JOINABLE_JOB(jobp)) { |
| delete_job(jobp); |
| } else { |
| JOIN_NOTIFY(jobp); |
| } |
| } |
| PR_Lock(tp->jobq.lock); |
| tp->current_threads--; |
| PR_Unlock(tp->jobq.lock); |
| } |
| |
| /* |
| * add a job to the work queue |
| */ |
| static void |
| add_to_jobq(PRThreadPool *tp, PRJob *jobp) |
| { |
| /* |
| * add to jobq |
| */ |
| #ifdef OPT_WINNT |
| PR_Lock(tp->jobq.lock); |
| tp->jobq.cnt++; |
| PR_Unlock(tp->jobq.lock); |
| /* |
| * notify worker thread(s) |
| */ |
| PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0, |
| FALSE, &jobp->nt_notifier.overlapped); |
| #else |
| PR_Lock(tp->jobq.lock); |
| PR_APPEND_LINK(&jobp->links,&tp->jobq.list); |
| tp->jobq.cnt++; |
| if ((tp->idle_threads < tp->jobq.cnt) && |
| (tp->current_threads < tp->max_threads)) { |
| wthread *wthrp; |
| /* |
| * increment thread count and unlock the jobq lock |
| */ |
| tp->current_threads++; |
| PR_Unlock(tp->jobq.lock); |
| /* create new worker thread */ |
| wthrp = PR_NEWZAP(wthread); |
| if (wthrp) { |
| wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart, |
| tp, PR_PRIORITY_NORMAL, |
| PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize); |
| if (NULL == wthrp->thread) { |
| PR_DELETE(wthrp); /* this sets wthrp to NULL */ |
| } |
| } |
| PR_Lock(tp->jobq.lock); |
| if (NULL == wthrp) { |
| tp->current_threads--; |
| } else { |
| PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); |
| } |
| } |
| /* |
| * wakeup a worker thread |
| */ |
| PR_NotifyCondVar(tp->jobq.cv); |
| PR_Unlock(tp->jobq.lock); |
| #endif |
| } |
| |
| /* |
| * io worker thread function |
| */ |
| static void io_wstart(void *arg) |
| { |
| PRThreadPool *tp = (PRThreadPool *) arg; |
| int pollfd_cnt, pollfds_used; |
| int rv; |
| PRCList *qp, *nextqp; |
| PRPollDesc *pollfds; |
| PRJob **polljobs; |
| int poll_timeout; |
| PRIntervalTime now; |
| |
| /* |
| * scan io_jobq |
| * construct poll list |
| * call PR_Poll |
| * for all fds, for which poll returns true, move the job to |
| * jobq and wakeup worker thread. |
| */ |
| while (!tp->shutdown) { |
| PRJob *jobp; |
| |
| pollfd_cnt = tp->ioq.cnt + 10; |
| if (pollfd_cnt > tp->ioq.npollfds) { |
| |
| /* |
| * re-allocate pollfd array if the current one is not large |
| * enough |
| */ |
| if (NULL != tp->ioq.pollfds) |
| PR_Free(tp->ioq.pollfds); |
| tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt * |
| (sizeof(PRPollDesc) + sizeof(PRJob *))); |
| PR_ASSERT(NULL != tp->ioq.pollfds); |
| /* |
| * array of pollfds |
| */ |
| pollfds = tp->ioq.pollfds; |
| tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]); |
| /* |
| * parallel array of jobs |
| */ |
| polljobs = tp->ioq.polljobs; |
| tp->ioq.npollfds = pollfd_cnt; |
| } |
| |
| pollfds_used = 0; |
| /* |
| * add the notify fd; used for unblocking io thread(s) |
| */ |
| pollfds[pollfds_used].fd = tp->ioq.notify_fd; |
| pollfds[pollfds_used].in_flags = PR_POLL_READ; |
| pollfds[pollfds_used].out_flags = 0; |
| polljobs[pollfds_used] = NULL; |
| pollfds_used++; |
| /* |
| * fill in the pollfd array |
| */ |
| PR_Lock(tp->ioq.lock); |
| for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { |
| nextqp = qp->next; |
| jobp = JOB_LINKS_PTR(qp); |
| if (jobp->cancel_io) { |
| CANCEL_IO_JOB(jobp); |
| continue; |
| } |
| if (pollfds_used == (pollfd_cnt)) |
| break; |
| pollfds[pollfds_used].fd = jobp->iod->socket; |
| pollfds[pollfds_used].in_flags = jobp->io_poll_flags; |
| pollfds[pollfds_used].out_flags = 0; |
| polljobs[pollfds_used] = jobp; |
| |
| pollfds_used++; |
| } |
| if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) { |
| qp = tp->ioq.list.next; |
| jobp = JOB_LINKS_PTR(qp); |
| if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) |
| poll_timeout = PR_INTERVAL_NO_TIMEOUT; |
| else if (PR_INTERVAL_NO_WAIT == jobp->timeout) |
| poll_timeout = PR_INTERVAL_NO_WAIT; |
| else { |
| poll_timeout = jobp->absolute - PR_IntervalNow(); |
| if (poll_timeout <= 0) /* already timed out */ |
| poll_timeout = PR_INTERVAL_NO_WAIT; |
| } |
| } else { |
| poll_timeout = PR_INTERVAL_NO_TIMEOUT; |
| } |
| PR_Unlock(tp->ioq.lock); |
| |
| /* |
| * XXXX |
| * should retry if more jobs have been added to the queue? |
| * |
| */ |
| PR_ASSERT(pollfds_used <= pollfd_cnt); |
| rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout); |
| |
| if (tp->shutdown) { |
| break; |
| } |
| |
| if (rv > 0) { |
| /* |
| * at least one io event is set |
| */ |
| PRStatus rval_status; |
| PRInt32 index; |
| |
| PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd); |
| /* |
| * reset the pollable event, if notified |
| */ |
| if (pollfds[0].out_flags & PR_POLL_READ) { |
| rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd); |
| PR_ASSERT(PR_SUCCESS == rval_status); |
| } |
| |
| for(index = 1; index < (pollfds_used); index++) { |
| PRInt16 events = pollfds[index].in_flags; |
| PRInt16 revents = pollfds[index].out_flags; |
| jobp = polljobs[index]; |
| |
| if ((revents & PR_POLL_NVAL) || /* busted in all cases */ |
| (revents & PR_POLL_ERR) || |
| ((events & PR_POLL_WRITE) && |
| (revents & PR_POLL_HUP))) { /* write op & hup */ |
| PR_Lock(tp->ioq.lock); |
| if (jobp->cancel_io) { |
| CANCEL_IO_JOB(jobp); |
| PR_Unlock(tp->ioq.lock); |
| continue; |
| } |
| PR_REMOVE_AND_INIT_LINK(&jobp->links); |
| tp->ioq.cnt--; |
| jobp->on_ioq = PR_FALSE; |
| PR_Unlock(tp->ioq.lock); |
| |
| /* set error */ |
| if (PR_POLL_NVAL & revents) |
| jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR; |
| else if (PR_POLL_HUP & revents) |
| jobp->iod->error = PR_CONNECT_RESET_ERROR; |
| else |
| jobp->iod->error = PR_IO_ERROR; |
| |
| /* |
| * add to jobq |
| */ |
| add_to_jobq(tp, jobp); |
| } else if (revents) { |
| /* |
| * add to jobq |
| */ |
| PR_Lock(tp->ioq.lock); |
| if (jobp->cancel_io) { |
| CANCEL_IO_JOB(jobp); |
| PR_Unlock(tp->ioq.lock); |
| continue; |
| } |
| PR_REMOVE_AND_INIT_LINK(&jobp->links); |
| tp->ioq.cnt--; |
| jobp->on_ioq = PR_FALSE; |
| PR_Unlock(tp->ioq.lock); |
| |
| if (jobp->io_op == JOB_IO_CONNECT) { |
| if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) |
| jobp->iod->error = 0; |
| else |
| jobp->iod->error = PR_GetError(); |
| } else |
| jobp->iod->error = 0; |
| |
| add_to_jobq(tp, jobp); |
| } |
| } |
| } |
| /* |
| * timeout processing |
| */ |
| now = PR_IntervalNow(); |
| PR_Lock(tp->ioq.lock); |
| for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { |
| nextqp = qp->next; |
| jobp = JOB_LINKS_PTR(qp); |
| if (jobp->cancel_io) { |
| CANCEL_IO_JOB(jobp); |
| continue; |
| } |
| if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) |
| break; |
| if ((PR_INTERVAL_NO_WAIT != jobp->timeout) && |
| ((PRInt32)(jobp->absolute - now) > 0)) |
| break; |
| PR_REMOVE_AND_INIT_LINK(&jobp->links); |
| tp->ioq.cnt--; |
| jobp->on_ioq = PR_FALSE; |
| jobp->iod->error = PR_IO_TIMEOUT_ERROR; |
| add_to_jobq(tp, jobp); |
| } |
| PR_Unlock(tp->ioq.lock); |
| } |
| } |
| |
| /* |
| * timer worker thread function |
| */ |
| static void timer_wstart(void *arg) |
| { |
| PRThreadPool *tp = (PRThreadPool *) arg; |
| PRCList *qp; |
| PRIntervalTime timeout; |
| PRIntervalTime now; |
| |
| /* |
| * call PR_WaitCondVar with minimum value of all timeouts |
| */ |
| while (!tp->shutdown) { |
| PRJob *jobp; |
| |
| PR_Lock(tp->timerq.lock); |
| if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) { |
| timeout = PR_INTERVAL_NO_TIMEOUT; |
| } else { |
| PRCList *qp; |
| |
| qp = tp->timerq.list.next; |
| jobp = JOB_LINKS_PTR(qp); |
| |
| timeout = jobp->absolute - PR_IntervalNow(); |
| if (timeout <= 0) |
| timeout = PR_INTERVAL_NO_WAIT; /* already timed out */ |
| } |
| if (PR_INTERVAL_NO_WAIT != timeout) |
| PR_WaitCondVar(tp->timerq.cv, timeout); |
| if (tp->shutdown) { |
| PR_Unlock(tp->timerq.lock); |
| break; |
| } |
| /* |
| * move expired-timer jobs to jobq |
| */ |
| now = PR_IntervalNow(); |
| while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) { |
| qp = tp->timerq.list.next; |
| jobp = JOB_LINKS_PTR(qp); |
| |
| if ((PRInt32)(jobp->absolute - now) > 0) { |
| break; |
| } |
| /* |
| * job timed out |
| */ |
| PR_REMOVE_AND_INIT_LINK(&jobp->links); |
| tp->timerq.cnt--; |
| jobp->on_timerq = PR_FALSE; |
| add_to_jobq(tp, jobp); |
| } |
| PR_Unlock(tp->timerq.lock); |
| } |
| } |
| |
| static void |
| delete_threadpool(PRThreadPool *tp) |
| { |
| if (NULL != tp) { |
| if (NULL != tp->shutdown_cv) |
| PR_DestroyCondVar(tp->shutdown_cv); |
| if (NULL != tp->jobq.cv) |
| PR_DestroyCondVar(tp->jobq.cv); |
| if (NULL != tp->jobq.lock) |
| PR_DestroyLock(tp->jobq.lock); |
| if (NULL != tp->join_lock) |
| PR_DestroyLock(tp->join_lock); |
| #ifdef OPT_WINNT |
| if (NULL != tp->jobq.nt_completion_port) |
| CloseHandle(tp->jobq.nt_completion_port); |
| #endif |
| /* Timer queue */ |
| if (NULL != tp->timerq.cv) |
| PR_DestroyCondVar(tp->timerq.cv); |
| if (NULL != tp->timerq.lock) |
| PR_DestroyLock(tp->timerq.lock); |
| |
| if (NULL != tp->ioq.lock) |
| PR_DestroyLock(tp->ioq.lock); |
| if (NULL != tp->ioq.pollfds) |
| PR_Free(tp->ioq.pollfds); |
| if (NULL != tp->ioq.notify_fd) |
| PR_DestroyPollableEvent(tp->ioq.notify_fd); |
| PR_Free(tp); |
| } |
| return; |
| } |
| |
| static PRThreadPool * |
| alloc_threadpool(void) |
| { |
| PRThreadPool *tp; |
| |
| tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp)); |
| if (NULL == tp) |
| goto failed; |
| tp->jobq.lock = PR_NewLock(); |
| if (NULL == tp->jobq.lock) |
| goto failed; |
| tp->jobq.cv = PR_NewCondVar(tp->jobq.lock); |
| if (NULL == tp->jobq.cv) |
| goto failed; |
| tp->join_lock = PR_NewLock(); |
| if (NULL == tp->join_lock) |
| goto failed; |
| #ifdef OPT_WINNT |
| tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, |
| NULL, 0, 0); |
| if (NULL == tp->jobq.nt_completion_port) |
| goto failed; |
| #endif |
| |
| tp->ioq.lock = PR_NewLock(); |
| if (NULL == tp->ioq.lock) |
| goto failed; |
| |
| /* Timer queue */ |
| |
| tp->timerq.lock = PR_NewLock(); |
| if (NULL == tp->timerq.lock) |
| goto failed; |
| tp->timerq.cv = PR_NewCondVar(tp->timerq.lock); |
| if (NULL == tp->timerq.cv) |
| goto failed; |
| |
| tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock); |
| if (NULL == tp->shutdown_cv) |
| goto failed; |
| tp->ioq.notify_fd = PR_NewPollableEvent(); |
| if (NULL == tp->ioq.notify_fd) |
| goto failed; |
| return tp; |
| failed: |
| delete_threadpool(tp); |
| PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); |
| return NULL; |
| } |
| |
| /* Create thread pool */ |
| PR_IMPLEMENT(PRThreadPool *) |
| PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads, |
| PRUint32 stacksize) |
| { |
| PRThreadPool *tp; |
| PRThread *thr; |
| int i; |
| wthread *wthrp; |
| |
| tp = alloc_threadpool(); |
| if (NULL == tp) |
| return NULL; |
| |
| tp->init_threads = initial_threads; |
| tp->max_threads = max_threads; |
| tp->stacksize = stacksize; |
| PR_INIT_CLIST(&tp->jobq.list); |
| PR_INIT_CLIST(&tp->ioq.list); |
| PR_INIT_CLIST(&tp->timerq.list); |
| PR_INIT_CLIST(&tp->jobq.wthreads); |
| PR_INIT_CLIST(&tp->ioq.wthreads); |
| PR_INIT_CLIST(&tp->timerq.wthreads); |
| tp->shutdown = PR_FALSE; |
| |
| PR_Lock(tp->jobq.lock); |
| for(i=0; i < initial_threads; ++i) { |
| |
| thr = PR_CreateThread(PR_USER_THREAD, wstart, |
| tp, PR_PRIORITY_NORMAL, |
| PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize); |
| PR_ASSERT(thr); |
| wthrp = PR_NEWZAP(wthread); |
| PR_ASSERT(wthrp); |
| wthrp->thread = thr; |
| PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); |
| } |
| tp->current_threads = initial_threads; |
| |
| thr = PR_CreateThread(PR_USER_THREAD, io_wstart, |
| tp, PR_PRIORITY_NORMAL, |
| PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); |
| PR_ASSERT(thr); |
| wthrp = PR_NEWZAP(wthread); |
| PR_ASSERT(wthrp); |
| wthrp->thread = thr; |
| PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads); |
| |
| thr = PR_CreateThread(PR_USER_THREAD, timer_wstart, |
| tp, PR_PRIORITY_NORMAL, |
| PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); |
| PR_ASSERT(thr); |
| wthrp = PR_NEWZAP(wthread); |
| PR_ASSERT(wthrp); |
| wthrp->thread = thr; |
| PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads); |
| |
| PR_Unlock(tp->jobq.lock); |
| return tp; |
| } |
| |
| static void |
| delete_job(PRJob *jobp) |
| { |
| if (NULL != jobp) { |
| if (NULL != jobp->join_cv) { |
| PR_DestroyCondVar(jobp->join_cv); |
| jobp->join_cv = NULL; |
| } |
| if (NULL != jobp->cancel_cv) { |
| PR_DestroyCondVar(jobp->cancel_cv); |
| jobp->cancel_cv = NULL; |
| } |
| PR_DELETE(jobp); |
| } |
| } |
| |
| static PRJob * |
| alloc_job(PRBool joinable, PRThreadPool *tp) |
| { |
| PRJob *jobp; |
| |
| jobp = PR_NEWZAP(PRJob); |
| if (NULL == jobp) |
| goto failed; |
| if (joinable) { |
| jobp->join_cv = PR_NewCondVar(tp->join_lock); |
| jobp->join_wait = PR_TRUE; |
| if (NULL == jobp->join_cv) |
| goto failed; |
| } else { |
| jobp->join_cv = NULL; |
| } |
| #ifdef OPT_WINNT |
| jobp->nt_notifier.jobp = jobp; |
| #endif |
| return jobp; |
| failed: |
| delete_job(jobp); |
| PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); |
| return NULL; |
| } |
| |
| /* queue a job */ |
| PR_IMPLEMENT(PRJob *) |
| PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable) |
| { |
| PRJob *jobp; |
| |
| jobp = alloc_job(joinable, tpool); |
| if (NULL == jobp) |
| return NULL; |
| |
| jobp->job_func = fn; |
| jobp->job_arg = arg; |
| jobp->tpool = tpool; |
| |
| add_to_jobq(tpool, jobp); |
| return jobp; |
| } |
| |
| /* queue a job, when a socket is readable or writeable */ |
| static PRJob * |
| queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, |
| PRBool joinable, io_op_type op) |
| { |
| PRJob *jobp; |
| PRIntervalTime now; |
| |
| jobp = alloc_job(joinable, tpool); |
| if (NULL == jobp) { |
| return NULL; |
| } |
| |
| /* |
| * Add a new job to io_jobq |
| * wakeup io worker thread |
| */ |
| |
| jobp->job_func = fn; |
| jobp->job_arg = arg; |
| jobp->tpool = tpool; |
| jobp->iod = iod; |
| if (JOB_IO_READ == op) { |
| jobp->io_op = JOB_IO_READ; |
| jobp->io_poll_flags = PR_POLL_READ; |
| } else if (JOB_IO_WRITE == op) { |
| jobp->io_op = JOB_IO_WRITE; |
| jobp->io_poll_flags = PR_POLL_WRITE; |
| } else if (JOB_IO_ACCEPT == op) { |
| jobp->io_op = JOB_IO_ACCEPT; |
| jobp->io_poll_flags = PR_POLL_READ; |
| } else if (JOB_IO_CONNECT == op) { |
| jobp->io_op = JOB_IO_CONNECT; |
| jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT; |
| } else { |
| delete_job(jobp); |
| PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); |
| return NULL; |
| } |
| |
| jobp->timeout = iod->timeout; |
| if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) || |
| (PR_INTERVAL_NO_WAIT == iod->timeout)) { |
| jobp->absolute = iod->timeout; |
| } else { |
| now = PR_IntervalNow(); |
| jobp->absolute = now + iod->timeout; |
| } |
| |
| |
| PR_Lock(tpool->ioq.lock); |
| |
| if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) || |
| (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) { |
| PR_APPEND_LINK(&jobp->links,&tpool->ioq.list); |
| } else if (PR_INTERVAL_NO_WAIT == iod->timeout) { |
| PR_INSERT_LINK(&jobp->links,&tpool->ioq.list); |
| } else { |
| PRCList *qp; |
| PRJob *tmp_jobp; |
| /* |
| * insert into the timeout-sorted ioq |
| */ |
| for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list; |
| qp = qp->prev) { |
| tmp_jobp = JOB_LINKS_PTR(qp); |
| if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { |
| break; |
| } |
| } |
| PR_INSERT_AFTER(&jobp->links,qp); |
| } |
| |
| jobp->on_ioq = PR_TRUE; |
| tpool->ioq.cnt++; |
| /* |
| * notify io worker thread(s) |
| */ |
| PR_Unlock(tpool->ioq.lock); |
| notify_ioq(tpool); |
| return jobp; |
| } |
| |
| /* queue a job, when a socket is readable */ |
| PR_IMPLEMENT(PRJob *) |
| PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, |
| PRBool joinable) |
| { |
| return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ)); |
| } |
| |
| /* queue a job, when a socket is writeable */ |
| PR_IMPLEMENT(PRJob *) |
| PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg, |
| PRBool joinable) |
| { |
| return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE)); |
| } |
| |
| |
| /* queue a job, when a socket has a pending connection */ |
| PR_IMPLEMENT(PRJob *) |
| PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, |
| void * arg, PRBool joinable) |
| { |
| return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT)); |
| } |
| |
| /* queue a job, when a socket can be connected */ |
| PR_IMPLEMENT(PRJob *) |
| PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod, |
| const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable) |
| { |
| PRStatus rv; |
| PRErrorCode err; |
| |
| rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT); |
| if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){ |
| /* connection pending */ |
| return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT)); |
| } else { |
| /* |
| * connection succeeded or failed; add to jobq right away |
| */ |
| if (rv == PR_FAILURE) |
| iod->error = err; |
| else |
| iod->error = 0; |
| return(PR_QueueJob(tpool, fn, arg, joinable)); |
| } |
| } |
| |
| /* queue a job, when a timer expires */ |
| PR_IMPLEMENT(PRJob *) |
| PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout, |
| PRJobFn fn, void * arg, PRBool joinable) |
| { |
| PRIntervalTime now; |
| PRJob *jobp; |
| |
| if (PR_INTERVAL_NO_TIMEOUT == timeout) { |
| PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); |
| return NULL; |
| } |
| if (PR_INTERVAL_NO_WAIT == timeout) { |
| /* |
| * no waiting; add to jobq right away |
| */ |
| return(PR_QueueJob(tpool, fn, arg, joinable)); |
| } |
| jobp = alloc_job(joinable, tpool); |
| if (NULL == jobp) { |
| return NULL; |
| } |
| |
| /* |
| * Add a new job to timer_jobq |
| * wakeup timer worker thread |
| */ |
| |
| jobp->job_func = fn; |
| jobp->job_arg = arg; |
| jobp->tpool = tpool; |
| jobp->timeout = timeout; |
| |
| now = PR_IntervalNow(); |
| jobp->absolute = now + timeout; |
| |
| |
| PR_Lock(tpool->timerq.lock); |
| jobp->on_timerq = PR_TRUE; |
| if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) |
| PR_APPEND_LINK(&jobp->links,&tpool->timerq.list); |
| else { |
| PRCList *qp; |
| PRJob *tmp_jobp; |
| /* |
| * insert into the sorted timer jobq |
| */ |
| for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list; |
| qp = qp->prev) { |
| tmp_jobp = JOB_LINKS_PTR(qp); |
| if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { |
| break; |
| } |
| } |
| PR_INSERT_AFTER(&jobp->links,qp); |
| } |
| tpool->timerq.cnt++; |
| /* |
| * notify timer worker thread(s) |
| */ |
| notify_timerq(tpool); |
| PR_Unlock(tpool->timerq.lock); |
| return jobp; |
| } |
| |
| static void |
| notify_timerq(PRThreadPool *tp) |
| { |
| /* |
| * wakeup the timer thread(s) |
| */ |
| PR_NotifyCondVar(tp->timerq.cv); |
| } |
| |
| static void |
| notify_ioq(PRThreadPool *tp) |
| { |
| PRStatus rval_status; |
| |
| /* |
| * wakeup the io thread(s) |
| */ |
| rval_status = PR_SetPollableEvent(tp->ioq.notify_fd); |
| PR_ASSERT(PR_SUCCESS == rval_status); |
| } |
| |
| /* |
| * cancel a job |
| * |
| * XXXX: is this needed? likely to be removed |
| */ |
| PR_IMPLEMENT(PRStatus) |
| PR_CancelJob(PRJob *jobp) { |
| |
| PRStatus rval = PR_FAILURE; |
| PRThreadPool *tp; |
| |
| if (jobp->on_timerq) { |
| /* |
| * now, check again while holding the timerq lock |
| */ |
| tp = jobp->tpool; |
| PR_Lock(tp->timerq.lock); |
| if (jobp->on_timerq) { |
| jobp->on_timerq = PR_FALSE; |
| PR_REMOVE_AND_INIT_LINK(&jobp->links); |
| tp->timerq.cnt--; |
| PR_Unlock(tp->timerq.lock); |
| if (!JOINABLE_JOB(jobp)) { |
| delete_job(jobp); |
| } else { |
| JOIN_NOTIFY(jobp); |
| } |
| rval = PR_SUCCESS; |
| } else |
| PR_Unlock(tp->timerq.lock); |
| } else if (jobp->on_ioq) { |
| /* |
| * now, check again while holding the ioq lock |
| */ |
| tp = jobp->tpool; |
| PR_Lock(tp->ioq.lock); |
| if (jobp->on_ioq) { |
| jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock); |
| if (NULL == jobp->cancel_cv) { |
| PR_Unlock(tp->ioq.lock); |
| PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0); |
| return PR_FAILURE; |
| } |
| /* |
| * mark job 'cancelled' and notify io thread(s) |
| * XXXX: |
| * this assumes there is only one io thread; when there |
| * are multiple threads, the io thread processing this job |
| * must be notified. |
| */ |
| jobp->cancel_io = PR_TRUE; |
| PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */ |
| notify_ioq(tp); |
| PR_Lock(tp->ioq.lock); |
| while (jobp->cancel_io) |
| PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT); |
| PR_Unlock(tp->ioq.lock); |
| PR_ASSERT(!jobp->on_ioq); |
| if (!JOINABLE_JOB(jobp)) { |
| delete_job(jobp); |
| } else { |
| JOIN_NOTIFY(jobp); |
| } |
| rval = PR_SUCCESS; |
| } else |
| PR_Unlock(tp->ioq.lock); |
| } |
| if (PR_FAILURE == rval) |
| PR_SetError(PR_INVALID_STATE_ERROR, 0); |
| return rval; |
| } |
| |
| /* join a job, wait until completion */ |
| PR_IMPLEMENT(PRStatus) |
| PR_JoinJob(PRJob *jobp) |
| { |
| if (!JOINABLE_JOB(jobp)) { |
| PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); |
| return PR_FAILURE; |
| } |
| PR_Lock(jobp->tpool->join_lock); |
| while(jobp->join_wait) |
| PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT); |
| PR_Unlock(jobp->tpool->join_lock); |
| delete_job(jobp); |
| return PR_SUCCESS; |
| } |
| |
| /* shutdown threadpool */ |
| PR_IMPLEMENT(PRStatus) |
| PR_ShutdownThreadPool(PRThreadPool *tpool) |
| { |
| PRStatus rval = PR_SUCCESS; |
| |
| PR_Lock(tpool->jobq.lock); |
| tpool->shutdown = PR_TRUE; |
| PR_NotifyAllCondVar(tpool->shutdown_cv); |
| PR_Unlock(tpool->jobq.lock); |
| |
| return rval; |
| } |
| |
| /* |
| * join thread pool |
| * wait for termination of worker threads |
| * reclaim threadpool resources |
| */ |
| PR_IMPLEMENT(PRStatus) |
| PR_JoinThreadPool(PRThreadPool *tpool) |
| { |
| PRStatus rval = PR_SUCCESS; |
| PRCList *head; |
| PRStatus rval_status; |
| |
| PR_Lock(tpool->jobq.lock); |
| while (!tpool->shutdown) |
| PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT); |
| |
| /* |
| * wakeup worker threads |
| */ |
| #ifdef OPT_WINNT |
| /* |
| * post shutdown notification for all threads |
| */ |
| { |
| int i; |
| for(i=0; i < tpool->current_threads; i++) { |
| PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0, |
| TRUE, NULL); |
| } |
| } |
| #else |
| PR_NotifyAllCondVar(tpool->jobq.cv); |
| #endif |
| |
| /* |
| * wakeup io thread(s) |
| */ |
| notify_ioq(tpool); |
| |
| /* |
| * wakeup timer thread(s) |
| */ |
| PR_Lock(tpool->timerq.lock); |
| notify_timerq(tpool); |
| PR_Unlock(tpool->timerq.lock); |
| |
| while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) { |
| wthread *wthrp; |
| |
| head = PR_LIST_HEAD(&tpool->jobq.wthreads); |
| PR_REMOVE_AND_INIT_LINK(head); |
| PR_Unlock(tpool->jobq.lock); |
| wthrp = WTHREAD_LINKS_PTR(head); |
| rval_status = PR_JoinThread(wthrp->thread); |
| PR_ASSERT(PR_SUCCESS == rval_status); |
| PR_DELETE(wthrp); |
| PR_Lock(tpool->jobq.lock); |
| } |
| PR_Unlock(tpool->jobq.lock); |
| while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) { |
| wthread *wthrp; |
| |
| head = PR_LIST_HEAD(&tpool->ioq.wthreads); |
| PR_REMOVE_AND_INIT_LINK(head); |
| wthrp = WTHREAD_LINKS_PTR(head); |
| rval_status = PR_JoinThread(wthrp->thread); |
| PR_ASSERT(PR_SUCCESS == rval_status); |
| PR_DELETE(wthrp); |
| } |
| |
| while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) { |
| wthread *wthrp; |
| |
| head = PR_LIST_HEAD(&tpool->timerq.wthreads); |
| PR_REMOVE_AND_INIT_LINK(head); |
| wthrp = WTHREAD_LINKS_PTR(head); |
| rval_status = PR_JoinThread(wthrp->thread); |
| PR_ASSERT(PR_SUCCESS == rval_status); |
| PR_DELETE(wthrp); |
| } |
| |
| /* |
| * Delete queued jobs |
| */ |
| while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) { |
| PRJob *jobp; |
| |
| head = PR_LIST_HEAD(&tpool->jobq.list); |
| PR_REMOVE_AND_INIT_LINK(head); |
| jobp = JOB_LINKS_PTR(head); |
| tpool->jobq.cnt--; |
| delete_job(jobp); |
| } |
| |
| /* delete io jobs */ |
| while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) { |
| PRJob *jobp; |
| |
| head = PR_LIST_HEAD(&tpool->ioq.list); |
| PR_REMOVE_AND_INIT_LINK(head); |
| tpool->ioq.cnt--; |
| jobp = JOB_LINKS_PTR(head); |
| delete_job(jobp); |
| } |
| |
| /* delete timer jobs */ |
| while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) { |
| PRJob *jobp; |
| |
| head = PR_LIST_HEAD(&tpool->timerq.list); |
| PR_REMOVE_AND_INIT_LINK(head); |
| tpool->timerq.cnt--; |
| jobp = JOB_LINKS_PTR(head); |
| delete_job(jobp); |
| } |
| |
| PR_ASSERT(0 == tpool->jobq.cnt); |
| PR_ASSERT(0 == tpool->ioq.cnt); |
| PR_ASSERT(0 == tpool->timerq.cnt); |
| |
| delete_threadpool(tpool); |
| return rval; |
| } |