00001 // @(#)root/thread:$Id: TWin32Condition.cxx 20882 2007-11-19 11:31:26Z rdm $ 00002 // Author: Bertrand Bellenot 20/10/2004 00003 00004 /************************************************************************* 00005 * Copyright (C) 1995-2004, Rene Brun and Fons Rademakers. * 00006 * All rights reserved. * 00007 * * 00008 * For the licensing terms see $ROOTSYS/LICENSE. * 00009 * For the list of contributors see $ROOTSYS/README/CREDITS. * 00010 *************************************************************************/ 00011 00012 ////////////////////////////////////////////////////////////////////////// 00013 // // 00014 // TWin32Condition // 00015 // // 00016 // This class provides an interface to the win32 condition variable // 00017 // routines. // 00018 // // 00019 ////////////////////////////////////////////////////////////////////////// 00020 00021 #include "TWin32Condition.h" 00022 #include "TWin32Mutex.h" 00023 #include "TTimeStamp.h" 00024 #include "Windows4Root.h" 00025 00026 #include <errno.h> 00027 00028 ClassImp(TWin32Condition) 00029 00030 //______________________________________________________________________________ 00031 TWin32Condition::TWin32Condition(TMutexImp *m) 00032 { 00033 // Create Condition variable. Ctor must be given a pointer to an 00034 // existing mutex. The condition variable is then linked to the mutex, 00035 // so that there is an implicit unlock and lock around Wait() and 00036 // TimedWait(). 00037 00038 fMutex = (TWin32Mutex *) m; 00039 00040 fCond.waiters_count_ = 0; 00041 fCond.was_broadcast_ = 0; 00042 fCond.sema_ = CreateSemaphore(0, // no security 00043 0, // initially 0 00044 0x7fffffff, // max count 00045 0); // unnamed 00046 InitializeCriticalSection (&fCond.waiters_count_lock_); 00047 fCond.waiters_done_ = CreateEvent(0, // no security 00048 FALSE, // auto-reset 00049 FALSE, // non-signaled initially 00050 0); // unnamed 00051 } 00052 00053 //______________________________________________________________________________ 00054 TWin32Condition::~TWin32Condition() 00055 { 00056 // TCondition dtor. 00057 00058 } 00059 00060 //______________________________________________________________________________ 00061 Int_t TWin32Condition::Wait() 00062 { 00063 // Wait for the condition variable to be signalled. The mutex is 00064 // implicitely released before waiting and locked again after waking up. 00065 // If Wait() is called by multiple threads, a signal may wake up more 00066 // than one thread. See POSIX threads documentation for details. 00067 00068 // Avoid race conditions. 00069 EnterCriticalSection(&fCond.waiters_count_lock_); 00070 fCond.waiters_count_++; 00071 LeaveCriticalSection(&fCond.waiters_count_lock_); 00072 00073 // This call atomically releases the mutex and waits on the 00074 // semaphore until <pthread_cond_signal> or <pthread_cond_broadcast> 00075 // are called by another thread. 00076 // SignalObjectAndWait(fMutex->fHMutex, fCond.sema_, INFINITE, FALSE); 00077 ::LeaveCriticalSection(&fMutex->fCritSect); 00078 WaitForSingleObject(fCond.sema_, INFINITE); 00079 00080 // Reacquire lock to avoid race conditions. 00081 EnterCriticalSection(&fCond.waiters_count_lock_); 00082 00083 // We're no longer waiting... 00084 fCond.waiters_count_--; 00085 00086 // Check to see if we're the last waiter after <pthread_cond_broadcast>. 00087 int last_waiter = fCond.was_broadcast_ && fCond.waiters_count_ == 0; 00088 00089 LeaveCriticalSection(&fCond.waiters_count_lock_); 00090 00091 // If we're the last waiter thread during this particular broadcast 00092 // then let all the other threads proceed. 00093 if (last_waiter) { 00094 // This call atomically signals the <waiters_done_> event and waits until 00095 // it can acquire the <fMutex->fHMutex>. This is required to ensure fairness. 00096 // SignalObjectAndWait(fCond.waiters_done_, fMutex->fHMutex, INFINITE, FALSE); 00097 SetEvent(fCond.waiters_done_); 00098 ::EnterCriticalSection(&fMutex->fCritSect); 00099 } 00100 else 00101 // Always regain the external mutex since that's the guarantee we 00102 // give to our callers. 00103 // WaitForSingleObject(fMutex->fHMutex, INFINITE); 00104 ::EnterCriticalSection(&fMutex->fCritSect); 00105 00106 return 0; 00107 } 00108 00109 //______________________________________________________________________________ 00110 Int_t TWin32Condition::TimedWait(ULong_t secs, ULong_t nanoSecs) 00111 { 00112 // TimedWait() is given an absolute time to wait until. To wait for a 00113 // relative time from now, use TThread::GetTime(). See POSIX threads 00114 // documentation for why absolute times are better than relative. 00115 // Returns 0 if successfully signalled, 1 if time expired. 00116 00117 DWORD ret; 00118 TTimeStamp t; 00119 // Get actual time 00120 ULong_t secNow = t.GetSec(); 00121 ULong_t nanosecNow = t.GetNanoSec(); 00122 DWORD dwMillisecondsNow = (DWORD)((secNow * 1000) + (nanosecNow / 1000000)); 00123 DWORD dwMilliseconds = (DWORD)((secs * 1000) + (nanoSecs / 1000000)); 00124 // Calculate delta T to obtain the real time to wait for 00125 DWORD dwTimeWait = (DWORD)(dwMilliseconds - dwMillisecondsNow); 00126 // Avoid race conditions. 00127 EnterCriticalSection(&fCond.waiters_count_lock_); 00128 fCond.waiters_count_++; 00129 LeaveCriticalSection(&fCond.waiters_count_lock_); 00130 00131 // This call atomically releases the mutex and waits on the 00132 // semaphore until <pthread_cond_signal> or <pthread_cond_broadcast> 00133 // are called by another thread. 00134 // ret = SignalObjectAndWait(fMutex->fHMutex, fCond.sema_, dwTimeWait, FALSE); 00135 ::LeaveCriticalSection(&fMutex->fCritSect); 00136 ret = WaitForSingleObject(fCond.sema_, dwTimeWait); 00137 00138 // Reacquire lock to avoid race conditions. 00139 EnterCriticalSection(&fCond.waiters_count_lock_); 00140 00141 // We're no longer waiting... 00142 fCond.waiters_count_--; 00143 00144 // Check to see if we're the last waiter after <pthread_cond_broadcast>. 00145 int last_waiter = fCond.was_broadcast_ && fCond.waiters_count_ == 0; 00146 00147 LeaveCriticalSection(&fCond.waiters_count_lock_); 00148 00149 // If we're the last waiter thread during this particular broadcast 00150 // then let all the other threads proceed. 00151 if (last_waiter) { 00152 // This call atomically signals the <waiters_done_> event and waits until 00153 // it can acquire the <fMutex->fHMutex>. This is required to ensure fairness. 00154 // SignalObjectAndWait(fCond.waiters_done_, fMutex->fHMutex, dwTimeWait, FALSE); 00155 SetEvent(fCond.waiters_done_); 00156 ::EnterCriticalSection(&fMutex->fCritSect); 00157 } 00158 else 00159 // Always regain the external mutex since that's the guarantee we 00160 // give to our callers. 00161 // WaitForSingleObject(fMutex->fHMutex, INFINITE); 00162 ::EnterCriticalSection(&fMutex->fCritSect); 00163 00164 if (ret == WAIT_TIMEOUT) 00165 return 1; 00166 return 0; 00167 } 00168 00169 //______________________________________________________________________________ 00170 Int_t TWin32Condition::Signal() 00171 { 00172 // If one or more threads have called Wait(), Signal() wakes up at least 00173 // one of them, possibly more. See POSIX threads documentation for details. 00174 00175 EnterCriticalSection (&fCond.waiters_count_lock_); 00176 int have_waiters = fCond.waiters_count_ > 0; 00177 LeaveCriticalSection (&fCond.waiters_count_lock_); 00178 00179 // If there aren't any waiters, then this is a no-op. 00180 if (have_waiters) 00181 ReleaseSemaphore(fCond.sema_, 1, 0); 00182 00183 return 0; 00184 } 00185 00186 00187 //______________________________________________________________________________ 00188 Int_t TWin32Condition::Broadcast() 00189 { 00190 // Broadcast is like signal but wakes all threads which have called Wait(). 00191 00192 // This is needed to ensure that <waiters_count_> and <was_broadcast_> are 00193 // consistent relative to each other. 00194 EnterCriticalSection(&fCond.waiters_count_lock_); 00195 int have_waiters = 0; 00196 00197 if (fCond.waiters_count_ > 0) { 00198 // We are broadcasting, even if there is just one waiter... 00199 // Record that we are broadcasting, which helps optimize 00200 // <pthread_cond_wait> for the non-broadcast case. 00201 fCond.was_broadcast_ = 1; 00202 have_waiters = 1; 00203 } 00204 00205 if (have_waiters) { 00206 // Wake up all the waiters atomically. 00207 ReleaseSemaphore(fCond.sema_, fCond.waiters_count_, 0); 00208 00209 LeaveCriticalSection(&fCond.waiters_count_lock_); 00210 00211 // Wait for all the awakened threads to acquire the counting 00212 // semaphore. 00213 WaitForSingleObject(fCond.waiters_done_, INFINITE); 00214 // This assignment is okay, even without the <waiters_count_lock_> held 00215 // because no other waiter threads can wake up to access it. 00216 fCond.was_broadcast_ = 0; 00217 } 00218 else 00219 LeaveCriticalSection(&fCond.waiters_count_lock_); 00220 00221 return 0; 00222 }