DABC (Data Acquisition Backbone Core)  2.9.9
threads.h
Go to the documentation of this file.
1 // $Id: threads.h 4781 2021-07-14 12:28:15Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #ifndef DABC_threads
17 #define DABC_threads
18 
19 #include <pthread.h>
20 
21 #ifndef DABC_defines
22 #include "dabc/defines.h"
23 #endif
24 
25 #ifndef DABC_logging
26 #include "dabc/logging.h"
27 #endif
28 
#if defined(__MACH__) /* Apple OSX section */

// Dummy wrappers for every affinity-related function used by the thread
// classes. Only the declarations live here; the definitions are elsewhere.

/** \brief Stand-in for the cpu_set_t affinity mask type */
struct cpu_set_t {
   unsigned flag{0};   ///< mask storage, zero-initialised
};

/** Number of CPUs advertised for the stand-in set */
#define CPU_SETSIZE 32

/** Stand-in for the CPU_ZERO macro */
extern "C" void CPU_ZERO(cpu_set_t *arg);

/** Stand-in for the CPU_SET macro */
extern "C" void CPU_SET(int cpu, cpu_set_t *arg);

/** Stand-in for the CPU_ISSET macro */
extern "C" bool CPU_ISSET(int cpu, cpu_set_t *arg);

/** Stand-in for the CPU_CLR macro */
extern "C" void CPU_CLR(int cpu, cpu_set_t *arg);

/** Stand-in for sched_getaffinity(); the first two arguments are unnamed here */
extern "C" int sched_getaffinity(int, int, cpu_set_t* set);

/** Stand-in for sched_setaffinity(); the first two arguments are unnamed here */
extern "C" int sched_setaffinity(int, int, cpu_set_t*);

