| /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ |
| /* ***** BEGIN LICENSE BLOCK ***** |
| * Version: MPL 1.1/GPL 2.0/LGPL 2.1 |
| * |
| * The contents of this file are subject to the Mozilla Public License Version |
| * 1.1 (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * http://www.mozilla.org/MPL/ |
| * |
| * Software distributed under the License is distributed on an "AS IS" basis, |
| * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License |
| * for the specific language governing rights and limitations under the |
| * License. |
| * |
| * The Original Code is the Netscape Portable Runtime (NSPR). |
| * |
| * The Initial Developer of the Original Code is |
| * Netscape Communications Corporation. |
| * Portions created by the Initial Developer are Copyright (C) 1998-2000 |
| * the Initial Developer. All Rights Reserved. |
| * |
| * Contributor(s): |
| * |
| * Alternatively, the contents of this file may be used under the terms of |
| * either the GNU General Public License Version 2 or later (the "GPL"), or |
| * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"), |
| * in which case the provisions of the GPL or the LGPL are applicable instead |
| * of those above. If you wish to allow use of your version of this file only |
| * under the terms of either the GPL or the LGPL, and not to allow others to |
| * use your version of this file under the terms of the MPL, indicate your |
| * decision by deleting the provisions above and replace them with the notice |
| * and other provisions required by the GPL or the LGPL. If you do not delete |
| * the provisions above, a recipient may use your version of this file under |
| * the terms of any one of the MPL, the GPL or the LGPL. |
| * |
| * ***** END LICENSE BLOCK ***** */ |
| |
| #include "prio.h" |
| #include "prprf.h" |
| #include "prlog.h" |
| #include "prmem.h" |
| #include "pratom.h" |
| #include "prlock.h" |
| #include "prmwait.h" |
| #include "prclist.h" |
| #include "prerror.h" |
| #include "prinrval.h" |
| #include "prnetdb.h" |
| #include "prthread.h" |
| |
| #include "plstr.h" |
| #include "plerror.h" |
| #include "plgetopt.h" |
| |
| #include <string.h> |
| |
| typedef struct Shared |
| { |
| const char *title; |
| PRLock *list_lock; |
| PRWaitGroup *group; |
| PRIntervalTime timeout; |
| } Shared; |
| |
| typedef enum Verbosity {silent, quiet, chatty, noisy} Verbosity; |
| |
| static PRFileDesc *debug = NULL; |
| static PRInt32 desc_allocated = 0; |
| static PRUint16 default_port = 12273; |
| static enum Verbosity verbosity = quiet; |
| static PRInt32 ops_required = 1000, ops_done = 0; |
| static PRThreadScope thread_scope = PR_LOCAL_THREAD; |
| static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50; |
| |
| #if defined(DEBUG) |
| #define MW_ASSERT(_expr) \ |
| ((_expr)?((void)0):_MW_Assert(# _expr,__FILE__,__LINE__)) |
| static void _MW_Assert(const char *s, const char *file, PRIntn ln) |
| { |
| if (NULL != debug) PL_FPrintError(debug, NULL); |
| PR_Assert(s, file, ln); |
| } /* _MW_Assert */ |
| #else |
| #define MW_ASSERT(_expr) |
| #endif |
| |
| static void PrintRecvDesc(PRRecvWait *desc, const char *msg) |
| { |
| const char *tag[] = { |
| "PR_MW_INTERRUPT", "PR_MW_TIMEOUT", |
| "PR_MW_FAILURE", "PR_MW_SUCCESS", "PR_MW_PENDING"}; |
| PR_fprintf( |
| debug, "%s: PRRecvWait(@0x%x): {fd: 0x%x, outcome: %s, tmo: %u}\n", |
| msg, desc, desc->fd, tag[desc->outcome + 3], desc->timeout); |
| } /* PrintRecvDesc */ |
| |
| static Shared *MakeShared(const char *title) |
| { |
| Shared *shared = PR_NEWZAP(Shared); |
| shared->group = PR_CreateWaitGroup(1); |
| shared->timeout = PR_SecondsToInterval(1); |
| shared->list_lock = PR_NewLock(); |
| shared->title = title; |
| return shared; |
| } /* MakeShared */ |
| |
| static void DestroyShared(Shared *shared) |
| { |
| PRStatus rv; |
| if (verbosity > quiet) |
| PR_fprintf(debug, "%s: destroying group\n", shared->title); |
| rv = PR_DestroyWaitGroup(shared->group); |
| MW_ASSERT(PR_SUCCESS == rv); |
| PR_DestroyLock(shared->list_lock); |
| PR_DELETE(shared); |
| } /* DestroyShared */ |
| |
| static PRRecvWait *CreateRecvWait(PRFileDesc *fd, PRIntervalTime timeout) |
| { |
| PRRecvWait *desc_out = PR_NEWZAP(PRRecvWait); |
| MW_ASSERT(NULL != desc_out); |
| |
| MW_ASSERT(NULL != fd); |
| desc_out->fd = fd; |
| desc_out->timeout = timeout; |
| desc_out->buffer.length = 120; |
| desc_out->buffer.start = PR_CALLOC(120); |
| |
| PR_AtomicIncrement(&desc_allocated); |
| |
| if (verbosity > chatty) |
| PrintRecvDesc(desc_out, "Allocated"); |
| return desc_out; |
| } /* CreateRecvWait */ |
| |
| static void DestroyRecvWait(PRRecvWait *desc_out) |
| { |
| if (verbosity > chatty) |
| PrintRecvDesc(desc_out, "Destroying"); |
| PR_Close(desc_out->fd); |
| if (NULL != desc_out->buffer.start) |
| PR_DELETE(desc_out->buffer.start); |
| PR_Free(desc_out); |
| (void)PR_AtomicDecrement(&desc_allocated); |
| } /* DestroyRecvWait */ |
| |
| static void CancelGroup(Shared *shared) |
| { |
| PRRecvWait *desc_out; |
| |
| if (verbosity > quiet) |
| PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title); |
| |
| do |
| { |
| desc_out = PR_CancelWaitGroup(shared->group); |
| if (NULL != desc_out) DestroyRecvWait(desc_out); |
| } while (NULL != desc_out); |
| |
| MW_ASSERT(0 == desc_allocated); |
| MW_ASSERT(PR_GROUP_EMPTY_ERROR == PR_GetError()); |
| } /* CancelGroup */ |
| |
| static void PR_CALLBACK ClientThread(void* arg) |
| { |
| PRStatus rv; |
| PRInt32 bytes; |
| PRIntn empty_flags = 0; |
| PRNetAddr server_address; |
| unsigned char buffer[100]; |
| Shared *shared = (Shared*)arg; |
| PRFileDesc *server = PR_NewTCPSocket(); |
| if ((NULL == server) |
| && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) return; |
| MW_ASSERT(NULL != server); |
| |
| if (verbosity > chatty) |
| PR_fprintf(debug, "%s: Server socket @0x%x\n", shared->title, server); |
| |
| /* Initialize the buffer so that Purify won't complain */ |
| memset(buffer, 0, sizeof(buffer)); |
| |
| rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address); |
| MW_ASSERT(PR_SUCCESS == rv); |
| |
| if (verbosity > quiet) |
| PR_fprintf(debug, "%s: Client opening connection\n", shared->title); |
| rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT); |
| |
| if (PR_FAILURE == rv) |
| { |
| if (verbosity > silent) PL_FPrintError(debug, "Client connect failed"); |
| return; |
| } |
| |
| while (ops_done < ops_required) |
| { |
| bytes = PR_Send( |
| server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT); |
| if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break; |
| MW_ASSERT(sizeof(buffer) == bytes); |
| if (verbosity > chatty) |
| PR_fprintf( |
| debug, "%s: Client sent %d bytes\n", |
| shared->title, sizeof(buffer)); |
| bytes = PR_Recv( |
| server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT); |
| if (verbosity > chatty) |
| PR_fprintf( |
| debug, "%s: Client received %d bytes\n", |
| shared->title, sizeof(buffer)); |
| if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break; |
| MW_ASSERT(sizeof(buffer) == bytes); |
| PR_Sleep(shared->timeout); |
| } |
| rv = PR_Close(server); |
| MW_ASSERT(PR_SUCCESS == rv); |
| |
| } /* ClientThread */ |
| |
| static void OneInThenCancelled(Shared *shared) |
| { |
| PRStatus rv; |
| PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait); |
| |
| shared->timeout = PR_INTERVAL_NO_TIMEOUT; |
| |
| desc_in->fd = PR_NewTCPSocket(); |
| desc_in->timeout = shared->timeout; |
| |
| if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc"); |
| |
| rv = PR_AddWaitFileDesc(shared->group, desc_in); |
| MW_ASSERT(PR_SUCCESS == rv); |
| |
| if (verbosity > chatty) PrintRecvDesc(desc_in, "Cancelling"); |
| rv = PR_CancelWaitFileDesc(shared->group, desc_in); |
| MW_ASSERT(PR_SUCCESS == rv); |
| |
| desc_out = PR_WaitRecvReady(shared->group); |
| MW_ASSERT(desc_out == desc_in); |
| MW_ASSERT(PR_MW_INTERRUPT == desc_out->outcome); |
| MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); |
| if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready"); |
| |
| rv = PR_Close(desc_in->fd); |
| MW_ASSERT(PR_SUCCESS == rv); |
| |
| if (verbosity > quiet) |
| PR_fprintf(debug, "%s: destroying group\n", shared->title); |
| |
| PR_DELETE(desc_in); |
| } /* OneInThenCancelled */ |
| |
| static void OneOpOneThread(Shared *shared) |
| { |
| PRStatus rv; |
| PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait); |
| |
| desc_in->fd = PR_NewTCPSocket(); |
| desc_in->timeout = shared->timeout; |
| |
| if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc"); |
| |
| rv = PR_AddWaitFileDesc(shared->group, desc_in); |
| MW_ASSERT(PR_SUCCESS == rv); |
| desc_out = PR_WaitRecvReady(shared->group); |
| MW_ASSERT(desc_out == desc_in); |
| MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); |
| MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); |
| if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready"); |
| |
| rv = PR_Close(desc_in->fd); |
| MW_ASSERT(PR_SUCCESS == rv); |
| |
| PR_DELETE(desc_in); |
| } /* OneOpOneThread */ |
| |
| static void ManyOpOneThread(Shared *shared) |
| { |
| PRStatus rv; |
| PRIntn index; |
| PRRecvWait *desc_in; |
| PRRecvWait *desc_out; |
| |
| if (verbosity > quiet) |
| PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects); |
| |
| for (index = 0; index < wait_objects; ++index) |
| { |
| desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout); |
| |
| rv = PR_AddWaitFileDesc(shared->group, desc_in); |
| MW_ASSERT(PR_SUCCESS == rv); |
| } |
| |
| while (ops_done < ops_required) |
| { |
| desc_out = PR_WaitRecvReady(shared->group); |
| MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); |
| MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); |
| if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready/readding"); |
| rv = PR_AddWaitFileDesc(shared->group, desc_out); |
| MW_ASSERT(PR_SUCCESS == rv); |
| (void)PR_AtomicIncrement(&ops_done); |
| } |
| |
| CancelGroup(shared); |
| } /* ManyOpOneThread */ |
| |
| static void PR_CALLBACK SomeOpsThread(void *arg) |
| { |
| PRRecvWait *desc_out; |
| PRStatus rv = PR_SUCCESS; |
| Shared *shared = (Shared*)arg; |
| do /* until interrupted */ |
| { |
| desc_out = PR_WaitRecvReady(shared->group); |
| if (NULL == desc_out) |
| { |
| MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); |
| if (verbosity > quiet) PR_fprintf(debug, "Aborted\n"); |
| break; |
| } |
| MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); |
| MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); |
| if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready"); |
| |
| if (verbosity > chatty) PrintRecvDesc(desc_out, "Re-Adding"); |
| desc_out->timeout = shared->timeout; |
| rv = PR_AddWaitFileDesc(shared->group, desc_out); |
| PR_AtomicIncrement(&ops_done); |
| if (ops_done > ops_required) break; |
| } while (PR_SUCCESS == rv); |
| MW_ASSERT(PR_SUCCESS == rv); |
| } /* SomeOpsThread */ |
| |
| static void SomeOpsSomeThreads(Shared *shared) |
| { |
| PRStatus rv; |
| PRThread **thread; |
| PRIntn index; |
| PRRecvWait *desc_in; |
| |
| thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads); |
| |
| /* Create some threads */ |
| |
| if (verbosity > quiet) |
| PR_fprintf(debug, "%s: creating threads\n", shared->title); |
| for (index = 0; index < worker_threads; ++index) |
| { |
| thread[index] = PR_CreateThread( |
| PR_USER_THREAD, SomeOpsThread, shared, |
| PR_PRIORITY_HIGH, thread_scope, |
| PR_JOINABLE_THREAD, 16 * 1024); |
| } |
| |
| /* then create some operations */ |
| if (verbosity > quiet) |
| PR_fprintf(debug, "%s: creating desc\n", shared->title); |
| for (index = 0; index < wait_objects; ++index) |
| { |
| desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout); |
| rv = PR_AddWaitFileDesc(shared->group, desc_in); |
| MW_ASSERT(PR_SUCCESS == rv); |
| } |
| |
| if (verbosity > quiet) |
| PR_fprintf(debug, "%s: sleeping\n", shared->title); |
| while (ops_done < ops_required) PR_Sleep(shared->timeout); |
| |
| if (verbosity > quiet) |
| PR_fprintf(debug, "%s: interrupting/joining threads\n", shared->title); |
| for (index = 0; index < worker_threads; ++index) |
| { |
| rv = PR_Interrupt(thread[index]); |
| MW_ASSERT(PR_SUCCESS == rv); |
| rv = PR_JoinThread(thread[index]); |
| MW_ASSERT(PR_SUCCESS == rv); |
| } |
| PR_DELETE(thread); |
| |
| CancelGroup(shared); |
| } /* SomeOpsSomeThreads */ |
| |
| static PRStatus ServiceRequest(Shared *shared, PRRecvWait *desc) |
| { |
| PRInt32 bytes_out; |
| |
| if (verbosity > chatty) |
| PR_fprintf( |
| debug, "%s: Service received %d bytes\n", |
| shared->title, desc->bytesRecv); |
| |
| if (0 == desc->bytesRecv) goto quitting; |
| if ((-1 == desc->bytesRecv) |
| && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted; |
| |
| bytes_out = PR_Send( |
| desc->fd, desc->buffer.start, desc->bytesRecv, 0, shared->timeout); |
| if (verbosity > chatty) |
| PR_fprintf( |
| debug, "%s: Service sent %d bytes\n", |
| shared->title, bytes_out); |
| |
| if ((-1 == bytes_out) |
| && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted; |
| MW_ASSERT(bytes_out == desc->bytesRecv); |
| |
| return PR_SUCCESS; |
| |
| aborted: |
| quitting: |
| return PR_FAILURE; |
| } /* ServiceRequest */ |
| |
| static void PR_CALLBACK ServiceThread(void *arg) |
| { |
| PRStatus rv = PR_SUCCESS; |
| PRRecvWait *desc_out = NULL; |
| Shared *shared = (Shared*)arg; |
| do /* until interrupted */ |
| { |
| if (NULL != desc_out) |
| { |
| desc_out->timeout = PR_INTERVAL_NO_TIMEOUT; |
| if (verbosity > chatty) |
| PrintRecvDesc(desc_out, "Service re-adding"); |
| rv = PR_AddWaitFileDesc(shared->group, desc_out); |
| MW_ASSERT(PR_SUCCESS == rv); |
| } |
| |
| desc_out = PR_WaitRecvReady(shared->group); |
| if (NULL == desc_out) |
| { |
| MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); |
| break; |
| } |
| |
| switch (desc_out->outcome) |
| { |
| case PR_MW_SUCCESS: |
| { |
| PR_AtomicIncrement(&ops_done); |
| if (verbosity > chatty) |
| PrintRecvDesc(desc_out, "Service ready"); |
| rv = ServiceRequest(shared, desc_out); |
| break; |
| } |
| case PR_MW_INTERRUPT: |
| MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); |
| rv = PR_FAILURE; /* if interrupted, then exit */ |
| break; |
| case PR_MW_TIMEOUT: |
| MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); |
| case PR_MW_FAILURE: |
| if (verbosity > silent) |
| PL_FPrintError(debug, "RecvReady failure"); |
| break; |
| default: |
| break; |
| } |
| } while (PR_SUCCESS == rv); |
| |
| if (NULL != desc_out) DestroyRecvWait(desc_out); |
| |
| } /* ServiceThread */ |
| |
| static void PR_CALLBACK EnumerationThread(void *arg) |
| { |
| PRStatus rv; |
| PRIntn count; |
| PRRecvWait *desc; |
| Shared *shared = (Shared*)arg; |
| PRIntervalTime five_seconds = PR_SecondsToInterval(5); |
| PRMWaitEnumerator *enumerator = PR_CreateMWaitEnumerator(shared->group); |
| MW_ASSERT(NULL != enumerator); |
| |
| while (PR_SUCCESS == PR_Sleep(five_seconds)) |
| { |
| count = 0; |
| desc = NULL; |
| while (NULL != (desc = PR_EnumerateWaitGroup(enumerator, desc))) |
| { |
| if (verbosity > chatty) PrintRecvDesc(desc, shared->title); |
| count += 1; |
| } |
| if (verbosity > silent) |
| PR_fprintf(debug, |
| "%s Enumerated %d objects\n", shared->title, count); |
| } |
| |
| MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); |
| |
| |
| rv = PR_DestroyMWaitEnumerator(enumerator); |
| MW_ASSERT(PR_SUCCESS == rv); |
| } /* EnumerationThread */ |
| |
| static void PR_CALLBACK ServerThread(void *arg) |
| { |
| PRStatus rv; |
| PRIntn index; |
| PRRecvWait *desc_in; |
| PRThread **worker_thread; |
| Shared *shared = (Shared*)arg; |
| PRFileDesc *listener, *service; |
| PRNetAddr server_address, client_address; |
| |
| worker_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads); |
| if (verbosity > quiet) |
| PR_fprintf(debug, "%s: Server creating worker_threads\n", shared->title); |
| for (index = 0; index < worker_threads; ++index) |
| { |
| worker_thread[index] = PR_CreateThread( |
| PR_USER_THREAD, ServiceThread, shared, |
| PR_PRIORITY_HIGH, thread_scope, |
| PR_JOINABLE_THREAD, 16 * 1024); |
| } |
| |
| rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &server_address); |
| MW_ASSERT(PR_SUCCESS == rv); |
| |
| listener = PR_NewTCPSocket(); MW_ASSERT(NULL != listener); |
| if (verbosity > chatty) |
| PR_fprintf( |
| debug, "%s: Server listener socket @0x%x\n", |
| shared->title, listener); |
| rv = PR_Bind(listener, &server_address); MW_ASSERT(PR_SUCCESS == rv); |
| rv = PR_Listen(listener, 10); MW_ASSERT(PR_SUCCESS == rv); |
| while (ops_done < ops_required) |
| { |
| if (verbosity > quiet) |
| PR_fprintf(debug, "%s: Server accepting connection\n", shared->title); |
| service = PR_Accept(listener, &client_address, PR_INTERVAL_NO_TIMEOUT); |
| if (NULL == service) |
| { |
| if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) break; |
| PL_PrintError("Accept failed"); |
| MW_ASSERT(!"Accept failed"); |
| } |
| else |
| { |
| desc_in = CreateRecvWait(service, shared->timeout); |
| desc_in->timeout = PR_INTERVAL_NO_TIMEOUT; |
| if (verbosity > chatty) |
| PrintRecvDesc(desc_in, "Service adding"); |
| rv = PR_AddWaitFileDesc(shared->group, desc_in); |
| MW_ASSERT(PR_SUCCESS == rv); |
| } |
| } |
| |
| if (verbosity > quiet) |
| PR_fprintf(debug, "%s: Server interrupting worker_threads\n", shared->title); |
| for (index = 0; index < worker_threads; ++index) |
| { |
| rv = PR_Interrupt(worker_thread[index]); |
| MW_ASSERT(PR_SUCCESS == rv); |
| rv = PR_JoinThread(worker_thread[index]); |
| MW_ASSERT(PR_SUCCESS == rv); |
| } |
| PR_DELETE(worker_thread); |
| |
| PR_Close(listener); |
| |
| CancelGroup(shared); |
| |
| } /* ServerThread */ |
| |
| static void RealOneGroupIO(Shared *shared) |
| { |
| /* |
| ** Create a server that listens for connections and then services |
| ** requests that come in over those connections. The server never |
| ** deletes a connection and assumes a basic RPC model of operation. |
| ** |
| ** Use worker_threads threads to service how every many open ports |
| ** there might be. |
| ** |
| ** Oh, ya. Almost forget. Create (some) clients as well. |
| */ |
| PRStatus rv; |
| PRIntn index; |
| PRThread *server_thread, *enumeration_thread, **client_thread; |
| |
| if (verbosity > quiet) |
| PR_fprintf(debug, "%s: creating server_thread\n", shared->title); |
| |
| server_thread = PR_CreateThread( |
| PR_USER_THREAD, ServerThread, shared, |
| PR_PRIORITY_HIGH, thread_scope, |
| PR_JOINABLE_THREAD, 16 * 1024); |
| |
| if (verbosity > quiet) |
| PR_fprintf(debug, "%s: creating enumeration_thread\n", shared->title); |
| |
| enumeration_thread = PR_CreateThread( |
| PR_USER_THREAD, EnumerationThread, shared, |
| PR_PRIORITY_HIGH, thread_scope, |
| PR_JOINABLE_THREAD, 16 * 1024); |
| |
| if (verbosity > quiet) |
| PR_fprintf(debug, "%s: snoozing before creating clients\n", shared->title); |
| PR_Sleep(5 * shared->timeout); |
| |
| if (verbosity > quiet) |
| PR_fprintf(debug, "%s: creating client_threads\n", shared->title); |
| client_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * client_threads); |
| for (index = 0; index < client_threads; ++index) |
| { |
| client_thread[index] = PR_CreateThread( |
| PR_USER_THREAD, ClientThread, shared, |
| PR_PRIORITY_NORMAL, thread_scope, |
| PR_JOINABLE_THREAD, 16 * 1024); |
| } |
| |
| while (ops_done < ops_required) PR_Sleep(shared->timeout); |
| |
| if (verbosity > quiet) |
| PR_fprintf(debug, "%s: interrupting/joining client_threads\n", shared->title); |
| for (index = 0; index < client_threads; ++index) |
| { |
| rv = PR_Interrupt(client_thread[index]); |
| MW_ASSERT(PR_SUCCESS == rv); |
| rv = PR_JoinThread(client_thread[index]); |
| MW_ASSERT(PR_SUCCESS == rv); |
| } |
| PR_DELETE(client_thread); |
| |
| if (verbosity > quiet) |
| PR_fprintf(debug, "%s: interrupting/joining enumeration_thread\n", shared->title); |
| rv = PR_Interrupt(enumeration_thread); |
| MW_ASSERT(PR_SUCCESS == rv); |
| rv = PR_JoinThread(enumeration_thread); |
| MW_ASSERT(PR_SUCCESS == rv); |
| |
| if (verbosity > quiet) |
| PR_fprintf(debug, "%s: interrupting/joining server_thread\n", shared->title); |
| rv = PR_Interrupt(server_thread); |
| MW_ASSERT(PR_SUCCESS == rv); |
| rv = PR_JoinThread(server_thread); |
| MW_ASSERT(PR_SUCCESS == rv); |
| } /* RealOneGroupIO */ |
| |
| static void RunThisOne( |
| void (*func)(Shared*), const char *name, const char *test_name) |
| { |
| Shared *shared; |
| if ((NULL == test_name) || (0 == PL_strcmp(name, test_name))) |
| { |
| if (verbosity > silent) |
| PR_fprintf(debug, "%s()\n", name); |
| shared = MakeShared(name); |
| ops_done = 0; |
| func(shared); /* run the test */ |
| MW_ASSERT(0 == desc_allocated); |
| DestroyShared(shared); |
| } |
| } /* RunThisOne */ |
| |
| static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta) |
| { |
| PRIntn verbage = (PRIntn)verbosity; |
| return (Verbosity)(verbage += delta); |
| } /* ChangeVerbosity */ |
| |
| int main(int argc, char **argv) |
| { |
| PLOptStatus os; |
| const char *test_name = NULL; |
| PLOptState *opt = PL_CreateOptState(argc, argv, "dqGc:o:p:t:w:"); |
| |
| while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) |
| { |
| if (PL_OPT_BAD == os) continue; |
| switch (opt->option) |
| { |
| case 0: |
| test_name = opt->value; |
| break; |
| case 'd': /* debug mode */ |
| if (verbosity < noisy) |
| verbosity = ChangeVerbosity(verbosity, 1); |
| break; |
| case 'q': /* debug mode */ |
| if (verbosity > silent) |
| verbosity = ChangeVerbosity(verbosity, -1); |
| break; |
| case 'G': /* use global threads */ |
| thread_scope = PR_GLOBAL_THREAD; |
| break; |
| case 'c': /* number of client threads */ |
| client_threads = atoi(opt->value); |
| break; |
| case 'o': /* operations to compelete */ |
| ops_required = atoi(opt->value); |
| break; |
| case 'p': /* default port */ |
| default_port = atoi(opt->value); |
| break; |
| case 't': /* number of threads waiting */ |
| worker_threads = atoi(opt->value); |
| break; |
| case 'w': /* number of wait objects */ |
| wait_objects = atoi(opt->value); |
| break; |
| default: |
| break; |
| } |
| } |
| PL_DestroyOptState(opt); |
| |
| if (verbosity > 0) |
| debug = PR_GetSpecialFD(PR_StandardError); |
| |
| RunThisOne(OneInThenCancelled, "OneInThenCancelled", test_name); |
| RunThisOne(OneOpOneThread, "OneOpOneThread", test_name); |
| RunThisOne(ManyOpOneThread, "ManyOpOneThread", test_name); |
| RunThisOne(SomeOpsSomeThreads, "SomeOpsSomeThreads", test_name); |
| RunThisOne(RealOneGroupIO, "RealOneGroupIO", test_name); |
| return 0; |
| } /* main */ |
| |
| /* multwait.c */ |