#endif
52 
53 
54 namespace dabc {
55 
/** \brief posix pthread mutex
 *
 * Owns a pthread_mutex_t. The guard and pointer classes declared as friends
 * access the raw mutex directly.
 */
class Mutex {
   friend class LockGuard;
   friend class UnlockGuard;
   friend class Condition;
   friend class MutexPtr;
   protected:
      pthread_mutex_t fMutex;   ///< wrapped pthread mutex
   public:
      /** Constructor, defined out of line.
       *  \param recursive NOTE(review): presumably requests a recursive mutex - confirm in threads.cxx */
      Mutex(bool recursive = false);

      /** Destroys the wrapped mutex */
      inline ~Mutex() { pthread_mutex_destroy(&fMutex); }

      /** Calls pthread_mutex_lock(); the result code is not checked */
      inline void Lock() { pthread_mutex_lock(&fMutex); }

      /** Calls pthread_mutex_unlock(); the result code is not checked */
      inline void Unlock() { pthread_mutex_unlock(&fMutex); }

      /** Defined out of line */
      bool TryLock();

      /** Defined out of line */
      bool IsLocked();
};
76 
77  // ____________________________________________________________
78 
84  class MutexPtr {
85  protected:
86  pthread_mutex_t* fMutex;
87  public:
88  inline MutexPtr(pthread_mutex_t& mutex) : fMutex(&mutex) {}
89  inline MutexPtr(pthread_mutex_t* mutex) : fMutex(mutex) {}
90  inline MutexPtr(const Mutex& mutex) : fMutex((pthread_mutex_t*)&(mutex.fMutex)) {}
91  inline MutexPtr(const Mutex* mutex) : fMutex(mutex ? (pthread_mutex_t*) &(mutex->fMutex) : 0) {}
92  inline MutexPtr(const MutexPtr& src) : fMutex(src.fMutex) {}
93 
94  inline ~MutexPtr() {}
95 
96  bool null() const { return fMutex == 0; }
97  void clear() { fMutex = 0; }
98 
99  inline void Lock() { if (fMutex) pthread_mutex_lock(fMutex); }
100  inline void Unlock() { if (fMutex) pthread_mutex_unlock(fMutex); }
101  bool TryLock();
102  bool IsLocked();
103  };
104 
105  // ___________________________________________________________
106 
/** \def DABC_LOCKGUARD(mutex,info)
 *
 * Declares a local dabc::LockGuard named \c dabc_guard for \a mutex.
 *
 * With DABC_EXTRA_CHECKS the guard is created without locking and
 * TryingLock() is repeated until it succeeds; every time it gives up,
 * \a info is passed to EOUT before the next round.
 * Without DABC_EXTRA_CHECKS the guard simply locks in its constructor and
 * \a info is unused.
 *
 * The expansion is a declaration (plus a loop in checked mode), so use it as
 * a statement inside a block, at most once per scope.
 */
#ifdef DABC_EXTRA_CHECKS

#define DABC_LOCKGUARD(mutex,info) \
   dabc::LockGuard dabc_guard(mutex, false); \
   while (!dabc_guard.TryingLock()) EOUT(info);

#else

#define DABC_LOCKGUARD(mutex,info) dabc::LockGuard dabc_guard(mutex)

#endif
120 
121 
127  class LockGuard {
128  protected:
130  public:
131  inline LockGuard(pthread_mutex_t& mutex) :
132  fMutex(mutex)
133  {
134  fMutex.Lock();
135  }
136  inline LockGuard(pthread_mutex_t* mutex) :
137  fMutex(mutex)
138  {
139  fMutex.Lock();
140  }
141  inline LockGuard(const Mutex& mutex) :
142  fMutex(mutex)
143  {
144  fMutex.Lock();
145  }
146  inline LockGuard(const Mutex* mutex) :
147  fMutex(mutex)
148  {
149  fMutex.Lock();
150  }
151 
152  inline LockGuard(pthread_mutex_t& mutex, bool) :
153  fMutex(mutex)
154  {
155  }
156  inline LockGuard(pthread_mutex_t* mutex, bool) :
157  fMutex(mutex)
158  {
159  }
160  inline LockGuard(const Mutex& mutex, bool) :
161  fMutex(mutex)
162  {
163  }
164  inline LockGuard(const Mutex* mutex, bool) :
165  fMutex(mutex)
166  {
167  }
168 
169  inline ~LockGuard()
170  {
171  fMutex.Unlock();
172  }
173 
174  bool TryingLock()
175  {
176  if (fMutex.null()) return true;
177 
178  int cnt=1000000;
179  while (cnt-->0)
180  if (fMutex.TryLock()) return true;
181 
182  // return false if many attempts to lock mutex fails
183  return false;
184  }
185 
186 
187  };
188 
189  // ____________________________________________________________
190 
215  class UnlockGuard {
216  protected:
217  pthread_mutex_t* fMutex;
218  public:
219  inline UnlockGuard(const Mutex* mutex) : fMutex(mutex ? (pthread_mutex_t*) &(mutex->fMutex) : 0)
220  {
221  if (fMutex) pthread_mutex_unlock(fMutex);
222  }
223  inline ~UnlockGuard()
224  {
225  if (fMutex) pthread_mutex_lock(fMutex);
226  }
227  };
228 
229  // ______________________________________________________________
230 
231 
/** \brief Guard for integer value.
 *
 * Increments the referenced integer in the constructor and decrements it in
 * the destructor. The increment and decrement are plain, non-atomic
 * operations.
 */
class IntGuard {
   private:
      int* fInt;   ///< guarded counter, nullptr when constructed from a null pointer

   public:
      /** Guard the counter at \a value; a null pointer is accepted and ignored.
       *  Constness is dropped on purpose so that counters of const objects can be guarded. */
      inline IntGuard(const int* value) : fInt(const_cast<int*>(value)) { if (fInt) (*fInt)++; }

      /** Guard the counter \a value; the referenced variable must outlive the guard */
      inline IntGuard(const int& value) : fInt(const_cast<int*>(&value)) { (*fInt)++; }

      /** Undo the increment done by the constructor */
      inline ~IntGuard() { if (fInt) (*fInt)--; }

      /** Returns the guarded counter, may be nullptr */
      inline int* ptr() { return fInt; }
};
251 
252 
253 
254  // ___________________________________________________________
255 
261  class Condition {
262  protected:
265  pthread_cond_t fCond;
266  long int fFiredCounter;
267  bool fWaiting;
268  public:
269  Condition(Mutex* ext_mtx = nullptr);
270  virtual ~Condition();
271 
272  inline void DoFire()
273  {
274  LockGuard lock(fCondMutex);
275  _DoFire();
276  }
277 
278  inline bool DoWait(double wait_seconds = -1.)
279  {
280  LockGuard lock(fCondMutex);
281  return _DoWait(wait_seconds);
282  }
283 
284  inline void _DoReset()
285  {
286  if (!fWaiting) fFiredCounter = 0;
287  }
288 
289 
290  inline void Reset()
291  {
292  LockGuard lock(fCondMutex);
293  _DoReset();
294  }
295 
296  inline void _DoFire()
297  {
298  // mutex must be already locked at this point
299  fFiredCounter++;
300  if (fWaiting)
301  pthread_cond_signal(&fCond);
302  }
303 
304  bool _DoWait(double wait_seconds);
305 
306  Mutex* CondMutex() const { return fCondMutex; }
307 
308  bool _Waiting() const { return fWaiting; }
309 
310  long int _FiredCounter() const { return fFiredCounter; }
311  };
312 
313  // ___________________________________________________________
314 
/** \brief Object which could be run inside the dabc::PosixThread
 *
 * Interface with a single mandatory entry point, MainLoop().
 */
class Runnable {
   friend class Thread;

   public:
      /** Entry point which every subclass must implement */
      virtual void* MainLoop() = 0;

      /** Optional hook, does nothing by default */
      virtual void RunnableCancelled() {}

      /** Virtual destructor, defined out of line */
      virtual ~Runnable();
};
330 
331  // ___________________________________________________________
332 
/** Thread identifier type, an alias for the native pthread handle */
typedef pthread_t Thread_t;
334 
340  class PosixThread {
341  protected:
342  pthread_t fThrd;
343  cpu_set_t fCpuSet;
344  static cpu_set_t fDfltSet;
345  static cpu_set_t fSpecialSet;
346 
347  void UseCurrentAsSelf() { fThrd = pthread_self(); }
348 
349  public:
350 
351  typedef void* (StartRoutine)(void*);
352 
353  PosixThread();
354  virtual ~PosixThread();
355 
364  bool SetAffinity(const char* aff);
365 
374  bool GetAffinity(bool actual, char* buf, unsigned maxbuf);
375 
377  void Start(Runnable* run);
378 
380  void Start(StartRoutine* func, void* args);
381 
383  void Join();
384 
386  void Kill(int sig = 9);
387 
389  void Cancel();
390 
392  void SetPriority(int prio);
393 
395  void SetThreadName(const char *thrdname);
396 
398  inline Thread_t Id() const { return fThrd; }
399 
401  static Thread_t Self() { return pthread_self(); }
402 
404  inline bool IsItself() const { return pthread_equal(fThrd, pthread_self()) != 0; }
405 
417  static bool SetDfltAffinity(const char* aff = nullptr);
418 
422  static bool GetDfltAffinity(char* buf, unsigned maxbuf);
423  };
424 
425 }
426 
427 #endif
posix pthread condition
Definition: threads.h:261
bool _Waiting() const
Definition: threads.h:308
virtual ~Condition()
Definition: threads.cxx:100
void _DoFire()
Definition: threads.h:296
long int fFiredCounter
Definition: threads.h:266
Mutex fInternCondMutex
Definition: threads.h:263
void DoFire()
Definition: threads.h:272
Condition(Mutex *ext_mtx=nullptr)
Definition: threads.cxx:91
long int _FiredCounter() const
Definition: threads.h:310
Mutex * CondMutex() const
Definition: threads.h:306
void _DoReset()
Definition: threads.h:284
bool DoWait(double wait_seconds=-1.)
Definition: threads.h:278
void Reset()
Definition: threads.h:290
pthread_cond_t fCond
Definition: threads.h:265
Mutex * fCondMutex
Definition: threads.h:264
bool _DoWait(double wait_seconds)
Definition: threads.cxx:105
Guard for integer value.
Definition: threads.h:240
IntGuard(const int &value)
Definition: threads.h:246
int * ptr()
Definition: threads.h:249
int * fInt
Definition: threads.h:242
IntGuard(const int *value)
Definition: threads.h:245
Lock guard for posix mutex.
Definition: threads.h:127
bool TryingLock()
Definition: threads.h:174
LockGuard(pthread_mutex_t &mutex, bool)
Definition: threads.h:152
LockGuard(const Mutex *mutex, bool)
Definition: threads.h:164
LockGuard(const Mutex *mutex)
Definition: threads.h:146
LockGuard(pthread_mutex_t &mutex)
Definition: threads.h:131
LockGuard(const Mutex &mutex)
Definition: threads.h:141
LockGuard(pthread_mutex_t *mutex, bool)
Definition: threads.h:156
LockGuard(const Mutex &mutex, bool)
Definition: threads.h:160
MutexPtr fMutex
Definition: threads.h:129
LockGuard(pthread_mutex_t *mutex)
Definition: threads.h:136
Pointer on posix pthread mutex
Definition: threads.h:84
MutexPtr(pthread_mutex_t *mutex)
Definition: threads.h:89
bool null() const
Definition: threads.h:96
MutexPtr(const Mutex &mutex)
Definition: threads.h:90
bool TryLock()
Definition: threads.cxx:73
bool IsLocked()
Definition: threads.cxx:79
void Lock()
Definition: threads.h:99
MutexPtr(pthread_mutex_t &mutex)
Definition: threads.h:88
pthread_mutex_t * fMutex
Definition: threads.h:86
MutexPtr(const Mutex *mutex)
Definition: threads.h:91
MutexPtr(const MutexPtr &src)
Definition: threads.h:92
void clear()
Definition: threads.h:97
void Unlock()
Definition: threads.h:100
posix pthread mutex
Definition: threads.h:61
bool TryLock()
Definition: threads.cxx:57
bool IsLocked()
Definition: threads.cxx:63
Mutex(bool recursive=false)
Definition: threads.cxx:44
void Lock()
Definition: threads.h:71
pthread_mutex_t fMutex
Definition: threads.h:67
void Unlock()
Definition: threads.h:72
class represents posix pthread functionality
Definition: threads.h:340
static Thread_t Self()
Definition: threads.h:401
static bool SetDfltAffinity(const char *aff=nullptr)
Sets default affinity for next threads to be created and for main process.
Definition: threads.cxx:183
cpu_set_t fCpuSet
affinity property of the thread
Definition: threads.h:343
pthread_t fThrd
pthread handle
Definition: threads.h:342
void Start(Runnable *run)
Start thread with provided runnable.
Definition: threads.cxx:358
bool SetAffinity(const char *aff)
Sets affinity mask for the thread.
Definition: threads.cxx:286
void UseCurrentAsSelf()
Definition: threads.h:347
void Join()
Join thread - method waits until thread execution is completed.
Definition: threads.cxx:389
bool GetAffinity(bool actual, char *buf, unsigned maxbuf)
Provides thread affinity in form of "xxxooooo".
Definition: threads.cxx:451
void Kill(int sig=9)
Kill thread with specified signal.
Definition: threads.cxx:407
void *() StartRoutine(void *)
Definition: threads.h:351
virtual ~PosixThread()
Definition: threads.cxx:282
static cpu_set_t fDfltSet
default affinity for new thread
Definition: threads.h:344
bool IsItself() const
Returns true if called from thread context.
Definition: threads.h:404
void Cancel()
Try to cancel thread execution.
Definition: threads.cxx:412
static bool GetDfltAffinity(char *buf, unsigned maxbuf)
Returns default affinity mask in form "xxxooosss".
Definition: threads.cxx:424
static cpu_set_t fSpecialSet
set of processors, which can be used for special threads
Definition: threads.h:345
void SetPriority(int prio)
Change thread priority.
Definition: threads.cxx:395
void SetThreadName(const char *thrdname)
Set thread name, which can be seen from htop.
Definition: threads.cxx:417
Thread_t Id() const
Definition: threads.h:398
Object which could be run inside the dabc::PosixThread
Definition: threads.h:320
virtual void * MainLoop()=0
virtual void RunnableCancelled()
Definition: threads.h:326
virtual ~Runnable()
Definition: threads.cxx:148
Represent thread functionality.
Definition: Thread.h:109
Unlock guard for posix mutex.
Definition: threads.h:215
UnlockGuard(const Mutex *mutex)
Definition: threads.h:219
pthread_mutex_t * fMutex
Definition: threads.h:217
Event manipulation API.
Definition: api.h:23
pthread_t Thread_t
Definition: threads.h:333